Merge pull request #340 from skytable/storage/online-recover

storage: Online recovery
next
Sayan 5 months ago committed by GitHub
commit e4dc0b4332
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -6,7 +6,8 @@ All changes in this project will be noted in this file.
### Additions ### Additions
- Skyhash/2.1: Restored support for pipelines - Skyhash/2: Restored support for pipelines
- Enable online (runtime) recovery of transactional failures due to disk errors
## Version 0.8.1 ## Version 0.8.1

@ -251,7 +251,6 @@ enumerate_err! {
runtime errors runtime errors
---- ----
*/ */
RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed",
RawJournalRuntimeDirty = "journal-in-dirty-state", RawJournalRuntimeDirty = "journal-in-dirty-state",
} }
} }

@ -115,7 +115,7 @@ pub enum CriticalTask {
/// Write a new data batch /// Write a new data batch
WriteBatch(ModelUniqueID, usize), WriteBatch(ModelUniqueID, usize),
/// try recovering model ID /// try recovering model ID
TryModelAutorecoverLWT(ModelUniqueID), TryModelAutorecover(ModelUniqueID),
CheckGNSDriver, CheckGNSDriver,
} }
@ -323,13 +323,7 @@ impl FractalMgr {
match task { match task {
CriticalTask::CheckGNSDriver => { CriticalTask::CheckGNSDriver => {
info!("trying to autorecover GNS driver"); info!("trying to autorecover GNS driver");
match global match global.state().gns_driver().txn_driver.lock().__rollback() {
.state()
.gns_driver()
.txn_driver
.lock()
.__lwt_heartbeat()
{
Ok(()) => { Ok(()) => {
info!("GNS driver has been successfully auto-recovered"); info!("GNS driver has been successfully auto-recovered");
global.state().gns_driver().status().set_okay(); global.state().gns_driver().status().set_okay();
@ -343,7 +337,7 @@ impl FractalMgr {
} }
} }
} }
CriticalTask::TryModelAutorecoverLWT(mdl_id) => { CriticalTask::TryModelAutorecover(mdl_id) => {
info!("trying to autorecover model {mdl_id}"); info!("trying to autorecover model {mdl_id}");
match global match global
.state() .state()
@ -355,7 +349,7 @@ impl FractalMgr {
Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => { Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => {
let mut drv = mdl.driver().batch_driver().lock(); let mut drv = mdl.driver().batch_driver().lock();
let drv = drv.as_mut().unwrap(); let drv = drv.as_mut().unwrap();
match drv.__lwt_heartbeat() { match drv.__rollback() {
Ok(()) => { Ok(()) => {
mdl.driver().status().set_okay(); mdl.driver().status().set_okay();
global.health().report_recovery(); global.health().report_recovery();
@ -364,7 +358,7 @@ impl FractalMgr {
Err(e) => { Err(e) => {
error!("failed to autorecover {mdl_id} with {e}. will try again"); error!("failed to autorecover {mdl_id} with {e}. will try again");
self.hp_dispatcher self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id))) .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id)))
.unwrap() .unwrap()
} }
} }
@ -548,9 +542,7 @@ impl FractalMgr {
.map_err(|e| { .map_err(|e| {
mdl_driver_.status().set_iffy(); mdl_driver_.status().set_iffy();
self.hp_dispatcher self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT( .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id.into())))
mdl_id.into(),
)))
.unwrap(); .unwrap();
(e, BatchStats::into_inner(batch_stats)) (e, BatchStats::into_inner(batch_stats))
}) })

@ -133,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal {
.commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new()) .commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
.unwrap() .unwrap()
} }
CriticalTask::TryModelAutorecoverLWT(_) => {} CriticalTask::TryModelAutorecover(_) => {}
CriticalTask::CheckGNSDriver => {} CriticalTask::CheckGNSDriver => {}
} }
} }

