diff --git a/CHANGELOG.md b/CHANGELOG.md
index c4bd8383..f135f7b9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,8 @@ All changes in this project will be noted in this file.
 
 ### Additions
 
-- Skyhash/2.1: Restored support for pipelines
+- Skyhash/2: Restored support for pipelines
+- Enable online (runtime) recovery of transactional failures due to disk errors
 
 ## Version 0.8.1
 
diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs
index 0b8f7190..172f49dc 100644
--- a/server/src/engine/fractal/mgr.rs
+++ b/server/src/engine/fractal/mgr.rs
@@ -115,7 +115,7 @@ pub enum CriticalTask {
     /// Write a new data batch
     WriteBatch(ModelUniqueID, usize),
     /// try recovering model ID
-    TryModelAutorecoverLWT(ModelUniqueID),
+    TryModelAutorecover(ModelUniqueID),
     CheckGNSDriver,
 }
 
@@ -323,13 +323,7 @@ impl FractalMgr {
         match task {
             CriticalTask::CheckGNSDriver => {
                 info!("trying to autorecover GNS driver");
-                match global
-                    .state()
-                    .gns_driver()
-                    .txn_driver
-                    .lock()
-                    .__lwt_heartbeat()
-                {
+                match global.state().gns_driver().txn_driver.lock().__rollback() {
                     Ok(()) => {
                         info!("GNS driver has been successfully auto-recovered");
                         global.state().gns_driver().status().set_okay();
@@ -343,7 +337,7 @@ impl FractalMgr {
                     }
                 }
             }
-            CriticalTask::TryModelAutorecoverLWT(mdl_id) => {
+            CriticalTask::TryModelAutorecover(mdl_id) => {
                 info!("trying to autorecover model {mdl_id}");
                 match global
                     .state()
@@ -355,7 +349,7 @@ impl FractalMgr {
                     Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => {
                         let mut drv = mdl.driver().batch_driver().lock();
                         let drv = drv.as_mut().unwrap();
-                        match drv.__lwt_heartbeat() {
+                        match drv.__rollback() {
                             Ok(()) => {
                                 mdl.driver().status().set_okay();
                                 global.health().report_recovery();
@@ -364,7 +358,7 @@ impl FractalMgr {
                             Err(e) => {
                                 error!("failed to autorecover {mdl_id} with {e}. will try again");
                                 self.hp_dispatcher
-                                    .send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id)))
+                                    .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id)))
                                     .unwrap()
                             }
                         }
@@ -548,9 +542,7 @@ impl FractalMgr {
             .map_err(|e| {
                 mdl_driver_.status().set_iffy();
                 self.hp_dispatcher
-                    .send(Task::new(CriticalTask::TryModelAutorecoverLWT(
-                        mdl_id.into(),
-                    )))
+                    .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id.into())))
                     .unwrap();
                 (e, BatchStats::into_inner(batch_stats))
             })
diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs
index 219cf0de..e16db672 100644
--- a/server/src/engine/fractal/test_utils.rs
+++ b/server/src/engine/fractal/test_utils.rs
@@ -133,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal {
                     .commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
                     .unwrap()
             }
-            CriticalTask::TryModelAutorecoverLWT(_) => {}
+            CriticalTask::TryModelAutorecover(_) => {}
             CriticalTask::CheckGNSDriver => {}
         }
     }
diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs
index cdf900c2..92dea573 100644
--- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs
+++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs
@@ -297,7 +297,7 @@ pub struct TrackedWriter<
     S: FileSpecV1,
     const SIZE: usize = 8192,
     const PANIC_IF_UNFLUSHED: bool = true,
-    const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true,
+    const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = false,
 > {
     f_d: File,
     f_md: S::Metadata,
@@ -417,6 +417,12 @@ impl<
     pub fn current_checksum(&self) -> u64 {
         self.t_checksum.clone().finish()
     }
+    pub fn inner_mut(&mut self, f: impl Fn(&mut File) -> IoResult<u64>) -> IoResult<()> {
+        let file = &mut self.f_d;
+        let new_cursor = f(file)?;
+        self.t_cursor = new_cursor;
+        Ok(())
+    }
 }
 
 impl<
@@ -441,10 +447,8 @@ impl<
         match self.f_d.fwrite_all_count(buf) {
             (cnt, r) => {
                 self.t_cursor += cnt;
-                if r.is_err() {
-                    if CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
-                        self.t_checksum.update(&buf[..cnt as usize]);
-                    }
+                if r.is_err() && CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
+                    self.t_checksum.update(&buf[..cnt as usize]);
                 } else {
                     self.t_checksum.update(buf);
                 }
@@ -491,7 +495,13 @@ impl<
             return Ok(());
         }
         self.flush_buf()?;
-        // write whatever capacity exceeds the buffer size
+        /*
+            write whatever capacity exceeds the buffer size
+            [a,b,c,d,e,f]
+            problem: but we can only hold two items
+            so write to disk: [a,b]
+            store in memory: [c,d,e,f]
+        */
         let to_write_cnt = buf.len().saturating_sub(SIZE);
         match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) {
             (cnt, r) => {
@@ -538,6 +548,12 @@ impl<
     pub fn fsync(&mut self) -> IoResult<()> {
         self.f_d.fsync_all()
     }
+    /// Empty the write buffer
+    ///
+    /// DANGER: This means that whatever data was in the buffer will be immediately discarded
+    pub unsafe fn drain_buffer(&mut self) {
+        self.buf.clear()
+    }
 }
 
 impl<
diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs
index b93eae48..46f030da 100644
--- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs
+++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs
@@ -35,6 +35,7 @@ use {
         mem::unsafe_apis::memcpy,
         storage::common::{
             checksum::SCrc64,
+            interface::fs::FileWriteExt,
             sdss::sdss_r1::{
                 rw::{SdssFile, TrackedReader, TrackedWriter},
                 FileSpecV1,
@@ -519,7 +520,9 @@ pub(super) enum DriverEventKind {
     Journal writer implementation
     ---
     Quick notes:
-    - This is a low level writer and only handles driver events. Higher level impls must account for
+    - This is a low level writer and only handles driver events
+    - Checksum verification is only performed for meta events
+    - Implementors must handle checksums themselves
     +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 */
 
@@ -622,18 +625,20 @@ impl RawJournalWriter {
     {
         self.commit_with_ctx(event, Default::default())
     }
-    /// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER
-    pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> {
-        // verify that the on disk cursor is the same as what we know
+    pub fn __rollback(&mut self) -> RuntimeResult<()> {
+        // ensure cursors are in sync, even if out of position
        self.log_file.verify_cursor()?;
-        if self.log_file.cursor() == self.known_txn_offset {
-            // great, so if there was something in the buffer, simply ignore it
-            self.log_file.__zero_buffer();
-            Ok(())
-        } else {
-            // so, the on-disk file probably has some partial state. this is bad. throw an error
-            Err(StorageError::RawJournalRuntimeHeartbeatFail.into())
+        // reverse
+        self.log_file.inner_mut(|file| {
+            file.f_truncate(self.known_txn_offset)?;
+            Ok(self.known_txn_offset)
+        })?;
+        // reverse successful, now empty write buffer
+        unsafe {
+            // UNSAFE(@ohsayan): since the log has been reversed, whatever we failed to write should simply be ignored
+            self.log_file.drain_buffer();
         }
+        Ok(())
     }
 }
 
@@ -643,12 +648,12 @@ impl RawJournalWriter {
         f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
     ) -> RuntimeResult<T> {
         let id = self.txn_id;
-        self.txn_id += 1;
         let ret = f(self, id as u128);
         if ret.is_ok() {
             jtrace_event_offset!(id, self.log_file.cursor());
             self.known_txn_id = id;
             self.known_txn_offset = self.log_file.cursor();
+            self.txn_id += 1;
         }
         ret
     }
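
Note on the recovery path: the new `RawJournalWriter::__rollback` verifies the on-disk cursor, truncates the log back to `known_txn_offset` through `TrackedWriter::inner_mut` and `f_truncate`, and then discards the unflushed buffer with `drain_buffer`. Below is a minimal sketch of the same truncate-and-discard idea using only `std::fs`/`std::io`; `MiniJournal`, its methods, and the file path are illustrative stand-ins, not the actual skytable types.

```rust
use std::{
    fs::{File, OpenOptions},
    io::{Result as IoResult, Seek, SeekFrom, Write},
};

/// Illustrative stand-in for a buffered journal writer (hypothetical; the real
/// types in the patch are `TrackedWriter` and `RawJournalWriter`).
struct MiniJournal {
    file: File,
    buf: Vec<u8>,      // unflushed bytes of the event currently being committed
    known_offset: u64, // end of the last fully committed event
}

impl MiniJournal {
    fn open(path: &str) -> IoResult<Self> {
        let file = OpenOptions::new().create(true).read(true).write(true).open(path)?;
        Ok(Self { file, buf: Vec::new(), known_offset: 0 })
    }

    /// Append one event; `known_offset` advances only once the bytes are durable.
    fn commit(&mut self, event: &[u8]) -> IoResult<()> {
        self.buf.extend_from_slice(event);
        self.file.write_all(&self.buf)?;
        self.file.sync_all()?;
        self.known_offset += self.buf.len() as u64;
        self.buf.clear();
        Ok(())
    }

    /// Recover from a failed `commit`: cut the log back to the last known-good
    /// offset and throw away whatever is still sitting in the write buffer
    /// (the patch does this with `f_truncate` via `inner_mut`, then `drain_buffer`).
    fn rollback(&mut self) -> IoResult<()> {
        // drop any partially written event from the on-disk log
        self.file.set_len(self.known_offset)?;
        // reposition the cursor so the next commit starts at the rollback point
        self.file.seek(SeekFrom::Start(self.known_offset))?;
        // discard unflushed data; it belongs to the event that failed
        self.buf.clear();
        Ok(())
    }
}

fn main() -> IoResult<()> {
    let mut journal = MiniJournal::open("/tmp/mini.journal")?;
    journal.commit(b"event-1")?;
    // on a disk error, roll back and leave the retry to the caller, much like
    // the fractal manager re-enqueues a `TryModelAutorecover` task on failure
    if journal.commit(b"event-2").is_err() {
        journal.rollback()?;
    }
    Ok(())
}
```

The reordering in the last hunk (moving `self.txn_id += 1` after the `ret.is_ok()` check) appears to serve the same goal: a commit that fails and is later rolled back should not consume a transaction id or advance `known_txn_offset`.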