Sync row deltas to delta state

next
Sayan Nandan 1 year ago
parent f230bc7920
commit e4848e645e
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -25,8 +25,9 @@
*/
use crate::engine::{
core::GlobalNS,
core::{model::delta::DataDeltaKind, GlobalNS},
error::{DatabaseError, DatabaseResult},
idx::MTIndex,
ql::dml::del::DeleteStatement,
sync,
};
@ -34,13 +35,24 @@ use crate::engine::{
pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()> {
gns.with_model(delete.entity(), |model| {
let g = sync::atm::cpin();
if model
let delta_state = model.delta_state();
// create new version
let new_version = delta_state.create_new_data_delta_version();
match model
.primary_index()
.remove(model.resolve_where(delete.clauses_mut())?, &g)
.__raw_index()
.mt_delete_return_entry(&model.resolve_where(delete.clauses_mut())?, &g)
{
Ok(())
} else {
Err(DatabaseError::DmlEntryNotFound)
Some(row) => {
delta_state.append_new_data_delta(
DataDeltaKind::Delete,
row.clone(),
new_version,
&g,
);
Ok(())
}
None => Err(DatabaseError::DmlEntryNotFound),
}
})
}

@ -26,12 +26,12 @@
use crate::engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey},
model::{Fields, Model},
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DataDeltaKind, Fields, Model},
GlobalNS,
},
error::{DatabaseError, DatabaseResult},
idx::{IndexBaseSpec, STIndex, STIndexSeq},
idx::{IndexBaseSpec, MTIndex, STIndex, STIndexSeq},
ql::dml::ins::{InsertData, InsertStatement},
sync::atm::cpin,
};
@ -41,10 +41,13 @@ pub fn insert(gns: &GlobalNS, insert: InsertStatement) -> DatabaseResult<()> {
let irmwd = mdl.intent_write_new_data();
let (pk, data) = prepare_insert(mdl, irmwd.fields(), insert.data())?;
let g = cpin();
if mdl
.primary_index()
.insert(pk, data, mdl.delta_state().current_version(), &g)
{
let ds = mdl.delta_state();
// create new version
let cv = ds.create_new_data_delta_version();
let row = Row::new(pk, data, ds.schema_current_version(), cv);
if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) {
// append delta for new version
ds.append_new_data_delta(DataDeltaKind::Insert, row, cv, &g);
Ok(())
} else {
Err(DatabaseError::DmlConstraintViolationDuplicate)

@ -56,7 +56,7 @@ where
};
match mdl.primary_index().select(target_key.clone(), &g) {
Some(row) => {
let r = row.resolve_deltas_and_freeze(mdl.delta_state());
let r = row.resolve_schema_deltas_and_freeze(mdl.delta_state());
if select.is_wildcard() {
for key in irm.fields().stseq_ord_key() {
read_field(key.as_ref(), r.fields())?;

@ -26,10 +26,11 @@
#[cfg(test)]
use std::cell::RefCell;
use {
crate::{
engine::{
core::{query_meta::AssignmentOperator, GlobalNS},
core::{model::delta::DataDeltaKind, query_meta::AssignmentOperator, GlobalNS},
data::{
cell::Datacell,
lit::LitIR,
@ -234,19 +235,28 @@ pub fn collect_trace_path() -> Vec<&'static str> {
pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()> {
gns.with_model(update.entity(), |mdl| {
let mut ret = Ok(());
// prepare row fetch
let key = mdl.resolve_where(update.clauses_mut())?;
// freeze schema
let irm = mdl.intent_read_model();
// fetch row
let g = sync::atm::cpin();
let Some(row) = mdl.primary_index().select(key, &g) else {
return Err(DatabaseError::DmlEntryNotFound);
};
// lock row
let mut row_data_wl = row.d_data().write();
// create new version
let ds = mdl.delta_state();
let cv = ds.create_new_data_delta_version();
// process changes
let mut rollback_now = false;
let mut rollback_data = Vec::with_capacity(update.expressions().len());
let mut assn_expressions = update.into_expressions().into_iter();
/*
FIXME(@ohsayan): where's my usual magic? I'll do it once we have the SE stabilized
*/
// apply changes
while (assn_expressions.len() != 0) & (!rollback_now) {
let AssignmentExpression {
lhs,
@ -329,6 +339,11 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()>
.for_each(|(field_id, restored_data)| {
row_data_wl.fields_mut().st_update(field_id, restored_data);
});
} else {
// update revised tag
row_data_wl.set_txn_revised(cv);
// publish delta
ds.append_new_data_delta(DataDeltaKind::Update, row.clone(), cv, &g);
}
ret
})

@ -50,6 +50,12 @@ pub struct PrimaryIndexKey {
data: SpecialPaddedWord,
}
impl PrimaryIndexKey {
/// Returns this key's `tag` field (a [`TagUnique`]) by value
pub fn tag(&self) -> TagUnique {
self.tag
}
}
impl PrimaryIndexKey {
pub unsafe fn read_uint(&self) -> u64 {
self.data.load()

@ -28,7 +28,6 @@ mod key;
mod row;
use crate::engine::{
core::model::DeltaVersion,
data::lit::LitIR,
idx::{IndexBaseSpec, IndexMTRaw, MTIndex},
sync::atm::Guard,
@ -50,16 +49,6 @@ impl PrimaryIndex {
data: IndexMTRaw::idx_init(),
}
}
pub fn insert(
&self,
key: PrimaryIndexKey,
data: row::DcFieldIndex,
delta_version: DeltaVersion,
g: &Guard,
) -> bool {
self.data
.mt_insert(Row::new(key, data, delta_version, delta_version), g)
}
pub fn remove<'a>(&self, key: LitIR<'a>, g: &Guard) -> bool {
self.data.mt_delete(&key, g)
}
@ -70,4 +59,7 @@ impl PrimaryIndex {
) -> Option<&'v Row> {
self.data.mt_get_element(&key, g)
}
/// Returns a shared reference to the raw index backing this primary index (the `data` field), so that callers
/// can run index operations on it directly
pub fn __raw_index(&self) -> &IndexMTRaw<row::Row> {
&self.data
}
}

@ -28,7 +28,7 @@ use {
super::key::PrimaryIndexKey,
crate::{
engine::{
core::model::{DeltaKind, DeltaState, DeltaVersion},
core::model::{DeltaState, DeltaVersion, SchemaDeltaKind},
data::cell::Datacell,
idx::{meta::hash::HasherNativeFx, mtchm::meta::TreeElement, IndexST, STIndex},
sync::smart::RawRC,
@ -43,7 +43,7 @@ pub type DcFieldIndex = IndexST<Box<str>, Datacell, HasherNativeFx>;
#[derive(Debug)]
pub struct Row {
__txn_genesis: DeltaVersion,
__genesis_schema_version: DeltaVersion,
__pk: ManuallyDrop<PrimaryIndexKey>,
__rc: RawRC<RwLock<RowData>>,
}
@ -51,7 +51,8 @@ pub struct Row {
#[derive(Debug, PartialEq)]
pub struct RowData {
fields: DcFieldIndex,
txn_revised: DeltaVersion,
txn_revised_data: DeltaVersion,
txn_revised_schema_version: DeltaVersion,
}
impl RowData {
@ -61,6 +62,12 @@ impl RowData {
pub fn fields_mut(&mut self) -> &mut DcFieldIndex {
&mut self.fields
}
/// Overwrites the data revision tag (`txn_revised_data`) of this row with `new`
pub fn set_txn_revised(&mut self, new: DeltaVersion) {
self.txn_revised_data = new;
}
/// Returns the data revision tag (`txn_revised_data`) of this row
pub fn get_txn_revised(&self) -> DeltaVersion {
self.txn_revised_data
}
}
impl TreeElement for Row {
@ -90,17 +97,18 @@ impl Row {
pub fn new(
pk: PrimaryIndexKey,
data: DcFieldIndex,
txn_genesis: DeltaVersion,
txn_revised: DeltaVersion,
schema_version: DeltaVersion,
txn_revised_data: DeltaVersion,
) -> Self {
Self {
__txn_genesis: txn_genesis,
__genesis_schema_version: schema_version,
__pk: ManuallyDrop::new(pk),
__rc: unsafe {
// UNSAFE(@ohsayan): we free this up later
RawRC::new(RwLock::new(RowData {
fields: data,
txn_revised,
txn_revised_schema_version: schema_version,
txn_revised_data,
}))
},
}
@ -131,31 +139,32 @@ impl Row {
}
impl Row {
pub fn resolve_deltas_and_freeze<'g>(
pub fn resolve_schema_deltas_and_freeze<'g>(
&'g self,
delta_state: &DeltaState,
) -> RwLockReadGuard<'g, RowData> {
let rwl_ug = self.d_data().upgradable_read();
let current_version = delta_state.current_version();
if compiler::likely(current_version <= rwl_ug.txn_revised) {
let current_version = delta_state.schema_current_version();
if compiler::likely(current_version <= rwl_ug.txn_revised_schema_version) {
return RwLockUpgradableReadGuard::downgrade(rwl_ug);
}
// we have deltas to apply
let mut wl = RwLockUpgradableReadGuard::upgrade(rwl_ug);
let delta_read = delta_state.rguard();
let mut max_delta = wl.txn_revised;
for (delta_id, delta) in delta_read.resolve_iter_since(wl.txn_revised) {
let delta_read = delta_state.schema_delta_read();
let mut max_delta = wl.txn_revised_schema_version;
for (delta_id, delta) in delta_read.resolve_iter_since(wl.txn_revised_schema_version) {
match delta.kind() {
DeltaKind::FieldAdd(f) => {
SchemaDeltaKind::FieldAdd(f) => {
wl.fields.st_insert(f.clone(), Datacell::null());
}
DeltaKind::FieldRem(f) => {
SchemaDeltaKind::FieldRem(f) => {
wl.fields.st_delete(f);
}
}
max_delta = *delta_id;
}
wl.txn_revised = max_delta;
// we've revised up to the most recent delta version (that we saw at this point)
wl.txn_revised_schema_version = max_delta;
return RwLockWriteGuard::downgrade(wl);
}
}

@ -25,7 +25,7 @@
*/
mod dml;
mod index;
pub(in crate::engine) mod index;
pub(in crate::engine) mod model;
pub(in crate::engine) mod query_meta;
pub mod space;

@ -270,7 +270,7 @@ impl Model {
match plan.action {
AlterAction::Ignore => drop(iwm),
AlterAction::Add(new_fields) => {
let mut guard = model.delta_state().wguard();
let mut guard = model.delta_state().schema_delta_write();
// TODO(@ohsayan): this impacts lockdown duration; fix it
if GI::NONNULL {
// prepare txn
@ -287,12 +287,12 @@ impl Model {
.for_each(|(field_id, field)| {
model
.delta_state()
.append_unresolved_wl_field_add(&mut guard, &field_id);
.schema_append_unresolved_wl_field_add(&mut guard, &field_id);
iwm.fields_mut().st_insert(field_id, field);
});
}
AlterAction::Remove(removed) => {
let mut guard = model.delta_state().wguard();
let mut guard = model.delta_state().schema_delta_write();
if GI::NONNULL {
// prepare txn
let txn = gnstxn::AlterModelRemoveTxn::new(
@ -305,7 +305,7 @@ impl Model {
removed.iter().for_each(|field_id| {
model
.delta_state()
.append_unresolved_wl_field_rem(&mut guard, field_id.as_str());
.schema_append_unresolved_wl_field_rem(&mut guard, field_id.as_str());
iwm.fields_mut().st_delete(field_id.as_str());
});
}

@ -26,6 +26,7 @@
use {
super::{Fields, Model},
crate::engine::{core::index::Row, sync::atm::Guard, sync::queue::Queue},
parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
std::{
collections::btree_map::{BTreeMap, Range},
@ -39,6 +40,8 @@ use {
// FIXME(@ohsayan): This is an inefficient repr of the matrix; replace it with my other design
#[derive(Debug)]
/// A sync matrix enables different queries to have different access permissions on the data model, and the data in the
/// index
pub struct ISyncMatrix {
// virtual privileges
/// read/write model
@ -55,6 +58,7 @@ impl PartialEq for ISyncMatrix {
}
#[derive(Debug)]
/// Read model, write new data
pub struct IRModelSMData<'a> {
rmodel: RwLockReadGuard<'a, ()>,
mdata: RwLockReadGuard<'a, ()>,
@ -80,6 +84,7 @@ impl<'a> IRModelSMData<'a> {
}
#[derive(Debug)]
/// Read model
pub struct IRModel<'a> {
rmodel: RwLockReadGuard<'a, ()>,
fields: &'a Fields,
@ -101,6 +106,7 @@ impl<'a> IRModel<'a> {
}
#[derive(Debug)]
/// Write model
pub struct IWModel<'a> {
wmodel: RwLockWriteGuard<'a, ()>,
fields: &'a mut Fields,
@ -139,48 +145,99 @@ impl ISyncMatrix {
*/
#[derive(Debug)]
/// A delta state for the model
pub struct DeltaState {
current_version: AtomicU64,
deltas: RwLock<BTreeMap<DeltaVersion, DeltaPart>>,
// schema
schema_current_version: AtomicU64,
schema_deltas: RwLock<BTreeMap<DeltaVersion, SchemaDeltaPart>>,
// data
data_current_version: AtomicU64,
data_deltas: Queue<DataDelta>,
}
#[derive(Debug)]
pub struct DeltaPart {
kind: DeltaKind,
impl DeltaState {
/// A new, fully resolved delta state with version counters set to 0
pub fn new_resolved() -> Self {
Self {
schema_current_version: AtomicU64::new(0),
schema_deltas: RwLock::new(BTreeMap::new()),
data_current_version: AtomicU64::new(0),
data_deltas: Queue::new(),
}
}
}
impl DeltaPart {
pub fn kind(&self) -> &DeltaKind {
&self.kind
// data
impl DeltaState {
/// Builds a [`DataDelta`] from the raw `u64` of `version`, the given `row` and the change `kind`, and enqueues
/// it onto the `data_deltas` queue using the guard `g`
pub fn append_new_data_delta(
&self,
kind: DataDeltaKind,
row: Row,
version: DeltaVersion,
g: &Guard,
) {
self.data_deltas
.blocking_enqueue(DataDelta::new(version.0, row, kind), g);
}
/// Steps the data delta counter and wraps the value returned by that step in a [`DeltaVersion`]
pub fn create_new_data_delta_version(&self) -> DeltaVersion {
DeltaVersion(self.__data_delta_step())
}
}
#[derive(Debug)]
pub enum DeltaKind {
FieldAdd(Box<str>),
FieldRem(Box<str>),
impl DeltaState {
/// Atomically increments `data_current_version` by one (`AcqRel`) and returns the value the counter held
/// *before* the increment
pub fn __data_delta_step(&self) -> u64 {
self.data_current_version.fetch_add(1, Ordering::AcqRel)
}
}
impl DeltaPart {
fn new(kind: DeltaKind) -> Self {
Self { kind }
// schema
impl DeltaState {
pub fn schema_delta_write<'a>(&'a self) -> SchemaDeltaIndexWGuard<'a> {
SchemaDeltaIndexWGuard(self.schema_deltas.write())
}
fn field_add(field_name: &str) -> Self {
Self::new(DeltaKind::FieldAdd(field_name.to_owned().into_boxed_str()))
pub fn schema_delta_read<'a>(&'a self) -> SchemaDeltaIndexRGuard<'a> {
SchemaDeltaIndexRGuard(self.schema_deltas.read())
}
fn field_rem(field_name: &str) -> Self {
Self::new(DeltaKind::FieldRem(field_name.to_owned().into_boxed_str()))
pub fn schema_current_version(&self) -> DeltaVersion {
self.__schema_delta_current()
}
pub fn schema_append_unresolved_wl_field_add(
&self,
guard: &mut SchemaDeltaIndexWGuard,
field_name: &str,
) {
self.__schema_append_unresolved_delta(&mut guard.0, SchemaDeltaPart::field_add(field_name));
}
pub fn schema_append_unresolved_wl_field_rem(
&self,
guard: &mut SchemaDeltaIndexWGuard,
field_name: &str,
) {
self.__schema_append_unresolved_delta(&mut guard.0, SchemaDeltaPart::field_rem(field_name));
}
pub fn schema_append_unresolved_field_add(&self, field_name: &str) {
self.schema_append_unresolved_wl_field_add(&mut self.schema_delta_write(), field_name);
}
pub fn schema_append_unresolved_field_rem(&self, field_name: &str) {
self.schema_append_unresolved_wl_field_rem(&mut self.schema_delta_write(), field_name);
}
}
pub struct DeltaIndexWGuard<'a>(RwLockWriteGuard<'a, BTreeMap<DeltaVersion, DeltaPart>>);
pub struct DeltaIndexRGuard<'a>(RwLockReadGuard<'a, BTreeMap<DeltaVersion, DeltaPart>>);
impl<'a> DeltaIndexRGuard<'a> {
pub fn resolve_iter_since(
impl DeltaState {
fn __schema_delta_step(&self) -> DeltaVersion {
DeltaVersion(self.schema_current_version.fetch_add(1, Ordering::AcqRel))
}
fn __schema_delta_current(&self) -> DeltaVersion {
DeltaVersion(self.schema_current_version.load(Ordering::Acquire))
}
fn __schema_append_unresolved_delta(
&self,
current_version: DeltaVersion,
) -> Range<DeltaVersion, DeltaPart> {
self.0.range(current_version.step()..)
w: &mut BTreeMap<DeltaVersion, SchemaDeltaPart>,
part: SchemaDeltaPart,
) -> DeltaVersion {
let v = self.__schema_delta_step();
w.insert(v, part);
v
}
}
@ -202,50 +259,80 @@ impl DeltaVersion {
}
}
impl DeltaState {
pub fn new_resolved() -> Self {
Self {
current_version: AtomicU64::new(0),
deltas: RwLock::new(BTreeMap::new()),
}
}
pub fn wguard<'a>(&'a self) -> DeltaIndexWGuard<'a> {
DeltaIndexWGuard(self.deltas.write())
}
pub fn rguard<'a>(&'a self) -> DeltaIndexRGuard<'a> {
DeltaIndexRGuard(self.deltas.read())
}
pub fn current_version(&self) -> DeltaVersion {
self.__delta_current()
}
pub fn append_unresolved_wl_field_add(&self, guard: &mut DeltaIndexWGuard, field_name: &str) {
self.__append_unresolved_delta(&mut guard.0, DeltaPart::field_add(field_name));
/*
schema delta
*/
#[derive(Debug)]
pub struct SchemaDeltaPart {
kind: SchemaDeltaKind,
}
impl SchemaDeltaPart {
pub fn kind(&self) -> &SchemaDeltaKind {
&self.kind
}
pub fn append_unresolved_wl_field_rem(&self, guard: &mut DeltaIndexWGuard, field_name: &str) {
self.__append_unresolved_delta(&mut guard.0, DeltaPart::field_rem(field_name));
}
#[derive(Debug)]
pub enum SchemaDeltaKind {
FieldAdd(Box<str>),
FieldRem(Box<str>),
}
impl SchemaDeltaPart {
fn new(kind: SchemaDeltaKind) -> Self {
Self { kind }
}
pub fn append_unresolved_field_add(&self, field_name: &str) {
self.append_unresolved_wl_field_add(&mut self.wguard(), field_name);
fn field_add(field_name: &str) -> Self {
Self::new(SchemaDeltaKind::FieldAdd(
field_name.to_owned().into_boxed_str(),
))
}
pub fn append_unresolved_field_rem(&self, field_name: &str) {
self.append_unresolved_wl_field_rem(&mut self.wguard(), field_name);
fn field_rem(field_name: &str) -> Self {
Self::new(SchemaDeltaKind::FieldRem(
field_name.to_owned().into_boxed_str(),
))
}
}
impl DeltaState {
fn __delta_step(&self) -> DeltaVersion {
DeltaVersion(self.current_version.fetch_add(1, Ordering::AcqRel))
}
fn __delta_current(&self) -> DeltaVersion {
DeltaVersion(self.current_version.load(Ordering::Acquire))
}
fn __append_unresolved_delta(
pub struct SchemaDeltaIndexWGuard<'a>(
RwLockWriteGuard<'a, BTreeMap<DeltaVersion, SchemaDeltaPart>>,
);
pub struct SchemaDeltaIndexRGuard<'a>(RwLockReadGuard<'a, BTreeMap<DeltaVersion, SchemaDeltaPart>>);
impl<'a> SchemaDeltaIndexRGuard<'a> {
pub fn resolve_iter_since(
&self,
w: &mut BTreeMap<DeltaVersion, DeltaPart>,
part: DeltaPart,
) -> DeltaVersion {
let v = self.__delta_step();
w.insert(v, part);
v
current_version: DeltaVersion,
) -> Range<DeltaVersion, SchemaDeltaPart> {
self.0.range(current_version.step()..)
}
}
/*
data delta
*/
#[derive(Debug)]
/// A single data delta: the change that was made, the row it was made to and the version assigned to it
pub struct DataDelta {
// raw version number of this delta
version: u64,
// the row this delta refers to
row: Row,
// what kind of change was made to the row
change: DataDeltaKind,
}
impl DataDelta {
/// Creates a new data delta from the given raw `version`, `row` and `change` kind
pub const fn new(version: u64, row: Row, change: DataDeltaKind) -> Self {
Self {
version,
row,
change,
}
}
}
#[derive(Debug)]
/// The kind of change recorded by a [`DataDelta`]
pub enum DataDeltaKind {
/// A row was inserted
Insert,
/// A row was updated
Update,
/// A row was deleted
Delete,
}

@ -52,7 +52,7 @@ use {
std::cell::UnsafeCell,
};
pub(in crate::engine::core) use self::delta::{DeltaKind, DeltaState, DeltaVersion};
pub(in crate::engine::core) use self::delta::{SchemaDeltaKind, DeltaState, DeltaVersion};
pub(in crate::engine) type Fields = IndexSTSeqCns<Box<str>, Field>;
#[derive(Debug)]
@ -258,7 +258,7 @@ impl Model {
gnstxn::SpaceIDRef::new(space_name, space),
model_name,
model.get_uuid(),
model.delta_state().current_version().value_u64(),
model.delta_state().schema_current_version().value_u64(),
));
// commit txn
txn_driver.try_commit(txn)?;

@ -383,7 +383,7 @@ mod exec {
]
);
assert_eq!(
model.delta_state().current_version(),
model.delta_state().schema_current_version(),
DeltaVersion::test_new(2)
);
},
@ -410,7 +410,7 @@ mod exec {
[("username".into(), Field::new([Layer::str()].into(), false))]
);
assert_eq!(
mdl.delta_state().current_version(),
mdl.delta_state().schema_current_version(),
DeltaVersion::test_new(4)
);
}
@ -428,7 +428,7 @@ mod exec {
let schema = model.intent_read_model();
assert!(schema.fields().st_get("password").unwrap().is_nullable());
assert_eq!(
model.delta_state().current_version(),
model.delta_state().schema_current_version(),
DeltaVersion::genesis()
);
},

@ -53,7 +53,7 @@ mod validation {
]
);
assert_eq!(
model.delta_state().current_version(),
model.delta_state().schema_current_version(),
DeltaVersion::genesis()
);
}
@ -77,7 +77,7 @@ mod validation {
]
);
assert_eq!(
model.delta_state().current_version(),
model.delta_state().schema_current_version(),
DeltaVersion::genesis()
);
}
@ -174,7 +174,7 @@ mod exec {
]
);
assert_eq!(
model.delta_state().current_version(),
model.delta_state().schema_current_version(),
DeltaVersion::genesis()
);
});

@ -207,6 +207,11 @@ pub trait MTIndex<E, K, V>: IndexBaseSpec {
Q: ?Sized + Comparable<K>,
't: 'v,
'g: 't + 'v;
/// Deletes the entry matching `key` and returns a reference to the removed entry
///
/// NOTE(review): presumably returns `None` when no entry matches `key` -- confirm against the implementation
fn mt_delete_return_entry<'t, 'g, 'v, Q>(&'t self, key: &Q, g: &'g Guard) -> Option<&'v E>
where
Q: ?Sized + Comparable<K>,
't: 'v,
'g: 't + 'v;
}
/// An unordered STIndex

@ -30,7 +30,7 @@ use {
super::{
iter::{IterKV, IterKey, IterVal},
meta::{Config, TreeElement},
patch::{VanillaInsert, VanillaUpdate, VanillaUpdateRet, VanillaUpsert},
patch::{DeleteRetEntry, VanillaInsert, VanillaUpdate, VanillaUpdateRet, VanillaUpsert},
RawTree,
},
crate::engine::{
@ -171,4 +171,13 @@ impl<E: TreeElement, C: Config> MTIndex<E, E::Key, E::Value> for Raw<E, C> {
{
self.remove_return(key, g)
}
// runs the tree's removal routine with the `DeleteRetEntry` patch, which yields the entry itself
// (instead of just a flag or the value)
fn mt_delete_return_entry<'t, 'g, 'v, Q>(&'t self, key: &Q, g: &'g Guard) -> Option<&'v E>
where
Q: ?Sized + Comparable<E::Key>,
't: 'v,
'g: 't + 'v,
{
self._remove(DeleteRetEntry::new(key), g)
}
}

@ -211,6 +211,40 @@ impl<'d, T: TreeElement, U: Comparable<T::Key> + ?Sized> PatchDelete<T> for Dele
}
}
/// A delete patch that holds the lookup `target` and hands back the matched entry itself (see its
/// `PatchDelete` implementation)
pub struct DeleteRetEntry<'a, T: TreeElement, U: ?Sized> {
// the key (or key-like value) to look for
target: &'a U,
// marker to tie this patch to the element type `T` without storing one
_m: PhantomData<T>,
}
impl<'a, T: TreeElement, U: ?Sized> DeleteRetEntry<'a, T, U> {
/// Creates a new patch that targets `target`
pub fn new(target: &'a U) -> Self {
Self {
target,
_m: PhantomData,
}
}
}
impl<'dr, T: TreeElement, U: Comparable<T::Key> + ?Sized> PatchDelete<T>
for DeleteRetEntry<'dr, T, U>
{
// the result is a reference to the whole element, or nothing
type Ret<'a> = Option<&'a T>;
type Target = U;
// the value to compare keys against
fn target(&self) -> &Self::Target {
self.target
}
// an element was found: hand back a reference to it
fn ex<'a>(v: &'a T) -> Self::Ret<'a> {
Some(v)
}
// no element was found
fn nx<'a>() -> Self::Ret<'a> {
None
}
}
pub struct DeleteRet<'a, T: TreeElement, U: ?Sized> {
target: &'a U,
_m: PhantomData<T>,

@ -0,0 +1,103 @@
/*
* Created on Fri Sep 01 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use core::ops::{Deref, DerefMut};
/// Cache line padding (to avoid unintended cache line invalidation).
///
/// The wrapped value is aligned to the assumed cache line length of the target. Since the size of a type
/// is always a multiple of its alignment, the value is also padded out to that length. All figures below
/// are in **bytes** (they are exactly the values passed to `repr(align(..))`):
/// - 256 bytes (on a side note, good lord):
///     - s390x: <https://community.ibm.com/community/user/ibmz-and-linuxone/viewdocument/microprocessor-optimization-primer>
/// - 128 bytes:
///     - aarch64: ARM64's big.LITTLE (it's a funny situation because one set of cores has one cache line
///       size while the other ones have a different size; see this excellent article:
///       <https://www.mono-project.com/news/2016/09/12/arm64-icache/>)
///     - powerpc64: <https://reviews.llvm.org/D33656>
///     - x86_64: Intel's Sandy Bridge+
///       (<https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf>)
/// - 64 bytes: default for all non-specific targets
/// - 32 bytes: arm, hexagon, mips, mips64, riscv32, riscv64 and sparc
/// - 16 bytes: m68k (not very useful for us, but yeah)
#[derive(Debug, Clone, Copy, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
#[cfg_attr(
    any(
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "x86_64",
    ),
    repr(align(128))
)]
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "hexagon",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc"
    ),
    repr(align(32))
)]
#[cfg_attr(
    not(any(
        target_arch = "aarch64",
        target_arch = "arm",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "powerpc64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "s390x",
        target_arch = "sparc",
        target_arch = "x86_64",
    )),
    repr(align(64))
)]
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
pub struct CachePadded<T> {
    data: T,
}

impl<T> CachePadded<T> {
    /// Wraps `data` so that it is aligned (and hence padded) to the target's cache line length
    pub const fn new(data: T) -> Self {
        Self { data }
    }
}

impl<T> Deref for CachePadded<T> {
    type Target = T;
    /// Gives shared access to the wrapped value
    fn deref(&self) -> &Self::Target {
        &self.data
    }
}

impl<T> DerefMut for CachePadded<T> {
    /// Gives exclusive access to the wrapped value
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.data
    }
}

@ -25,6 +25,7 @@
*/
mod astr;
mod ll;
mod stackop;
mod uarray;
mod vinline;
@ -35,6 +36,7 @@ mod tests;
// re-exports
pub use {
astr::AStr,
ll::CachePadded,
stackop::ByteStack,
uarray::UArray,
vinline::VInline,

@ -25,14 +25,17 @@
*/
use {
super::{obj::FieldMD, PersistDictEntryDscr, PersistMapSpec, PersistObject, VecU8},
super::{
obj::{self, FieldMD},
PersistDictEntryDscr, PersistMapSpec, PersistObject, VecU8,
},
crate::{
engine::{
core::model::Field,
data::{
cell::Datacell,
dict::DictEntryGeneric,
tag::{CUTag, DataTag, TagClass, TagUnique},
tag::{CUTag, TagUnique},
DictGeneric,
},
idx::{IndexBaseSpec, IndexSTSeqCns, STIndex, STIndexSeq},
@ -191,36 +194,9 @@ impl PersistMapSpec for GenericDictSpec {
<PersistMapImpl<Self> as PersistObject>::default_full_enc(buf, map);
}
DictEntryGeneric::Data(dc) => {
buf.push(
PersistDictEntryDscr::translate_from_class(dc.tag().tag_class()).value_u8()
* (!dc.is_null() as u8),
);
obj::encode_datacell_tag(buf, dc);
buf.extend(key.as_bytes());
fn encode_element(buf: &mut VecU8, dc: &Datacell) {
unsafe {
use TagClass::*;
match dc.tag().tag_class() {
Bool if dc.is_init() => buf.push(dc.read_bool() as u8),
Bool => {}
UnsignedInt | SignedInt | Float => {
buf.extend(dc.read_uint().to_le_bytes())
}
Str | Bin => {
let slc = dc.read_bin();
buf.extend(slc.len().u64_bytes_le());
buf.extend(slc);
}
List => {
let lst = dc.read_list().read();
buf.extend(lst.len().u64_bytes_le());
for item in lst.iter() {
encode_element(buf, item);
}
}
}
}
}
encode_element(buf, dc);
obj::encode_element(buf, dc);
}
}
}

@ -27,7 +27,7 @@
use crate::engine::{core::model::delta::IRModel, data::DictGeneric};
use {
super::{PersistObject, VecU8},
super::{PersistDictEntryDscr, PersistObject, VecU8},
crate::{
engine::{
core::{
@ -35,6 +35,7 @@ use {
space::{Space, SpaceMeta},
},
data::{
cell::Datacell,
tag::{DataTag, TagClass, TagSelector},
uuid::Uuid,
},
@ -45,12 +46,35 @@ use {
},
};
/*
Full 8B tag block. Notes:
1. 7B at this moment is currently unused but there's a lot of additional flags that we might want to store here
2. If we end up deciding that this is indeed a waste of space, version this out and get rid of the 7B (or whatever we determine
to be the correct size.)
*/
/// Appends the payload of the datacell `dc` to `buf`. What is written depends on the tag class of the cell:
/// - `Bool`: a single byte (the boolean cast to `u8`), but only if the cell is initialized; otherwise nothing
/// - `UnsignedInt`, `SignedInt` and `Float`: the little-endian bytes of the value returned by `read_uint`
/// - `Str` and `Bin`: the length of the byte slice (via `u64_bytes_le`) followed by the bytes themselves
/// - `List`: the number of items (via `u64_bytes_le`) followed by every item, encoded recursively
///
/// This function only writes the payload; it does not write any tag byte.
pub fn encode_element(buf: &mut VecU8, dc: &Datacell) {
// NOTE(review): only the `Bool` arm checks `is_init()`; presumably callers never pass an uninitialized cell
// of any other class -- confirm
unsafe {
use TagClass::*;
match dc.tag().tag_class() {
Bool if dc.is_init() => buf.push(dc.read_bool() as u8),
// uninitialized bool: nothing to write
Bool => {}
UnsignedInt | SignedInt | Float => buf.extend(dc.read_uint().to_le_bytes()),
Str | Bin => {
let slc = dc.read_bin();
// length prefix first, then the raw bytes
buf.extend(slc.len().u64_bytes_le());
buf.extend(slc);
}
List => {
let lst = dc.read_list().read();
// item count first, then each item in order
buf.extend(lst.len().u64_bytes_le());
for item in lst.iter() {
encode_element(buf, item);
}
}
}
}
}
/// Appends the one-byte entry descriptor for the datacell `dc` to `buf`: the descriptor translated from the
/// cell's tag class, or `0` if the cell is null
pub fn encode_datacell_tag(buf: &mut VecU8, dc: &Datacell) {
    let class_dscr = PersistDictEntryDscr::translate_from_class(dc.tag().tag_class()).value_u8();
    // a null cell is always written as a zero descriptor
    let dscr_byte = if dc.is_null() { 0 } else { class_dscr };
    buf.push(dscr_byte)
}
/*
layer

@ -26,6 +26,7 @@
pub(super) mod atm;
pub(super) mod cell;
pub(super) mod queue;
pub(super) mod smart;
use std::{cell::Cell, hint::spin_loop, thread};

@ -0,0 +1,236 @@
/*
* Created on Wed Aug 30 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::atm::Atomic,
crate::engine::mem::CachePadded,
crossbeam_epoch::{pin, unprotected, Guard, Owned, Shared},
std::{mem::MaybeUninit, sync::atomic::Ordering},
};
#[derive(Debug)]
/// A node in the queue's linked list
struct QNode<T> {
// possibly uninitialized payload (never initialized for a node created with `null()`)
data: MaybeUninit<T>,
// pointer to the next node in the list (null if there is none)
next: Atomic<Self>,
}
impl<T> QNode<T> {
/// Creates a node from the given (possibly uninitialized) data and next pointer
fn new(data: MaybeUninit<T>, next: Atomic<Self>) -> Self {
Self { data, next }
}
/// Creates a node with uninitialized data and a null next pointer (used as the sentinel)
fn null() -> Self {
Self::new(MaybeUninit::uninit(), Atomic::null())
}
/// Creates a node that holds `val` and has a null next pointer
fn new_data(val: T) -> Self {
Self::new(MaybeUninit::new(val), Atomic::null())
}
}
#[derive(Debug)]
/// A linked-list based queue that is manipulated with atomic compare-and-exchange operations. The node pointed
/// to by `head` is a sentinel: the first element (if any) lives in the node that follows it
pub struct Queue<T> {
// points to the current sentinel node
head: CachePadded<Atomic<QNode<T>>>,
// points to the last node, or to the one right before it (the tail is allowed to lag behind)
tail: CachePadded<Atomic<QNode<T>>>,
}
impl<T> Queue<T> {
/// Creates an empty queue: one sentinel node (with uninitialized data) is allocated and both `head` and
/// `tail` are pointed at it
pub fn new() -> Self {
let slf = Self {
head: CachePadded::new(Atomic::null()),
tail: CachePadded::new(Atomic::null()),
};
// the queue is not shared with anyone yet, so the sentinel is set up with an unprotected guard
let g = unsafe { unprotected() };
let sentinel = Owned::new(QNode::null()).into_shared(&g);
slf.head.store(sentinel, Ordering::Relaxed);
slf.tail.store(sentinel, Ordering::Relaxed);
slf
}
/// Same as [`Self::blocking_enqueue`], but pins the current thread to obtain the guard
pub fn blocking_enqueue_autopin(&self, new: T) {
let g = pin();
self.blocking_enqueue(new, &g);
}
/// Appends `new` to the end of the queue. This retries in a loop until the new node has been linked in
pub fn blocking_enqueue(&self, new: T, g: &Guard) {
let newptr = Owned::new(QNode::new_data(new)).into_shared(g);
loop {
// get current tail
let tailptr = self.tail.load(Ordering::Acquire, g);
let tail = unsafe { tailptr.deref() };
let tail_nextptr = tail.next.load(Ordering::Acquire, g);
if tail_nextptr.is_null() {
// tail points to null which means this should ideally be the last LL node
if tail
.next
.compare_exchange(
Shared::null(),
newptr,
Ordering::Release,
Ordering::Relaxed,
g,
)
.is_ok()
{
/*
CAS'd in but tail is *probably* lagging behind. This CAS might fail but we don't care since we're allowed to have a lagging tail
*/
let _ = self.tail.compare_exchange(
tailptr,
newptr,
Ordering::Release,
Ordering::Relaxed,
g,
);
break;
}
// the link CAS failed: someone else appended first, so go around and reload the tail
} else {
// tail is lagging behind; attempt to help update it
let _ = self.tail.compare_exchange(
tailptr,
tail_nextptr,
Ordering::Release,
Ordering::Relaxed,
g,
);
}
}
}
/// Same as [`Self::blocking_try_dequeue`], but pins the current thread to obtain the guard
pub fn blocking_try_dequeue_autopin(&self) -> Option<T> {
let g = pin();
self.blocking_try_dequeue(&g)
}
/// Attempts to remove the element at the front of the queue. Returns `None` if the queue was observed to
/// be empty; otherwise this retries in a loop until it manages to advance `head`
pub fn blocking_try_dequeue(&self, g: &Guard) -> Option<T> {
loop {
// get current head
let headptr = self.head.load(Ordering::Acquire, g);
let head = unsafe { headptr.deref() };
let head_nextptr = head.next.load(Ordering::Acquire, g);
if head_nextptr.is_null() {
// this is the sentinel; queue is empty
return None;
}
// we observe at this point in time that there is at least one element in the list
// let us swing that into sentinel position
if self
.head
.compare_exchange(
headptr,
head_nextptr,
Ordering::Release,
Ordering::Relaxed,
g,
)
.is_ok()
{
// good so we were able to update the head
let tailptr = self.tail.load(Ordering::Acquire, g);
// but wait, was this the last node? in that case, we need to update the tail before we destroy it.
// this is fine though, as nothing will go boom right now since the tail is allowed to lag by one
if headptr == tailptr {
// right so this was the last node uh oh
let _ = self.tail.compare_exchange(
tailptr,
head_nextptr,
Ordering::Release,
Ordering::Relaxed,
g,
);
}
// now we're in a position to happily destroy this
unsafe { g.defer_destroy(headptr) }
// read out the ptr
// (the value is moved out of the node that just became the sentinel; its `data` is a `MaybeUninit`,
// so destroying that node later will not drop the value a second time)
return Some(unsafe { head_nextptr.deref().data.as_ptr().read() });
}
}
}
}
impl<T> Drop for Queue<T> {
// Drains every remaining element (so that the elements themselves get dropped) and then frees the
// sentinel node that is left behind
fn drop(&mut self) {
// we have `&mut self`, so an unprotected guard is used here
let g = unsafe { unprotected() };
while self.blocking_try_dequeue(g).is_some() {}
// dealloc sentinel
unsafe {
self.head.load(Ordering::Relaxed, g).into_owned();
}
}
}
#[cfg(test)]
type StringQueue = Queue<String>;
// creating and immediately dropping an empty queue must work (only the sentinel has to be freed)
#[test]
fn empty() {
let q = StringQueue::new();
drop(q);
}
// dequeuing from an empty queue yields nothing
#[test]
fn empty_deq() {
let g = pin();
let q = StringQueue::new();
assert_eq!(q.blocking_try_dequeue(&g), None);
}
// enqueue one element and then drop the queue while it still holds that element
#[test]
fn empty_enq() {
let g = pin();
let q = StringQueue::new();
q.blocking_enqueue("hello".into(), &g);
}
// one producer thread and one consumer thread share a queue: everything that was sent must be received,
// and in the same order
#[test]
fn multi_eq_dq() {
    use std::{sync::Arc, thread};
    const ITEMS_L: usize = 100;
    let queue = Arc::new(StringQueue::new());
    let (tx_queue, rx_queue) = (queue.clone(), queue.clone());
    let producer = thread::spawn(move || {
        let mut sent = Vec::new();
        let guard = pin();
        for i in 0..ITEMS_L {
            let item = format!("time-{i}");
            // enqueue a copy and remember what was sent
            tx_queue.blocking_enqueue(item.clone(), &guard);
            sent.push(item);
        }
        sent
    });
    let consumer = thread::spawn(move || {
        let guard = pin();
        let mut received = Vec::new();
        // keep polling until every item has arrived
        while received.len() != ITEMS_L {
            if let Some(item) = rx_queue.blocking_try_dequeue(&guard) {
                received.push(item);
            }
        }
        received
    });
    let sent = producer.join().unwrap();
    let received = consumer.join().unwrap();
    assert_eq!(sent, received);
}

@ -0,0 +1,81 @@
/*
* Created on Mon Aug 28 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::{
engine::{
core::{index::PrimaryIndexKey, GlobalNS},
data::cell::Datacell,
storage::v1::inf::obj,
},
util::{os, EndianQW},
};
/// Byte buffer that the encoders in this file append to
type Buf = Vec<u8>;
/// Cap computed by `set_limits`: 1% of (free memory in bytes / number of models, counting at least one).
/// Stays `0` until `set_limits` has run.
/// NOTE(review): `LL` presumably refers to a per-model delta list — confirm
static mut CAP_PER_LL: usize = 0;
/// Free memory in bytes as returned by `os::free_memory_in_bytes` when `set_limits` ran; `0` until then
static mut FREEMEM: u64 = 0;
/// Set the free memory and cap for deltas so that we don't bust through memory
///
/// Counts the models across every space in `gns`, stores the currently free memory (in bytes)
/// in `FREEMEM` and sets `CAP_PER_LL` to 1% of the free memory divided by the model count
/// (a model count of zero is treated as one).
///
/// ## Safety
/// - All models must have been loaded
/// - This must be called **before** the arbiter spawns threads for connections
pub unsafe fn set_limits(gns: &GlobalNS) {
    let model_cnt: usize = gns
        .spaces()
        .read()
        .values()
        .map(|space| space.models().read().len())
        .sum();
    let available_mem = os::free_memory_in_bytes();
    FREEMEM = available_mem;
    // divide in u64: casting the byte count to `usize` first would silently truncate it on
    // 32-bit targets
    let per_model = available_mem / (core::cmp::max(1, model_cnt) as u64);
    // the float -> usize cast saturates, so an oversized value clamps to `usize::MAX`
    CAP_PER_LL = (per_model as f64 * 0.01) as usize;
}
/*
misc. methods
*/
/// Append the encoded form of the primary key `pk` to `buf`.
///
/// The tag byte (`pk.tag().d()`) is written first, followed by a payload selected by `tag / 2`:
/// - index 0: the value returned by `read_uint()`, as little-endian bytes
/// - index 1: the length of `read_bin()` (through `u64_bytes_le()`) followed by the bytes themselves
///
/// Panics if `tag / 2` is not less than 2.
/// NOTE(review): which tag values land on which index is defined outside this file — confirm
fn encode_primary_key(buf: &mut Buf, pk: &PrimaryIndexKey) {
// the tag always leads the encoded key
buf.push(pk.tag().d());
// payload encoders, indexed by `tag / 2`
static EXEC: [unsafe fn(&mut Buf, &PrimaryIndexKey); 2] = [
// index 0: unsigned integer payload, little-endian
|buf, pk| unsafe { buf.extend(pk.read_uint().to_le_bytes()) },
// index 1: binary payload, length-prefixed
|buf, pk| unsafe {
let bin = pk.read_bin();
// NOTE(review): `u64_bytes_le` presumably yields the length as 8 little-endian bytes — confirm
buf.extend(bin.len().u64_bytes_le());
buf.extend(bin);
},
];
unsafe {
// UNSAFE(@ohsayan): tag map
// the assert keeps the index within the two-entry table before dispatching
assert!((pk.tag().d() / 2) < 2);
EXEC[(pk.tag().d() / 2) as usize](buf, pk);
}
}
/// Append the encoding of the datacell `dc` to `buf`; delegates entirely to `obj::encode_element`
fn encode_dc(buf: &mut Buf, dc: &Datacell) {
obj::encode_element(buf, dc)
}

@ -67,7 +67,7 @@ impl<'a> ModelIDRef<'a> {
super::SpaceIDRef::new(space_name, space),
model_name,
model.get_uuid(),
model.delta_state().current_version().value_u64(),
model.delta_state().schema_current_version().value_u64(),
)
}
pub fn new(

@ -148,7 +148,7 @@ mod model_tests {
super::SpaceIDRef::new("myspace", &space),
"mymodel",
model.get_uuid(),
model.delta_state().current_version().value_u64(),
model.delta_state().schema_current_version().value_u64(),
),
&new_fields,
);
@ -160,7 +160,7 @@ mod model_tests {
super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
"mymodel".into(),
model.get_uuid(),
model.delta_state().current_version().value_u64()
model.delta_state().schema_current_version().value_u64()
),
new_fields
},
@ -176,7 +176,7 @@ mod model_tests {
super::SpaceIDRef::new("myspace", &space),
"mymodel",
model.get_uuid(),
model.delta_state().current_version().value_u64(),
model.delta_state().schema_current_version().value_u64(),
),
&removed_fields,
);
@ -188,7 +188,7 @@ mod model_tests {
super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
"mymodel".into(),
model.get_uuid(),
model.delta_state().current_version().value_u64()
model.delta_state().schema_current_version().value_u64()
),
removed_fields: ["profile_pic".into()].into()
},
@ -207,7 +207,7 @@ mod model_tests {
super::SpaceIDRef::new("myspace", &space),
"mymodel".into(),
model.get_uuid(),
model.delta_state().current_version().value_u64(),
model.delta_state().schema_current_version().value_u64(),
),
&updated_fields,
);
@ -219,7 +219,7 @@ mod model_tests {
super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
"mymodel".into(),
model.get_uuid(),
model.delta_state().current_version().value_u64()
model.delta_state().schema_current_version().value_u64()
),
updated_fields
},
@ -233,7 +233,7 @@ mod model_tests {
super::SpaceIDRef::new("myspace", &space),
"mymodel",
model.get_uuid(),
model.delta_state().current_version().value_u64(),
model.delta_state().schema_current_version().value_u64(),
));
let encoded = super::enc::enc_full_self(txn);
let decoded = super::dec::dec_full::<DropModelTxn>(&encoded).unwrap();
@ -242,7 +242,7 @@ mod model_tests {
super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
"mymodel".into(),
model.get_uuid(),
model.delta_state().current_version().value_u64()
model.delta_state().schema_current_version().value_u64()
),
decoded
);

@ -24,6 +24,7 @@
*
*/
pub mod data;
pub mod gns;
use super::storage::v1::SDSSError;

@ -28,7 +28,9 @@
pub use unix::*;
#[cfg(windows)]
pub use windows::*;
mod free_memory;
pub use free_memory::free_memory_in_bytes;
use {
crate::IoResult,
std::{

@ -0,0 +1,90 @@
/*
* Created on Sat Sep 02 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#[cfg(target_os = "windows")]
extern crate winapi;
#[cfg(any(target_os = "linux", target_os = "macos"))]
extern crate libc;
pub fn free_memory_in_bytes() -> u64 {
#[cfg(target_os = "windows")]
{
use winapi::um::sysinfoapi::{GlobalMemoryStatusEx, MEMORYSTATUSEX};
let mut statex: MEMORYSTATUSEX = unsafe { std::mem::zeroed() };
statex.dwLength = std::mem::size_of::<MEMORYSTATUSEX>() as u32;
unsafe {
GlobalMemoryStatusEx(&mut statex);
}
// Return free physical memory
return statex.ullAvailPhys;
}
#[cfg(target_os = "linux")]
{
use libc::sysinfo;
let mut info: libc::sysinfo = unsafe { core::mem::zeroed() };
unsafe {
if sysinfo(&mut info) == 0 {
// Return free memory
return (info.freeram as u64) * (info.mem_unit as u64);
}
}
return 0;
}
#[cfg(target_os = "macos")]
{
use std::mem;
unsafe {
let page_size = libc::sysconf(libc::_SC_PAGESIZE);
let mut count: u32 = libc::HOST_VM_INFO64_COUNT as _;
let mut stat: libc::vm_statistics64 = mem::zeroed();
libc::host_statistics64(
libc::mach_host_self(),
libc::HOST_VM_INFO64,
&mut stat as *mut libc::vm_statistics64 as *mut _,
&mut count,
);
// see this: https://opensource.apple.com/source/xnu/xnu-4570.31.3/osfmk/mach/vm_statistics.h.auto.html
return (stat.free_count as u64)
.saturating_add(stat.inactive_count as _)
.saturating_add(stat.compressor_page_count as u64)
.saturating_mul(page_size as _);
}
}
#[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
{
return 0;
}
}
Loading…
Cancel
Save