@ -401,7 +401,7 @@ macro_rules! local {
} }
macro_rules! local_mut { macro_rules! local_mut {
($ident:ident, $call:expr) => {{ ($ident:expr, $call:expr) => {{
#[inline(always)] #[inline(always)]
fn _f<T, U>(v: &::std::cell::RefCell<T>, f: impl FnOnce(&mut T) -> U) -> U { fn _f<T, U>(v: &::std::cell::RefCell<T>, f: impl FnOnce(&mut T) -> U) -> U {
f(&mut *v.borrow_mut()) f(&mut *v.borrow_mut())

@ -192,6 +192,7 @@ pub trait FileWrite {
) )
} }
Ok(n) => written += n, Ok(n) => written += n,
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return (written, Err(e)), Err(e) => return (written, Err(e)),
} }
} }

@ -33,3 +33,5 @@
pub mod fs; pub mod fs;
#[cfg(test)] #[cfg(test)]
mod vfs; mod vfs;
#[cfg(test)]
pub use vfs::vfs_utils;

@ -25,7 +25,7 @@
*/ */
use { use {
crate::{engine::sync::cell::Lazy, IoResult}, crate::{engine::sync::cell::Lazy, util::test_utils, IoResult},
parking_lot::RwLock, parking_lot::RwLock,
std::{ std::{
collections::{ collections::{
@ -41,6 +41,34 @@ use {
--- ---
*/ */
pub mod vfs_utils {
    //! Debug switches used by tests to make virtual file writes fail in a controlled way.

    /// The write failure mode that is currently selected
    #[derive(Debug, PartialEq, Clone, Copy)]
    pub(super) enum WriteCrashKind {
        /// no failure is emulated
        None,
        /// a write reports that zero bytes were written
        Zero,
        /// a write stores only a randomly sized prefix of the buffer
        Random,
    }
    local!(
        static RANDOM_WRITE_CRASH: WriteCrashKind = WriteCrashKind::None;
        pub(super) static RNG: Option<rand::rngs::ThreadRng> = None;
    );
    /// Replace the selected failure mode with `kind`
    fn set_write_crash(kind: WriteCrashKind) {
        local_mut!(RANDOM_WRITE_CRASH, |active| *active = kind)
    }
    /// Read the selected failure mode
    pub(super) fn debug_write_crash_setting() -> WriteCrashKind {
        local_ref!(RANDOM_WRITE_CRASH, |active| *active)
    }
    /// Make the next write store a random prefix of its buffer
    ///
    /// WARNING: A random write crash automatically degrades to a [`WriteCrashKind::Zero`] as soon as it completes
    /// to prevent any further data writes (due to retries in
    /// [`fs::FileWrite::fwrite_all_count`](super::super::fs::FileWrite::fwrite_all_count))
    pub fn debug_enable_random_write_crash() {
        set_write_crash(WriteCrashKind::Random)
    }
    /// Make every write report zero bytes written
    pub fn debug_enable_zero_write_crash() {
        set_write_crash(WriteCrashKind::Zero)
    }
    /// Turn write failure emulation off
    pub fn debug_disable_write_crash() {
        set_write_crash(WriteCrashKind::None)
    }
}
/* /*
definitions definitions
--- ---
@ -167,12 +195,41 @@ impl VFile {
if !self.write { if !self.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into()); return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
} }
if self.pos + bytes.len() > self.data.len() { match vfs_utils::debug_write_crash_setting() {
self.data.resize(self.pos + bytes.len(), 0); vfs_utils::WriteCrashKind::None => {
if self.pos + bytes.len() > self.data.len() {
self.data.resize(self.pos + bytes.len(), 0);
}
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
}
vfs_utils::WriteCrashKind::Random => {
let actual_write_length = local_mut!(vfs_utils::RNG, |rng| {
match rng {
Some(ref mut rng) => test_utils::random_number(0, bytes.len(), rng),
None => {
let mut rng_ = rand::thread_rng();
let r = test_utils::random_number(0, bytes.len(), &mut rng_);
*rng = Some(rng_);
r
}
}
});
// write some random part of the buffer into this file
if self.pos + actual_write_length > self.data.len() {
self.data.resize(self.pos + actual_write_length, 0);
}
self.data[self.pos..self.pos + actual_write_length]
.copy_from_slice(&bytes[..actual_write_length]);
self.pos += actual_write_length;
// as soon as this partial write completes, downgrade the crash kind to a zero write so that no further data is written
// during a retry
vfs_utils::debug_enable_zero_write_crash();
Ok(actual_write_length as _)
}
vfs_utils::WriteCrashKind::Zero => Ok(0),
} }
self.data[self.pos..self.pos + bytes.len()].copy_from_slice(bytes);
self.pos += bytes.len();
Ok(bytes.len() as _)
} }
} }

@ -297,7 +297,7 @@ pub struct TrackedWriter<
S: FileSpecV1, S: FileSpecV1,
const SIZE: usize = 8192, const SIZE: usize = 8192,
const PANIC_IF_UNFLUSHED: bool = true, const PANIC_IF_UNFLUSHED: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = false,
> { > {
f_d: File, f_d: File,
f_md: S::Metadata, f_md: S::Metadata,
@ -417,6 +417,12 @@ impl<
pub fn current_checksum(&self) -> u64 { pub fn current_checksum(&self) -> u64 {
self.t_checksum.clone().finish() self.t_checksum.clone().finish()
} }
/// Run `f` against the underlying file and adopt the offset it returns as the tracked cursor
///
/// - `f` gets mutable access to the raw file and must return the cursor position that this writer
///   should track from now on
/// - If `f` fails, the error is returned as-is and the tracked cursor is left unchanged
///
/// NB: only the tracked cursor is updated here; nothing else in the writer is touched by this call
pub fn inner_mut(&mut self, f: impl FnOnce(&mut File) -> IoResult<u64>) -> IoResult<()> {
    // the closure is invoked exactly once, so `FnOnce` is sufficient (every `Fn` closure still qualifies)
    self.t_cursor = f(&mut self.f_d)?;
    Ok(())
}
} }
impl< impl<
@ -491,7 +497,13 @@ impl<
return Ok(()); return Ok(());
} }
self.flush_buf()?; self.flush_buf()?;
// write whatever capacity exceeds the buffer size /*
write whatever capacity exceeds the buffer size
[a,b,c,d,e,f]
problem: but we can only hold four items
so write to disk: [a,b]
store in memory: [c,d,e,f]
*/
let to_write_cnt = buf.len().saturating_sub(SIZE); let to_write_cnt = buf.len().saturating_sub(SIZE);
match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) { match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) {
(cnt, r) => { (cnt, r) => {
@ -538,6 +550,12 @@ impl<
pub fn fsync(&mut self) -> IoResult<()> { pub fn fsync(&mut self) -> IoResult<()> {
self.f_d.fsync_all() self.f_d.fsync_all()
} }
/// Empty the write buffer
///
/// DANGER: This means that whatever data was in the buffer will be immediately discarded
///
/// # Safety
///
/// NOTE(review): the body performs no memory-unsafe operation; `unsafe` appears to be used to flag
/// that unwritten data is dropped. Callers presumably must be certain that the buffered bytes are
/// no longer needed -- confirm the intended contract.
pub unsafe fn drain_buffer(&mut self) {
    // only the in-memory buffer is cleared; the file is not touched here
    self.buf.clear()
}
} }
impl< impl<

@ -35,6 +35,7 @@ use {
mem::unsafe_apis::memcpy, mem::unsafe_apis::memcpy,
storage::common::{ storage::common::{
checksum::SCrc64, checksum::SCrc64,
interface::fs::FileWriteExt,
sdss::sdss_r1::{ sdss::sdss_r1::{
rw::{SdssFile, TrackedReader, TrackedWriter}, rw::{SdssFile, TrackedReader, TrackedWriter},
FileSpecV1, FileSpecV1,
@ -162,6 +163,11 @@ pub fn debug_set_offset_tracking(track: bool) {
local_mut!(TRACE_OFFSETS, |track_| *track_ = track) local_mut!(TRACE_OFFSETS, |track_| *track_ = track)
} }
/// Take the recorded first meta triplet, leaving `None` in its place
///
/// NB: this is a consuming read. Calling it again returns `None` until another triplet is recorded.
#[cfg(test)]
pub fn debug_get_first_meta_triplet() -> Option<(u64, u64, u64)> {
    local_mut!(FIRST_TRIPLET, |slot| slot.take())
}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
#[cfg(test)] #[cfg(test)]
pub enum JournalTraceEvent { pub enum JournalTraceEvent {
@ -229,6 +235,7 @@ local! {
static TRACE: Vec<JournalTraceEvent> = Vec::new(); static TRACE: Vec<JournalTraceEvent> = Vec::new();
static OFFSETS: std::collections::BTreeMap<u64, u64> = Default::default(); static OFFSETS: std::collections::BTreeMap<u64, u64> = Default::default();
static TRACE_OFFSETS: bool = false; static TRACE_OFFSETS: bool = false;
static FIRST_TRIPLET: Option<(u64, u64, u64)> = None;
} }
macro_rules! jtrace_event_offset { macro_rules! jtrace_event_offset {
@ -519,7 +526,9 @@ pub(super) enum DriverEventKind {
Journal writer implementation Journal writer implementation
--- ---
Quick notes: Quick notes:
- This is a low level writer and only handles driver events. Higher level impls must account for - This is a low level writer and only handles driver events
- Checksum verification is only performed for meta events
- Implementors must handle checksums themselves
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*/ */
@ -622,18 +631,30 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
{ {
self.commit_with_ctx(event, Default::default()) self.commit_with_ctx(event, Default::default())
} }
/// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER /// roll back to the last txn
pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> { /// WARNING: only call on failure
// verify that the on disk cursor is the same as what we know ///
/// NB: Idempotency is guaranteed. Will rollback to, and only to the last event
pub fn __rollback(&mut self) -> RuntimeResult<()> {
// ensure cursors are in sync, even if out of position
self.log_file.verify_cursor()?; self.log_file.verify_cursor()?;
if self.log_file.cursor() == self.known_txn_offset { // reverse
// great, so if there was something in the buffer, simply ignore it self.log_file.inner_mut(|file| {
self.log_file.__zero_buffer(); let new_offset = if self.txn_id == 0 {
Ok(()) debug_assert_eq!(self.known_txn_offset, 0);
} else { <<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64
// so, the on-disk file probably has some partial state. this is bad. throw an error } else {
Err(StorageError::RawJournalRuntimeHeartbeatFail.into()) self.known_txn_offset
};
file.f_truncate(new_offset)?;
Ok(new_offset)
})?;
// reverse successful, now empty write buffer
unsafe {
// UNSAFE(@ohsayan): since the log has been reversed, whatever we failed to write should simply be ignored
self.log_file.drain_buffer();
} }
Ok(())
} }
} }
@ -642,13 +663,23 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
&mut self, &mut self,
f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>, f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
) -> RuntimeResult<T> { ) -> RuntimeResult<T> {
#[cfg(test)]
if local_ref!(FIRST_TRIPLET, |tr| { tr.is_none() }) {
local_mut!(FIRST_TRIPLET, |tr| {
*tr = Some((
self.known_txn_id,
self.known_txn_offset,
self.log_file.current_checksum(),
));
})
}
let id = self.txn_id; let id = self.txn_id;
self.txn_id += 1;
let ret = f(self, id as u128); let ret = f(self, id as u128);
if ret.is_ok() { if ret.is_ok() {
jtrace_event_offset!(id, self.log_file.cursor()); jtrace_event_offset!(id, self.log_file.cursor());
self.known_txn_id = id; self.known_txn_id = id;
self.known_txn_offset = self.log_file.cursor(); self.known_txn_offset = self.log_file.cursor();
self.txn_id += 1;
} }
ret ret
} }
@ -859,7 +890,6 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
}, },
ErrorKind::Storage(e) => match e { ErrorKind::Storage(e) => match e {
// unreachable errors (no execution path here) // unreachable errors (no execution path here)
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
| StorageError::RawJournalRuntimeDirty | StorageError::RawJournalRuntimeDirty
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier | StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier | StorageError::FileDecodeHeaderCorrupted // should be caught earlier
@ -1030,6 +1060,11 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
jtrace_reader!(DriverEventExpectedCloseGotClose); jtrace_reader!(DriverEventExpectedCloseGotClose);
// a driver closed event; we've checked integrity, but we must check the field values // a driver closed event; we've checked integrity, but we must check the field values
let valid_meta = okay! { let valid_meta = okay! {
/*
basically:
- if this is a new journal all these values are 0 (we're essentially reading the first event)
- otherwise, they are the checksum, txn ID and offset of the last event
*/
self.last_txn_checksum == drv_close_event.last_checksum, self.last_txn_checksum == drv_close_event.last_checksum,
self.last_txn_id == drv_close_event.last_txn_id, self.last_txn_id == drv_close_event.last_txn_id,
self.last_txn_offset == drv_close_event.last_offset, self.last_txn_offset == drv_close_event.last_offset,

@ -29,8 +29,8 @@ mod recovery;
use { use {
super::{ super::{
CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, RawJournalAdapter, create_journal, CommitPreference, DriverEvent, DriverEventKind, JournalInitializer,
RawJournalAdapterEvent, RawJournalWriter, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter,
}, },
crate::engine::{ crate::engine::{
error::StorageError, error::StorageError,
@ -216,3 +216,48 @@ fn encode_decode_meta() {
let decoded1 = DriverEvent::decode(encoded1).unwrap(); let decoded1 = DriverEvent::decode(encoded1).unwrap();
assert_eq!(dv1, decoded1); assert_eq!(dv1, decoded1);
} }
/// The first meta triplet must be `(0, 0, 0)` for a fresh journal, irrespective of whether the
/// first committed event is a driver event or a server event
///
/// NB: `debug_get_first_meta_triplet` consumes the stored value, which is why each section starts
/// out by observing `None`
#[test]
fn first_triplet_sanity() {
    // first driver event
    {
        assert_eq!(
            super::debug_get_first_meta_triplet(),
            None,
            "failed for first driver event"
        );
        // creating the journal alone must not record a triplet
        let mut jrnl = create_journal::<SimpleDBJournal>("first_triplet_sanity_drv_event").unwrap();
        assert_eq!(
            super::debug_get_first_meta_triplet(),
            None,
            "failed for first driver event"
        );
        RawJournalWriter::close_driver(&mut jrnl).unwrap();
        assert_eq!(
            super::debug_get_first_meta_triplet(),
            Some((0, 0, 0)),
            "failed for first driver event"
        );
    }
    // first server event
    {
        assert_eq!(
            super::debug_get_first_meta_triplet(),
            None,
            "failed for first server event"
        );
        let mut jrnl =
            create_journal::<SimpleDBJournal>("first_triplet_sanity_server_event").unwrap();
        assert_eq!(
            super::debug_get_first_meta_triplet(),
            None,
            "failed for first server event"
        );
        SimpleDB::new().push(&mut jrnl, "hello").unwrap();
        // this section is about the server event, so the failure message must say so
        assert_eq!(
            super::debug_get_first_meta_triplet(),
            Some((0, 0, 0)),
            "failed for first server event"
        );
    }
}

@ -29,9 +29,13 @@ use {
crate::{ crate::{
engine::{ engine::{
error::ErrorKind, error::ErrorKind,
fractal,
storage::{ storage::{
common::{ common::{
interface::fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt}, interface::{
fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt},
vfs_utils,
},
sdss::sdss_r1::FileSpecV1, sdss::sdss_r1::FileSpecV1,
}, },
v2::raw::journal::{ v2::raw::journal::{
@ -135,6 +139,17 @@ fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String {
format!("{journal_id}-trimmed-{trim_size}.db") format!("{journal_id}-trimmed-{trim_size}.db")
} }
/// Create a new [`SimpleDBJournal`] journal named `journal_id` (shorthand for [`create_journal`])
fn journal_init(journal_id: &str) -> RuntimeResult<RawJournalWriter<SimpleDBJournal>> {
    create_journal(journal_id)
}
/// Open the [`SimpleDBJournal`] journal named `journal_id` against `db` using the default
/// [`JournalSettings`] (shorthand for [`open_journal`])
fn journal_open(
    journal_id: &str,
    db: &SimpleDB,
) -> RuntimeResult<RawJournalWriter<SimpleDBJournal>> {
    open_journal(journal_id, db, JournalSettings::default())
}
#[derive(Debug)] #[derive(Debug)]
/// Information about the layout of the modified journal /// Information about the layout of the modified journal
struct ModifiedJournalStorageInfo { struct ModifiedJournalStorageInfo {
@ -213,13 +228,7 @@ fn emulate_sequentially_varying_single_corruption(
for trim_size in 1..=last_event_size { for trim_size in 1..=last_event_size {
// create a copy of the "good" journal and corrupt it // create a copy of the "good" journal and corrupt it
let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size); let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size);
let open_journal_fn = |db: &SimpleDB| { let open_journal_fn = |db: &SimpleDB| journal_open(&corrupted_journal_path, db);
open_journal::<SimpleDBJournal>(
&corrupted_journal_path,
db,
JournalSettings::default(),
)
};
// modify journal // modify journal
let storage_info = modified_journal_generator_fn( let storage_info = modified_journal_generator_fn(
journal_id, journal_id,
@ -408,7 +417,7 @@ fn corruption_before_close() {
*/ */
"close_event_corruption_empty.db", "close_event_corruption_empty.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(0)) Ok(InitializerInfo::new_last_event(0))
}, },
@ -419,7 +428,7 @@ fn corruption_before_close() {
*/ */
"close_event_corruption.db", "close_event_corruption.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?; let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(operation_count)) Ok(InitializerInfo::new_last_event(operation_count))
@ -432,15 +441,11 @@ fn corruption_before_close() {
"close_event_corruption_open_close_open_close.db", "close_event_corruption_open_close_open_close.db",
|jrnl_id| { |jrnl_id| {
// open and close // open and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reinit and close // reinit and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new_last_event(2)) Ok(InitializerInfo::new_last_event(2))
}, },
@ -603,15 +608,11 @@ fn corruption_after_reopen() {
*/ */
"corruption_after_reopen.db", "corruption_after_reopen.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reopen, but don't close // reopen, but don't close
open_journal::<SimpleDBJournal>( journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
Ok(InitializerInfo::new_last_event(1)) Ok(InitializerInfo::new_last_event(1))
}, },
), ),
@ -621,16 +622,12 @@ fn corruption_after_reopen() {
*/ */
"corruption_after_ropen_multi_before_close.db", "corruption_after_ropen_multi_before_close.db",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let operation_count = apply_event_mix(&mut jrnl)?; let operation_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reopen, but don't close // reopen, but don't close
open_journal::<SimpleDBJournal>( journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish
}, },
), ),
@ -1007,15 +1004,11 @@ fn midway_corruption_close() {
we emulate a sequential corruption case for (0) we emulate a sequential corruption case for (0)
*/ */
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
// reopen and close // reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
drop(jrnl); drop(jrnl);
Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close
@ -1032,27 +1025,19 @@ fn midway_corruption_close() {
|jrnl_id| { |jrnl_id| {
{ {
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0) RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
let op_cnt; let op_cnt;
{ {
// reopen, apply mix and close // reopen, apply mix and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1)
op_cnt = apply_event_mix(&mut jrnl)?; op_cnt = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one
} }
{ {
// reopen and close // reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (op_cnt + 3)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (op_cnt + 3)
RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4) RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4)
} }
Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4)) Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4))
@ -1071,25 +1056,17 @@ fn midway_corruption_close() {
|jrnl_id| { |jrnl_id| {
{ {
// create and close // create and close
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0) RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
{ {
// reopen and close // reopen and close
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1)
RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one
} }
let op_cnt; let op_cnt;
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (3)
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (3)
op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count) op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count)
RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count) RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count)
} }
@ -1199,15 +1176,11 @@ fn midway_corruption_reopen() {
journal. we emulate a midway corruption where the reopen (1) gets corrupted. journal. we emulate a midway corruption where the reopen (1) gets corrupted.
*/ */
{ {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; // (0) RawJournalWriter::close_driver(&mut jrnl)?; // (0)
} }
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // (1) <-- corrupt
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // (1) <-- corrupt
RawJournalWriter::close_driver(&mut jrnl)?; // (2) RawJournalWriter::close_driver(&mut jrnl)?; // (2)
} }
Ok(InitializerInfo::new(1, 2)) Ok(InitializerInfo::new(1, 2))
@ -1222,16 +1195,12 @@ fn midway_corruption_reopen() {
|jrnl_id| { |jrnl_id| {
let op_count; let op_count;
{ {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
op_count = apply_event_mix(&mut jrnl)?; op_count = apply_event_mix(&mut jrnl)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?;
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
Ok(InitializerInfo::new((op_count + 1) as u64, 102)) Ok(InitializerInfo::new((op_count + 1) as u64, 102))
@ -1245,15 +1214,11 @@ fn midway_corruption_reopen() {
"midway_corruption_reopen_apply_post_corrupted_reopen", "midway_corruption_reopen_apply_post_corrupted_reopen",
|jrnl_id| { |jrnl_id| {
{ {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
{ {
let mut jrnl = open_journal::<SimpleDBJournal>( let mut jrnl = journal_open(jrnl_id, &SimpleDB::new())?; // <-- corrupt this one
jrnl_id,
&SimpleDB::new(),
JournalSettings::default(),
)?; // <-- corrupt this one
let _ = apply_event_mix(&mut jrnl)?; // apply mix let _ = apply_event_mix(&mut jrnl)?; // apply mix
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
} }
@ -1363,7 +1328,7 @@ fn midway_corruption_at_runtime() {
*/ */
"midway_corruption_at_runtime_open_server_event_close", "midway_corruption_at_runtime_open_server_event_close",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
SimpleDB::new().push(&mut jrnl, KEY)?; SimpleDB::new().push(&mut jrnl, KEY)?;
RawJournalWriter::close_driver(&mut jrnl)?; RawJournalWriter::close_driver(&mut jrnl)?;
Ok(InitializerInfo::new(0, 1)) Ok(InitializerInfo::new(0, 1))
@ -1376,7 +1341,7 @@ fn midway_corruption_at_runtime() {
*/ */
"midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last", "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
for num in 1..=TRIALS { for num in 1..=TRIALS {
sdb.push(&mut jrnl, keyfmt(num))?; sdb.push(&mut jrnl, keyfmt(num))?;
@ -1392,7 +1357,7 @@ fn midway_corruption_at_runtime() {
*/ */
"midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first", "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first",
|jrnl_id| { |jrnl_id| {
let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?; let mut jrnl = journal_init(jrnl_id)?;
let mut sdb = SimpleDB::new(); let mut sdb = SimpleDB::new();
for num in 1..=TRIALS { for num in 1..=TRIALS {
sdb.push(&mut jrnl, keyfmt(num))?; sdb.push(&mut jrnl, keyfmt(num))?;
@ -1495,3 +1460,125 @@ fn midway_corruption_at_runtime() {
}, },
) )
} }
/*
rollback tests
*/
/// Drive a fresh journal into a failure, roll it back and check that it can be reopened
///
/// Steps:
/// 1. A new log is created
/// 2. Events and corruptions are introduced (`action`, which must fail; the error goes to `verify_error`)
/// 3. Rolled back
/// 4. Closed
/// 5. Re-opened (the restored state goes to `post_rollback`), closed again and removed
fn emulate_failure_for_rollback(
    journal_id: &str,
    action: impl Fn(&mut SimpleDB, &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()>,
    verify_error: impl Fn(fractal::error::Error),
    post_rollback: impl Fn(&SimpleDB),
) {
    // phase 1: fail, roll back, close
    {
        let mut failing_db = SimpleDB::new();
        let mut writer = create_journal::<SimpleDBJournal>(journal_id).unwrap();
        let failure = action(&mut failing_db, &mut writer).unwrap_err();
        verify_error(failure);
        // idempotency guarantee: no matter how many times this is called, the underlying state will rollback to, and
        // only to the last event
        (0..1000).for_each(|_| writer.__rollback().unwrap());
        RawJournalWriter::close_driver(&mut writer).unwrap();
    }
    // phase 2: reopen, verify, close
    {
        let restored_db = SimpleDB::new();
        let reopened = journal_open(journal_id, &restored_db);
        // the trace is collected after the open attempt, whether or not it succeeded
        let trace_dump = format!("{:#?}", debug_get_trace());
        let mut writer = reopened.expect(&trace_dump);
        post_rollback(&restored_db);
        RawJournalWriter::close_driver(&mut writer).unwrap();
    }
    FileSystem::remove_file(journal_id).unwrap();
}
/// A zero-length write on the very first event of a new log must be rolled back to an empty log
#[test]
fn rollback_write_zero_empty_log() {
    emulate_failure_for_rollback(
        // journal id matches the test name, like the other rollback tests
        "rollback_write_zero_empty_log",
        |db, jrnl| {
            // the crash mode is only active for this one push
            vfs_utils::debug_enable_zero_write_crash();
            let r = db.push(jrnl, "hello, world");
            vfs_utils::debug_disable_write_crash();
            r
        },
        |e| match e.kind() {
            ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
            unexpected => panic!("expected write zero, got {unexpected:?}"),
        },
        // nothing was committed, so nothing must be restored
        |db| assert_eq!(db.data().len(), 0),
    );
}
/// A zero-length write after one committed event must roll back to exactly that event
#[test]
fn rollback_write_zero_nonempty_log() {
    /// commit one event normally, then fail the second one with a zero write
    fn fail_second_push(
        db: &mut SimpleDB,
        jrnl: &mut RawJournalWriter<SimpleDBJournal>,
    ) -> RuntimeResult<()> {
        // commit a single "good" event
        db.push(jrnl, "my good key")?;
        vfs_utils::debug_enable_zero_write_crash();
        let outcome = db.push(jrnl, "this won't go in");
        vfs_utils::debug_disable_write_crash();
        outcome
    }
    /// the failure must surface as a write zero I/O error
    fn expect_write_zero(e: fractal::error::Error) {
        match e.kind() {
            ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
            unexpected => panic!("expected write zero, got {unexpected:?}"),
        }
    }
    /// only the good event survives the rollback
    fn expect_only_good_key(db: &SimpleDB) {
        assert_eq!(db.data().len(), 1);
        assert_eq!(db.data()[0], "my good key")
    }
    emulate_failure_for_rollback(
        "rollback_write_zero_nonempty_log",
        fail_second_push,
        expect_write_zero,
        expect_only_good_key,
    )
}
/// A randomly truncated write on the first event of a new log must roll back to an empty log
/// (repeated 100 times)
#[test]
fn rollback_random_write_failure_empty_log() {
    /// fail the only push with a random partial write
    fn fail_first_push(
        db: &mut SimpleDB,
        jrnl: &mut RawJournalWriter<SimpleDBJournal>,
    ) -> RuntimeResult<()> {
        vfs_utils::debug_enable_random_write_crash();
        let outcome = db.push(jrnl, "hello, world");
        vfs_utils::debug_disable_write_crash();
        outcome
    }
    /// the failure must surface as a write zero I/O error
    fn expect_write_zero(e: fractal::error::Error) {
        match e.kind() {
            ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
            unexpected => panic!("expected write zero, got {unexpected:?}"),
        }
    }
    /// nothing was committed, so nothing must be restored
    fn expect_empty(db: &SimpleDB) {
        assert_eq!(db.data().len(), 0)
    }
    (0..100).for_each(|_| {
        emulate_failure_for_rollback(
            "rollback_random_write_failure_empty_log",
            fail_first_push,
            expect_write_zero,
            expect_empty,
        );
    })
}
/// A randomly truncated write after one committed event must roll back to exactly that event
/// (repeated 100 times)
#[test]
fn rollback_random_write_failure_log() {
    /// commit one event normally, then fail the second one with a random partial write
    fn fail_second_push(
        db: &mut SimpleDB,
        jrnl: &mut RawJournalWriter<SimpleDBJournal>,
    ) -> RuntimeResult<()> {
        // commit a single "good" event
        db.push(jrnl, "my good key")?;
        vfs_utils::debug_enable_random_write_crash();
        let outcome = db.push(jrnl, "this won't go in");
        vfs_utils::debug_disable_write_crash();
        outcome
    }
    /// the failure must surface as a write zero I/O error
    fn expect_write_zero(e: fractal::error::Error) {
        match e.kind() {
            ErrorKind::IoError(io) if io.kind() == IoErrorKind::WriteZero => {}
            unexpected => panic!("expected write zero, got {unexpected:?}"),
        }
    }
    /// only the good event survives the rollback
    fn expect_only_good_key(db: &SimpleDB) {
        assert_eq!(db.data().len(), 1);
        assert_eq!(db.data()[0], "my good key")
    }
    (0..100).for_each(|_| {
        emulate_failure_for_rollback(
            "rollback_random_write_failure_log",
            fail_second_push,
            expect_write_zero,
            expect_only_good_key,
        )
    })
}

@ -54,6 +54,9 @@ impl SysIOError {
pub fn kind(&self) -> std::io::ErrorKind { pub fn kind(&self) -> std::io::ErrorKind {
self.0.kind() self.0.kind()
} }
/// Borrow the wrapped [`std::io::Error`]
pub fn inner(&self) -> &std::io::Error {
    &self.0
}
} }
impl From<std::io::Error> for SysIOError { impl From<std::io::Error> for SysIOError {

Loading…
Cancel
Save