Also store schema version in delta

next
Sayan Nandan 1 year ago
parent e4848e645e
commit ee9ccd5a30
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -35,6 +35,7 @@ use crate::engine::{
pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()> {
gns.with_model(delete.entity(), |model| {
let g = sync::atm::cpin();
let schema_version = model.delta_state().schema_current_version();
let delta_state = model.delta_state();
// create new version
let new_version = delta_state.create_new_data_delta_version();
@ -47,6 +48,7 @@ pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()>
delta_state.append_new_data_delta(
DataDeltaKind::Delete,
row.clone(),
schema_version,
new_version,
&g,
);

@ -47,7 +47,13 @@ pub fn insert(gns: &GlobalNS, insert: InsertStatement) -> DatabaseResult<()> {
let row = Row::new(pk, data, ds.schema_current_version(), cv);
if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) {
// append delta for new version
ds.append_new_data_delta(DataDeltaKind::Insert, row, cv, &g);
ds.append_new_data_delta(
DataDeltaKind::Insert,
row,
ds.schema_current_version(),
cv,
&g,
);
Ok(())
} else {
Err(DatabaseError::DmlConstraintViolationDuplicate)

@ -343,7 +343,13 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()>
// update revised tag
row_data_wl.set_txn_revised(cv);
// publish delta
ds.append_new_data_delta(DataDeltaKind::Update, row.clone(), cv, &g);
ds.append_new_data_delta(
DataDeltaKind::Update,
row.clone(),
ds.schema_current_version(),
cv,
&g,
);
}
ret
})

@ -173,11 +173,12 @@ impl DeltaState {
&self,
kind: DataDeltaKind,
row: Row,
version: DeltaVersion,
schema_version: DeltaVersion,
data_version: DeltaVersion,
g: &Guard,
) {
self.data_deltas
.blocking_enqueue(DataDelta::new(version.0, row, kind), g);
.blocking_enqueue(DataDelta::new(schema_version, data_version, row, kind), g);
}
pub fn create_new_data_delta_version(&self) -> DeltaVersion {
DeltaVersion(self.__data_delta_step())
@ -315,15 +316,22 @@ impl<'a> SchemaDeltaIndexRGuard<'a> {
#[derive(Debug)]
pub struct DataDelta {
version: u64,
schema_version: DeltaVersion,
data_version: DeltaVersion,
row: Row,
change: DataDeltaKind,
}
impl DataDelta {
pub const fn new(version: u64, row: Row, change: DataDeltaKind) -> Self {
pub const fn new(
schema_version: DeltaVersion,
data_version: DeltaVersion,
row: Row,
change: DataDeltaKind,
) -> Self {
Self {
version,
schema_version,
data_version,
row,
change,
}

@ -51,7 +51,7 @@ pub struct SpaceMeta {
}
impl SpaceMeta {
pub const KEY_ENV: &str = "env";
pub const KEY_ENV: &'static str = "env";
pub fn new_with_meta(props: DictGeneric) -> Self {
Self {
props: RwLock::new(props),

@ -26,7 +26,7 @@
use crate::{
engine::{
core::{index::PrimaryIndexKey, GlobalNS},
core::{index::PrimaryIndexKey, model::delta::DataDelta, GlobalNS},
data::cell::Datacell,
storage::v1::inf::obj,
},
@ -35,8 +35,16 @@ use crate::{
type Buf = Vec<u8>;
static mut CAP_PER_LL: usize = 0;
static mut FREEMEM: u64 = 0;
/*
memory adjustments
*/
/// free memory in bytes
static mut FREEMEM_BYTES: u64 = 0;
/// capacity in bytes, per linked list
static mut CAP_PER_LL_BYTES: u64 = 0;
/// maximum number of nodes in linked list
static mut MAX_NODES_IN_LL_CNT: usize = 0;
/// Set the free memory and cap for deltas so that we don't bust through memory
///
@ -51,8 +59,18 @@ pub unsafe fn set_limits(gns: &GlobalNS) {
.map(|space| space.models().read().len())
.sum();
let available_mem = os::free_memory_in_bytes();
FREEMEM = available_mem;
CAP_PER_LL = ((available_mem as usize / core::cmp::max(1, model_cnt)) as f64 * 0.01) as usize;
FREEMEM_BYTES = available_mem;
CAP_PER_LL_BYTES =
((available_mem / core::cmp::max(1, model_cnt) as u64) as f64 * 0.002) as u64;
MAX_NODES_IN_LL_CNT = CAP_PER_LL_BYTES as usize / (sizeof!(DataDelta) + sizeof!(u64));
}
/// Returns the maximum number of nodes that can be stored inside a delta queue for a model
///
/// Currently hardcoded to 0.2% of free memory after all datasets have been loaded
pub unsafe fn get_max_delta_queue_size() -> usize {
// TODO(@ohsayan): dynamically approximate this limit
MAX_NODES_IN_LL_CNT
}
/*

Loading…
Cancel
Save