Enable conditional auto-recovery of drivers
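
Queries that reach a model through the lookup path in core/mod.rs now run only while `model.driver().status().is_healthy()`, and fail fast with `SysServerError` otherwise. A GNS or batch commit that fails marks its driver as iffy, runs the caller's failure hook, and queues a high-priority `CheckGNSDriver` or `TryModelAutorecoverLWT(..)` task. The fractal manager handles these tasks by calling `__lwt_heartbeat()` on the journal writer: if the on-disk cursor still matches the last committed offset, the unflushed buffer is dropped and the driver is marked okay again; otherwise the task is re-queued and recovery is retried.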

next
Sayan Nandan 7 months ago
parent 614e71cbef
commit 1ed4f41565

@ -38,6 +38,7 @@ mod util;
// test
#[cfg(test)]
pub(super) mod tests;
// re-exports
pub use self::util::{EntityID, EntityIDRef};
@ -47,12 +48,15 @@ use {
dml::QueryExecMeta,
model::{Model, ModelData},
},
crate::engine::{
crate::{
engine::{
core::space::Space,
error::{QueryError, QueryResult},
fractal::{FractalGNSDriver, GlobalInstanceLike},
idx::IndexST,
},
util::compiler,
},
parking_lot::RwLock,
std::collections::HashMap,
};
@ -179,6 +183,7 @@ where
let Some(model) = mdl_idx.get(&entity) else {
return Err(QueryError::QExecObjectNotFound);
};
if compiler::likely(model.driver().status().is_healthy()) {
let r = f(model.data())?;
model::DeltaState::guard_delta_overflow(
global,
@ -188,4 +193,7 @@ where
r,
);
Ok(())
} else {
compiler::cold_call(|| Err(QueryError::SysServerError))
}
}
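
The healthy branch is wrapped in `compiler::likely` and the failure branch in `compiler::cold_call`, which keeps the unhealthy-driver path out of the hot path. As a rough sketch, such helpers are commonly built on `#[cold]` functions; the names and bodies below are illustrative assumptions, not Skytable's actual `util::compiler` implementation:

#[cold]
#[inline(never)]
fn cold_call<T>(f: impl FnOnce() -> T) -> T {
    // #[cold] tells the optimizer this callee is rarely reached, so codegen
    // moves the error branch away from the hot path
    f()
}

#[inline(always)]
fn likely(b: bool) -> bool {
    // on stable Rust this is often a plain pass-through; the effective hint
    // comes from routing the unlikely branch through a #[cold] callee
    b
}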

@ -274,10 +274,11 @@ impl ModelData {
&new_fields,
);
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
let mut mutator = model.model_mutator();
new_fields
.stseq_ord_kv()
@ -293,10 +294,11 @@ impl ModelData {
&removed,
);
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
let mut mutator = model.model_mutator();
removed.iter().for_each(|field_id| {
mutator.remove_field(field_id.as_str());
@ -309,10 +311,11 @@ impl ModelData {
&updated,
);
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
let mut mutator = model.model_mutator();
updated.into_iter().for_each(|(field_id, field)| {
mutator.update_field(field_id.as_ref(), field);

@ -320,6 +320,7 @@ impl ModelData {
)?;
// commit txn
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {
global.taskmgr_post_standard_priority(Task::new(
@ -385,10 +386,11 @@ impl ModelData {
.value_u64(),
));
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
// request cleanup
global.purge_model_driver(
space_name,

@ -169,6 +169,7 @@ impl Space {
global.initialize_space(&space_name, space.get_uuid())?;
// commit txn
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(
@ -216,10 +217,11 @@ impl Space {
);
// commit
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
// merge
dict::rmerge_data_with_patch(space.props_mut(), patch);
// the `env` key may have been popped, so put it back (setting `env: null` removes the env key and we don't want to waste time enforcing this in the
@ -255,10 +257,11 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),
@ -301,10 +304,11 @@ impl Space {
// prepare txn
let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(txn), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(txn),
|| {},
)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(
&space_name,

@ -132,6 +132,7 @@ impl SystemDatabase {
}
let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)),
|| {},
)?;
@ -148,6 +149,7 @@ impl SystemDatabase {
Some(user) => {
let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)),
|| {},
)?;
@ -162,10 +164,11 @@ impl SystemDatabase {
if !users.contains_key(username) {
return Err(QueryError::SysAuthError);
}
global
.state()
.gns_driver()
.driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?;
global.state().gns_driver().driver_context(
global,
|drv| drv.commit_event(DropUserTxn::new(username)),
|| {},
)?;
let _ = users.remove(username);
Ok(())
}

@ -220,5 +220,6 @@ enumerate_err! {
RawJournalEventCorrupted = "journal-invalid-event",
RawJournalCorrupted = "journal-corrupted",
RawJournalInvalidEvent = "journal-invalid-event-order",
RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed",
}
}

@ -25,10 +25,11 @@
*/
use {
super::util,
super::{util, GlobalInstanceLike},
crate::{
engine::{
error::{QueryError, QueryResult, RuntimeResult},
fractal::{CriticalTask, Task},
storage::{GNSDriver, ModelDriver},
},
util::compiler,
@ -50,8 +51,12 @@ impl FractalGNSDriver {
txn_driver: Mutex::new(txn_driver),
}
}
pub(super) fn status(&self) -> &util::Status {
&self.status
}
pub fn driver_context<T>(
&self,
g: &impl GlobalInstanceLike,
f: impl Fn(&mut GNSDriver) -> RuntimeResult<T>,
on_failure: impl Fn(),
) -> QueryResult<T> {
@ -65,6 +70,7 @@ impl FractalGNSDriver {
error!("GNS driver failed with: {e}");
self.status.set_iffy();
on_failure();
g.taskmgr_post_high_priority(Task::new(CriticalTask::CheckGNSDriver));
Err(QueryError::SysServerError)
}),
}
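
With the extra `global` argument, every `driver_context` call site has the same shape: the commit action plus a failure hook. Most hooks are no-ops, but DDL paths that have already created on-disk state queue a cleanup task. The snippet below mirrors the space-creation hunk above, assuming `txn`, `space_name` and `space` are in scope as they are there:

global.state().gns_driver().driver_context(
    global,
    |drv| drv.commit_event(txn),
    || {
        // roll back the directory created before the commit failed
        global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir(
            &space_name,
            space.get_uuid(),
        )));
    },
)?;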

@ -25,7 +25,7 @@
*/
use {
super::ModelUniqueID,
super::{ModelUniqueID, ModelUniqueIDRef},
crate::{
engine::{
core::{
@ -108,6 +108,9 @@ impl GenericTask {
pub enum CriticalTask {
/// Write a new data batch
WriteBatch(ModelUniqueID, usize),
/// try recovering the model with the given ID (LWT heartbeat on its batch driver)
TryModelAutorecoverLWT(ModelUniqueID),
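/// check the GNS driver and try to auto-recover it (LWT heartbeat on the GNS journal)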
CheckGNSDriver,
}
/// The task manager
@ -288,6 +291,55 @@ impl FractalMgr {
) {
// TODO(@ohsayan): check threshold and update hooks
match task {
CriticalTask::CheckGNSDriver => {
info!("trying to autorecover GNS driver");
match global
.state()
.gns_driver()
.txn_driver
.lock()
.__lwt_heartbeat()
{
Ok(()) => {
info!("GNS driver has been successfully auto-recovered");
global.state().gns_driver().status().set_okay();
}
Err(e) => {
error!("failed to autorecover GNS driver with error `{e}`. will try again");
self.hp_dispatcher
.send(Task::new(CriticalTask::CheckGNSDriver))
.unwrap();
}
}
}
CriticalTask::TryModelAutorecoverLWT(mdl_id) => {
info!("trying to autorecover model {mdl_id}");
match global
.state()
.namespace()
.idx_models()
.read()
.get(&EntityIDRef::new(mdl_id.space(), mdl_id.model()))
{
Some(mdl) if mdl.data().get_uuid() == mdl_id.uuid() => {
let mut drv = mdl.driver().batch_driver().lock();
let drv = drv.as_mut().unwrap();
match drv.__lwt_heartbeat() {
Ok(()) => {
mdl.driver().status().set_okay();
info!("model driver for {mdl_id} has been successfully auto-recovered");
}
Err(e) => {
error!("failed to autorecover {mdl_id} with {e}. will try again");
self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(mdl_id)))
.unwrap()
}
}
}
Some(_) | None => {}
}
}
CriticalTask::WriteBatch(model_id, observed_size) => {
info!("fhp: {model_id} has reached cache capacity. writing to disk");
let mdl_read = global.state().namespace().idx_models().read();
@ -295,16 +347,18 @@ impl FractalMgr {
model_id.space().into(),
model_id.model().into(),
)) {
Some(mdl) if mdl.data().get_uuid() != model_id.uuid() => {
// this is a different model with the same entity path
Some(mdl) if mdl.data().get_uuid() == model_id.uuid() => mdl,
Some(_) | None => {
// this is a different model with the same entity path or it was deleted but the task was queued
return;
}
Some(mdl) => mdl,
None => {
panic!("found deleted model")
}
};
match Self::try_write_model_data_batch(mdl.data(), observed_size, mdl.driver()) {
match self.try_write_model_data_batch(
ModelUniqueIDRef::from(&model_id),
mdl.data(),
observed_size,
mdl.driver(),
) {
Ok(()) => {
if observed_size != 0 {
info!("fhp: completed maintenance task for {model_id}, synced={observed_size}")
@ -390,7 +444,12 @@ impl FractalMgr {
.data()
.delta_state()
.__fractal_take_full_from_data_delta(super::FractalToken::new());
match Self::try_write_model_data_batch(model.data(), observed_len, model.driver()) {
match self.try_write_model_data_batch(
ModelUniqueIDRef::new(model_id.space(), model_id.entity(), model.data().get_uuid()),
model.data(),
observed_len,
model.driver(),
) {
Ok(()) => {
if observed_len != 0 {
info!(
@ -428,6 +487,8 @@ impl FractalMgr {
///
/// The zero check is essential
fn try_write_model_data_batch(
&'static self,
mdl_id: ModelUniqueIDRef,
model: &ModelData,
observed_size: usize,
mdl_driver_: &super::drivers::FractalModelDriver,
@ -456,6 +517,11 @@ impl FractalMgr {
)
.map_err(|e| {
mdl_driver_.status().set_iffy();
self.hp_dispatcher
.send(Task::new(CriticalTask::TryModelAutorecoverLWT(
mdl_id.into(),
)))
.unwrap();
(e, BatchStats::into_inner(batch_stats))
})
}

@ -277,12 +277,36 @@ pub struct ModelUniqueID {
uuid: Uuid,
}
pub struct ModelUniqueIDRef<'a> {
space: &'a str,
model: &'a str,
uuid: Uuid,
}
impl<'a> ModelUniqueIDRef<'a> {
pub fn new(space: &'a str, model: &'a str, uuid: Uuid) -> Self {
Self { space, model, uuid }
}
}
impl fmt::Display for ModelUniqueID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "model-{}@{}", self.model(), self.space())
}
}
impl<'a> From<ModelUniqueIDRef<'a>> for ModelUniqueID {
fn from(uid: ModelUniqueIDRef<'a>) -> Self {
Self::new(uid.space, uid.model, uid.uuid)
}
}
impl<'a> From<&'a ModelUniqueID> for ModelUniqueIDRef<'a> {
fn from(uid: &'a ModelUniqueID) -> Self {
Self::new(uid.space(), uid.model(), uid.uuid())
}
}
impl ModelUniqueID {
/// Create a new unique model ID
pub fn new(space: &str, model: &str, uuid: Uuid) -> Self {
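
`ModelUniqueIDRef` is the borrowed counterpart used on the batch-write path, so no strings are cloned unless a retry task actually has to be queued. Illustrative usage, assuming some existing `id: &ModelUniqueID`:

let borrowed = ModelUniqueIDRef::from(id); // cheap view, no allocation
let owned: ModelUniqueID = borrowed.into(); // materialize only when queuing a retry task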

@ -127,6 +127,8 @@ impl GlobalInstanceLike for TestGlobal {
.commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new())
.unwrap()
}
CriticalTask::TryModelAutorecoverLWT(_) => {}
CriticalTask::CheckGNSDriver => {}
}
}
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>) {

@ -24,8 +24,6 @@
*
*/
#![allow(unused)]
use std::sync::atomic::{AtomicBool, Ordering};
#[derive(Debug)]
@ -37,9 +35,6 @@ impl Status {
pub const fn new_okay() -> Self {
Self::new(true)
}
pub const fn new_iffy() -> Self {
Self::new(false)
}
const fn new(v: bool) -> Self {
Self {
okay: AtomicBool::new(v),
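
For reference, the health flag behind `status()` is a small atomic wrapper. The sketch below is reconstructed from the util.rs hunk above and from how the flag is used elsewhere in this commit; the memory orderings are illustrative:

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Debug)]
pub struct Status {
    okay: AtomicBool,
}

impl Status {
    pub const fn new_okay() -> Self {
        Self { okay: AtomicBool::new(true) }
    }
    pub fn is_healthy(&self) -> bool {
        self.okay.load(Ordering::Acquire)
    }
    pub fn set_iffy(&self) {
        self.okay.store(false, Ordering::Release)
    }
    pub fn set_okay(&self) {
        self.okay.store(true, Ordering::Release)
    }
}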

@ -332,6 +332,20 @@ impl<
fn available_capacity(&self) -> usize {
self.buf.remaining_capacity()
}
pub fn verify_cursor(&mut self) -> IoResult<()> {
let cursor = self.f_d.f_cursor()?;
if self.cursor() == cursor {
Ok(())
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"file cursor is out of sync. unreliable file system",
))
}
}
pub fn __zero_buffer(&mut self) {
self.buf.clear()
}
}
impl<

@ -563,6 +563,19 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
{
self.commit_with_ctx(event, Default::default())
}
/// WARNING: ONLY CALL AFTER A FAILURE EVENT. THIS WILL EMPTY THE UNFLUSHED BUFFER
pub fn __lwt_heartbeat(&mut self) -> RuntimeResult<()> {
// verify that the on disk cursor is the same as what we know
self.log_file.verify_cursor()?;
if self.log_file.cursor() == self.known_txn_offset {
// great, so if there was something in the buffer, simply ignore it
self.log_file.__zero_buffer();
Ok(())
} else {
// so, the on-disk file probably has some partial state. this is bad. throw an error
Err(StorageError::RawJournalRuntimeCriticalLwtHBFail.into())
}
}
}
impl<J: RawJournalAdapter> RawJournalWriter<J> {
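
The heartbeat is intentionally conservative: `verify_cursor` first confirms that the OS-level file cursor matches the writer's tracked cursor, and recovery only proceeds if that cursor still equals `known_txn_offset`, i.e. the last offset known to hold a fully committed event. In that case the failed event never reached the disk, so dropping the unflushed buffer restores a consistent journal; any other state means a partial write may already be on disk, and the call fails with `RawJournalRuntimeCriticalLwtHBFail` rather than guessing.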
