storage: online rollback for disk failures

next
Sayan Nandan 6 months ago
parent b961e840f5
commit 4982a537de
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -6,7 +6,8 @@ All changes in this project will be noted in this file.
### Additions ### Additions
- Skyhash/2.1: Restored support for pipelines - Skyhash/2: Restored support for pipelines
- Enable online (runtime) recovery of transactional failures due to disk errors
## Version 0.8.1 ## Version 0.8.1

@ -115,7 +115,7 @@ pub enum CriticalTask {
/// Write a new data batch /// Write a new data batch
WriteBatch(ModelUniqueID, usize), WriteBatch(ModelUniqueID, usize),
/// try recovering model ID /// try recovering model ID
TryModelAutorecoverLWT(ModelUniqueID), TryModelAutorecover(ModelUniqueID),
CheckGNSDriver, CheckGNSDriver,
} }
@ -323,13 +323,7 @@ impl FractalMgr {
match task { match task {
CriticalTask::CheckGNSDriver => { CriticalTask::CheckGNSDriver => {
info!("trying to autorecover GNS driver"); info!("trying to autorecover GNS driver");
match global match global.state().gns_driver().txn_driver.lock().__rollback() {
.state()
.gns_driver()
.txn_driver
.lock()
.__lwt_heartbeat()
{
Ok(()) => { Ok(()) => {
info!("GNS driver has been successfully auto-recovered"); info!("GNS driver has been successfully auto-recovered");
global.state().gns_driver().status().set_okay(); global.state().gns_driver().status().set_okay();
@ -343,7 +337,7 @@ impl FractalMgr {
} }
} }
} }
CriticalTask::TryModelAutorecoverLWT(mdl_id) => { CriticalTask::TryModelAutorecover(mdl_id) => {
info!("trying to autorecover model {mdl_id}"); info!("trying to autorecover model {mdl_id}");
match global match global
.state() .state()
@ -355,7 +349,7 @@ impl FractalMgr {
Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => { Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => {
let mut drv = mdl.driver().batch_driver().lock(); let mut drv = mdl.driver().batch_driver().lock();
let drv = drv.as_mut().unwrap(); let drv = drv.as_mut().unwrap();
match drv.__lwt_heartbeat() { match drv.__rollback() {
Ok(()) => { Ok(()) => {
mdl.driver().status().set_okay(); mdl.driver().status().set_okay();
global.health().report_recovery(); global.health().report_recovery();
@ -364,7 +358,7 @@ impl FractalMgr {
Err(e) => { Err(e) => {
error!("failed to autorecover {mdl_id} with {e}. will try again"); error!("failed to autorecover {mdl_id} with {e}. will try again");
self.hp_dispatcher self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id))) .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id)))
.unwrap() .unwrap()
} }
} }
@ -548,9 +542,7 @@ impl FractalMgr {
.map_err(|e| { .map_err(|e| {
mdl_driver_.status().set_iffy(); mdl_driver_.status().set_iffy();
self.hp_dispatcher self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT( .send(Task::new(CriticalTask::TryModelAutorecover(mdl_id.into())))
mdl_id.into(),
)))
.unwrap(); .unwrap();
(e, BatchStats::into_inner(batch_stats)) (e, BatchStats::into_inner(batch_stats))
}) })

@ -133,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal {
.commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new()) .commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
.unwrap() .unwrap()
} }
CriticalTask::TryModelAutorecoverLWT(_) => {} CriticalTask::TryModelAutorecover(_) => {}
CriticalTask::CheckGNSDriver => {} CriticalTask::CheckGNSDriver => {}
} }
} }

