storage: Test more midway (on-disk corruption) scenarios

next
Sayan Nandan 6 months ago
parent af7e567b31
commit 13361fe535
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -34,12 +34,17 @@ use {
}, },
crate::engine::{ crate::engine::{
error::StorageError, error::StorageError,
storage::{common::sdss::sdss_r1::rw::TrackedReader, v2::raw::spec::SystemDatabaseV1}, storage::{
common::{checksum::SCrc64, sdss::sdss_r1::rw::TrackedReader},
v2::raw::spec::SystemDatabaseV1,
},
RuntimeResult, RuntimeResult,
}, },
std::cell::RefCell, std::cell::RefCell,
}; };
/// Sanity cap (in bytes) on a decoded event payload's declared length; a length
/// larger than this is rejected as a corrupted payload during journal decode
/// (see the `NewKey` decode path which returns `RawJournalDecodeEventCorruptedPayload`).
const SANE_MEM_LIMIT_BYTES: usize = 2048;
/* /*
impls for journal tests impls for journal tests
*/ */
@ -103,8 +108,14 @@ macro_rules! impl_db_event {
impl_db_event!( impl_db_event!(
DbEventPush<'_> as 0 => |me, buf| { DbEventPush<'_> as 0 => |me, buf| {
buf.extend(&(me.0.len() as u64).to_le_bytes()); let length_bytes = (me.0.len() as u64).to_le_bytes();
buf.extend(me.0.as_bytes()); let me_bytes = me.0.as_bytes();
let mut checksum = SCrc64::new();
checksum.update(&length_bytes);
checksum.update(&me_bytes);
buf.extend(&(checksum.finish().to_le_bytes())); // checksum
buf.extend(&(me.0.len() as u64).to_le_bytes()); // length
buf.extend(me.0.as_bytes()); // payload
}, },
DbEventPop as 1, DbEventPop as 1,
DbEventClear as 2 DbEventClear as 2
@ -161,19 +172,25 @@ impl RawJournalAdapter for SimpleDBJournal {
) -> RuntimeResult<()> { ) -> RuntimeResult<()> {
match meta { match meta {
EventMeta::NewKey => { EventMeta::NewKey => {
let key_size = u64::from_le_bytes(file.read_block()?) as usize; let checksum = u64::from_le_bytes(file.read_block()?);
let mut keybuf = Vec::<u8>::new(); let length = u64::from_le_bytes(file.read_block()?) as usize;
if keybuf.try_reserve_exact(key_size as usize).is_err() { let mut payload = Vec::<u8>::new();
if length > SANE_MEM_LIMIT_BYTES
|| payload.try_reserve_exact(length as usize).is_err()
{
return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()); return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into());
} }
unsafe { unsafe {
keybuf.as_mut_ptr().write_bytes(0, key_size); payload.as_mut_ptr().write_bytes(0, length);
keybuf.set_len(key_size); payload.set_len(length);
} }
file.tracked_read(&mut keybuf)?; file.tracked_read(&mut payload)?;
match String::from_utf8(keybuf) { let mut this_checksum = SCrc64::new();
Ok(k) => gs.data.borrow_mut().push(k), this_checksum.update(&length.to_le_bytes());
Err(_) => { this_checksum.update(&payload);
match String::from_utf8(payload) {
Ok(k) if this_checksum.finish() == checksum => gs.data.borrow_mut().push(k),
Err(_) | Ok(_) => {
return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()) return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into())
} }
} }

@ -52,9 +52,14 @@ use {
std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range}, std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range},
}; };
/// the number of events that we want to usually emulate for
const TRIALS: usize = 100; const TRIALS: usize = 100;
/// See impl of [`apply_event_mix`]. We remove every 10th element
const POST_TRIALS_SIZE: usize = TRIALS - (TRIALS / 10); const POST_TRIALS_SIZE: usize = TRIALS - (TRIALS / 10);
/// a test key for single events
const KEY: &str = concat!("1234567890-0987654321");
/// The initializer for a corruption test case
struct Initializer { struct Initializer {
journal_id: &'static str, journal_id: &'static str,
initializer_fn: fn(&str) -> RuntimeResult<InitializerInfo>, initializer_fn: fn(&str) -> RuntimeResult<InitializerInfo>,
@ -62,6 +67,7 @@ struct Initializer {
} }
#[derive(Debug)] #[derive(Debug)]
/// Information about the modified journal generated by an [`Initializer`]
struct ModifiedJournalInfo { struct ModifiedJournalInfo {
init: InitializerInfo, init: InitializerInfo,
_storage: ModifiedJournalStorageInfo, _storage: ModifiedJournalStorageInfo,
@ -83,12 +89,14 @@ impl ModifiedJournalInfo {
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
/// Information about the initial state of a "good journal". Generated from [`Initializer`]
struct InitializerInfo { struct InitializerInfo {
corrupted_event_id: u64, corrupted_event_id: u64,
last_executed_event_id: u64, last_executed_event_id: u64,
} }
impl InitializerInfo { impl InitializerInfo {
/// The initializer only creates one event
fn new_last_event(last_event_id: u64) -> Self { fn new_last_event(last_event_id: u64) -> Self {
Self::new(last_event_id, last_event_id) Self::new(last_event_id, last_event_id)
} }
@ -98,6 +106,7 @@ impl InitializerInfo {
last_executed_event_id, last_executed_event_id,
} }
} }
/// Returns true if the initializer created multiple events (and not a single event)
fn not_last_event(&self) -> bool { fn not_last_event(&self) -> bool {
self.corrupted_event_id != self.last_executed_event_id self.corrupted_event_id != self.last_executed_event_id
} }
@ -120,8 +129,12 @@ impl Initializer {
} }
} }
/// Build the on-disk file name for a trimmed (corrupted) copy of a journal.
///
/// The name encodes both the source journal id and the trim size so that every
/// corruption trial works on a distinct file.
fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String {
    let mut name = String::with_capacity(journal_id.len() + 24);
    name.push_str(journal_id);
    name.push_str("-trimmed-");
    name.push_str(&trim_size.to_string());
    name.push_str(".db");
    name
}
#[derive(Debug)] #[derive(Debug)]
#[allow(unused)] /// Information about the layout of the modified journal
struct ModifiedJournalStorageInfo { struct ModifiedJournalStorageInfo {
original_file_size: usize, original_file_size: usize,
modified_file_size: usize, modified_file_size: usize,
@ -142,6 +155,13 @@ impl ModifiedJournalStorageInfo {
} }
} }
/**
Emulate a sequentially varying corruption.
- The initializer creates a modified journal and provides information about it
- We go over each initializer and then enumerate a bunch of corruption test cases.
- Generally, we take the size of the event, n, (it isn't necessary that it's a static size but
should at least be computable/traced somehow) and then shave off 1 bit, followed by up to n bytes
*/
fn emulate_sequentially_varying_single_corruption( fn emulate_sequentially_varying_single_corruption(
initializers: impl IntoIterator<Item = Initializer>, initializers: impl IntoIterator<Item = Initializer>,
modified_journal_generator_fn: impl Fn( modified_journal_generator_fn: impl Fn(
@ -190,7 +210,7 @@ fn emulate_sequentially_varying_single_corruption(
// now trim and repeat // now trim and repeat
for trim_size in 1..=last_event_size { for trim_size in 1..=last_event_size {
// create a copy of the "good" journal and corrupt it // create a copy of the "good" journal and corrupt it
let corrupted_journal_path = format!("{journal_id}-trimmed-{trim_size}.db"); let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size);
let open_journal_fn = |db: &SimpleDB| { let open_journal_fn = |db: &SimpleDB| {
open_journal::<SimpleDBJournal>( open_journal::<SimpleDBJournal>(
&corrupted_journal_path, &corrupted_journal_path,
@ -207,6 +227,11 @@ fn emulate_sequentially_varying_single_corruption(
&original_offsets, &original_offsets,
) )
.unwrap(); .unwrap();
assert_ne!(storage_info.corruption_range.len(), 0);
assert_ne!(
storage_info.modified_file_size,
storage_info.original_file_size
);
let modified_journal_info = let modified_journal_info =
ModifiedJournalInfo::new(initializer_info, storage_info, initializer_id); ModifiedJournalInfo::new(initializer_info, storage_info, initializer_id);
// now let the caller handle any post corruption work // now let the caller handle any post corruption work
@ -245,10 +270,15 @@ fn emulate_sequentially_varying_single_corruption(
repaired_journal_reopen_result, repaired_journal_reopen_result,
); );
} }
// we're done, delete the corrupted journal
FileSystem::remove_file(&corrupted_journal_path).unwrap();
} }
// delete the good journal, we're done with this one as well
FileSystem::remove_file(journal_id).unwrap();
} }
} }
/// In this emulation, we sequentially corrupt the last event across multiple trials
fn emulate_final_event_corruption( fn emulate_final_event_corruption(
initializers: impl IntoIterator<Item = Initializer>, initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn( post_corruption_handler: impl Fn(
@ -285,6 +315,7 @@ fn emulate_final_event_corruption(
) )
} }
/// In this emulation, we sequentially corrupt an intermediary event across multiple trials
fn emulate_midway_corruption( fn emulate_midway_corruption(
initializers: impl IntoIterator<Item = Initializer>, initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn( post_corruption_handler: impl Fn(
@ -341,10 +372,14 @@ fn emulate_midway_corruption(
) )
} }
/// Format a key as a string (padded to six bytes)
fn keyfmt(num: usize) -> String { fn keyfmt(num: usize) -> String {
format!("key-{num:06}") format!("key-{num:06}")
} }
/// Apply an event mix
/// - Add [`TRIALS`] count of elements
/// - Remove every 10th element
fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<u64> { fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<u64> {
let mut op_count = 0; let mut op_count = 0;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
@ -362,22 +397,34 @@ fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResul
#[test] #[test]
fn corruption_before_close() { fn corruption_before_close() {
let initializers = vec![ let initializers = [
// open and close
Initializer::new_driver_type("close_event_corruption_empty.db", |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(0))
}),
// open, apply mix of events, close
Initializer::new_driver_type("close_event_corruption.db", |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(operation_count))
}),
// open, close, reinit, close
Initializer::new_driver_type( Initializer::new_driver_type(
/*
in this case we: create, close (0), corrupt close (0)
*/
"close_event_corruption_empty.db",
|jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(0))
},
),
Initializer::new_driver_type(
/*
in this case we: create, apply events ([0,99]), close (100). corrupt close (100). expect no data loss.
*/
"close_event_corruption.db",
|jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(operation_count))
},
),
Initializer::new_driver_type(
/*
in this case we: create, close (0), reopen(1), close(2). corrupt last close (2)
*/
"close_event_corruption_open_close_open_close.db", "close_event_corruption_open_close_open_close.db",
|jrnl_id| { |jrnl_id| {
// open and close // open and close
@ -408,7 +455,7 @@ fn corruption_before_close() {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
trace, trace,
@ -424,18 +471,18 @@ fn corruption_before_close() {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
POST_TRIALS_SIZE, POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
*db.data().last().unwrap(), *db.data().last().unwrap(),
keyfmt(TRIALS - 1), keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
assert_eq!( assert_eq!(
@ -451,7 +498,7 @@ fn corruption_before_close() {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
trace, trace,
@ -471,18 +518,18 @@ fn corruption_before_close() {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
POST_TRIALS_SIZE, POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
*db.data().last().unwrap(), *db.data().last().unwrap(),
keyfmt(TRIALS - 1), keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
assert_eq!( assert_eq!(
@ -506,7 +553,9 @@ fn corruption_before_close() {
}, },
|journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
)),
RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _),
"failed at trim_size {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
@ -516,22 +565,24 @@ fn corruption_before_close() {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
// in the second case, we apply the event mix so we need to check this // in the second case, we apply the event mix so we need to check this
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
POST_TRIALS_SIZE, POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
*db.data().last().unwrap(), *db.data().last().unwrap(),
keyfmt(TRIALS - 1), keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
let _ = reopen_result.unwrap(); let _ = reopen_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
));
// clear trace // clear trace
let _ = debug_get_trace(); let _ = debug_get_trace();
let _ = debug_get_offsets(); let _ = debug_get_offsets();
@ -541,24 +592,44 @@ fn corruption_before_close() {
#[test] #[test]
fn corruption_after_reopen() { fn corruption_after_reopen() {
let initializers = vec![ let initializers = [
Initializer::new_driver_type("corruption_after_reopen.db", |jrnl_id| { Initializer::new_driver_type(
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; /*
RawJournalWriter::close_driver(&mut jrnl)?; in this case we: create, close (0), reopen(1). corrupt reopen (1)
drop(jrnl); */
// reopen, but don't close "corruption_after_reopen.db",
open_journal::<SimpleDBJournal>(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; |jrnl_id| {
Ok(InitializerInfo::new_last_event(1)) let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
}), RawJournalWriter::close_driver(&mut jrnl)?;
Initializer::new_driver_type("corruption_after_ropen_multi_before_close.db", |jrnl_id| { drop(jrnl);
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; // reopen, but don't close
let operation_count = apply_event_mix(&mut jrnl)?; open_journal::<SimpleDBJournal>(
RawJournalWriter::close_driver(&mut jrnl)?; jrnl_id,
drop(jrnl); &SimpleDB::new(),
// reopen, but don't close JournalSettings::default(),
open_journal::<SimpleDBJournal>(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; )?;
Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish Ok(InitializerInfo::new_last_event(1))
}), },
),
Initializer::new_driver_type(
/*
in this case we: create, apply events([0,99]), close (100), reopen(101). corrupt reopen (101). expect no data loss.
*/
"corruption_after_ropen_multi_before_close.db",
|jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl);
// reopen, but don't close
open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish
},
),
]; ];
emulate_final_event_corruption( emulate_final_event_corruption(
initializers, initializers,
@ -574,14 +645,15 @@ fn corruption_after_reopen() {
will potentially introduce more bugs due to increased complexity. Get a good filesystem and disk controller (that attaches checksums to sectors)! will potentially introduce more bugs due to increased complexity. Get a good filesystem and disk controller (that attaches checksums to sectors)!
-- @ohsayan -- @ohsayan
*/ */
let mut jrnl = let mut jrnl = open_result.expect(&format!(
open_result.expect(&format!("failed at {trim_size} for journal {journal_id}")); "failed at trim_size {trim_size} for journal {journal_id}"
));
if modified_journal_info.init.last_executed_event_id == 1 { if modified_journal_info.init.last_executed_event_id == 1 {
// empty log, only the reopen // empty log, only the reopen
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
trace, trace,
@ -603,19 +675,19 @@ fn corruption_after_reopen() {
JournalWriterTraceEvent::DriverEventCompleted, JournalWriterTraceEvent::DriverEventCompleted,
JournalWriterTraceEvent::ReinitializeComplete, JournalWriterTraceEvent::ReinitializeComplete,
], ],
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
// we will have upto the last event since only the reopen is gone // we will have upto the last event since only the reopen is gone
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
POST_TRIALS_SIZE, POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
*db.data().last().unwrap(), *db.data().last().unwrap(),
keyfmt(TRIALS - 1), keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
&trace[trace.len() - 12..], &trace[trace.len() - 12..],
@ -639,7 +711,7 @@ fn corruption_after_reopen() {
JournalWriterTraceEvent::DriverEventCompleted, JournalWriterTraceEvent::DriverEventCompleted,
JournalWriterTraceEvent::ReinitializeComplete JournalWriterTraceEvent::ReinitializeComplete
], ],
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
) )
} }
// now close this so that this works with the post repair handler // now close this so that this works with the post repair handler
@ -650,14 +722,14 @@ fn corruption_after_reopen() {
assert_eq!( assert_eq!(
open_result.unwrap_err().kind(), open_result.unwrap_err().kind(),
&ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
if modified_journal_info.init.last_executed_event_id == 1 { if modified_journal_info.init.last_executed_event_id == 1 {
// empty log, only the reopen // empty log, only the reopen
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
assert_eq!( assert_eq!(
trace, trace,
@ -673,7 +745,7 @@ fn corruption_after_reopen() {
modified_journal_info.init.corrupted_event_id modified_journal_info.init.corrupted_event_id
) )
], ],
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
// we will have upto the last event since only the reopen is gone // we will have upto the last event since only the reopen is gone
@ -690,7 +762,7 @@ fn corruption_after_reopen() {
modified_journal_info.init.corrupted_event_id modified_journal_info.init.corrupted_event_id
) )
], ],
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
} }
@ -700,24 +772,28 @@ fn corruption_after_reopen() {
if trim_size == DriverEvent::FULL_EVENT_SIZE { if trim_size == DriverEvent::FULL_EVENT_SIZE {
// see earlier comment // see earlier comment
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
)),
RepairResult::NoErrors, RepairResult::NoErrors,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
)),
RepairResult::UnspecifiedLoss( RepairResult::UnspecifiedLoss(
(DriverEvent::FULL_EVENT_SIZE - trim_size) as u64 (DriverEvent::FULL_EVENT_SIZE - trim_size) as u64
), ),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
if modified_journal_info.init.last_executed_event_id == 1 { if modified_journal_info.init.last_executed_event_id == 1 {
assert_eq!( assert_eq!(
db.data().len(), db.data().len(),
0, 0,
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} else { } else {
assert_eq!(db.data().len(), POST_TRIALS_SIZE); assert_eq!(db.data().len(), POST_TRIALS_SIZE);
@ -732,7 +808,6 @@ fn corruption_after_reopen() {
#[test] #[test]
fn corruption_at_runtime() { fn corruption_at_runtime() {
// first get the offsets to compute the size of the event // first get the offsets to compute the size of the event
const KEY: &str = "hello, universe";
let offset = { let offset = {
debug_set_offset_tracking(true); debug_set_offset_tracking(true);
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
@ -745,8 +820,14 @@ fn corruption_at_runtime() {
let _ = debug_get_trace(); let _ = debug_get_trace();
ret ret
}; };
let initializers = vec![ let initializers = [
Initializer::new( Initializer::new(
/*
for this one we:
- PRC1: we create and apply one event (0)
expect data loss (0).
*/
"corruption_at_runtime_open_commit_corrupt", "corruption_at_runtime_open_commit_corrupt",
|jrnl_id| { |jrnl_id| {
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
@ -758,6 +839,11 @@ fn corruption_at_runtime() {
offset, offset,
), ),
Initializer::new( Initializer::new(
/*
for this one we:
- PRC1: we create and apply events ([0,99])
expect data loss (99)
*/
"corruption_at_runtime_open_multi_commit_then_corrupt", "corruption_at_runtime_open_multi_commit_then_corrupt",
|jrnl_id| { |jrnl_id| {
let mut op_count = 0; let mut op_count = 0;
@ -870,7 +956,9 @@ fn corruption_at_runtime() {
|journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
assert!(reopen_result.is_ok()); assert!(reopen_result.is_ok());
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
)),
RepairResult::UnspecifiedLoss((offset - trim_size) as u64), RepairResult::UnspecifiedLoss((offset - trim_size) as u64),
"failed for journal {journal_id} with trim_size {trim_size}" "failed for journal {journal_id} with trim_size {trim_size}"
); );
@ -897,10 +985,24 @@ fn corruption_at_runtime() {
) )
} }
/*
midway corruption tests
---
while in the prior tests we tested cases where the last event was corrupted, we now test cases where some middle
portion of the journal gets corrupted. the trouble is that we'll have to enumerate all cases for generated traces...
which is absolutely not feasible. Instead, we just ensure that the pre and post states are valid.
*/
#[test] #[test]
fn midway_corruption_close() { fn midway_corruption_close() {
let initializers = vec![ let initializers = [
Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| { Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| {
/*
in this test corruption case we:
- PR cycle 1: create and close (0)
- PR cycle 2: open (1) and close (2)
we emulate a sequential corruption case for (0)
*/
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
@ -916,6 +1018,13 @@ fn midway_corruption_close() {
Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close
}), }),
Initializer::new_driver_type( Initializer::new_driver_type(
/*
in this test case we:
- PR cycle 1: create and close (0)
- PR cycle 2: reopen (1), apply events([2,101]), close (102)
- PR cycle 3: reopen (103), close (104)
we emulate a sequential corruption case for (102). expect all events to persist (<= 101)
*/
"midway_corruption_close_events_before_second_close", "midway_corruption_close_events_before_second_close",
|jrnl_id| { |jrnl_id| {
{ {
@ -947,6 +1056,14 @@ fn midway_corruption_close() {
}, },
), ),
Initializer::new_driver_type( Initializer::new_driver_type(
/*
in this test case:
- PR cycle 1: create and close (0)
- PR cycle 2: reopen (1) and close (2)
- PR cycle 3: reopen(3), apply events([4,103]), close(104)
we emulate a sequential corruption of (2) which results in a catastrophic corruption. expect major
data loss (==TRIALS)
*/
"midway_corruption_close_events_before_third_close", "midway_corruption_close_events_before_third_close",
|jrnl_id| { |jrnl_id| {
{ {
@ -1000,7 +1117,7 @@ fn midway_corruption_close() {
assert_eq!( assert_eq!(
*db.data().last().unwrap(), *db.data().last().unwrap(),
keyfmt(TRIALS - 1), keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
_ => panic!(), _ => panic!(),
@ -1009,7 +1126,9 @@ fn midway_corruption_close() {
let _ = debug_get_trace(); let _ = debug_get_trace();
}, },
|journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
let _ = reopen_result.unwrap(); let _ = reopen_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
));
match modified_journal_info.initializer_id { match modified_journal_info.initializer_id {
0 | 2 => { 0 | 2 => {
// all data will be lost, so the DB will be empty // all data will be lost, so the DB will be empty
@ -1023,7 +1142,9 @@ fn midway_corruption_close() {
{ {
// the first event was corrupted // the first event was corrupted
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
)),
RepairResult::UnspecifiedLoss( RepairResult::UnspecifiedLoss(
((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64 ((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64
), ),
@ -1033,7 +1154,7 @@ fn midway_corruption_close() {
// this is a serious midway corruption with major data loss // this is a serious midway corruption with major data loss
let full_log_size = File::open(journal_id).unwrap().f_len().unwrap(); let full_log_size = File::open(journal_id).unwrap().f_len().unwrap();
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.expect(&format!("failed at trim_size {trim_size} for journal {journal_id}")),
RepairResult::UnspecifiedLoss( RepairResult::UnspecifiedLoss(
full_log_size full_log_size
- <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE // account for header - <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE // account for header
@ -1050,7 +1171,7 @@ fn midway_corruption_close() {
assert_eq!( assert_eq!(
*db.data().last().unwrap(), *db.data().last().unwrap(),
keyfmt(TRIALS - 1), keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
} }
_ => panic!(), _ => panic!(),
@ -1061,3 +1182,277 @@ fn midway_corruption_close() {
); );
debug_set_offset_tracking(false); debug_set_offset_tracking(false);
} }
/// Midway corruption of a journal's `reopen` driver event.
///
/// Three initializers build "good" journals whose reopen event is then
/// sequentially corrupted by [`emulate_midway_corruption`]:
/// - 0: create+close (0), reopen (1)+close (2); corrupt the reopen (1)
/// - 1: create, apply event mix ([0,99]), close (100), reopen (101)+close (102);
///   corrupt the reopen (101) — expect no data loss
/// - 2: create+close (0), reopen (1), apply event mix, close (102);
///   corrupt the reopen (1) — expect full data loss
///
/// The pre-repair handler asserts that a plain open fails and that the recovered
/// DB state matches the expectation above (empty for 0 and 2, `POST_TRIALS_SIZE`
/// entries for 1); the post-repair handler asserts that repair and reopen both
/// succeed and that the same state still holds.
#[test]
fn midway_corruption_reopen() {
let initializers = [
Initializer::new(
"midway_corruption_reopen_close_reopen_close",
|jrnl_id| {
/*
for this test case we create and close (0) the journal and in the next power cycle we reopen (1) and close (2) the
journal. we emulate a midway corruption where the reopen (1) gets corrupted.
*/
{
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0)
}
{
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1) <-- corrupt
RawJournalWriter::close_driver(&mut jrnl)?; // (2)
}
Ok(InitializerInfo::new(1, 2))
},
DriverEvent::FULL_EVENT_SIZE,
),
Initializer::new(
/*
create, apply ([0,99]), close (100). reopen(101), close (102). corrupt (101). expect no data loss
*/
"midway_corruption_reopen_apply_close_reopen_close",
|jrnl_id| {
let op_count;
{
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
op_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?;
}
{
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?;
}
// +1 accounts for the close event following the applied mix
Ok(InitializerInfo::new((op_count + 1) as u64, 102))
},
DriverEvent::FULL_EVENT_SIZE,
),
Initializer::new(
/*
create, close (0). reopen(1), apply ([2,101]), close (102). corrupt (1). expect full data loss
*/
"midway_corruption_reopen_apply_post_corrupted_reopen",
|jrnl_id| {
{
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?;
}
{
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // <-- corrupt this one
let _ = apply_event_mix(&mut jrnl)?; // apply mix
RawJournalWriter::close_driver(&mut jrnl)?;
}
Ok(InitializerInfo::new(1, 102))
},
DriverEvent::FULL_EVENT_SIZE,
),
];
debug_set_offset_tracking(true); // we need to track offsets
emulate_midway_corruption(
initializers,
// post-corruption (pre-repair) handler: opening the corrupted journal must fail
|journal_id, modified_journal_info, trim_size, db, open_result| {
let _ = open_result.unwrap_err();
match modified_journal_info.initializer_id {
0 | 2 => {
// corrupted reopen precedes any data (or wipes it): DB must be empty
assert_eq!(
db.data().len(),
0,
"failed at trim_size {trim_size} for journal {journal_id}"
);
}
1 => {
// data was written before the corrupted reopen: everything survives
assert_eq!(
db.data().len(),
POST_TRIALS_SIZE,
"failed at trim_size {trim_size} for journal {journal_id}"
);
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at trim_size {trim_size} for journal {journal_id}"
)
}
_ => panic!(),
}
// drain trace/offset buffers so they don't leak into the next trial
let _ = debug_get_trace();
let _ = debug_get_offsets();
},
// post-repair handler: repair and reopen must both succeed
|journal_id, modified_journal_info, trim_size, repair_result, db, open_result| {
let _ = open_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
));
let _ = repair_result.expect(&format!(
"failed at trim_size {trim_size} for journal {journal_id}"
));
match modified_journal_info.initializer_id {
0 | 2 => {
assert_eq!(
db.data().len(),
0,
"failed at trim_size {trim_size} for journal {journal_id}"
);
}
1 => {
assert_eq!(
db.data().len(),
POST_TRIALS_SIZE,
"failed at trim_size {trim_size} for journal {journal_id}"
);
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at trim_size {trim_size} for journal {journal_id}"
)
}
_ => panic!(),
}
// drain trace/offset buffers so they don't leak into the next trial
let _ = debug_get_trace();
let _ = debug_get_offsets();
},
);
debug_set_offset_tracking(false);
}
#[test]
fn midway_corruption_at_runtime() {
    /*
        Simulate on-disk corruption of server (runtime) events. Each initializer
        writes a journal; the corruption emulator then trims a specific event
        and we verify exactly which pushed keys survive, both when opening the
        corrupted journal (must fail) and after running repair (must succeed).
    */
    debug_set_offset_tracking(true); // we need event offsets to target a specific event for corruption
    // compute the on-disk size of a single push event that uses the fixed-size key
    let event_size_fixed_size_key = {
        let mut jrnl =
            create_journal::<SimpleDBJournal>("midway_corruption_at_runtime_fixed_key").unwrap();
        SimpleDB::new().push(&mut jrnl, KEY).unwrap();
        let (_, offsets) = (debug_get_trace(), debug_get_offsets());
        // offset of event 0 minus the header size = size of one event on disk
        *offsets.get(&0).unwrap() as usize
            - <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE
    };
    // compute the on-disk size of a single push event that uses a dynamically formatted key
    let event_size_dynamic_key = {
        let mut jrnl =
            create_journal::<SimpleDBJournal>("midway_corruption_at_runtime_dynamic_key").unwrap();
        SimpleDB::new().push(&mut jrnl, keyfmt(0)).unwrap();
        let (_, offsets) = (debug_get_trace(), debug_get_offsets());
        *offsets.get(&0).unwrap() as usize
            - <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE
    };
    let initializers = [
        Initializer::new(
            /*
                open, apply (0), close (1). corrupt (0). expect complete loss of the push event (0).
            */
            "midway_corruption_at_runtime_open_server_event_close",
            |jrnl_id| {
                let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
                SimpleDB::new().push(&mut jrnl, KEY)?;
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new(0, 1))
            },
            event_size_fixed_size_key,
        ),
        Initializer::new(
            /*
                open, apply([0,99]), close (100). corrupt (99). expect complete loss of the last event(99).
            */
            "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last",
            |jrnl_id| {
                let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
                let mut sdb = SimpleDB::new();
                // push keyfmt(1)..=keyfmt(TRIALS); the push events get ids [0, TRIALS - 1]
                for num in 1..=TRIALS {
                    sdb.push(&mut jrnl, keyfmt(num))?;
                }
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new(TRIALS as u64 - 1, TRIALS as u64))
            },
            event_size_dynamic_key,
        ),
        Initializer::new(
            /*
                open, apply([0,99]), close (100). corrupt (0). expect complete loss of all events
            */
            "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first",
            |jrnl_id| {
                let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
                let mut sdb = SimpleDB::new();
                for num in 1..=TRIALS {
                    sdb.push(&mut jrnl, keyfmt(num))?;
                }
                RawJournalWriter::close_driver(&mut jrnl)?;
                Ok(InitializerInfo::new(0, TRIALS as u64))
            },
            event_size_dynamic_key,
        ),
    ];
    emulate_midway_corruption(
        initializers,
        // phase 1: opening the corrupted journal must fail; verify surviving data
        |journal_id, modified_journal_info, trim_size, db, open_result| {
            let _ = open_result.unwrap_err();
            let (_, _) = (debug_get_trace(), debug_get_offsets()); // drain debug state
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    // the only/first event was corrupted, so every event after it is lost
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                }
                1 => {
                    // expect to have all keys upto TRIALS - 1
                    assert_eq!(
                        db.data().len(),
                        TRIALS - 1,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                    // last key is TRIALS - 1
                    assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
                }
                _ => panic!(),
            }
        },
        // phase 2: repair must succeed and the journal must then open cleanly,
        // with the same surviving data as in phase 1
        |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| {
            let _ = repair_result.expect(&format!(
                "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}",
                FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)),
                FileSystem::read(journal_id),
            ));
            let _ = open_result.expect(&format!(
                "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}",
                FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)),
                FileSystem::read(journal_id),
            ));
            match modified_journal_info.initializer_id {
                0 | 2 => {
                    assert_eq!(
                        db.data().len(),
                        0,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                }
                1 => {
                    // expect to have all keys upto TRIALS - 1
                    assert_eq!(
                        db.data().len(),
                        TRIALS - 1,
                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
                        db.data()
                    );
                    // last key is TRIALS - 1
                    assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
                }
                _ => panic!(),
            }
            let (_, _) = (debug_get_trace(), debug_get_offsets()); // drain debug state
        },
    );
    // fix: disable offset tracking again so this test does not leak debug state
    // into other tests (the sibling midway-corruption tests do the same)
    debug_set_offset_tracking(false);
}

Loading…
Cancel
Save