From bb4132a617f5b5d4ed1feb2b0cae525fc4036d6a Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Fri, 29 Mar 2024 12:55:33 +0530 Subject: [PATCH] storage: Test different midway corruption scenarios --- .../storage/v2/raw/journal/raw/tests/mod.rs | 11 +- .../v2/raw/journal/raw/tests/recovery.rs | 350 +++++++++++++++--- 2 files changed, 304 insertions(+), 57 deletions(-) diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs index b032ebe1..21ba37e5 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs @@ -161,8 +161,15 @@ impl RawJournalAdapter for SimpleDBJournal { ) -> RuntimeResult<()> { match meta { EventMeta::NewKey => { - let key_size = u64::from_le_bytes(file.read_block()?); - let mut keybuf = vec![0u8; key_size as usize]; + let key_size = u64::from_le_bytes(file.read_block()?) as usize; + let mut keybuf = Vec::::new(); + if keybuf.try_reserve_exact(key_size as usize).is_err() { + return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()); + } + unsafe { + keybuf.as_mut_ptr().write_bytes(0, key_size); + keybuf.set_len(key_size); + } file.tracked_read(&mut keybuf)?; match String::from_utf8(keybuf) { Ok(k) => gs.data.borrow_mut().push(k), diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs index 80286b90..ace94d8d 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -31,7 +31,7 @@ use { error::ErrorKind, storage::{ common::{ - interface::fs::{File, FileExt, FileSystem, FileWriteExt}, + interface::fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt}, sdss::sdss_r1::FileSpecV1, }, v2::raw::journal::{ @@ -49,9 +49,11 @@ use { }, IoResult, }, - std::io::ErrorKind as IoErrorKind, + std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range}, }; +const TRIALS: usize = 100; + struct Initializer { journal_id: &'static str, initializer_fn: fn(&str) -> RuntimeResult, @@ -71,19 +73,51 @@ impl Initializer { } } -fn create_trimmed_file(from: &str, to: &str, trim_to: u64) -> IoResult<()> { - FileSystem::copy(from, to)?; - let mut f = File::open(to)?; - f.f_truncate(trim_to) +#[derive(Debug)] +#[allow(unused)] +struct ModifiedJournalInfo { + original_file_size: usize, + modified_file_size: usize, + corruption_range: Range, } -fn emulate_corrupted_final_event( +impl ModifiedJournalInfo { + fn new( + original_file_size: usize, + modified_file_size: usize, + corruption_range: Range, + ) -> Self { + Self { + original_file_size, + modified_file_size, + corruption_range, + } + } +} + +fn emulate_sequentially_varying_single_corruption( initializers: impl IntoIterator, - post_corruption_handler: impl Fn(&str, u64, usize, RuntimeResult>), + modified_journal_generator_fn: impl Fn( + &str, + &str, + u64, + usize, + &BTreeMap, + ) -> IoResult, + post_corruption_handler: impl Fn( + &str, + u64, + usize, + SimpleDB, + RuntimeResult>, + ModifiedJournalInfo, + ), post_repair_handler: impl Fn( &str, + u64, usize, RuntimeResult, + SimpleDB, RuntimeResult>, ), ) { @@ -102,59 +136,163 @@ fn emulate_corrupted_final_event( FileSystem::read(journal_id), ), }; - let journal_size = File::open(journal_id).unwrap().f_len().unwrap(); let _ = debug_get_trace(); - let _ = debug_get_offsets(); + let original_offsets = debug_get_offsets(); // now trim and repeat - for (trim_size, new_size) in (1..=last_event_size) - .rev() - .map(|trim_size| (trim_size, journal_size - trim_size as u64)) - { - // create a copy of the "good" journal and trim to simulate data loss - let trimmed_journal_path = format!("{journal_id}-trimmed-{trim_size}.db"); - create_trimmed_file(journal_id, &trimmed_journal_path, new_size).unwrap(); - // init misc - let simple_db = SimpleDB::new(); - let open_journal_fn = || { + for trim_size in 1..=last_event_size { + // create a copy of the "good" journal and corrupt it + let corrupted_journal_path = format!("{journal_id}-trimmed-{trim_size}.db"); + let open_journal_fn = |db: &SimpleDB| { open_journal::( - &trimmed_journal_path, - &simple_db, + &corrupted_journal_path, + db, JournalSettings::default(), ) }; - // now let the caller handle any post corruption work - let open_journal_result = open_journal_fn(); - post_corruption_handler( + // modify journal + let mod_stat = modified_journal_generator_fn( journal_id, + &corrupted_journal_path, repaired_last_event_id, trim_size, - open_journal_result, - ); - // repair - let repair_result = repair_journal::( - &trimmed_journal_path, - &simple_db, - JournalSettings::default(), - JournalRepairMode::Simple, - ); - let repaired_journal_reopen_result = open_journal_fn(); - // let caller handle any post repair work - post_repair_handler( - journal_id, - trim_size, - repair_result, - repaired_journal_reopen_result, - ); + &original_offsets, + ) + .unwrap(); + // now let the caller handle any post corruption work + { + let sdb = SimpleDB::new(); + let open_journal_result = open_journal_fn(&sdb); + post_corruption_handler( + journal_id, + repaired_last_event_id, + trim_size, + sdb, + open_journal_result, + mod_stat, + ); + } + // repair and let the caller handle post repair work + { + let sdb = SimpleDB::new(); + let repair_result = repair_journal::( + &corrupted_journal_path, + &sdb, + JournalSettings::default(), + JournalRepairMode::Simple, + ); + let repaired_journal_reopen_result = open_journal_fn(&sdb); + // let caller handle any post repair work + post_repair_handler( + journal_id, + repaired_last_event_id, + trim_size, + repair_result, + sdb, + repaired_journal_reopen_result, + ); + } } } } +fn emulate_final_event_corruption( + initializers: impl IntoIterator, + post_corruption_handler: impl Fn( + &str, + u64, + usize, + SimpleDB, + RuntimeResult>, + ModifiedJournalInfo, + ), + post_repair_handler: impl Fn( + &str, + u64, + usize, + RuntimeResult, + SimpleDB, + RuntimeResult>, + ), +) { + emulate_sequentially_varying_single_corruption( + initializers, + |original_journal, modified_journal, _, trim_amount, _offsets| { + FileSystem::copy(original_journal, modified_journal)?; + let mut f = File::open(modified_journal)?; + let real_flen = f.f_len()? as usize; + f.f_truncate((real_flen - trim_amount) as _)?; + Ok(ModifiedJournalInfo::new( + real_flen, + trim_amount, + trim_amount..real_flen, + )) + }, + post_corruption_handler, + post_repair_handler, + ) +} + +fn emulate_midway_corruption( + initializers: impl IntoIterator, + post_corruption_handler: impl Fn( + &str, + u64, + usize, + SimpleDB, + RuntimeResult>, + ModifiedJournalInfo, + ), + post_repair_handler: impl Fn( + &str, + u64, + usize, + RuntimeResult, + SimpleDB, + RuntimeResult>, + ), +) { + emulate_sequentially_varying_single_corruption( + initializers, + |original_journal_path, + corrupted_journal_path, + event_to_corrupt, + trim_size, + original_offsets| { + let orig_journal_data = FileSystem::read(original_journal_path)?; + let orig_journal_size = orig_journal_data.len(); + let mut f = File::create(corrupted_journal_path)?; + let end_offset = *original_offsets.get(&event_to_corrupt).unwrap() as usize; + // apply + let segment_before_corruption = &orig_journal_data[..end_offset - trim_size]; + let segment_after_corruption = &orig_journal_data[end_offset..]; + let new_size = segment_before_corruption.len() + segment_after_corruption.len(); + assert!( + new_size < orig_journal_size, + "real len is {orig_journal_size} while new len is {new_size}", + ); + assert_eq!( + segment_before_corruption.len() + segment_after_corruption.len() + trim_size, + orig_journal_size + ); + f.fwrite_all(segment_before_corruption)?; + f.fwrite_all(segment_after_corruption)?; + Ok(ModifiedJournalInfo::new( + orig_journal_size, + new_size, + end_offset - trim_size..end_offset, + )) + }, + post_corruption_handler, + post_repair_handler, + ) +} + fn apply_event_mix(jrnl: &mut RawJournalWriter) -> RuntimeResult { let mut op_count = 0; let mut sdb = SimpleDB::new(); - for num in 1..=100 { + for num in 1..=TRIALS { op_count += 1; - sdb.push(jrnl, format!("key-{num}"))?; + sdb.push(jrnl, format!("key-{num:06}"))?; if num % 10 == 0 { op_count += 1; sdb.pop(jrnl)?; @@ -198,9 +336,9 @@ fn corruption_before_close() { }, ), ]; - emulate_corrupted_final_event( + emulate_final_event_corruption( initializers, - |journal_id, repaired_last_event_id, trim_size, open_result| { + |journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| { // open the journal and validate failure let open_err = open_result.unwrap_err(); let trace = debug_get_trace(); @@ -255,7 +393,7 @@ fn corruption_before_close() { "failed at trim_size {trim_size} for journal {journal_id}" ); }, - |journal_id, trim_size, repair_result, reopen_result| { + |journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| { assert_eq!( repair_result.unwrap(), RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), @@ -290,9 +428,9 @@ fn corruption_after_reopen() { Ok(operation_count + 1) // + 1 since we have the reopen event which is the next event that'll vanish }), ]; - emulate_corrupted_final_event( + emulate_final_event_corruption( initializers, - |journal_id, repaired_last_event_id, trim_size, open_result| { + |journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| { let trace = debug_get_trace(); if trim_size == DriverEvent::FULL_EVENT_SIZE { /* @@ -391,7 +529,7 @@ fn corruption_after_reopen() { } } }, - |journal_id, trim_size, repair_result, reopen_result| { + |journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| { assert!(reopen_result.is_ok()); if trim_size == DriverEvent::FULL_EVENT_SIZE { // see earlier comment @@ -448,7 +586,7 @@ fn corruption_at_runtime() { let mut op_count = 0; let mut sdb = SimpleDB::new(); let mut jrnl = create_journal(jrnl_id)?; - for _ in 0..100 { + for _ in 0..TRIALS { sdb.push(&mut jrnl, KEY)?; op_count += 1; } @@ -458,9 +596,9 @@ fn corruption_at_runtime() { offset, ), ]; - emulate_corrupted_final_event( + emulate_final_event_corruption( initializers, - |journal_id, repaired_last_event_id, trim_size, open_result| { + |journal_id, repaired_last_event_id, trim_size, _db, open_result, _modstat| { let trace = debug_get_trace(); let err = open_result.unwrap_err(); assert_eq!( @@ -518,7 +656,7 @@ fn corruption_at_runtime() { } } }, - |journal_id, trim_size, repair_result, reopen_result| { + |journal_id, _repaired_last_id, trim_size, repair_result, _db, reopen_result| { assert!(reopen_result.is_ok()); assert_eq!( repair_result.unwrap(), @@ -526,7 +664,109 @@ fn corruption_at_runtime() { "failed for journal {journal_id} with trim_size {trim_size}" ); let _ = debug_get_trace(); - let _ = debug_get_offsets(); }, ) } + +#[test] +fn midway_corruption_close() { + let initializers = vec![ + Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| { + // create and close + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + Ok(0) // close (to corrupt), reopen, close + }), + Initializer::new_driver_type( + "midway_corruption_close_events_before_second_close", + |jrnl_id| { + { + // create and close + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + } + { + // reopen and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + RawJournalWriter::close_driver(&mut jrnl)?; // <-- corrupt this one + } + { + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + let _ = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; + } + Ok(2) // corrupt the second close event + }, + ), + ]; + debug_set_offset_tracking(true); + emulate_midway_corruption( + initializers, + |journal_id, _last_id, trim_size, db, open_result, _modstat| { + assert!( + open_result.is_err(), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + // all data will be lost, so the DB will be empty + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + let _ = debug_get_offsets(); + let _ = debug_get_trace(); + }, + |journal_id, last_id, trim_size, repair_result, db, reopen_result| { + let _ = reopen_result.unwrap(); + // all data will be lost, so the DB will be empty + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + if last_id == 0 { + assert_eq!( + repair_result.unwrap(), + RepairResult::UnspecifiedLoss( + ((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64 + ), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } else { + // this is a serious midway corruption with major data loss + let full_log_size = File::open(journal_id).unwrap().f_len().unwrap(); + assert_eq!( + repair_result.unwrap(), + RepairResult::UnspecifiedLoss( + full_log_size + - <::Spec as FileSpecV1>::SIZE // account for header + as u64 + - (DriverEvent::FULL_EVENT_SIZE * 2) as u64 // account for close (0), reopen(1) + - trim_size as u64 // account for trim + ), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + ); + debug_set_offset_tracking(false); +}