diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs index 21ba37e5..e21c755d 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs @@ -34,12 +34,17 @@ use { }, crate::engine::{ error::StorageError, - storage::{common::sdss::sdss_r1::rw::TrackedReader, v2::raw::spec::SystemDatabaseV1}, + storage::{ + common::{checksum::SCrc64, sdss::sdss_r1::rw::TrackedReader}, + v2::raw::spec::SystemDatabaseV1, + }, RuntimeResult, }, std::cell::RefCell, }; +const SANE_MEM_LIMIT_BYTES: usize = 2048; + /* impls for journal tests */ @@ -103,8 +108,14 @@ macro_rules! impl_db_event { impl_db_event!( DbEventPush<'_> as 0 => |me, buf| { - buf.extend(&(me.0.len() as u64).to_le_bytes()); - buf.extend(me.0.as_bytes()); + let length_bytes = (me.0.len() as u64).to_le_bytes(); + let me_bytes = me.0.as_bytes(); + let mut checksum = SCrc64::new(); + checksum.update(&length_bytes); + checksum.update(&me_bytes); + buf.extend(&(checksum.finish().to_le_bytes())); // checksum + buf.extend(&(me.0.len() as u64).to_le_bytes()); // length + buf.extend(me.0.as_bytes()); // payload }, DbEventPop as 1, DbEventClear as 2 @@ -161,19 +172,25 @@ impl RawJournalAdapter for SimpleDBJournal { ) -> RuntimeResult<()> { match meta { EventMeta::NewKey => { - let key_size = u64::from_le_bytes(file.read_block()?) as usize; - let mut keybuf = Vec::::new(); - if keybuf.try_reserve_exact(key_size as usize).is_err() { + let checksum = u64::from_le_bytes(file.read_block()?); + let length = u64::from_le_bytes(file.read_block()?) as usize; + let mut payload = Vec::::new(); + if length > SANE_MEM_LIMIT_BYTES + || payload.try_reserve_exact(length as usize).is_err() + { return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()); } unsafe { - keybuf.as_mut_ptr().write_bytes(0, key_size); - keybuf.set_len(key_size); + payload.as_mut_ptr().write_bytes(0, length); + payload.set_len(length); } - file.tracked_read(&mut keybuf)?; - match String::from_utf8(keybuf) { - Ok(k) => gs.data.borrow_mut().push(k), - Err(_) => { + file.tracked_read(&mut payload)?; + let mut this_checksum = SCrc64::new(); + this_checksum.update(&length.to_le_bytes()); + this_checksum.update(&payload); + match String::from_utf8(payload) { + Ok(k) if this_checksum.finish() == checksum => gs.data.borrow_mut().push(k), + Err(_) | Ok(_) => { return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()) } } diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs index 0af5188e..8eb527e4 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -52,9 +52,14 @@ use { std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range}, }; +/// the number of events that we want to usually emulate for const TRIALS: usize = 100; +/// See impl of [`apply_event_mix`]. We remove every 10th element const POST_TRIALS_SIZE: usize = TRIALS - (TRIALS / 10); +/// a test key for single events +const KEY: &str = concat!("1234567890-0987654321"); +/// The initializer for a corruption test case struct Initializer { journal_id: &'static str, initializer_fn: fn(&str) -> RuntimeResult, @@ -62,6 +67,7 @@ struct Initializer { } #[derive(Debug)] +/// Information about the modified journal generated by an [`Initializer`] struct ModifiedJournalInfo { init: InitializerInfo, _storage: ModifiedJournalStorageInfo, @@ -83,12 +89,14 @@ impl ModifiedJournalInfo { } #[derive(Debug, Clone, Copy)] +/// Information about the initial state of a "good journal". Generated from [`Initializer`] struct InitializerInfo { corrupted_event_id: u64, last_executed_event_id: u64, } impl InitializerInfo { + /// The initializer only creates one event fn new_last_event(last_event_id: u64) -> Self { Self::new(last_event_id, last_event_id) } @@ -98,6 +106,7 @@ impl InitializerInfo { last_executed_event_id, } } + /// Returns true if the initializer created multiple events (and not a single event) fn not_last_event(&self) -> bool { self.corrupted_event_id != self.last_executed_event_id } @@ -120,8 +129,12 @@ impl Initializer { } } +fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String { + format!("{journal_id}-trimmed-{trim_size}.db") +} + #[derive(Debug)] -#[allow(unused)] +/// Information about the layout of the modified journal struct ModifiedJournalStorageInfo { original_file_size: usize, modified_file_size: usize, @@ -142,6 +155,13 @@ impl ModifiedJournalStorageInfo { } } +/** + Emulate a sequentially varying corruption. + - The initializer creates a modified journal and provides information about it + - We go over each initializer and then enumerate a bunch of corruption test cases. + - Generally, we take the size of the event, n, (it isn't necessary that it's a static size but + should atleast be computable/traced somehow) and then shave off 1 bit, followed by upto n bytes +*/ fn emulate_sequentially_varying_single_corruption( initializers: impl IntoIterator, modified_journal_generator_fn: impl Fn( @@ -190,7 +210,7 @@ fn emulate_sequentially_varying_single_corruption( // now trim and repeat for trim_size in 1..=last_event_size { // create a copy of the "good" journal and corrupt it - let corrupted_journal_path = format!("{journal_id}-trimmed-{trim_size}.db"); + let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size); let open_journal_fn = |db: &SimpleDB| { open_journal::( &corrupted_journal_path, @@ -207,6 +227,11 @@ fn emulate_sequentially_varying_single_corruption( &original_offsets, ) .unwrap(); + assert_ne!(storage_info.corruption_range.len(), 0); + assert_ne!( + storage_info.modified_file_size, + storage_info.original_file_size + ); let modified_journal_info = ModifiedJournalInfo::new(initializer_info, storage_info, initializer_id); // now let the caller handle any post corruption work @@ -245,10 +270,15 @@ fn emulate_sequentially_varying_single_corruption( repaired_journal_reopen_result, ); } + // we're done, delete the corrupted journal + FileSystem::remove_file(&corrupted_journal_path).unwrap(); } + // delete the good journal, we're done with this one as well + FileSystem::remove_file(journal_id).unwrap(); } } +/// In this emulation, we sequentially corrupt the last event across multiple trials fn emulate_final_event_corruption( initializers: impl IntoIterator, post_corruption_handler: impl Fn( @@ -285,6 +315,7 @@ fn emulate_final_event_corruption( ) } +/// In this emulation, we sequentially corrupt an intermediary event across multiple trials fn emulate_midway_corruption( initializers: impl IntoIterator, post_corruption_handler: impl Fn( @@ -341,10 +372,14 @@ fn emulate_midway_corruption( ) } +/// Format a key as a string (padded to six bytes) fn keyfmt(num: usize) -> String { format!("key-{num:06}") } +/// Apply an event mix +/// - Add [`TRIALS`] count of elements +/// - Remove every 10th element fn apply_event_mix(jrnl: &mut RawJournalWriter) -> RuntimeResult { let mut op_count = 0; let mut sdb = SimpleDB::new(); @@ -362,22 +397,34 @@ fn apply_event_mix(jrnl: &mut RawJournalWriter) -> RuntimeResul #[test] fn corruption_before_close() { - let initializers = vec![ - // open and close - Initializer::new_driver_type("close_event_corruption_empty.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; - RawJournalWriter::close_driver(&mut jrnl)?; - Ok(InitializerInfo::new_last_event(0)) - }), - // open, apply mix of events, close - Initializer::new_driver_type("close_event_corruption.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; - let operation_count = apply_event_mix(&mut jrnl)?; - RawJournalWriter::close_driver(&mut jrnl)?; - Ok(InitializerInfo::new_last_event(operation_count)) - }), - // open, close, reinit, close + let initializers = [ Initializer::new_driver_type( + /* + in this case we: create, close (0), corrupt close (0) + */ + "close_event_corruption_empty.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new_last_event(0)) + }, + ), + Initializer::new_driver_type( + /* + in this case we: create, apply events ([0,99]), close (100). corrupt close (100). expect no data loss. + */ + "close_event_corruption.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + let operation_count = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new_last_event(operation_count)) + }, + ), + Initializer::new_driver_type( + /* + in this case we: create, close (0), reopen(1), close(2). corrupt last close (2) + */ "close_event_corruption_open_close_open_close.db", |jrnl_id| { // open and close @@ -408,7 +455,7 @@ fn corruption_before_close() { assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( trace, @@ -424,18 +471,18 @@ fn corruption_before_close() { assert_eq!( db.data().len(), POST_TRIALS_SIZE, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( *db.data().last().unwrap(), keyfmt(TRIALS - 1), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } assert_eq!( @@ -451,7 +498,7 @@ fn corruption_before_close() { assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( trace, @@ -471,18 +518,18 @@ fn corruption_before_close() { assert_eq!( db.data().len(), POST_TRIALS_SIZE, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( *db.data().last().unwrap(), keyfmt(TRIALS - 1), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } assert_eq!( @@ -506,7 +553,9 @@ fn corruption_before_close() { }, |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { assert_eq!( - repair_result.unwrap(), + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), "failed at trim_size {trim_size} for journal {journal_id}" ); @@ -516,22 +565,24 @@ fn corruption_before_close() { assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { // in the second case, we apply the event mix so we need to check this assert_eq!( db.data().len(), POST_TRIALS_SIZE, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( *db.data().last().unwrap(), keyfmt(TRIALS - 1), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } - let _ = reopen_result.unwrap(); + let _ = reopen_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); // clear trace let _ = debug_get_trace(); let _ = debug_get_offsets(); @@ -541,24 +592,44 @@ fn corruption_before_close() { #[test] fn corruption_after_reopen() { - let initializers = vec![ - Initializer::new_driver_type("corruption_after_reopen.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; - RawJournalWriter::close_driver(&mut jrnl)?; - drop(jrnl); - // reopen, but don't close - open_journal::(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; - Ok(InitializerInfo::new_last_event(1)) - }), - Initializer::new_driver_type("corruption_after_ropen_multi_before_close.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; - let operation_count = apply_event_mix(&mut jrnl)?; - RawJournalWriter::close_driver(&mut jrnl)?; - drop(jrnl); - // reopen, but don't close - open_journal::(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; - Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish - }), + let initializers = [ + Initializer::new_driver_type( + /* + in this case we: create, close (0), reopen(1). corrupt reopen (1) + */ + "corruption_after_reopen.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen, but don't close + open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + Ok(InitializerInfo::new_last_event(1)) + }, + ), + Initializer::new_driver_type( + /* + in this case we: create, apply events([0,99]), close (100), reopen(101). corrupt reopen (101). expect no data loss. + */ + "corruption_after_ropen_multi_before_close.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + let operation_count = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen, but don't close + open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish + }, + ), ]; emulate_final_event_corruption( initializers, @@ -574,14 +645,15 @@ fn corruption_after_reopen() { will potentially introduce more bugs due to increased complexity. Get a good filesystem and disk controller (that attaches checksums to sectors)! -- @ohsayan */ - let mut jrnl = - open_result.expect(&format!("failed at {trim_size} for journal {journal_id}")); + let mut jrnl = open_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); if modified_journal_info.init.last_executed_event_id == 1 { // empty log, only the reopen assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( trace, @@ -603,19 +675,19 @@ fn corruption_after_reopen() { JournalWriterTraceEvent::DriverEventCompleted, JournalWriterTraceEvent::ReinitializeComplete, ], - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { // we will have upto the last event since only the reopen is gone assert_eq!( db.data().len(), POST_TRIALS_SIZE, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( *db.data().last().unwrap(), keyfmt(TRIALS - 1), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( &trace[trace.len() - 12..], @@ -639,7 +711,7 @@ fn corruption_after_reopen() { JournalWriterTraceEvent::DriverEventCompleted, JournalWriterTraceEvent::ReinitializeComplete ], - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ) } // now close this so that this works with the post repair handler @@ -650,14 +722,14 @@ fn corruption_after_reopen() { assert_eq!( open_result.unwrap_err().kind(), &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); if modified_journal_info.init.last_executed_event_id == 1 { // empty log, only the reopen assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); assert_eq!( trace, @@ -673,7 +745,7 @@ fn corruption_after_reopen() { modified_journal_info.init.corrupted_event_id ) ], - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { // we will have upto the last event since only the reopen is gone @@ -690,7 +762,7 @@ fn corruption_after_reopen() { modified_journal_info.init.corrupted_event_id ) ], - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } } @@ -700,24 +772,28 @@ fn corruption_after_reopen() { if trim_size == DriverEvent::FULL_EVENT_SIZE { // see earlier comment assert_eq!( - repair_result.unwrap(), + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), RepairResult::NoErrors, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { assert_eq!( - repair_result.unwrap(), + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), RepairResult::UnspecifiedLoss( (DriverEvent::FULL_EVENT_SIZE - trim_size) as u64 ), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } if modified_journal_info.init.last_executed_event_id == 1 { assert_eq!( db.data().len(), 0, - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } else { assert_eq!(db.data().len(), POST_TRIALS_SIZE); @@ -732,7 +808,6 @@ fn corruption_after_reopen() { #[test] fn corruption_at_runtime() { // first get the offsets to compute the size of the event - const KEY: &str = "hello, universe"; let offset = { debug_set_offset_tracking(true); let mut sdb = SimpleDB::new(); @@ -745,8 +820,14 @@ fn corruption_at_runtime() { let _ = debug_get_trace(); ret }; - let initializers = vec![ + let initializers = [ Initializer::new( + /* + for this one we: + - PRC1: we create and apply one event (0) + + exepct data loss (0). + */ "corruption_at_runtime_open_commit_corrupt", |jrnl_id| { let mut sdb = SimpleDB::new(); @@ -758,6 +839,11 @@ fn corruption_at_runtime() { offset, ), Initializer::new( + /* + for this one we: + - PRC1: we create and apply events ([0,99]) + expect data loss (99) + */ "corruption_at_runtime_open_multi_commit_then_corrupt", |jrnl_id| { let mut op_count = 0; @@ -870,7 +956,9 @@ fn corruption_at_runtime() { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { assert!(reopen_result.is_ok()); assert_eq!( - repair_result.unwrap(), + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), RepairResult::UnspecifiedLoss((offset - trim_size) as u64), "failed for journal {journal_id} with trim_size {trim_size}" ); @@ -897,10 +985,24 @@ fn corruption_at_runtime() { ) } +/* + midway corruption tests + --- + while in the prior tests we tested cases where the last event was corrupted, we now test cases where some middle + portion of the journal gets corrupted. the trouble is that we'll have to enumerate all cases for generated traces... + which is absolutely not feasible. Instead, we just ensure that the pre and post states are valid. +*/ + #[test] fn midway_corruption_close() { - let initializers = vec![ + let initializers = [ Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| { + /* + in this test corruption case we: + - PR cycle 1: create and close (0) + - PR cycle 2: open (1) and close (2) + we emulate a sequential corruption case for (0) + */ // create and close let mut jrnl = create_journal::(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; @@ -916,6 +1018,13 @@ fn midway_corruption_close() { Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close }), Initializer::new_driver_type( + /* + in this test case we: + - PR cycle 1: create and close (0) + - PR cycle 2: reopen (1), apply events([2,101]), close (102) + - PR cycle 3: reopen (103), close (104) + we emulate a sequential corruption case for (102). expect all events to persist (<= 101) + */ "midway_corruption_close_events_before_second_close", |jrnl_id| { { @@ -947,6 +1056,14 @@ fn midway_corruption_close() { }, ), Initializer::new_driver_type( + /* + in this test case: + - PR cycle 1: create and close (0) + - PR cycle 2: reopen (1) and close (2) + - PR cycle 3: reopen(3), apply events([4,103]), close(104) + we emulate a sequential corruption of (2) which results in a catastrophic corruption. expect major + data loss (==TRIALS) + */ "midway_corruption_close_events_before_third_close", |jrnl_id| { { @@ -1000,7 +1117,7 @@ fn midway_corruption_close() { assert_eq!( *db.data().last().unwrap(), keyfmt(TRIALS - 1), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } _ => panic!(), @@ -1009,7 +1126,9 @@ fn midway_corruption_close() { let _ = debug_get_trace(); }, |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { - let _ = reopen_result.unwrap(); + let _ = reopen_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); match modified_journal_info.initializer_id { 0 | 2 => { // all data will be lost, so the DB will be empty @@ -1023,7 +1142,9 @@ fn midway_corruption_close() { { // the first event was corrupted assert_eq!( - repair_result.unwrap(), + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), RepairResult::UnspecifiedLoss( ((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64 ), @@ -1033,7 +1154,7 @@ fn midway_corruption_close() { // this is a serious midway corruption with major data loss let full_log_size = File::open(journal_id).unwrap().f_len().unwrap(); assert_eq!( - repair_result.unwrap(), + repair_result.expect(&format!("failed at trim_size {trim_size} for journal {journal_id}")), RepairResult::UnspecifiedLoss( full_log_size - <::Spec as FileSpecV1>::SIZE // account for header @@ -1050,7 +1171,7 @@ fn midway_corruption_close() { assert_eq!( *db.data().last().unwrap(), keyfmt(TRIALS - 1), - "failed at {trim_size} for journal {journal_id}" + "failed at trim_size {trim_size} for journal {journal_id}" ); } _ => panic!(), @@ -1061,3 +1182,277 @@ fn midway_corruption_close() { ); debug_set_offset_tracking(false); } + +#[test] +fn midway_corruption_reopen() { + let initializers = [ + Initializer::new( + "midway_corruption_reopen_close_reopen_close", + |jrnl_id| { + /* + for this test case we create and close (0) the journal and in the next power cycle we reopen (1) and close (2) the + journal. we emulate a midway corruption where the reopen (1) gets corrupted. + */ + { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; // (0) + } + { + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; // (1) <-- corrupt + RawJournalWriter::close_driver(&mut jrnl)?; // (2) + } + Ok(InitializerInfo::new(1, 2)) + }, + DriverEvent::FULL_EVENT_SIZE, + ), + Initializer::new( + /* + create, apply ([0,99]), close (100). reopen(101), close (102). corrupt (101). expect no data loss + */ + "midway_corruption_reopen_apply_close_reopen_close", + |jrnl_id| { + let op_count; + { + let mut jrnl = create_journal::(jrnl_id)?; + op_count = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; + } + { + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + RawJournalWriter::close_driver(&mut jrnl)?; + } + Ok(InitializerInfo::new((op_count + 1) as u64, 102)) + }, + DriverEvent::FULL_EVENT_SIZE, + ), + Initializer::new( + /* + create, close (0). reopen(1), apply ([2,101]), close (102). corrupt (1). expect full data loss + */ + "midway_corruption_reopen_apply_post_corrupted_reopen", + |jrnl_id| { + { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + } + { + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; // <-- corrupt this one + let _ = apply_event_mix(&mut jrnl)?; // apply mix + RawJournalWriter::close_driver(&mut jrnl)?; + } + Ok(InitializerInfo::new(1, 102)) + }, + DriverEvent::FULL_EVENT_SIZE, + ), + ]; + debug_set_offset_tracking(true); // we need to track offsets + emulate_midway_corruption( + initializers, + |journal_id, modified_journal_info, trim_size, db, open_result| { + let _ = open_result.unwrap_err(); + match modified_journal_info.initializer_id { + 0 | 2 => { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + 1 => { + assert_eq!( + db.data().len(), + POST_TRIALS_SIZE, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ) + } + _ => panic!(), + } + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| { + let _ = open_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); + let _ = repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); + match modified_journal_info.initializer_id { + 0 | 2 => { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + 1 => { + assert_eq!( + db.data().len(), + POST_TRIALS_SIZE, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ) + } + _ => panic!(), + } + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + ); + debug_set_offset_tracking(false); +} + +#[test] +fn midway_corruption_at_runtime() { + debug_set_offset_tracking(true); + // compute offset size + let event_size_fixed_size_key = { + let mut jrnl = + create_journal::("midway_corruption_at_runtime_fixed_key").unwrap(); + SimpleDB::new().push(&mut jrnl, KEY).unwrap(); + let (_, offsets) = (debug_get_trace(), debug_get_offsets()); + *offsets.get(&0).unwrap() as usize + - <::Spec as FileSpecV1>::SIZE + }; + // compute offset size + let event_size_dynamic_key = { + let mut jrnl = + create_journal::("midway_corruption_at_runtime_dynamic_key").unwrap(); + SimpleDB::new().push(&mut jrnl, keyfmt(0)).unwrap(); + let (_, offsets) = (debug_get_trace(), debug_get_offsets()); + *offsets.get(&0).unwrap() as usize + - <::Spec as FileSpecV1>::SIZE + }; + let initializers = [ + Initializer::new( + /* + open, apply (0), close (1). corrupt (0). expect complete loss of the push event (0). + */ + "midway_corruption_at_runtime_open_server_event_close", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + SimpleDB::new().push(&mut jrnl, KEY)?; + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new(0, 1)) + }, + event_size_fixed_size_key, + ), + Initializer::new( + /* + open, apply([0,99]), close (100). corrupt (99). expect complete loss of the last event(99). + */ + "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + let mut sdb = SimpleDB::new(); + for num in 1..=TRIALS { + sdb.push(&mut jrnl, keyfmt(num))?; + } + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new(TRIALS as u64 - 1, TRIALS as u64)) + }, + event_size_dynamic_key, + ), + Initializer::new( + /* + open, apply([0,99]), close (100). corrupt (0). expect complete loss of all events + */ + "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + let mut sdb = SimpleDB::new(); + for num in 1..=TRIALS { + sdb.push(&mut jrnl, keyfmt(num))?; + } + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new(0, TRIALS as u64)) + }, + event_size_dynamic_key, + ), + ]; + emulate_midway_corruption( + initializers, + |journal_id, modified_journal_info, trim_size, db, open_result| { + let _ = open_result.unwrap_err(); + let (_, _) = (debug_get_trace(), debug_get_offsets()); + match modified_journal_info.initializer_id { + 0 | 2 => { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}. data={:?}", + db.data() + ); + } + 1 => { + // expect to have all keys upto TRIALS - 1 + assert_eq!( + db.data().len(), + TRIALS - 1, + "failed at trim_size {trim_size} for journal {journal_id}. data={:?}", + db.data() + ); + // last key is TRIALS - 1 + assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1)); + } + _ => panic!(), + } + }, + |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| { + let _ = repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}", + FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)), + FileSystem::read(journal_id), + )); + let _ = open_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}", + FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)), + FileSystem::read(journal_id), + )); + match modified_journal_info.initializer_id { + 0 | 2 => { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}. data={:?}", + db.data() + ); + } + 1 => { + // expect to have all keys upto TRIALS - 1 + assert_eq!( + db.data().len(), + TRIALS - 1, + "failed at trim_size {trim_size} for journal {journal_id}. data={:?}", + db.data() + ); + // last key is TRIALS - 1 + assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1)); + } + _ => panic!(), + } + let (_, _) = (debug_get_trace(), debug_get_offsets()); + }, + ) +}