diff --git a/CHANGELOG.md b/CHANGELOG.md index c4bd8383..f135f7b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,8 @@ All changes in this project will be noted in this file. ### Additions -- Skyhash/2.1: Restored support for pipelines +- Skyhash/2: Restored support for pipelines +- Enable online (runtime) recovery of transactional failures due to disk errors ## Version 0.8.1 diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index 30e4bdfb..f7c4a4bf 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -251,7 +251,6 @@ enumerate_err! { runtime errors ---- */ - RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed", RawJournalRuntimeDirty = "journal-in-dirty-state", } } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 0b8f7190..172f49dc 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -115,7 +115,7 @@ pub enum CriticalTask { /// Write a new data batch WriteBatch(ModelUniqueID, usize), /// try recovering model ID - TryModelAutorecoverLWT(ModelUniqueID), + TryModelAutorecover(ModelUniqueID), CheckGNSDriver, } @@ -323,13 +323,7 @@ impl FractalMgr { match task { CriticalTask::CheckGNSDriver => { info!("trying to autorecover GNS driver"); - match global - .state() - .gns_driver() - .txn_driver - .lock() - .__lwt_heartbeat() - { + match global.state().gns_driver().txn_driver.lock().__rollback() { Ok(()) => { info!("GNS driver has been successfully auto-recovered"); global.state().gns_driver().status().set_okay(); @@ -343,7 +337,7 @@ impl FractalMgr { } } } - CriticalTask::TryModelAutorecoverLWT(mdl_id) => { + CriticalTask::TryModelAutorecover(mdl_id) => { info!("trying to autorecover model {mdl_id}"); match global .state() @@ -355,7 +349,7 @@ impl FractalMgr { Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => { let mut drv = mdl.driver().batch_driver().lock(); let drv = drv.as_mut().unwrap(); - match drv.__lwt_heartbeat() { + match drv.__rollback() { Ok(()) => { mdl.driver().status().set_okay(); global.health().report_recovery(); @@ -364,7 +358,7 @@ impl FractalMgr { Err(e) => { error!("failed to autorecover {mdl_id} with {e}. will try again"); self.hp_dispatcher - .send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id))) + .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id))) .unwrap() } } @@ -548,9 +542,7 @@ impl FractalMgr { .map_err(|e| { mdl_driver_.status().set_iffy(); self.hp_dispatcher - .send(Task::new(CriticalTask::TryModelAutorecoverLWT( - mdl_id.into(), - ))) + .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id.into()))) .unwrap(); (e, BatchStats::into_inner(batch_stats)) }) diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 219cf0de..e16db672 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -133,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal { .commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new()) .unwrap() } - CriticalTask::TryModelAutorecoverLWT(_) => {} + CriticalTask::TryModelAutorecover(_) => {} CriticalTask::CheckGNSDriver => {} } } diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index 2968364b..0ebe0718 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -401,7 +401,7 @@ macro_rules! local { } macro_rules! local_mut { - ($ident:ident, $call:expr) => {{ + ($ident:expr, $call:expr) => {{ #[inline(always)] fn _f(v: &::std::cell::RefCell, f: impl FnOnce(&mut T) -> U) -> U { f(&mut *v.borrow_mut()) diff --git a/server/src/engine/storage/common/interface/fs.rs b/server/src/engine/storage/common/interface/fs.rs index 4a9a9e12..7bf25ad1 100644 --- a/server/src/engine/storage/common/interface/fs.rs +++ b/server/src/engine/storage/common/interface/fs.rs @@ -192,6 +192,7 @@ pub trait FileWrite { ) } Ok(n) => written += n, + Err(e) if e.kind() == ErrorKind::Interrupted => continue, Err(e) => return (written, Err(e)), } } diff --git a/server/src/engine/storage/common/interface/mod.rs b/server/src/engine/storage/common/interface/mod.rs index ea1247c9..4a7ab68f 100644 --- a/server/src/engine/storage/common/interface/mod.rs +++ b/server/src/engine/storage/common/interface/mod.rs @@ -33,3 +33,5 @@ pub mod fs; #[cfg(test)] mod vfs; +#[cfg(test)] +pub use vfs::vfs_utils; diff --git a/server/src/engine/storage/common/interface/vfs.rs b/server/src/engine/storage/common/interface/vfs.rs index 51f5bb4b..cfd77af7 100644 --- a/server/src/engine/storage/common/interface/vfs.rs +++ b/server/src/engine/storage/common/interface/vfs.rs @@ -25,7 +25,7 @@ */ use { - crate::{engine::sync::cell::Lazy, IoResult}, + crate::{engine::sync::cell::Lazy, util::test_utils, IoResult}, parking_lot::RwLock, std::{ collections::{ @@ -41,6 +41,34 @@ use { --- */ +pub mod vfs_utils { + #[derive(Debug, PartialEq, Clone, Copy)] + pub(super) enum WriteCrashKind { + None, + Zero, + Random, + } + local!( + static RANDOM_WRITE_CRASH: WriteCrashKind = WriteCrashKind::None; + pub(super) static RNG: Option = None; + ); + /// WARNING: A random write crash automatically degrades to a [`WriteCrashKind::Zero`] as soon as it completes + /// to prevent any further data writes (due to retries in + /// [`fs::FileWrite::fwrite_all_count`](super::super::fs::FileWrite::fwrite_all_count)) + pub fn debug_enable_random_write_crash() { + local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Random) + } + pub fn debug_enable_zero_write_crash() { + local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::Zero) + } + pub fn debug_disable_write_crash() { + local_mut!(RANDOM_WRITE_CRASH, |crash| *crash = WriteCrashKind::None) + } + pub(super) fn debug_write_crash_setting() -> WriteCrashKind { + local_ref!(RANDOM_WRITE_CRASH, |crash| *crash) + } +} + /* definitions --- @@ -167,12 +195,41 @@ impl VFile { if !self.write { return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into()); } - if self.pos + bytes.len() > self.data.len() { - self.data.resize(self.pos + bytes.len(), 0); + match vfs_utils::debug_write_crash_setting() { + vfs_utils::WriteCrashKind::None => { + if self.pos + bytes.len() > self.data.len() { + self.data.resize(self.pos + bytes.len(), 0); + } + self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes); + self.pos += bytes.len(); + Ok(bytes.len() as _) + } + vfs_utils::WriteCrashKind::Random => { + let actual_write_length = local_mut!(vfs_utils::RNG, |rng| { + match rng { + Some(ref mut rng) => test_utils::random_number(0, bytes.len(), rng), + None => { + let mut rng_ = rand::thread_rng(); + let r = test_utils::random_number(0, bytes.len(), &mut rng_); + *rng = Some(rng_); + r + } + } + }); + // write some random part of the buffer into this file + if self.pos + actual_write_length > self.data.len() { + self.data.resize(self.pos + actual_write_length, 0); + } + self.data[self.pos..self.pos + actual_write_length] + .copy_from_slice(&bytes[..actual_write_length]); + self.pos += actual_write_length; + // now soon as this is complete, downgrade error type to writezero so that we don't write any further data during + // a retry + vfs_utils::debug_enable_zero_write_crash(); + Ok(actual_write_length as _) + } + vfs_utils::WriteCrashKind::Zero => Ok(0), } - self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes); - self.pos += bytes.len(); - Ok(bytes.len() as _) } } diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index cdf900c2..d3708a3a 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -297,7 +297,7 @@ pub struct TrackedWriter< S: FileSpecV1, const SIZE: usize = 8192, const PANIC_IF_UNFLUSHED: bool = true, - const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true, + const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = false, > { f_d: File, f_md: S::Metadata, @@ -417,6 +417,12 @@ impl< pub fn current_checksum(&self) -> u64 { self.t_checksum.clone().finish() } + pub fn inner_mut(&mut self, f: impl Fn(&mut File) -> IoResult) -> IoResult<()> { + let file = &mut self.f_d; + let new_cursor = f(file)?; + self.t_cursor = new_cursor; + Ok(()) + } } impl< @@ -491,7 +497,13 @@ impl< return Ok(()); } self.flush_buf()?; - // write whatever capacity exceeds the buffer size + /* + write whatever capacity exceeds the buffer size + [a,b,c,d,e,f] + problem: but we can only hold two items + so write to disk: [a,b] + store in memory: [c,d,e,f] + */ let to_write_cnt = buf.len().saturating_sub(SIZE); match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) { (cnt, r) => { @@ -538,6 +550,12 @@ impl< pub fn fsync(&mut self) -> IoResult<()> { self.f_d.fsync_all() } + /// Empty the write buffer + /// + /// DANGER: This means that whatever data was in the buffer will be immediately discarded + pub unsafe fn drain_buffer(&mut self) { + self.buf.clear() + } } impl< diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index b93eae48..4130d792 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -35,6 +35,7 @@ use { mem::unsafe_apis::memcpy, storage::common::{ checksum::SCrc64, + interface::fs::FileWriteExt, sdss::sdss_r1::{ rw::{SdssFile, TrackedReader, TrackedWriter}, FileSpecV1, @@ -162,6 +163,11 @@ pub fn debug_set_offset_tracking(track: bool) { local_mut!(TRACE_OFFSETS, |track_| *track_ = track) } +#[cfg(test)] +pub fn debug_get_first_meta_triplet() -> Option<(u64, u64, u64)> { + local_mut!(FIRST_TRIPLET, |tr| core::mem::take(tr)) +} + #[derive(Debug, PartialEq)] #[cfg(test)] pub enum JournalTraceEvent { @@ -229,6 +235,7 @@ local! { static TRACE: Vec = Vec::new(); static OFFSETS: std::collections::BTreeMap = Default::default(); static TRACE_OFFSETS: bool = false; + static FIRST_TRIPLET: Option<(u64, u64, u64)> = None; } macro_rules! jtrace_event_offset { @@ -519,7 +526,9 @@ pub(super) enum DriverEventKind { Journal writer implementation --- Quick notes: - - This is a low level writer and only handles driver events. Higher level impls must account for + - This is a low level writer and only handles driver events + - Checksum verification is only performed for meta events + - Implementors must handle checksums themselves +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ @@ -622,18 +631,30 @@ impl RawJournalWriter { { self.commit_with_ctx(event, Default::default()) } - /// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER - pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> { - // verify that the on disk cursor is the same as what we know + /// roll back to the last txn + /// WARNING: only call on failure + /// + /// NB: Idempotency is guaranteed. Will rollback to, and only to the last event + pub fn __rollback(&mut self) -> RuntimeResult<()> { + // ensure cursors are in sync, even if out of position self.log_file.verify_cursor()?; - if self.log_file.cursor() == self.known_txn_offset { - // great, so if there was something in the buffer, simply ignore it - self.log_file.__zero_buffer(); - Ok(()) - } else { - // so, the on-disk file probably has some partial state. this is bad. throw an error - Err(StorageError::RawJournalRuntimeHeartbeatFail.into()) + // reverse + self.log_file.inner_mut(|file| { + let new_offset = if self.txn_id == 0 { + debug_assert_eq!(self.known_txn_offset, 0); + <::Spec as FileSpecV1>::SIZE as u64 + } else { + self.known_txn_offset + }; + file.f_truncate(new_offset)?; + Ok(new_offset) + })?; + // reverse successful, now empty write buffer + unsafe { + // UNSAFE(@ohsayan): since the log has been reversed, whatever we failed to write should simply be ignored + self.log_file.drain_buffer(); } + Ok(()) } } @@ -642,13 +663,23 @@ impl RawJournalWriter { &mut self, f: impl FnOnce(&mut Self, u128) -> RuntimeResult, ) -> RuntimeResult { + #[cfg(test)] + if local_ref!(FIRST_TRIPLET, |tr| { tr.is_none() }) { + local_mut!(FIRST_TRIPLET, |tr| { + *tr = Some(( + self.known_txn_id, + self.known_txn_offset, + self.log_file.current_checksum(), + )); + }) + } let id = self.txn_id; - self.txn_id += 1; let ret = f(self, id as u128); if ret.is_ok() { jtrace_event_offset!(id, self.log_file.cursor()); self.known_txn_id = id; self.known_txn_offset = self.log_file.cursor(); + self.txn_id += 1; } ret } @@ -859,7 +890,6 @@ impl RawJournalReader { }, ErrorKind::Storage(e) => match e { // unreachable errors (no execution path here) - StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start | StorageError::RawJournalRuntimeDirty | StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier | StorageError::FileDecodeHeaderCorrupted // should be caught earlier @@ -1030,6 +1060,11 @@ impl RawJournalReader { jtrace_reader!(DriverEventExpectedCloseGotClose); // a driver closed event; we've checked integrity, but we must check the field values let valid_meta = okay! { + /* + basically: + - if this is a new journal all these values are 0 (we're essentially reading the first event) + - otherwise, it is the last event offset + */ self.last_txn_checksum == drv_close_event.last_checksum, self.last_txn_id == drv_close_event.last_txn_id, self.last_txn_offset == drv_close_event.last_offset, diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs index 7afdcf2f..7f211e81 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs @@ -29,8 +29,8 @@ mod recovery; use { super::{ - CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, RawJournalAdapter, - RawJournalAdapterEvent, RawJournalWriter, + create_journal, CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, + RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter, }, crate::engine::{ error::StorageError, @@ -216,3 +216,48 @@ fn encode_decode_meta() { let decoded1 = DriverEvent::decode(encoded1).unwrap(); assert_eq!(dv1, decoded1); } + +#[test] +fn first_triplet_sanity() { + // first driver event + { + assert_eq!( + super::debug_get_first_meta_triplet(), + None, + "failed for first driver event" + ); + let mut jrnl = create_journal::("first_triplet_sanity_drv_event").unwrap(); + assert_eq!( + super::debug_get_first_meta_triplet(), + None, + "failed for first driver event" + ); + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + assert_eq!( + super::debug_get_first_meta_triplet(), + Some((0, 0, 0)), + "failed for first driver event" + ); + } + // first server event + { + assert_eq!( + super::debug_get_first_meta_triplet(), + None, + "failed for first server event" + ); + let mut jrnl = + create_journal::("first_triplet_sanity_server_event").unwrap(); + assert_eq!( + super::debug_get_first_meta_triplet(), + None, + "failed for first server event" + ); + SimpleDB::new().push(&mut jrnl, "hello").unwrap(); + assert_eq!( + super::debug_get_first_meta_triplet(), + Some((0, 0, 0)), + "failed for first driver event" + ); + } +} diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs index 17a18bb4..81e91f56 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -29,9 +29,13 @@ use { crate::{ engine::{ error::ErrorKind, + fractal, storage::{ common::{ - interface::fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt}, + interface::{ + fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt}, + vfs_utils, + }, sdss::sdss_r1::FileSpecV1, }, v2::raw::journal::{ @@ -135,6 +139,17 @@ fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String { format!("{journal_id}-trimmed-{trim_size}.db") } +fn journal_init(journal_id: &str) -> RuntimeResult> { + create_journal(journal_id) +} + +fn journal_open( + journal_id: &str, + db: &SimpleDB, +) -> RuntimeResult> { + open_journal(journal_id, db, JournalSettings::default()) +} + #[derive(Debug)] /// Information about the layout of the modified journal struct ModifiedJournalStorageInfo { @@ -213,13 +228,7 @@ fn emulate_sequentially_varying_single_corruption( for trim_size in 1..=last_event_size { // create a copy of the "good" journal and corrupt it let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size); - let open_journal_fn = |db: &SimpleDB| { - open_journal::( - &corrupted_journal_path, - db, - JournalSettings::default(), - ) - }; + let open_journal_fn = |db: &SimpleDB| journal_open(&corrupted_journal_path, db); // modify journal let storage_info = modified_journal_generator_fn( journal_id, @@ -408,7 +417,7 @@ fn corruption_before_close() { */ "close_event_corruption_empty.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new_last_event(0)) }, @@ -419,7 +428,7 @@ fn corruption_before_close() { */ "close_event_corruption.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let operation_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new_last_event(operation_count)) @@ -432,15 +441,11 @@ fn corruption_before_close() { "close_event_corruption_open_close_open_close.db", |jrnl_id| { // open and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reinit and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new_last_event(2)) }, @@ -603,15 +608,11 @@ fn corruption_after_reopen() { */ "corruption_after_reopen.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reopen, but don't close - open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + journal_open(jrnl_id, &SimpleDB::new())?; Ok(InitializerInfo::new_last_event(1)) }, ), @@ -621,16 +622,12 @@ fn corruption_after_reopen() { */ "corruption_after_ropen_multi_before_close.db", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let operation_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reopen, but don't close - open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + journal_open(jrnl_id, &SimpleDB::new())?; Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish }, ), @@ -1007,15 +1004,11 @@ fn midway_corruption_close() { we emulate a sequential corruption case for (0) */ // create and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); // reopen and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; RawJournalWriter::close_driver(&mut jrnl)?; drop(jrnl); Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close @@ -1032,27 +1025,19 @@ fn midway_corruption_close() { |jrnl_id| { { // create and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0) } let op_cnt; { // reopen, apply mix and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (1) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) op_cnt = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one } { // reopen and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (op_cnt + 3) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (op_cnt + 3) RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4) } Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4)) @@ -1071,25 +1056,17 @@ fn midway_corruption_close() { |jrnl_id| { { // create and close - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0) } { // reopen and close - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (1) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one } let op_cnt; { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (3) + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (3) op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count) RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count) } @@ -1199,15 +1176,11 @@ fn midway_corruption_reopen() { journal. we emulate a midway corruption where the reopen (1) gets corrupted. */ { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; // (0) } { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // (1) <-- corrupt + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) <-- corrupt RawJournalWriter::close_driver(&mut jrnl)?; // (2) } Ok(InitializerInfo::new(1, 2)) @@ -1222,16 +1195,12 @@ fn midway_corruption_reopen() { |jrnl_id| { let op_count; { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; op_count = apply_event_mix(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?; } { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; RawJournalWriter::close_driver(&mut jrnl)?; } Ok(InitializerInfo::new((op_count + 1) as u64, 102)) @@ -1245,15 +1214,11 @@ fn midway_corruption_reopen() { "midway_corruption_reopen_apply_post_corrupted_reopen", |jrnl_id| { { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; RawJournalWriter::close_driver(&mut jrnl)?; } { - let mut jrnl = open_journal::( - jrnl_id, - &SimpleDB::new(), - JournalSettings::default(), - )?; // <-- corrupt this one + let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // <-- corrupt this one let _ = apply_event_mix(&mut jrnl)?; // apply mix RawJournalWriter::close_driver(&mut jrnl)?; } @@ -1363,7 +1328,7 @@ fn midway_corruption_at_runtime() { */ "midway_corruption_at_runtime_open_server_event_close", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; SimpleDB::new().push(&mut jrnl, KEY)?; RawJournalWriter::close_driver(&mut jrnl)?; Ok(InitializerInfo::new(0, 1)) @@ -1376,7 +1341,7 @@ fn midway_corruption_at_runtime() { */ "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let mut sdb = SimpleDB::new(); for num in 1..=TRIALS { sdb.push(&mut jrnl, keyfmt(num))?; @@ -1392,7 +1357,7 @@ fn midway_corruption_at_runtime() { */ "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first", |jrnl_id| { - let mut jrnl = create_journal::(jrnl_id)?; + let mut jrnl = journal_init(jrnl_id)?; let mut sdb = SimpleDB::new(); for num in 1..=TRIALS { sdb.push(&mut jrnl, keyfmt(num))?; @@ -1495,3 +1460,125 @@ fn midway_corruption_at_runtime() { }, ) } + +/* + rollback tests +*/ + +/// Steps: +/// 1. A new log is created +/// 2. Events and corruptions are introduced +/// 3. Rolled back +/// 4. Closed +/// 5. Re-opened +fn emulate_failure_for_rollback( + journal_id: &str, + action: impl Fn(&mut SimpleDB, &mut RawJournalWriter) -> RuntimeResult<()>, + verify_error: impl Fn(fractal::error::Error), + post_rollback: impl Fn(&SimpleDB), +) { + { + let mut db = SimpleDB::new(); + let mut jrnl = create_journal::(journal_id).unwrap(); + let err = action(&mut db, &mut jrnl).unwrap_err(); + verify_error(err); + for _ in 0..1000 { + // idempotency guarantee: no matter how many times this is called, the underlying state will rollback to, and only to the last event + jrnl.__rollback().unwrap(); + } + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + } + { + let db = SimpleDB::new(); + let mut jrnl = journal_open(journal_id, &db).expect(&format!("{:#?}", debug_get_trace())); + post_rollback(&db); + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + } + FileSystem::remove_file(journal_id).unwrap(); +} + +#[test] +fn rollback_write_zero_empty_log() { + emulate_failure_for_rollback( + "rollback_empty_log_write_zero", + |db, jrnl| { + vfs_utils::debug_enable_zero_write_crash(); + let r = db.push(jrnl, "hello, world"); + vfs_utils::debug_disable_write_crash(); + r + }, + |e| match e.kind() { + ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {} + unexpected => panic!("expected write zero, got {unexpected:?}"), + }, + |db| assert_eq!(db.data().len(), 0), + ); +} + +#[test] +fn rollback_write_zero_nonempty_log() { + emulate_failure_for_rollback( + "rollback_write_zero_nonempty_log", + |db, jrnl| { + // commit a single "good" event + db.push(jrnl, "my good key")?; + vfs_utils::debug_enable_zero_write_crash(); + let r = db.push(jrnl, "this won't go in"); + vfs_utils::debug_disable_write_crash(); + r + }, + |e| match e.kind() { + ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {} + unexpected => panic!("expected write zero, got {unexpected:?}"), + }, + |db| { + assert_eq!(db.data().len(), 1); + assert_eq!(db.data()[0], "my good key") + }, + ) +} + +#[test] +fn rollback_random_write_failure_empty_log() { + for _ in 0..100 { + emulate_failure_for_rollback( + "rollback_random_write_failure_empty_log", + |db, jrnl| { + vfs_utils::debug_enable_random_write_crash(); + let r = db.push(jrnl, "hello, world"); + vfs_utils::debug_disable_write_crash(); + r + }, + |e| match e.kind() { + ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {} + unexpected => panic!("expected write zero, got {unexpected:?}"), + }, + |db| assert_eq!(db.data().len(), 0), + ); + } +} + +#[test] +fn rollback_random_write_failure_log() { + for _ in 0..100 { + emulate_failure_for_rollback( + "rollback_random_write_failure_log", + |db, jrnl| { + // commit a single "good" event + db.push(jrnl, "my good key")?; + vfs_utils::debug_enable_random_write_crash(); + let r = db.push(jrnl, "this won't go in"); + vfs_utils::debug_disable_write_crash(); + r + }, + |e| match e.kind() { + ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {} + unexpected => panic!("expected write zero, got {unexpected:?}"), + }, + |db| { + assert_eq!(db.data().len(), 1); + assert_eq!(db.data()[0], "my good key") + }, + ) + } +} diff --git a/server/src/util/os.rs b/server/src/util/os.rs index e7398589..2d9c6068 100644 --- a/server/src/util/os.rs +++ b/server/src/util/os.rs @@ -54,6 +54,9 @@ impl SysIOError { pub fn kind(&self) -> std::io::ErrorKind { self.0.kind() } + pub fn inner(&self) -> &std::io::Error { + &self.0 + } } impl From for SysIOError {