diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 33e11a57..5978e4d0 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -219,9 +219,6 @@ pub fn repair() -> RuntimeResult<()> { fn print_repair_info(result: RepairResult, id: &str) { match result { RepairResult::NoErrors => info!("repair: no errors detected in {id}"), - RepairResult::LostBytes(lost) => { - warn!("repair: LOST DATA. repaired {id} but lost {lost} trailing bytes") - } RepairResult::UnspecifiedLoss(definitely_lost) => { if definitely_lost == 0 { warn!("repair: LOST DATA. repaired {id} but lost an unspecified amount of data") diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index 1a837d12..b93eae48 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -83,8 +83,6 @@ where pub enum RepairResult { /// No errors were detected NoErrors, - /// Lost n bytes - LostBytes(u64), /// Definitely lost n bytes, but might have lost more UnspecifiedLoss(u64), } @@ -150,15 +148,20 @@ impl JournalInitializer { */ #[cfg(test)] -pub fn obtain_trace() -> Vec { +pub fn debug_get_trace() -> Vec { local_mut!(TRACE, |t| core::mem::take(t)) } #[cfg(test)] -pub fn obtain_offsets() -> std::collections::BTreeMap { +pub fn debug_get_offsets() -> std::collections::BTreeMap { local_mut!(OFFSETS, |offsets| core::mem::take(offsets)) } +#[cfg(test)] +pub fn debug_set_offset_tracking(track: bool) { + local_mut!(TRACE_OFFSETS, |track_| *track_ = track) +} + #[derive(Debug, PartialEq)] #[cfg(test)] pub enum JournalTraceEvent { @@ -838,7 +841,7 @@ impl RawJournalReader { } else { self.tr.cached_size() - self.last_txn_offset }; - let mut repair_result = RepairResult::LostBytes(lost); + let repair_result = RepairResult::UnspecifiedLoss(lost); match repair_mode { JournalRepairMode::Simple => {} } @@ -928,13 +931,6 @@ impl RawJournalReader { known_event_offset, known_event_id, ); - if { - (self.state == JournalState::AwaitingClose) | // expecting a close but we couldn't parse it - (self.state == JournalState::AwaitingEvent) // we were awaiting an event but we couldn't get enough metadata to do anything - } { - // we reached eof and we were expecting a close. definitely lost an unspecified number of bytes - repair_result = RepairResult::UnspecifiedLoss(lost); - } let drv_close_event = drv_close.encode_self(); last_logged_checksum.update(&drv_close_event); base_log.fsynced_write(&drv_close_event)?; diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs index 8684f4c1..7729d403 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs @@ -27,7 +27,7 @@ use { super::{ super::{ - create_journal, obtain_trace, open_journal, DriverEventKind, JournalReaderTraceEvent, + create_journal, debug_get_trace, open_journal, DriverEventKind, JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent, RawJournalWriter, }, SimpleDB, SimpleDBJournal, @@ -42,12 +42,12 @@ fn journal_open_close() { // new boot let mut j = create_journal::(JOURNAL_NAME).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![JournalWriterTraceEvent::Initialized] ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -68,7 +68,7 @@ fn journal_open_close() { ) .unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ // init reader and read close event JournalReaderTraceEvent::Initialized, @@ -92,7 +92,7 @@ fn journal_open_close() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -113,7 +113,7 @@ fn journal_open_close() { ) .unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ // init reader and read reopen event JournalReaderTraceEvent::Initialized, @@ -147,7 +147,7 @@ fn journal_open_close() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -171,7 +171,7 @@ fn journal_with_server_single_event() { db.push(&mut j, "hello world").unwrap(); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::Initialized, JournalWriterTraceEvent::CommitAttemptForEvent(0), @@ -192,12 +192,12 @@ fn journal_with_server_single_event() { let db = SimpleDB::new(); // second boot let mut j = open_journal::(JOURNAL_NAME, &db, JournalSettings::default()) - .set_dmsg_fn(|| format!("{:?}", obtain_trace())) + .set_dmsg_fn(|| format!("{:?}", debug_get_trace())) .unwrap(); assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ // init reader and read server event JournalReaderTraceEvent::Initialized, @@ -227,7 +227,7 @@ fn journal_with_server_single_event() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -247,7 +247,7 @@ fn journal_with_server_single_event() { assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ // init reader and read server event JournalReaderTraceEvent::Initialized, @@ -288,7 +288,7 @@ fn journal_with_server_single_event() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs index 6e181b7c..80286b90 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -30,14 +30,19 @@ use { engine::{ error::ErrorKind, storage::{ - common::interface::fs::{File, FileExt, FileSystem, FileWriteExt}, + common::{ + interface::fs::{File, FileExt, FileSystem, FileWriteExt}, + sdss::sdss_r1::FileSpecV1, + }, v2::raw::journal::{ create_journal, open_journal, raw::{ - obtain_offsets, obtain_trace, DriverEvent, DriverEventKind, - JournalReaderTraceEvent, JournalWriterTraceEvent, RawJournalWriter, + debug_get_offsets, debug_get_trace, debug_set_offset_tracking, DriverEvent, + DriverEventKind, JournalReaderTraceEvent, JournalWriterTraceEvent, + RawJournalWriter, }, - repair_journal, JournalRepairMode, JournalSettings, RepairResult, + repair_journal, JournalRepairMode, JournalSettings, RawJournalAdapter, + RepairResult, }, }, RuntimeResult, @@ -47,7 +52,24 @@ use { std::io::ErrorKind as IoErrorKind, }; -type Initializer = (&'static str, fn(&str) -> RuntimeResult); +struct Initializer { + journal_id: &'static str, + initializer_fn: fn(&str) -> RuntimeResult, + last_event_size: usize, +} + +impl Initializer { + fn new(name: &'static str, f: fn(&str) -> RuntimeResult, last_event_size: usize) -> Self { + Self { + journal_id: name, + initializer_fn: f, + last_event_size, + } + } + fn new_driver_type(name: &'static str, f: fn(&str) -> RuntimeResult) -> Self { + Self::new(name, f, DriverEvent::FULL_EVENT_SIZE) + } +} fn create_trimmed_file(from: &str, to: &str, trim_to: u64) -> IoResult<()> { FileSystem::copy(from, to)?; @@ -65,21 +87,26 @@ fn emulate_corrupted_final_event( RuntimeResult>, ), ) { - for (journal_id, initializer) in initializers { + for Initializer { + journal_id, + initializer_fn, + last_event_size, + } in initializers + { // initialize journal, get size and clear traces - let repaired_last_event_id = match initializer(journal_id) { + let repaired_last_event_id = match initializer_fn(journal_id) { Ok(nid) => nid, Err(e) => panic!( "failed to initialize {journal_id} due to {e}. trace: {:?}, file_data={:?}", - obtain_trace(), + debug_get_trace(), FileSystem::read(journal_id), ), }; let journal_size = File::open(journal_id).unwrap().f_len().unwrap(); - let _ = obtain_trace(); - let _ = obtain_offsets(); + let _ = debug_get_trace(); + let _ = debug_get_offsets(); // now trim and repeat - for (trim_size, new_size) in (1..=DriverEvent::FULL_EVENT_SIZE) + for (trim_size, new_size) in (1..=last_event_size) .rev() .map(|trim_size| (trim_size, journal_size - trim_size as u64)) { @@ -138,22 +165,22 @@ fn apply_event_mix(jrnl: &mut RawJournalWriter) -> RuntimeResul #[test] fn corruption_before_close() { - let initializers: Vec = vec![ + let initializers = vec![ // open and close - ("close_event_corruption_empty.db", |jrnl_id| { + Initializer::new_driver_type("close_event_corruption_empty.db", |jrnl_id| { let mut jrnl = create_journal::(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(0) }), // open, apply mix of events, close - ("close_event_corruption.db", |jrnl_id| { + Initializer::new_driver_type("close_event_corruption.db", |jrnl_id| { let mut jrnl = create_journal::(jrnl_id)?; let operation_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(operation_count) }), // open, close, reinit, close - ( + Initializer::new_driver_type( "close_event_corruption_open_close_open_close.db", |jrnl_id| { // open and close @@ -176,7 +203,7 @@ fn corruption_before_close() { |journal_id, repaired_last_event_id, trim_size, open_result| { // open the journal and validate failure let open_err = open_result.unwrap_err(); - let trace = obtain_trace(); + let trace = debug_get_trace(); if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) { // the amount of trim from the end of the file causes us to lose valuable metadata if repaired_last_event_id == 0 { @@ -234,20 +261,18 @@ fn corruption_before_close() { RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), "failed at trim_size {trim_size} for journal {journal_id}" ); - let mut jrnl = reopen_result.unwrap(); - // now reopen log and ensure it's repaired - RawJournalWriter::close_driver(&mut jrnl).unwrap(); + let _ = reopen_result.unwrap(); // clear trace - let _ = obtain_trace(); - let _ = obtain_offsets(); + let _ = debug_get_trace(); + let _ = debug_get_offsets(); }, ) } #[test] fn corruption_after_reopen() { - let initializers: Vec = vec![ - ("corruption_after_reopen.db", |jrnl_id| { + let initializers = vec![ + Initializer::new_driver_type("corruption_after_reopen.db", |jrnl_id| { let mut jrnl = create_journal::(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); @@ -255,7 +280,7 @@ fn corruption_after_reopen() { open_journal::(jrnl_id, &SimpleDB::new(), JournalSettings::default())?; Ok(1) }), - ("corruption_after_ropen_multi_before_close.db", |jrnl_id| { + Initializer::new_driver_type("corruption_after_ropen_multi_before_close.db", |jrnl_id| { let mut jrnl = create_journal::(jrnl_id)?; let operation_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; @@ -268,7 +293,7 @@ fn corruption_after_reopen() { emulate_corrupted_final_event( initializers, |journal_id, repaired_last_event_id, trim_size, open_result| { - let trace = obtain_trace(); + let trace = debug_get_trace(); if trim_size == DriverEvent::FULL_EVENT_SIZE { /* IMPORTANT IFFY SITUATION: undetectable error. if an entire "correct" part of the log vanishes, it's not going to be detected. @@ -330,8 +355,8 @@ fn corruption_after_reopen() { } // now close this so that this works with the post repair handler RawJournalWriter::close_driver(&mut jrnl).unwrap(); - let _ = obtain_offsets(); - let _ = obtain_trace(); + let _ = debug_get_offsets(); + let _ = debug_get_trace(); } else { assert_eq!( open_result.unwrap_err().kind(), @@ -378,11 +403,130 @@ fn corruption_after_reopen() { } else { assert_eq!( repair_result.unwrap(), - RepairResult::LostBytes((DriverEvent::FULL_EVENT_SIZE - trim_size) as u64) + RepairResult::UnspecifiedLoss( + (DriverEvent::FULL_EVENT_SIZE - trim_size) as u64 + ) ); } - let _ = obtain_trace(); - let _ = obtain_offsets(); + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + ) +} + +#[test] +fn corruption_at_runtime() { + // first get the offsets to compute the size of the event + const KEY: &str = "hello, universe"; + let offset = { + debug_set_offset_tracking(true); + let mut sdb = SimpleDB::new(); + let mut jrnl = create_journal("corruption_at_runtime_test_log.db").unwrap(); + sdb.push(&mut jrnl, KEY).unwrap(); + let (_, offset) = debug_get_offsets().pop_last().unwrap(); + let ret = + offset as usize - <::Spec as FileSpecV1>::SIZE; + debug_set_offset_tracking(false); + let _ = debug_get_trace(); + ret + }; + let initializers = vec![ + Initializer::new( + "corruption_at_runtime_open_commit_corrupt", + |jrnl_id| { + let mut sdb = SimpleDB::new(); + let mut jrnl = create_journal(jrnl_id)?; + sdb.push(&mut jrnl, KEY)?; + // don't close + Ok(0) + }, + offset, + ), + Initializer::new( + "corruption_at_runtime_open_multi_commit_then_corrupt", + |jrnl_id| { + let mut op_count = 0; + let mut sdb = SimpleDB::new(); + let mut jrnl = create_journal(jrnl_id)?; + for _ in 0..100 { + sdb.push(&mut jrnl, KEY)?; + op_count += 1; + } + // don't close + Ok(op_count) + }, + offset, + ), + ]; + emulate_corrupted_final_event( + initializers, + |journal_id, repaired_last_event_id, trim_size, open_result| { + let trace = debug_get_trace(); + let err = open_result.unwrap_err(); + assert_eq!( + err.kind(), + &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + if trim_size > offset - (sizeof!(u128) + sizeof!(u64)) { + if repaired_last_event_id == 0 { + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ) + } else { + assert_eq!( + &trace[trace.len() - 4..], + intovec![ + JournalReaderTraceEvent::DetectedServerEvent, + JournalReaderTraceEvent::ServerEventMetadataParsed, + JournalReaderTraceEvent::ServerEventAppliedSuccess, + JournalReaderTraceEvent::LookingForEvent, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ) + } + } else { + if repaired_last_event_id == 0 { + // empty log + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DetectedServerEvent, + JournalReaderTraceEvent::ServerEventMetadataParsed, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } else { + assert_eq!( + &trace[trace.len() - 4..], + intovec![ + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(repaired_last_event_id - 1), + JournalReaderTraceEvent::DetectedServerEvent, + JournalReaderTraceEvent::ServerEventMetadataParsed, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } + } + }, + |journal_id, trim_size, repair_result, reopen_result| { + assert!(reopen_result.is_ok()); + assert_eq!( + repair_result.unwrap(), + RepairResult::UnspecifiedLoss((offset - trim_size) as u64), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + let _ = debug_get_trace(); + let _ = debug_get_offsets(); }, ) }