diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index fee98a15..1a837d12 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -1054,11 +1054,11 @@ impl RawJournalReader { return Ok(true); } self.state = JournalState::AwaitingReopen; + jtrace_reader!(DriverEventExpectingReopenBlock); return self.handle_reopen(); } fn handle_reopen(&mut self) -> RuntimeResult { jtrace_reader!(AttemptingEvent(self.txn_id as u64)); - jtrace_reader!(DriverEventExpectingReopenBlock); // now we must look for a reopen event let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?; let reopen_event = match DriverEvent::decode(event_block) { diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs index b161af07..eb32c16c 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -34,8 +34,8 @@ use { v2::raw::journal::{ create_journal, open_journal, raw::{ - obtain_offsets, obtain_trace, DriverEvent, JournalReaderTraceEvent, - RawJournalWriter, + obtain_offsets, obtain_trace, DriverEvent, DriverEventKind, + JournalReaderTraceEvent, JournalWriterTraceEvent, RawJournalWriter, }, repair_journal, JournalRepairMode, JournalSettings, RepairResult, }, @@ -47,15 +47,84 @@ use { std::io::ErrorKind as IoErrorKind, }; +type Initializer = (&'static str, fn(&str) -> RuntimeResult); + fn create_trimmed_file(from: &str, to: &str, trim_to: u64) -> IoResult<()> { FileSystem::copy(from, to)?; let mut f = File::open(to)?; f.f_truncate(trim_to) } +fn emulate_corrupted_final_event( + initializers: impl IntoIterator, + post_corruption_handler: impl Fn(&str, u64, usize, RuntimeResult>), + post_repair_handler: impl Fn( + &str, + usize, + RuntimeResult, + RuntimeResult>, + ), +) { + for (journal_id, initializer) in initializers { + // initialize journal, get size and clear traces + let repaired_last_event_id = match initializer(journal_id) { + Ok(nid) => nid, + Err(e) => panic!( + "failed to initialize {journal_id} due to {e}. trace: {:?}, file_data={:?}", + obtain_trace(), + FileSystem::read(journal_id), + ), + }; + let journal_size = File::open(journal_id).unwrap().f_len().unwrap(); + let _ = obtain_trace(); + let _ = obtain_offsets(); + // now trim and repeat + for (trim_size, new_size) in (1..=DriverEvent::FULL_EVENT_SIZE) + .rev() + .map(|trim_size| (trim_size, journal_size - trim_size as u64)) + { + // create a copy of the "good" journal and trim to simulate data loss + let trimmed_journal_path = format!("{journal_id}-trimmed-{trim_size}.db"); + create_trimmed_file(journal_id, &trimmed_journal_path, new_size).unwrap(); + // init misc + let simple_db = SimpleDB::new(); + let open_journal_fn = || { + open_journal::( + &trimmed_journal_path, + &simple_db, + JournalSettings::default(), + ) + }; + // now let the caller handle any post corruption work + let open_journal_result = open_journal_fn(); + post_corruption_handler( + journal_id, + repaired_last_event_id, + trim_size, + open_journal_result, + ); + // repair + let repair_result = repair_journal::( + &trimmed_journal_path, + &simple_db, + JournalSettings::default(), + JournalRepairMode::Simple, + ); + let repaired_journal_reopen_result = open_journal_fn(); + // let caller handle any post repair work + post_repair_handler( + journal_id, + trim_size, + repair_result, + repaired_journal_reopen_result, + ); + } + } +} + #[test] -fn corruption_at_close() { - let initializers: Vec<(&'static str, fn(&str) -> RuntimeResult)> = vec![ +fn corruption_before_close() { + let initializers: Vec = vec![ // open and close ("close_event_corruption_empty.db", |jrnl_id| { let mut jrnl = create_journal::(jrnl_id)?; @@ -97,42 +166,15 @@ fn corruption_at_close() { }, ), ]; - for (journal_id, initializer) in initializers { - // initialize journal, get size and clear traces - let close_event_id = match initializer(journal_id) { - Ok(nid) => nid, - Err(e) => panic!( - "failed to initialize {journal_id} due to {e}. trace: {:?}, file_data={:?}", - obtain_trace(), - FileSystem::read(journal_id), - ), - }; - let journal_size = File::open(journal_id).unwrap().f_len().unwrap(); - let _ = obtain_trace(); - let _ = obtain_offsets(); - // now trim and repeat - for (trim_size, new_size) in (1..=DriverEvent::FULL_EVENT_SIZE) - .rev() - .map(|trim_size| (trim_size, journal_size - trim_size as u64)) - { - // create a copy of the "good" journal and trim to simulate data loss - let trimmed_journal_path = format!("{journal_id}-trimmed-{trim_size}.db"); - create_trimmed_file(journal_id, &trimmed_journal_path, new_size).unwrap(); - // init misc - let simple_db = SimpleDB::new(); - let open_journal_fn = || { - open_journal::( - &trimmed_journal_path, - &simple_db, - JournalSettings::default(), - ) - }; + emulate_corrupted_final_event( + initializers, + |journal_id, repaired_last_event_id, trim_size, open_result| { // open the journal and validate failure - let open_err = open_journal_fn().unwrap_err(); + let open_err = open_result.unwrap_err(); let trace = obtain_trace(); if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) { // the amount of trim from the end of the file causes us to lose valuable metadata - if close_event_id == 0 { + if repaired_last_event_id == 0 { // empty log assert_eq!( trace, @@ -151,14 +193,14 @@ fn corruption_at_close() { } } else { // the amount of trim still allows us to read some metadata - if close_event_id == 0 { + if repaired_last_event_id == 0 { // empty log assert_eq!( trace, intovec![ JournalReaderTraceEvent::Initialized, JournalReaderTraceEvent::LookingForEvent, - JournalReaderTraceEvent::AttemptingEvent(close_event_id), + JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id), JournalReaderTraceEvent::DriverEventExpectingClose, ], "failed at trim_size {trim_size} for journal {journal_id}" @@ -168,7 +210,7 @@ fn corruption_at_close() { &trace[trace.len() - 3..], &into_array![ JournalReaderTraceEvent::LookingForEvent, - JournalReaderTraceEvent::AttemptingEvent(close_event_id), + JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id), JournalReaderTraceEvent::DriverEventExpectingClose ], "failed at trim_size {trim_size} for journal {journal_id}" @@ -180,24 +222,112 @@ fn corruption_at_close() { &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), "failed at trim_size {trim_size} for journal {journal_id}" ); - // now repair this log + }, + |journal_id, trim_size, repair_result, reopen_result| { assert_eq!( - repair_journal::( - &trimmed_journal_path, - &simple_db, - JournalSettings::default(), - JournalRepairMode::Simple, - ) - .unwrap(), + repair_result.unwrap(), RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), "failed at trim_size {trim_size} for journal {journal_id}" ); + let mut jrnl = reopen_result.unwrap(); // now reopen log and ensure it's repaired - let mut jrnl = open_journal_fn().unwrap(); RawJournalWriter::close_driver(&mut jrnl).unwrap(); // clear trace let _ = obtain_trace(); let _ = obtain_offsets(); - } - } + }, + ) +} + +#[test] +fn corruption_after_reopen() { + let initializers: Vec = vec![("corruption_after_reopen.db", |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen, but don't close + open_journal::(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; + Ok(1) + })]; + emulate_corrupted_final_event( + initializers, + |journal_id, repaired_last_event_id, trim_size, open_result| { + let trace = obtain_trace(); + if trim_size == DriverEvent::FULL_EVENT_SIZE { + /* + IMPORTANT IFFY SITUATION: undetectable error. if an entire "correct" part of the log vanishes, it's not going to be detected. + while possible in theory, it's going to have to be one heck of a coincidence for it to happen in practice. the only way to work + around this is to use a secondary checksum. I'm not a fan of that approach either (and I don't even consider it to be a good mitigation) + because it can potentially violate consistency, conflicting the source of truth. for example: if we have a database crash, should we trust + the checksum file or the log? guarding that further requires an enormous amount of effort and it will still have holes and ironically, + will potentially introduce more bugs due to increased complexity. Get a good filesystem and disk controller (that attaches checksums to sectors)! + -- @ohsayan + */ + let mut jrnl = + open_result.expect(&format!("failed at {trim_size} for journal {journal_id}")); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DriverEventExpectingClose, + JournalReaderTraceEvent::DriverEventCompletedBlockRead, + JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, + JournalReaderTraceEvent::ClosedAndReachedEof, + JournalReaderTraceEvent::Completed, + JournalWriterTraceEvent::ReinitializeAttempt, + JournalWriterTraceEvent::DriverEventAttemptCommit { + event: DriverEventKind::Reopened, + event_id: repaired_last_event_id, + prev_id: 0 + }, + JournalWriterTraceEvent::DriverEventCompleted, + JournalWriterTraceEvent::ReinitializeComplete, + ], + "failed at {trim_size} for journal {journal_id}" + ); + // now close this so that this works with the post repair handler + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + let _ = obtain_offsets(); + let _ = obtain_trace(); + } else { + assert_eq!( + open_result.unwrap_err().kind(), + &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()) + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DriverEventExpectingClose, + JournalReaderTraceEvent::DriverEventCompletedBlockRead, + JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, + JournalReaderTraceEvent::DriverEventExpectingReopenBlock, + JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id) + ] + ); + } + }, + |journal_id, trim_size, repair_result, reopen_result| { + assert!(reopen_result.is_ok()); + if trim_size == DriverEvent::FULL_EVENT_SIZE { + // see earlier comment + assert_eq!( + repair_result.unwrap(), + RepairResult::NoErrors, + "failed at {trim_size} for journal {journal_id}" + ); + } else { + assert_eq!( + repair_result.unwrap(), + RepairResult::LostBytes((DriverEvent::FULL_EVENT_SIZE - trim_size) as u64) + ); + } + let _ = obtain_trace(); + let _ = obtain_offsets(); + }, + ) }