diff --git a/server/src/engine/core/dml/del.rs b/server/src/engine/core/dml/del.rs index 0b1f6dfb..4deab3a5 100644 --- a/server/src/engine/core/dml/del.rs +++ b/server/src/engine/core/dml/del.rs @@ -35,6 +35,7 @@ use crate::engine::{ pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()> { gns.with_model(delete.entity(), |model| { let g = sync::atm::cpin(); + let schema_version = model.delta_state().schema_current_version(); let delta_state = model.delta_state(); // create new version let new_version = delta_state.create_new_data_delta_version(); @@ -47,6 +48,7 @@ pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()> delta_state.append_new_data_delta( DataDeltaKind::Delete, row.clone(), + schema_version, new_version, &g, ); diff --git a/server/src/engine/core/dml/ins.rs b/server/src/engine/core/dml/ins.rs index ede87bf4..75d37bd1 100644 --- a/server/src/engine/core/dml/ins.rs +++ b/server/src/engine/core/dml/ins.rs @@ -47,7 +47,13 @@ pub fn insert(gns: &GlobalNS, insert: InsertStatement) -> DatabaseResult<()> { let row = Row::new(pk, data, ds.schema_current_version(), cv); if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) { // append delta for new version - ds.append_new_data_delta(DataDeltaKind::Insert, row, cv, &g); + ds.append_new_data_delta( + DataDeltaKind::Insert, + row, + ds.schema_current_version(), + cv, + &g, + ); Ok(()) } else { Err(DatabaseError::DmlConstraintViolationDuplicate) diff --git a/server/src/engine/core/dml/upd.rs b/server/src/engine/core/dml/upd.rs index a53b5ec9..3077d4b3 100644 --- a/server/src/engine/core/dml/upd.rs +++ b/server/src/engine/core/dml/upd.rs @@ -343,7 +343,13 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()> // update revised tag row_data_wl.set_txn_revised(cv); // publish delta - ds.append_new_data_delta(DataDeltaKind::Update, row.clone(), cv, &g); + ds.append_new_data_delta( + DataDeltaKind::Update, + row.clone(), + ds.schema_current_version(), + cv, + &g, + ); } ret }) diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index 970094cb..4265a4c9 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -173,11 +173,12 @@ impl DeltaState { &self, kind: DataDeltaKind, row: Row, - version: DeltaVersion, + schema_version: DeltaVersion, + data_version: DeltaVersion, g: &Guard, ) { self.data_deltas - .blocking_enqueue(DataDelta::new(version.0, row, kind), g); + .blocking_enqueue(DataDelta::new(schema_version, data_version, row, kind), g); } pub fn create_new_data_delta_version(&self) -> DeltaVersion { DeltaVersion(self.__data_delta_step()) @@ -315,15 +316,22 @@ impl<'a> SchemaDeltaIndexRGuard<'a> { #[derive(Debug)] pub struct DataDelta { - version: u64, + schema_version: DeltaVersion, + data_version: DeltaVersion, row: Row, change: DataDeltaKind, } impl DataDelta { - pub const fn new(version: u64, row: Row, change: DataDeltaKind) -> Self { + pub const fn new( + schema_version: DeltaVersion, + data_version: DeltaVersion, + row: Row, + change: DataDeltaKind, + ) -> Self { Self { - version, + schema_version, + data_version, row, change, } diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index a4094167..f6da7c3e 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -51,7 +51,7 @@ pub struct SpaceMeta { } impl SpaceMeta { - pub const KEY_ENV: &str = "env"; + pub const KEY_ENV: &'static str = "env"; pub fn new_with_meta(props: DictGeneric) -> Self { Self { props: RwLock::new(props), diff --git a/server/src/engine/txn/data.rs b/server/src/engine/txn/data.rs index a9e57fa8..fcc9cd48 100644 --- a/server/src/engine/txn/data.rs +++ b/server/src/engine/txn/data.rs @@ -26,7 +26,7 @@ use crate::{ engine::{ - core::{index::PrimaryIndexKey, GlobalNS}, + core::{index::PrimaryIndexKey, model::delta::DataDelta, GlobalNS}, data::cell::Datacell, storage::v1::inf::obj, }, @@ -35,8 +35,16 @@ use crate::{ type Buf = Vec; -static mut CAP_PER_LL: usize = 0; -static mut FREEMEM: u64 = 0; +/* + memory adjustments +*/ + +/// free memory in bytes +static mut FREEMEM_BYTES: u64 = 0; +/// capacity in bytes, per linked list +static mut CAP_PER_LL_BYTES: u64 = 0; +/// maximum number of nodes in linked list +static mut MAX_NODES_IN_LL_CNT: usize = 0; /// Set the free memory and cap for deltas so that we don't bust through memory /// @@ -51,8 +59,18 @@ pub unsafe fn set_limits(gns: &GlobalNS) { .map(|space| space.models().read().len()) .sum(); let available_mem = os::free_memory_in_bytes(); - FREEMEM = available_mem; - CAP_PER_LL = ((available_mem as usize / core::cmp::max(1, model_cnt)) as f64 * 0.01) as usize; + FREEMEM_BYTES = available_mem; + CAP_PER_LL_BYTES = + ((available_mem / core::cmp::max(1, model_cnt) as u64) as f64 * 0.002) as u64; + MAX_NODES_IN_LL_CNT = CAP_PER_LL_BYTES as usize / (sizeof!(DataDelta) + sizeof!(u64)); +} + +/// Returns the maximum number of nodes that can be stored inside a delta queue for a model +/// +/// Currently hardcoded to 0.2% of free memory after all datasets have been loaded +pub unsafe fn get_max_delta_queue_size() -> usize { + // TODO(@ohsayan): dynamically approximate this limit + MAX_NODES_IN_LL_CNT } /*