From 1ed4f41565d196855afa497ae655bd2756aae56f Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Fri, 1 Mar 2024 23:40:34 +0530 Subject: [PATCH] Enable conditional auto-recovery of drivers --- server/src/engine/core/mod.rs | 36 ++++---- server/src/engine/core/model/alt.rs | 27 +++--- server/src/engine/core/model/mod.rs | 10 ++- server/src/engine/core/space.rs | 28 ++++--- server/src/engine/core/system_db.rs | 11 ++- server/src/engine/error.rs | 1 + server/src/engine/fractal/drivers.rs | 8 +- server/src/engine/fractal/mgr.rs | 84 +++++++++++++++++-- server/src/engine/fractal/mod.rs | 24 ++++++ server/src/engine/fractal/test_utils.rs | 2 + server/src/engine/fractal/util.rs | 5 -- .../storage/common/sdss/impls/sdss_r1/rw.rs | 14 ++++ .../engine/storage/v2/raw/journal/raw/mod.rs | 13 +++ 13 files changed, 202 insertions(+), 61 deletions(-) diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs index 42af315f..40bc5403 100644 --- a/server/src/engine/core/mod.rs +++ b/server/src/engine/core/mod.rs @@ -38,6 +38,7 @@ mod util; // test #[cfg(test)] pub(super) mod tests; + // re-exports pub use self::util::{EntityID, EntityIDRef}; @@ -47,11 +48,14 @@ use { dml::QueryExecMeta, model::{Model, ModelData}, }, - crate::engine::{ - core::space::Space, - error::{QueryError, QueryResult}, - fractal::{FractalGNSDriver, GlobalInstanceLike}, - idx::IndexST, + crate::{ + engine::{ + core::space::Space, + error::{QueryError, QueryResult}, + fractal::{FractalGNSDriver, GlobalInstanceLike}, + idx::IndexST, + }, + util::compiler, }, parking_lot::RwLock, std::collections::HashMap, @@ -179,13 +183,17 @@ where let Some(model) = mdl_idx.get(&entity) else { return Err(QueryError::QExecObjectNotFound); }; - let r = f(model.data())?; - model::DeltaState::guard_delta_overflow( - global, - entity.space(), - entity.entity(), - model.data(), - r, - ); - Ok(()) + if compiler::likely(model.driver().status().is_healthy()) { + let r = f(model.data())?; + 
model::DeltaState::guard_delta_overflow( + global, + entity.space(), + entity.entity(), + model.data(), + r, + ); + Ok(()) + } else { + compiler::cold_call(|| Err(QueryError::SysServerError)) + } } diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index 1ea17210..90421d05 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -274,10 +274,11 @@ impl ModelData { &new_fields, ); // commit txn - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; let mut mutator = model.model_mutator(); new_fields .stseq_ord_kv() @@ -293,10 +294,11 @@ impl ModelData { &removed, ); // commit txn - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; let mut mutator = model.model_mutator(); removed.iter().for_each(|field_id| { mutator.remove_field(field_id.as_str()); @@ -309,10 +311,11 @@ impl ModelData { &updated, ); // commit txn - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; let mut mutator = model.model_mutator(); updated.into_iter().for_each(|(field_id, field)| { mutator.update_field(field_id.as_ref(), field); diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index e3138be1..3d24e4de 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -320,6 +320,7 @@ impl ModelData { )?; // commit txn global.state().gns_driver().driver_context( + global, |drv| drv.commit_event(txn), || { global.taskmgr_post_standard_priority(Task::new( @@ -385,10 +386,11 @@ impl ModelData { .value_u64(), )); // commit txn - global - .state() - 
.gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; // request cleanup global.purge_model_driver( space_name, diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index 3fc3d701..e0b75be9 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -169,6 +169,7 @@ impl Space { global.initialize_space(&space_name, space.get_uuid())?; // commit txn global.state().gns_driver().driver_context( + global, |drv| drv.commit_event(txn), || { global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir( @@ -216,10 +217,11 @@ impl Space { ); // commit // commit txn - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; // merge dict::rmerge_data_with_patch(space.props_mut(), patch); // the `env` key may have been popped, so put it back (setting `env: null` removes the env key and we don't want to waste time enforcing this in the @@ -255,10 +257,11 @@ impl Space { let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; // request cleanup global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), @@ -301,10 +304,11 @@ impl Space { // prepare txn let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(txn), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(txn), + || {}, + )?; // request cleanup 
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir( &space_name, diff --git a/server/src/engine/core/system_db.rs b/server/src/engine/core/system_db.rs index dd0c45d1..303adfa6 100644 --- a/server/src/engine/core/system_db.rs +++ b/server/src/engine/core/system_db.rs @@ -132,6 +132,7 @@ impl SystemDatabase { } let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); global.state().gns_driver().driver_context( + global, |drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)), || {}, )?; @@ -148,6 +149,7 @@ impl SystemDatabase { Some(user) => { let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); global.state().gns_driver().driver_context( + global, |drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)), || {}, )?; @@ -162,10 +164,11 @@ impl SystemDatabase { if !users.contains_key(username) { return Err(QueryError::SysAuthError); } - global - .state() - .gns_driver() - .driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?; + global.state().gns_driver().driver_context( + global, + |drv| drv.commit_event(DropUserTxn::new(username)), + || {}, + )?; let _ = users.remove(username); Ok(()) } diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index e7104c5f..6997f14d 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -220,5 +220,6 @@ enumerate_err! 
{ RawJournalEventCorrupted = "journal-invalid-event", RawJournalCorrupted = "journal-corrupted", RawJournalInvalidEvent = "journal-invalid-event-order", + RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed", } } diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index eb38ba44..e22e8881 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -25,10 +25,11 @@ */ use { - super::util, + super::{util, GlobalInstanceLike}, crate::{ engine::{ error::{QueryError, QueryResult, RuntimeResult}, + fractal::{CriticalTask, Task}, storage::{GNSDriver, ModelDriver}, }, util::compiler, @@ -50,8 +51,12 @@ impl FractalGNSDriver { txn_driver: Mutex::new(txn_driver), } } + pub(super) fn status(&self) -> &util::Status { + &self.status + } pub fn driver_context( &self, + g: &impl GlobalInstanceLike, f: impl Fn(&mut GNSDriver) -> RuntimeResult, on_failure: impl Fn(), ) -> QueryResult { @@ -65,6 +70,7 @@ impl FractalGNSDriver { error!("GNS driver failed with: {e}"); self.status.set_iffy(); on_failure(); + g.taskmgr_post_high_priority(Task::new(CriticalTask::CheckGNSDriver)); Err(QueryError::SysServerError) }), } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 6d5f8c91..1ab8c608 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -25,7 +25,7 @@ */ use { - super::ModelUniqueID, + super::{ModelUniqueID, ModelUniqueIDRef}, crate::{ engine::{ core::{ @@ -108,6 +108,9 @@ impl GenericTask { pub enum CriticalTask { /// Write a new data batch WriteBatch(ModelUniqueID, usize), + /// try recovering model ID + TryModelAutorecoverLWT(ModelUniqueID), + CheckGNSDriver, } /// The task manager @@ -288,6 +291,55 @@ impl FractalMgr { ) { // TODO(@ohsayan): check threshold and update hooks match task { + CriticalTask::CheckGNSDriver => { + info!("trying to autorecover GNS driver"); + match global + .state() + .gns_driver() + .txn_driver + 
.lock() + .__lwt_heartbeat() + { + Ok(()) => { + info!("GNS driver has been successfully auto-recovered"); + global.state().gns_driver().status().set_okay(); + } + Err(e) => { + error!("failed to autorecover GNS driver with error `{e}`. will try again"); + self.hp_dispatcher + .send(Task::new(CriticalTask::CheckGNSDriver)) + .unwrap(); + } + } + } + CriticalTask::TryModelAutorecoverLWT(mdl_id) => { + info!("trying to autorecover model {mdl_id}"); + match global + .state() + .namespace() + .idx_models() + .read() + .get(&EntityIDRef::new(mdl_id.space(), mdl_id.model())) + { + Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => { + let mut drv = mdl.driver().batch_driver().lock(); + let drv = drv.as_mut().unwrap(); + match drv.__lwt_heartbeat() { + Ok(()) => { + mdl.driver().status().set_okay(); + info!("model driver for {mdl_id} has been successfully auto-recovered"); + } + Err(e) => { + error!("failed to autorecover {mdl_id} with {e}. will try again"); + self.hp_dispatcher + .send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id))) + .unwrap() + } + } + } + Some(_) | None => {} + } + } CriticalTask::WriteBatch(model_id, observed_size) => { info!("fhp: {model_id} has reached cache capacity. 
writing to disk"); let mdl_read = global.state().namespace().idx_models().read(); @@ -295,16 +347,18 @@ impl FractalMgr { model_id.space().into(), model_id.model().into(), )) { - Some(mdl) if mdl.data().get_uuid() != model_id.uuid() => { - // this is a different model with the same entity path + Some(mdl) if mdl.data().get_uuid() == model_id.uuid() => mdl, + Some(_) | None => { + // this is a different model with the same entity path or it was deleted but the task was queued return; } - Some(mdl) => mdl, - None => { - panic!("found deleted model") - } }; - match Self::try_write_model_data_batch(mdl.data(), observed_size, mdl.driver()) { + match self.try_write_model_data_batch( + ModelUniqueIDRef::from(&model_id), + mdl.data(), + observed_size, + mdl.driver(), + ) { Ok(()) => { if observed_size != 0 { info!("fhp: completed maintenance task for {model_id}, synced={observed_size}") @@ -390,7 +444,12 @@ impl FractalMgr { .data() .delta_state() .__fractal_take_full_from_data_delta(super::FractalToken::new()); - match Self::try_write_model_data_batch(model.data(), observed_len, model.driver()) { + match self.try_write_model_data_batch( + ModelUniqueIDRef::new(model_id.space(), model_id.entity(), model.data().get_uuid()), + model.data(), + observed_len, + model.driver(), + ) { Ok(()) => { if observed_len != 0 { info!( @@ -428,6 +487,8 @@ impl FractalMgr { /// /// The zero check is essential fn try_write_model_data_batch( + &'static self, + mdl_id: ModelUniqueIDRef, model: &ModelData, observed_size: usize, mdl_driver_: &super::drivers::FractalModelDriver, @@ -456,6 +517,11 @@ impl FractalMgr { ) .map_err(|e| { mdl_driver_.status().set_iffy(); + self.hp_dispatcher + .send(Task::new(CriticalTask::TryModelAutorecoverLWT( + mdl_id.into(), + ))) + .unwrap(); (e, BatchStats::into_inner(batch_stats)) }) } diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 17f1fb13..154db00c 100644 --- a/server/src/engine/fractal/mod.rs +++ 
b/server/src/engine/fractal/mod.rs @@ -277,12 +277,36 @@ pub struct ModelUniqueID { uuid: Uuid, } +pub struct ModelUniqueIDRef<'a> { + space: &'a str, + model: &'a str, + uuid: Uuid, +} + +impl<'a> ModelUniqueIDRef<'a> { + pub fn new(space: &'a str, model: &'a str, uuid: Uuid) -> Self { + Self { space, model, uuid } + } +} + impl fmt::Display for ModelUniqueID { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "model-{}@{}", self.model(), self.space()) } } +impl<'a> From<ModelUniqueIDRef<'a>> for ModelUniqueID { + fn from(uid: ModelUniqueIDRef<'a>) -> Self { + Self::new(uid.space, uid.model, uid.uuid) + } +} + +impl<'a> From<&'a ModelUniqueID> for ModelUniqueIDRef<'a> { + fn from(uid: &'a ModelUniqueID) -> Self { + Self::new(uid.space(), uid.model(), uid.uuid()) + } +} + impl ModelUniqueID { /// Create a new unique model ID pub fn new(space: &str, model: &str, uuid: Uuid) -> Self { diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 6445599a..284f7a97 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -127,6 +127,8 @@ impl GlobalInstanceLike for TestGlobal { .commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new()) .unwrap() } + CriticalTask::TryModelAutorecoverLWT(_) => {} + CriticalTask::CheckGNSDriver => {} } } fn taskmgr_post_standard_priority(&self, task: Task) { diff --git a/server/src/engine/fractal/util.rs b/server/src/engine/fractal/util.rs index 323e79d5..fdb47b32 100644 --- a/server/src/engine/fractal/util.rs +++ b/server/src/engine/fractal/util.rs @@ -24,8 +24,6 @@ * */ -#![allow(unused)] - use std::sync::atomic::{AtomicBool, Ordering}; #[derive(Debug)] @@ -37,9 +35,6 @@ impl Status { pub const fn new_okay() -> Self { Self::new(true) } - pub const fn new_iffy() -> Self { - Self::new(false) - } const fn new(v: bool) -> Self { Self { okay: AtomicBool::new(v), diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs 
b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index 34f92fba..11e4410f 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -332,6 +332,20 @@ impl< fn available_capacity(&self) -> usize { self.buf.remaining_capacity() } + pub fn verify_cursor(&mut self) -> IoResult<()> { + let cursor = self.f_d.f_cursor()?; + if self.cursor() == cursor { + Ok(()) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "file cursor is out of sync. unreliable file system", + )) + } + } + pub fn __zero_buffer(&mut self) { + self.buf.clear() + } } impl< diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index e06127d4..da834b91 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -563,6 +563,19 @@ impl RawJournalWriter { { self.commit_with_ctx(event, Default::default()) } + /// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER + pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> { + // verify that the on disk cursor is the same as what we know + self.log_file.verify_cursor()?; + if self.log_file.cursor() == self.known_txn_offset { + // great, so if there was something in the buffer, simply ignore it + self.log_file.__zero_buffer(); + Ok(()) + } else { + // so, the on-disk file probably has some partial state. this is bad. throw an error + Err(StorageError::RawJournalRuntimeCriticalLwtHBFail.into()) + } + } } impl RawJournalWriter {