diff --git a/server/src/engine/core/dml/ins.rs b/server/src/engine/core/dml/ins.rs index 2ff9723b..1bc3d183 100644 --- a/server/src/engine/core/dml/ins.rs +++ b/server/src/engine/core/dml/ins.rs @@ -29,7 +29,7 @@ use crate::engine::{ self, dml::QueryExecMeta, index::{DcFieldIndex, PrimaryIndexKey, Row}, - model::{delta::DataDeltaKind, Fields, Model}, + model::{delta::DataDeltaKind, Model}, }, error::{QueryError, QueryResult}, fractal::GlobalInstanceLike, @@ -48,8 +48,7 @@ pub fn insert_resp( pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> QueryResult<()> { core::with_model_for_data_update(global, insert.entity(), |mdl| { - let irmwd = mdl.intent_write_new_data(); - let (pk, data) = prepare_insert(mdl, irmwd.fields(), insert.data())?; + let (pk, data) = prepare_insert(mdl, insert.data())?; let g = cpin(); let ds = mdl.delta_state(); // create new version @@ -68,9 +67,9 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer // TODO(@ohsayan): optimize null case fn prepare_insert( model: &Model, - fields: &Fields, insert: InsertData, ) -> QueryResult<(PrimaryIndexKey, DcFieldIndex)> { + let fields = model.fields(); let mut okay = fields.len() == insert.column_count(); let mut prepared_data = DcFieldIndex::idx_init_cap(fields.len()); match insert { diff --git a/server/src/engine/core/dml/sel.rs b/server/src/engine/core/dml/sel.rs index 8c2d0f1f..146850cf 100644 --- a/server/src/engine/core/dml/sel.rs +++ b/server/src/engine/core/dml/sel.rs @@ -102,7 +102,6 @@ where F: FnMut(&Datacell), { global.namespace().with_model(select.entity(), |mdl| { - let irm = mdl.intent_read_model(); let target_key = mdl.resolve_where(select.clauses_mut())?; let pkdc = VirtualDatacell::new(target_key.clone()); let g = sync::atm::cpin(); @@ -118,7 +117,7 @@ where Some(row) => { let r = row.resolve_schema_deltas_and_freeze(mdl.delta_state()); if select.is_wildcard() { - for key in irm.fields().stseq_ord_key() { + for key in mdl.fields().stseq_ord_key() { read_field(key.as_ref(), r.fields())?; } } else { diff --git a/server/src/engine/core/dml/upd.rs b/server/src/engine/core/dml/upd.rs index b3810a5d..d665a741 100644 --- a/server/src/engine/core/dml/upd.rs +++ b/server/src/engine/core/dml/upd.rs @@ -266,8 +266,6 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> let mut ret = Ok(QueryExecMeta::zero()); // prepare row fetch let key = mdl.resolve_where(update.clauses_mut())?; - // freeze schema - let irm = mdl.intent_read_model(); // fetch row let g = sync::atm::cpin(); let Some(row) = mdl.primary_index().select(key, &g) else { @@ -298,7 +296,7 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> let field_definition; let field_data; match ( - irm.fields().st_get(lhs.as_str()), + mdl.fields().st_get(lhs.as_str()), row_data_wl.fields_mut().st_get_mut(lhs.as_str()), ) { (Some(fdef), Some(fdata)) => { diff --git a/server/src/engine/core/index/row.rs b/server/src/engine/core/index/row.rs index b667edd4..7cde70a6 100644 --- a/server/src/engine/core/index/row.rs +++ b/server/src/engine/core/index/row.rs @@ -167,9 +167,8 @@ impl Row { } // we have deltas to apply let mut wl = RwLockUpgradableReadGuard::upgrade(rwl_ug); - let delta_read = delta_state.schema_delta_read(); let mut max_delta = wl.txn_revised_schema_version; - for (delta_id, delta) in delta_read.resolve_iter_since(wl.txn_revised_schema_version) { + for (delta_id, delta) in delta_state.resolve_iter_since(wl.txn_revised_schema_version) { match delta.kind() { SchemaDeltaKind::FieldAdd(f) => { wl.fields.st_insert(f.clone(), Datacell::null()); diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs index da929a4f..69bfd829 100644 --- a/server/src/engine/core/mod.rs +++ b/server/src/engine/core/mod.rs @@ -87,13 +87,13 @@ impl GlobalNS { let mut space = space.write(); f(&mut space) } - pub fn with_model_space<'a, T, F>(&self, entity: Entity<'a>, f: F) -> QueryResult + pub fn with_model_space_mut_for_ddl<'a, T, F>(&self, entity: Entity<'a>, f: F) -> QueryResult where - F: FnOnce(&Space, &Model) -> QueryResult, + F: FnOnce(&Space, &mut Model) -> QueryResult, { let (space, model_name) = entity.into_full_result()?; - let mdl_idx = self.idx_mdl.read(); - let Some(model) = mdl_idx.get(&EntityIDRef::new(&space, &model_name)) else { + let mut mdl_idx = self.idx_mdl.write(); + let Some(model) = mdl_idx.get_mut(&EntityIDRef::new(&space, &model_name)) else { return Err(QueryError::QExecObjectNotFound); }; let space_read = self.idx.read(); diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index ef1386cd..ad24f554 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -25,7 +25,7 @@ */ use { - super::{Field, IWModel, Layer, Model}, + super::{Field, Layer, Model}, crate::{ engine::{ data::{ @@ -76,7 +76,7 @@ macro_rules! can_ignore { } #[inline(always)] -fn no_field(mr: &IWModel, new: &str) -> bool { +fn no_field(mr: &Model, new: &str) -> bool { !mr.fields().st_contains(new) } @@ -90,8 +90,7 @@ fn check_nullable(props: &mut HashMap, DictEntryGeneric>) -> QueryResul impl<'a> AlterPlan<'a> { pub fn fdeltas( - mv: &Model, - wm: &IWModel, + mdl: &Model, AlterModel { model, kind }: AlterModel<'a>, ) -> QueryResult> { let mut no_lock = true; @@ -104,8 +103,8 @@ impl<'a> AlterPlan<'a> { } let mut not_found = false; if r.iter().all(|id| { - let not_pk = mv.not_pk(id); - let exists = !no_field(wm, id.as_str()); + let not_pk = mdl.not_pk(id); + let exists = !no_field(mdl, id.as_str()); not_found = !exists; not_pk & exists }) { @@ -125,7 +124,7 @@ impl<'a> AlterPlan<'a> { layers, mut props, } = fields.next().unwrap(); - okay &= no_field(wm, &field_name) & mv.not_pk(&field_name); + okay &= no_field(mdl, &field_name) & mdl.not_pk(&field_name); let is_nullable = check_nullable(&mut props)?; let layers = Field::parse_layers(layers, is_nullable)?; okay &= add.st_insert(field_name.to_string().into_boxed_str(), layers); @@ -144,9 +143,9 @@ impl<'a> AlterPlan<'a> { mut props, } = updated_fields.next().unwrap(); // enforce pk - mv.guard_pk(&field_name)?; + mdl.guard_pk(&field_name)?; // get the current field - let Some(current_field) = wm.fields().st_get(field_name.as_str()) else { + let Some(current_field) = mdl.fields().st_get(field_name.as_str()) else { return Err(QueryError::QExecUnknownField); }; // check props @@ -255,22 +254,18 @@ impl Model { let (space_name, model_name) = alter.model.into_full_result()?; global .namespace() - .with_model_space(alter.model, |space, model| { - // make intent - let iwm = model.intent_write_model(); + .with_model_space_mut_for_ddl(alter.model, |space, model| { // prepare plan - let plan = AlterPlan::fdeltas(model, &iwm, alter)?; + let plan = AlterPlan::fdeltas(model, alter)?; // we have a legal plan; acquire exclusive if we need it if !plan.no_lock { // TODO(@ohsayan): allow this later on, once we define the syntax return Err(QueryError::QExecNeedLock); } // fine, we're good - let mut iwm = iwm; match plan.action { - AlterAction::Ignore => drop(iwm), + AlterAction::Ignore => {} AlterAction::Add(new_fields) => { - let mut guard = model.delta_state().schema_delta_write(); // TODO(@ohsayan): this impacts lockdown duration; fix it if G::FS_IS_NON_NULL { // prepare txn @@ -291,13 +286,12 @@ impl Model { .map(|(x, y)| (x.clone(), y.clone())) .for_each(|(field_id, field)| { model - .delta_state() - .schema_append_unresolved_wl_field_add(&mut guard, &field_id); - iwm.fields_mut().st_insert(field_id, field); + .delta_state_mut() + .schema_append_unresolved_wl_field_add(&field_id); + model.fields_mut().st_insert(field_id, field); }); } AlterAction::Remove(removed) => { - let mut guard = model.delta_state().schema_delta_write(); if G::FS_IS_NON_NULL { // prepare txn let txn = gnstxn::AlterModelRemoveTxn::new( @@ -308,11 +302,10 @@ impl Model { global.namespace_txn_driver().lock().try_commit(txn)?; } removed.iter().for_each(|field_id| { - model.delta_state().schema_append_unresolved_wl_field_rem( - &mut guard, - field_id.as_str(), - ); - iwm.fields_mut().st_delete(field_id.as_str()); + model + .delta_state_mut() + .schema_append_unresolved_wl_field_rem(field_id.as_str()); + model.fields_mut().st_delete(field_id.as_str()); }); } AlterAction::Update(updated) => { @@ -326,7 +319,7 @@ impl Model { global.namespace_txn_driver().lock().try_commit(txn)?; } updated.into_iter().for_each(|(field_id, field)| { - iwm.fields_mut().st_update(&field_id, field); + model.fields_mut().st_update(&field_id, field); }); } } diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index a09c5514..2feb6cf0 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -25,136 +25,25 @@ */ use { - super::{Fields, Model}, + super::Model, crate::engine::{ core::{dml::QueryExecMeta, index::Row}, fractal::{FractalToken, GlobalInstanceLike}, sync::atm::Guard, sync::queue::Queue, }, - parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}, std::{ collections::btree_map::{BTreeMap, Range}, sync::atomic::{AtomicU64, AtomicUsize, Ordering}, }, }; -/* - sync matrix -*/ - -// FIXME(@ohsayan): This an inefficient repr of the matrix; replace it with my other design -#[derive(Debug)] -/// A sync matrix enables different queries to have different access permissions on the data model, and the data in the -/// index -pub struct ISyncMatrix { - // virtual privileges - /// read/write model - v_priv_model_alter: RwLock<()>, - /// RW data/block all - v_priv_data_new_or_revise: RwLock<()>, -} - -#[cfg(test)] -impl PartialEq for ISyncMatrix { - fn eq(&self, _: &Self) -> bool { - true - } -} - -#[derive(Debug)] -/// Read model, write new data -pub struct IRModelSMData<'a> { - _rmodel: RwLockReadGuard<'a, ()>, - _mdata: RwLockReadGuard<'a, ()>, - fields: &'a Fields, -} - -impl<'a> IRModelSMData<'a> { - pub fn new(m: &'a Model) -> Self { - let rmodel = m.sync_matrix().v_priv_model_alter.read(); - let mdata = m.sync_matrix().v_priv_data_new_or_revise.read(); - Self { - _rmodel: rmodel, - _mdata: mdata, - fields: unsafe { - // UNSAFE(@ohsayan): we already have acquired this resource - m._read_fields() - }, - } - } - pub fn fields(&'a self) -> &'a Fields { - self.fields - } -} - -#[derive(Debug)] -/// Read model -pub struct IRModel<'a> { - _rmodel: RwLockReadGuard<'a, ()>, - fields: &'a Fields, -} - -impl<'a> IRModel<'a> { - pub fn new(m: &'a Model) -> Self { - Self { - _rmodel: m.sync_matrix().v_priv_model_alter.read(), - fields: unsafe { - // UNSAFE(@ohsayan): we already have acquired this resource - m._read_fields() - }, - } - } - pub fn fields(&'a self) -> &'a Fields { - self.fields - } -} - -#[derive(Debug)] -/// Write model -pub struct IWModel<'a> { - _wmodel: RwLockWriteGuard<'a, ()>, - fields: &'a mut Fields, -} - -impl<'a> IWModel<'a> { - pub fn new(m: &'a Model) -> Self { - Self { - _wmodel: m.sync_matrix().v_priv_model_alter.write(), - fields: unsafe { - // UNSAFE(@ohsayan): we have exclusive access to this resource - m._read_fields_mut() - }, - } - } - pub fn fields(&'a self) -> &'a Fields { - self.fields - } - // ALIASING - pub fn fields_mut(&mut self) -> &mut Fields { - self.fields - } -} - -impl ISyncMatrix { - pub const fn new() -> Self { - Self { - v_priv_model_alter: RwLock::new(()), - v_priv_data_new_or_revise: RwLock::new(()), - } - } -} - -/* - delta -*/ - #[derive(Debug)] /// A delta state for the model pub struct DeltaState { // schema - schema_current_version: AtomicU64, - schema_deltas: RwLock>, + schema_current_version: u64, + schema_deltas: BTreeMap, // data data_current_version: AtomicU64, data_deltas: Queue, @@ -165,8 +54,8 @@ impl DeltaState { /// A new, fully resolved delta state with version counters set to 0 pub fn new_resolved() -> Self { Self { - schema_current_version: AtomicU64::new(0), - schema_deltas: RwLock::new(BTreeMap::new()), + schema_current_version: 0, + schema_deltas: BTreeMap::new(), data_current_version: AtomicU64::new(0), data_deltas: Queue::new(), data_deltas_size: AtomicUsize::new(0), @@ -218,51 +107,32 @@ impl DeltaState { // schema impl DeltaState { - pub fn schema_delta_write<'a>(&'a self) -> SchemaDeltaIndexWGuard<'a> { - SchemaDeltaIndexWGuard(self.schema_deltas.write()) - } - pub fn schema_delta_read<'a>(&'a self) -> SchemaDeltaIndexRGuard<'a> { - SchemaDeltaIndexRGuard(self.schema_deltas.read()) - } - pub fn schema_current_version(&self) -> DeltaVersion { - self.__schema_delta_current() - } - pub fn schema_append_unresolved_wl_field_add( + pub fn resolve_iter_since( &self, - guard: &mut SchemaDeltaIndexWGuard, - field_name: &str, - ) { - self.__schema_append_unresolved_delta(&mut guard.0, SchemaDeltaPart::field_add(field_name)); + current_version: DeltaVersion, + ) -> Range { + self.schema_deltas.range(current_version.step()..) } - pub fn schema_append_unresolved_wl_field_rem( - &self, - guard: &mut SchemaDeltaIndexWGuard, - field_name: &str, - ) { - self.__schema_append_unresolved_delta(&mut guard.0, SchemaDeltaPart::field_rem(field_name)); + pub fn schema_current_version(&self) -> DeltaVersion { + DeltaVersion(self.schema_current_version) } - pub fn schema_append_unresolved_field_add(&self, field_name: &str) { - self.schema_append_unresolved_wl_field_add(&mut self.schema_delta_write(), field_name); + pub fn schema_append_unresolved_wl_field_add(&mut self, field_name: &str) { + self.__schema_append_unresolved_delta(SchemaDeltaPart::field_add(field_name)); } - pub fn schema_append_unresolved_field_rem(&self, field_name: &str) { - self.schema_append_unresolved_wl_field_rem(&mut self.schema_delta_write(), field_name); + pub fn schema_append_unresolved_wl_field_rem(&mut self, field_name: &str) { + self.__schema_append_unresolved_delta(SchemaDeltaPart::field_rem(field_name)); } } impl DeltaState { - fn __schema_delta_step(&self) -> DeltaVersion { - DeltaVersion(self.schema_current_version.fetch_add(1, Ordering::AcqRel)) - } - fn __schema_delta_current(&self) -> DeltaVersion { - DeltaVersion(self.schema_current_version.load(Ordering::Acquire)) + fn __schema_delta_step(&mut self) -> DeltaVersion { + let current = self.schema_current_version; + self.schema_current_version += 1; + DeltaVersion(current) } - fn __schema_append_unresolved_delta( - &self, - w: &mut BTreeMap, - part: SchemaDeltaPart, - ) -> DeltaVersion { + fn __schema_append_unresolved_delta(&mut self, part: SchemaDeltaPart) -> DeltaVersion { let v = self.__schema_delta_step(); - w.insert(v, part); + self.schema_deltas.insert(v, part); v } } @@ -328,19 +198,6 @@ impl SchemaDeltaPart { } } -pub struct SchemaDeltaIndexWGuard<'a>( - RwLockWriteGuard<'a, BTreeMap>, -); -pub struct SchemaDeltaIndexRGuard<'a>(RwLockReadGuard<'a, BTreeMap>); -impl<'a> SchemaDeltaIndexRGuard<'a> { - pub fn resolve_iter_since( - &self, - current_version: DeltaVersion, - ) -> Range { - self.0.range(current_version.step()..) - } -} - /* data delta */ diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index 71203879..d965787c 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -31,7 +31,6 @@ pub(in crate::engine) mod delta; use std::cell::RefCell; use { - self::delta::{IRModel, IRModelSMData, ISyncMatrix, IWModel}, super::index::PrimaryIndex, crate::engine::{ data::{ @@ -50,7 +49,6 @@ use { }, txn::gns::{self as gnstxn, SpaceIDRef}, }, - std::cell::UnsafeCell, }; pub(in crate::engine::core) use self::delta::{DeltaState, DeltaVersion, SchemaDeltaKind}; @@ -63,8 +61,7 @@ pub struct Model { uuid: Uuid, p_key: Box, p_tag: FullTag, - fields: UnsafeCell, - sync_matrix: ISyncMatrix, + fields: Fields, data: PrimaryIndex, delta: DeltaState, } @@ -72,11 +69,10 @@ pub struct Model { #[cfg(test)] impl PartialEq for Model { fn eq(&self, m: &Self) -> bool { - let mdl1 = self.intent_read_model(); - let mdl2 = m.intent_read_model(); - ((self.p_key == m.p_key) & (self.p_tag == m.p_tag)) - && self.uuid == m.uuid - && mdl1.fields() == mdl2.fields() + self.uuid == m.uuid + && self.p_key == m.p_key + && self.p_tag == m.p_tag + && self.fields == m.fields } } @@ -85,8 +81,7 @@ impl Model { uuid: Uuid, p_key: Box, p_tag: FullTag, - fields: UnsafeCell, - sync_matrix: ISyncMatrix, + fields: Fields, data: PrimaryIndex, delta: DeltaState, ) -> Self { @@ -95,12 +90,10 @@ impl Model { p_key, p_tag, fields, - sync_matrix, data, delta, } } - pub fn get_uuid(&self) -> Uuid { self.uuid } @@ -110,24 +103,6 @@ impl Model { pub fn p_tag(&self) -> FullTag { self.p_tag } - pub fn sync_matrix(&self) -> &ISyncMatrix { - &self.sync_matrix - } - unsafe fn _read_fields<'a>(&'a self) -> &'a Fields { - &*self.fields.get().cast_const() - } - unsafe fn _read_fields_mut<'a>(&'a self) -> &'a mut Fields { - &mut *self.fields.get() - } - pub fn intent_read_model<'a>(&'a self) -> IRModel<'a> { - IRModel::new(self) - } - pub fn intent_write_model<'a>(&'a self) -> IWModel<'a> { - IWModel::new(self) - } - pub fn intent_write_new_data<'a>(&'a self) -> IRModelSMData<'a> { - IRModelSMData::new(self) - } fn is_pk(&self, new: &str) -> bool { self.p_key.as_bytes() == new.as_bytes() } @@ -147,6 +122,15 @@ impl Model { pub fn delta_state(&self) -> &DeltaState { &self.delta } + pub fn delta_state_mut(&mut self) -> &mut DeltaState { + &mut self.delta + } + pub fn fields_mut(&mut self) -> &mut Fields { + &mut self.fields + } + pub fn fields(&self) -> &Fields { + &self.fields + } } impl Model { @@ -155,8 +139,7 @@ impl Model { uuid, p_key, p_tag, - UnsafeCell::new(fields), - ISyncMatrix::new(), + fields, PrimaryIndex::new_empty(), DeltaState::new_resolved(), ) @@ -215,14 +198,12 @@ impl Model { } // since we've locked this down, no one else can parallely create another model in the same space (or remove) if G::FS_IS_NON_NULL { - let irm = model.intent_read_model(); let mut txn_driver = global.namespace_txn_driver().lock(); // prepare txn let txn = gnstxn::CreateModelTxn::new( SpaceIDRef::new(&space_name, &space), &model_name, &model, - &irm, ); // attempt to initialize driver global.initialize_model_driver( diff --git a/server/src/engine/core/tests/ddl_model/alt.rs b/server/src/engine/core/tests/ddl_model/alt.rs index 5977b11e..5ec493f6 100644 --- a/server/src/engine/core/tests/ddl_model/alt.rs +++ b/server/src/engine/core/tests/ddl_model/alt.rs @@ -39,8 +39,7 @@ fn with_plan(model: &str, plan: &str, f: impl Fn(AlterPlan)) -> QueryResult<()> let model = create(model)?; let tok = lex_insecure(plan.as_bytes()).unwrap(); let alter = parse_ast_node_full(&tok[2..]).unwrap(); - let model_write = model.intent_write_model(); - let mv = AlterPlan::fdeltas(&model, &model_write, alter)?; + let mv = AlterPlan::fdeltas(&model, alter)?; Ok(f(mv)) } fn plan(model: &str, plan: &str, f: impl Fn(AlterPlan)) { @@ -366,9 +365,8 @@ mod exec { "create model myspace.mymodel(username: string, col1: uint64)", "alter model myspace.mymodel add (col2 { type: uint32, nullable: true }, col3 { type: uint16, nullable: true })", |model| { - let schema = model.intent_read_model(); assert_eq!( - schema + model .fields() .stseq_ord_kv() .rev() @@ -397,9 +395,8 @@ mod exec { "create model myspace.mymodel(username: string, col1: uint64, col2: uint32, col3: uint16, col4: uint8)", "alter model myspace.mymodel remove (col1, col2, col3, col4)", |mdl| { - let schema = mdl.intent_read_model(); assert_eq!( - schema + mdl .fields() .stseq_ord_kv() .rev() @@ -423,8 +420,7 @@ mod exec { "create model myspace.mymodel(username: string, password: binary)", "alter model myspace.mymodel update password { nullable: true }", |model| { - let schema = model.intent_read_model(); - assert!(schema.fields().st_get("password").unwrap().is_nullable()); + assert!(model.fields().st_get("password").unwrap().is_nullable()); assert_eq!( model.delta_state().schema_current_version(), DeltaVersion::genesis() diff --git a/server/src/engine/core/tests/ddl_model/crt.rs b/server/src/engine/core/tests/ddl_model/crt.rs index fa52a40d..909582c2 100644 --- a/server/src/engine/core/tests/ddl_model/crt.rs +++ b/server/src/engine/core/tests/ddl_model/crt.rs @@ -42,7 +42,6 @@ mod validation { assert_eq!(model.p_tag(), FullTag::STR); assert_eq!( model - .intent_read_model() .fields() .stseq_ord_value() .cloned() @@ -66,7 +65,6 @@ mod validation { assert_eq!(model.p_tag(), FullTag::STR); assert_eq!( model - .intent_read_model() .fields() .stseq_ord_value() .cloned() @@ -153,7 +151,6 @@ mod exec { .unwrap(); with_model(&global, SPACE, "mymodel", |model| { let models: Vec<(String, Field)> = model - .intent_read_model() .fields() .stseq_ord_kv() .map(|(k, v)| (k.to_string(), v.clone())) diff --git a/server/src/engine/core/tests/dml/mod.rs b/server/src/engine/core/tests/dml/mod.rs index f06779ae..41dd7854 100644 --- a/server/src/engine/core/tests/dml/mod.rs +++ b/server/src/engine/core/tests/dml/mod.rs @@ -74,7 +74,6 @@ fn _exec_only_read_key_and_then( ) -> QueryResult { let guard = sync::atm::cpin(); global.namespace().with_model(entity, |mdl| { - let _irm = mdl.intent_read_model(); let row = mdl .primary_index() .select(Lit::from(key_name), &guard) @@ -92,7 +91,6 @@ fn _exec_delete_only(global: &impl GlobalInstanceLike, delete: &str, key: &str) dml::delete(global, delete)?; assert_eq!( global.namespace().with_model(entity, |model| { - let _ = model.intent_read_model(); let g = sync::atm::cpin(); Ok(model.primary_index().select(key.into(), &g).is_none()) }), diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/batch_jrnl/persist.rs index ec0d3613..d0b8905c 100644 --- a/server/src/engine/storage/v1/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/batch_jrnl/persist.rs @@ -36,7 +36,7 @@ use { core::{ index::{PrimaryIndexKey, RowData}, model::{ - delta::{DataDelta, DataDeltaKind, DeltaVersion, IRModel}, + delta::{DataDelta, DataDeltaKind, DeltaVersion}, Model, }, }, @@ -76,7 +76,6 @@ impl DataBatchPersistDriver { } pub fn write_new_batch(&mut self, model: &Model, observed_len: usize) -> RuntimeResult<()> { // pin model - let irm = model.intent_read_model(); let schema_version = model.delta_state().schema_current_version(); let g = pin(); // init restore list @@ -90,7 +89,7 @@ impl DataBatchPersistDriver { observed_len, schema_version, model.p_tag().tag_unique(), - irm.fields().len() - 1, + model.fields().len() - 1, )?; while i < observed_len { let delta = model.delta_state().__data_delta_dequeue(&g).unwrap(); @@ -116,7 +115,7 @@ impl DataBatchPersistDriver { self.write_batch_item_common_row_data(&delta)?; // encode data self.encode_pk_only(delta.row().d_key())?; - self.encode_row_data(model, &irm, &row_data)?; + self.encode_row_data(model, &row_data)?; } } i += 1; @@ -227,18 +226,13 @@ impl DataBatchPersistDriver { Ok(()) } /// Encode row data - fn encode_row_data( - &mut self, - mdl: &Model, - irm: &IRModel, - row_data: &RowData, - ) -> RuntimeResult<()> { - for field_name in irm.fields().stseq_ord_key() { + fn encode_row_data(&mut self, model: &Model, row_data: &RowData) -> RuntimeResult<()> { + for field_name in model.fields().stseq_ord_key() { match row_data.fields().get(field_name) { Some(cell) => { self.encode_cell(cell)?; } - None if field_name.as_ref() == mdl.p_key() => {} + None if field_name.as_ref() == model.p_key() => {} None => self.f.write_unfsynced(&[0])?, } } diff --git a/server/src/engine/storage/v1/batch_jrnl/restore.rs b/server/src/engine/storage/v1/batch_jrnl/restore.rs index c5b1bace..5851d044 100644 --- a/server/src/engine/storage/v1/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/batch_jrnl/restore.rs @@ -215,7 +215,6 @@ impl DataBatchRestoreDriver { ) -> RuntimeResult<()> { // NOTE(@ohsayan): current complexity is O(n) which is good enough (in the future I might revise this to a fancier impl) // pin model - let irm = m.intent_read_model(); let g = unsafe { crossbeam_epoch::unprotected() }; let mut pending_delete = HashMap::new(); let p_index = m.primary_index().__raw_index(); @@ -235,7 +234,7 @@ impl DataBatchRestoreDriver { // new row (logically) let _ = p_index.mt_delete(&pk, &g); let mut data = DcFieldIndex::default(); - for (field_name, new_data) in irm + for (field_name, new_data) in m .fields() .stseq_ord_key() .filter(|key| key.as_ref() != m.p_key()) diff --git a/server/src/engine/storage/v1/inf/obj.rs b/server/src/engine/storage/v1/inf/obj.rs index 43c2d727..f2ed1372 100644 --- a/server/src/engine/storage/v1/inf/obj.rs +++ b/server/src/engine/storage/v1/inf/obj.rs @@ -29,7 +29,7 @@ use { crate::{ engine::{ core::{ - model::{delta::IRModel, Field, Layer, Model}, + model::{Field, Layer, Model}, space::Space, }, data::{ @@ -405,10 +405,10 @@ impl ModelLayoutMD { } #[derive(Clone, Copy)] -pub struct ModelLayoutRef<'a>(pub(super) &'a Model, pub(super) &'a IRModel<'a>); -impl<'a> From<(&'a Model, &'a IRModel<'a>)> for ModelLayoutRef<'a> { - fn from((mdl, irm): (&'a Model, &'a IRModel<'a>)) -> Self { - Self(mdl, irm) +pub struct ModelLayoutRef<'a>(pub(super) &'a Model); +impl<'a> From<&'a Model> for ModelLayoutRef<'a> { + fn from(mdl: &'a Model) -> Self { + Self(mdl) } } impl<'a> PersistObject for ModelLayoutRef<'a> { @@ -419,11 +419,11 @@ impl<'a> PersistObject for ModelLayoutRef<'a> { fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { scanner.has_left(md.p_key_len as usize) } - fn meta_enc(buf: &mut VecU8, ModelLayoutRef(v, irm): Self::InputType) { - buf.extend(v.get_uuid().to_le_bytes()); - buf.extend(v.p_key().len().u64_bytes_le()); - buf.extend(v.p_tag().tag_selector().value_qword().to_le_bytes()); - buf.extend(irm.fields().len().u64_bytes_le()); + fn meta_enc(buf: &mut VecU8, ModelLayoutRef(model_def): Self::InputType) { + buf.extend(model_def.get_uuid().to_le_bytes()); + buf.extend(model_def.p_key().len().u64_bytes_le()); + buf.extend(model_def.p_tag().tag_selector().value_qword().to_le_bytes()); + buf.extend(model_def.fields().len().u64_bytes_le()); } unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { Ok(ModelLayoutMD::new( @@ -433,11 +433,11 @@ impl<'a> PersistObject for ModelLayoutRef<'a> { scanner.next_u64_le(), )) } - fn obj_enc(buf: &mut VecU8, ModelLayoutRef(mdl, irm): Self::InputType) { - buf.extend(mdl.p_key().as_bytes()); + fn obj_enc(buf: &mut VecU8, ModelLayoutRef(model_definition): Self::InputType) { + buf.extend(model_definition.p_key().as_bytes()); as PersistObject>::obj_enc( buf, - irm.fields(), + model_definition.fields(), ) } unsafe fn obj_dec( diff --git a/server/src/engine/storage/v1/inf/tests.rs b/server/src/engine/storage/v1/inf/tests.rs index b402854e..a942652a 100644 --- a/server/src/engine/storage/v1/inf/tests.rs +++ b/server/src/engine/storage/v1/inf/tests.rs @@ -102,8 +102,7 @@ fn model() { "profile_pic" => Field::new([Layer::bin()].into(), true), }, ); - let model_irm = model.intent_read_model(); - let enc = super::enc::enc_full::(obj::ModelLayoutRef(&model, &model_irm)); + let enc = super::enc::enc_full::(obj::ModelLayoutRef(&model)); let dec = super::dec::dec_full::(&enc).unwrap(); assert_eq!(model, dec); } diff --git a/server/src/engine/txn/gns/model.rs b/server/src/engine/txn/gns/model.rs index 0dc8a482..629778e3 100644 --- a/server/src/engine/txn/gns/model.rs +++ b/server/src/engine/txn/gns/model.rs @@ -29,7 +29,7 @@ use { crate::{ engine::{ core::{ - model::{delta::IRModel, Field, Model}, + model::{Field, Model}, space::Space, GlobalNS, {EntityID, EntityIDRef}, }, @@ -187,15 +187,15 @@ fn with_space_mut( f(&mut space) } -fn with_model( +fn with_model_mut( gns: &GlobalNS, space_id: &super::SpaceIDRes, model_id: &ModelIDRes, - mut f: impl FnMut(&Model) -> RuntimeResult, + mut f: impl FnMut(&mut Model) -> RuntimeResult, ) -> RuntimeResult { with_space(gns, space_id, |_| { - let models = gns.idx_models().read(); - let Some(model) = models.get(&EntityIDRef::new(&space_id.name, &model_id.model_name)) + let mut models = gns.idx_models().write(); + let Some(model) = models.get_mut(&EntityIDRef::new(&space_id.name, &model_id.model_name)) else { return Err(TransactionError::OnRestoreDataMissing.into()); }; @@ -217,7 +217,6 @@ pub struct CreateModelTxn<'a> { space_id: super::SpaceIDRef<'a>, model_name: &'a str, model: &'a Model, - model_read: &'a IRModel<'a>, } impl<'a> CreateModelTxn<'a> { @@ -225,13 +224,11 @@ impl<'a> CreateModelTxn<'a> { space_id: super::SpaceIDRef<'a>, model_name: &'a str, model: &'a Model, - model_read: &'a IRModel<'a>, ) -> Self { Self { space_id, model_name, model, - model_read, } } } @@ -266,10 +263,7 @@ impl<'a> PersistObject for CreateModelTxn<'a> { // model name buf.extend(data.model_name.len().u64_bytes_le()); // model meta dump - ::meta_enc( - buf, - obj::ModelLayoutRef::from((data.model, data.model_read)), - ) + ::meta_enc(buf, obj::ModelLayoutRef::from(data.model)) } unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { let space_id = ::meta_dec(scanner)?; @@ -287,10 +281,7 @@ impl<'a> PersistObject for CreateModelTxn<'a> { // model name buf.extend(data.model_name.as_bytes()); // model dump - ::obj_enc( - buf, - obj::ModelLayoutRef::from((data.model, data.model_read)), - ) + ::obj_enc(buf, obj::ModelLayoutRef::from(data.model)) } unsafe fn obj_dec( s: &mut BufferedScanner, @@ -432,16 +423,15 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> { }: Self::RestoreType, gns: &GlobalNS, ) -> RuntimeResult<()> { - with_model(gns, &model_id.space_id, &model_id, |model| { - let mut wmodel = model.intent_write_model(); + with_model_mut(gns, &model_id.space_id, &model_id, |model| { for (i, (field_name, field)) in new_fields.stseq_ord_kv().enumerate() { - if !wmodel + if !model .fields_mut() .st_insert(field_name.to_owned(), field.clone()) { // rollback; corrupted new_fields.stseq_ord_key().take(i).for_each(|field_id| { - let _ = wmodel.fields_mut().st_delete(field_id); + let _ = model.fields_mut().st_delete(field_id); }); return Err(TransactionError::OnRestoreDataConflictMismatch.into()); } @@ -450,8 +440,8 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> { // publish deltas for field_name in new_fields.stseq_ord_key() { model - .delta_state() - .schema_append_unresolved_field_add(field_name); + .delta_state_mut() + .schema_append_unresolved_wl_field_add(field_name); } Ok(()) }) @@ -551,18 +541,17 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> { }: Self::RestoreType, gns: &GlobalNS, ) -> RuntimeResult<()> { - with_model(gns, &model_id.space_id, &model_id, |model| { - let mut iwm = model.intent_write_model(); + with_model_mut(gns, &model_id.space_id, &model_id, |model| { let mut removed_fields_rb = vec![]; for removed_field in removed_fields.iter() { - match iwm.fields_mut().st_delete_return(removed_field) { + match model.fields_mut().st_delete_return(removed_field) { Some(field) => { removed_fields_rb.push((removed_field as &str, field)); } None => { // rollback removed_fields_rb.into_iter().for_each(|(field_id, field)| { - let _ = iwm.fields_mut().st_insert(field_id.into(), field); + let _ = model.fields_mut().st_insert(field_id.into(), field); }); return Err(TransactionError::OnRestoreDataConflictMismatch.into()); } @@ -572,8 +561,8 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> { // publish deltas for field_name in removed_fields.iter() { model - .delta_state() - .schema_append_unresolved_field_rem(field_name); + .delta_state_mut() + .schema_append_unresolved_wl_field_rem(field_name); } Ok(()) }) @@ -667,16 +656,15 @@ impl<'a> GNSEvent for AlterModelUpdateTxn<'a> { }: Self::RestoreType, gns: &GlobalNS, ) -> RuntimeResult<()> { - with_model(gns, &model_id.space_id, &model_id, |model| { - let mut iwm = model.intent_write_model(); + with_model_mut(gns, &model_id.space_id, &model_id, |model| { let mut fields_rb = vec![]; for (field_id, field) in updated_fields.iter() { - match iwm.fields_mut().st_update_return(field_id, field.clone()) { + match model.fields_mut().st_update_return(field_id, field.clone()) { Some(f) => fields_rb.push((field_id as &str, f)), None => { // rollback fields_rb.into_iter().for_each(|(field_id, field)| { - let _ = iwm.fields_mut().st_update(field_id, field); + let _ = model.fields_mut().st_update(field_id, field); }); return Err(TransactionError::OnRestoreDataConflictMismatch.into()); } diff --git a/server/src/engine/txn/gns/tests/full_chain.rs b/server/src/engine/txn/gns/tests/full_chain.rs index 7ec18033..191d663f 100644 --- a/server/src/engine/txn/gns/tests/full_chain.rs +++ b/server/src/engine/txn/gns/tests/full_chain.rs @@ -219,11 +219,7 @@ fn alter_model_add() { .namespace() .with_model(("myspace", "mymodel").into(), |model| { assert_eq!( - model - .intent_read_model() - .fields() - .st_get("profile_pic") - .unwrap(), + model.fields().st_get("profile_pic").unwrap(), &Field::new([Layer::bin()].into(), true) ); Ok(()) @@ -257,9 +253,8 @@ fn alter_model_remove() { global .namespace() .with_model(("myspace", "mymodel").into(), |model| { - let irm = model.intent_read_model(); - assert!(irm.fields().st_get("has_secure_key").is_none()); - assert!(irm.fields().st_get("is_dumb").is_none()); + assert!(model.fields().st_get("has_secure_key").is_none()); + assert!(model.fields().st_get("is_dumb").is_none()); Ok(()) }) .unwrap(); @@ -291,11 +286,7 @@ fn alter_model_update() { .namespace() .with_model(("myspace", "mymodel").into(), |model| { assert_eq!( - model - .intent_read_model() - .fields() - .st_get("profile_pic") - .unwrap(), + model.fields().st_get("profile_pic").unwrap(), &Field::new([Layer::bin()].into(), true) ); Ok(()) diff --git a/server/src/engine/txn/gns/tests/io.rs b/server/src/engine/txn/gns/tests/io.rs index 470aac63..fd6b6dcc 100644 --- a/server/src/engine/txn/gns/tests/io.rs +++ b/server/src/engine/txn/gns/tests/io.rs @@ -118,15 +118,8 @@ mod model_tests { #[test] fn create() { let (space, model) = default_space_model(); - let irm = model.intent_read_model(); - let txn = CreateModelTxn::new( - super::SpaceIDRef::new("myspace", &space), - "mymodel", - &model, - &irm, - ); + let txn = CreateModelTxn::new(super::SpaceIDRef::new("myspace", &space), "mymodel", &model); let encoded = super::enc::enc_full_self(txn); - core::mem::drop(irm); let decoded = super::dec::dec_full::(&encoded).unwrap(); assert_eq!( CreateModelTxnRestorePL {