storage: test RB idempotency guarantee

next
Sayan Nandan 6 months ago
parent b603eb05d3
commit 852dcf2341
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -631,6 +631,10 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
{ {
self.commit_with_ctx(event, Default::default()) self.commit_with_ctx(event, Default::default())
} }
/// roll back to the last txn
/// WARNING: only call on failure
///
/// NB: Idempotency is guaranteed. Will rollback to, and only to the last event
pub fn __rollback(&mut self) -> RuntimeResult<()> { pub fn __rollback(&mut self) -> RuntimeResult<()> {
// ensure cursors are in sync, even if out of position // ensure cursors are in sync, even if out of position
self.log_file.verify_cursor()?; self.log_file.verify_cursor()?;

@ -139,6 +139,17 @@ fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String {
format!("{journal_id}-trimmed-{trim_size}.db") format!("{journal_id}-trimmed-{trim_size}.db")
} }
fn journal_init(journal_id: &str) -> RuntimeResult<RawJournalWriter<SimpleDBJournal>> {
create_journal(journal_id)
}
fn journal_open(
journal_id: &str,
db: &SimpleDB,
) -> RuntimeResult<RawJournalWriter<SimpleDBJournal>> {
open_journal(journal_id, db, JournalSettings::default())
}
#[derive(Debug)] #[derive(Debug)]
/// Information about the layout of the modified journal /// Information about the layout of the modified journal
struct ModifiedJournalStorageInfo { struct ModifiedJournalStorageInfo {
@ -217,13 +228,7 @@ fn emulate_sequentially_varying_single_corruption(
for trim_size in 1..=last_event_size { for trim_size in 1..=last_event_size {
// create a copy of the "good" journal and corrupt it // create a copy of the "good" journal and corrupt it
let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size); let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size);
let open_journal_fn = |db: &SimpleDB| { let open_journal_fn = |db: &SimpleDB| journal_open(&corrupted_journal_path, db);
open_journal::<SimpleDBJournal>(
&corrupted_journal_path,
db,
JournalSettings::default(),
)
};
// modify journal // modify journal
let storage_info = modified_journal_generator_fn( let storage_info = modified_journal_generator_fn(
journal_id, journal_id,
@ -412,7 +417,7 @@ fn corruption_before_close() {
*/ */
"close_event_corruption_empty.db", "close_event_corruption_empty.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(0)) Ok(InitializerInfo::new_last_event(0))
}, },
@ -423,7 +428,7 @@ fn corruption_before_close() {
*/ */
"close_event_corruption.db", "close_event_corruption.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?; let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(operation_count)) Ok(InitializerInfo::new_last_event(operation_count))
@ -436,15 +441,11 @@ fn corruption_before_close() {
"close_event_corruption_open_close_open_close.db", "close_event_corruption_open_close_open_close.db",
|jrnl_id| { |jrnl_id| {
// open and close // open and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reinit and close // reinit and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(2)) Ok(InitializerInfo::new_last_event(2))
}, },
@ -607,15 +608,11 @@ fn corruption_after_reopen() {
*/ */
"corruption_after_reopen.db", "corruption_after_reopen.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reopen, but don't close // reopen, but don't close
open_journal::<SimpleDBJournal>( journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
Ok(InitializerInfo::new_last_event(1)) Ok(InitializerInfo::new_last_event(1))
}, },
), ),
@ -625,16 +622,12 @@ fn corruption_after_reopen() {
*/ */
"corruption_after_ropen_multi_before_close.db", "corruption_after_ropen_multi_before_close.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?; let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reopen, but don't close // reopen, but don't close
open_journal::<SimpleDBJournal>( journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish
}, },
), ),
@ -1011,15 +1004,11 @@ fn midway_corruption_close() {
we emulate a sequential corruption case for (0) we emulate a sequential corruption case for (0)
*/ */
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reopen and close // reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close
@ -1036,27 +1025,19 @@ fn midway_corruption_close() {
|jrnl_id| { |jrnl_id| {
{ {
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0) RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
let op_cnt; let op_cnt;
{ {
// reopen, apply mix and close // reopen, apply mix and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1)
op_cnt = apply_event_mix(&mut jrnl)?; op_cnt = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one
} }
{ {
// reopen and close // reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (op_cnt + 3)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (op_cnt + 3)
RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4) RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4)
} }
Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4)) Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4))
@ -1075,25 +1056,17 @@ fn midway_corruption_close() {
|jrnl_id| { |jrnl_id| {
{ {
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0) RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
{ {
// reopen and close // reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1)
RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one
} }
let op_cnt; let op_cnt;
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (3)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (3)
op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count) op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count)
RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count) RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count)
} }
@ -1203,15 +1176,11 @@ fn midway_corruption_reopen() {
journal. we emulate a midway corruption where the reopen (1) gets corrupted. journal. we emulate a midway corruption where the reopen (1) gets corrupted.
*/ */
{ {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0) RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) <-- corrupt
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1) <-- corrupt
RawJournalWriter::close_driver(&mut jrnl)?; // (2) RawJournalWriter::close_driver(&mut jrnl)?; // (2)
} }
Ok(InitializerInfo::new(1, 2)) Ok(InitializerInfo::new(1, 2))
@ -1226,16 +1195,12 @@ fn midway_corruption_reopen() {
|jrnl_id| { |jrnl_id| {
let op_count; let op_count;
{ {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
op_count = apply_event_mix(&mut jrnl)?; op_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
Ok(InitializerInfo::new((op_count + 1) as u64, 102)) Ok(InitializerInfo::new((op_count + 1) as u64, 102))
@ -1249,15 +1214,11 @@ fn midway_corruption_reopen() {
"midway_corruption_reopen_apply_post_corrupted_reopen", "midway_corruption_reopen_apply_post_corrupted_reopen",
|jrnl_id| { |jrnl_id| {
{ {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // <-- corrupt this one
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // <-- corrupt this one
let _ = apply_event_mix(&mut jrnl)?; // apply mix let _ = apply_event_mix(&mut jrnl)?; // apply mix
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
@ -1367,7 +1328,7 @@ fn midway_corruption_at_runtime() {
*/ */
"midway_corruption_at_runtime_open_server_event_close", "midway_corruption_at_runtime_open_server_event_close",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
SimpleDB::new().push(&mut jrnl, KEY)?; SimpleDB::new().push(&mut jrnl, KEY)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new(0, 1)) Ok(InitializerInfo::new(0, 1))
@ -1380,7 +1341,7 @@ fn midway_corruption_at_runtime() {
*/ */
"midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last", "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
for num in 1..=TRIALS { for num in 1..=TRIALS {
sdb.push(&mut jrnl, keyfmt(num))?; sdb.push(&mut jrnl, keyfmt(num))?;
@ -1396,7 +1357,7 @@ fn midway_corruption_at_runtime() {
*/ */
"midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first", "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
for num in 1..=TRIALS { for num in 1..=TRIALS {
sdb.push(&mut jrnl, keyfmt(num))?; sdb.push(&mut jrnl, keyfmt(num))?;
@ -1521,13 +1482,15 @@ fn emulate_failure_for_rollback(
let mut jrnl = create_journal::<SimpleDBJournal>(journal_id).unwrap(); let mut jrnl = create_journal::<SimpleDBJournal>(journal_id).unwrap();
let err = action(&mut db, &mut jrnl).unwrap_err(); let err = action(&mut db, &mut jrnl).unwrap_err();
verify_error(err); verify_error(err);
jrnl.__rollback().unwrap(); for _ in 0..1000 {
// idempotency guarantee: no matter how many times this is called, the underlying state will rollback to, and only to the last event
jrnl.__rollback().unwrap();
}
RawJournalWriter::close_driver(&mut jrnl).unwrap(); RawJournalWriter::close_driver(&mut jrnl).unwrap();
} }
{ {
let db = SimpleDB::new(); let db = SimpleDB::new();
let mut jrnl = open_journal::<SimpleDBJournal>(journal_id, &db, JournalSettings::default()) let mut jrnl = journal_open(journal_id, &db).expect(&format!("{:#?}", debug_get_trace()));
.expect(&format!("{:#?}", debug_get_trace()));
post_rollback(&db); post_rollback(&db);
RawJournalWriter::close_driver(&mut jrnl).unwrap(); RawJournalWriter::close_driver(&mut jrnl).unwrap();
} }

Loading…
Cancel
Save