storage: Test different midway corruption scenarios

next
Sayan Nandan 6 months ago
parent 7c45503bc4
commit bb4132a617
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -161,8 +161,15 @@ impl RawJournalAdapter for SimpleDBJournal {
) -> RuntimeResult<()> {
match meta {
EventMeta::NewKey => {
let key_size = u64::from_le_bytes(file.read_block()?);
let mut keybuf = vec![0u8; key_size as usize];
let key_size = u64::from_le_bytes(file.read_block()?) as usize;
let mut keybuf = Vec::<u8>::new();
if keybuf.try_reserve_exact(key_size as usize).is_err() {
return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into());
}
unsafe {
keybuf.as_mut_ptr().write_bytes(0, key_size);
keybuf.set_len(key_size);
}
file.tracked_read(&mut keybuf)?;
match String::from_utf8(keybuf) {
Ok(k) => gs.data.borrow_mut().push(k),

@ -31,7 +31,7 @@ use {
error::ErrorKind,
storage::{
common::{
interface::fs::{File, FileExt, FileSystem, FileWriteExt},
interface::fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt},
sdss::sdss_r1::FileSpecV1,
},
v2::raw::journal::{
@ -49,9 +49,11 @@ use {
},
IoResult,
},
std::io::ErrorKind as IoErrorKind,
std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range},
};
const TRIALS: usize = 100;
struct Initializer {
journal_id: &'static str,
initializer_fn: fn(&str) -> RuntimeResult<u64>,
@ -71,19 +73,51 @@ impl Initializer {
}
}
fn create_trimmed_file(from: &str, to: &str, trim_to: u64) -> IoResult<()> {
FileSystem::copy(from, to)?;
let mut f = File::open(to)?;
f.f_truncate(trim_to)
#[derive(Debug)]
#[allow(unused)]
struct ModifiedJournalInfo {
original_file_size: usize,
modified_file_size: usize,
corruption_range: Range<usize>,
}
fn emulate_corrupted_final_event(
impl ModifiedJournalInfo {
fn new(
original_file_size: usize,
modified_file_size: usize,
corruption_range: Range<usize>,
) -> Self {
Self {
original_file_size,
modified_file_size,
corruption_range,
}
}
}
fn emulate_sequentially_varying_single_corruption(
initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn(&str, u64, usize, RuntimeResult<RawJournalWriter<SimpleDBJournal>>),
modified_journal_generator_fn: impl Fn(
&str,
&str,
u64,
usize,
&BTreeMap<u64, u64>,
) -> IoResult<ModifiedJournalInfo>,
post_corruption_handler: impl Fn(
&str,
u64,
usize,
SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
ModifiedJournalInfo,
),
post_repair_handler: impl Fn(
&str,
u64,
usize,
RuntimeResult<RepairResult>,
SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
),
) {
@ -102,59 +136,163 @@ fn emulate_corrupted_final_event(
FileSystem::read(journal_id),
),
};
let journal_size = File::open(journal_id).unwrap().f_len().unwrap();
let _ = debug_get_trace();
let _ = debug_get_offsets();
let original_offsets = debug_get_offsets();
// now trim and repeat
for (trim_size, new_size) in (1..=last_event_size)
.rev()
.map(|trim_size| (trim_size, journal_size - trim_size as u64))
{
// create a copy of the "good" journal and trim to simulate data loss
let trimmed_journal_path = format!("{journal_id}-trimmed-{trim_size}.db");
create_trimmed_file(journal_id, &trimmed_journal_path, new_size).unwrap();
// init misc
let simple_db = SimpleDB::new();
let open_journal_fn = || {
for trim_size in 1..=last_event_size {
// create a copy of the "good" journal and corrupt it
let corrupted_journal_path = format!("{journal_id}-trimmed-{trim_size}.db");
let open_journal_fn = |db: &SimpleDB| {
open_journal::<SimpleDBJournal>(
&trimmed_journal_path,
&simple_db,
&corrupted_journal_path,
db,
JournalSettings::default(),
)
};
// now let the caller handle any post corruption work
let open_journal_result = open_journal_fn();
post_corruption_handler(
// modify journal
let mod_stat = modified_journal_generator_fn(
journal_id,
&corrupted_journal_path,
repaired_last_event_id,
trim_size,
open_journal_result,
);
// repair
let repair_result = repair_journal::<SimpleDBJournal>(
&trimmed_journal_path,
&simple_db,
JournalSettings::default(),
JournalRepairMode::Simple,
);
let repaired_journal_reopen_result = open_journal_fn();
// let caller handle any post repair work
post_repair_handler(
journal_id,
trim_size,
repair_result,
repaired_journal_reopen_result,
);
&original_offsets,
)
.unwrap();
// now let the caller handle any post corruption work
{
let sdb = SimpleDB::new();
let open_journal_result = open_journal_fn(&sdb);
post_corruption_handler(
journal_id,
repaired_last_event_id,
trim_size,
sdb,
open_journal_result,
mod_stat,
);
}
// repair and let the caller handle post repair work
{
let sdb = SimpleDB::new();
let repair_result = repair_journal::<SimpleDBJournal>(
&corrupted_journal_path,
&sdb,
JournalSettings::default(),
JournalRepairMode::Simple,
);
let repaired_journal_reopen_result = open_journal_fn(&sdb);
// let caller handle any post repair work
post_repair_handler(
journal_id,
repaired_last_event_id,
trim_size,
repair_result,
sdb,
repaired_journal_reopen_result,
);
}
}
}
}
fn emulate_final_event_corruption(
initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn(
&str,
u64,
usize,
SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
ModifiedJournalInfo,
),
post_repair_handler: impl Fn(
&str,
u64,
usize,
RuntimeResult<RepairResult>,
SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
),
) {
emulate_sequentially_varying_single_corruption(
initializers,
|original_journal, modified_journal, _, trim_amount, _offsets| {
FileSystem::copy(original_journal, modified_journal)?;
let mut f = File::open(modified_journal)?;
let real_flen = f.f_len()? as usize;
f.f_truncate((real_flen - trim_amount) as _)?;
Ok(ModifiedJournalInfo::new(
real_flen,
trim_amount,
trim_amount..real_flen,
))
},
post_corruption_handler,
post_repair_handler,
)
}
fn emulate_midway_corruption(
initializers: impl IntoIterator<Item = Initializer>,
post_corruption_handler: impl Fn(
&str,
u64,
usize,
SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
ModifiedJournalInfo,
),
post_repair_handler: impl Fn(
&str,
u64,
usize,
RuntimeResult<RepairResult>,
SimpleDB,
RuntimeResult<RawJournalWriter<SimpleDBJournal>>,
),
) {
emulate_sequentially_varying_single_corruption(
initializers,
|original_journal_path,
corrupted_journal_path,
event_to_corrupt,
trim_size,
original_offsets| {
let orig_journal_data = FileSystem::read(original_journal_path)?;
let orig_journal_size = orig_journal_data.len();
let mut f = File::create(corrupted_journal_path)?;
let end_offset = *original_offsets.get(&event_to_corrupt).unwrap() as usize;
// apply
let segment_before_corruption = &orig_journal_data[..end_offset - trim_size];
let segment_after_corruption = &orig_journal_data[end_offset..];
let new_size = segment_before_corruption.len() + segment_after_corruption.len();
assert!(
new_size < orig_journal_size,
"real len is {orig_journal_size} while new len is {new_size}",
);
assert_eq!(
segment_before_corruption.len() + segment_after_corruption.len() + trim_size,
orig_journal_size
);
f.fwrite_all(segment_before_corruption)?;
f.fwrite_all(segment_after_corruption)?;
Ok(ModifiedJournalInfo::new(
orig_journal_size,
new_size,
end_offset - trim_size..end_offset,
))
},
post_corruption_handler,
post_repair_handler,
)
}
fn apply_event_mix(jrnl: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<u64> {
let mut op_count = 0;
let mut sdb = SimpleDB::new();
for num in 1..=100 {
for num in 1..=TRIALS {
op_count += 1;
sdb.push(jrnl, format!("key-{num}"))?;
sdb.push(jrnl, format!("key-{num:06}"))?;
if num % 10 == 0 {
op_count += 1;
sdb.pop(jrnl)?;
@ -198,9 +336,9 @@ fn corruption_before_close() {
},
),
];
emulate_corrupted_final_event(
emulate_final_event_corruption(
initializers,
|journal_id, repaired_last_event_id, trim_size, open_result| {
|journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| {
// open the journal and validate failure
let open_err = open_result.unwrap_err();
let trace = debug_get_trace();
@ -255,7 +393,7 @@ fn corruption_before_close() {
"failed at trim_size {trim_size} for journal {journal_id}"
);
},
|journal_id, trim_size, repair_result, reopen_result| {
|journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| {
assert_eq!(
repair_result.unwrap(),
RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _),
@ -290,9 +428,9 @@ fn corruption_after_reopen() {
Ok(operation_count + 1) // + 1 since we have the reopen event which is the next event that'll vanish
}),
];
emulate_corrupted_final_event(
emulate_final_event_corruption(
initializers,
|journal_id, repaired_last_event_id, trim_size, open_result| {
|journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| {
let trace = debug_get_trace();
if trim_size == DriverEvent::FULL_EVENT_SIZE {
/*
@ -391,7 +529,7 @@ fn corruption_after_reopen() {
}
}
},
|journal_id, trim_size, repair_result, reopen_result| {
|journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| {
assert!(reopen_result.is_ok());
if trim_size == DriverEvent::FULL_EVENT_SIZE {
// see earlier comment
@ -448,7 +586,7 @@ fn corruption_at_runtime() {
let mut op_count = 0;
let mut sdb = SimpleDB::new();
let mut jrnl = create_journal(jrnl_id)?;
for _ in 0..100 {
for _ in 0..TRIALS {
sdb.push(&mut jrnl, KEY)?;
op_count += 1;
}
@ -458,9 +596,9 @@ fn corruption_at_runtime() {
offset,
),
];
emulate_corrupted_final_event(
emulate_final_event_corruption(
initializers,
|journal_id, repaired_last_event_id, trim_size, open_result| {
|journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| {
let trace = debug_get_trace();
let err = open_result.unwrap_err();
assert_eq!(
@ -518,7 +656,7 @@ fn corruption_at_runtime() {
}
}
},
|journal_id, trim_size, repair_result, reopen_result| {
|journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| {
assert!(reopen_result.is_ok());
assert_eq!(
repair_result.unwrap(),
@ -526,7 +664,109 @@ fn corruption_at_runtime() {
"failed for journal {journal_id} with trim_size {trim_size}"
);
let _ = debug_get_trace();
let _ = debug_get_offsets();
},
)
}
#[test]
fn midway_corruption_close() {
let initializers = vec![
Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| {
// create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl);
// reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl);
Ok(0) // close (to corrupt), reopen, close
}),
Initializer::new_driver_type(
"midway_corruption_close_events_before_second_close",
|jrnl_id| {
{
// create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?;
}
{
// reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; // <-- corrupt this one
}
{
let mut jrnl = open_journal::<SimpleDBJournal>(
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
let _ = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?;
}
Ok(2) // corrupt the second close event
},
),
];
debug_set_offset_tracking(true);
emulate_midway_corruption(
initializers,
|journal_id, _last_id, trim_size, db, open_result, _modstat| {
assert!(
open_result.is_err(),
"failed for journal {journal_id} with trim_size {trim_size}"
);
// all data will be lost, so the DB will be empty
assert_eq!(
db.data().len(),
0,
"failed for journal {journal_id} with trim_size {trim_size}"
);
let _ = debug_get_offsets();
let _ = debug_get_trace();
},
|journal_id, last_id, trim_size, repair_result, db, reopen_result| {
let _ = reopen_result.unwrap();
// all data will be lost, so the DB will be empty
assert_eq!(
db.data().len(),
0,
"failed for journal {journal_id} with trim_size {trim_size}"
);
if last_id == 0 {
assert_eq!(
repair_result.unwrap(),
RepairResult::UnspecifiedLoss(
((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64
),
"failed for journal {journal_id} with trim_size {trim_size}"
);
} else {
// this is a serious midway corruption with major data loss
let full_log_size = File::open(journal_id).unwrap().f_len().unwrap();
assert_eq!(
repair_result.unwrap(),
RepairResult::UnspecifiedLoss(
full_log_size
- <<SimpleDBJournal as RawJournalAdapter>::Spec as FileSpecV1>::SIZE // account for header
as u64
- (DriverEvent::FULL_EVENT_SIZE * 2) as u64 // account for close (0), reopen(1)
- trim_size as u64 // account for trim
),
"failed for journal {journal_id} with trim_size {trim_size}"
);
}
let _ = debug_get_trace();
let _ = debug_get_offsets();
},
);
debug_set_offset_tracking(false);
}

Loading…
Cancel
Save