From 982898b8191b13135cf385cdd64d6363956aee74 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Fri, 1 Mar 2024 17:41:07 +0530 Subject: [PATCH] Keep driver and data in same container --- server/src/engine/core/dcl.rs | 3 + server/src/engine/core/ddl_misc.rs | 21 +- server/src/engine/core/dml/ins.rs | 4 +- server/src/engine/core/dml/mod.rs | 4 +- server/src/engine/core/dml/sel.rs | 65 ++--- server/src/engine/core/exec.rs | 27 ++- server/src/engine/core/mod.rs | 52 +++- server/src/engine/core/model/alt.rs | 15 +- server/src/engine/core/model/delta.rs | 4 +- server/src/engine/core/model/mod.rs | 229 ++++++++++-------- server/src/engine/core/space.rs | 162 +++++++------ server/src/engine/core/system_db.rs | 6 +- server/src/engine/core/tests/ddl_model/alt.rs | 15 +- server/src/engine/core/tests/ddl_model/mod.rs | 15 +- server/src/engine/core/tests/ddl_space/mod.rs | 4 +- server/src/engine/core/tests/dml/mod.rs | 9 +- server/src/engine/core/tests/dml/select.rs | 9 +- server/src/engine/fractal/drivers.rs | 74 +++--- server/src/engine/fractal/mgr.rs | 67 +++-- server/src/engine/fractal/mod.rs | 87 +++---- server/src/engine/fractal/test_utils.rs | 86 +++---- server/src/engine/mod.rs | 8 +- server/src/engine/net/protocol/mod.rs | 1 + .../src/engine/storage/common/interface/fs.rs | 6 +- .../engine/storage/common/interface/vfs.rs | 3 +- .../storage/common/sdss/impls/sdss_r1/rw.rs | 63 ++--- .../common_encoding/r1/impls/gns/mod.rs | 8 +- .../common_encoding/r1/impls/gns/model.rs | 38 +-- .../common_encoding/r1/impls/gns/space.rs | 8 +- .../r1/impls/gns/tests/full_chain.rs | 33 ++- .../common_encoding/r1/impls/gns/tests/io.rs | 8 +- .../engine/storage/common_encoding/r1/obj.rs | 12 +- .../storage/common_encoding/r1/tests.rs | 4 +- .../src/engine/storage/common_encoding/r2.rs | 8 +- server/src/engine/storage/mod.rs | 7 +- server/src/engine/storage/v1/loader.rs | 24 +- server/src/engine/storage/v1/mod.rs | 4 +- .../engine/storage/v1/raw/batch_jrnl/mod.rs | 4 +- .../storage/v1/raw/batch_jrnl/restore.rs | 6 +- .../src/engine/storage/v1/raw/journal/mod.rs | 6 +- server/src/engine/storage/v2/impls/gns_log.rs | 11 +- .../engine/storage/v2/impls/mdl_journal.rs | 43 ++-- .../storage/v2/impls/tests/model_driver.rs | 6 +- server/src/engine/storage/v2/mod.rs | 50 ++-- .../src/engine/storage/v2/raw/journal/mod.rs | 8 +- .../engine/storage/v2/raw/journal/raw/mod.rs | 23 +- .../engine/storage/v2/raw/journal/tests.rs | 6 +- server/src/engine/storage/v2/raw/spec.rs | 1 + server/src/engine/txn/gns/model.rs | 8 +- server/src/engine/txn/shared.rs | 4 +- 50 files changed, 700 insertions(+), 669 deletions(-) diff --git a/server/src/engine/core/dcl.rs b/server/src/engine/core/dcl.rs index 924e92f2..31c3d511 100644 --- a/server/src/engine/core/dcl.rs +++ b/server/src/engine/core/dcl.rs @@ -62,6 +62,7 @@ fn alter_user( let (username, password) = get_user_data(user)?; global .state() + .namespace() .sys_db() .alter_user(global, &username, &password) } @@ -70,6 +71,7 @@ fn create_user(global: &impl GlobalInstanceLike, user: UserDecl) -> QueryResult< let (username, password) = get_user_data(user)?; global .state() + .namespace() .sys_db() .create_user(global, username.into_boxed_str(), &password) } @@ -99,6 +101,7 @@ fn drop_user( } global .state() + .namespace() .sys_db() .drop_user(global, user_del.username()) } diff --git a/server/src/engine/core/ddl_misc.rs b/server/src/engine/core/ddl_misc.rs index 9ec9f964..0c7d4312 100644 --- a/server/src/engine/core/ddl_misc.rs +++ b/server/src/engine/core/ddl_misc.rs @@ -39,7 +39,7 @@ pub fn inspect( let ret = match stmt { Inspect::Global => { // collect spaces - let spaces = g.state().idx().read(); + let spaces = g.state().namespace().idx().read(); let mut spaces_iter = spaces.iter().peekable(); let mut ret = format!("{{\"spaces\":["); while let Some((space, _)) = spaces_iter.next() { @@ -56,7 +56,7 @@ pub fn inspect( drop(spaces_iter); drop(spaces); // collect users - let users = g.state().sys_db().users().read(); + let users = g.state().namespace().sys_db().users().read(); let mut users_iter = users.iter().peekable(); while let Some((user, _)) = users_iter.next() { ret.push('"'); @@ -70,15 +70,18 @@ pub fn inspect( ret.push_str("],\"settings\":{}}"); ret } - Inspect::Model(m) => match g.state().idx_models().read().get(&m) { - Some(m) => format!( - "{{\"decl\":\"{}\",\"rows\":{},\"properties\":{{}}}}", - m.describe(), - m.primary_index().count() - ), + Inspect::Model(m) => match g.state().namespace().idx_models().read().get(&m) { + Some(m) => { + let m = m.data(); + format!( + "{{\"decl\":\"{}\",\"rows\":{},\"properties\":{{}}}}", + m.describe(), + m.primary_index().count() + ) + } None => return Err(QueryError::QExecObjectNotFound), }, - Inspect::Space(s) => match g.state().idx().read().get(s.as_str()) { + Inspect::Space(s) => match g.state().namespace().idx().read().get(s.as_str()) { Some(s) => { let mut ret = format!("{{\"models\":["); let mut models_iter = s.models().iter().peekable(); diff --git a/server/src/engine/core/dml/ins.rs b/server/src/engine/core/dml/ins.rs index 6eaa2674..307f81ee 100644 --- a/server/src/engine/core/dml/ins.rs +++ b/server/src/engine/core/dml/ins.rs @@ -29,7 +29,7 @@ use crate::engine::{ self, dml::QueryExecMeta, index::{DcFieldIndex, PrimaryIndexKey, Row}, - model::{delta::DataDeltaKind, Model}, + model::{delta::DataDeltaKind, ModelData}, }, error::{QueryError, QueryResult}, fractal::GlobalInstanceLike, @@ -67,7 +67,7 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer // TODO(@ohsayan): optimize null case fn prepare_insert( - model: &Model, + model: &ModelData, insert: InsertData, ) -> QueryResult<(PrimaryIndexKey, DcFieldIndex)> { let fields = model.fields(); diff --git a/server/src/engine/core/dml/mod.rs b/server/src/engine/core/dml/mod.rs index 6463bbe0..5d7cc1fc 100644 --- a/server/src/engine/core/dml/mod.rs +++ b/server/src/engine/core/dml/mod.rs @@ -31,7 +31,7 @@ mod upd; use crate::{ engine::{ - core::model::Model, + core::model::ModelData, data::{lit::Lit, tag::DataTag}, error::{QueryError, QueryResult}, ql::dml::WhereClause, @@ -53,7 +53,7 @@ pub use { upd::update_resp, }; -impl Model { +impl ModelData { pub(self) fn resolve_where<'a>( &self, where_clause: &mut WhereClause<'a>, diff --git a/server/src/engine/core/dml/sel.rs b/server/src/engine/core/dml/sel.rs index 5aa05156..ea884583 100644 --- a/server/src/engine/core/dml/sel.rs +++ b/server/src/engine/core/dml/sel.rs @@ -29,7 +29,7 @@ use crate::engine::{ index::{ DcFieldIndex, IndexLatchHandleExclusive, PrimaryIndexKey, Row, RowData, RowDataLck, }, - model::Model, + model::ModelData, }, data::{ cell::{Datacell, VirtualDatacell}, @@ -91,10 +91,10 @@ pub fn select_all( mut f: F, ) -> QueryResult where - Fm: FnMut(&mut T, &Model, usize), + Fm: FnMut(&mut T, &ModelData, usize), F: FnMut(&mut T, &Datacell, usize), { - global.state().with_model(select.entity, |mdl| { + global.state().namespace().with_model(select.entity, |mdl| { let g = sync::atm::cpin(); let mut i = 0; if select.wildcard { @@ -181,47 +181,50 @@ pub fn select_custom( where F: FnMut(&Datacell), { - global.state().with_model(select.entity(), |mdl| { - let target_key = mdl.resolve_where(select.clauses_mut())?; - let pkdc = VirtualDatacell::new(target_key.clone(), mdl.p_tag().tag_unique()); - let g = sync::atm::cpin(); - let mut read_field = |key, fields: &DcFieldIndex| { - match fields.st_get(key) { - Some(dc) => cellfn(dc), - None if key == mdl.p_key() => cellfn(&pkdc), - None => return Err(QueryError::QExecUnknownField), - } - Ok(()) - }; - match mdl.primary_index().select(target_key.clone(), &g) { - Some(row) => { - let r = row.resolve_schema_deltas_and_freeze(mdl.delta_state()); - if select.is_wildcard() { - for key in mdl.fields().stseq_ord_key() { - read_field(key.as_ref(), r.fields())?; - } - } else { - for key in select.into_fields() { - read_field(key.as_str(), r.fields())?; + global + .state() + .namespace() + .with_model(select.entity(), |mdl| { + let target_key = mdl.resolve_where(select.clauses_mut())?; + let pkdc = VirtualDatacell::new(target_key.clone(), mdl.p_tag().tag_unique()); + let g = sync::atm::cpin(); + let mut read_field = |key, fields: &DcFieldIndex| { + match fields.st_get(key) { + Some(dc) => cellfn(dc), + None if key == mdl.p_key() => cellfn(&pkdc), + None => return Err(QueryError::QExecUnknownField), + } + Ok(()) + }; + match mdl.primary_index().select(target_key.clone(), &g) { + Some(row) => { + let r = row.resolve_schema_deltas_and_freeze(mdl.delta_state()); + if select.is_wildcard() { + for key in mdl.fields().stseq_ord_key() { + read_field(key.as_ref(), r.fields())?; + } + } else { + for key in select.into_fields() { + read_field(key.as_str(), r.fields())?; + } } } + None => return Err(QueryError::QExecDmlRowNotFound), } - None => return Err(QueryError::QExecDmlRowNotFound), - } - Ok(()) - }) + Ok(()) + }) } struct RowIteratorAll<'g> { _g: &'g sync::atm::Guard, - mdl: &'g Model, + mdl: &'g ModelData, iter: as MTIndexExt>::IterEntry<'g, 'g, 'g>, _latch: IndexLatchHandleExclusive<'g>, limit: usize, } impl<'g> RowIteratorAll<'g> { - fn new(g: &'g sync::atm::Guard, mdl: &'g Model, limit: usize) -> Self { + fn new(g: &'g sync::atm::Guard, mdl: &'g ModelData, limit: usize) -> Self { let idx = mdl.primary_index(); let latch = idx.acquire_exclusive(); Self { diff --git a/server/src/engine/core/exec.rs b/server/src/engine/core/exec.rs index 79a2bf64..557894b5 100644 --- a/server/src/engine/core/exec.rs +++ b/server/src/engine/core/exec.rs @@ -25,7 +25,7 @@ */ use crate::engine::{ - core::{ddl_misc, dml, model::Model, space::Space}, + core::{ddl_misc, dml, model::ModelData, space::Space}, error::{QueryError, QueryResult}, fractal::{Global, GlobalInstanceLike}, net::protocol::{ClientLocalState, Response, ResponseType, SQuery}, @@ -152,14 +152,25 @@ async fn run_blocking_stmt( _callgs_map( &g, t, - Model::transactional_exec_create, + ModelData::transactional_exec_create, translate_ddl_result, ) }, |g, _, t| _callgs_map(&g, t, Space::transactional_exec_alter, |_| Response::Empty), - |g, _, t| _callgs_map(&g, t, Model::transactional_exec_alter, |_| Response::Empty), + |g, _, t| { + _callgs_map(&g, t, ModelData::transactional_exec_alter, |_| { + Response::Empty + }) + }, |g, _, t| _callgs_map(&g, t, Space::transactional_exec_drop, translate_ddl_result), - |g, _, t| _callgs_map(&g, t, Model::transactional_exec_drop, translate_ddl_result), + |g, _, t| { + _callgs_map( + &g, + t, + ModelData::transactional_exec_drop, + translate_ddl_result, + ) + }, ]; let r = unsafe { // UNSAFE(@ohsayan): the only await is within this block @@ -201,7 +212,11 @@ fn cstate_use( NB: just like SQL, we don't really care about what this is set to as it's basically a shorthand. so we do a simple vanity check */ - if !global.state().contains_space(new_space.as_str()) { + if !global + .state() + .namespace() + .contains_space(new_space.as_str()) + { return Err(QueryError::QExecObjectNotFound); } cstate.set_cs(new_space.boxed_str()); @@ -209,7 +224,7 @@ fn cstate_use( Use::RefreshCurrent => match cstate.get_cs() { None => return Ok(Response::Null), Some(space) => { - if !global.state().contains_space(space) { + if !global.state().namespace().contains_space(space) { cstate.unset_cs(); return Err(QueryError::QExecObjectNotFound); } diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs index e3eaa845..42af315f 100644 --- a/server/src/engine/core/mod.rs +++ b/server/src/engine/core/mod.rs @@ -40,13 +40,17 @@ mod util; pub(super) mod tests; // re-exports pub use self::util::{EntityID, EntityIDRef}; + // imports use { - self::{dml::QueryExecMeta, model::Model}, - super::fractal::GlobalInstanceLike, + self::{ + dml::QueryExecMeta, + model::{Model, ModelData}, + }, crate::engine::{ core::space::Space, error::{QueryError, QueryResult}, + fractal::{FractalGNSDriver, GlobalInstanceLike}, idx::IndexST, }, parking_lot::RwLock, @@ -57,14 +61,32 @@ use { /// but something better is in the offing type RWLIdx = RwLock>; -#[cfg_attr(test, derive(Debug))] +#[derive(Debug)] pub struct GlobalNS { + data: GNSData, + driver: FractalGNSDriver, +} + +impl GlobalNS { + pub fn new(data: GNSData, driver: FractalGNSDriver) -> Self { + Self { data, driver } + } + pub fn namespace(&self) -> &GNSData { + &self.data + } + pub fn gns_driver(&self) -> &FractalGNSDriver { + &self.driver + } +} + +#[derive(Debug)] +pub struct GNSData { idx_mdl: RWLIdx, idx: RWLIdx, Space>, sys_db: system_db::SystemDatabase, } -impl GlobalNS { +impl GNSData { pub fn empty() -> Self { Self { idx_mdl: RWLIdx::default(), @@ -104,7 +126,7 @@ impl GlobalNS { f: F, ) -> QueryResult where - F: FnOnce(&Space, &mut Model) -> QueryResult, + F: FnOnce(&Space, &mut ModelData) -> QueryResult, { let mut mdl_idx = self.idx_mdl.write(); let Some(model) = mdl_idx.get_mut(&entity) else { @@ -112,17 +134,17 @@ impl GlobalNS { }; let space_read = self.idx.read(); let space = space_read.get(entity.space()).unwrap(); - f(space, model) + f(space, model.data_mut()) } pub fn with_model<'a, T, F>(&self, entity: EntityIDRef<'a>, f: F) -> QueryResult where - F: FnOnce(&Model) -> QueryResult, + F: FnOnce(&ModelData) -> QueryResult, { let mdl_idx = self.idx_mdl.read(); let Some(model) = mdl_idx.get(&entity) else { return Err(QueryError::QExecObjectNotFound); }; - f(model) + f(model.data()) } pub fn idx_models(&self) -> &RWLIdx { &self.idx_mdl @@ -151,13 +173,19 @@ pub(self) fn with_model_for_data_update<'a, F>( f: F, ) -> QueryResult<()> where - F: FnOnce(&Model) -> QueryResult, + F: FnOnce(&ModelData) -> QueryResult, { - let mdl_idx = global.state().idx_mdl.read(); + let mdl_idx = global.state().namespace().idx_mdl.read(); let Some(model) = mdl_idx.get(&entity) else { return Err(QueryError::QExecObjectNotFound); }; - let r = f(model)?; - model::DeltaState::guard_delta_overflow(global, entity.space(), entity.entity(), model, r); + let r = f(model.data())?; + model::DeltaState::guard_delta_overflow( + global, + entity.space(), + entity.entity(), + model.data(), + r, + ); Ok(()) } diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index 104e1664..1ea17210 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -25,7 +25,7 @@ */ use { - super::{Field, Layer, Model}, + super::{Field, Layer, ModelData}, crate::{ engine::{ core::EntityIDRef, @@ -76,7 +76,7 @@ macro_rules! can_ignore { } #[inline(always)] -fn no_field(mr: &Model, new: &str) -> bool { +fn no_field(mr: &ModelData, new: &str) -> bool { !mr.fields().st_contains(new) } @@ -90,7 +90,7 @@ fn check_nullable(props: &mut HashMap, DictEntryGeneric>) -> QueryResul impl<'a> AlterPlan<'a> { pub fn fdeltas( - mdl: &Model, + mdl: &ModelData, AlterModel { model, kind }: AlterModel<'a>, ) -> QueryResult> { let mut no_lock = true; @@ -245,7 +245,7 @@ impl<'a> AlterPlan<'a> { } } -impl Model { +impl ModelData { pub fn transactional_exec_alter( global: &G, alter: AlterModel, @@ -253,6 +253,7 @@ impl Model { let (space_name, model_name) = (alter.model.space(), alter.model.entity()); global .state() + .namespace() .with_model_space_mut_for_ddl(alter.model, |space, model| { // prepare plan let plan = AlterPlan::fdeltas(model, alter)?; @@ -274,8 +275,8 @@ impl Model { ); // commit txn global + .state() .gns_driver() - .lock() .driver_context(|drv| drv.commit_event(txn), || {})?; let mut mutator = model.model_mutator(); new_fields @@ -293,8 +294,8 @@ impl Model { ); // commit txn global + .state() .gns_driver() - .lock() .driver_context(|drv| drv.commit_event(txn), || {})?; let mut mutator = model.model_mutator(); removed.iter().for_each(|field_id| { @@ -309,8 +310,8 @@ impl Model { ); // commit txn global + .state() .gns_driver() - .lock() .driver_context(|drv| drv.commit_event(txn), || {})?; let mut mutator = model.model_mutator(); updated.into_iter().for_each(|(field_id, field)| { diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index 661a5bce..84e2aaa3 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -25,7 +25,7 @@ */ use { - super::Model, + super::ModelData, crate::engine::{ core::{dml::QueryExecMeta, index::Row}, fractal::{FractalToken, GlobalInstanceLike}, @@ -74,7 +74,7 @@ impl DeltaState { global: &impl GlobalInstanceLike, space_name: &str, model_name: &str, - model: &Model, + model: &ModelData, hint: QueryExecMeta, ) { global.request_batch_resolve_if_cache_full(space_name, model_name, model, hint) diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index b0233d2f..e3138be1 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -36,7 +36,7 @@ use { uuid::Uuid, }, error::{QueryError, QueryResult}, - fractal::{GenericTask, GlobalInstanceLike, Task}, + fractal::{FractalModelDriver, GenericTask, GlobalInstanceLike, Task}, idx::{self, IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq}, mem::{RawStr, VInline}, ql::ddl::{ @@ -56,6 +56,30 @@ type Fields = IndexSTSeqCns; #[derive(Debug)] pub struct Model { + data: ModelData, + driver: FractalModelDriver, +} + +impl Model { + pub fn new(data: ModelData, driver: FractalModelDriver) -> Self { + Self { data, driver } + } + pub fn data(&self) -> &ModelData { + &self.data + } + pub fn data_mut(&mut self) -> &mut ModelData { + &mut self.data + } + pub fn driver(&self) -> &FractalModelDriver { + &self.driver + } + pub fn into_driver(self) -> FractalModelDriver { + self.driver + } +} + +#[derive(Debug)] +pub struct ModelData { uuid: Uuid, p_key: RawStr, p_tag: FullTag, @@ -67,7 +91,7 @@ pub struct Model { } #[cfg(test)] -impl PartialEq for Model { +impl PartialEq for ModelData { fn eq(&self, m: &Self) -> bool { self.uuid == m.uuid && self.p_key == m.p_key @@ -76,7 +100,7 @@ impl PartialEq for Model { } } -impl Model { +impl ModelData { pub fn get_uuid(&self) -> Uuid { self.uuid } @@ -153,7 +177,7 @@ impl Model { } } -impl Model { +impl ModelData { fn new_with_private( uuid: Uuid, p_key: RawStr, @@ -260,7 +284,7 @@ impl Model { } } -impl Model { +impl ModelData { pub fn transactional_exec_create( global: &G, stmt: CreateModel, @@ -268,110 +292,119 @@ impl Model { let (space_name, model_name) = (stmt.model_name.space(), stmt.model_name.entity()); let if_nx = stmt.if_not_exists; let model = Self::process_create(stmt)?; - global.state().ddl_with_space_mut(&space_name, |space| { - // TODO(@ohsayan): be extra cautious with post-transactional tasks (memck) - if space.models().contains(model_name) { + global + .state() + .namespace() + .ddl_with_space_mut(&space_name, |space| { + // TODO(@ohsayan): be extra cautious with post-transactional tasks (memck) + if space.models().contains(model_name) { + if if_nx { + return Ok(Some(false)); + } else { + return Err(QueryError::QExecDdlObjectAlreadyExists); + } + } + // since we've locked this down, no one else can parallely create another model in the same space (or remove) + // prepare txn + let txn = gns::model::CreateModelTxn::new( + SpaceIDRef::new(&space_name, &space), + &model_name, + &model, + ); + // attempt to initialize driver + let mdl_driver = global.initialize_model_driver( + &space_name, + space.get_uuid(), + &model_name, + model.get_uuid(), + )?; + // commit txn + global.state().gns_driver().driver_context( + |drv| drv.commit_event(txn), + || { + global.taskmgr_post_standard_priority(Task::new( + GenericTask::delete_model_dir( + &space_name, + space.get_uuid(), + &model_name, + model.get_uuid(), + ), + )) + }, + )?; + // update global state + let _ = space.models_mut().insert(model_name.into()); + let _ = global.state().namespace().idx_models().write().insert( + EntityID::new(&space_name, &model_name), + Model::new(model, mdl_driver), + ); if if_nx { - return Ok(Some(false)); + Ok(Some(true)) } else { - return Err(QueryError::QExecDdlObjectAlreadyExists); + Ok(None) } - } - // since we've locked this down, no one else can parallely create another model in the same space (or remove) - let mut txn_driver = global.gns_driver().lock(); - // prepare txn - let txn = gns::model::CreateModelTxn::new( - SpaceIDRef::new(&space_name, &space), - &model_name, - &model, - ); - // attempt to initialize driver - global.initialize_model_driver( - &space_name, - space.get_uuid(), - &model_name, - model.get_uuid(), - )?; - // commit txn - txn_driver.driver_context( - |drv| drv.commit_event(txn), - || { - global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( - &space_name, - space.get_uuid(), - &model_name, - model.get_uuid(), - ))) - }, - )?; - // update global state - let _ = space.models_mut().insert(model_name.into()); - let _ = global - .state() - .idx_models() - .write() - .insert(EntityID::new(&space_name, &model_name), model); - if if_nx { - Ok(Some(true)) - } else { - Ok(None) - } - }) + }) } pub fn transactional_exec_drop( global: &G, stmt: DropModel, ) -> QueryResult> { let (space_name, model_name) = (stmt.entity.space(), stmt.entity.entity()); - global.state().ddl_with_space_mut(&space_name, |space| { - if !space.models().contains(model_name) { + global + .state() + .namespace() + .ddl_with_space_mut(&space_name, |space| { + if !space.models().contains(model_name) { + if stmt.if_exists { + return Ok(Some(false)); + } else { + // the model isn't even present + return Err(QueryError::QExecObjectNotFound); + } + } + // get exclusive lock on models + let mut models_idx = global.state().namespace().idx_models().write(); + let model = models_idx + .get(&EntityIDRef::new(&space_name, &model_name)) + .unwrap(); + // the model must be empty for us to clean it up! (NB: consistent view + EX) + if (model.data.primary_index().count() != 0) & !(stmt.force) { + // nope, we can't drop this + return Err(QueryError::QExecDdlNotEmpty); + } + // okay this is looking good for us + // prepare txn + let txn = gns::model::DropModelTxn::new(ModelIDRef::new( + SpaceIDRef::new(&space_name, &space), + &model_name, + model.data.get_uuid(), + model + .data + .delta_state() + .schema_current_version() + .value_u64(), + )); + // commit txn + global + .state() + .gns_driver() + .driver_context(|drv| drv.commit_event(txn), || {})?; + // request cleanup + global.purge_model_driver( + space_name, + space.get_uuid(), + model_name, + model.data().get_uuid(), + ); + // update global state + let _ = models_idx.remove(&EntityIDRef::new(&space_name, &model_name)); + let _ = space.models_mut().remove(model_name); if stmt.if_exists { - return Ok(Some(false)); + Ok(Some(true)) } else { - // the model isn't even present - return Err(QueryError::QExecObjectNotFound); + Ok(None) } - } - // get exclusive lock on models - let mut models_idx = global.state().idx_models().write(); - let model = models_idx - .get(&EntityIDRef::new(&space_name, &model_name)) - .unwrap(); - // the model must be empty for us to clean it up! (NB: consistent view + EX) - if (model.primary_index().count() != 0) & !(stmt.force) { - // nope, we can't drop this - return Err(QueryError::QExecDdlNotEmpty); - } - // okay this is looking good for us - // prepare txn - let txn = gns::model::DropModelTxn::new(ModelIDRef::new( - SpaceIDRef::new(&space_name, &space), - &model_name, - model.get_uuid(), - model.delta_state().schema_current_version().value_u64(), - )); - // commit txn - global - .gns_driver() - .lock() - .driver_context(|drv| drv.commit_event(txn), || {})?; - // request cleanup - global.purge_model_driver( - space_name, - space.get_uuid(), - model_name, - model.get_uuid(), - false, - ); - // update global state - let _ = models_idx.remove(&EntityIDRef::new(&space_name, &model_name)); - let _ = space.models_mut().remove(model_name); - if stmt.if_exists { - Ok(Some(true)) - } else { - Ok(None) - } - }) + }) } } @@ -429,7 +462,7 @@ impl ModelPrivate { } pub struct ModelMutator<'a> { - model: &'a mut Model, + model: &'a mut ModelData, } impl<'a> ModelMutator<'a> { diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index b5feca6f..3fc3d701 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -24,11 +24,8 @@ * */ -use crate::engine::storage::safe_interfaces::{paths_v1, FileSystem}; - -use super::EntityIDRef; - use { + super::EntityIDRef, crate::engine::{ data::{dict, uuid::Uuid, DictEntryGeneric, DictGeneric}, error::{QueryError, QueryResult}, @@ -157,7 +154,7 @@ impl Space { if_not_exists, } = Self::process_create(space)?; // lock the global namespace - global.state().ddl_with_spaces_write(|spaces| { + global.state().namespace().ddl_with_spaces_write(|spaces| { if spaces.st_contains(&space_name) { if if_not_exists { return Ok(Some(false)); @@ -169,9 +166,9 @@ impl Space { // prepare txn let txn = txn::gns::space::CreateSpaceTxn::new(space.props(), &space_name, &space); // try to create space for...the space - FileSystem::create_dir_all(&paths_v1::space_dir(&space_name, space.get_uuid()))?; + global.initialize_space(&space_name, space.get_uuid())?; // commit txn - global.gns_driver().lock().driver_context( + global.state().gns_driver().driver_context( |drv| drv.commit_event(txn), || { global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir( @@ -197,36 +194,41 @@ impl Space { updated_props, }: AlterSpace, ) -> QueryResult<()> { - global.state().ddl_with_space_mut(&space_name, |space| { - match updated_props.get(Self::KEY_ENV) { - Some(DictEntryGeneric::Map(_)) if updated_props.len() == 1 => {} - Some(DictEntryGeneric::Data(l)) if updated_props.len() == 1 && l.is_null() => {} - None if updated_props.is_empty() => return Ok(()), - _ => return Err(QueryError::QExecDdlInvalidProperties), - } - // create patch - let patch = match dict::rprepare_metadata_patch(space.props(), updated_props) { - Some(patch) => patch, - None => return Err(QueryError::QExecDdlInvalidProperties), - }; - // prepare txn - let txn = - txn::gns::space::AlterSpaceTxn::new(SpaceIDRef::new(&space_name, space), &patch); - // commit - // commit txn - global - .gns_driver() - .lock() - .driver_context(|drv| drv.commit_event(txn), || {})?; - // merge - dict::rmerge_data_with_patch(space.props_mut(), patch); - // the `env` key may have been popped, so put it back (setting `env: null` removes the env key and we don't want to waste time enforcing this in the - // merge algorithm) - let _ = space - .props_mut() - .st_insert(Self::KEY_ENV.into(), DictEntryGeneric::Map(into_dict!())); - Ok(()) - }) + global + .state() + .namespace() + .ddl_with_space_mut(&space_name, |space| { + match updated_props.get(Self::KEY_ENV) { + Some(DictEntryGeneric::Map(_)) if updated_props.len() == 1 => {} + Some(DictEntryGeneric::Data(l)) if updated_props.len() == 1 && l.is_null() => {} + None if updated_props.is_empty() => return Ok(()), + _ => return Err(QueryError::QExecDdlInvalidProperties), + } + // create patch + let patch = match dict::rprepare_metadata_patch(space.props(), updated_props) { + Some(patch) => patch, + None => return Err(QueryError::QExecDdlInvalidProperties), + }; + // prepare txn + let txn = txn::gns::space::AlterSpaceTxn::new( + SpaceIDRef::new(&space_name, space), + &patch, + ); + // commit + // commit txn + global + .state() + .gns_driver() + .driver_context(|drv| drv.commit_event(txn), || {})?; + // merge + dict::rmerge_data_with_patch(space.props_mut(), patch); + // the `env` key may have been popped, so put it back (setting `env: null` removes the env key and we don't want to waste time enforcing this in the + // merge algorithm) + let _ = space + .props_mut() + .st_insert(Self::KEY_ENV.into(), DictEntryGeneric::Map(into_dict!())); + Ok(()) + }) } pub fn transactional_exec_drop( global: &G, @@ -237,51 +239,53 @@ impl Space { }: DropSpace, ) -> QueryResult> { if force { - global.state().ddl_with_all_mut(|spaces, models| { - let Some(space) = spaces.remove(space_name.as_str()) else { + global + .state() + .namespace() + .ddl_with_all_mut(|spaces, models| { + let Some(space) = spaces.remove(space_name.as_str()) else { + if if_exists { + return Ok(Some(false)); + } else { + return Err(QueryError::QExecObjectNotFound); + } + }; + // commit drop + // prepare txn + let txn = + txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); + // commit txn + global + .state() + .gns_driver() + .driver_context(|drv| drv.commit_event(txn), || {})?; + // request cleanup + global.taskmgr_post_standard_priority(Task::new( + GenericTask::delete_space_dir(&space_name, space.get_uuid()), + )); + let space_uuid = space.get_uuid(); + for model in space.models.into_iter() { + let e: EntityIDRef<'static> = unsafe { + // UNSAFE(@ohsayan): I want to try what the borrow checker has been trying + core::mem::transmute(EntityIDRef::new(space_name.as_str(), &model)) + }; + let mdl = models.st_delete_return(&e).unwrap(); + global.purge_model_driver( + &space_name, + space_uuid, + &model, + mdl.data().get_uuid(), + ); + } + let _ = spaces.st_delete(space_name.as_str()); if if_exists { - return Ok(Some(false)); + Ok(Some(true)) } else { - return Err(QueryError::QExecObjectNotFound); + Ok(None) } - }; - // commit drop - // prepare txn - let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); - // commit txn - global - .gns_driver() - .lock() - .driver_context(|drv| drv.commit_event(txn), || {})?; - // request cleanup - global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir( - &space_name, - space.get_uuid(), - ))); - let space_uuid = space.get_uuid(); - for model in space.models.into_iter() { - let e: EntityIDRef<'static> = unsafe { - // UNSAFE(@ohsayan): I want to try what the borrow checker has been trying - core::mem::transmute(EntityIDRef::new(space_name.as_str(), &model)) - }; - let mdl = models.st_delete_return(&e).unwrap(); - global.purge_model_driver( - &space_name, - space_uuid, - &model, - mdl.get_uuid(), - true, - ); - } - let _ = spaces.st_delete(space_name.as_str()); - if if_exists { - Ok(Some(true)) - } else { - Ok(None) - } - }) + }) } else { - global.state().ddl_with_spaces_write(|spaces| { + global.state().namespace().ddl_with_spaces_write(|spaces| { let Some(space) = spaces.get(space_name.as_str()) else { if if_exists { return Ok(Some(false)); @@ -298,8 +302,8 @@ impl Space { let txn = txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn global + .state() .gns_driver() - .lock() .driver_context(|drv| drv.commit_event(txn), || {})?; // request cleanup global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_space_dir( diff --git a/server/src/engine/core/system_db.rs b/server/src/engine/core/system_db.rs index a96debe8..dd0c45d1 100644 --- a/server/src/engine/core/system_db.rs +++ b/server/src/engine/core/system_db.rs @@ -131,7 +131,7 @@ impl SystemDatabase { return Err(QueryError::SysAuthError); } let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); - global.gns_driver().lock().driver_context( + global.state().gns_driver().driver_context( |drv| drv.commit_event(CreateUserTxn::new(&username, &password_hash)), || {}, )?; @@ -147,7 +147,7 @@ impl SystemDatabase { match self.users.write().get_mut(username) { Some(user) => { let password_hash = rcrypt::hash(password, rcrypt::DEFAULT_COST).unwrap(); - global.gns_driver().lock().driver_context( + global.state().gns_driver().driver_context( |drv| drv.commit_event(AlterUserTxn::new(username, &password_hash)), || {}, )?; @@ -163,8 +163,8 @@ impl SystemDatabase { return Err(QueryError::SysAuthError); } global + .state() .gns_driver() - .lock() .driver_context(|drv| drv.commit_event(DropUserTxn::new(username)), || {})?; let _ = users.remove(username); Ok(()) diff --git a/server/src/engine/core/tests/ddl_model/alt.rs b/server/src/engine/core/tests/ddl_model/alt.rs index 83f0de73..f63e4dd7 100644 --- a/server/src/engine/core/tests/ddl_model/alt.rs +++ b/server/src/engine/core/tests/ddl_model/alt.rs @@ -26,7 +26,7 @@ use crate::engine::{ core::{ - model::{alt::AlterPlan, Model}, + model::{alt::AlterPlan, ModelData}, tests::ddl_model::{create, exec_create}, EntityIDRef, }, @@ -50,25 +50,26 @@ fn exec_plan( new_space: bool, model: &str, plan: &str, - f: impl Fn(&Model), + f: impl Fn(&ModelData), ) -> QueryResult<()> { let mdl_name = exec_create(global, model, new_space)?; let prev_uuid = { global .state() + .namespace() .idx_models() .read() .get(&EntityIDRef::new("myspace", &mdl_name)) - .map(|mdl| mdl.get_uuid()) + .map(|mdl| mdl.data().get_uuid()) .unwrap() }; let tok = lex_insecure(plan.as_bytes()).unwrap(); let alter = parse_ast_node_full::(&tok[2..]).unwrap(); - Model::transactional_exec_alter(global, alter)?; - let models = global.state().idx_models().read(); + ModelData::transactional_exec_alter(global, alter)?; + let models = global.state().namespace().idx_models().read(); let model = models.get(&EntityIDRef::new("myspace", &mdl_name)).unwrap(); - assert_eq!(prev_uuid, model.get_uuid()); - f(model); + assert_eq!(prev_uuid, model.data().get_uuid()); + f(model.data()); Ok(()) } diff --git a/server/src/engine/core/tests/ddl_model/mod.rs b/server/src/engine/core/tests/ddl_model/mod.rs index 2259f2bf..42877a07 100644 --- a/server/src/engine/core/tests/ddl_model/mod.rs +++ b/server/src/engine/core/tests/ddl_model/mod.rs @@ -29,16 +29,16 @@ mod crt; mod layer; use crate::engine::{ - core::{model::Model, EntityIDRef}, + core::{model::ModelData, EntityIDRef}, error::QueryResult, fractal::GlobalInstanceLike, ql::{ast::parse_ast_node_full, ddl::crt::CreateModel, tests::lex_insecure}, }; -fn create(s: &str) -> QueryResult { +fn create(s: &str) -> QueryResult { let tok = lex_insecure(s.as_bytes()).unwrap(); let create_model = parse_ast_node_full(&tok[2..]).unwrap(); - Model::process_create(create_model) + ModelData::process_create(create_model) } pub fn exec_create( @@ -52,9 +52,10 @@ pub fn exec_create( if create_new_space { global .state() + .namespace() .create_empty_test_space(create_model.model_name.space()) } - Model::transactional_exec_create(global, create_model).map(|_| name) + ModelData::transactional_exec_create(global, create_model).map(|_| name) } pub fn exec_create_new_space( @@ -68,9 +69,9 @@ fn with_model( global: &impl GlobalInstanceLike, space_id: &str, model_name: &str, - f: impl Fn(&Model), + f: impl Fn(&ModelData), ) { - let models = global.state().idx_models().read(); + let models = global.state().namespace().idx_models().read(); let model = models.get(&EntityIDRef::new(space_id, model_name)).unwrap(); - f(model) + f(model.data()) } diff --git a/server/src/engine/core/tests/ddl_space/mod.rs b/server/src/engine/core/tests/ddl_space/mod.rs index 792caa0a..501fce23 100644 --- a/server/src/engine/core/tests/ddl_space/mod.rs +++ b/server/src/engine/core/tests/ddl_space/mod.rs @@ -48,7 +48,7 @@ pub fn exec_create( ast::parse_ast_node_full::(&tok[2..]).unwrap(); let name = ast_node.space_name; Space::transactional_exec_create(gns, ast_node)?; - gns.state().ddl_with_space_mut(&name, |space| { + gns.state().namespace().ddl_with_space_mut(&name, |space| { verify(space); Ok(space.get_uuid()) }) @@ -64,7 +64,7 @@ pub fn exec_alter( ast::parse_ast_node_full::(&tok[2..]).unwrap(); let name = ast_node.space_name; Space::transactional_exec_alter(gns, ast_node)?; - gns.state().ddl_with_space_mut(&name, |space| { + gns.state().namespace().ddl_with_space_mut(&name, |space| { verify(space); Ok(space.get_uuid()) }) diff --git a/server/src/engine/core/tests/dml/mod.rs b/server/src/engine/core/tests/dml/mod.rs index 23b04586..192a978c 100644 --- a/server/src/engine/core/tests/dml/mod.rs +++ b/server/src/engine/core/tests/dml/mod.rs @@ -30,7 +30,7 @@ mod select; mod update; use crate::engine::{ - core::{dml, index::Row, model::Model, space::Space, EntityIDRef}, + core::{dml, index::Row, model::ModelData, space::Space, EntityIDRef}, data::{cell::Datacell, lit::Lit}, error::QueryResult, fractal::GlobalInstanceLike, @@ -45,12 +45,13 @@ use crate::engine::{ fn _exec_only_create_space_model(global: &impl GlobalInstanceLike, model: &str) -> QueryResult<()> { let _ = global .state() + .namespace() .idx() .write() .insert("myspace".into(), Space::new_auto_all().into()); let lex_create_model = lex_insecure(model.as_bytes()).unwrap(); let stmt_create_model = parse_ast_node_full(&lex_create_model[2..]).unwrap(); - Model::transactional_exec_create(global, stmt_create_model).map(|_| ()) + ModelData::transactional_exec_create(global, stmt_create_model).map(|_| ()) } fn _exec_only_insert( @@ -73,7 +74,7 @@ fn _exec_only_read_key_and_then( and_then: impl Fn(Row) -> T, ) -> QueryResult { let guard = sync::atm::cpin(); - global.state().with_model(entity, |mdl| { + global.state().namespace().with_model(entity, |mdl| { let row = mdl .primary_index() .select(Lit::from(key_name), &guard) @@ -90,7 +91,7 @@ fn _exec_delete_only(global: &impl GlobalInstanceLike, delete: &str, key: &str) let entity = delete.entity(); dml::delete(global, delete)?; assert_eq!( - global.state().with_model(entity, |model| { + global.state().namespace().with_model(entity, |model| { let g = sync::atm::cpin(); Ok(model.primary_index().select(key.into(), &g).is_none()) }), diff --git a/server/src/engine/core/tests/dml/select.rs b/server/src/engine/core/tests/dml/select.rs index 5a120aa2..bad94777 100644 --- a/server/src/engine/core/tests/dml/select.rs +++ b/server/src/engine/core/tests/dml/select.rs @@ -46,7 +46,9 @@ fn simple_select_wildcard() { #[test] fn simple_select_specified_same_order() { - let global = TestGlobal::new_with_driver_id_instant_update("dml_select_simple_select_specified_same_order"); + let global = TestGlobal::new_with_driver_id_instant_update( + "dml_select_simple_select_specified_same_order", + ); assert_eq!( super::exec_select( &global, @@ -61,8 +63,9 @@ fn simple_select_specified_same_order() { #[test] fn simple_select_specified_reversed_order() { - let global = - TestGlobal::new_with_driver_id_instant_update("dml_select_simple_select_specified_reversed_order"); + let global = TestGlobal::new_with_driver_id_instant_update( + "dml_select_simple_select_specified_reversed_order", + ); assert_eq!( super::exec_select( &global, diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index d4218cd0..eb38ba44 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -25,7 +25,7 @@ */ use { - super::{util, ModelUniqueID}, + super::util, crate::{ engine::{ error::{QueryError, QueryResult, RuntimeResult}, @@ -33,32 +33,33 @@ use { }, util::compiler, }, - parking_lot::{Mutex, RwLock}, - std::{collections::HashMap, sync::Arc}, + parking_lot::Mutex, }; /// GNS driver +#[derive(Debug)] pub struct FractalGNSDriver { status: util::Status, - pub(super) txn_driver: GNSDriver, + pub(super) txn_driver: Mutex, } impl FractalGNSDriver { - pub(super) fn new(txn_driver: GNSDriver) -> Self { + pub fn new(txn_driver: GNSDriver) -> Self { Self { status: util::Status::new_okay(), - txn_driver: txn_driver, + txn_driver: Mutex::new(txn_driver), } } pub fn driver_context( - &mut self, + &self, f: impl Fn(&mut GNSDriver) -> RuntimeResult, on_failure: impl Fn(), ) -> QueryResult { if self.status.is_iffy() { return Err(QueryError::SysServerError); } - match f(&mut self.txn_driver) { + let mut txn_driver = self.txn_driver.lock(); + match f(&mut txn_driver) { Ok(v) => Ok(v), Err(e) => compiler::cold_call(|| { error!("GNS driver failed with: {e}"); @@ -70,58 +71,43 @@ impl FractalGNSDriver { } } -pub struct ModelDrivers { - drivers: RwLock>, +/// Model driver +#[derive(Debug)] +#[must_use] +pub struct FractalModelDriver { + status: util::Status, + batch_driver: Mutex>, } -impl ModelDrivers { - pub fn empty() -> Self { +impl FractalModelDriver { + pub const fn uninitialized() -> Self { Self { - drivers: RwLock::new(HashMap::new()), + status: util::Status::new_okay(), + batch_driver: Mutex::new(None), } } - pub fn drivers(&self) -> &RwLock> { - &self.drivers - } - pub fn count(&self) -> usize { - self.drivers.read().len() - } - pub fn add_driver(&self, id: ModelUniqueID, batch_driver: ModelDriver) { - assert!(self - .drivers - .write() - .insert(id, FractalModelDriver::init(batch_driver)) - .is_none()); - } - pub fn remove_driver(&self, id: ModelUniqueID) { - assert!(self.drivers.write().remove(&id).is_some()) - } - pub fn into_inner(self) -> HashMap { - self.drivers.into_inner() + pub fn initialize_model_driver(&self, driver: ModelDriver) { + let mut drv = self.batch_driver.lock(); + if drv.is_none() { + *drv = Some(driver); + } else { + panic!("driver already initialized") + } } -} - -/// Model driver -pub struct FractalModelDriver { - status: Arc, - batch_driver: Mutex, -} - -impl FractalModelDriver { pub(in crate::engine::fractal) fn init(batch_driver: ModelDriver) -> Self { Self { - status: Arc::new(util::Status::new_okay()), - batch_driver: Mutex::new(batch_driver), + status: util::Status::new_okay(), + batch_driver: Mutex::new(Some(batch_driver)), } } pub fn status(&self) -> &util::Status { &self.status } /// Returns a reference to the batch persist driver - pub fn batch_driver(&self) -> &Mutex { + pub fn batch_driver(&self) -> &Mutex> { &self.batch_driver } pub fn close(self) -> RuntimeResult<()> { - ModelDriver::close_driver(&mut self.batch_driver.into_inner()) + ModelDriver::close_driver(&mut self.batch_driver.into_inner().unwrap()) } } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 502a208c..6d5f8c91 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -29,11 +29,12 @@ use { crate::{ engine::{ core::{ - model::{delta::DataDelta, Model}, + model::{delta::DataDelta, ModelData}, EntityIDRef, }, data::uuid::Uuid, error::ErrorKind, + fractal::GlobalInstanceLike, storage::{ safe_interfaces::{paths_v1, StdModelBatch}, BatchStats, @@ -289,20 +290,13 @@ impl FractalMgr { match task { CriticalTask::WriteBatch(model_id, observed_size) => { info!("fhp: {model_id} has reached cache capacity. writing to disk"); - let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read(); - let Some(mdl_driver) = mdl_drivers.get(&model_id) else { - // because we maximize throughput, the model driver may have been already removed but this task - // was way behind in the queue - return; - }; - let mdl_read = global._namespace().idx_models().read(); + let mdl_read = global.state().namespace().idx_models().read(); let mdl = match mdl_read.get(&EntityIDRef::new( model_id.space().into(), model_id.model().into(), )) { - Some(mdl) if mdl.get_uuid() != model_id.uuid() => { - // so the model driver was not removed, neither was the model *yet* but we happened to find the task - // just return + Some(mdl) if mdl.data().get_uuid() != model_id.uuid() => { + // this is a different model with the same entity path return; } Some(mdl) => mdl, @@ -310,7 +304,7 @@ impl FractalMgr { panic!("found deleted model") } }; - match Self::try_write_model_data_batch(mdl, observed_size, mdl_driver) { + match Self::try_write_model_data_batch(mdl.data(), observed_size, mdl.driver()) { Ok(()) => { if observed_size != 0 { info!("fhp: completed maintenance task for {model_id}, synced={observed_size}") @@ -391,40 +385,34 @@ impl FractalMgr { } } fn general_executor(&'static self, global: super::Global) { - let mdl_drivers = global.get_state().get_mdl_drivers().drivers().read(); - for (model_id, driver) in mdl_drivers.iter() { - let mdl_read = global._namespace().idx_models().read(); - let mdl = match mdl_read.get(&EntityIDRef::new( - model_id.space().into(), - model_id.model().into(), - )) { - Some(mdl) if mdl.get_uuid() != model_id.uuid() => { - // so the model driver was not removed, neither was the model *yet* but we happened to find the task - // just return - return; - } - Some(mdl) => mdl, - None => { - panic!("found deleted model") - } - }; - let observed_len = mdl + for (model_id, model) in global.state().namespace().idx_models().read().iter() { + let observed_len = model + .data() .delta_state() .__fractal_take_full_from_data_delta(super::FractalToken::new()); - match Self::try_write_model_data_batch(mdl, observed_len, driver) { + match Self::try_write_model_data_batch(model.data(), observed_len, model.driver()) { Ok(()) => { if observed_len != 0 { info!( - "flp: completed maintenance task for {model_id}, synced={observed_len}" + "flp: completed maintenance task for {}.{}, synced={observed_len}", + model_id.space(), + model_id.entity() ) } } Err((e, stats)) => { - info!("flp: failed to sync data for {model_id} with {e}. promoting to higher priority"); + info!( + "flp: failed to sync data for {}.{} with erro `{e}`. promoting to higher priority", + model_id.space(), model_id.entity(), + ); // this failure is *not* good, so we want to promote this to a critical task self.hp_dispatcher .send(Task::new(CriticalTask::WriteBatch( - model_id.clone(), + ModelUniqueID::new( + model_id.space(), + model_id.entity(), + model.data().get_uuid(), + ), observed_len - stats.get_actual(), ))) .unwrap() @@ -440,11 +428,11 @@ impl FractalMgr { /// /// The zero check is essential fn try_write_model_data_batch( - model: &Model, + model: &ModelData, observed_size: usize, - mdl_driver: &super::drivers::FractalModelDriver, + mdl_driver_: &super::drivers::FractalModelDriver, ) -> Result<(), (super::error::Error, BatchStats)> { - if mdl_driver.status().is_iffy() { + if mdl_driver_.status().is_iffy() { // don't mess this up any further return Err(( super::error::Error::from(ErrorKind::Other( @@ -459,14 +447,15 @@ impl FractalMgr { } // try flushing the batch let batch_stats = BatchStats::new(); - let mut batch_driver = mdl_driver.batch_driver().lock(); + let mut mdl_driver = mdl_driver_.batch_driver().lock(); + let batch_driver = mdl_driver.as_mut().unwrap(); batch_driver .commit_with_ctx( StdModelBatch::new(model, observed_size), batch_stats.clone(), ) .map_err(|e| { - mdl_driver.status().set_iffy(); + mdl_driver_.status().set_iffy(); (e, BatchStats::into_inner(batch_stats)) }) } diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 46c874e7..17f1fb13 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -26,7 +26,7 @@ use { super::{ - core::{dml::QueryExecMeta, model::Model, GlobalNS}, + core::{dml::QueryExecMeta, model::ModelData, GlobalNS}, data::uuid::Uuid, storage::{ safe_interfaces::{paths_v1, FileSystem}, @@ -34,7 +34,6 @@ use { }, }, crate::engine::error::RuntimeResult, - parking_lot::Mutex, std::{fmt, mem::MaybeUninit}, tokio::sync::mpsc::unbounded_channel, }; @@ -47,7 +46,7 @@ mod mgr; pub mod test_utils; mod util; pub use { - drivers::ModelDrivers, + drivers::{FractalGNSDriver, FractalModelDriver}, mgr::{CriticalTask, GenericTask, Task, GENERAL_EXECUTOR_WINDOW}, util::FractalToken, }; @@ -68,19 +67,12 @@ pub struct GlobalStateStart { /// ## Safety /// /// Must be called iff this is the only thread calling it -pub unsafe fn load_and_enable_all( - gns: GlobalNS, - gns_driver: GNSDriver, - model_drivers: ModelDrivers, -) -> GlobalStateStart { - let model_cnt_on_boot = model_drivers.count(); - let gns_driver = drivers::FractalGNSDriver::new(gns_driver); +pub unsafe fn load_and_enable_all(gns: GlobalNS) -> GlobalStateStart { + let model_cnt_on_boot = gns.namespace().idx_models().read().len(); let (hp_sender, hp_recv) = unbounded_channel(); let (lp_sender, lp_recv) = unbounded_channel(); let global_state = GlobalState::new( gns, - gns_driver, - model_drivers, mgr::FractalMgr::new(hp_sender, lp_sender, model_cnt_on_boot), ); *Global::__gref_raw() = MaybeUninit::new(global_state); @@ -101,7 +93,11 @@ pub trait GlobalInstanceLike { fn get_max_delta_size(&self) -> usize; // global namespace fn state(&self) -> &GlobalNS; - fn gns_driver(&self) -> &Mutex; + fn initialize_space(&self, space_name: &str, space_uuid: Uuid) -> RuntimeResult<()> { + e!(FileSystem::create_dir_all(&paths_v1::space_dir( + space_name, space_uuid + ))) + } // model drivers fn initialize_model_driver( &self, @@ -109,14 +105,13 @@ pub trait GlobalInstanceLike { space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - ) -> RuntimeResult<()>; + ) -> RuntimeResult; fn purge_model_driver( &self, space_name: &str, space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - skip_delete: bool, ); // taskmgr fn taskmgr_post_high_priority(&self, task: Task); @@ -126,7 +121,7 @@ pub trait GlobalInstanceLike { &self, space_name: &str, model_name: &str, - model: &Model, + model: &ModelData, hint: QueryExecMeta, ) { // check if we need to sync @@ -150,9 +145,6 @@ impl GlobalInstanceLike for Global { fn state(&self) -> &GlobalNS { self._namespace() } - fn gns_driver(&self) -> &Mutex { - &self.get_state().gns_driver - } // taskmgr fn taskmgr_post_high_priority(&self, task: Task) { self._post_high_priority_task(task) @@ -171,15 +163,10 @@ impl GlobalInstanceLike for Global { space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - skip_delete: bool, ) { - let id = ModelUniqueID::new(space_name, model_name, model_uuid); - self.get_state().mdl_driver.remove_driver(id); - if !skip_delete { - self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( - space_name, space_uuid, model_name, model_uuid, - ))); - } + self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( + space_name, space_uuid, model_name, model_uuid, + ))); } fn initialize_model_driver( &self, @@ -187,7 +174,7 @@ impl GlobalInstanceLike for Global { space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - ) -> RuntimeResult<()> { + ) -> RuntimeResult { // create dir FileSystem::create_dir(&paths_v1::model_dir( space_name, space_uuid, model_name, model_uuid, @@ -196,11 +183,7 @@ impl GlobalInstanceLike for Global { let driver = ModelDriver::create_model_driver(&paths_v1::model_path( space_name, space_uuid, model_name, model_uuid, ))?; - self.get_state().mdl_driver.add_driver( - ModelUniqueID::new(space_name, model_name, model_uuid), - driver, - ); - Ok(()) + Ok(FractalModelDriver::init(driver)) } } @@ -247,16 +230,17 @@ impl Global { } pub unsafe fn unload_all(self) { // TODO(@ohsayan): handle errors - let GlobalState { - gns_driver, - mdl_driver, - .. - } = Self::__gref_raw().assume_init_read(); - let mut gns_driver = gns_driver.into_inner().txn_driver; - let mdl_drivers = mdl_driver.into_inner(); + let GlobalState { gns, .. } = Self::__gref_raw().assume_init_read(); + let mut gns_driver = gns.gns_driver().txn_driver.lock(); GNSDriver::close_driver(&mut gns_driver).unwrap(); - for (_, driver) in mdl_drivers { - driver.close().unwrap(); + for mdl in gns + .namespace() + .idx_models() + .write() + .drain() + .map(|(_, mdl)| mdl) + { + mdl.into_driver().close().unwrap(); } } } @@ -268,27 +252,12 @@ impl Global { /// The global state struct GlobalState { gns: GlobalNS, - gns_driver: Mutex, - mdl_driver: ModelDrivers, task_mgr: mgr::FractalMgr, } impl GlobalState { - fn new( - gns: GlobalNS, - gns_driver: drivers::FractalGNSDriver, - mdl_driver: ModelDrivers, - task_mgr: mgr::FractalMgr, - ) -> Self { - Self { - gns, - gns_driver: Mutex::new(gns_driver), - mdl_driver, - task_mgr, - } - } - pub(self) fn get_mdl_drivers(&self) -> &ModelDrivers { - &self.mdl_driver + fn new(gns: GlobalNS, task_mgr: mgr::FractalMgr) -> Self { + Self { gns, task_mgr } } pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr { &self.task_mgr diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 63fd4c99..1d38d3bf 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -26,22 +26,20 @@ use { super::{ - drivers::FractalGNSDriver, CriticalTask, GenericTask, GlobalInstanceLike, ModelUniqueID, - Task, + drivers::FractalGNSDriver, CriticalTask, FractalModelDriver, GenericTask, + GlobalInstanceLike, Task, }, crate::engine::{ - core::{EntityIDRef, GlobalNS}, + core::{EntityIDRef, GNSData, GlobalNS}, data::uuid::Uuid, error::ErrorKind, - fractal::drivers::FractalModelDriver, storage::{ safe_interfaces::{paths_v1, FileSystem, StdModelBatch}, BatchStats, GNSDriver, ModelDriver, }, RuntimeResult, }, - parking_lot::{Mutex, RwLock}, - std::collections::HashMap, + parking_lot::RwLock, }; /// A `test` mode global implementation @@ -50,19 +48,15 @@ pub struct TestGlobal { lp_queue: RwLock>>, #[allow(unused)] max_delta_size: usize, - txn_driver: Mutex, - model_drivers: RwLock>, max_data_pressure: usize, } impl TestGlobal { - fn new(gns: GlobalNS, max_delta_size: usize, txn_driver: GNSDriver) -> Self { + fn new(gns: GlobalNS, max_delta_size: usize) -> Self { Self { gns, lp_queue: RwLock::default(), max_delta_size, - txn_driver: Mutex::new(FractalGNSDriver::new(txn_driver)), - model_drivers: RwLock::default(), max_data_pressure: usize::MAX, } } @@ -72,25 +66,20 @@ impl TestGlobal { /// Normally, model drivers are not loaded on startup because of shared global state. Calling this will attempt to load /// all model drivers pub fn load_model_drivers(&self) -> RuntimeResult<()> { - let mut mdl_drivers = self.model_drivers.write(); - let space_idx = self.gns.idx().read(); - for (model_name, model) in self.gns.idx_models().read().iter() { + let space_idx = self.gns.namespace().idx().read(); + for (model_name, model) in self.gns.namespace().idx_models().read().iter() { + let model_data = model.data(); let space_uuid = space_idx.get(model_name.space()).unwrap().get_uuid(); let driver = ModelDriver::open_model_driver( - model, + model_data, &paths_v1::model_path( model_name.space(), space_uuid, model_name.entity(), - model.get_uuid(), + model_data.get_uuid(), ), )?; - assert!(mdl_drivers - .insert( - ModelUniqueID::new(model_name.space(), model_name.entity(), model.get_uuid()), - FractalModelDriver::init(driver) - ) - .is_none()); + model.driver().initialize_model_driver(driver); } Ok(()) } @@ -103,13 +92,13 @@ impl TestGlobal { me } pub fn new_with_driver_id(log_name: &str) -> Self { - let gns = GlobalNS::empty(); + let data = GNSData::empty(); let driver = match GNSDriver::create_gns_with_name(log_name) { Ok(drv) => Ok(drv), Err(e) => match e.kind() { ErrorKind::IoError(e_) => match e_.kind() { std::io::ErrorKind::AlreadyExists => { - GNSDriver::open_gns_with_name(log_name, &gns) + GNSDriver::open_gns_with_name(log_name, &data) } _ => Err(e), }, @@ -117,7 +106,7 @@ impl TestGlobal { }, } .unwrap(); - Self::new(gns, 0, driver) + Self::new(GlobalNS::new(data, FractalGNSDriver::new(driver)), 0) } } @@ -125,24 +114,19 @@ impl GlobalInstanceLike for TestGlobal { fn state(&self) -> &GlobalNS { &self.gns } - fn gns_driver(&self) -> &Mutex { - &self.txn_driver - } fn taskmgr_post_high_priority(&self, task: Task) { match task.into_task() { CriticalTask::WriteBatch(mdl_id, count) => { - let models = self.gns.idx_models().read(); + let models = self.gns.namespace().idx_models().read(); let mdl = models .get(&EntityIDRef::new(mdl_id.space(), mdl_id.model())) .unwrap(); - self.model_drivers - .read() - .get(&mdl_id) + let mut mdl_driver = mdl.driver().batch_driver().lock(); + mdl_driver + .as_mut() + .unwrap() + .commit_with_ctx(StdModelBatch::new(mdl.data(), count), BatchStats::new()) .unwrap() - .batch_driver() - .lock() - .commit_with_ctx(StdModelBatch::new(mdl, count), BatchStats::new()) - .unwrap(); } } } @@ -158,18 +142,10 @@ impl GlobalInstanceLike for TestGlobal { space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - skip_delete: bool, ) { - let id = ModelUniqueID::new(space_name, model_name, model_uuid); - self.model_drivers - .write() - .remove(&id) - .expect("tried to remove non-existent model"); - if !skip_delete { - self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( - space_name, space_uuid, model_name, model_uuid, - ))); - } + self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( + space_name, space_uuid, model_name, model_uuid, + ))); } fn initialize_model_driver( &self, @@ -177,7 +153,7 @@ impl GlobalInstanceLike for TestGlobal { space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - ) -> crate::engine::error::RuntimeResult<()> { + ) -> crate::engine::error::RuntimeResult { // create model dir FileSystem::create_dir_all(&paths_v1::model_dir( space_name, space_uuid, model_name, model_uuid, @@ -185,20 +161,16 @@ impl GlobalInstanceLike for TestGlobal { let driver = ModelDriver::create_model_driver(&paths_v1::model_path( space_name, space_uuid, model_name, model_uuid, ))?; - self.model_drivers.write().insert( - ModelUniqueID::new(space_name, model_name, model_uuid), - super::drivers::FractalModelDriver::init(driver), - ); - Ok(()) + Ok(super::drivers::FractalModelDriver::init(driver)) } } impl Drop for TestGlobal { fn drop(&mut self) { - let mut txn_driver = self.txn_driver.lock(); - GNSDriver::close_driver(&mut txn_driver.txn_driver).unwrap(); - for (_, model_driver) in self.model_drivers.write().drain() { - model_driver.close().unwrap(); + let mut txn_driver = self.gns.gns_driver().txn_driver.lock(); + GNSDriver::close_driver(&mut txn_driver).unwrap(); + for (_, model_driver) in self.gns.namespace().idx_models().write().drain() { + model_driver.into_driver().close().unwrap(); } } } diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 6a2b2cc3..101ee060 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -71,15 +71,11 @@ pub fn load_all( } info!("starting storage engine"); context::set_origin(Subsystem::Storage); - let SELoaded { - gns, - gns_driver, - model_drivers, - } = storage::load(&config)?; + let SELoaded { gns } = storage::load(&config)?; info!("storage engine ready. initializing system"); let global = unsafe { // UNSAFE(@ohsayan): the only call we ever make - fractal::load_and_enable_all(gns, gns_driver, model_drivers) + fractal::load_and_enable_all(gns) }; Ok((config, global)) } diff --git a/server/src/engine/net/protocol/mod.rs b/server/src/engine/net/protocol/mod.rs index f784f87b..c1745eae 100644 --- a/server/src/engine/net/protocol/mod.rs +++ b/server/src/engine/net/protocol/mod.rs @@ -293,6 +293,7 @@ async fn do_handshake( Ok(uname) => { match global .state() + .namespace() .sys_db() .verify_user(uname, handshake.hs_auth().password()) { diff --git a/server/src/engine/storage/common/interface/fs.rs b/server/src/engine/storage/common/interface/fs.rs index 07ce3622..952c9151 100644 --- a/server/src/engine/storage/common/interface/fs.rs +++ b/server/src/engine/storage/common/interface/fs.rs @@ -30,15 +30,13 @@ file system */ -use std::io::BufWriter; - #[cfg(test)] use super::vfs::{VFileDescriptor, VirtualFS}; use { crate::IoResult, std::{ fs as std_fs, - io::{BufReader, Error, ErrorKind, Read, Seek, SeekFrom, Write}, + io::{BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write}, }, }; @@ -380,11 +378,13 @@ impl FileExt for AnyFile { */ #[cfg(test)] +#[derive(Debug)] enum AnyFile { Local(Lf), Virtual(VFileDescriptor), } +#[derive(Debug)] pub struct File { #[cfg(test)] f: AnyFile, diff --git a/server/src/engine/storage/common/interface/vfs.rs b/server/src/engine/storage/common/interface/vfs.rs index d1f84f42..393c1757 100644 --- a/server/src/engine/storage/common/interface/vfs.rs +++ b/server/src/engine/storage/common/interface/vfs.rs @@ -24,8 +24,6 @@ * */ -#![allow(dead_code)] - use { crate::{engine::sync::cell::Lazy, IoResult}, parking_lot::RwLock, @@ -83,6 +81,7 @@ pub enum FileOpen { Existing(EF), } +#[derive(Debug)] pub struct VFileDescriptor(pub(super) Box); impl VFileDescriptor { diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index 72bec469..34f92fba 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -40,6 +40,7 @@ use { util::os::SysIOError, IoResult, }, + core::fmt, std::mem, }; @@ -287,13 +288,12 @@ impl TrackedReader { /// interface. It tracks the cursor, automatically buffers writes and in case of buffer flush failure, /// provides methods to robustly handle errors, down to byte-level cursor tracking in case of failure. pub struct TrackedWriter< - F, S: FileSpecV1, const SIZE: usize = 8192, const PANIC_IF_UNFLUSHED: bool = true, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool = true, > { - f_d: F, + f_d: File, f_md: S::Metadata, t_cursor: u64, t_checksum: SCrc64, @@ -302,12 +302,32 @@ pub struct TrackedWriter< } impl< - F, S: FileSpecV1, const SIZE: usize, const PANIC_IF_UNFLUSHED: bool, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, - > TrackedWriter + > fmt::Debug for TrackedWriter +where + S::Metadata: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TrackedWriter") + .field("f_d", &self.f_d) + .field("f_md", &self.f_md) + .field("t_cursor", &self.t_cursor) + .field("t_checksum", &self.t_checksum) + .field("t_partial_checksum", &self.t_partial_checksum) + .field("buf", &self.buf) + .finish() + } +} + +impl< + S: FileSpecV1, + const SIZE: usize, + const PANIC_IF_UNFLUSHED: bool, + const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, + > TrackedWriter { fn available_capacity(&self) -> usize { self.buf.remaining_capacity() @@ -315,14 +335,13 @@ impl< } impl< - F, S: FileSpecV1, const SIZE: usize, const PANIC_IF_UNFLUSHED: bool, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, - > TrackedWriter + > TrackedWriter { - fn _new(f_d: F, f_md: S::Metadata, t_cursor: u64, t_checksum: SCrc64) -> Self { + fn _new(f_d: File, f_md: S::Metadata, t_cursor: u64, t_checksum: SCrc64) -> Self { Self { f_d, f_md, @@ -349,29 +368,27 @@ impl< } impl< - F: FileExt, S: FileSpecV1, const SIZE: usize, const PANIC_IF_UNFLUSHED: bool, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, - > TrackedWriter + > TrackedWriter { /// Create a new tracked writer /// /// NB: The cursor is fetched. If the cursor is already available, use [`Self::with_cursor`] pub fn new( - mut f: SdssFile, - ) -> IoResult> - { + mut f: SdssFile, + ) -> IoResult> { f.file_cursor().map(|v| TrackedWriter::with_cursor(f, v)) } /// Create a new tracked writer with the provided cursor - pub fn with_cursor(f: SdssFile, c: u64) -> Self { + pub fn with_cursor(f: SdssFile, c: u64) -> Self { Self::with_cursor_and_checksum(f, c, SCrc64::new()) } /// Create a new tracked writer with the provided checksum and cursor pub fn with_cursor_and_checksum( - SdssFile { file, meta }: SdssFile, + SdssFile { file, meta }: SdssFile, c: u64, ck: SCrc64, ) -> Self { @@ -383,12 +400,11 @@ impl< } impl< - F: FileWrite, S: FileSpecV1, const SIZE: usize, const PANIC_IF_UNFLUSHED: bool, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, - > TrackedWriter + > TrackedWriter { /// Same as [`Self::tracked_write_through_buffer`], but the partial state is updated pub fn dtrack_write_through_buffer(&mut self, buf: &[u8]) -> IoResult<()> { @@ -471,10 +487,7 @@ impl< Ok(()) } /// Flush the buffer and then sync data and metadata - pub fn flush_sync(&mut self) -> IoResult<()> - where - F: FileWriteExt, - { + pub fn flush_sync(&mut self) -> IoResult<()> { self.flush_buf().and_then(|_| self.fsync()) } /// Flush the buffer @@ -502,21 +515,17 @@ impl< } } } - pub fn fsync(&mut self) -> IoResult<()> - where - F: FileWriteExt, - { + pub fn fsync(&mut self) -> IoResult<()> { self.f_d.fsync_all() } } impl< - F, S: FileSpecV1, const SIZE: usize, const PANIC_IF_UNFLUSHED: bool, const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool, - > Drop for TrackedWriter + > Drop for TrackedWriter { fn drop(&mut self) { if PANIC_IF_UNFLUSHED && !self.buf.is_empty() { @@ -544,7 +553,7 @@ fn check_vfs_buffering() { }; closure! { // init writer - let mut twriter: TrackedWriter = + let mut twriter: TrackedWriter = TrackedWriter::new(SdssFile::create("myfile")?)?; assert_eq!(twriter.cursor_usize(), Header::SIZE); { diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs index 59137550..bf501f42 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs @@ -27,7 +27,7 @@ use { crate::{ engine::{ - core::GlobalNS, + core::GNSData, data::uuid::Uuid, error::{RuntimeResult, StorageError}, mem::BufferedScanner, @@ -68,7 +68,7 @@ where fn encode_event(commit: Self, buf: &mut Vec) { r1::enc::full_into_buffer::(buf, commit) } - fn decode_apply(gns: &GlobalNS, data: Vec) -> RuntimeResult<()> { + fn decode_apply(gns: &GNSData, data: Vec) -> RuntimeResult<()> { let mut scanner = BufferedScanner::new(&data); Self::decode_and_update_global_state(&mut scanner, gns)?; if scanner.eof() { @@ -79,7 +79,7 @@ where } fn decode_and_update_global_state( scanner: &mut BufferedScanner, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { Self::update_global_state(Self::decode(scanner)?, gns) } @@ -88,7 +88,7 @@ where r1::dec::full_from_scanner::(scanner).map_err(|e| e.into()) } /// Update the global state from the restored event - fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>; + fn update_global_state(restore: Self::RestoreType, gns: &GNSData) -> RuntimeResult<()>; } #[derive(Debug, PartialEq)] diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/model.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/model.rs index 4c41ee2b..faafbaba 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/model.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/model.rs @@ -29,12 +29,13 @@ use { crate::{ engine::{ core::{ - model::{Field, Model}, + model::{Field, Model, ModelData}, space::Space, - EntityID, EntityIDRef, GlobalNS, + EntityID, EntityIDRef, GNSData, }, data::uuid::Uuid, error::{RuntimeResult, StorageError, TransactionError}, + fractal::FractalModelDriver, idx::{IndexSTSeqCns, STIndex, STIndexSeq}, mem::BufferedScanner, storage::common_encoding::r1::{self, map, obj, PersistObject}, @@ -126,7 +127,7 @@ impl<'a> PersistObject for ModelID<'a> { } fn with_space( - gns: &GlobalNS, + gns: &GNSData, space_id: &super::SpaceIDRes, f: impl FnOnce(&Space) -> RuntimeResult, ) -> RuntimeResult { @@ -141,7 +142,7 @@ fn with_space( } fn with_space_mut( - gns: &GlobalNS, + gns: &GNSData, space_id: &super::SpaceIDRes, mut f: impl FnMut(&mut Space) -> RuntimeResult, ) -> RuntimeResult { @@ -156,10 +157,10 @@ fn with_space_mut( } fn with_model_mut( - gns: &GlobalNS, + gns: &GNSData, space_id: &super::SpaceIDRes, model_id: &ModelIDRes, - f: impl FnOnce(&mut Model) -> RuntimeResult, + f: impl FnOnce(&mut ModelData) -> RuntimeResult, ) -> RuntimeResult { with_space(gns, space_id, |_| { let mut models = gns.idx_models().write(); @@ -167,11 +168,11 @@ fn with_model_mut( else { return Err(TransactionError::OnRestoreDataMissing.into()); }; - if model.get_uuid() != model_id.model_uuid { + if model.data().get_uuid() != model_id.model_uuid { // this should have been handled by an earlier transaction return Err(TransactionError::OnRestoreDataConflictMismatch.into()); } - f(model) + f(model.data_mut()) }) } @@ -184,7 +185,7 @@ fn with_model_mut( pub struct CreateModelTxnRestorePL { pub(super) space_id: super::SpaceIDRes, pub(super) model_name: Box, - pub(super) model: Model, + pub(super) model: ModelData, } pub struct CreateModelTxnMD { @@ -258,9 +259,9 @@ impl<'a> GNSEvent for CreateModelTxn<'a> { CreateModelTxnRestorePL { space_id, model_name, - model, + model: model_data, }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { /* NOTE(@ohsayan): @@ -278,7 +279,10 @@ impl<'a> GNSEvent for CreateModelTxn<'a> { return Err(TransactionError::OnRestoreDataConflictAlreadyExists.into()); } if models - .insert(EntityID::new(&space_id.name, &model_name), model) + .insert( + EntityID::new(&space_id.name, &model_name), + Model::new(model_data, FractalModelDriver::uninitialized()), + ) .is_some() { return Err(TransactionError::OnRestoreDataConflictMismatch.into()); @@ -355,7 +359,7 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> { model_id, new_fields, }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { with_model_mut(gns, &model_id.space_id, &model_id, |model| { let mut mutator = model.model_mutator(); @@ -445,7 +449,7 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> { model_id, removed_fields, }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { with_model_mut(gns, &model_id.space_id, &model_id, |model| { let mut mutator = model.model_mutator(); @@ -526,7 +530,7 @@ impl<'a> GNSEvent for AlterModelUpdateTxn<'a> { model_id, updated_fields, }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { with_model_mut(gns, &model_id.space_id, &model_id, |model| { let mut mutator = model.model_mutator(); @@ -585,7 +589,7 @@ impl<'a> GNSEvent for DropModelTxn<'a> { model_uuid, model_version: _, }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { with_space_mut(gns, &space_id, |space| { let mut models = gns.idx_models().write(); @@ -596,7 +600,7 @@ impl<'a> GNSEvent for DropModelTxn<'a> { else { return Err(TransactionError::OnRestoreDataMissing.into()); }; - if removed_model.get_uuid() != model_uuid { + if removed_model.data().get_uuid() != model_uuid { return Err(TransactionError::OnRestoreDataConflictMismatch.into()); } Ok(()) diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/space.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/space.rs index 71c5f469..3fe89492 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/space.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/space.rs @@ -28,7 +28,7 @@ use { super::GNSEvent, crate::{ engine::{ - core::{space::Space, EntityIDRef, GlobalNS}, + core::{space::Space, EntityIDRef, GNSData}, data::DictGeneric, error::{RuntimeResult, TransactionError}, idx::STIndex, @@ -102,7 +102,7 @@ impl<'a> GNSEvent for CreateSpaceTxn<'a> { type RestoreType = CreateSpaceTxnRestorePL; fn update_global_state( CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL, - gns: &crate::engine::core::GlobalNS, + gns: &crate::engine::core::GNSData, ) -> RuntimeResult<()> { let mut spaces = gns.idx().write(); if spaces.st_insert(space_name, space.into()) { @@ -180,7 +180,7 @@ impl<'a> GNSEvent for AlterSpaceTxn<'a> { space_id, space_meta, }: Self::RestoreType, - gns: &crate::engine::core::GlobalNS, + gns: &crate::engine::core::GNSData, ) -> RuntimeResult<()> { let mut gns = gns.idx().write(); match gns.st_get_mut(&space_id.name) { @@ -229,7 +229,7 @@ impl<'a> GNSEvent for DropSpaceTxn<'a> { type RestoreType = super::SpaceIDRes; fn update_global_state( super::SpaceIDRes { uuid, name }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { let mut wgns = gns.idx().write(); let mut wmodel = gns.idx_models().write(); diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/tests/full_chain.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/tests/full_chain.rs index 589c7474..fd23d6d7 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/tests/full_chain.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/tests/full_chain.rs @@ -26,7 +26,7 @@ use crate::engine::{ core::{ - model::{Field, Layer, Model}, + model::{Field, Layer, ModelData}, space::Space, }, data::{cell::Datacell, tag::TagSelector, uuid::Uuid, DictEntryGeneric}, @@ -58,6 +58,7 @@ fn init_space(global: &impl GlobalInstanceLike, space_name: &str, env: &str) -> Space::transactional_exec_create(global, stmt).unwrap(); global .state() + .namespace() .idx() .read() .get(name.as_str()) @@ -76,7 +77,7 @@ fn create_space() { } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); - let spaces = global.state().idx().read(); + let spaces = global.state().namespace().idx().read(); let space = spaces.get("myspace").unwrap(); assert_eq!( &*space, @@ -106,7 +107,7 @@ fn alter_space() { } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); - let spaces = global.state().idx().read(); + let spaces = global.state().namespace().idx().read(); let space = spaces.get("myspace").unwrap(); assert_eq!( &*space, @@ -133,7 +134,13 @@ fn drop_space() { } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); - assert!(global.state().idx().read().get("myspace").is_none()); + assert!(global + .state() + .namespace() + .idx() + .read() + .get("myspace") + .is_none()); }) }) } @@ -148,9 +155,10 @@ fn init_model( let stmt = lex_insecure(query.as_bytes()).unwrap(); let stmt = parse_ast_node_full::(&stmt[2..]).unwrap(); let model_name = stmt.model_name; - Model::transactional_exec_create(global, stmt).unwrap(); + ModelData::transactional_exec_create(global, stmt).unwrap(); global .state() + .namespace() .with_model(model_name, |model| Ok(model.get_uuid())) .unwrap() } @@ -178,10 +186,11 @@ fn create_model() { let global = TestGlobal::new_with_driver_id(log_name); global .state() + .namespace() .with_model(("myspace", "mymodel").into(), |model| { assert_eq!( model, - &Model::new_restore( + &ModelData::new_restore( uuid_model, "username".into(), TagSelector::String.into_full(), @@ -210,12 +219,13 @@ fn alter_model_add() { ) .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_alter(&global, stmt).unwrap(); + ModelData::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); global .state() + .namespace() .with_model(("myspace", "mymodel").into(), |model| { assert_eq!( model.fields().st_get("profile_pic").unwrap(), @@ -245,12 +255,13 @@ fn alter_model_remove() { ) .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_alter(&global, stmt).unwrap(); + ModelData::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); global .state() + .namespace() .with_model(("myspace", "mymodel").into(), |model| { assert!(model.fields().st_get("has_secure_key").is_none()); assert!(model.fields().st_get("is_dumb").is_none()); @@ -277,12 +288,13 @@ fn alter_model_update() { lex_insecure(b"alter model myspace.mymodel update profile_pic { nullable: true }") .unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_alter(&global, stmt).unwrap(); + ModelData::transactional_exec_alter(&global, stmt).unwrap(); } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); global .state() + .namespace() .with_model(("myspace", "mymodel").into(), |model| { assert_eq!( model.fields().st_get("profile_pic").unwrap(), @@ -304,13 +316,14 @@ fn drop_model() { init_default_model(&global); let stmt = lex_insecure(b"drop model myspace.mymodel").unwrap(); let stmt = parse_ast_node_full(&stmt[2..]).unwrap(); - Model::transactional_exec_drop(&global, stmt).unwrap(); + ModelData::transactional_exec_drop(&global, stmt).unwrap(); } multirun(|| { let global = TestGlobal::new_with_driver_id(log_name); assert_eq!( global .state() + .namespace() .with_model(("myspace", "mymodel").into(), |_| { Ok(()) }) .unwrap_err(), QueryError::QExecObjectNotFound diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/tests/io.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/tests/io.rs index 7aeb038e..5563b83d 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/tests/io.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/tests/io.rs @@ -30,7 +30,7 @@ use { space, SpaceIDRef, SpaceIDRes, }, crate::engine::{ - core::{model::Model, space::Space}, + core::{model::ModelData, space::Space}, storage::common_encoding::r1::{dec, enc}, txn::ModelIDRef, }, @@ -95,7 +95,7 @@ mod model_tests { AlterModelAddTxnRestorePL, AlterModelRemoveTxnRestorePL, AlterModelUpdateTxnRestorePL, CreateModelTxnRestorePL, }, - Model, Space, + ModelData, Space, }, crate::engine::{ core::model::{Field, Layer}, @@ -106,9 +106,9 @@ mod model_tests { }, }, }; - fn default_space_model() -> (Space, Model) { + fn default_space_model() -> (Space, ModelData) { let space = Space::new_auto_all(); - let model = Model::new_restore( + let model = ModelData::new_restore( Uuid::new(), "username".into(), TagSelector::String.into_full(), diff --git a/server/src/engine/storage/common_encoding/r1/obj.rs b/server/src/engine/storage/common_encoding/r1/obj.rs index 0fa8adcb..231891b8 100644 --- a/server/src/engine/storage/common_encoding/r1/obj.rs +++ b/server/src/engine/storage/common_encoding/r1/obj.rs @@ -29,7 +29,7 @@ use { crate::{ engine::{ core::{ - model::{Field, Layer, Model}, + model::{Field, Layer, ModelData}, space::Space, }, data::{ @@ -405,16 +405,16 @@ impl ModelLayoutMD { } #[derive(Clone, Copy)] -pub struct ModelLayoutRef<'a>(pub(super) &'a Model); -impl<'a> From<&'a Model> for ModelLayoutRef<'a> { - fn from(mdl: &'a Model) -> Self { +pub struct ModelLayoutRef<'a>(pub(super) &'a ModelData); +impl<'a> From<&'a ModelData> for ModelLayoutRef<'a> { + fn from(mdl: &'a ModelData) -> Self { Self(mdl) } } impl<'a> PersistObject for ModelLayoutRef<'a> { const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64, 3); type InputType = ModelLayoutRef<'a>; - type OutputType = Model; + type OutputType = ModelData; type Metadata = ModelLayoutMD; fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { scanner.has_left(md.p_key_len as usize) @@ -455,7 +455,7 @@ impl<'a> PersistObject for ModelLayoutRef<'a> { } else { TagSelector::from_raw(md.p_key_tag as u8) }; - Ok(Model::new_restore( + Ok(ModelData::new_restore( md.model_uuid, key.into_boxed_str(), ptag.into_full(), diff --git a/server/src/engine/storage/common_encoding/r1/tests.rs b/server/src/engine/storage/common_encoding/r1/tests.rs index 33935a08..f91b7a7e 100644 --- a/server/src/engine/storage/common_encoding/r1/tests.rs +++ b/server/src/engine/storage/common_encoding/r1/tests.rs @@ -28,7 +28,7 @@ use { super::obj, crate::engine::{ core::{ - model::{Field, Layer, Model}, + model::{Field, Layer, ModelData}, space::Space, }, data::{ @@ -98,7 +98,7 @@ fn fieldmap() { #[test] fn model() { let uuid = Uuid::new(); - let model = Model::new_restore( + let model = ModelData::new_restore( uuid, "username".into(), TagSelector::String.into_full(), diff --git a/server/src/engine/storage/common_encoding/r2.rs b/server/src/engine/storage/common_encoding/r2.rs index e338f4a8..bd05e6d5 100644 --- a/server/src/engine/storage/common_encoding/r2.rs +++ b/server/src/engine/storage/common_encoding/r2.rs @@ -32,7 +32,7 @@ use { super::r1::{dec, impls::gns::GNSEvent, PersistObject}, crate::{ engine::{ - core::GlobalNS, + core::GNSData, error::{StorageError, TransactionError}, mem::BufferedScanner, txn::gns::sysctl::{AlterUserTxn, CreateUserTxn, DropUserTxn}, @@ -51,7 +51,7 @@ impl<'a> GNSEvent for CreateUserTxn<'a> { type RestoreType = FullUserDefinition; fn update_global_state( FullUserDefinition { username, password }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { if gns.sys_db().__raw_create_user(username, password) { Ok(()) @@ -138,7 +138,7 @@ impl<'a> GNSEvent for AlterUserTxn<'a> { type RestoreType = FullUserDefinition; fn update_global_state( FullUserDefinition { username, password }: Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { if gns.sys_db().__raw_alter_user(&username, password) { Ok(()) @@ -200,7 +200,7 @@ impl<'a> GNSEvent for DropUserTxn<'a> { type RestoreType = DropUserPayload; fn update_global_state( DropUserPayload(username): Self::RestoreType, - gns: &GlobalNS, + gns: &GNSData, ) -> RuntimeResult<()> { if gns.sys_db().__raw_delete_user(&username) { Ok(()) diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs index 96a91f81..2bec1cfa 100644 --- a/server/src/engine/storage/mod.rs +++ b/server/src/engine/storage/mod.rs @@ -27,10 +27,7 @@ //! Implementations of the Skytable Disk Storage Subsystem (SDSS) use { - super::{ - config::Configuration, core::GlobalNS, fractal::context, fractal::ModelDrivers, - RuntimeResult, - }, + super::{config::Configuration, core::GlobalNS, fractal::context, RuntimeResult}, std::path::Path, }; @@ -58,8 +55,6 @@ pub use v2::impls::{ pub struct SELoaded { pub gns: GlobalNS, - pub gns_driver: v2::impls::gns_log::GNSDriver, - pub model_drivers: ModelDrivers, } pub fn load(cfg: &Configuration) -> RuntimeResult { diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs index 70d834c7..76764d10 100644 --- a/server/src/engine/storage/v1/loader.rs +++ b/server/src/engine/storage/v1/loader.rs @@ -26,7 +26,7 @@ use { crate::engine::{ - core::{EntityIDRef, GlobalNS}, + core::{EntityIDRef, GNSData}, error::RuntimeResult, fractal::{error::ErrorContext, ModelUniqueID}, storage::{ @@ -41,8 +41,8 @@ use { std::collections::HashMap, }; -pub fn load_gns() -> RuntimeResult { - let gns = GlobalNS::empty(); +pub fn load_gns() -> RuntimeResult { + let gns = GNSData::empty(); let gns_txn_driver = raw_journal::load_journal::(super::GNS_PATH, &gns)?; let mut model_drivers = HashMap::new(); @@ -55,17 +55,21 @@ pub fn load_gns() -> RuntimeResult { let model = models .get_mut(&EntityIDRef::new(&space_name, &model_name)) .unwrap(); - let path = - paths_v1::model_path(space_name, space_uuid, model_name, model.get_uuid()); - let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg(format!( - "failed to restore model data from journal in `{path}`" - ))?; + let path = paths_v1::model_path( + space_name, + space_uuid, + model_name, + model.data().get_uuid(), + ); + let persist_driver = batch_jrnl::reinit(&path, model.data()).inherit_set_dmsg( + format!("failed to restore model data from journal in `{path}`"), + )?; unsafe { // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum - model.model_mutator().vacuum_stashed(); + model.data_mut().model_mutator().vacuum_stashed(); } let _ = model_drivers.insert( - ModelUniqueID::new(space_name, model_name, model.get_uuid()), + ModelUniqueID::new(space_name, model_name, model.data().get_uuid()), persist_driver, ); } diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index ff08005d..fa1f667f 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -35,7 +35,7 @@ use { self::raw::sysdb::RestoredSystemDatabase, super::common::interface::fs::FileSystem, crate::{ - engine::{core::GlobalNS, RuntimeResult}, + engine::{core::GNSData, RuntimeResult}, util, }, }; @@ -44,7 +44,7 @@ pub const GNS_PATH: &str = "gns.db-tlog"; pub const SYSDB_PATH: &str = "sys.db"; pub const DATA_DIR: &str = "data"; -pub fn load_gns_prepare_migration() -> RuntimeResult { +pub fn load_gns_prepare_migration() -> RuntimeResult { // load gns let gns = loader::load_gns()?; // load sysdb diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/mod.rs b/server/src/engine/storage/v1/raw/batch_jrnl/mod.rs index 3e863236..ba6217ad 100644 --- a/server/src/engine/storage/v1/raw/batch_jrnl/mod.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/mod.rs @@ -42,11 +42,11 @@ pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver}; use { super::{rw::SDSSFileIO, spec}, - crate::engine::{core::model::Model, error::RuntimeResult}, + crate::engine::{core::model::ModelData, error::RuntimeResult}, }; /// Re-initialize an existing batch journal and read all its data into model -pub fn reinit(name: &str, model: &Model) -> RuntimeResult { +pub fn reinit(name: &str, model: &ModelData) -> RuntimeResult { let (f, _header) = SDSSFileIO::open::(name)?; // restore let mut restore_driver = DataBatchRestoreDriver::new(f)?; diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs index 48560d90..62e7674d 100644 --- a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs @@ -32,7 +32,7 @@ use { crate::engine::{ core::{ index::{DcFieldIndex, PrimaryIndexKey, Row}, - model::{delta::DeltaVersion, Model}, + model::{delta::DeltaVersion, ModelData}, }, data::{cell::Datacell, tag::TagUnique}, error::{RuntimeResult, StorageError}, @@ -120,7 +120,7 @@ impl DataBatchRestoreDriver { } pub(in crate::engine::storage::v1) fn read_data_batch_into_model( &mut self, - model: &Model, + model: &ModelData, ) -> RuntimeResult<()> { self.read_all_batches_and_for_each(|batch| { // apply the batch @@ -199,7 +199,7 @@ impl DataBatchRestoreDriver { impl DataBatchRestoreDriver { fn apply_batch( - m: &Model, + m: &ModelData, NormalBatch { events, schema_version, diff --git a/server/src/engine/storage/v1/raw/journal/mod.rs b/server/src/engine/storage/v1/raw/journal/mod.rs index 5490cbcd..23f58368 100644 --- a/server/src/engine/storage/v1/raw/journal/mod.rs +++ b/server/src/engine/storage/v1/raw/journal/mod.rs @@ -27,7 +27,7 @@ use { self::raw::JournalAdapter, crate::engine::{ - core::GlobalNS, error::TransactionError, mem::BufferedScanner, + core::GNSData, error::TransactionError, mem::BufferedScanner, storage::common_encoding::r1::impls::gns::GNSEvent, txn::gns, RuntimeResult, }, }; @@ -47,7 +47,7 @@ pub struct GNSAdapter; impl JournalAdapter for GNSAdapter { const RECOVERY_PLUGIN: bool = true; type JournalEvent = GNSSuperEvent; - type GlobalState = GlobalNS; + type GlobalState = GNSData; type Error = crate::engine::fractal::error::Error; fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> { b @@ -58,7 +58,7 @@ impl JournalAdapter for GNSAdapter { [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())] }; } - static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!( + static DISPATCH: [fn(&mut BufferedScanner, &GNSData) -> RuntimeResult<()>; 9] = dispatch!( gns::space::CreateSpaceTxn, gns::space::AlterSpaceTxn, gns::space::DropSpaceTxn, diff --git a/server/src/engine/storage/v2/impls/gns_log.rs b/server/src/engine/storage/v2/impls/gns_log.rs index 6b1d2e23..6842cdd0 100644 --- a/server/src/engine/storage/v2/impls/gns_log.rs +++ b/server/src/engine/storage/v2/impls/gns_log.rs @@ -31,7 +31,7 @@ use { }, crate::{ engine::{ - core::GlobalNS, + core::GNSData, storage::{ common_encoding::r1::impls::gns::GNSEvent, v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent}, @@ -56,14 +56,15 @@ use { */ pub type GNSDriver = EventLogDriver; +#[derive(Debug)] pub struct GNSEventLog; impl GNSDriver { const FILE_PATH: &'static str = "gns.db-tlog"; - pub fn open_gns_with_name(name: &str, gs: &GlobalNS) -> RuntimeResult { + pub fn open_gns_with_name(name: &str, gs: &GNSData) -> RuntimeResult { journal::open_journal(name, gs) } - pub fn open_gns(gs: &GlobalNS) -> RuntimeResult { + pub fn open_gns(gs: &GNSData) -> RuntimeResult { Self::open_gns_with_name(Self::FILE_PATH, gs) } pub fn create_gns_with_name(name: &str) -> RuntimeResult { @@ -83,10 +84,10 @@ macro_rules! make_dispatch { impl EventLogSpec for GNSEventLog { type Spec = SystemDatabaseV1; - type GlobalState = GlobalNS; + type GlobalState = GNSData; type EventMeta = GNSTransactionCode; type DecodeDispatch = - [fn(&GlobalNS, Vec) -> RuntimeResult<()>; GNSTransactionCode::VARIANT_COUNT]; + [fn(&GNSData, Vec) -> RuntimeResult<()>; GNSTransactionCode::VARIANT_COUNT]; const DECODE_DISPATCH: Self::DecodeDispatch = make_dispatch![ CreateSpaceTxn, AlterSpaceTxn, diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index d7128039..7b066794 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -31,7 +31,7 @@ use { index::{DcFieldIndex, PrimaryIndexKey, Row, RowData}, model::{ delta::{DataDelta, DataDeltaKind, DeltaVersion}, - Model, + ModelData, }, }, data::{ @@ -41,10 +41,7 @@ use { error::StorageError, idx::{MTIndex, STIndex, STIndexSeq}, storage::{ - common::{ - interface::fs::File, - sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter}, - }, + common::sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter}, common_encoding::r1, v2::raw::{ journal::{ @@ -69,7 +66,7 @@ use { pub type ModelDriver = BatchDriver; impl ModelDriver { - pub fn open_model_driver(mdl: &Model, model_data_file_path: &str) -> RuntimeResult { + pub fn open_model_driver(mdl: &ModelData, model_data_file_path: &str) -> RuntimeResult { journal::open_journal(model_data_file_path, mdl) } /// Create a new event log @@ -79,6 +76,7 @@ impl ModelDriver { } /// The model data adapter (abstract journal adapter impl) +#[derive(Debug)] pub struct ModelDataAdapter; #[derive(Debug, PartialEq, Clone, Copy, TaggedEnum)] @@ -110,7 +108,7 @@ pub enum EventType { */ struct RowWriter<'b> { - f: &'b mut TrackedWriter as RawJournalAdapter>::Spec>, + f: &'b mut TrackedWriter< as RawJournalAdapter>::Spec>, } impl<'b> RowWriter<'b> { @@ -118,7 +116,7 @@ impl<'b> RowWriter<'b> { /// - pk tag /// - schema version /// - column count - fn write_row_global_metadata(&mut self, model: &Model) -> RuntimeResult<()> { + fn write_row_global_metadata(&mut self, model: &ModelData) -> RuntimeResult<()> { self.f .dtrack_write(&[model.p_tag().tag_unique().value_u8()])?; self.f.dtrack_write( @@ -191,7 +189,7 @@ impl<'b> RowWriter<'b> { Ok(()) } /// Encode row data - fn write_row_data(&mut self, model: &Model, row_data: &RowData) -> RuntimeResult<()> { + fn write_row_data(&mut self, model: &ModelData, row_data: &RowData) -> RuntimeResult<()> { for field_name in model.fields().stseq_ord_key() { match row_data.fields().get(field_name) { Some(cell) => { @@ -206,7 +204,7 @@ impl<'b> RowWriter<'b> { } struct BatchWriter<'a, 'b> { - model: &'a Model, + model: &'a ModelData, row_writer: RowWriter<'b>, g: &'a Guard, sync_count: usize, @@ -214,10 +212,10 @@ struct BatchWriter<'a, 'b> { impl<'a, 'b> BatchWriter<'a, 'b> { fn write_batch( - model: &'a Model, + model: &'a ModelData, g: &'a Guard, expected: usize, - f: &'b mut TrackedWriter as RawJournalAdapter>::Spec>, + f: &'b mut TrackedWriter< as RawJournalAdapter>::Spec>, batch_stat: &mut BatchStats, ) -> RuntimeResult { /* @@ -248,9 +246,9 @@ impl<'a, 'b> BatchWriter<'a, 'b> { Ok(me.sync_count) } fn new( - model: &'a Model, + model: &'a ModelData, g: &'a Guard, - f: &'b mut TrackedWriter as RawJournalAdapter>::Spec>, + f: &'b mut TrackedWriter< as RawJournalAdapter>::Spec>, ) -> RuntimeResult { let mut row_writer = RowWriter { f }; row_writer.write_row_global_metadata(model)?; @@ -293,10 +291,10 @@ impl<'a, 'b> BatchWriter<'a, 'b> { } /// A standard model batch where atmost the given number of keys are flushed -pub struct StdModelBatch<'a>(&'a Model, usize); +pub struct StdModelBatch<'a>(&'a ModelData, usize); impl<'a> StdModelBatch<'a> { - pub fn new(model: &'a Model, observed_len: usize) -> Self { + pub fn new(model: &'a ModelData, observed_len: usize) -> Self { Self(model, observed_len) } } @@ -307,10 +305,7 @@ impl<'a> JournalAdapterEvent> for StdModelBatch<' } fn write_direct( self, - writer: &mut TrackedWriter< - File, - as RawJournalAdapter>::Spec, - >, + writer: &mut TrackedWriter< as RawJournalAdapter>::Spec>, ctx: Rc>, ) -> RuntimeResult<()> { // [expected commit] @@ -326,10 +321,10 @@ impl<'a> JournalAdapterEvent> for StdModelBatch<' } } -pub struct FullModel<'a>(&'a Model); +pub struct FullModel<'a>(&'a ModelData); impl<'a> FullModel<'a> { - pub fn new(model: &'a Model) -> Self { + pub fn new(model: &'a ModelData) -> Self { Self(model) } } @@ -340,7 +335,7 @@ impl<'a> JournalAdapterEvent> for FullModel<'a> { } fn write_direct( self, - f: &mut TrackedWriter as RawJournalAdapter>::Spec>, + f: &mut TrackedWriter< as RawJournalAdapter>::Spec>, _: Rc>, ) -> RuntimeResult<()> { let g = pin(); @@ -432,7 +427,7 @@ impl BatchStats { impl BatchAdapterSpec for ModelDataAdapter { type Spec = ModelDataBatchAofV1; - type GlobalState = Model; + type GlobalState = ModelData; type BatchType = BatchType; type EventType = EventType; type BatchMetadata = BatchMetadata; diff --git a/server/src/engine/storage/v2/impls/tests/model_driver.rs b/server/src/engine/storage/v2/impls/tests/model_driver.rs index 2fb12abd..6eab6347 100644 --- a/server/src/engine/storage/v2/impls/tests/model_driver.rs +++ b/server/src/engine/storage/v2/impls/tests/model_driver.rs @@ -27,7 +27,7 @@ use { crate::{ engine::{ - core::{dml, index::RowData, model::Model, space::Space, EntityID, EntityIDRef}, + core::{dml, index::RowData, model::ModelData, space::Space, EntityID, EntityIDRef}, data::lit::Lit, error::QueryResult, fractal::{test_utils::TestGlobal, GlobalInstanceLike}, @@ -75,7 +75,7 @@ fn create_model_and_space(global: &TestGlobal, create_model: &str) -> QueryResul let create_space_tokens = lex_insecure(create_space_str.as_bytes()).unwrap(); let create_space: CreateSpace = ast::parse_ast_node_full(&create_space_tokens[2..]).unwrap(); Space::transactional_exec_create(global, create_space)?; - Model::transactional_exec_create(global, create_model).map(|_| mdl_name) + ModelData::transactional_exec_create(global, create_model).map(|_| mdl_name) } fn run_insert(global: &TestGlobal, insert: &str) -> QueryResult<()> { @@ -144,6 +144,7 @@ fn run_sample_inserts( global.load_model_drivers().unwrap(); global .state() + .namespace() .with_model( EntityIDRef::new(mdl_name.space(), mdl_name.entity()), |model| { @@ -225,6 +226,7 @@ fn run_sample_updates( { global .state() + .namespace() .with_model( EntityIDRef::new(mdl_name.space(), mdl_name.entity()), |model| { diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index a93467c6..7c1a7863 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -31,9 +31,9 @@ use { config::Configuration, core::{ system_db::{SystemDatabase, VerifyUser}, - GlobalNS, + GNSData, GlobalNS, }, - fractal::{context, ModelDrivers, ModelUniqueID}, + fractal::{context, FractalGNSDriver}, storage::common::paths_v1, txn::{ gns::{ @@ -54,8 +54,7 @@ pub(super) mod raw; pub const GNS_PATH: &str = v1::GNS_PATH; pub const DATA_DIR: &str = v1::DATA_DIR; -pub fn recreate(gns: GlobalNS) -> RuntimeResult { - let model_drivers = ModelDrivers::empty(); +pub fn recreate(gns: GNSData) -> RuntimeResult { context::set_dmsg("creating gns"); let mut gns_driver = impls::gns_log::GNSDriver::create_gns()?; // create all spaces @@ -67,41 +66,37 @@ pub fn recreate(gns: GlobalNS) -> RuntimeResult { // create all models context::set_dmsg("creating all models"); for (model_id, model) in gns.idx_models().read().iter() { + let model_data = model.data(); let space_uuid = gns.idx().read().get(model_id.space()).unwrap().get_uuid(); FileSystem::create_dir_all(&paths_v1::model_dir( model_id.space(), space_uuid, model_id.entity(), - model.get_uuid(), + model_data.get_uuid(), ))?; let mut model_driver = ModelDriver::create_model_driver(&paths_v1::model_path( model_id.space(), space_uuid, model_id.entity(), - model.get_uuid(), + model_data.get_uuid(), ))?; gns_driver.commit_event(CreateModelTxn::new( SpaceIDRef::with_uuid(model_id.space(), space_uuid), model_id.entity(), - model, + model_data, ))?; - model_driver.commit_with_ctx(FullModel::new(model), BatchStats::new())?; - model_drivers.add_driver( - ModelUniqueID::new(model_id.space(), model_id.entity(), model.get_uuid()), - model_driver, - ); + model_driver.commit_with_ctx(FullModel::new(model_data), BatchStats::new())?; + model.driver().initialize_model_driver(model_driver); } Ok(SELoaded { - gns, - gns_driver, - model_drivers, + gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)), }) } pub fn initialize_new(config: &Configuration) -> RuntimeResult { FileSystem::create_dir_all(DATA_DIR)?; let mut gns_driver = impls::gns_log::GNSDriver::create_gns()?; - let gns = GlobalNS::empty(); + let gns = GNSData::empty(); let password_hash = rcrypt::hash(&config.auth.root_key, rcrypt::DEFAULT_COST).unwrap(); // now go ahead and initialize our root user gns_driver.commit_event(CreateUserTxn::new( @@ -113,31 +108,26 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult { password_hash.into_boxed_slice(), )); Ok(SELoaded { - gns, - gns_driver, - model_drivers: ModelDrivers::empty(), + gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)), }) } pub fn restore(cfg: &Configuration) -> RuntimeResult { - let gns = GlobalNS::empty(); + let gns = GNSData::empty(); context::set_dmsg("loading gns"); let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?; - let model_drivers = ModelDrivers::empty(); for (id, model) in gns.idx_models().write().iter_mut() { + let model_data = model.data(); let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid(); let model_data_file_path = - paths_v1::model_path(id.space(), space_uuid, id.entity(), model.get_uuid()); + paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid()); context::set_dmsg(format!("loading model driver in {model_data_file_path}")); let model_driver = - impls::mdl_journal::ModelDriver::open_model_driver(model, &model_data_file_path)?; - model_drivers.add_driver( - ModelUniqueID::new(id.space(), id.entity(), model.get_uuid()), - model_driver, - ); + impls::mdl_journal::ModelDriver::open_model_driver(model_data, &model_data_file_path)?; + model.driver().initialize_model_driver(model_driver); unsafe { // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum - model.model_mutator().vacuum_stashed(); + model.data_mut().model_mutator().vacuum_stashed(); } } // check if password has changed @@ -155,8 +145,6 @@ pub fn restore(cfg: &Configuration) -> RuntimeResult { .__raw_alter_user(SystemDatabase::ROOT_ACCOUNT, phash.into_boxed_slice()); } Ok(SELoaded { - gns, - gns_driver, - model_drivers, + gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)), }) } diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index f01f7372..230849d3 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -24,8 +24,6 @@ * */ -use crate::engine::storage::common::interface::fs::File; - use { self::raw::{CommitPreference, RawJournalAdapterEvent, RawJournalWriter}, crate::{ @@ -65,6 +63,7 @@ pub use raw::{ /// An event log driver pub type EventLogDriver = RawJournalWriter>; /// The event log adapter +#[derive(Debug)] pub struct EventLogAdapter(PhantomData); type DispatchFn = fn(&G, Vec) -> RuntimeResult<()>; @@ -103,7 +102,7 @@ impl RawJournalAdapter for EventLogAdapter { } fn commit_direct<'a, E>( &mut self, - w: &mut TrackedWriter, + w: &mut TrackedWriter, ev: E, ctx: (), ) -> RuntimeResult<()> @@ -160,6 +159,7 @@ impl RawJournalAdapter for EventLogAdapter { /// Batch journal driver pub type BatchDriver = RawJournalWriter>; /// Batch journal adapter +#[derive(Debug)] pub struct BatchAdapter(PhantomData); #[cfg(test)] @@ -245,7 +245,7 @@ impl RawJournalAdapter for BatchAdapter { } fn commit_direct<'a, E>( &mut self, - w: &mut TrackedWriter, + w: &mut TrackedWriter, ev: E, ctx: Self::CommitContext, ) -> RuntimeResult<()> diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index c049dde6..e06127d4 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -33,7 +33,6 @@ use { mem::unsafe_apis::memcpy, storage::common::{ checksum::SCrc64, - interface::fs::File, sdss::sdss_r1::{ rw::{SdssFile, TrackedReader, TrackedWriter}, FileSpecV1, @@ -41,6 +40,7 @@ use { }, RuntimeResult, }, + core::fmt, std::ops::Range, }; @@ -212,7 +212,7 @@ pub trait RawJournalAdapterEvent: Sized { fn md(&self) -> u64; fn write_direct( self, - _: &mut TrackedWriter::Spec>, + _: &mut TrackedWriter<::Spec>, _: ::CommitContext, ) -> RuntimeResult<()> { unimplemented!() @@ -251,7 +251,7 @@ pub trait RawJournalAdapter: Sized { /// commit event (direct preference) fn commit_direct( &mut self, - _: &mut TrackedWriter, + _: &mut TrackedWriter, _: E, _: Self::CommitContext, ) -> RuntimeResult<()> @@ -468,12 +468,27 @@ pub(super) enum DriverEventKind { /// A low-level journal writer pub struct RawJournalWriter { j: J, - log_file: TrackedWriter::Spec>, + log_file: TrackedWriter<::Spec>, txn_id: u64, known_txn_id: u64, known_txn_offset: u64, // if offset is 0, txn id is unset } +impl fmt::Debug for RawJournalWriter +where + ::Metadata: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RawJournalWriter") + .field("j", &self.j) + .field("log_file", &self.log_file) + .field("txn_id", &self.txn_id) + .field("known_txn_id", &self.known_txn_id) + .field("known_txn_offset", &self.known_txn_offset) + .finish() + } +} + const SERVER_EV_MASK: u64 = 1 << (u64::BITS - 1); impl RawJournalWriter { diff --git a/server/src/engine/storage/v2/raw/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs index c2dbb525..bc2e8e78 100644 --- a/server/src/engine/storage/v2/raw/journal/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/tests.rs @@ -38,10 +38,7 @@ use { error::StorageError, mem::unsafe_apis, storage::{ - common::{ - interface::fs::File, - sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter}, - }, + common::sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter}, v2::raw::{ journal::raw::{create_journal, open_journal, RawJournalWriter}, spec::{ModelDataBatchAofV1, SystemDatabaseV1}, @@ -285,7 +282,6 @@ impl<'a> RawJournalAdapterEvent> for BatchDBFlush<' fn write_direct( self, f: &mut TrackedWriter< - File, as super::raw::RawJournalAdapter>::Spec, >, ctx: Rc>, diff --git a/server/src/engine/storage/v2/raw/spec.rs b/server/src/engine/storage/v2/raw/spec.rs index bd476cf4..63657d2a 100644 --- a/server/src/engine/storage/v2/raw/spec.rs +++ b/server/src/engine/storage/v2/raw/spec.rs @@ -70,6 +70,7 @@ impl sdss::sdss_r1::HeaderV1Enumeration for FileSpecifier { } } +#[derive(Debug)] pub struct HeaderImplV2; impl sdss::sdss_r1::HeaderV1Spec for HeaderImplV2 { type FileClass = FileClass; diff --git a/server/src/engine/txn/gns/model.rs b/server/src/engine/txn/gns/model.rs index 778be0fa..e2d5869e 100644 --- a/server/src/engine/txn/gns/model.rs +++ b/server/src/engine/txn/gns/model.rs @@ -25,7 +25,7 @@ */ use crate::engine::{ - core::model::{Field, Model}, + core::model::{Field, ModelData}, idx::{IndexST, IndexSTSeqCns}, ql::lex::Ident, txn::{ModelIDRef, SpaceIDRef}, @@ -44,11 +44,11 @@ impl_gns_event!( pub struct CreateModelTxn<'a> { space_id: SpaceIDRef<'a>, model_name: &'a str, - model: &'a Model, + model: &'a ModelData, } impl<'a> CreateModelTxn<'a> { - pub const fn new(space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a Model) -> Self { + pub const fn new(space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a ModelData) -> Self { Self { space_id, model_name, @@ -61,7 +61,7 @@ impl<'a> CreateModelTxn<'a> { pub fn model_name(&self) -> &str { self.model_name } - pub fn model(&self) -> &Model { + pub fn model(&self) -> &ModelData { self.model } } diff --git a/server/src/engine/txn/shared.rs b/server/src/engine/txn/shared.rs index df32be3d..38403017 100644 --- a/server/src/engine/txn/shared.rs +++ b/server/src/engine/txn/shared.rs @@ -25,7 +25,7 @@ */ use crate::engine::{ - core::{model::Model, space::Space}, + core::{model::ModelData, space::Space}, data::uuid::Uuid, }; @@ -63,7 +63,7 @@ impl<'a> ModelIDRef<'a> { space_name: &'a str, space: &'a Space, model_name: &'a str, - model: &'a Model, + model: &'a ModelData, ) -> ModelIDRef<'a> { ModelIDRef::new( SpaceIDRef::new(space_name, space),