@ -297,7 +297,7 @@ pub struct TrackedWriter<
S: FileSpecV1, S: FileSpecV1,
const SIZE: usize = 8192, const SIZE: usize = 8192,
const PANIC_IF_UNFLUSHED: bool = true, const PANIC_IF_UNFLUSHED: bool = true,
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = false,
> { > {
f_d: File, f_d: File,
f_md: S::Metadata, f_md: S::Metadata,
@ -417,6 +417,12 @@ impl<
pub fn current_checksum(&self) -> u64 { pub fn current_checksum(&self) -> u64 {
self.t_checksum.clone().finish() self.t_checksum.clone().finish()
} }
pub fn inner_mut(&mut self, f: impl Fn(&mut File) -> IoResult<u64>) -> IoResult<()> {
let file = &mut self.f_d;
let new_cursor = f(file)?;
self.t_cursor = new_cursor;
Ok(())
}
} }
impl< impl<
@ -441,10 +447,8 @@ impl<
match self.f_d.fwrite_all_count(buf) { match self.f_d.fwrite_all_count(buf) {
(cnt, r) => { (cnt, r) => {
self.t_cursor += cnt; self.t_cursor += cnt;
if r.is_err() { if r.is_err() && CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
if CHECKSUM_WRITTEN_IF_BLOCK_ERROR { self.t_checksum.update(&buf[..cnt as usize]);
self.t_checksum.update(&buf[..cnt as usize]);
}
} else { } else {
self.t_checksum.update(buf); self.t_checksum.update(buf);
} }
@ -491,7 +495,13 @@ impl<
return Ok(()); return Ok(());
} }
self.flush_buf()?; self.flush_buf()?;
// write whatever capacity exceeds the buffer size /*
write whatever capacity exceeds the buffer size
[a,b,c,d,e,f]
problem: but we can only hold two items
so write to disk: [a,b]
store in memory: [c,d,e,f]
*/
let to_write_cnt = buf.len().saturating_sub(SIZE); let to_write_cnt = buf.len().saturating_sub(SIZE);
match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) { match self.f_d.fwrite_all_count(&buf[..to_write_cnt]) {
(cnt, r) => { (cnt, r) => {
@ -538,6 +548,12 @@ impl<
pub fn fsync(&mut self) -> IoResult<()> { pub fn fsync(&mut self) -> IoResult<()> {
self.f_d.fsync_all() self.f_d.fsync_all()
} }
/// Empty the write buffer
///
/// DANGER: This means that whatever data was in the buffer will be immediately discarded
pub unsafe fn drain_buffer(&mut self) {
self.buf.clear()
}
} }
impl< impl<

@ -35,6 +35,7 @@ use {
mem::unsafe_apis::memcpy, mem::unsafe_apis::memcpy,
storage::common::{ storage::common::{
checksum::SCrc64, checksum::SCrc64,
interface::fs::FileWriteExt,
sdss::sdss_r1::{ sdss::sdss_r1::{
rw::{SdssFile, TrackedReader, TrackedWriter}, rw::{SdssFile, TrackedReader, TrackedWriter},
FileSpecV1, FileSpecV1,
@ -519,7 +520,9 @@ pub(super) enum DriverEventKind {
Journal writer implementation Journal writer implementation
--- ---
Quick notes: Quick notes:
- This is a low level writer and only handles driver events. Higher level impls must account for - This is a low level writer and only handles driver events
- Checksum verification is only performed for meta events
- Implementors must handle checksums themselves
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*/ */
@ -622,18 +625,20 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
{ {
self.commit_with_ctx(event, Default::default()) self.commit_with_ctx(event, Default::default())
} }
/// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER pub fn __rollback(&mut self) -> RuntimeResult<()> {
pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> { // ensure cursors are in sync, even if out of position
// verify that the on disk cursor is the same as what we know
self.log_file.verify_cursor()?; self.log_file.verify_cursor()?;
if self.log_file.cursor() == self.known_txn_offset { // reverse
// great, so if there was something in the buffer, simply ignore it self.log_file.inner_mut(|file| {
self.log_file.__zero_buffer(); file.f_truncate(self.known_txn_offset)?;
Ok(()) Ok(self.known_txn_offset)
} else { })?;
// so, the on-disk file probably has some partial state. this is bad. throw an error // reverse successful, now empty write buffer
Err(StorageError::RawJournalRuntimeHeartbeatFail.into()) unsafe {
// UNSAFE(@ohsayan): since the log has been reversed, whatever we failed to write should simply be ignored
self.log_file.drain_buffer();
} }
Ok(())
} }
} }
@ -643,12 +648,12 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>, f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
) -> RuntimeResult<T> { ) -> RuntimeResult<T> {
let id = self.txn_id; let id = self.txn_id;
self.txn_id += 1;
let ret = f(self, id as u128); let ret = f(self, id as u128);
if ret.is_ok() { if ret.is_ok() {
jtrace_event_offset!(id, self.log_file.cursor()); jtrace_event_offset!(id, self.log_file.cursor());
self.known_txn_id = id; self.known_txn_id = id;
self.known_txn_offset = self.log_file.cursor(); self.known_txn_offset = self.log_file.cursor();
self.txn_id += 1;
} }
ret ret
} }

Loading…
Cancel
Save