diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index 4d065ce3..4130d792 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -631,6 +631,10 @@ impl RawJournalWriter { { self.commit_with_ctx(event, Default::default()) } + /// roll back to the last txn + /// WARNING: only call on failure + /// + /// NB: Idempotency is guaranteed. Will rollback to, and only to the last event pub fn __rollback(&mut self) -> RuntimeResult<()> { // ensure cursors are in sync, even if out of position self.log_file.verify_cursor()?; diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs index a85b749c..81e91f56 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -139,6 +139,17 @@ fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String { format!("{journal_id}-trimmed-{trim_size}.db") } +fn journal_init(journal_id: &str) -> RuntimeResult> { + create_journal(journal_id) +} + +fn journal_open( + journal_id: &str, + db: &SimpleDB, +) -> RuntimeResult> { + open_journal(journal_id, db, JournalSettings::default()) +} + #[derive(Debug)] /// Information about the layout of the modified journal struct ModifiedJournalStorageInfo { @@ -217,13 +228,7 @@ fn emulate_sequentially_varying_single_corruption( for trim_size in 1..=last_event_size { // create a copy of the "good" journal and corrupt it let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size); - let open_journal_fn = |db: &SimpleDB| { - open_journal::( - &corrupted_journal_path, - db, - JournalSettings::default(), - ) - }; + let open_journal_fn = |db: &SimpleDB| journal_open(&corrupted_journal_path, db); // modify journal let storage_info = modified_journal_generator_fn( journal_id, @@ -412,7 +417,7 @@ fn corruption_before_close() { */ "close_event_corruption_empty.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new_last_event(0)) }, @@ -423,7 +428,7 @@ fn corruption_before_close() { */ "close_event_corruption.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let operation_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new_last_event(operation_count)) @@ -436,15 +441,11 @@ fn corruption_before_close() { "close_event_corruption_open_close_open_close.db", |jrnl_id| { // open and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reinit and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new_last_event(2)) }, @@ -607,15 +608,11 @@ fn corruption_after_reopen() { */ "corruption_after_reopen.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reopen, but don't close - open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + journal_open(jrnl_id, &SimpleDB::new())?; Ok(InitializerInfo::new_last_event(1)) }, ), @@ -625,16 +622,12 @@ fn corruption_after_reopen() { */ "corruption_after_ropen_multi_before_close.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let operation_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reopen, but don't close - open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + journal_open(jrnl_id, &SimpleDB::new())?; Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish }, ), @@ -1011,15 +1004,11 @@ fn midway_corruption_close() { we emulate a sequential corruption case for (0) */ // create and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reopen and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close @@ -1036,27 +1025,19 @@ fn midway_corruption_close() { |jrnl_id| { { // create and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0) } let op_cnt; { // reopen, apply mix and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (1) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) op_cnt = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one } { // reopen and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (op_cnt + 3) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (op_cnt + 3) RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4) } Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4)) @@ -1075,25 +1056,17 @@ fn midway_corruption_close() { |jrnl_id| { { // create and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0) } { // reopen and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (1) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one } let op_cnt; { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (3) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (3) op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count) RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count) } @@ -1203,15 +1176,11 @@ fn midway_corruption_reopen() { journal. we emulate a midway corruption where the reopen (1) gets corrupted. */ { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0) } { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (1) <-- corrupt + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) <-- corrupt RawJournalWriter::close_driver(&mut jrnl)?; // (2) } Ok(InitializerInfo::new(1, 2)) @@ -1226,16 +1195,12 @@ fn midway_corruption_reopen() { |jrnl_id| { let op_count; { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; op_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; } { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; RawJournalWriter::close_driver(&mut jrnl)?; } Ok(InitializerInfo::new((op_count + 1) as u64, 102)) @@ -1249,15 +1214,11 @@ fn midway_corruption_reopen() { "midway_corruption_reopen_apply_post_corrupted_reopen", |jrnl_id| { { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; } { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // <-- corrupt this one + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // <-- corrupt this one let _ = apply_event_mix(&mut jrnl)?; // apply mix RawJournalWriter::close_driver(&mut jrnl)?; } @@ -1367,7 +1328,7 @@ fn midway_corruption_at_runtime() { */ "midway_corruption_at_runtime_open_server_event_close", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; SimpleDB::new().push(&mut jrnl, KEY)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new(0, 1)) @@ -1380,7 +1341,7 @@ fn midway_corruption_at_runtime() { */ "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let mut sdb = SimpleDB::new(); for num in 1..=TRIALS { sdb.push(&mut jrnl, keyfmt(num))?; @@ -1396,7 +1357,7 @@ fn midway_corruption_at_runtime() { */ "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let mut sdb = SimpleDB::new(); for num in 1..=TRIALS { sdb.push(&mut jrnl, keyfmt(num))?; @@ -1521,13 +1482,15 @@ fn emulate_failure_for_rollback( let mut jrnl = create_journal::(journal_id).unwrap(); let err = action(&mut db, &mut jrnl).unwrap_err(); verify_error(err); - jrnl.__rollback().unwrap(); + for _ in 0..1000 { + // idempotency guarantee: no matter how many times this is called, the underlying state will rollback to, and only to the last event + jrnl.__rollback().unwrap(); + } RawJournalWriter::close_driver(&mut jrnl).unwrap(); } { let db = SimpleDB::new(); - let mut jrnl = open_journal::(journal_id, &db, JournalSettings::default()) - .expect(&format!("{:#?}", debug_get_trace())); + let mut jrnl = journal_open(journal_id, &db).expect(&format!("{:#?}", debug_get_trace())); post_rollback(&db); RawJournalWriter::close_driver(&mut jrnl).unwrap(); }