From b9c97c51b299281b9c9385976cedceb755ab42b5 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Wed, 27 Mar 2024 00:31:21 +0530 Subject: [PATCH] storage: Fix recovery algorithms and add test for repair --- .../engine/storage/v2/raw/journal/raw/mod.rs | 36 +++- .../raw/{tests.rs => tests/journal_ops.rs} | 196 ++---------------- .../storage/v2/raw/journal/raw/tests/mod.rs | 193 +++++++++++++++++ .../v2/raw/journal/raw/tests/recovery.rs | 119 +++++++++++ 4 files changed, 357 insertions(+), 187 deletions(-) rename server/src/engine/storage/v2/raw/journal/raw/{tests.rs => tests/journal_ops.rs} (70%) create mode 100644 server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs create mode 100644 server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index fc000de3..84c16de3 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -78,7 +78,7 @@ where RawJournalWriter::new(initializer, file) } -#[derive(Debug)] +#[derive(Debug, PartialEq)] /// The result of a journal repair operation pub enum RepairResult { /// No errors were detected @@ -215,8 +215,8 @@ pub(super) enum JournalWriterTraceEvent { DriverClosed, } +#[cfg(test)] local! { - #[cfg(test)] static TRACE: Vec = Vec::new(); } @@ -746,10 +746,9 @@ impl RawJournalReader { ); return Ok(initializer); } - Ok(false) => {} + Ok(false) => self.state = JournalState::AwaitingEvent, Err(e) => return Err(e), } - self.state = JournalState::AwaitingEvent; } } fn new( @@ -807,7 +806,13 @@ impl RawJournalReader { e: Error, repair_mode: JournalRepairMode, ) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile)> { - let lost = self.tr.cached_size() - self.tr.cursor(); + let lost = if self.last_txn_offset == 0 { + // we haven't scanned any events and already hit an error + // so essentially, we lost the entire log + self.tr.cached_size() - ::SIZE as u64 + } else { + self.tr.cached_size() - self.last_txn_offset + }; let mut repair_result = RepairResult::LostBytes(lost); match repair_mode { JournalRepairMode::Simple => {} @@ -867,9 +872,13 @@ impl RawJournalReader { */ l!(let known_event_id, known_event_offset, known_event_checksum = self.last_txn_id, self.last_txn_offset, self.last_txn_checksum); let mut last_logged_checksum = self.tr.checksum(); - let was_eof = self.tr.is_eof(); let mut base_log = self.tr.into_inner(); - base_log.truncate(known_event_offset)?; + if known_event_offset == 0 { + // no event, so just trim upto header + base_log.truncate(::SIZE as _)?; + } else { + base_log.truncate(known_event_offset)?; + } /* see what needs to be done next */ @@ -882,13 +891,22 @@ impl RawJournalReader { the log is in a dirty state that can only be resolved by closing it */ let drv_close = DriverEvent::new( - (known_event_id + 1) as u128, + if known_event_offset == 0 { + // no event occurred + 0 + } else { + // something happened prior to this, so we'll use an incremented ID for this event + known_event_id + 1 + } as u128, DriverEventKind::Closed, known_event_checksum, known_event_offset, known_event_id, ); - if matches!(self.state, JournalState::AwaitingClose) & was_eof { + if { + (self.state == JournalState::AwaitingClose) | // expecting a close but we couldn't parse it + (self.state == JournalState::AwaitingEvent) // we were awaiting an event but we couldn't get enough metadata to do anything + } { // we reached eof and we were expecting a close. definitely lost an unspecified number of bytes repair_result = RepairResult::UnspecifiedLoss(lost); } diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs similarity index 70% rename from server/src/engine/storage/v2/raw/journal/raw/tests.rs rename to server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs index 42c8d5eb..0dc63fc3 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs @@ -1,5 +1,5 @@ /* - * Created on Tue Jan 30 2024 + * Created on Tue Mar 26 2024 * * This file is a part of Skytable * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source @@ -26,175 +26,15 @@ use { super::{ - create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind, - JournalInitializer, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter, - }, - crate::engine::{ - error::StorageError, - fractal::error::ErrorContext, - storage::{ - common::sdss::sdss_r1::rw::TrackedReader, - v2::raw::{ - journal::raw::{JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent}, - spec::SystemDatabaseV1, - }, + super::{ + create_journal, obtain_trace, open_journal, DriverEventKind, JournalReaderTraceEvent, + JournalSettings, JournalWriterTraceEvent, RawJournalWriter, }, - RuntimeResult, + SimpleDB, SimpleDBJournal, }, - std::cell::RefCell, + crate::engine::fractal::error::ErrorContext, }; -#[test] -fn encode_decode_meta() { - let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0); - let encoded1 = dv1.encode_self(); - let decoded1 = DriverEvent::decode(encoded1).unwrap(); - assert_eq!(dv1, decoded1); -} - -/* - impls for journal tests -*/ - -#[derive(Debug, Clone, PartialEq)] -pub struct SimpleDB { - data: RefCell>, -} -impl SimpleDB { - fn new() -> Self { - Self { - data: RefCell::default(), - } - } - fn data(&self) -> std::cell::Ref<'_, Vec> { - self.data.borrow() - } - fn clear(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { - log.commit_event(DbEventClear)?; - self.data.get_mut().clear(); - Ok(()) - } - fn pop(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { - self.data.get_mut().pop().unwrap(); - log.commit_event(DbEventPop)?; - Ok(()) - } - fn push( - &mut self, - log: &mut RawJournalWriter, - new: impl ToString, - ) -> RuntimeResult<()> { - let new = new.to_string(); - log.commit_event(DbEventPush(&new))?; - self.data.get_mut().push(new); - Ok(()) - } -} - -/* - event impls -*/ - -pub struct SimpleDBJournal; -struct DbEventPush<'a>(&'a str); -struct DbEventPop; -struct DbEventClear; -trait SimpleDBEvent: Sized { - const OPC: u8; - fn write_buffered(self, _: &mut Vec); -} -macro_rules! impl_db_event { - ($($ty:ty as $code:expr $(=> $expr:expr)?),*) => { - $(impl SimpleDBEvent for $ty { - const OPC: u8 = $code; - fn write_buffered(self, buf: &mut Vec) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec, f: impl Fn($ty, &mut Vec)) { f(s, b) } $(_do_it(self, buf, $expr))? } - })* - } -} - -impl_db_event!( - DbEventPush<'_> as 0 => |me, buf| { - buf.extend(&(me.0.len() as u64).to_le_bytes()); - buf.extend(me.0.as_bytes()); - }, - DbEventPop as 1, - DbEventClear as 2 -); - -impl RawJournalAdapterEvent for T { - fn md(&self) -> u64 { - T::OPC as _ - } - fn write_buffered(self, buf: &mut Vec, _: ()) { - T::write_buffered(self, buf) - } -} - -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum EventMeta { - NewKey, - Pop, - Clear, -} -impl RawJournalAdapter for SimpleDBJournal { - const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered; - type Spec = SystemDatabaseV1; - type GlobalState = SimpleDB; - type EventMeta = EventMeta; - type CommitContext = (); - type Context<'a> = () where Self: 'a; - fn initialize(_: &JournalInitializer) -> Self { - Self - } - fn enter_context<'a>(_: &'a mut RawJournalWriter) -> Self::Context<'a> { - () - } - fn parse_event_meta(meta: u64) -> Option { - Some(match meta { - 0 => EventMeta::NewKey, - 1 => EventMeta::Pop, - 2 => EventMeta::Clear, - _ => return None, - }) - } - fn commit_buffered<'a, E: RawJournalAdapterEvent>( - &mut self, - buf: &mut Vec, - event: E, - ctx: (), - ) { - event.write_buffered(buf, ctx) - } - fn decode_apply<'a>( - gs: &Self::GlobalState, - meta: Self::EventMeta, - file: &mut TrackedReader, - ) -> RuntimeResult<()> { - match meta { - EventMeta::NewKey => { - let key_size = u64::from_le_bytes(file.read_block()?); - let mut keybuf = vec![0u8; key_size as usize]; - file.tracked_read(&mut keybuf)?; - match String::from_utf8(keybuf) { - Ok(k) => gs.data.borrow_mut().push(k), - Err(_) => { - return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()) - } - } - } - EventMeta::Clear => gs.data.borrow_mut().clear(), - EventMeta::Pop => { - let _ = gs.data.borrow_mut().pop().unwrap(); - } - } - Ok(()) - } -} - -/* - journal tests -*/ - #[test] fn journal_open_close() { const JOURNAL_NAME: &str = "journal_open_close"; @@ -202,12 +42,12 @@ fn journal_open_close() { // new boot let mut j = create_journal::(JOURNAL_NAME).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![JournalWriterTraceEvent::Initialized] ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -228,7 +68,7 @@ fn journal_open_close() { ) .unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ // init reader and read close event JournalReaderTraceEvent::Initialized, @@ -251,7 +91,7 @@ fn journal_open_close() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -272,7 +112,7 @@ fn journal_open_close() { ) .unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ // init reader and read reopen event JournalReaderTraceEvent::Initialized, @@ -304,7 +144,7 @@ fn journal_open_close() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -328,7 +168,7 @@ fn journal_with_server_single_event() { db.push(&mut j, "hello world").unwrap(); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ JournalWriterTraceEvent::Initialized, JournalWriterTraceEvent::CommitAttemptForEvent(0), @@ -349,12 +189,12 @@ fn journal_with_server_single_event() { let db = SimpleDB::new(); // second boot let mut j = open_journal::(JOURNAL_NAME, &db, JournalSettings::default()) - .set_dmsg_fn(|| format!("{:?}", super::obtain_trace())) + .set_dmsg_fn(|| format!("{:?}", obtain_trace())) .unwrap(); assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ // init reader and read server event JournalReaderTraceEvent::Initialized, @@ -382,7 +222,7 @@ fn journal_with_server_single_event() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -402,7 +242,7 @@ fn journal_with_server_single_event() { assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ // init reader and read server event JournalReaderTraceEvent::Initialized, @@ -440,7 +280,7 @@ fn journal_with_server_single_event() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + obtain_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs new file mode 100644 index 00000000..b032ebe1 --- /dev/null +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs @@ -0,0 +1,193 @@ +/* + * Created on Tue Jan 30 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +mod journal_ops; +mod recovery; + +use { + super::{ + CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, RawJournalAdapter, + RawJournalAdapterEvent, RawJournalWriter, + }, + crate::engine::{ + error::StorageError, + storage::{common::sdss::sdss_r1::rw::TrackedReader, v2::raw::spec::SystemDatabaseV1}, + RuntimeResult, + }, + std::cell::RefCell, +}; + +/* + impls for journal tests +*/ + +#[derive(Debug, Clone, PartialEq)] +pub struct SimpleDB { + data: RefCell>, +} +impl SimpleDB { + fn new() -> Self { + Self { + data: RefCell::default(), + } + } + fn data(&self) -> std::cell::Ref<'_, Vec> { + self.data.borrow() + } + fn clear(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { + log.commit_event(DbEventClear)?; + self.data.get_mut().clear(); + Ok(()) + } + fn pop(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { + self.data.get_mut().pop().unwrap(); + log.commit_event(DbEventPop)?; + Ok(()) + } + fn push( + &mut self, + log: &mut RawJournalWriter, + new: impl ToString, + ) -> RuntimeResult<()> { + let new = new.to_string(); + log.commit_event(DbEventPush(&new))?; + self.data.get_mut().push(new); + Ok(()) + } +} + +/* + event impls +*/ + +#[derive(Debug)] +pub struct SimpleDBJournal; +struct DbEventPush<'a>(&'a str); +struct DbEventPop; +struct DbEventClear; +trait SimpleDBEvent: Sized { + const OPC: u8; + fn write_buffered(self, _: &mut Vec); +} +macro_rules! impl_db_event { + ($($ty:ty as $code:expr $(=> $expr:expr)?),*) => { + $(impl SimpleDBEvent for $ty { + const OPC: u8 = $code; + fn write_buffered(self, buf: &mut Vec) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec, f: impl Fn($ty, &mut Vec)) { f(s, b) } $(_do_it(self, buf, $expr))? } + })* + } +} + +impl_db_event!( + DbEventPush<'_> as 0 => |me, buf| { + buf.extend(&(me.0.len() as u64).to_le_bytes()); + buf.extend(me.0.as_bytes()); + }, + DbEventPop as 1, + DbEventClear as 2 +); + +impl RawJournalAdapterEvent for T { + fn md(&self) -> u64 { + T::OPC as _ + } + fn write_buffered(self, buf: &mut Vec, _: ()) { + T::write_buffered(self, buf) + } +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum EventMeta { + NewKey, + Pop, + Clear, +} +impl RawJournalAdapter for SimpleDBJournal { + const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered; + type Spec = SystemDatabaseV1; + type GlobalState = SimpleDB; + type EventMeta = EventMeta; + type CommitContext = (); + type Context<'a> = () where Self: 'a; + fn initialize(_: &JournalInitializer) -> Self { + Self + } + fn enter_context<'a>(_: &'a mut RawJournalWriter) -> Self::Context<'a> { + () + } + fn parse_event_meta(meta: u64) -> Option { + Some(match meta { + 0 => EventMeta::NewKey, + 1 => EventMeta::Pop, + 2 => EventMeta::Clear, + _ => return None, + }) + } + fn commit_buffered<'a, E: RawJournalAdapterEvent>( + &mut self, + buf: &mut Vec, + event: E, + ctx: (), + ) { + event.write_buffered(buf, ctx) + } + fn decode_apply<'a>( + gs: &Self::GlobalState, + meta: Self::EventMeta, + file: &mut TrackedReader, + ) -> RuntimeResult<()> { + match meta { + EventMeta::NewKey => { + let key_size = u64::from_le_bytes(file.read_block()?); + let mut keybuf = vec![0u8; key_size as usize]; + file.tracked_read(&mut keybuf)?; + match String::from_utf8(keybuf) { + Ok(k) => gs.data.borrow_mut().push(k), + Err(_) => { + return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()) + } + } + } + EventMeta::Clear => gs.data.borrow_mut().clear(), + EventMeta::Pop => { + let _ = gs.data.borrow_mut().pop().unwrap(); + } + } + Ok(()) + } +} + +/* + basic tests +*/ + +#[test] +fn encode_decode_meta() { + let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0); + let encoded1 = dv1.encode_self(); + let decoded1 = DriverEvent::decode(encoded1).unwrap(); + assert_eq!(dv1, decoded1); +} diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs new file mode 100644 index 00000000..4fbb723b --- /dev/null +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -0,0 +1,119 @@ +/* + * Created on Tue Mar 26 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::{SimpleDB, SimpleDBJournal}, + crate::engine::{ + error::ErrorKind, + storage::{ + common::interface::fs::{File, FileExt, FileSystem, FileWriteExt}, + v2::raw::journal::{ + create_journal, open_journal, + raw::{obtain_trace, DriverEvent, JournalReaderTraceEvent, RawJournalWriter}, + repair_journal, JournalRepairMode, JournalSettings, RepairResult, + }, + }, + }, + std::io::ErrorKind as IoErrorKind, +}; + +#[test] +fn close_event_corruption() { + let full_file_size; + { + // open and close a journal (and clear traces) + let mut jrnl = create_journal::("close_event_corruption.db").unwrap(); + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + let _ = obtain_trace(); + full_file_size = { + let f = File::open("close_event_corruption.db").unwrap(); + f.f_len().unwrap() + }; + } + for (trim_size, new_size) in (1..=DriverEvent::FULL_EVENT_SIZE) + .rev() + .map(|trim_size| (trim_size, full_file_size - trim_size as u64)) + { + // create a copy of the "good" journal + let trimmed_journal_path = format!("close_event_corruption-trimmed-{trim_size}.db"); + FileSystem::copy("close_event_corruption.db", &trimmed_journal_path).unwrap(); + let simple_db = SimpleDB::new(); + let open_journal_fn = || { + open_journal::( + &trimmed_journal_path, + &simple_db, + JournalSettings::default(), + ) + }; + // trim this journal to simulate loss of data + let mut f = File::open(&trimmed_journal_path).unwrap(); + f.f_truncate(new_size).unwrap(); + // open the journal and validate failure + let open_err = open_journal_fn().unwrap_err(); + let trace = obtain_trace(); + if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) { + // the amount of trim from the end of the file causes us to lose valuable metadata + assert_eq!( + trace, + intovec![JournalReaderTraceEvent::Initialized], + "failed at trim_size {trim_size}" + ); + } else { + // the amount of trim still allows us to read some metadata + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DriverEventExpectingClose + ], + "failed at trim_size {trim_size}" + ); + } + assert_eq!( + open_err.kind(), + &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), + "failed at trim_size {trim_size}" + ); + // now repair this log + assert_eq!( + repair_journal::( + &trimmed_journal_path, + &simple_db, + JournalSettings::default(), + JournalRepairMode::Simple, + ) + .unwrap(), + RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), + "failed at trim_size {trim_size}" + ); + // now reopen log and ensure it's repaired + let mut jrnl = open_journal_fn().unwrap(); + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + // clear trace + let _ = obtain_trace(); + } +}