Use driver contexts and improve model data batch recovery

This change revises the way we handle errors for both data delta sync
and direct driver events.

For the GNS: we now check the instance state before doing anything at
all, so that we don't aggravate an on-disk state that is most likely
already inconsistent.

The same goes for the model driver: we check the instance's state
before doing anything at all. We also use driver state to track what
was synced, following the changes made in an earlier commit
(skytable/skytable@e28d94e) where only the deltas that failed to sync
are loaded back into the model's delta state.
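
Below is a minimal standalone sketch of the driver-context pattern this
commit introduces (simplified, with hypothetical types; the real
implementation is FractalGNSDriver::driver_context in the diff below):
refuse to touch the driver once it is in a bad ("iffy") state, and on
failure mark it iffy and run a caller-supplied cleanup hook before
surfacing the error.

    struct Driver {
        iffy: bool,
    }

    impl Driver {
        fn driver_context<T>(
            &mut self,
            f: impl Fn(&mut Self) -> Result<T, String>,
            on_failure: impl Fn(),
        ) -> Result<T, String> {
            if self.iffy {
                // don't aggravate on-disk state that is likely already inconsistent
                return Err("driver is in a dirty state".into());
            }
            match f(&mut *self) {
                Ok(v) => Ok(v),
                Err(e) => {
                    self.iffy = true; // block all subsequent commits
                    on_failure(); // e.g. schedule a directory cleanup task
                    Err(e)
                }
            }
        }
    }
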
Sayan Nandan
parent e28d94efba
commit f7c6ba1fb0

@@ -273,7 +273,10 @@ impl Model {
                 &new_fields,
             );
             // commit txn
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
         }
         let mut mutator = model.model_mutator();
         new_fields
@@ -291,7 +294,10 @@ impl Model {
                 &removed,
             );
             // commit txn
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
         }
         let mut mutator = model.model_mutator();
         removed.iter().for_each(|field_id| {
@@ -306,7 +312,10 @@ impl Model {
                 &updated,
             );
             // commit txn
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
         }
         let mut mutator = model.model_mutator();
         updated.into_iter().for_each(|(field_id, field)| {

@@ -294,10 +294,9 @@ impl Model {
                     model.get_uuid(),
                 )?;
                 // commit txn
-                match txn_driver.driver().commit_event(txn) {
-                    Ok(()) => {}
-                    Err(e) => {
-                        // failed to commit, request cleanup
+                txn_driver.driver_context(
+                    |drv| drv.commit_event(txn),
+                    || {
                         global.taskmgr_post_standard_priority(Task::new(
                             GenericTask::delete_model_dir(
                                 &space_name,
@@ -305,10 +304,9 @@ impl Model {
                                 &model_name,
                                 model.get_uuid(),
                             ),
-                        ));
-                        return Err(e.into());
-                    }
-                }
+                        ))
+                    },
+                )?;
             }
             // update global state
             let _ = space.models_mut().insert(model_name.into());
@@ -358,7 +356,10 @@ impl Model {
                 model.delta_state().schema_current_version().value_u64(),
             ));
             // commit txn
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
             // request cleanup
             global.purge_model_driver(
                 space_name,

@@ -176,16 +176,14 @@ impl Space {
                 space.get_uuid(),
             ))?;
             // commit txn
-            match global.gns_driver().lock().driver().commit_event(txn) {
-                Ok(()) => {}
-                Err(e) => {
-                    // tell fractal to clean it up sometime
+            global.gns_driver().lock().driver_context(
+                |drv| drv.commit_event(txn),
+                || {
                     global.taskmgr_post_standard_priority(Task::new(
                         GenericTask::delete_space_dir(&space_name, space.get_uuid()),
-                    ));
-                    return Err(e.into());
-                }
-            }
+                    ))
+                },
+            )?;
         }
         // update global state
         let _ = spaces.st_insert(space_name, space);
@@ -223,7 +221,11 @@ impl Space {
                 &patch,
             );
             // commit
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            // commit txn
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
         }
         // merge
         dict::rmerge_data_with_patch(space.props_mut(), patch);
@@ -258,7 +260,10 @@ impl Space {
             let txn =
                 txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
             // commit txn
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
             // request cleanup
             global.taskmgr_post_standard_priority(Task::new(
                 GenericTask::delete_space_dir(&space_name, space.get_uuid()),
@@ -305,7 +310,10 @@ impl Space {
             let txn =
                 txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
             // commit txn
-            global.gns_driver().lock().driver().commit_event(txn)?;
+            global
+                .gns_driver()
+                .lock()
+                .driver_context(|drv| drv.commit_event(txn), || {})?;
             // request cleanup
             global.taskmgr_post_standard_priority(Task::new(
                 GenericTask::delete_space_dir(&space_name, space.get_uuid()),

@@ -131,21 +131,12 @@ impl SystemDatabase {
             return Err(QueryError::SysAuthError);
         }
         let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
-        match global
-            .gns_driver()
-            .lock()
-            .driver()
-            .commit_event(CreateUserTxn::new(&username, &password_hash))
-        {
-            Ok(()) => {
-                users.insert(username, User::new(password_hash.into_boxed_slice()));
-                Ok(())
-            }
-            Err(e) => {
-                error!("failed to create user: {e}");
-                return Err(QueryError::SysTransactionalError);
-            }
-        }
+        global.gns_driver().lock().driver_context(
+            |drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)),
+            || {},
+        )?;
+        users.insert(username, User::new(password_hash.into_boxed_slice()));
+        Ok(())
     }
     pub fn alter_user(
         &self,
@@ -156,21 +147,12 @@ impl SystemDatabase {
         match self.users.write().get_mut(username) {
             Some(user) => {
                 let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap();
-                match global
-                    .gns_driver()
-                    .lock()
-                    .driver()
-                    .commit_event(AlterUserTxn::new(username, &password_hash))
-                {
-                    Ok(()) => {
-                        user.phash = password_hash.into_boxed_slice();
-                        Ok(())
-                    }
-                    Err(e) => {
-                        error!("failed to alter user: {e}");
-                        Err(QueryError::SysTransactionalError)
-                    }
-                }
+                global.gns_driver().lock().driver_context(
+                    |drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)),
+                    || {},
+                )?;
+                user.phash = password_hash.into_boxed_slice();
+                Ok(())
             }
             None => Err(QueryError::SysAuthError),
         }
@@ -180,20 +162,11 @@ impl SystemDatabase {
         if !users.contains_key(username) {
             return Err(QueryError::SysAuthError);
         }
-        match global
+        global
             .gns_driver()
             .lock()
-            .driver()
-            .commit_event(DropUserTxn::new(username))
-        {
-            Ok(()) => {
-                let _ = users.remove(username);
-                Ok(())
-            }
-            Err(e) => {
-                error!("failed to remove user: {e}");
-                Err(QueryError::SysTransactionalError)
-            }
-        }
+            .driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?;
+        let _ = users.remove(username);
+        Ok(())
     }
 }

@@ -26,9 +26,12 @@
 use {
     super::{util, ModelUniqueID},
-    crate::engine::{
-        error::RuntimeResult,
-        storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver},
+    crate::{
+        engine::{
+            error::{QueryError, QueryResult, RuntimeResult},
+            storage::{safe_interfaces::FSInterface, GNSDriver, ModelDriver},
+        },
+        util::compiler,
     },
     parking_lot::{Mutex, RwLock},
     std::{collections::HashMap, sync::Arc},
@@ -36,7 +39,6 @@ use {
 /// GNS driver
 pub struct FractalGNSDriver<Fs: FSInterface> {
-    #[allow(unused)]
     status: util::Status,
     pub(super) txn_driver: GNSDriver<Fs>,
 }
@@ -48,8 +50,23 @@ impl<Fs: FSInterface> FractalGNSDriver<Fs> {
             txn_driver: txn_driver,
         }
     }
-    pub fn driver(&mut self) -> &mut GNSDriver<Fs> {
-        &mut self.txn_driver
+    pub fn driver_context<T>(
+        &mut self,
+        f: impl Fn(&mut GNSDriver<Fs>) -> RuntimeResult<T>,
+        on_failure: impl Fn(),
+    ) -> QueryResult<T> {
+        if self.status.is_iffy() {
+            return Err(QueryError::SysServerError);
+        }
+        match f(&mut self.txn_driver) {
+            Ok(v) => Ok(v),
+            Err(e) => compiler::cold_call(|| {
+                error!("GNS driver failed with: {e}");
+                self.status.set_iffy();
+                on_failure();
+                Err(QueryError::SysServerError)
+            }),
+        }
     }
 }
@@ -86,18 +103,20 @@ impl<Fs: FSInterface> ModelDrivers<Fs> {
 /// Model driver
 pub struct FractalModelDriver<Fs: FSInterface> {
-    #[allow(unused)]
-    hooks: Arc<FractalModelHooks>,
+    status: Arc<util::Status>,
     batch_driver: Mutex<ModelDriver<Fs>>,
 }
 impl<Fs: FSInterface> FractalModelDriver<Fs> {
     pub(in crate::engine::fractal) fn init(batch_driver: ModelDriver<Fs>) -> Self {
         Self {
-            hooks: Arc::new(FractalModelHooks::new()),
+            status: Arc::new(util::Status::new_okay()),
             batch_driver: Mutex::new(batch_driver),
         }
     }
+    pub fn status(&self) -> &util::Status {
+        &self.status
+    }
     /// Returns a reference to the batch persist driver
     pub fn batch_driver(&self) -> &Mutex<ModelDriver<Fs>> {
         &self.batch_driver
@@ -106,13 +125,3 @@ impl<Fs: FSInterface> FractalModelDriver<Fs> {
         ModelDriver::close_driver(&mut self.batch_driver.into_inner())
     }
 }
-/// Model hooks
-#[derive(Debug)]
-pub struct FractalModelHooks;
-impl FractalModelHooks {
-    fn new() -> Self {
-        Self
-    }
-}

@@ -33,7 +33,11 @@ use {
             EntityIDRef,
         },
         data::uuid::Uuid,
-        storage::safe_interfaces::{paths_v1, LocalFS, StdModelBatch},
+        error::ErrorKind,
+        storage::{
+            safe_interfaces::{paths_v1, LocalFS, StdModelBatch},
+            BatchStats,
+        },
     },
     util::os,
 },
@@ -291,32 +295,39 @@ impl FractalMgr {
             // was way behind in the queue
             return;
         };
-        let res = global._namespace().with_model(
-            EntityIDRef::new(model_id.space().into(), model_id.model().into()),
-            |model| {
-                if model.get_uuid() != model_id.uuid() {
-                    // once again, throughput maximization will lead to, in extremely rare cases, this
-                    // branch returning. but it is okay
-                    return Ok(());
-                }
-                Self::try_write_model_data_batch(model, observed_size, mdl_driver)
-            },
-        );
-        match res {
+        let mdl_read = global._namespace().idx_models().read();
+        let mdl = match mdl_read.get(&EntityIDRef::new(
+            model_id.space().into(),
+            model_id.model().into(),
+        )) {
+            Some(mdl) if mdl.get_uuid() != model_id.uuid() => {
+                // so the model driver was not removed, neither was the model *yet* but we happened to find the task
+                // just return
+                return;
+            }
+            Some(mdl) => mdl,
+            None => {
+                panic!("found deleted model")
+            }
+        };
+        match Self::try_write_model_data_batch(mdl, observed_size, mdl_driver) {
            Ok(()) => {
                if observed_size != 0 {
                    info!("fhp: completed maintenance task for {model_id}, synced={observed_size}")
                }
            }
-            Err(_) => {
+            Err((err, stats)) => {
                error!(
-                    "fhp: error writing data batch for model {}. retrying ...",
+                    "fhp: failed to sync data deltas for model {} with {err}. retrying ...",
                    model_id.uuid()
                );
                // enqueue again for retrying
                self.hp_dispatcher
                    .send(Task::with_threshold(
-                        CriticalTask::WriteBatch(model_id, observed_size),
+                        CriticalTask::WriteBatch(
+                            model_id,
+                            observed_size - stats.get_actual(),
+                        ),
                        threshold - 1,
                    ))
                    .unwrap();
@@ -382,23 +393,25 @@ impl FractalMgr {
     fn general_executor(&'static self, global: super::Global) {
         let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read();
         for (model_id, driver) in mdl_drivers.iter() {
-            let mut observed_len = 0;
-            let res = global._namespace().with_model(
-                EntityIDRef::new(model_id.space().into(), model_id.model().into()),
-                |model| {
-                    if model.get_uuid() != model_id.uuid() {
-                        // once again, throughput maximization will lead to, in extremely rare cases, this
-                        // branch returning. but it is okay
-                        return Ok(());
-                    }
-                    // mark that we're taking these deltas
-                    observed_len = model
-                        .delta_state()
-                        .__fractal_take_full_from_data_delta(super::FractalToken::new());
-                    Self::try_write_model_data_batch(model, observed_len, driver)
-                },
-            );
-            match res {
+            let mdl_read = global._namespace().idx_models().read();
+            let mdl = match mdl_read.get(&EntityIDRef::new(
+                model_id.space().into(),
+                model_id.model().into(),
+            )) {
+                Some(mdl) if mdl.get_uuid() != model_id.uuid() => {
+                    // so the model driver was not removed, neither was the model *yet* but we happened to find the task
+                    // just return
+                    return;
+                }
+                Some(mdl) => mdl,
+                None => {
+                    panic!("found deleted model")
+                }
+            };
+            let observed_len = mdl
+                .delta_state()
+                .__fractal_take_full_from_data_delta(super::FractalToken::new());
+            match Self::try_write_model_data_batch(mdl, observed_len, driver) {
                 Ok(()) => {
                     if observed_len != 0 {
                         info!(
@@ -406,12 +419,13 @@ impl FractalMgr {
                        )
                    }
                }
-                Err(_) => {
+                Err((e, stats)) => {
+                    info!("flp: failed to sync data for {model_id} with {e}. promoting to higher priority");
                    // this failure is *not* good, so we want to promote this to a critical task
                    self.hp_dispatcher
                        .send(Task::new(CriticalTask::WriteBatch(
                            model_id.clone(),
-                            observed_len,
+                            observed_len - stats.get_actual(),
                        )))
                        .unwrap()
                    }
@@ -429,14 +443,31 @@ impl FractalMgr {
         model: &Model,
         observed_size: usize,
         mdl_driver: &super::drivers::FractalModelDriver<LocalFS>,
-    ) -> crate::engine::error::QueryResult<()> {
+    ) -> Result<(), (super::error::Error, BatchStats)> {
+        if mdl_driver.status().is_iffy() {
+            // don't mess this up any further
+            return Err((
+                super::error::Error::from(ErrorKind::Other(
+                    "model driver is in dirty state".into(),
+                )),
+                BatchStats::into_inner(BatchStats::new()),
+            ));
+        }
         if observed_size == 0 {
             // no changes, all good
             return Ok(());
         }
         // try flushing the batch
+        let batch_stats = BatchStats::new();
         let mut batch_driver = mdl_driver.batch_driver().lock();
-        batch_driver.commit_event(StdModelBatch::new(model, observed_size))?;
-        Ok(())
+        batch_driver
+            .commit_with_ctx(
+                StdModelBatch::new(model, observed_size),
+                batch_stats.clone(),
+            )
+            .map_err(|e| {
+                mdl_driver.status().set_iffy();
+                (e, BatchStats::into_inner(batch_stats))
+            })
     }
 }
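
The re-enqueue arithmetic above deserves a worked example.
BatchStats::get_actual() reports how many deltas were durably written
before the error, and the batch writer pushes the failed delta back
into the model's delta state, so the retry task only needs to cover the
remainder. A toy sketch with made-up numbers:

    fn main() {
        // hypothetical numbers, illustrating the retry arithmetic only
        let observed_size: usize = 10; // deltas taken for this batch
        let actual_commit: usize = 4; // BatchStats::get_actual() after the failure
        // the 4 synced deltas must not be written twice; only the remaining
        // 6 are re-enqueued via CriticalTask::WriteBatch
        assert_eq!(observed_size - actual_commit, 6);
    }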

@@ -36,7 +36,7 @@ use {
         fractal::drivers::FractalModelDriver,
         storage::{
             safe_interfaces::{paths_v1, FSInterface, NullFS, StdModelBatch, VirtualFS},
-            GNSDriver, ModelDriver,
+            BatchStats, GNSDriver, ModelDriver,
         },
         RuntimeResult,
     },
@@ -152,7 +152,7 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
             .unwrap()
             .batch_driver()
             .lock()
-            .commit_event(StdModelBatch::new(mdl, count))
+            .commit_with_ctx(StdModelBatch::new(mdl, count), BatchStats::new())
             .unwrap();
     }
 }

@@ -57,7 +57,10 @@ pub mod safe_interfaces {
     loader impl
 */
-pub use v2::impls::{gns_log::GNSDriver, mdl_journal::ModelDriver};
+pub use v2::impls::{
+    gns_log::GNSDriver,
+    mdl_journal::{BatchStats, ModelDriver},
+};
 pub struct SELoaded {
     pub gns: GlobalNS,

@@ -107,7 +107,7 @@ impl<T: GNSEvent> JournalAdapterEvent<EventLogAdapter<GNSEventLog>> for T {
     fn md(&self) -> u64 {
         <T as GNSTransaction>::CODE.dscr_u64()
     }
-    fn write_buffered(self, b: &mut Vec<u8>) {
+    fn write_buffered(self, b: &mut Vec<u8>, _: ()) {
         T::encode_event(self, b)
     }
 }

@@ -60,7 +60,11 @@ use {
     },
     crossbeam_epoch::{pin, Guard},
     sky_macros::TaggedEnum,
-    std::collections::{hash_map::Entry as HMEntry, HashMap},
+    std::{
+        cell::RefCell,
+        collections::{hash_map::Entry as HMEntry, HashMap},
+        rc::Rc,
+    },
 };
 pub type ModelDriver<Fs> = BatchDriver<ModelDataAdapter, Fs>;
@@ -216,6 +220,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
             Fs::File,
             <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
         >,
+        batch_stat: &mut BatchStats,
     ) -> RuntimeResult<usize> {
         /*
             go over each delta, check if inconsistent and apply if not. if any delta sync fails, we enqueue the delta again.
@@ -237,6 +242,7 @@ impl<'a, 'b, Fs: FSInterface> BatchWriter<'a, 'b, Fs> {
                 Err(e) => {
                     // errored, so push this back in; we have written and flushed all prior deltas
                     me.model.delta_state().append_new_data_delta(delta, me.g);
+                    batch_stat.set_actual(i);
                     return Err(e);
                 }
             }
@@ -310,11 +316,13 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for StdModelBatch<'a> {
             Fs::File,
             <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
         >,
+        ctx: Rc<RefCell<BatchStats>>,
     ) -> RuntimeResult<()> {
         // [expected commit]
         writer.dtrack_write(&self.1.u64_bytes_le())?;
         let g = pin();
-        let actual_commit = BatchWriter::<Fs>::write_batch(self.0, &g, self.1, writer)?;
+        let actual_commit =
+            BatchWriter::<Fs>::write_batch(self.0, &g, self.1, writer, &mut ctx.borrow_mut())?;
         if actual_commit != self.1 {
             // early exit
             writer.dtrack_write(&[EventType::EarlyExit.dscr()])?;
@@ -341,6 +349,7 @@ impl<'a> JournalAdapterEvent<BatchAdapter<ModelDataAdapter>> for FullModel<'a> {
             Fs::File,
             <BatchAdapter<ModelDataAdapter> as RawJournalAdapter>::Spec,
         >,
+        _: Rc<RefCell<BatchStats>>,
     ) -> RuntimeResult<()> {
         let g = pin();
         let mut row_writer: RowWriter<'_, Fs> = RowWriter { f };
@@ -410,6 +419,25 @@ impl DecodedBatchEvent {
     }
 }
+pub struct BatchStats {
+    actual_commit: usize,
+}
+impl BatchStats {
+    pub fn new() -> Rc<RefCell<Self>> {
+        Rc::new(RefCell::new(Self { actual_commit: 0 }))
+    }
+    pub fn into_inner(me: Rc<RefCell<Self>>) -> Self {
+        RefCell::into_inner(Rc::into_inner(me).unwrap())
+    }
+    fn set_actual(&mut self, new: usize) {
+        self.actual_commit = new;
+    }
+    pub fn get_actual(&self) -> usize {
+        self.actual_commit
+    }
+}
 impl BatchAdapterSpec for ModelDataAdapter {
     type Spec = ModelDataBatchAofV1;
     type GlobalState = Model;
@@ -417,6 +445,7 @@ impl BatchAdapterSpec for ModelDataAdapter {
     type EventType = EventType;
     type BatchMetadata = BatchMetadata;
     type BatchState = BatchRestoreState;
+    type CommitContext = Rc<RefCell<BatchStats>>;
     fn is_early_exit(event_type: &Self::EventType) -> bool {
         EventType::EarlyExit.eq(event_type)
     }
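
A note on the Rc<RefCell<BatchStats>> shape used above: the caller keeps
one handle and passes a clone into commit_with_ctx, the batch writer
records progress through it during the commit, and
BatchStats::into_inner recovers exclusive ownership once the writer's
clone is dropped. A standalone sketch of that handoff, with a plain
usize standing in for BatchStats:

    use std::{cell::RefCell, rc::Rc};

    fn main() {
        let stats = Rc::new(RefCell::new(0usize));
        let for_writer = Rc::clone(&stats); // what commit_with_ctx receives
        *for_writer.borrow_mut() = 4; // the writer records how far it got
        drop(for_writer); // writer done; exactly one strong handle remains
        // mirrors BatchStats::into_inner: unwrap the sole remaining owner
        let actual = Rc::into_inner(stats).unwrap().into_inner();
        assert_eq!(actual, 4);
    }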

@@ -25,7 +25,7 @@
 */
 use {
-    self::impls::mdl_journal::FullModel,
+    self::impls::mdl_journal::{BatchStats, FullModel},
     super::{
         common::interface::{fs_imp::LocalFS, fs_traits::FSInterface},
         v1, SELoaded,
@@ -88,7 +88,7 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult<SELoaded> {
             model_id.entity(),
             model,
         ))?;
-        model_driver.commit_event(FullModel::new(model))?;
+        model_driver.commit_with_ctx(FullModel::new(model), BatchStats::new())?;
         model_drivers.add_driver(
             ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()),
             model_driver,

@@ -92,6 +92,7 @@ impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
     type GlobalState = <EL as EventLogSpec>::GlobalState;
     type Context<'a> = () where Self: 'a;
     type EventMeta = <EL as EventLogSpec>::EventMeta;
+    type CommitContext = ();
     fn initialize(_: &raw::JournalInitializer) -> Self {
         Self(PhantomData)
     }
@@ -106,12 +107,13 @@ impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
         &mut self,
         w: &mut TrackedWriter<Fs::File, Self::Spec>,
         ev: E,
+        ctx: (),
     ) -> RuntimeResult<()>
     where
         E: RawJournalAdapterEvent<Self>,
     {
         let mut pl = vec![];
-        ev.write_buffered(&mut pl);
+        ev.write_buffered(&mut pl, ctx);
         let plen = (pl.len() as u64).to_le_bytes();
         let mut checksum = SCrc64::new();
         checksum.update(&plen);
@@ -207,6 +209,8 @@ pub trait BatchAdapterSpec {
     type BatchMetadata;
     /// batch state
     type BatchState;
+    /// commit context
+    type CommitContext;
     /// return true if the given event tag indicates an early exit
     fn is_early_exit(event_type: &Self::EventType) -> bool;
     /// initialize the batch state
@@ -245,6 +249,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
     type GlobalState = <BA as BatchAdapterSpec>::GlobalState;
     type Context<'a> = () where Self: 'a;
     type EventMeta = <BA as BatchAdapterSpec>::BatchType;
+    type CommitContext = <BA as BatchAdapterSpec>::CommitContext;
     fn initialize(_: &raw::JournalInitializer) -> Self {
         Self(PhantomData)
     }
@@ -259,11 +264,12 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
         &mut self,
         w: &mut TrackedWriter<Fs::File, Self::Spec>,
         ev: E,
+        ctx: Self::CommitContext,
     ) -> RuntimeResult<()>
     where
         E: RawJournalAdapterEvent<Self>,
     {
-        ev.write_direct::<Fs>(w)?;
+        ev.write_direct::<Fs>(w, ctx)?;
         let checksum = w.reset_partial();
         w.tracked_write(&checksum.to_le_bytes())
     }

@@ -215,10 +215,11 @@ pub trait RawJournalAdapterEvent<CA: RawJournalAdapter>: Sized {
     fn write_direct<Fs: FSInterface>(
         self,
         _: &mut TrackedWriter<Fs::File, <CA as RawJournalAdapter>::Spec>,
+        _: <CA as RawJournalAdapter>::CommitContext,
     ) -> RuntimeResult<()> {
         unimplemented!()
     }
-    fn write_buffered(self, _: &mut Vec<u8>) {
+    fn write_buffered<'a>(self, _: &mut Vec<u8>, _: <CA as RawJournalAdapter>::CommitContext) {
         unimplemented!()
     }
 }
@@ -239,6 +240,8 @@ pub trait RawJournalAdapter: Sized {
     type Context<'a>
     where
         Self: 'a;
+    /// any external context to use during commit. can be used by events
+    type CommitContext;
     /// a type representing the event kind
     type EventMeta;
     /// initialize this adapter
@@ -250,10 +253,11 @@ pub trait RawJournalAdapter: Sized {
     /// parse event metadata
     fn parse_event_meta(meta: u64) -> Option<Self::EventMeta>;
     /// commit event (direct preference)
-    fn commit_direct<'a, Fs: FSInterface, E>(
+    fn commit_direct<Fs: FSInterface, E>(
         &mut self,
         _: &mut TrackedWriter<Fs::File, Self::Spec>,
         _: E,
+        _: Self::CommitContext,
     ) -> RuntimeResult<()>
     where
         E: RawJournalAdapterEvent<Self>,
@@ -261,7 +265,7 @@ pub trait RawJournalAdapter: Sized {
         unimplemented!()
     }
     /// commit event (buffered)
-    fn commit_buffered<'a, E>(&mut self, _: &mut Vec<u8>, _: E)
+    fn commit_buffered<E>(&mut self, _: &mut Vec<u8>, _: E, _: Self::CommitContext)
     where
         E: RawJournalAdapterEvent<Self>,
     {
@@ -499,13 +503,10 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
         }
         Ok(me)
     }
-    /// Commit a new event to the journal
-    ///
-    /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns,
-    /// unless otherwise configured.
-    pub fn commit_event<'a, E: RawJournalAdapterEvent<J>>(
+    pub fn commit_with_ctx<'a, E: RawJournalAdapterEvent<J>>(
         &mut self,
         event: E,
+        ctx: J::CommitContext,
     ) -> RuntimeResult<()> {
         self.txn_context(|me, txn_id| {
             let ev_md = event.md();
@@ -522,7 +523,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
                     buf.extend(&txn_id.to_le_bytes());
                     buf.extend(&ev_md.to_le_bytes());
                     jtrace_writer!(CommitServerEventWroteMetadata);
-                    j.commit_buffered(&mut buf, event);
+                    j.commit_buffered(&mut buf, event, ctx);
                     log_file.tracked_write_through_buffer(&buf)?;
                 }
                 CommitPreference::Direct => {
@@ -532,7 +533,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
                     log_file.tracked_write(&ev_md.to_le_bytes())?;
                     jtrace_writer!(CommitServerEventWroteMetadata);
                     // now hand over control to adapter impl
-                    J::commit_direct::<Fs, _>(j, log_file, event)?;
+                    J::commit_direct::<Fs, _>(j, log_file, event, ctx)?;
                 }
             }
             jtrace_writer!(CommitServerEventAdapterCompleted);
@@ -544,6 +545,16 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
             Ok(())
         })
     }
+    /// Commit a new event to the journal
+    ///
+    /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns,
+    /// unless otherwise configured.
+    pub fn commit_event<'a, E: RawJournalAdapterEvent<J>>(&mut self, event: E) -> RuntimeResult<()>
+    where
+        J::CommitContext: Default,
+    {
+        self.commit_with_ctx(event, Default::default())
+    }
 }
 impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
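
The J::CommitContext: Default bound on commit_event above is what keeps
existing event-log call sites unchanged: adapters whose context is ()
still commit without passing anything. Reduced to a standalone sketch
(illustrative trait, not the real RawJournalAdapter):

    trait Adapter {
        type CommitContext;
        fn commit_with_ctx(&mut self, event: &[u8], ctx: Self::CommitContext);
    }

    // the same delegation RawJournalWriter::commit_event performs
    fn commit_event<A: Adapter>(a: &mut A, event: &[u8])
    where
        A::CommitContext: Default,
    {
        a.commit_with_ctx(event, Default::default())
    }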

@@ -134,7 +134,7 @@ impl<T: SimpleDBEvent> RawJournalAdapterEvent<SimpleDBJournal> for T {
     fn md(&self) -> u64 {
         T::OPC as _
     }
-    fn write_buffered(self, buf: &mut Vec<u8>) {
+    fn write_buffered(self, buf: &mut Vec<u8>, _: ()) {
         T::write_buffered(self, buf)
     }
 }
@@ -150,6 +150,7 @@ impl RawJournalAdapter for SimpleDBJournal {
     type Spec = SystemDatabaseV1;
     type GlobalState = SimpleDB;
     type EventMeta = EventMeta;
+    type CommitContext = ();
     type Context<'a> = () where Self: 'a;
     fn initialize(_: &JournalInitializer) -> Self {
         Self
@@ -171,8 +172,9 @@ impl RawJournalAdapter for SimpleDBJournal {
         &mut self,
         buf: &mut Vec<u8>,
         event: E,
+        ctx: (),
     ) {
-        event.write_buffered(buf)
+        event.write_buffered(buf, ctx)
     }
     fn decode_apply<'a, Fs: FSInterface>(
         gs: &Self::GlobalState,

@@ -55,7 +55,10 @@ use {
         util::compiler::TaggedEnum,
     },
     sky_macros::TaggedEnum,
-    std::cell::{Ref, RefCell, RefMut},
+    std::{
+        cell::{Ref, RefCell, RefMut},
+        rc::Rc,
+    },
 };
 // event definitions
@@ -99,7 +102,7 @@ impl<TE: IsTestEvent> RawJournalAdapterEvent<EventLogAdapter<TestDBAdapter>> for TE {
     fn md(&self) -> u64 {
         Self::EVCODE.dscr_u64()
     }
-    fn write_buffered(self, buf: &mut Vec<u8>) {
+    fn write_buffered(self, buf: &mut Vec<u8>, _: ()) {
         TE::encode(self, buf)
     }
 }
@@ -296,6 +299,7 @@ impl<'a> RawJournalAdapterEvent<BatchAdapter<BatchDBAdapter>> for BatchDBFlush<'a> {
             Fs::File,
             <BatchAdapter<BatchDBAdapter> as super::raw::RawJournalAdapter>::Spec,
         >,
+        ctx: Rc<RefCell<BatchContext>>,
     ) -> RuntimeResult<()> {
         // write: [expected commit][body][actual commit]
         // for this dummy impl, we're expecting to write the full dataset but we're going to actually write the part
@@ -315,12 +319,18 @@ impl<'a> RawJournalAdapterEvent<BatchAdapter<BatchDBAdapter>> for BatchDBFlush<'a> {
             // early exit!
             f.dtrack_write(&[BatchEventType::EarlyExit.dscr()])?;
         }
+        ctx.borrow_mut().actual_write = actual.len();
         // actual commit
         f.dtrack_write(&(actual.len() as u64).to_le_bytes())?;
         Ok(())
     }
 }
 
+#[derive(Debug, Default)]
+pub struct BatchContext {
+    actual_write: usize,
+}
 pub struct BatchDBAdapter;
 impl BatchAdapterSpec for BatchDBAdapter {
     type Spec = ModelDataBatchAofV1;
@@ -328,6 +338,7 @@ impl BatchAdapterSpec for BatchDBAdapter {
     type BatchType = BatchType;
     type EventType = BatchEventType;
     type BatchMetadata = ();
+    type CommitContext = Rc<RefCell<BatchContext>>;
     type BatchState = BatchState;
     fn initialize_batch_state(_: &Self::GlobalState) -> Self::BatchState {
         BatchState {
