diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index d53c297f..483e215e 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -273,7 +273,10 @@ impl Model { &new_fields, ); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } let mut mutator = model.model_mutator(); new_fields @@ -291,7 +294,10 @@ impl Model { &removed, ); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } let mut mutator = model.model_mutator(); removed.iter().for_each(|field_id| { @@ -306,7 +312,10 @@ impl Model { &updated, ); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } let mut mutator = model.model_mutator(); updated.into_iter().for_each(|(field_id, field)| { diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index c2ed1477..93d601c9 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -294,10 +294,9 @@ impl Model { model.get_uuid(), )?; // commit txn - match txn_driver.driver().commit_event(txn) { - Ok(()) => {} - Err(e) => { - // failed to commit, request cleanup + txn_driver.driver_context( + |drv| drv.commit_event(txn), + || { global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_model_dir( &space_name, @@ -305,10 +304,9 @@ impl Model { &model_name, model.get_uuid(), ), - )); - return Err(e.into()); - } - } + )) + }, + )?; } // update global state let _ = space.models_mut().insert(model_name.into()); @@ -358,7 +356,10 @@ impl Model { model.delta_state().schema_current_version().value_u64(), )); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.purge_model_driver( space_name, diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index c61f1e7f..e70fd2cc 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -176,16 +176,14 @@ impl Space { space.get_uuid(), ))?; // commit txn - match global.gns_driver().lock().driver().commit_event(txn) { - Ok(()) => {} - Err(e) => { - // tell fractal to clean it up sometime + global.gns_driver().lock().driver_context( + |drv| drv.commit_event(txn), + || { global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), - )); - return Err(e.into()); - } - } + )) + }, + )?; } // update global state let _ = spaces.st_insert(space_name, space); @@ -223,7 +221,11 @@ impl Space { &patch, ); // commit - global.gns_driver().lock().driver().commit_event(txn)?; + // commit txn + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; } // merge dict::rmerge_data_with_patch(space.props_mut(), patch); @@ -258,7 +260,10 @@ impl Space { let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), @@ -305,7 +310,10 @@ impl Space { let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn - global.gns_driver().lock().driver().commit_event(txn)?; + global + .gns_driver() + .lock() + .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.taskmgr_post_standard_priority(Task::new( GenericTask::delete_space_dir(&space_name, space.get_uuid()), diff --git a/server/src/engine/core/system_db.rs b/server/src/engine/core/system_db.rs index 09e36017..a96debe8 100644 --- a/server/src/engine/core/system_db.rs +++ b/server/src/engine/core/system_db.rs @@ -131,21 +131,12 @@ impl SystemDatabase { return Err(QueryError::SysAuthError); } let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); - match global - .gns_driver() - .lock() - .driver() - .commit_event(CreateUserTxn::new(&username, &password_hash)) - { - Ok(()) => { - users.insert(username, User::new(password_hash.into_boxed_slice())); - Ok(()) - } - Err(e) => { - error!("failed to create user: {e}"); - return Err(QueryError::SysTransactionalError); - } - } + global.gns_driver().lock().driver_context( + |drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)), + || {}, + )?; + users.insert(username, User::new(password_hash.into_boxed_slice())); + Ok(()) } pub fn alter_user( &self, @@ -156,21 +147,12 @@ impl SystemDatabase { match self.users.write().get_mut(username) { Some(user) => { let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); - match global - .gns_driver() - .lock() - .driver() - .commit_event(AlterUserTxn::new(username, &password_hash)) - { - Ok(()) => { - user.phash = password_hash.into_boxed_slice(); - Ok(()) - } - Err(e) => { - error!("failed to alter user: {e}"); - Err(QueryError::SysTransactionalError) - } - } + global.gns_driver().lock().driver_context( + |drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)), + || {}, + )?; + user.phash = password_hash.into_boxed_slice(); + Ok(()) } None => Err(QueryError::SysAuthError), } @@ -180,20 +162,11 @@ impl SystemDatabase { if !users.contains_key(username) { return Err(QueryError::SysAuthError); } - match global + global .gns_driver() .lock() - .driver() - .commit_event(DropUserTxn::new(username)) - { - Ok(()) => { - let _ = users.remove(username); - Ok(()) - } - Err(e) => { - error!("failed to remove user: {e}"); - Err(QueryError::SysTransactionalError) - } - } + .driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?; + let _ = users.remove(username); + Ok(()) } } diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index a73a0f28..8c8f3595 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -26,9 +26,12 @@ use { super::{util, ModelUniqueID}, - crate::engine::{ - error::RuntimeResult, - storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver}, + crate::{ + engine::{ + error::{QueryError, QueryResult, RuntimeResult}, + storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver}, + }, + util::compiler, }, parking_lot::{Mutex, RwLock}, std::{collections::HashMap, sync::Arc}, @@ -36,7 +39,6 @@ use { /// GNS driver pub struct FractalGNSDriver { - #[allow(unused)] status: util::Status, pub(super) txn_driver: GNSDriver, } @@ -48,8 +50,23 @@ impl FractalGNSDriver { txn_driver: txn_driver, } } - pub fn driver(&mut self) -> &mut GNSDriver { - &mut self.txn_driver + pub fn driver_context( + &mut self, + f: impl Fn(&mut GNSDriver) -> RuntimeResult, + on_failure: impl Fn(), + ) -> QueryResult { + if self.status.is_iffy() { + return Err(QueryError::SysServerError); + } + match f(&mut self.txn_driver) { + Ok(v) => Ok(v), + Err(e) => compiler::cold_call(|| { + error!("GNS driver failed with: {e}"); + self.status.set_iffy(); + on_failure(); + Err(QueryError::SysServerError) + }), + } } } @@ -86,18 +103,20 @@ impl ModelDrivers { /// Model driver pub struct FractalModelDriver { - #[allow(unused)] - hooks: Arc, + status: Arc, batch_driver: Mutex>, } impl FractalModelDriver { pub(in crate::engine::fractal) fn init(batch_driver: ModelDriver) -> Self { Self { - hooks: Arc::new(FractalModelHooks::new()), + status: Arc::new(util::Status::new_okay()), batch_driver: Mutex::new(batch_driver), } } + pub fn status(&self) -> &util::Status { + &self.status + } /// Returns a reference to the batch persist driver pub fn batch_driver(&self) -> &Mutex> { &self.batch_driver @@ -106,13 +125,3 @@ impl FractalModelDriver { ModelDriver::close_driver(&mut self.batch_driver.into_inner()) } } - -/// Model hooks -#[derive(Debug)] -pub struct FractalModelHooks; - -impl FractalModelHooks { - fn new() -> Self { - Self - } -} diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 01a16f95..9063a7a7 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -33,7 +33,11 @@ use { EntityIDRef, }, data::uuid::Uuid, - storage::safe_interfaces::{paths_v1, LocalFS, StdModelBatch}, + error::ErrorKind, + storage::{ + safe_interfaces::{paths_v1, LocalFS, StdModelBatch}, + BatchStats, + }, }, util::os, }, @@ -291,32 +295,39 @@ impl FractalMgr { // was way behind in the queue return; }; - let res = global._namespace().with_model( - EntityIDRef::new(model_id.space().into(), model_id.model().into()), - |model| { - if model.get_uuid() != model_id.uuid() { - // once again, throughput maximization will lead to, in extremely rare cases, this - // branch returning. but it is okay - return Ok(()); - } - Self::try_write_model_data_batch(model, observed_size, mdl_driver) - }, - ); - match res { + let mdl_read = global._namespace().idx_models().read(); + let mdl = match mdl_read.get(&EntityIDRef::new( + model_id.space().into(), + model_id.model().into(), + )) { + Some(mdl) if mdl.get_uuid() != model_id.uuid() => { + // so the model driver was not removed, neither was the model *yet* but we happened to find the task + // just return + return; + } + Some(mdl) => mdl, + None => { + panic!("found deleted model") + } + }; + match Self::try_write_model_data_batch(mdl, observed_size, mdl_driver) { Ok(()) => { if observed_size != 0 { info!("fhp: completed maintenance task for {model_id}, synced={observed_size}") } } - Err(_) => { + Err((err, stats)) => { error!( - "fhp: error writing data batch for model {}. retrying ...", + "fhp: failed to sync data deltas for model {} with {err}. retrying ...", model_id.uuid() ); // enqueue again for retrying self.hp_dispatcher .send(Task::with_threshold( - CriticalTask::WriteBatch(model_id, observed_size), + CriticalTask::WriteBatch( + model_id, + observed_size - stats.get_actual(), + ), threshold - 1, )) .unwrap(); @@ -382,23 +393,25 @@ impl FractalMgr { fn general_executor(&'static self, global: super::Global) { let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read(); for (model_id, driver) in mdl_drivers.iter() { - let mut observed_len = 0; - let res = global._namespace().with_model( - EntityIDRef::new(model_id.space().into(), model_id.model().into()), - |model| { - if model.get_uuid() != model_id.uuid() { - // once again, throughput maximization will lead to, in extremely rare cases, this - // branch returning. but it is okay - return Ok(()); - } - // mark that we're taking these deltas - observed_len = model - .delta_state() - .__fractal_take_full_from_data_delta(super::FractalToken::new()); - Self::try_write_model_data_batch(model, observed_len, driver) - }, - ); - match res { + let mdl_read = global._namespace().idx_models().read(); + let mdl = match mdl_read.get(&EntityIDRef::new( + model_id.space().into(), + model_id.model().into(), + )) { + Some(mdl) if mdl.get_uuid() != model_id.uuid() => { + // so the model driver was not removed, neither was the model *yet* but we happened to find the task + // just return + return; + } + Some(mdl) => mdl, + None => { + panic!("found deleted model") + } + }; + let observed_len = mdl + .delta_state() + .__fractal_take_full_from_data_delta(super::FractalToken::new()); + match Self::try_write_model_data_batch(mdl, observed_len, driver) { Ok(()) => { if observed_len != 0 { info!( @@ -406,12 +419,13 @@ impl FractalMgr { ) } } - Err(_) => { + Err((e, stats)) => { + info!("flp: failed to sync data for {model_id} with {e}. promoting to higher priority"); // this failure is *not* good, so we want to promote this to a critical task self.hp_dispatcher .send(Task::new(CriticalTask::WriteBatch( model_id.clone(), - observed_len, + observed_len - stats.get_actual(), ))) .unwrap() } @@ -429,14 +443,31 @@ impl FractalMgr { model: &Model, observed_size: usize, mdl_driver: &super::drivers::FractalModelDriver, - ) -> crate::engine::error::QueryResult<()> { + ) -> Result<(), (super::error::Error, BatchStats)> { + if mdl_driver.status().is_iffy() { + // don't mess this up any further + return Err(( + super::error::Error::from(ErrorKind::Other( + "model driver is in dirty state".into(), + )), + BatchStats::into_inner(BatchStats::new()), + )); + } if observed_size == 0 { // no changes, all good return Ok(()); } // try flushing the batch + let batch_stats = BatchStats::new(); let mut batch_driver = mdl_driver.batch_driver().lock(); - batch_driver.commit_event(StdModelBatch::new(model, observed_size))?; - Ok(()) + batch_driver + .commit_with_ctx( + StdModelBatch::new(model, observed_size), + batch_stats.clone(), + ) + .map_err(|e| { + mdl_driver.status().set_iffy(); + (e, BatchStats::into_inner(batch_stats)) + }) } } diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index f78db5f9..2d9d89ae 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -36,7 +36,7 @@ use { fractal::drivers::FractalModelDriver, storage::{ safe_interfaces::{paths_v1, FSInterface, NullFS, StdModelBatch, VirtualFS}, - GNSDriver, ModelDriver, + BatchStats, GNSDriver, ModelDriver, }, RuntimeResult, }, @@ -152,7 +152,7 @@ impl GlobalInstanceLike for TestGlobal { .unwrap() .batch_driver() .lock() - .commit_event(StdModelBatch::new(mdl, count)) + .commit_with_ctx(StdModelBatch::new(mdl, count), BatchStats::new()) .unwrap(); } } diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs index b0ad9bce..4e96f357 100644 --- a/server/src/engine/storage/mod.rs +++ b/server/src/engine/storage/mod.rs @@ -57,7 +57,10 @@ pub mod safe_interfaces { loader impl */ -pub use v2::impls::{gns_log::GNSDriver, mdl_journal::ModelDriver}; +pub use v2::impls::{ + gns_log::GNSDriver, + mdl_journal::{BatchStats, ModelDriver}, +}; pub struct SELoaded { pub gns: GlobalNS, diff --git a/server/src/engine/storage/v2/impls/gns_log.rs b/server/src/engine/storage/v2/impls/gns_log.rs index 9df2cd81..d7adec67 100644 --- a/server/src/engine/storage/v2/impls/gns_log.rs +++ b/server/src/engine/storage/v2/impls/gns_log.rs @@ -107,7 +107,7 @@ impl JournalAdapterEvent> for T { fn md(&self) -> u64 { ::CODE.dscr_u64() } - fn write_buffered(self, b: &mut Vec) { + fn write_buffered(self, b: &mut Vec, _: ()) { T::encode_event(self, b) } } diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index d833dbf0..3f26fd79 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -60,7 +60,11 @@ use { }, crossbeam_epoch::{pin, Guard}, sky_macros::TaggedEnum, - std::collections::{hash_map::Entry as HMEntry, HashMap}, + std::{ + cell::RefCell, + collections::{hash_map::Entry as HMEntry, HashMap}, + rc::Rc, + }, }; pub type ModelDriver = BatchDriver; @@ -216,6 +220,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { Fs::File, as RawJournalAdapter>::Spec, >, + batch_stat: &mut BatchStats, ) -> RuntimeResult { /* go over each delta, check if inconsistent and apply if not. if any delta sync fails, we enqueue the delta again. @@ -237,6 +242,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> { Err(e) => { // errored, so push this back in; we have written and flushed all prior deltas me.model.delta_state().append_new_data_delta(delta, me.g); + batch_stat.set_actual(i); return Err(e); } } @@ -310,11 +316,13 @@ impl<'a> JournalAdapterEvent> for StdModelBatch<' Fs::File, as RawJournalAdapter>::Spec, >, + ctx: Rc>, ) -> RuntimeResult<()> { // [expected commit] writer.dtrack_write(&self.1.u64_bytes_le())?; let g = pin(); - let actual_commit = BatchWriter::::write_batch(self.0, &g, self.1, writer)?; + let actual_commit = + BatchWriter::::write_batch(self.0, &g, self.1, writer, &mut ctx.borrow_mut())?; if actual_commit != self.1 { // early exit writer.dtrack_write(&[EventType::EarlyExit.dscr()])?; @@ -341,6 +349,7 @@ impl<'a> JournalAdapterEvent> for FullModel<'a> { Fs::File, as RawJournalAdapter>::Spec, >, + _: Rc>, ) -> RuntimeResult<()> { let g = pin(); let mut row_writer: RowWriter<'_, Fs> = RowWriter { f }; @@ -410,6 +419,25 @@ impl DecodedBatchEvent { } } +pub struct BatchStats { + actual_commit: usize, +} + +impl BatchStats { + pub fn new() -> Rc> { + Rc::new(RefCell::new(Self { actual_commit: 0 })) + } + pub fn into_inner(me: Rc>) -> Self { + RefCell::into_inner(Rc::into_inner(me).unwrap()) + } + fn set_actual(&mut self, new: usize) { + self.actual_commit = new; + } + pub fn get_actual(&self) -> usize { + self.actual_commit + } +} + impl BatchAdapterSpec for ModelDataAdapter { type Spec = ModelDataBatchAofV1; type GlobalState = Model; @@ -417,6 +445,7 @@ impl BatchAdapterSpec for ModelDataAdapter { type EventType = EventType; type BatchMetadata = BatchMetadata; type BatchState = BatchRestoreState; + type CommitContext = Rc>; fn is_early_exit(event_type: &Self::EventType) -> bool { EventType::EarlyExit.eq(event_type) } diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 65c8bf6f..45859500 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -25,7 +25,7 @@ */ use { - self::impls::mdl_journal::FullModel, + self::impls::mdl_journal::{BatchStats, FullModel}, super::{ common::interface::{fs_imp::LocalFS, fs_traits::FSInterface}, v1, SELoaded, @@ -88,7 +88,7 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult { model_id.entity(), model, ))?; - model_driver.commit_event(FullModel::new(model))?; + model_driver.commit_with_ctx(FullModel::new(model), BatchStats::new())?; model_drivers.add_driver( ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()), model_driver, diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index 8f11a222..59307f88 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -92,6 +92,7 @@ impl RawJournalAdapter for EventLogAdapter { type GlobalState = ::GlobalState; type Context<'a> = () where Self: 'a; type EventMeta = ::EventMeta; + type CommitContext = (); fn initialize(_: &raw::JournalInitializer) -> Self { Self(PhantomData) } @@ -106,12 +107,13 @@ impl RawJournalAdapter for EventLogAdapter { &mut self, w: &mut TrackedWriter, ev: E, + ctx: (), ) -> RuntimeResult<()> where E: RawJournalAdapterEvent, { let mut pl = vec![]; - ev.write_buffered(&mut pl); + ev.write_buffered(&mut pl, ctx); let plen = (pl.len() as u64).to_le_bytes(); let mut checksum = SCrc64::new(); checksum.update(&plen); @@ -207,6 +209,8 @@ pub trait BatchAdapterSpec { type BatchMetadata; /// batch state type BatchState; + /// commit context + type CommitContext; /// return true if the given event tag indicates an early exit fn is_early_exit(event_type: &Self::EventType) -> bool; /// initialize the batch state @@ -245,6 +249,7 @@ impl RawJournalAdapter for BatchAdapter { type GlobalState = ::GlobalState; type Context<'a> = () where Self: 'a; type EventMeta = ::BatchType; + type CommitContext = ::CommitContext; fn initialize(_: &raw::JournalInitializer) -> Self { Self(PhantomData) } @@ -259,11 +264,12 @@ impl RawJournalAdapter for BatchAdapter { &mut self, w: &mut TrackedWriter, ev: E, + ctx: Self::CommitContext, ) -> RuntimeResult<()> where E: RawJournalAdapterEvent, { - ev.write_direct::(w)?; + ev.write_direct::(w, ctx)?; let checksum = w.reset_partial(); w.tracked_write(&checksum.to_le_bytes()) } diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index c28cf2db..5a853b55 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -215,10 +215,11 @@ pub trait RawJournalAdapterEvent: Sized { fn write_direct( self, _: &mut TrackedWriter::Spec>, + _: ::CommitContext, ) -> RuntimeResult<()> { unimplemented!() } - fn write_buffered(self, _: &mut Vec) { + fn write_buffered<'a>(self, _: &mut Vec, _: ::CommitContext) { unimplemented!() } } @@ -239,6 +240,8 @@ pub trait RawJournalAdapter: Sized { type Context<'a> where Self: 'a; + /// any external context to use during commit. can be used by events + type CommitContext; /// a type representing the event kind type EventMeta; /// initialize this adapter @@ -250,10 +253,11 @@ pub trait RawJournalAdapter: Sized { /// parse event metadata fn parse_event_meta(meta: u64) -> Option; /// commit event (direct preference) - fn commit_direct<'a, Fs: FSInterface, E>( + fn commit_direct( &mut self, _: &mut TrackedWriter, _: E, + _: Self::CommitContext, ) -> RuntimeResult<()> where E: RawJournalAdapterEvent, @@ -261,7 +265,7 @@ pub trait RawJournalAdapter: Sized { unimplemented!() } /// commit event (buffered) - fn commit_buffered<'a, E>(&mut self, _: &mut Vec, _: E) + fn commit_buffered(&mut self, _: &mut Vec, _: E, _: Self::CommitContext) where E: RawJournalAdapterEvent, { @@ -499,13 +503,10 @@ impl RawJournalWriter { } Ok(me) } - /// Commit a new event to the journal - /// - /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns, - /// unless otherwise configured. - pub fn commit_event<'a, E: RawJournalAdapterEvent>( + pub fn commit_with_ctx<'a, E: RawJournalAdapterEvent>( &mut self, event: E, + ctx: J::CommitContext, ) -> RuntimeResult<()> { self.txn_context(|me, txn_id| { let ev_md = event.md(); @@ -522,7 +523,7 @@ impl RawJournalWriter { buf.extend(&txn_id.to_le_bytes()); buf.extend(&ev_md.to_le_bytes()); jtrace_writer!(CommitServerEventWroteMetadata); - j.commit_buffered(&mut buf, event); + j.commit_buffered(&mut buf, event, ctx); log_file.tracked_write_through_buffer(&buf)?; } CommitPreference::Direct => { @@ -532,7 +533,7 @@ impl RawJournalWriter { log_file.tracked_write(&ev_md.to_le_bytes())?; jtrace_writer!(CommitServerEventWroteMetadata); // now hand over control to adapter impl - J::commit_direct::(j, log_file, event)?; + J::commit_direct::(j, log_file, event, ctx)?; } } jtrace_writer!(CommitServerEventAdapterCompleted); @@ -544,6 +545,16 @@ impl RawJournalWriter { Ok(()) }) } + /// Commit a new event to the journal + /// + /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns, + /// unless otherwise configured. + pub fn commit_event<'a, E: RawJournalAdapterEvent>(&mut self, event: E) -> RuntimeResult<()> + where + J::CommitContext: Default, + { + self.commit_with_ctx(event, Default::default()) + } } impl RawJournalWriter { diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests.rs index eb74464c..ff804d6a 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests.rs @@ -134,7 +134,7 @@ impl RawJournalAdapterEvent for T { fn md(&self) -> u64 { T::OPC as _ } - fn write_buffered(self, buf: &mut Vec) { + fn write_buffered(self, buf: &mut Vec, _: ()) { T::write_buffered(self, buf) } } @@ -150,6 +150,7 @@ impl RawJournalAdapter for SimpleDBJournal { type Spec = SystemDatabaseV1; type GlobalState = SimpleDB; type EventMeta = EventMeta; + type CommitContext = (); type Context<'a> = () where Self: 'a; fn initialize(_: &JournalInitializer) -> Self { Self @@ -171,8 +172,9 @@ impl RawJournalAdapter for SimpleDBJournal { &mut self, buf: &mut Vec, event: E, + ctx: (), ) { - event.write_buffered(buf) + event.write_buffered(buf, ctx) } fn decode_apply<'a, Fs: FSInterface>( gs: &Self::GlobalState, diff --git a/server/src/engine/storage/v2/raw/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs index dcd5a3be..8eaa2e66 100644 --- a/server/src/engine/storage/v2/raw/journal/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/tests.rs @@ -55,7 +55,10 @@ use { util::compiler::TaggedEnum, }, sky_macros::TaggedEnum, - std::cell::{Ref, RefCell, RefMut}, + std::{ + cell::{Ref, RefCell, RefMut}, + rc::Rc, + }, }; // event definitions @@ -99,7 +102,7 @@ impl RawJournalAdapterEvent> for fn md(&self) -> u64 { Self::EVCODE.dscr_u64() } - fn write_buffered(self, buf: &mut Vec) { + fn write_buffered(self, buf: &mut Vec, _: ()) { TE::encode(self, buf) } } @@ -296,6 +299,7 @@ impl<'a> RawJournalAdapterEvent> for BatchDBFlush<' Fs::File, as super::raw::RawJournalAdapter>::Spec, >, + ctx: Rc>, ) -> RuntimeResult<()> { // write: [expected commit][body][actual commit] // for this dummy impl, we're expecting to write the full dataset but we're going to actually write the part @@ -315,12 +319,18 @@ impl<'a> RawJournalAdapterEvent> for BatchDBFlush<' // early exit! f.dtrack_write(&[BatchEventType::EarlyExit.dscr()])?; } + ctx.borrow_mut().actual_write = actual.len(); // actual commit f.dtrack_write(&(actual.len() as u64).to_le_bytes())?; Ok(()) } } +#[derive(Debug, Default)] +pub struct BatchContext { + actual_write: usize, +} + pub struct BatchDBAdapter; impl BatchAdapterSpec for BatchDBAdapter { type Spec = ModelDataBatchAofV1; @@ -328,6 +338,7 @@ impl BatchAdapterSpec for BatchDBAdapter { type BatchType = BatchType; type EventType = BatchEventType; type BatchMetadata = (); + type CommitContext = Rc>; type BatchState = BatchState; fn initialize_batch_state(_: &Self::GlobalState) -> Self::BatchState { BatchState {