From 2dfe7227aa25100dc9e707635b149db76a05049a Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 22 Jan 2023 09:27:05 +0000 Subject: [PATCH] Add `TMCell` --- server/Cargo.toml | 2 +- server/src/engine/sync/atm.rs | 22 +++-- server/src/engine/sync/cell.rs | 148 +++++++++++++++++++++++++++++++++ server/src/engine/sync/mod.rs | 1 + 4 files changed, 163 insertions(+), 10 deletions(-) create mode 100644 server/src/engine/sync/cell.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index d2c4862b..45b26e9a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -19,7 +19,7 @@ env_logger = "0.9.0" hashbrown = { version = "0.12.3", features = ["raw"] } log = "0.4.17" openssl = { version = "0.10.41", features = ["vendored"] } -crossbeam-epoch = "0.9.13" +crossbeam-epoch = { version = "0.9.13" } parking_lot = "0.12.1" regex = "1.6.0" serde = { version = "1.0.144", features = ["derive"] } diff --git a/server/src/engine/sync/atm.rs b/server/src/engine/sync/atm.rs index dd4e2488..fd729148 100644 --- a/server/src/engine/sync/atm.rs +++ b/server/src/engine/sync/atm.rs @@ -24,15 +24,12 @@ * */ -use core::{ - fmt, - mem::{self, size_of}, - ops::Deref, +use core::{fmt, mem, ops::Deref, sync::atomic::Ordering}; +use crossbeam_epoch::{Atomic as CBAtomic, CompareExchangeError, Pointable, Pointer}; +// re-export here because we have some future plans ;) (@ohsayan) +pub use crossbeam_epoch::{ + pin as pin_current, unprotected as pin_unprotected, Guard, Owned, Shared, }; -use crossbeam_epoch::{ - Atomic as CBAtomic, CompareExchangeError, Guard, Pointable, Pointer, Shared, -}; -use std::sync::atomic::Ordering; pub(super) const ORD_RLX: Ordering = Ordering::Relaxed; pub(super) const ORD_ACQ: Ordering = Ordering::Acquire; @@ -58,7 +55,10 @@ impl fmt::Debug for Atomic { impl Atomic { // the compile time address size check ensures "first class" sanity - const _ENSURE_FLAG_STATIC_CHECK: () = ensure_flag_align::(size_of::()); + const _ENSURE_FLAG_STATIC_CHECK: () = ensure_flag_align::(0); + /// Instantiates a new atomic + /// + /// **This will allocate** pub fn new_alloc(t: T) -> Self { let _ = Self::_ENSURE_FLAG_STATIC_CHECK; Self { @@ -114,6 +114,10 @@ impl Atomic { pub(super) fn ld_acq<'g>(&self, g: &'g Guard) -> Shared<'g, T> { self.ld(ORD_ACQ, g) } + #[inline(always)] + pub(crate) fn ld_rlx<'g>(&self, g: &'g Guard) -> Shared<'g, T> { + self.ld(ORD_RLX, g) + } } impl From for Atomic diff --git a/server/src/engine/sync/cell.rs b/server/src/engine/sync/cell.rs new file mode 100644 index 00000000..132a31c9 --- /dev/null +++ b/server/src/engine/sync/cell.rs @@ -0,0 +1,148 @@ +/* + * Created on Sat Jan 21 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use super::atm::{pin_unprotected, Atomic, Guard, Owned, Shared, ORD_REL}; +use core::ops::Deref; +use parking_lot::{Mutex, MutexGuard}; +use std::marker::PhantomData; + +/// A [`TMCell`] provides atomic reads and serialized writes; the `static` is a CB hack +pub struct TMCell { + a: Atomic, + g: Mutex<()>, +} + +impl TMCell { + pub fn new(v: T) -> Self { + Self { + a: Atomic::new_alloc(v), + g: Mutex::new(()), + } + } + pub fn begin_write_txn<'a, 'g>(&'a self, g: &'g Guard) -> TMCellWriteTxn<'a, 'g, T> { + let wg = self.g.lock(); + let snapshot = self.a.ld_acq(g); + let data: &'g T = unsafe { + // UNSAFE(@ohsayan): first, non-null (TMCell is never null). second, the guard + snapshot.deref() + }; + TMCellWriteTxn::new(data, &self.a, wg) + } + pub fn begin_read_txn<'a, 'g>(&'a self, g: &'g Guard) -> TMCellReadTxn<'a, 'g, T> { + let snapshot = self.a.ld_acq(g); + let data: &'g T = unsafe { + // UNSAFE(@ohsayan): non-null and the guard + snapshot.deref() + }; + TMCellReadTxn::new(data) + } +} + +impl Drop for TMCell { + fn drop(&mut self) { + unsafe { + // UNSAFE(@ohsayan): Sole owner with mutable access + let g = pin_unprotected(); + let shptr = self.a.ld_rlx(&g); + g.defer_destroy(shptr); + } + } +} + +pub struct TMCellReadTxn<'a, 'g, T: 'static> { + d: &'g T, + _m: PhantomData<&'a TMCell>, +} + +impl<'a, 'g, T> TMCellReadTxn<'a, 'g, T> { + #[inline(always)] + pub fn new(d: &'g T) -> Self { + Self { d, _m: PhantomData } + } + #[inline(always)] + pub fn read(&self) -> &'g T { + self.d + } +} + +impl<'a, 'g, T: Copy> TMCellReadTxn<'a, 'g, T> { + fn read_copy(&self) -> T { + *self.d + } +} + +impl<'a, 'g, T> Deref for TMCellReadTxn<'a, 'g, T> { + type Target = T; + fn deref(&self) -> &'g Self::Target { + self.d + } +} + +pub struct TMCellWriteTxn<'a, 'g, T: 'static> { + d: &'g T, + a: &'a Atomic, + g: MutexGuard<'a, ()>, +} + +impl<'a, 'g, T> TMCellWriteTxn<'a, 'g, T> { + #[inline(always)] + pub fn new(d: &'g T, a: &'a Atomic, g: MutexGuard<'a, ()>) -> Self { + Self { d, a, g } + } + pub fn publish_commit(self, new: T, g: &'g Guard) { + self._commit(new, g, |p| { + unsafe { + // UNSAFE(@ohsayan): Unlinked + g.defer_destroy(p); + } + }) + } + fn _commit(self, new: T, g: &'g Guard, f: F) -> R + where + F: FnOnce(Shared) -> R, + { + let new = Owned::new(new); + let r = self.a.swap(new, ORD_REL, g); + f(r) + } + #[inline(always)] + pub fn read(&self) -> &'g T { + self.d + } +} + +impl<'a, 'g, T: Copy> TMCellWriteTxn<'a, 'g, T> { + fn read_copy(&self) -> T { + *self.d + } +} + +impl<'a, 'g, T> Deref for TMCellWriteTxn<'a, 'g, T> { + type Target = T; + fn deref(&self) -> &'g Self::Target { + self.d + } +} diff --git a/server/src/engine/sync/mod.rs b/server/src/engine/sync/mod.rs index 16a24114..a5f67e40 100644 --- a/server/src/engine/sync/mod.rs +++ b/server/src/engine/sync/mod.rs @@ -25,3 +25,4 @@ */ pub(super) mod atm; +pub(super) mod cell;