From f7c6ba1fb0124acd5f3a1077c0cf41708376e761 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 25 Feb 2024 21:53:05 +0530 Subject: [PATCH] Use driver contexts and improve model data batch recovery This implementation revises the way we handle errors for both data delta sync and that of direct driver events. For the GNS: we now check the instance state before doing anything at all. This is much better because we don't want to aggravate the on-disk situation which is probably already in a very bad layout. The same for the model driver: we check the instance's state before doing anything at all. We also utilized a driver state to check what was synced, following the changes that were made in an earlier commit (skytable/skytable@e28d94e) where we only load the delta that failed to sync back to the model's delta state. --- server/src/engine/core/model/alt.rs | 15 ++- server/src/engine/core/model/mod.rs | 19 ++-- server/src/engine/core/space.rs | 30 +++-- server/src/engine/core/system_db.rs | 59 +++------- server/src/engine/fractal/drivers.rs | 47 ++++---- server/src/engine/fractal/mgr.rs | 107 +++++++++++------- server/src/engine/fractal/test_utils.rs | 4 +- server/src/engine/storage/mod.rs | 5 +- server/src/engine/storage/v2/impls/gns_log.rs | 2 +- .../engine/storage/v2/impls/mdl_journal.rs | 33 +++++- server/src/engine/storage/v2/mod.rs | 4 +- .../src/engine/storage/v2/raw/journal/mod.rs | 10 +- .../engine/storage/v2/raw/journal/raw/mod.rs | 31 +++-- .../storage/v2/raw/journal/raw/tests.rs | 6 +- .../engine/storage/v2/raw/journal/tests.rs | 15 ++- 15 files changed, 240 insertions(+), 147 deletions(-) diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index d53c297f..483e215e 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -273,7 +273,10 @@ impl Model { &new_fields, ); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } let mut mutator = model.model_mutator(); new_fields @@ -291,7 +294,10 @@ impl Model { &removed, ); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } let mut mutator = model.model_mutator(); removed.iter().for_each(|field_id| { @@ -306,7 +312,10 @@ impl Model { &updated, ); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } let mut mutator = model.model_mutator(); updated.into_iter().for_each(|(field_id, field)| { diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index c2ed1477..93d601c9 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -294,10 +294,9 @@ impl Model { model.get_uuid(), )?; // commit txn - match txn_driver.driver().commit_event(txn) { - Ok(()) => {} - Err(e) => { - // failed to commit, request cleanup + txn_driver.driver_context( + |drv| drv.commit_event(txn), + || { global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_model_dir( &space_name, @@ -305,10 +304,9 @@ impl Model { &model_name, model.get_uuid(), ), - )); - return Err(e.into()); - } - } + )) + }, + )?; } // update global state let _ = space.models_mut().insert(model_name.into()); @@ -358,7 +356,10 @@ impl Model { model.delta_state().schema_current_version().value_u64(), )); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.purge_model_driver( space_name, diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index c61f1e7f..e70fd2cc 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -176,16 +176,14 @@ impl Space { space.get_uuid(), ))?; // commit txn - match global.gns_driver().lock().driver().commit_event(txn) { - Ok(()) => {} - Err(e) => { - // tell fractal to clean it up sometime + global.gns_driver().lock().driver_context( + |drv| drv.commit_event(txn), + || { global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), - )); - return Err(e.into()); - } - } + )) + }, + )?; } // update global state let _ = spaces.st_insert(space_name, space); @@ -223,7 +221,11 @@ impl Space { &patch, ); // commit - global.gns_driver().lock().driver().commit_event(txn)?; + // commit txn + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } // merge dict::rmerge_data_with_patch(space.props_mut(), patch); @@ -258,7 +260,10 @@ impl Space { let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), @@ -305,7 +310,10 @@ impl Space { let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), diff --git a/server/src/engine/core/system_db.rs b/server/src/engine/core/system_db.rs index 09e36017..a96debe8 100644 --- a/server/src/engine/core/system_db.rs +++ b/server/src/engine/core/system_db.rs @@ -131,21 +131,12 @@ impl SystemDatabase { return Err(QueryError::SysAuthError); } let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); - match global - .gns_driver() - .lock() - .driver() - .commit_event(CreateUserTxn::new(&username, &password_hash)) - { - Ok(()) => { - users.insert(username, User::new(password_hash.into_boxed_slice())); - Ok(()) - } - Err(e) => { - error!("failed to create user: {e}"); - return Err(QueryError::SysTransactionalError); - } - } + global.gns_driver().lock().driver_context( + |drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)), + || {}, + )?; + users.insert(username, User::new(password_hash.into_boxed_slice())); + Ok(()) } pub fn alter_user( &self, @@ -156,21 +147,12 @@ impl SystemDatabase { match self.users.write().get_mut(username) { Some(user) => { let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); - match global - .gns_driver() - .lock() - .driver() - .commit_event(AlterUserTxn::new(username, &password_hash)) - { - Ok(()) => { - user.phash = password_hash.into_boxed_slice(); - Ok(()) - } - Err(e) => { - error!("failed to alter user: {e}"); - Err(QueryError::SysTransactionalError) - } - } + global.gns_driver().lock().driver_context( + |drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)), + || {}, + )?; + user.phash = password_hash.into_boxed_slice(); + Ok(()) } None => Err(QueryError::SysAuthError), } @@ -180,20 +162,11 @@ impl SystemDatabase { if !users.contains_key(username) { return Err(QueryError::SysAuthError); } - match global + global .gns_driver() .lock() - .driver() - .commit_event(DropUserTxn::new(username)) - { - Ok(()) => { - let _ = users.remove(username); - Ok(()) - } - Err(e) => { - error!("failed to remove user: {e}"); - Err(QueryError::SysTransactionalError) - } - } + .driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?; + let _ = users.remove(username); + Ok(()) } } diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index a73a0f28..8c8f3595 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -26,9 +26,12 @@ use { super::{util, ModelUniqueID}, - crate::engine::{ - error::RuntimeResult, - storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver}, + crate::{ + engine::{ + error::{QueryError, QueryResult, RuntimeResult}, + storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver}, + }, + util::compiler, }, parking_lot::{Mutex, RwLock}, std::{collections::HashMap, sync::Arc}, @@ -36,7 +39,6 @@ use { /// GNS driver pub struct FractalGNSDriver { - #[allow(unused)] status: util::Status, pub(super) txn_driver: GNSDriver, } @@ -48,8 +50,23 @@ impl FractalGNSDriver { txn_driver: txn_driver, } } - pub fn driver(&mut self) -> &mut GNSDriver { - &mut self.txn_driver + pub fn driver_context( + &mut self, + f: impl Fn(&mut GNSDriver) -> RuntimeResult, + on_failure: impl Fn(), + ) -> QueryResult { + if self.status.is_iffy() { + return Err(QueryError::SysServerError); + } + match f(&mut self.txn_driver) { + Ok(v) => Ok(v), + Err(e) => compiler::cold_call(|| { + error!("GNS driver failed with: {e}"); + self.status.set_iffy(); + on_failure(); + Err(QueryError::SysServerError) + }), + } } } @@ -86,18 +103,20 @@ impl ModelDrivers { /// Model driver pub struct FractalModelDriver { - #[allow(unused)] - hooks: Arc, + status: Arc, batch_driver: Mutex>, } impl FractalModelDriver { pub(in crate::engine::fractal) fn init(batch_driver: ModelDriver) -> Self { Self { - hooks: Arc::new(FractalModelHooks::new()), + status: Arc::new(util::Status::new_okay()), batch_driver: Mutex::new(batch_driver), } } + pub fn status(&self) -> &util::Status { + &self.status + } /// Returns a reference to the batch persist driver pub fn batch_driver(&self) -> &Mutex> { &self.batch_driver @@ -106,13 +125,3 @@ impl FractalModelDriver { ModelDriver::close_driver(&mut self.batch_driver.into_inner()) } } - -/// Model hooks -#[derive(Debug)] -pub struct FractalModelHooks; - -impl FractalModelHooks { - fn new() -> Self { - Self - } -} diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 01a16f95..9063a7a7 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -33,7 +33,11 @@ use { EntityIDRef, }, data::uuid::Uuid, - storage::safe_interfaces::{paths_v1, LocalFS, StdModelBatch}, + error::ErrorKind, + storage::{ + safe_interfaces::{paths_v1, LocalFS, StdModelBatch}, + BatchStats, + }, }, util::os, }, @@ -291,32 +295,39 @@ impl FractalMgr { // was way behind in the queue return; }; - let res = global._namespace().with_model( - EntityIDRef::new(model_id.space().into(), model_id.model().into()), - |model| { - if model.get_uuid() != model_id.uuid() { - // once again, throughput maximization will lead to, in extremely rare cases, this - // branch returning. but it is okay - return Ok(()); - } - Self::try_write_model_data_batch(model, observed_size, mdl_driver) - }, - ); - match res { + let mdl_read = global._namespace().idx_models().read(); + let mdl = match mdl_read.get(&EntityIDRef::new( + model_id.space().into(), + model_id.model().into(), + )) { + Some(mdl) if mdl.get_uuid() != model_id.uuid() => { + // so the model driver was not removed, neither was the model *yet* but we happened to find the task + // just return + return; + } + Some(mdl) => mdl, + None => { + panic!("found deleted model") + } + }; + match Self::try_write_model_data_batch(mdl, observed_size, mdl_driver) { Ok(()) => { if observed_size != 0 { info!("fhp: completed maintenance task for {model_id}, synced={observed_size}") } } - Err(_) => { + Err((err, stats)) => { error!( - "fhp: error writing data batch for model {}. retrying ...", + "fhp: failed to sync data deltas for model {} with {err}. retrying ...", model_id.uuid() ); // enqueue again for retrying self.hp_dispatcher .send(Task::with_threshold( - CriticalTask::WriteBatch(model_id, observed_size), + CriticalTask::WriteBatch( + model_id, + observed_size - stats.get_actual(), + ), threshold - 1, )) .unwrap(); @@ -382,23 +393,25 @@ impl FractalMgr { fn general_executor(&'static self, global: super::Global) { let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read(); for (model_id, driver) in mdl_drivers.iter() { - let mut observed_len = 0; - let res = global._namespace().with_model( - EntityIDRef::new(model_id.space().into(), model_id.model().into()), - |model| { - if model.get_uuid() != model_id.uuid() { - // once again, throughput maximization will lead to, in extremely rare cases, this - // branch returning. but it is okay - return Ok(()); - } - // mark that we're taking these deltas - observed_len = model - .delta_state() - .__fractal_take_full_from_data_delta(super::FractalToken::new()); - Self::try_write_model_data_batch(model, observed_len, driver) - }, - ); - match res { + let mdl_read = global._namespace().idx_models().read(); + let mdl = match mdl_read.get(&EntityIDRef::new( + model_id.space().into(), + model_id.model().into(), + )) { + Some(mdl) if mdl.get_uuid() != model_id.uuid() => { + // so the model driver was not removed, neither was the model *yet* but we happened to find the task + // just return + return; + } + Some(mdl) => mdl, + None => { + panic!("found deleted model") + } + }; + let observed_len = mdl + .delta_state() + .__fractal_take_full_from_data_delta(super::FractalToken::new()); + match Self::try_write_model_data_batch(mdl, observed_len, driver) { Ok(()) => { if observed_len != 0 { info!( @@ -406,12 +419,13 @@ impl FractalMgr { ) } } - Err(_) => { + Err((e, stats)) => { + info!("flp: failed to sync data for {model_id} with {e}. promoting to higher priority"); // this failure is *not* good, so we want to promote this to a critical task self.hp_dispatcher .send(Task::new(CriticalTask::WriteBatch( model_id.clone(), - observed_len, + observed_len - stats.get_actual(), ))) .unwrap() } @@ -429,14 +443,31 @@ impl FractalMgr { model: &Model, observed_size: usize, mdl_driver: &super::drivers::FractalModelDriver, - ) -> crate::engine::error::QueryResult<()> { + ) -> Result<(), (super::error::Error, BatchStats)> { + if mdl_driver.status().is_iffy() { + // don't mess this up any further + return Err(( + super::error::Error::from(ErrorKind::Other( + "model driver is in dirty state".into(), + )), + BatchStats::into_inner(BatchStats::new()), + )); + } if observed_size == 0 { // no changes, all good return Ok(()); } // try flushing the batch + let batch_stats = BatchStats::new(); let mut batch_driver = mdl_driver.batch_driver().lock(); - batch_driver.commit_event(StdModelBatch::new(model, observed_size))?; - Ok(()) + batch_driver + .commit_with_ctx( + StdModelBatch::new(model, observed_size), + batch_stats.clone(), + ) + .map_err(|e| { + mdl_driver.status().set_iffy(); + (e, BatchStats::into_inner(batch_stats)) + }) } } diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index f78db5f9..2d9d89ae 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -36,7 +36,7 @@ use { fractal::drivers::FractalModelDriver, storage::{ safe_interfaces::{paths_v1, FSInterface, NullFS, StdModelBatch, VirtualFS}, - GNSDriver, ModelDriver, + BatchStats, GNSDriver, ModelDriver, }, RuntimeResult, }, @@ -152,7 +152,7 @@ impl GlobalInstanceLike for TestGlobal { .unwrap() .batch_driver() .lock() - .commit_event(StdModelBatch::new(mdl, count)) + .commit_with_ctx(StdModelBatch::new(mdl, count), BatchStats::new()) .unwrap(); } } diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs index b0ad9bce..4e96f357 100644 --- a/server/src/engine/storage/mod.rs +++ b/server/src/engine/storage/mod.rs @@ -57,7 +57,10 @@ pub mod safe_interfaces { loader impl */ -pub use v2::impls::{gns_log::GNSDriver, mdl_journal::ModelDriver}; +pub use v2::impls::{ + gns_log::GNSDriver, + mdl_journal::{BatchStats, ModelDriver}, +}; pub struct SELoaded { pub gns: GlobalNS, diff --git a/server/src/engine/storage/v2/impls/gns_log.rs b/server/src/engine/storage/v2/impls/gns_log.rs index 9df2cd81..d7adec67 100644 --- a/server/src/engine/storage/v2/impls/gns_log.rs +++ b/server/src/engine/storage/v2/impls/gns_log.rs @@ -107,7 +107,7 @@ impl JournalAdapterEvent> for T { fn md(&self) -> u64 { ::CODE.dscr_u64() } - fn write_buffered(self, b: &mut Vec) { + fn write_buffered(self, b: &mut Vec, _: ()) { T::encode_event(self, b) } } diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index d833dbf0..3f26fd79 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -60,7 +60,11 @@ use { }, crossbeam_epoch::{pin, Guard}, sky_macros::TaggedEnum, - std::collections::{hash_map::Entry as HMEntry, HashMap}, + std::{ + cell::RefCell, + collections::{hash_map::Entry as HMEntry, HashMap}, + rc::Rc, + }, }; pub type ModelDriver = BatchDriver; @@ -216,6 +220,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { Fs::File, as RawJournalAdapter>::Spec, >, + batch_stat: &mut BatchStats, ) -> RuntimeResult { /* go over each delta, check if inconsistent and apply if not. if any delta sync fails, we enqueue the delta again. @@ -237,6 +242,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { Err(e) => { // errored, so push this back in; we have written and flushed all prior deltas me.model.delta_state().append_new_data_delta(delta, me.g); + batch_stat.set_actual(i); return Err(e); } } @@ -310,11 +316,13 @@ impl<'a> JournalAdapterEvent> for StdModelBatch<' Fs::File, as RawJournalAdapter>::Spec, >, + ctx: Rc>, ) -> RuntimeResult<()> { // [expected commit] writer.dtrack_write(&self.1.u64_bytes_le())?; let g = pin(); - let actual_commit = BatchWriter::::write_batch(self.0, &g, self.1, writer)?; + let actual_commit = + BatchWriter::::write_batch(self.0, &g, self.1, writer, &mut ctx.borrow_mut())?; if actual_commit != self.1 { // early exit writer.dtrack_write(&[EventType::EarlyExit.dscr()])?; @@ -341,6 +349,7 @@ impl<'a> JournalAdapterEvent> for FullModel<'a> { Fs::File, as RawJournalAdapter>::Spec, >, + _: Rc>, ) -> RuntimeResult<()> { let g = pin(); let mut row_writer: RowWriter<'_, Fs> = RowWriter { f }; @@ -410,6 +419,25 @@ impl DecodedBatchEvent { } } +pub struct BatchStats { + actual_commit: usize, +} + +impl BatchStats { + pub fn new() -> Rc> { + Rc::new(RefCell::new(Self { actual_commit: 0 })) + } + pub fn into_inner(me: Rc>) -> Self { + RefCell::into_inner(Rc::into_inner(me).unwrap()) + } + fn set_actual(&mut self, new: usize) { + self.actual_commit = new; + } + pub fn get_actual(&self) -> usize { + self.actual_commit + } +} + impl BatchAdapterSpec for ModelDataAdapter { type Spec = ModelDataBatchAofV1; type GlobalState = Model; @@ -417,6 +445,7 @@ impl BatchAdapterSpec for ModelDataAdapter { type EventType = EventType; type BatchMetadata = BatchMetadata; type BatchState = BatchRestoreState; + type CommitContext = Rc>; fn is_early_exit(event_type: &Self::EventType) -> bool { EventType::EarlyExit.eq(event_type) } diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 65c8bf6f..45859500 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -25,7 +25,7 @@ */ use { - self::impls::mdl_journal::FullModel, + self::impls::mdl_journal::{BatchStats, FullModel}, super::{ common::interface::{fs_imp::LocalFS, fs_traits::FSInterface}, v1, SELoaded, @@ -88,7 +88,7 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult { model_id.entity(), model, ))?; - model_driver.commit_event(FullModel::new(model))?; + model_driver.commit_with_ctx(FullModel::new(model), BatchStats::new())?; model_drivers.add_driver( ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()), model_driver, diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index 8f11a222..59307f88 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -92,6 +92,7 @@ impl RawJournalAdapter for EventLogAdapter { type GlobalState = ::GlobalState; type Context<'a> = () where Self: 'a; type EventMeta = ::EventMeta; + type CommitContext = (); fn initialize(_: &raw::JournalInitializer) -> Self { Self(PhantomData) } @@ -106,12 +107,13 @@ impl RawJournalAdapter for EventLogAdapter { &mut self, w: &mut TrackedWriter, ev: E, + ctx: (), ) -> RuntimeResult<()> where E: RawJournalAdapterEvent, { let mut pl = vec![]; - ev.write_buffered(&mut pl); + ev.write_buffered(&mut pl, ctx); let plen = (pl.len() as u64).to_le_bytes(); let mut checksum = SCrc64::new(); checksum.update(&plen); @@ -207,6 +209,8 @@ pub trait BatchAdapterSpec { type BatchMetadata; /// batch state type BatchState; + /// commit context + type CommitContext; /// return true if the given event tag indicates an early exit fn is_early_exit(event_type: &Self::EventType) -> bool; /// initialize the batch state @@ -245,6 +249,7 @@ impl RawJournalAdapter for BatchAdapter { type GlobalState = ::GlobalState; type Context<'a> = () where Self: 'a; type EventMeta = ::BatchType; + type CommitContext = ::CommitContext; fn initialize(_: &raw::JournalInitializer) -> Self { Self(PhantomData) } @@ -259,11 +264,12 @@ impl RawJournalAdapter for BatchAdapter { &mut self, w: &mut TrackedWriter, ev: E, + ctx: Self::CommitContext, ) -> RuntimeResult<()> where E: RawJournalAdapterEvent, { - ev.write_direct::(w)?; + ev.write_direct::(w, ctx)?; let checksum = w.reset_partial(); w.tracked_write(&checksum.to_le_bytes()) } diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index c28cf2db..5a853b55 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -215,10 +215,11 @@ pub trait RawJournalAdapterEvent: Sized { fn write_direct( self, _: &mut TrackedWriter::Spec>, + _: ::CommitContext, ) -> RuntimeResult<()> { unimplemented!() } - fn write_buffered(self, _: &mut Vec) { + fn write_buffered<'a>(self, _: &mut Vec, _: ::CommitContext) { unimplemented!() } } @@ -239,6 +240,8 @@ pub trait RawJournalAdapter: Sized { type Context<'a> where Self: 'a; + /// any external context to use during commit. can be used by events + type CommitContext; /// a type representing the event kind type EventMeta; /// initialize this adapter @@ -250,10 +253,11 @@ pub trait RawJournalAdapter: Sized { /// parse event metadata fn parse_event_meta(meta: u64) -> Option; /// commit event (direct preference) - fn commit_direct<'a, Fs: FSInterface, E>( + fn commit_direct( &mut self, _: &mut TrackedWriter, _: E, + _: Self::CommitContext, ) -> RuntimeResult<()> where E: RawJournalAdapterEvent, @@ -261,7 +265,7 @@ pub trait RawJournalAdapter: Sized { unimplemented!() } /// commit event (buffered) - fn commit_buffered<'a, E>(&mut self, _: &mut Vec, _: E) + fn commit_buffered(&mut self, _: &mut Vec, _: E, _: Self::CommitContext) where E: RawJournalAdapterEvent, { @@ -499,13 +503,10 @@ impl RawJournalWriter { } Ok(me) } - /// Commit a new event to the journal - /// - /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns, - /// unless otherwise configured. - pub fn commit_event<'a, E: RawJournalAdapterEvent>( + pub fn commit_with_ctx<'a, E: RawJournalAdapterEvent>( &mut self, event: E, + ctx: J::CommitContext, ) -> RuntimeResult<()> { self.txn_context(|me, txn_id| { let ev_md = event.md(); @@ -522,7 +523,7 @@ impl RawJournalWriter { buf.extend(&txn_id.to_le_bytes()); buf.extend(&ev_md.to_le_bytes()); jtrace_writer!(CommitServerEventWroteMetadata); - j.commit_buffered(&mut buf, event); + j.commit_buffered(&mut buf, event, ctx); log_file.tracked_write_through_buffer(&buf)?; } CommitPreference::Direct => { @@ -532,7 +533,7 @@ impl RawJournalWriter { log_file.tracked_write(&ev_md.to_le_bytes())?; jtrace_writer!(CommitServerEventWroteMetadata); // now hand over control to adapter impl - J::commit_direct::(j, log_file, event)?; + J::commit_direct::(j, log_file, event, ctx)?; } } jtrace_writer!(CommitServerEventAdapterCompleted); @@ -544,6 +545,16 @@ impl RawJournalWriter { Ok(()) }) } + /// Commit a new event to the journal + /// + /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns, + /// unless otherwise configured. + pub fn commit_event<'a, E: RawJournalAdapterEvent>(&mut self, event: E) -> RuntimeResult<()> + where + J::CommitContext: Default, + { + self.commit_with_ctx(event, Default::default()) + } } impl RawJournalWriter { diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests.rs index eb74464c..ff804d6a 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests.rs @@ -134,7 +134,7 @@ impl RawJournalAdapterEvent for T { fn md(&self) -> u64 { T::OPC as _ } - fn write_buffered(self, buf: &mut Vec) { + fn write_buffered(self, buf: &mut Vec, _: ()) { T::write_buffered(self, buf) } } @@ -150,6 +150,7 @@ impl RawJournalAdapter for SimpleDBJournal { type Spec = SystemDatabaseV1; type GlobalState = SimpleDB; type EventMeta = EventMeta; + type CommitContext = (); type Context<'a> = () where Self: 'a; fn initialize(_: &JournalInitializer) -> Self { Self @@ -171,8 +172,9 @@ impl RawJournalAdapter for SimpleDBJournal { &mut self, buf: &mut Vec, event: E, + ctx: (), ) { - event.write_buffered(buf) + event.write_buffered(buf, ctx) } fn decode_apply<'a, Fs: FSInterface>( gs: &Self::GlobalState, diff --git a/server/src/engine/storage/v2/raw/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs index dcd5a3be..8eaa2e66 100644 --- a/server/src/engine/storage/v2/raw/journal/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/tests.rs @@ -55,7 +55,10 @@ use { util::compiler::TaggedEnum, }, sky_macros::TaggedEnum, - std::cell::{Ref, RefCell, RefMut}, + std::{ + cell::{Ref, RefCell, RefMut}, + rc::Rc, + }, }; // event definitions @@ -99,7 +102,7 @@ impl RawJournalAdapterEvent> for fn md(&self) -> u64 { Self::EVCODE.dscr_u64() } - fn write_buffered(self, buf: &mut Vec) { + fn write_buffered(self, buf: &mut Vec, _: ()) { TE::encode(self, buf) } } @@ -296,6 +299,7 @@ impl<'a> RawJournalAdapterEvent> for BatchDBFlush<' Fs::File, as super::raw::RawJournalAdapter>::Spec, >, + ctx: Rc>, ) -> RuntimeResult<()> { // write: [expected commit][body][actual commit] // for this dummy impl, we're expecting to write the full dataset but we're going to actually write the part @@ -315,12 +319,18 @@ impl<'a> RawJournalAdapterEvent> for BatchDBFlush<' // early exit! f.dtrack_write(&[BatchEventType::EarlyExit.dscr()])?; } + ctx.borrow_mut().actual_write = actual.len(); // actual commit f.dtrack_write(&(actual.len() as u64).to_le_bytes())?; Ok(()) } } +#[derive(Debug, Default)] +pub struct BatchContext { + actual_write: usize, +} + pub struct BatchDBAdapter; impl BatchAdapterSpec for BatchDBAdapter { type Spec = ModelDataBatchAofV1; @@ -328,6 +338,7 @@ impl BatchAdapterSpec for BatchDBAdapter { type BatchType = BatchType; type EventType = BatchEventType; type BatchMetadata = (); + type CommitContext = Rc>; type BatchState = BatchState; fn initialize_batch_state(_: &Self::GlobalState) -> Self::BatchState { BatchState {