Request delta updates

next
Sayan Nandan 1 year ago
parent 0d076dd1b3
commit cf055f418d
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -25,7 +25,7 @@
*/
use crate::engine::{
core::model::delta::DataDeltaKind,
core::{self, model::delta::DataDeltaKind},
error::{DatabaseError, DatabaseResult},
fractal::GlobalInstanceLike,
idx::MTIndex,
@ -34,7 +34,7 @@ use crate::engine::{
};
pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) -> DatabaseResult<()> {
global.namespace().with_model(delete.entity(), |model| {
core::with_model_for_data_update(global, delete.entity(), |model| {
let g = sync::atm::cpin();
let schema_version = model.delta_state().schema_current_version();
let delta_state = model.delta_state();

@ -26,6 +26,7 @@
use crate::engine::{
core::{
self,
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DataDeltaKind, Fields, Model},
},
@ -36,8 +37,8 @@ use crate::engine::{
sync::atm::cpin,
};
pub fn insert(gns: &impl GlobalInstanceLike, insert: InsertStatement) -> DatabaseResult<()> {
gns.namespace().with_model(insert.entity(), |mdl| {
pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> DatabaseResult<()> {
core::with_model_for_data_update(global, insert.entity(), |mdl| {
let irmwd = mdl.intent_write_new_data();
let (pk, data) = prepare_insert(mdl, irmwd.fields(), insert.data())?;
let g = cpin();

@ -30,7 +30,7 @@ use std::cell::RefCell;
use {
crate::{
engine::{
core::{model::delta::DataDeltaKind, query_meta::AssignmentOperator},
core::{self, model::delta::DataDeltaKind, query_meta::AssignmentOperator},
data::{
cell::Datacell,
lit::LitIR,
@ -234,7 +234,7 @@ pub fn collect_trace_path() -> Vec<&'static str> {
}
pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> DatabaseResult<()> {
global.namespace().with_model(update.entity(), |mdl| {
core::with_model_for_data_update(global, update.entity(), |mdl| {
let mut ret = Ok(());
// prepare row fetch
let key = mdl.resolve_where(update.clauses_mut())?;

@ -62,4 +62,7 @@ impl PrimaryIndex {
pub fn __raw_index(&self) -> &IndexMTRaw<row::Row> {
&self.data
}
pub fn count(&self) -> usize {
self.data.mt_len()
}
}

@ -39,6 +39,7 @@ use {
crate::engine::{
core::space::Space,
error::{DatabaseError, DatabaseResult},
fractal::GlobalInstanceLike,
idx::{IndexST, STIndex},
},
parking_lot::RwLock,
@ -55,6 +56,28 @@ pub struct GlobalNS {
index_space: RWLIdx<Box<str>, Space>,
}
pub(self) fn with_model_for_data_update<'a, T, E, F>(
global: &impl GlobalInstanceLike,
entity: E,
f: F,
) -> DatabaseResult<T>
where
F: FnOnce(&Model) -> DatabaseResult<T>,
E: 'a + EntityLocator<'a>,
{
let (space_name, model_name) = entity.parse_entity()?;
global
.namespace()
.with_model((space_name, model_name), |mdl| {
let r = f(mdl);
// see if this task local delta is full
if r.is_ok() {
model::DeltaState::guard_delta_overflow(global, space_name, model_name, mdl);
}
r
})
}
impl GlobalNS {
pub fn spaces(&self) -> &RWLIdx<Box<str>, Space> {
&self.index_space

@ -26,8 +26,14 @@
use {
super::{Fields, Model},
crate::engine::{
core::index::Row, fractal::FractalToken, sync::atm::Guard, sync::queue::Queue,
crate::{
engine::{
core::index::Row,
fractal::{FractalToken, GlobalInstanceLike},
sync::atm::Guard,
sync::queue::Queue,
},
util::compiler,
},
parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
std::{
@ -171,6 +177,29 @@ impl DeltaState {
}
}
// data direct
impl DeltaState {
pub(in crate::engine::core) fn guard_delta_overflow(
global: &impl GlobalInstanceLike,
space_name: &str,
model_name: &str,
model: &Model,
) {
let current_deltas_size = model.delta_state().data_deltas_size.load(Ordering::Acquire);
let max_len = global
.get_max_delta_size()
.min((model.primary_index().count() as f64 * 0.05) as usize);
if compiler::unlikely(current_deltas_size >= max_len) {
global.request_batch_resolve(
space_name,
model_name,
model.get_uuid(),
current_deltas_size,
);
}
}
}
// data
impl DeltaState {
pub fn append_new_data_delta_with(

@ -102,6 +102,19 @@ pub trait GlobalInstanceLike {
fn post_high_priority_task(&self, task: Task<CriticalTask>);
fn post_standard_priority_task(&self, task: Task<GenericTask>);
fn get_max_delta_size(&self) -> usize;
// default impls
fn request_batch_resolve(
&self,
space_name: &str,
model_name: &str,
model_uuid: Uuid,
observed_len: usize,
) {
self.post_high_priority_task(Task::new(CriticalTask::WriteBatch(
ModelUniqueID::new(space_name, model_name, model_uuid),
observed_len,
)))
}
}
impl GlobalInstanceLike for Global {

@ -149,6 +149,8 @@ pub trait MTIndex<E, K, V>: IndexBaseSpec {
't: 'v,
V: 'v,
Self: 't;
/// Returns the length of the index
fn mt_len(&self) -> usize;
/// Attempts to compact the backing storage
fn mt_compact(&self) {}
/// Clears all the entries in the MTIndex

@ -87,6 +87,9 @@ impl<E: TreeElement, C: Config> MTIndex<E, E::Key, E::Value> for Raw<E, C> {
E::Value: 'v,
Self: 't;
fn mt_len(&self) -> usize {
self.len()
}
fn mt_clear(&self, g: &Guard) {
self.nontransactional_clear(g)
}

Loading…
Cancel
Save