From 29baeb49968b7c0155785f10b2fb3b6eb6144b2d Mon Sep 17 00:00:00 2001
From: Sayan
Date: Fri, 27 Jan 2023 22:53:01 +0530
Subject: [PATCH] Implement mtchm

---
 server/src/engine/idx/mod.rs          |   1 +
 server/src/engine/idx/mtchm/access.rs | 218 +++++++++
 server/src/engine/idx/mtchm/iter.rs   | 268 +++++++++++
 server/src/engine/idx/mtchm/meta.rs   | 123 +++++
 server/src/engine/idx/mtchm/mod.rs    | 658 ++++++++++++++++++++++++++
 server/src/engine/macros.rs           |   3 +
 server/src/engine/mem/vinline.rs      |   4 +-
 server/src/engine/mod.rs              |   3 +-
 server/src/engine/ql/macros.rs        |   6 +
 9 files changed, 1280 insertions(+), 4 deletions(-)
 create mode 100644 server/src/engine/idx/mtchm/access.rs
 create mode 100644 server/src/engine/idx/mtchm/iter.rs
 create mode 100644 server/src/engine/idx/mtchm/meta.rs
 create mode 100644 server/src/engine/idx/mtchm/mod.rs

diff --git a/server/src/engine/idx/mod.rs b/server/src/engine/idx/mod.rs
index aa95e65a..84d482fe 100644
--- a/server/src/engine/idx/mod.rs
+++ b/server/src/engine/idx/mod.rs
@@ -26,6 +26,7 @@
 mod stdhm;
 mod stord;
+mod mtchm;
 #[cfg(test)]
 mod tests;

diff --git a/server/src/engine/idx/mtchm/access.rs b/server/src/engine/idx/mtchm/access.rs
new file mode 100644
index 00000000..4b5177dc
--- /dev/null
+++ b/server/src/engine/idx/mtchm/access.rs
@@ -0,0 +1,218 @@
+/*
+ * Created on Fri Jan 27 2023
+ *
+ * This file is a part of Skytable
+ * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
+ * NoSQL database written by Sayan Nandan ("the Author") with the
+ * vision to provide flexibility in data modelling without compromising
+ * on performance, queryability or scalability.
+ *
+ * Copyright (c) 2023, Sayan Nandan
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * +*/ + +use super::meta::TreeElement; + +/// write mode flag +type WriteFlag = u8; +pub const WRITEMODE_DELETE: WriteFlag = 0xFF; +/// fresh +pub const WRITEMODE_FRESH: WriteFlag = 0b01; +/// refresh +pub const WRITEMODE_REFRESH: WriteFlag = 0b10; +/// any +pub const WRITEMODE_ANY: WriteFlag = 0b11; + +pub trait ReadMode: 'static { + type Ret<'a>; + fn ex<'a>(v: &'a T) -> Self::Ret<'a>; + fn nx<'a>() -> Self::Ret<'a>; +} + +pub struct RModeExists; +impl ReadMode for RModeExists { + type Ret<'a> = bool; + fn ex<'a>(_: &'a T) -> Self::Ret<'a> { + true + } + fn nx<'a>() -> Self::Ret<'a> { + false + } +} + +pub struct RModeRef; +impl ReadMode for RModeRef { + type Ret<'a> = Option<&'a T::Value>; + fn ex<'a>(v: &'a T) -> Self::Ret<'a> { + Some(v.val()) + } + fn nx<'a>() -> Self::Ret<'a> { + None + } +} + +pub struct RModeClone; +impl ReadMode for RModeClone { + type Ret<'a> = Option; + fn ex<'a>(v: &'a T) -> Self::Ret<'a> { + Some(v.val().clone()) + } + fn nx<'a>() -> Self::Ret<'a> { + None + } +} + +pub trait WriteMode: 'static { + const WMODE: WriteFlag; + type Ret<'a>; + fn ex<'a>(v: &'a T) -> Self::Ret<'a>; + fn nx<'a>() -> Self::Ret<'a>; +} + +pub struct WModeFresh; +impl WriteMode for WModeFresh { + const WMODE: WriteFlag = WRITEMODE_FRESH; + type Ret<'a> = bool; + #[inline(always)] + fn ex(_: &T) -> Self::Ret<'static> { + false + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + true + } +} + +pub struct WModeUpdate; +impl WriteMode for WModeUpdate { + const WMODE: WriteFlag = WRITEMODE_REFRESH; + type Ret<'a> = bool; + #[inline(always)] + fn ex(_: &T) -> Self::Ret<'static> { + true + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + false + } +} + +pub struct WModeUpdateRetClone; +impl WriteMode for WModeUpdateRetClone { + const WMODE: WriteFlag = WRITEMODE_REFRESH; + type Ret<'a> = Option; + #[inline(always)] + fn ex(v: &T) -> Self::Ret<'static> { + Some(v.val().clone()) + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + None + } +} + +pub struct WModeUpdateRetRef; +impl WriteMode for WModeUpdateRetRef { + const WMODE: WriteFlag = WRITEMODE_REFRESH; + type Ret<'a> = Option<&'a T::Value>; + #[inline(always)] + fn ex<'a>(v: &'a T) -> Self::Ret<'a> { + Some(v.val()) + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + None + } +} + +pub struct WModeUpsert; +impl WriteMode for WModeUpsert { + const WMODE: WriteFlag = WRITEMODE_ANY; + type Ret<'a> = (); + #[inline(always)] + fn ex(_: &T) -> Self::Ret<'static> { + () + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + () + } +} + +pub struct WModeUpsertRef; +impl WriteMode for WModeUpsertRef { + const WMODE: WriteFlag = WRITEMODE_ANY; + type Ret<'a> = Option<&'a T::Value>; + fn ex<'a>(v: &'a T) -> Self::Ret<'a> { + Some(v.val()) + } + fn nx<'a>() -> Self::Ret<'a> { + None + } +} + +pub struct WModeUpsertClone; +impl WriteMode for WModeUpsertClone { + const WMODE: WriteFlag = WRITEMODE_ANY; + type Ret<'a> = Option; + fn ex<'a>(v: &'a T) -> Self::Ret<'static> { + Some(v.val().clone()) + } + fn nx<'a>() -> Self::Ret<'a> { + None + } +} +pub struct WModeDelete; +impl WriteMode for WModeDelete { + const WMODE: WriteFlag = WRITEMODE_DELETE; + type Ret<'a> = bool; + #[inline(always)] + fn ex<'a>(_: &'a T) -> Self::Ret<'a> { + true + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + false + } +} + +pub struct WModeDeleteRef; +impl WriteMode for WModeDeleteRef { + const WMODE: WriteFlag = WRITEMODE_DELETE; + type Ret<'a> = Option<&'a T::Value>; + #[inline(always)] + fn ex<'a>(v: &'a T) -> Self::Ret<'a> { + 
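+        // NOTE (editor's sketch): every mode type in this module is a zero-sized
+        // type-state token: `WMODE` statically picks the branch that the shared
+        // `_insert`/`_remove` routines take, and `Ret<'a>` shapes what the caller
+        // gets back (bool, (), Option<&V> or Option<V>). Hedged caller-side
+        // illustration only — generic parameters were partly lost in this rendering
+        // of the patch, so the turbofish below is an assumption:
+        //
+        //     tree._insert::<access::WModeFresh>(elem, &g);     // -> bool
+        //     tree._insert::<access::WModeUpsertRef>(elem, &g); // -> Option<&V>
+        //
+        // A new access flavour thus costs one more ZST, not another CAS loop.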
Some(v.val()) + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + None + } +} + +pub struct WModeDeleteClone; +impl WriteMode for WModeDeleteClone { + const WMODE: WriteFlag = WRITEMODE_DELETE; + type Ret<'a> = Option; + #[inline(always)] + fn ex<'a>(v: &'a T) -> Self::Ret<'a> { + Some(v.val().clone()) + } + #[inline(always)] + fn nx<'a>() -> Self::Ret<'a> { + None + } +} diff --git a/server/src/engine/idx/mtchm/iter.rs b/server/src/engine/idx/mtchm/iter.rs new file mode 100644 index 00000000..d856011a --- /dev/null +++ b/server/src/engine/idx/mtchm/iter.rs @@ -0,0 +1,268 @@ +/* + * Created on Fri Jan 27 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use super::{ + super::super::{ + mem::UArray, + sync::atm::{Guard, Shared}, + }, + meta::{Config, DefConfig, NodeFlag, TreeElement}, + Node, Tree, +}; +use std::marker::PhantomData; + +pub struct IterKV<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, +{ + i: RawIter<'t, 'g, 'v, T, S, C, CfgIterKV>, +} + +impl<'t, 'g, 'v, T, S, C> IterKV<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, +{ + pub fn new(t: &'t Tree, g: &'g Guard) -> Self { + Self { + i: RawIter::new(t, g), + } + } +} + +impl<'t, 'g, 'v, T, S, C> Iterator for IterKV<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + type Item = &'v T; + + fn next(&mut self) -> Option { + self.i.next() + } +} + +pub struct IterKey<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + i: RawIter<'t, 'g, 'v, T, S, C, CfgIterKey>, +} + +impl<'t, 'g, 'v, T, S, C> IterKey<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + pub fn new(t: &'t Tree, g: &'g Guard) -> Self { + Self { + i: RawIter::new(t, g), + } + } +} + +impl<'t, 'g, 'v, T, S, C> Iterator for IterKey<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + type Item = &'v T::Key; + + fn next(&mut self) -> Option { + self.i.next() + } +} + +pub struct IterVal<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + i: RawIter<'t, 'g, 'v, T, S, C, CfgIterVal>, +} + +impl<'t, 'g, 'v, T, S, C> IterVal<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + pub fn new(t: &'t Tree, g: &'g Guard) -> Self { + Self { + i: RawIter::new(t, g), + } + } +} + +impl<'t, 'g, 'v, T, S, C> Iterator for IterVal<'t, 'g, 'v, T, S, C> +where + 't: 'v, + 'g: 'v + 't, + C: Config, + T: TreeElement, +{ + type Item = &'v T::Value; + + fn next(&mut self) -> Option { + 
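+        // NOTE (editor's sketch): `IterKV`, `IterKey` and `IterVal` are thin
+        // facades over the single `RawIter` below; the `IterConfig` parameter
+        // only chooses the projection (&T, &T::Key or &T::Value), so the DFS is
+        // written exactly once. Hedged usage, assuming a guard pinned via `cpin()`:
+        //
+        //     let g = cpin();
+        //     for v in tree.iter_val(&g) {
+        //         // v: &T::Value, valid for as long as `g` stays pinned
+        //     }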
self.i.next() + } +} + +trait IterConfig { + type Ret<'a> + where + T: 'a; + fn some<'a>(v: &'a T) -> Option>; +} + +struct CfgIterKV; +impl IterConfig for CfgIterKV { + type Ret<'a> = &'a T where T: 'a; + fn some<'a>(v: &'a T) -> Option> { + Some(v) + } +} + +struct CfgIterKey; +impl IterConfig for CfgIterKey { + type Ret<'a> = &'a T::Key where T::Key: 'a; + fn some<'a>(v: &'a T) -> Option> { + Some(v.key()) + } +} + +struct CfgIterVal; +impl IterConfig for CfgIterVal { + type Ret<'a> = &'a T::Value where T::Value: 'a; + fn some<'a>(v: &'a T) -> Option> { + Some(v.val()) + } +} + +struct DFSCNodeCtx<'g, C: Config> { + sptr: Shared<'g, Node>, + idx: usize, +} + +struct RawIter<'t, 'g, 'v, T, S, C, I> +where + 't: 'v, + 'g: 'v + 't, + I: IterConfig, + C: Config, +{ + g: &'g Guard, + stack: UArray<{ ::BRANCH_MX + 1 }, DFSCNodeCtx<'g, C>>, + _m: PhantomData<(&'v T, C, &'t Tree, I)>, +} + +impl<'t, 'g, 'v, T, S, C, I> RawIter<'t, 'g, 'v, T, S, C, I> +where + 't: 'v, + 'g: 'v + 't, + I: IterConfig, + C: Config, +{ + pub(super) fn new(tree: &'t Tree, g: &'g Guard) -> Self { + let mut stack = UArray::new(); + let sptr = tree.root.ld_acq(g); + stack.push(DFSCNodeCtx { sptr, idx: 0 }); + Self { + g, + stack, + _m: PhantomData, + } + } + /// depth-first search the tree + fn _next(&mut self) -> Option> { + while !self.stack.is_empty() { + let l = self.stack.len() - 1; + let ref mut current = self.stack[l]; + let ref node = current.sptr; + let flag = super::ldfl(¤t.sptr); + match flag { + _ if node.is_null() => { + self.stack.pop(); + } + flag if super::hf(flag, NodeFlag::DATA) => { + let data = unsafe { + // UNSAFE(@ohsayan): flagck + Tree::::read_data(current.sptr) + }; + if current.idx < data.len() { + let ref ret = data[current.idx]; + current.idx += 1; + return I::some(ret); + } else { + self.stack.pop(); + } + } + _ if current.idx < C::MAX_TREE_HEIGHT => { + let this_node = unsafe { + // UNSAFE(@ohsayan): guard + node.deref() + }; + let sptr = this_node.branch[current.idx].ld_acq(&self.g); + current.idx += 1; + self.stack.push(DFSCNodeCtx { sptr, idx: 0 }); + } + _ => { + self.stack.pop(); + } + } + } + None + } +} + +impl<'t, 'g, 'v, T, S, C, I> Iterator for RawIter<'t, 'g, 'v, T, S, C, I> +where + 't: 'v, + 'g: 'v + 't, + I: IterConfig, + C: Config, +{ + type Item = I::Ret<'v>; + + fn next(&mut self) -> Option { + self._next() + } +} diff --git a/server/src/engine/idx/mtchm/meta.rs b/server/src/engine/idx/mtchm/meta.rs new file mode 100644 index 00000000..913b0db5 --- /dev/null +++ b/server/src/engine/idx/mtchm/meta.rs @@ -0,0 +1,123 @@ +/* + * Created on Thu Jan 26 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use super::super::{super::mem::VInline, AsKeyClone}; +use std::{hash::BuildHasher, sync::Arc}; + +const LNODE_STACK: usize = 2; +pub type DefConfig = Config2B; +pub type LNode = VInline; + +pub trait PreConfig: Sized + 'static { + const BITS: u32; +} + +pub trait Config: PreConfig { + const BRANCH_MX: usize = ::BITS as _; + const BRANCH_LG: usize = { + let mut index = ::BRANCH_MX; + let mut log = 0usize; + while { + index >>= 1; + index != 0 + } { + log += 1; + } + log + }; + const HASH_MASK: u64 = (::BITS - 1) as _; + const MAX_TREE_HEIGHT_UB: usize = 0x40; + const MAX_TREE_HEIGHT: usize = + ::MAX_TREE_HEIGHT_UB / ::BRANCH_LG; + const LEVEL_ZERO: usize = 0; +} + +impl Config for T {} + +macro_rules! impl_config { + ($($vis:vis $name:ident = $ty:ty),*) => { + $($vis struct $name; impl $crate::engine::idx::mtchm::meta::PreConfig for $name { const BITS: u32 = <$ty>::BITS; })* + } +} + +impl_config!(pub Config2B = u16); + +pub trait Key: AsKeyClone + 'static {} +impl Key for T where T: AsKeyClone + 'static {} +pub trait Value: Clone + 'static {} +impl Value for T where T: Clone + 'static {} +pub trait AsHasher: BuildHasher + Default {} +impl AsHasher for T where T: BuildHasher + Default {} + +pub trait TreeElement: Clone + 'static { + type Key: Key; + type Value: Value; + fn key(&self) -> &Self::Key; + fn val(&self) -> &Self::Value; +} + +impl TreeElement for (K, V) { + type Key = K; + type Value = V; + #[inline(always)] + fn key(&self) -> &K { + &self.0 + } + #[inline(always)] + fn val(&self) -> &V { + &self.1 + } +} + +impl TreeElement for Arc<(K, V)> { + type Key = K; + type Value = V; + #[inline(always)] + fn key(&self) -> &K { + &self.0 + } + #[inline(always)] + fn val(&self) -> &V { + &self.1 + } +} + +flags! { + pub struct NodeFlag: usize { + PENDING_DELETE = 0b01, + DATA = 0b10, + } +} + +flags! { + #[derive(PartialEq, Eq)] + pub struct CompressState: u8 { + NULL = 0b00, + SNODE = 0b01, + CASFAIL = 0b10, + RESTORED = 0b11, + } +} diff --git a/server/src/engine/idx/mtchm/mod.rs b/server/src/engine/idx/mtchm/mod.rs new file mode 100644 index 00000000..b4dc70ec --- /dev/null +++ b/server/src/engine/idx/mtchm/mod.rs @@ -0,0 +1,658 @@ +/* + * Created on Thu Jan 26 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +mod access; +mod iter; +mod meta; + +use self::{ + access::{ReadMode, WriteMode}, + iter::{IterKV, IterKey, IterVal}, + meta::{AsHasher, CompressState, Config, DefConfig, LNode, NodeFlag, TreeElement}, +}; +use super::{ + super::{ + mem::UArray, + sync::atm::{self, cpin, upin, Atomic, Guard, Owned, Shared, ORD_ACQ, ORD_ACR, ORD_RLX}, + }, + AsKey, +}; +use crossbeam_epoch::CompareExchangeError; +use std::{borrow::Borrow, hash::Hasher, marker::PhantomData, mem, sync::atomic::AtomicUsize}; + +/* + HACK(@ohsayan): Until https://github.com/rust-lang/rust/issues/76560 is stabilized which is likely to take a while, + we need to settle for trait objects +*/ + +pub struct Node { + branch: [Atomic; ::BRANCH_MX], +} + +impl Node { + const NULL: Atomic = Atomic::null(); + const NULL_BRANCH: [Atomic; ::BRANCH_MX] = + [Self::NULL; ::BRANCH_MX]; + #[inline(always)] + const fn null() -> Self { + Self { + branch: Self::NULL_BRANCH, + } + } +} + +#[inline(always)] +fn gc(g: &Guard) { + g.flush(); +} + +#[inline(always)] +fn ldfl(c: &Shared>) -> usize { + c.tag() +} + +#[inline(always)] +const fn hf(c: usize, f: NodeFlag) -> bool { + (c & f.d()) == f.d() +} + +#[inline(always)] +const fn cf(c: usize, r: NodeFlag) -> usize { + c & !r.d() +} + +trait CTFlagAlign { + const FL_A: bool; + const FL_B: bool; + const FLCK_A: () = assert!(Self::FL_A & Self::FL_B); + const FLCK: () = Self::FLCK_A; +} + +impl CTFlagAlign for Tree { + const FL_A: bool = atm::ensure_flag_align::, { NodeFlag::bits() }>(); + const FL_B: bool = atm::ensure_flag_align::, { NodeFlag::bits() }>(); +} + +pub struct Tree { + root: Atomic>, + h: S, + l: AtomicUsize, + _m: PhantomData, +} + +impl Tree { + #[inline(always)] + const fn _new(h: S) -> Self { + let _ = Self::FLCK; + Self { + root: Atomic::null(), + h, + l: AtomicUsize::new(0), + _m: PhantomData, + } + } + #[inline(always)] + fn len(&self) -> usize { + self.l.load(ORD_RLX) + } + #[inline(always)] + fn is_empty(&self) -> bool { + self.len() == 0 + } + #[inline(always)] + pub const fn with_hasher(h: S) -> Self { + Self::_new(h) + } +} + +impl Tree { + #[inline(always)] + fn new() -> Self { + Self::_new(S::default()) + } +} + +impl Tree { + fn hash(&self, k: &Q) -> u64 + where + Q: ?Sized + AsKey, + { + let mut state = self.h.build_hasher(); + k.hash(&mut state); + state.finish() + } +} + +// iter +impl Tree { + fn iter_kv<'t, 'g, 'v>(&'t self, g: &'g Guard) -> IterKV<'t, 'g, 'v, T, S, C> { + IterKV::new(self, g) + } + fn iter_key<'t, 'g, 'v>(&'t self, g: &'g Guard) -> IterKey<'t, 'g, 'v, T, S, C> { + IterKey::new(self, g) + } + fn iter_val<'t, 'g, 'v>(&'t self, g: &'g Guard) -> IterVal<'t, 'g, 'v, T, S, C> { + IterVal::new(self, g) + } +} + +impl Tree { + fn insert(&self, elem: T, g: &Guard) -> bool { + self._insert::(elem, g) + } + fn update(&self, elem: T, g: &Guard) -> bool { + self._insert::(elem, g) + } + fn update_return<'g>(&'g self, elem: T, g: &'g Guard) -> Option<&'g T::Value> { + self._insert::(elem, g) + } + fn upsert(&self, elem: T, g: &Guard) { + self._insert::(elem, g) + } + fn upsert_return<'g>(&'g self, elem: T, g: &'g Guard) -> Option<&'g T::Value> { + self._insert::(elem, g) + } + fn _insert<'g, W: WriteMode>(&'g self, elem: T, g: &'g Guard) -> W::Ret<'g> { + let hash = self.hash(elem.key()); + let mut level = C::LEVEL_ZERO; + let mut current = &self.root; + let mut parent = None; + let mut child = None; + loop { + let node = current.ld_acq(g); + match ldfl(&node) { + flag if hf(flag, NodeFlag::PENDING_DELETE) => { + /* + FIXME(@ohsayan): + this node is 
about to be deleted (well, maybe) so we'll attempt a cleanup as well. we might not exactly + need to do this. also this is a potentially expensive thing since we're going all the way back to the root, + we might be able to optimize this with a fixed-size queue. + */ + unsafe { + // UNSAFE(@ohsayan): we know that isn't the root and def doesn't have data (that's how the algorithm works) + Self::compress(parent.unwrap(), child.unwrap(), g); + } + level = C::LEVEL_ZERO; + current = &self.root; + parent = None; + child = None; + } + _ if node.is_null() => { + // this is an empty slot + if W::WMODE == access::WRITEMODE_REFRESH { + // I call that a job well done + return W::nx(); + } + if (W::WMODE == access::WRITEMODE_ANY) | (W::WMODE == access::WRITEMODE_FRESH) { + let new = Self::new_data(elem.clone()); + match current.cx_rel(node, new, g) { + Ok(_) => { + // we're done here + self.incr_len(); + return W::nx(); + } + Err(CompareExchangeError { new, .. }) => unsafe { + /* + UNSAFE(@ohsayan): so we attempted to CAS it but the CAS failed. in that case, destroy the + lnode we created. We never published the value so no other thread has watched, making this + safe + */ + Self::ldrop(new.into_shared(g)); + }, + } + } + } + flag if hf(flag, NodeFlag::DATA) => { + // so we have an lnode. well maybe an snode + let data = unsafe { + // UNSAFE(@ohsayan): flagck + Self::read_data(node) + }; + debug_assert!(!data.is_empty(), "logic,empty node not compressed"); + if data[0].key() != elem.key() && level < C::MAX_TREE_HEIGHT_UB { + /* + so this is a collision and since we haven't reached the max height, we should always + create a new branch so let's do that + */ + debug_assert_eq!(data.len(), 1, "logic,lnode before height ub"); + if W::WMODE == access::WRITEMODE_REFRESH { + // another job well done; an snode with the wrong key; so basically it's missing + return W::nx(); + } + let next_chunk = (self.hash(data[0].key()) >> level) & C::HASH_MASK; + let mut new_branch = Node::null(); + // stick this one in + new_branch.branch[next_chunk as usize] = Atomic::from(node); + // we don't care about what happens + let _ = current.cx_rel(node, Owned::new(new_branch), g); + } else { + /* + in this case we either have the same key or we found an lnode. resolve any conflicts and attempt + to update + */ + let p = data.iter().position(|e| e.key() == elem.key()); + match p { + Some(v) if W::WMODE == access::WRITEMODE_FRESH => { + return W::ex(&data[v]) + } + Some(i) + if W::WMODE == access::WRITEMODE_REFRESH + || W::WMODE == access::WRITEMODE_ANY => + { + // update the entry and create a new node + let mut new_ln = data.clone(); + new_ln[i] = elem.clone(); + match current.cx_rel( + node, + Self::new_lnode(new_ln).into_shared(g), + g, + ) { + Ok(_) => { + unsafe { + /* + UNSAFE(@ohsayan): swapped out, and we'll be the last thread to see this once the epoch proceeds + sufficiently + */ + g.defer_destroy(Shared::>::from( + node.as_raw() as *const LNode<_> + )) + } + return W::ex(&data[i]); + } + Err(CompareExchangeError { new, .. 
}) => { + // failed to swap it in + unsafe { + // UNSAFE(@ohsayan): Failed to swap this in, and no one else saw it (well) + Self::ldrop(new) + } + } + } + } + None if W::WMODE == access::WRITEMODE_ANY => { + // no funk here + let new_node = Self::new_data(elem.clone()); + match current.cx_rel(node, new_node.into_shared(g), g) { + Ok(_) => { + // swapped out + unsafe { + // UNSAFE(@ohsayan): last thread to see this (well, sorta) + g.defer_destroy(Shared::>::from( + node.as_raw() as *const LNode<_>, + )); + } + self.incr_len(); + return W::nx(); + } + Err(CompareExchangeError { new, .. }) => { + // failed to swap it + unsafe { + // UNSAFE(@ohsayan): never published this, so we're the last one + Self::ldrop(new) + } + } + } + } + None if W::WMODE == access::WRITEMODE_REFRESH => return W::nx(), + _ => { + unreachable!("logic,W::WMODE mismatch: `{}`", W::WMODE); + } + } + } + } + _ => { + // branch + let nxidx = (hash >> level) & C::HASH_MASK; + level += C::BRANCH_LG; + parent = Some(current); + child = Some(node); + current = &unsafe { node.deref() }.branch[nxidx as usize]; + } + } + } + } + fn contains_key<'g, Q, R: ReadMode>(&'g self, k: &Q, g: &'g Guard) -> bool + where + T::Key: Borrow, + Q: AsKey + ?Sized, + { + self._lookup::(k, g) + } + fn get<'g, Q, R: ReadMode>(&'g self, k: &Q, g: &'g Guard) -> Option<&'g T::Value> + where + T::Key: Borrow, + Q: AsKey + ?Sized, + { + self._lookup::(k, g) + } + fn _lookup<'g, Q, R: ReadMode>(&'g self, k: &Q, g: &'g Guard) -> R::Ret<'g> + where + T::Key: Borrow, + Q: AsKey + ?Sized, + { + let mut hash = self.hash(k); + let mut current = &self.root; + loop { + let node = current.ld_acq(g); + match ldfl(&node) { + _ if node.is_null() => { + // honestly, if this ran on the root I'm going to die laughing (@ohsayan) + return R::nx(); + } + flag if hf(flag, NodeFlag::DATA) => { + let mut ret = R::nx(); + return unsafe { + // UNSAFE(@ohsayan): checked flag + nullck + Self::read_data(node).iter().find_map(|e| { + e.key().borrow().eq(k).then_some({ + ret = R::ex(e); + Some(()) + }) + }); + ret + }; + } + _ => { + // branch + current = &unsafe { node.deref() }.branch[(hash & C::HASH_MASK) as usize]; + hash >>= C::BRANCH_LG; + } + } + } + } + fn remove<'g, Q>(&'g self, k: &Q, g: &'g Guard) -> bool + where + T::Key: Borrow, + Q: AsKey + ?Sized, + { + self._remove::(k, g) + } + fn remove_return<'g, Q>(&'g self, k: &Q, g: &'g Guard) -> Option<&'g T::Value> + where + T::Key: Borrow, + Q: AsKey + ?Sized, + { + self._remove::(k, g) + } + fn _remove<'g, Q, W: WriteMode>(&'g self, k: &Q, g: &'g Guard) -> W::Ret<'g> + where + T::Key: Borrow, + Q: AsKey + ?Sized, + { + let hash = self.hash(k); + let mut current = &self.root; + let mut level = C::LEVEL_ZERO; + let mut levels = UArray::<{ ::BRANCH_MX }, _>::new(); + 'retry: loop { + let node = current.ld_acq(g); + match ldfl(&node) { + _ if node.is_null() => { + // lol + return W::nx(); + } + flag if hf(flag, NodeFlag::PENDING_DELETE) => { + let (p, c) = levels.pop().unwrap(); + unsafe { + /* + we hit a node that might be deleted, we aren't allowed to change it, so we'll attempt a + compression as well. same thing here as the other routines....can we do anything to avoid + the expensive root traversal? 
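+                        (editor's note, hedged) restarting from the root is the
+                        conservative choice: once a branch pointer is tagged
+                        PENDING_DELETE it is frozen, and the (parent, child) pairs
+                        cached in `levels` may refer to nodes that a concurrent
+                        compression has already unlinked, so they cannot be trusted
+                        as re-entry points without revalidation.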
+ */ + Self::compress(p, c, g); + } + levels.clear(); + level = C::LEVEL_ZERO; + current = &self.root; + } + flag if hf(flag, NodeFlag::DATA) => { + let data = unsafe { + // UNSAFE(@ohsayan): flagck + Self::read_data(node) + }; + let mut ret = W::nx(); + // this node shouldn't be empty + debug_assert!(!data.is_empty(), "logic,empty node not collected"); + // build new lnode + let r: LNode = data + .iter() + .filter_map(|this_elem| { + if this_elem.key().borrow() == k { + ret = W::ex(this_elem); + None + } else { + Some(this_elem.clone()) + } + }) + .collect(); + let replace = if r.is_empty() { + // don't create dead nodes + Shared::null() + } else { + Self::new_lnode(r).into_shared(g) + }; + match current.cx_rel(node, replace, g) { + Ok(_) => { + // swapped it out + unsafe { + // UNSAFE(@ohsayan): flagck + g.defer_destroy(Shared::>::from( + node.as_raw() as *const LNode<_> + )); + } + } + Err(CompareExchangeError { new, .. }) if !new.is_null() => { + // failed to swap it in, and it had some data + unsafe { + // UNSAFE(@ohsayan): Never published it, all ours + g.defer_destroy(Shared::>::from( + new.as_raw() as *const LNode<_> + )); + } + continue 'retry; + } + Err(_) => continue 'retry, + } + // attempt compressions + for (p, c) in levels.into_iter().rev() { + let live_nodes = unsafe { + // UNSAFE(@ohsayan): guard + c.deref() + } + .branch + .iter() + .filter(|n| !n.ld_rlx(g).is_null()) + .count(); + if live_nodes > 1 { + break; + } + if unsafe { + // UNSAFE(@ohsayan): we know for a fact that we only have sensible levels + Self::compress(p, c, g) + } == CompressState::RESTORED + { + // simply restored the earlier state, so let's stop + break; + } + } + gc(g); + return ret; + } + _ => { + // branch + levels.push((current, node)); + let nxidx = (hash >> level) & C::HASH_MASK; + level += C::BRANCH_LG; + current = &unsafe { node.deref() }.branch[nxidx as usize]; + } + } + } + } +} + +// low-level methods +impl Tree { + // hilarious enough but true, l doesn't affect safety but only creates an incorrect state + fn decr_len(&self) { + self.l.fetch_sub(1, ORD_ACQ); + } + fn incr_len(&self) { + self.l.fetch_add(1, ORD_ACQ); + } + #[inline(always)] + fn new_lnode(node: LNode) -> Owned> { + unsafe { + Owned::>::from_raw(Box::into_raw(Box::new(node)) as *mut Node<_>) + .with_tag(NodeFlag::DATA.d()) + } + } + /// Returns a new inner node, in the form of a data probe leaf + /// ☢ WARNING ☢: Do not drop this naively for god's sake + #[inline(always)] + fn new_data(data: T) -> Owned> { + let mut d = LNode::new(); + unsafe { + // UNSAFE(@ohsayan): empty arr + d.push_unchecked(data) + }; + Self::new_lnode(d) + } + unsafe fn read_data<'g>(d: Shared<'g, Node>) -> &'g LNode { + debug_assert!(hf(ldfl(&d), NodeFlag::DATA)); + (d.as_raw() as *const LNode<_>) + .as_ref() + .expect("logic,nullptr in lnode") + } + /// SAFETY: Ensure you have some actual data and not random garbage + #[inline(always)] + unsafe fn ldrop(leaf: Shared>) { + debug_assert!(hf(ldfl(&leaf), NodeFlag::DATA)); + drop(Owned::>::from_raw(leaf.as_raw() as *mut _)) + } + unsafe fn rdrop(n: &Atomic>) { + let g = upin(); + let node = n.ld_acq(g); + match ldfl(&node) { + _ if node.is_null() => {} + flag if hf(flag, NodeFlag::DATA) => Self::ldrop(node), + _ => { + // a branch + let this_branch = node.into_owned(); + for child in &this_branch.branch { + Self::rdrop(child) + } + drop(this_branch); + } + } + } + unsafe fn compress<'g>( + parent: &Atomic>, + child: Shared<'g, Node>, + g: &'g Guard, + ) -> CompressState { + /* + We look at the child's 
children and determine whether we can clean the child up. Although the amount of + memory we can save is not something very signficant but it becomes important with larger cardinalities + */ + debug_assert!(!hf(ldfl(&child), NodeFlag::DATA), "logic,compress lnode"); + debug_assert_eq!(ldfl(&child), 0, "logic,compress pending delete node"); + let branch = child.deref(); + let mut continue_compress = true; + let mut last_leaf = None; + let mut new_child = Node::null(); + let mut cnt = 0_usize; + + let mut i = 0; + while i < C::BRANCH_MX { + let ref child_ref = branch.branch[i]; + let this_child = child_ref.fetch_or(NodeFlag::PENDING_DELETE.d(), ORD_ACR, g); + let this_child = this_child.with_tag(cf(ldfl(&this_child), NodeFlag::PENDING_DELETE)); + match ldfl(&this_child) { + // lol, dangling child + _ if this_child.is_null() => {} + // some data in here + flag if hf(flag, NodeFlag::DATA) => { + last_leaf = Some(this_child); + cnt += Self::read_data(this_child).len(); + } + // branch + _ => { + continue_compress = false; + cnt += 1; + } + } + new_child.branch[i] = Atomic::from(this_child); + i += 1; + } + + let insert; + let ret; + let mut drop = None; + + match last_leaf { + Some(node) if continue_compress && cnt == 1 => { + // snode + insert = node; + ret = CompressState::SNODE; + } + None if cnt == 0 => { + // a dangling branch + insert = Shared::null(); + ret = CompressState::NULL; + } + _ => { + // we can't compress this since we have a lot of children + let new = Owned::new(new_child).into_shared(g); + insert = new; + drop = Some(new); + ret = CompressState::RESTORED; + } + } + + // all logic done; let's see what fate the CAS brings us + match parent.cx_rel(child, insert, g) { + Ok(_) => { + unsafe { + // UNSAFE(@ohsayan): We're the thread in the last epoch who's seeing this; so, we're good + g.defer_destroy(child); + } + ret + } + Err(_) => { + mem::drop(drop.map(|n| Shared::into_owned(n))); + CompressState::CASFAIL + } + } + } +} + +impl Drop for Tree { + fn drop(&mut self) { + unsafe { + // UNSAFE(@ohsayan): sole live owner + Self::rdrop(&self.root); + } + gc(&cpin()) + } +} diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index c710e8af..a65c7f84 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -34,6 +34,7 @@ macro_rules! extract { }; } +#[cfg(test)] macro_rules! multi_assert_eq { ($($lhs:expr),* => $rhs:expr) => { $(assert_eq!($lhs, $rhs);)* @@ -49,6 +50,7 @@ macro_rules! enum_impls { }; } +#[allow(unused_macros)] macro_rules! assertions { ($($assert:expr),*$(,)?) => {$(const _:()=::core::assert!($assert);)*} } @@ -82,6 +84,7 @@ macro_rules! flags { ); } +#[allow(unused_macros)] macro_rules! 
union { ($(#[$attr:meta])* $vis:vis union $name:ident $tail:tt) => (union!(@parse [$(#[$attr])* $vis union $name] [] $tail);); ($(#[$attr:meta])* $vis:vis union $name:ident<$($lt:lifetime),*> $tail:tt) => (union!(@parse [$(#[$attr])* $vis union $name<$($lt),*>] [] $tail);); diff --git a/server/src/engine/mem/vinline.rs b/server/src/engine/mem/vinline.rs index e3f84c54..663ddbef 100644 --- a/server/src/engine/mem/vinline.rs +++ b/server/src/engine/mem/vinline.rs @@ -71,7 +71,6 @@ impl VInline { // UNSAFE(@ohsayan): grow allocated the cap we needed self.push_unchecked(v); } - self.l += 1; } #[inline(always)] pub fn clear(&mut self) { @@ -182,8 +181,9 @@ impl VInline { p as *mut T } } - unsafe fn push_unchecked(&mut self, v: T) { + pub unsafe fn push_unchecked(&mut self, v: T) { self._as_mut_ptr().add(self.l).write(v); + self.l += 1; } pub fn optimize_capacity(&mut self) { if self.on_stack() || self.len() == self.capacity() { diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 9d5855f5..b367bf89 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -25,12 +25,11 @@ */ #![allow(dead_code)] -#![allow(unused_macros)] #[macro_use] mod macros; mod core; mod idx; +mod mem; mod ql; mod sync; -mod mem; diff --git a/server/src/engine/ql/macros.rs b/server/src/engine/ql/macros.rs index 10b9ff99..4b60718a 100644 --- a/server/src/engine/ql/macros.rs +++ b/server/src/engine/ql/macros.rs @@ -282,6 +282,7 @@ macro_rules! dict { }}; } +#[cfg(test)] macro_rules! nullable_dict { () => { dict! {} @@ -295,6 +296,7 @@ macro_rules! nullable_dict { }; } +#[cfg(test)] macro_rules! dict_nullable { () => { <::std::collections::HashMap<_, _> as ::core::default::Default>::default() @@ -306,6 +308,7 @@ macro_rules! dict_nullable { }}; } +#[cfg(test)] macro_rules! set { () => { <::std::collections::HashSet<_> as ::core::default::Default>::default() @@ -317,14 +320,17 @@ macro_rules! set { }}; } +#[cfg(test)] macro_rules! into_array { ($($e:expr),* $(,)?) => { [$($e.into()),*] }; } +#[cfg(test)] macro_rules! into_array_nullable { ($($e:expr),* $(,)?) => { [$($crate::engine::ql::tests::nullable_datatype($e)),*] }; } +#[allow(unused_macros)] macro_rules! statictbl { ($name:ident: $kind:ty => [$($expr:expr),*]) => {{ const LEN: usize = {let mut i = 0;$(let _ = $expr; i += 1;)*i};
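
Editor's postscript (not part of the patch): a hedged, self-contained sketch of how
the new map is driven, reconstructed from the diff. Generic parameters were partly
eaten in this rendering, so the sketch assumes the tree reads `Tree<T, S, C: Config>`
with `T: TreeElement`, that `cpin()` pins an epoch guard as in `sync::atm`, and that
the currently private methods (`new`, `insert`, `get`, `remove`) are exposed. With
`DefConfig = Config2B` (u16), branch nodes are 16-way: the hash is consumed 4 bits
(`BRANCH_LG`) at a time through mask 0xF (`HASH_MASK`), so the tree is at most
64 / 4 = 16 levels deep before lnodes are allowed to hold colliding elements.

    use std::collections::hash_map::RandomState;

    // (u64, String) implements TreeElement via the blanket impl in meta.rs
    let tree: Tree<(u64, String), RandomState, DefConfig> = Tree::new();
    let g = cpin(); // pin this thread into the current epoch
    assert!(tree.insert((1, "one".into()), &g)); // fresh write succeeds
    assert_eq!(tree.get(&1, &g), Some(&"one".to_string()));
    assert!(tree.remove(&1, &g)); // unlink now, destroy after the epoch advances
    drop(g); // unpinning lets deferred destructions be collected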