storage: Test database state post crash and recovery

next
Sayan Nandan 6 months ago
parent bb4132a617
commit af7e567b31
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -27,8 +27,8 @@
use { use {
super::{ super::{
super::{ super::{
create_journal, debug_get_trace, open_journal, DriverEventKind, JournalReaderTraceEvent, create_journal, debug_get_trace, open_journal, DriverEventKind,
JournalSettings, JournalWriterTraceEvent, RawJournalWriter, JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent, RawJournalWriter,
}, },
SimpleDB, SimpleDBJournal, SimpleDB, SimpleDBJournal,
}, },

@ -53,35 +53,82 @@ use {
}; };
const TRIALS: usize = 100; const TRIALS: usize = 100;
const POST_TRIALS_SIZE: usize = TRIALS - (TRIALS / 10);
struct Initializer { struct Initializer {
journal_id: &'static str, journal_id: &'static str,
initializer_fn: fn(&str) -> RuntimeResult<u64>, initializer_fn: fn(&str) -> RuntimeResult<InitializerInfo>,
last_event_size: usize, last_event_size: usize,
} }
/// Per-journal context that is passed by reference to the post-corruption and
/// post-repair handlers.
#[derive(Debug)]
struct ModifiedJournalInfo {
    /// Event IDs reported by the initializer function for this journal.
    init: InitializerInfo,
    /// Storage details returned by the journal modification step. Retained on
    /// the struct (and therefore in `Debug` output); the underscore prefix
    /// marks it as not otherwise read.
    _storage: ModifiedJournalStorageInfo,
    /// Zero-based position of the initializer within the list being run.
    initializer_id: usize,
}

impl ModifiedJournalInfo {
    /// Bundles the initializer's event info, the storage modification info and
    /// the initializer's index into a single record.
    fn new(
        init: InitializerInfo,
        storage: ModifiedJournalStorageInfo,
        initializer_id: usize,
    ) -> Self {
        // bind under the field's name so the shorthand initializer applies to every field
        let _storage = storage;
        Self {
            initializer_id,
            _storage,
            init,
        }
    }
}
/// Event bookkeeping returned by a journal initializer function.
///
/// Records which event is selected for corruption and which event was the last
/// one executed while the journal was being set up.
#[derive(Debug, Clone, Copy)]
struct InitializerInfo {
    /// ID of the event that is targeted for corruption.
    corrupted_event_id: u64,
    /// ID of the final event executed by the initializer.
    last_executed_event_id: u64,
}

impl InitializerInfo {
    /// Creates a record where the corrupted event is also the last executed
    /// event (both IDs are set to `last_event_id`).
    fn new_last_event(last_event_id: u64) -> Self {
        Self {
            corrupted_event_id: last_event_id,
            last_executed_event_id: last_event_id,
        }
    }

    /// Creates a record from an explicit corrupted event ID and last executed
    /// event ID.
    fn new(corrupted_event_id: u64, last_executed_event_id: u64) -> Self {
        Self {
            last_executed_event_id,
            corrupted_event_id,
        }
    }

    /// Returns `true` when the corrupted event is a different event from the
    /// last executed one.
    fn not_last_event(&self) -> bool {
        let Self {
            corrupted_event_id: corrupted,
            last_executed_event_id: last_executed,
        } = *self;
        corrupted != last_executed
    }
}
impl Initializer { impl Initializer {
fn new(name: &'static str, f: fn(&str) -> RuntimeResult<u64>, last_event_size: usize) -> Self { fn new(
name: &'static str,
f: fn(&str) -> RuntimeResult<InitializerInfo>,
last_event_size: usize,
) -> Self {
Self { Self {
journal_id: name, journal_id: name,
initializer_fn: f, initializer_fn: f,
last_event_size, last_event_size,
} }
} }
fn new_driver_type(name: &'static str, f: fn(&str) -> RuntimeResult<u64>) -> Self { fn new_driver_type(name: &'static str, f: fn(&str) -> RuntimeResult<InitializerInfo>) -> Self {
Self::new(name, f, DriverEvent::FULL_EVENT_SIZE) Self::new(name, f, DriverEvent::FULL_EVENT_SIZE)
} }
} }
#[derive(Debug)] #[derive(Debug)]
#[allow(unused)] #[allow(unused)]
struct ModifiedJournalInfo { struct ModifiedJournalStorageInfo {
original_file_size: usize, original_file_size: usize,
modified_file_size: usize, modified_file_size: usize,
corruption_range: Range<usize>, corruption_range: Range<usize>,
} }
impl ModifiedJournalInfo { impl ModifiedJournalStorageInfo {
fn new( fn new(
original_file_size: usize, original_file_size: usize,
modified_file_size: usize, modified_file_size: usize,
@ -100,35 +147,37 @@ fn emulate_sequentially_varying_single_corruption(
modified_journal_generator_fn: impl Fn( modified_journal_generator_fn: impl Fn(
&str, &str,
&str, &str,
u64, &InitializerInfo,
usize, usize,
&BTreeMap<u64, u64>, &BTreeMap<u64, u64>,
) -> IoResult<ModifiedJournalInfo>, ) -> IoResult<ModifiedJournalStorageInfo>,
post_corruption_handler: impl Fn( post_corruption_handler: impl Fn(
&str, &str,
u64, &ModifiedJournalInfo,
usize, usize,
SimpleDB, SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>, RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
ModifiedJournalInfo,
), ),
post_repair_handler: impl Fn( post_repair_handler: impl Fn(
&str, &str,
u64, &ModifiedJournalInfo,
usize, usize,
RuntimeResult<RepairResult>, RuntimeResult<RepairResult>,
SimpleDB, SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>, RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
), ),
) { ) {
for Initializer { for (
journal_id, initializer_id,
initializer_fn, Initializer {
last_event_size, journal_id,
} in initializers initializer_fn,
last_event_size,
},
) in initializers.into_iter().enumerate()
{ {
// initialize journal, get size and clear traces // initialize journal, get size and clear traces
let repaired_last_event_id = match initializer_fn(journal_id) { let initializer_info = match initializer_fn(journal_id) {
Ok(nid) => nid, Ok(nid) => nid,
Err(e) => panic!( Err(e) => panic!(
"failed to initialize {journal_id} due to {e}. trace: {:?}, file_data={:?}", "failed to initialize {journal_id} due to {e}. trace: {:?}, file_data={:?}",
@ -150,41 +199,46 @@ fn emulate_sequentially_varying_single_corruption(
) )
}; };
// modify journal // modify journal
let mod_stat = modified_journal_generator_fn( let storage_info = modified_journal_generator_fn(
journal_id, journal_id,
&corrupted_journal_path, &corrupted_journal_path,
repaired_last_event_id, &initializer_info,
trim_size, trim_size,
&original_offsets, &original_offsets,
) )
.unwrap(); .unwrap();
let modified_journal_info =
ModifiedJournalInfo::new(initializer_info, storage_info, initializer_id);
// now let the caller handle any post corruption work // now let the caller handle any post corruption work
{ {
let sdb = SimpleDB::new(); let sdb = SimpleDB::new();
let open_journal_result = open_journal_fn(&sdb); let open_journal_result = open_journal_fn(&sdb);
post_corruption_handler( post_corruption_handler(
journal_id, journal_id,
repaired_last_event_id, &modified_journal_info,
trim_size, trim_size,
sdb, sdb,
open_journal_result, open_journal_result,
mod_stat,
); );
} }
// repair and let the caller handle post repair work // repair and let the caller handle post repair work
let repair_result;
{ {
let sdb = SimpleDB::new(); let sdb = SimpleDB::new();
let repair_result = repair_journal::<SimpleDBJournal>( repair_result = repair_journal::<SimpleDBJournal>(
&corrupted_journal_path, &corrupted_journal_path,
&sdb, &sdb,
JournalSettings::default(), JournalSettings::default(),
JournalRepairMode::Simple, JournalRepairMode::Simple,
); );
}
{
let sdb = SimpleDB::new();
let repaired_journal_reopen_result = open_journal_fn(&sdb); let repaired_journal_reopen_result = open_journal_fn(&sdb);
// let caller handle any post repair work // let caller handle any post repair work
post_repair_handler( post_repair_handler(
journal_id, journal_id,
repaired_last_event_id, &modified_journal_info,
trim_size, trim_size,
repair_result, repair_result,
sdb, sdb,
@ -199,15 +253,14 @@ fn emulate_final_event_corruption(
initializers: impl IntoIterator<Item = Initializer>, initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn( post_corruption_handler: impl Fn(
&str, &str,
u64, &ModifiedJournalInfo,
usize, usize,
SimpleDB, SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>, RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
ModifiedJournalInfo,
), ),
post_repair_handler: impl Fn( post_repair_handler: impl Fn(
&str, &str,
u64, &ModifiedJournalInfo,
usize, usize,
RuntimeResult<RepairResult>, RuntimeResult<RepairResult>,
SimpleDB, SimpleDB,
@ -221,7 +274,7 @@ fn emulate_final_event_corruption(
let mut f = File::open(modified_journal)?; let mut f = File::open(modified_journal)?;
let real_flen = f.f_len()? as usize; let real_flen = f.f_len()? as usize;
f.f_truncate((real_flen - trim_amount) as _)?; f.f_truncate((real_flen - trim_amount) as _)?;
Ok(ModifiedJournalInfo::new( Ok(ModifiedJournalStorageInfo::new(
real_flen, real_flen,
trim_amount, trim_amount,
trim_amount..real_flen, trim_amount..real_flen,
@ -236,15 +289,14 @@ fn emulate_midway_corruption(
initializers: impl IntoIterator<Item = Initializer>, initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn( post_corruption_handler: impl Fn(
&str, &str,
u64, &ModifiedJournalInfo,
usize, usize,
SimpleDB, SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>, RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
ModifiedJournalInfo,
), ),
post_repair_handler: impl Fn( post_repair_handler: impl Fn(
&str, &str,
u64, &ModifiedJournalInfo,
usize, usize,
RuntimeResult<RepairResult>, RuntimeResult<RepairResult>,
SimpleDB, SimpleDB,
@ -255,13 +307,15 @@ fn emulate_midway_corruption(
initializers, initializers,
|original_journal_path, |original_journal_path,
corrupted_journal_path, corrupted_journal_path,
event_to_corrupt, initializer_info,
trim_size, trim_size,
original_offsets| { original_offsets| {
let orig_journal_data = FileSystem::read(original_journal_path)?; let orig_journal_data = FileSystem::read(original_journal_path)?;
let orig_journal_size = orig_journal_data.len(); let orig_journal_size = orig_journal_data.len();
let mut f = File::create(corrupted_journal_path)?; let mut f = File::create(corrupted_journal_path)?;
let end_offset = *original_offsets.get(&event_to_corrupt).unwrap() as usize; let end_offset = *original_offsets
.get(&initializer_info.corrupted_event_id)
.unwrap() as usize;
// apply // apply
let segment_before_corruption = &orig_journal_data[..end_offset - trim_size]; let segment_before_corruption = &orig_journal_data[..end_offset - trim_size];
let segment_after_corruption = &orig_journal_data[end_offset..]; let segment_after_corruption = &orig_journal_data[end_offset..];
@ -276,7 +330,7 @@ fn emulate_midway_corruption(
); );
f.fwrite_all(segment_before_corruption)?; f.fwrite_all(segment_before_corruption)?;
f.fwrite_all(segment_after_corruption)?; f.fwrite_all(segment_after_corruption)?;
Ok(ModifiedJournalInfo::new( Ok(ModifiedJournalStorageInfo::new(
orig_journal_size, orig_journal_size,
new_size, new_size,
end_offset - trim_size..end_offset, end_offset - trim_size..end_offset,
@ -287,17 +341,22 @@ fn emulate_midway_corruption(
) )
} }
/// Renders `num` as a test key: the prefix `key-` followed by the number,
/// zero-padded to a minimum width of six digits.
fn keyfmt(num: usize) -> String {
    std::fmt::format(format_args!("key-{num:06}"))
}
fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<u64> { fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<u64> {
let mut op_count = 0; let mut op_count = 0;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
for num in 1..=TRIALS { for num in 1..=TRIALS {
op_count += 1; op_count += 1;
sdb.push(jrnl, format!("key-{num:06}"))?; sdb.push(jrnl, keyfmt(num))?;
if num % 10 == 0 { if num % 10 == 0 {
op_count += 1; op_count += 1;
sdb.pop(jrnl)?; sdb.pop(jrnl)?;
} }
} }
assert_eq!(sdb.data().len(), POST_TRIALS_SIZE);
Ok(op_count) Ok(op_count)
} }
@ -308,14 +367,14 @@ fn corruption_before_close() {
Initializer::new_driver_type("close_event_corruption_empty.db", |jrnl_id| { Initializer::new_driver_type("close_event_corruption_empty.db", |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(0) Ok(InitializerInfo::new_last_event(0))
}), }),
// open, apply mix of events, close // open, apply mix of events, close
Initializer::new_driver_type("close_event_corruption.db", |jrnl_id| { Initializer::new_driver_type("close_event_corruption.db", |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?; let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(operation_count) Ok(InitializerInfo::new_last_event(operation_count))
}), }),
// open, close, reinit, close // open, close, reinit, close
Initializer::new_driver_type( Initializer::new_driver_type(
@ -332,20 +391,25 @@ fn corruption_before_close() {
JournalSettings::default(), JournalSettings::default(),
)?; )?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(2) Ok(InitializerInfo::new_last_event(2))
}, },
), ),
]; ];
emulate_final_event_corruption( emulate_final_event_corruption(
initializers, initializers,
|journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| { |journal_id, modified_journal_info, trim_size, db, open_result| {
// open the journal and validate failure // open the journal and validate failure
let open_err = open_result.unwrap_err(); let open_err = open_result.unwrap_err();
let trace = debug_get_trace(); let trace = debug_get_trace();
if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) { if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) {
// the amount of trim from the end of the file causes us to lose valuable metadata // the amount of trim from the end of the file causes us to lose valuable metadata
if repaired_last_event_id == 0 { if modified_journal_info.init.last_executed_event_id == 0 {
// empty log // empty log
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!( assert_eq!(
trace, trace,
intovec![ intovec![
@ -355,6 +419,25 @@ fn corruption_before_close() {
"failed at trim_size {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
) )
} else { } else {
if modified_journal_info.initializer_id == 1 {
// in the second case, we apply the event mix so we need to check this
assert_eq!(
db.data().len(),
POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}"
);
} else {
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
}
assert_eq!( assert_eq!(
*trace.last().unwrap(), *trace.last().unwrap(),
JournalReaderTraceEvent::LookingForEvent.into(), JournalReaderTraceEvent::LookingForEvent.into(),
@ -363,24 +446,52 @@ fn corruption_before_close() {
} }
} else { } else {
// the amount of trim still allows us to read some metadata // the amount of trim still allows us to read some metadata
if repaired_last_event_id == 0 { if modified_journal_info.init.last_executed_event_id == 0 {
// empty log // empty log
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!( assert_eq!(
trace, trace,
intovec![ intovec![
JournalReaderTraceEvent::Initialized, JournalReaderTraceEvent::Initialized,
JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id), JournalReaderTraceEvent::AttemptingEvent(
modified_journal_info.init.corrupted_event_id
),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
], ],
"failed at trim_size {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
) )
} else { } else {
if modified_journal_info.initializer_id == 1 {
// in the second case, we apply the event mix so we need to check this
assert_eq!(
db.data().len(),
POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}"
);
} else {
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
}
assert_eq!( assert_eq!(
&trace[trace.len() - 3..], &trace[trace.len() - 3..],
&into_array![ &into_array![
JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id), JournalReaderTraceEvent::AttemptingEvent(
modified_journal_info.init.corrupted_event_id
),
JournalReaderTraceEvent::DriverEventExpectingClose JournalReaderTraceEvent::DriverEventExpectingClose
], ],
"failed at trim_size {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
@ -393,12 +504,33 @@ fn corruption_before_close() {
"failed at trim_size {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
}, },
|journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.unwrap(),
RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _),
"failed at trim_size {trim_size} for journal {journal_id}" "failed at trim_size {trim_size} for journal {journal_id}"
); );
if modified_journal_info.init.last_executed_event_id == 0
|| modified_journal_info.initializer_id == 2
{
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
} else {
// in the second case, we apply the event mix so we need to check this
assert_eq!(
db.data().len(),
POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}"
);
}
let _ = reopen_result.unwrap(); let _ = reopen_result.unwrap();
// clear trace // clear trace
let _ = debug_get_trace(); let _ = debug_get_trace();
@ -416,7 +548,7 @@ fn corruption_after_reopen() {
drop(jrnl); drop(jrnl);
// reopen, but don't close // reopen, but don't close
open_journal::<SimpleDBJournal>(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; open_journal::<SimpleDBJournal>(jrnl_id, &SimpleDB::new(), JournalSettings::default())?;
Ok(1) Ok(InitializerInfo::new_last_event(1))
}), }),
Initializer::new_driver_type("corruption_after_ropen_multi_before_close.db", |jrnl_id| { Initializer::new_driver_type("corruption_after_ropen_multi_before_close.db", |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
@ -425,12 +557,12 @@ fn corruption_after_reopen() {
drop(jrnl); drop(jrnl);
// reopen, but don't close // reopen, but don't close
open_journal::<SimpleDBJournal>(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; open_journal::<SimpleDBJournal>(jrnl_id, &SimpleDB::new(), JournalSettings::default())?;
Ok(operation_count + 1) // + 1 since we have the reopen event which is the next event that'll vanish Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish
}), }),
]; ];
emulate_final_event_corruption( emulate_final_event_corruption(
initializers, initializers,
|journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| { |journal_id, modified_journal_info, trim_size, db, open_result| {
let trace = debug_get_trace(); let trace = debug_get_trace();
if trim_size == DriverEvent::FULL_EVENT_SIZE { if trim_size == DriverEvent::FULL_EVENT_SIZE {
/* /*
@ -444,8 +576,13 @@ fn corruption_after_reopen() {
*/ */
let mut jrnl = let mut jrnl =
open_result.expect(&format!("failed at {trim_size} for journal {journal_id}")); open_result.expect(&format!("failed at {trim_size} for journal {journal_id}"));
if repaired_last_event_id == 1 { if modified_journal_info.init.last_executed_event_id == 1 {
// empty log, only the reopen // empty log, only the reopen
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!( assert_eq!(
trace, trace,
intovec![ intovec![
@ -460,7 +597,7 @@ fn corruption_after_reopen() {
JournalWriterTraceEvent::ReinitializeAttempt, JournalWriterTraceEvent::ReinitializeAttempt,
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Reopened, event: DriverEventKind::Reopened,
event_id: repaired_last_event_id, event_id: modified_journal_info.init.corrupted_event_id,
prev_id: 0 prev_id: 0
}, },
JournalWriterTraceEvent::DriverEventCompleted, JournalWriterTraceEvent::DriverEventCompleted,
@ -469,12 +606,25 @@ fn corruption_after_reopen() {
"failed at {trim_size} for journal {journal_id}" "failed at {trim_size} for journal {journal_id}"
); );
} else { } else {
// we will have upto the last event since only the reopen is gone
assert_eq!(
db.data().len(),
POST_TRIALS_SIZE,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!( assert_eq!(
&trace[trace.len() - 12..], &trace[trace.len() - 12..],
intovec![ intovec![
JournalReaderTraceEvent::ServerEventAppliedSuccess, JournalReaderTraceEvent::ServerEventAppliedSuccess,
JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id - 1), // close event JournalReaderTraceEvent::AttemptingEvent(
modified_journal_info.init.corrupted_event_id - 1
), // close event
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
@ -483,12 +633,13 @@ fn corruption_after_reopen() {
JournalWriterTraceEvent::ReinitializeAttempt, JournalWriterTraceEvent::ReinitializeAttempt,
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Reopened, event: DriverEventKind::Reopened,
event_id: repaired_last_event_id, event_id: modified_journal_info.init.corrupted_event_id,
prev_id: repaired_last_event_id - 1 // close event prev_id: modified_journal_info.init.corrupted_event_id - 1 // close event
}, },
JournalWriterTraceEvent::DriverEventCompleted, JournalWriterTraceEvent::DriverEventCompleted,
JournalWriterTraceEvent::ReinitializeComplete JournalWriterTraceEvent::ReinitializeComplete
] ],
"failed at {trim_size} for journal {journal_id}"
) )
} }
// now close this so that this works with the post repair handler // now close this so that this works with the post repair handler
@ -498,10 +649,16 @@ fn corruption_after_reopen() {
} else { } else {
assert_eq!( assert_eq!(
open_result.unwrap_err().kind(), open_result.unwrap_err().kind(),
&ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()) &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()),
"failed at {trim_size} for journal {journal_id}"
); );
if repaired_last_event_id == 1 { if modified_journal_info.init.last_executed_event_id == 1 {
// empty log, only the reopen // empty log, only the reopen
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
assert_eq!( assert_eq!(
trace, trace,
intovec![ intovec![
@ -512,10 +669,16 @@ fn corruption_after_reopen() {
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
JournalReaderTraceEvent::DriverEventExpectingReopenBlock, JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id) JournalReaderTraceEvent::AttemptingEvent(
] modified_journal_info.init.corrupted_event_id
)
],
"failed at {trim_size} for journal {journal_id}"
); );
} else { } else {
// we will have upto the last event since only the reopen is gone
assert_eq!(db.data().len(), POST_TRIALS_SIZE);
assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
assert_eq!( assert_eq!(
&trace[trace.len() - 5..], &trace[trace.len() - 5..],
intovec![ intovec![
@ -523,13 +686,16 @@ fn corruption_after_reopen() {
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
JournalReaderTraceEvent::DriverEventExpectingReopenBlock, JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id) JournalReaderTraceEvent::AttemptingEvent(
] modified_journal_info.init.corrupted_event_id
)
],
"failed at {trim_size} for journal {journal_id}"
); );
} }
} }
}, },
|journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
assert!(reopen_result.is_ok()); assert!(reopen_result.is_ok());
if trim_size == DriverEvent::FULL_EVENT_SIZE { if trim_size == DriverEvent::FULL_EVENT_SIZE {
// see earlier comment // see earlier comment
@ -543,9 +709,20 @@ fn corruption_after_reopen() {
repair_result.unwrap(), repair_result.unwrap(),
RepairResult::UnspecifiedLoss( RepairResult::UnspecifiedLoss(
(DriverEvent::FULL_EVENT_SIZE - trim_size) as u64 (DriverEvent::FULL_EVENT_SIZE - trim_size) as u64
) ),
"failed at {trim_size} for journal {journal_id}"
); );
} }
if modified_journal_info.init.last_executed_event_id == 1 {
assert_eq!(
db.data().len(),
0,
"failed at {trim_size} for journal {journal_id}"
);
} else {
assert_eq!(db.data().len(), POST_TRIALS_SIZE);
assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
}
let _ = debug_get_trace(); let _ = debug_get_trace();
let _ = debug_get_offsets(); let _ = debug_get_offsets();
}, },
@ -576,7 +753,7 @@ fn corruption_at_runtime() {
let mut jrnl = create_journal(jrnl_id)?; let mut jrnl = create_journal(jrnl_id)?;
sdb.push(&mut jrnl, KEY)?; sdb.push(&mut jrnl, KEY)?;
// don't close // don't close
Ok(0) Ok(InitializerInfo::new_last_event(0))
}, },
offset, offset,
), ),
@ -586,19 +763,19 @@ fn corruption_at_runtime() {
let mut op_count = 0; let mut op_count = 0;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
let mut jrnl = create_journal(jrnl_id)?; let mut jrnl = create_journal(jrnl_id)?;
for _ in 0..TRIALS { for _ in 1..=TRIALS {
sdb.push(&mut jrnl, KEY)?; sdb.push(&mut jrnl, KEY)?;
op_count += 1; op_count += 1;
} }
// don't close // don't close
Ok(op_count) Ok(InitializerInfo::new_last_event(op_count))
}, },
offset, offset,
), ),
]; ];
emulate_final_event_corruption( emulate_final_event_corruption(
initializers, initializers,
|journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| { |journal_id, modified_journal_info, trim_size, db, open_result| {
let trace = debug_get_trace(); let trace = debug_get_trace();
let err = open_result.unwrap_err(); let err = open_result.unwrap_err();
assert_eq!( assert_eq!(
@ -607,7 +784,12 @@ fn corruption_at_runtime() {
"failed for journal {journal_id} with trim_size {trim_size}" "failed for journal {journal_id} with trim_size {trim_size}"
); );
if trim_size > offset - (sizeof!(u128) + sizeof!(u64)) { if trim_size > offset - (sizeof!(u128) + sizeof!(u64)) {
if repaired_last_event_id == 0 { if modified_journal_info.init.last_executed_event_id == 0 {
assert_eq!(
db.data().len(),
0,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!( assert_eq!(
trace, trace,
intovec![ intovec![
@ -617,6 +799,17 @@ fn corruption_at_runtime() {
"failed for journal {journal_id} with trim_size {trim_size}" "failed for journal {journal_id} with trim_size {trim_size}"
) )
} else { } else {
// we lost the last server event, so we'll have one key less
assert_eq!(
db.data().len(),
TRIALS - 1,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!(
db.data()[TRIALS - 2],
KEY,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!( assert_eq!(
&trace[trace.len() - 4..], &trace[trace.len() - 4..],
intovec![ intovec![
@ -629,8 +822,13 @@ fn corruption_at_runtime() {
) )
} }
} else { } else {
if repaired_last_event_id == 0 { if modified_journal_info.init.last_executed_event_id == 0 {
// empty log // empty log
assert_eq!(
db.data().len(),
0,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!( assert_eq!(
trace, trace,
intovec![ intovec![
@ -643,11 +841,24 @@ fn corruption_at_runtime() {
"failed for journal {journal_id} with trim_size {trim_size}" "failed for journal {journal_id} with trim_size {trim_size}"
); );
} else { } else {
// we lost the last server event, so we'll have one key less
assert_eq!(
db.data().len(),
TRIALS - 1,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!(
db.data()[TRIALS - 2],
KEY,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!( assert_eq!(
&trace[trace.len() - 4..], &trace[trace.len() - 4..],
intovec![ intovec![
JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id - 1), JournalReaderTraceEvent::AttemptingEvent(
modified_journal_info.init.corrupted_event_id - 1
),
JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::DetectedServerEvent,
JournalReaderTraceEvent::ServerEventMetadataParsed, JournalReaderTraceEvent::ServerEventMetadataParsed,
], ],
@ -656,13 +867,31 @@ fn corruption_at_runtime() {
} }
} }
}, },
|journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
assert!(reopen_result.is_ok()); assert!(reopen_result.is_ok());
assert_eq!( assert_eq!(
repair_result.unwrap(), repair_result.unwrap(),
RepairResult::UnspecifiedLoss((offset - trim_size) as u64), RepairResult::UnspecifiedLoss((offset - trim_size) as u64),
"failed for journal {journal_id} with trim_size {trim_size}" "failed for journal {journal_id} with trim_size {trim_size}"
); );
if modified_journal_info.init.last_executed_event_id == 0 {
assert_eq!(
db.data().len(),
0,
"failed for journal {journal_id} with trim_size {trim_size}"
);
} else {
assert_eq!(
db.data().len(),
TRIALS - 1,
"failed for journal {journal_id} with trim_size {trim_size}"
);
assert_eq!(
db.data()[TRIALS - 2],
KEY,
"failed for journal {journal_id} with trim_size {trim_size}"
);
}
let _ = debug_get_trace(); let _ = debug_get_trace();
}, },
) )
@ -684,7 +913,7 @@ fn midway_corruption_close() {
)?; )?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
Ok(0) // close (to corrupt), reopen, close Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close
}), }),
Initializer::new_driver_type( Initializer::new_driver_type(
"midway_corruption_close_events_before_second_close", "midway_corruption_close_events_before_second_close",
@ -692,7 +921,18 @@ fn midway_corruption_close() {
{ {
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0)
}
let op_cnt;
{
// reopen, apply mix and close
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1)
op_cnt = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one
} }
{ {
// reopen and close // reopen and close
@ -700,69 +940,120 @@ fn midway_corruption_close() {
jrnl_id, jrnl_id,
&SimpleDB::new(), &SimpleDB::new(),
JournalSettings::default(), JournalSettings::default(),
)?; )?; // (op_cnt + 3)
RawJournalWriter::close_driver(&mut jrnl)?; // <-- corrupt this one RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4)
}
Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4))
},
),
Initializer::new_driver_type(
"midway_corruption_close_events_before_third_close",
|jrnl_id| {
{
// create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
{ {
// reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id, jrnl_id,
&SimpleDB::new(), &SimpleDB::new(),
JournalSettings::default(), JournalSettings::default(),
)?; )?; // (1)
let _ = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one
RawJournalWriter::close_driver(&mut jrnl)?;
} }
Ok(2) // corrupt the second close event let op_cnt;
{
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (3)
op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count)
RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count)
}
Ok(InitializerInfo::new(2, op_cnt + 4)) // corrupt the second close event
}, },
), ),
]; ];
debug_set_offset_tracking(true); debug_set_offset_tracking(true);
emulate_midway_corruption( emulate_midway_corruption(
initializers, initializers,
|journal_id, _last_id, trim_size, db, open_result, _modstat| { |journal_id, modified_journal_info, trim_size, db, open_result| {
assert!( assert!(
open_result.is_err(), open_result.is_err(),
"failed for journal {journal_id} with trim_size {trim_size}" "failed for journal {journal_id} with trim_size {trim_size}"
); );
// all data will be lost, so the DB will be empty match modified_journal_info.initializer_id {
assert_eq!( 0 | 2 => {
db.data().len(), // in the first and third case, (0) no data is present (2) all data is lost
0, // all data will be lost, so the DB will be empty
"failed for journal {journal_id} with trim_size {trim_size}" assert_eq!(
); db.data().len(),
0,
"failed for journal {journal_id} with trim_size {trim_size}"
);
}
1 => {
// in this case, all elements will be preserved
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}"
);
}
_ => panic!(),
}
let _ = debug_get_offsets(); let _ = debug_get_offsets();
let _ = debug_get_trace(); let _ = debug_get_trace();
}, },
|journal_id, last_id, trim_size, repair_result, db, reopen_result| { |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| {
let _ = reopen_result.unwrap(); let _ = reopen_result.unwrap();
// all data will be lost, so the DB will be empty match modified_journal_info.initializer_id {
assert_eq!( 0 | 2 => {
db.data().len(), // all data will be lost, so the DB will be empty
0, assert_eq!(
"failed for journal {journal_id} with trim_size {trim_size}" db.data().len(),
); 0,
if last_id == 0 { "failed for journal {journal_id} with trim_size {trim_size}"
assert_eq!( );
repair_result.unwrap(), if modified_journal_info.init.corrupted_event_id == 0
RepairResult::UnspecifiedLoss( && modified_journal_info.init.not_last_event()
((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64 {
), // the first event was corrupted
"failed for journal {journal_id} with trim_size {trim_size}" assert_eq!(
); repair_result.unwrap(),
} else { RepairResult::UnspecifiedLoss(
// this is a serious midway corruption with major data loss ((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64
let full_log_size = File::open(journal_id).unwrap().f_len().unwrap(); ),
assert_eq!( "failed for journal {journal_id} with trim_size {trim_size}"
repair_result.unwrap(), );
RepairResult::UnspecifiedLoss( } else {
full_log_size // this is a serious midway corruption with major data loss
- <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE // account for header let full_log_size = File::open(journal_id).unwrap().f_len().unwrap();
as u64 assert_eq!(
- (DriverEvent::FULL_EVENT_SIZE * 2) as u64 // account for close (0), reopen(1) repair_result.unwrap(),
- trim_size as u64 // account for trim RepairResult::UnspecifiedLoss(
), full_log_size
"failed for journal {journal_id} with trim_size {trim_size}" - <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE // account for header
); as u64
- (DriverEvent::FULL_EVENT_SIZE * 2) as u64 // account for close (0), reopen(1)
- trim_size as u64 // account for trim
),
"failed for journal {journal_id} with trim_size {trim_size}"
);
}
}
1 => {
// in this case, all elements will be preserved
assert_eq!(
*db.data().last().unwrap(),
keyfmt(TRIALS - 1),
"failed at {trim_size} for journal {journal_id}"
);
}
_ => panic!(),
} }
let _ = debug_get_trace(); let _ = debug_get_trace();
let _ = debug_get_offsets(); let _ = debug_get_offsets();

Loading…
Cancel
Save