Add `TMCell`

next
Sayan Nandan 2 years ago
parent 44aa57a25a
commit 2dfe7227aa
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -19,7 +19,7 @@ env_logger = "0.9.0"
hashbrown = { version = "0.12.3", features = ["raw"] }
log = "0.4.17"
openssl = { version = "0.10.41", features = ["vendored"] }
crossbeam-epoch = "0.9.13"
crossbeam-epoch = { version = "0.9.13" }
parking_lot = "0.12.1"
regex = "1.6.0"
serde = { version = "1.0.144", features = ["derive"] }

@ -24,15 +24,12 @@
*
*/
use core::{
fmt,
mem::{self, size_of},
ops::Deref,
use core::{fmt, mem, ops::Deref, sync::atomic::Ordering};
use crossbeam_epoch::{Atomic as CBAtomic, CompareExchangeError, Pointable, Pointer};
// re-export here because we have some future plans ;) (@ohsayan)
pub use crossbeam_epoch::{
pin as pin_current, unprotected as pin_unprotected, Guard, Owned, Shared,
};
use crossbeam_epoch::{
Atomic as CBAtomic, CompareExchangeError, Guard, Pointable, Pointer, Shared,
};
use std::sync::atomic::Ordering;
pub(super) const ORD_RLX: Ordering = Ordering::Relaxed;
pub(super) const ORD_ACQ: Ordering = Ordering::Acquire;
@ -58,7 +55,10 @@ impl<T> fmt::Debug for Atomic<T> {
impl<T: Pointable> Atomic<T> {
// the compile time address size check ensures "first class" sanity
const _ENSURE_FLAG_STATIC_CHECK: () = ensure_flag_align::<T>(size_of::<Self>());
const _ENSURE_FLAG_STATIC_CHECK: () = ensure_flag_align::<T>(0);
/// Instantiates a new atomic
///
/// **This will allocate**
pub fn new_alloc(t: T) -> Self {
let _ = Self::_ENSURE_FLAG_STATIC_CHECK;
Self {
@ -114,6 +114,10 @@ impl<T: Pointable> Atomic<T> {
pub(super) fn ld_acq<'g>(&self, g: &'g Guard) -> Shared<'g, T> {
self.ld(ORD_ACQ, g)
}
#[inline(always)]
pub(crate) fn ld_rlx<'g>(&self, g: &'g Guard) -> Shared<'g, T> {
self.ld(ORD_RLX, g)
}
}
impl<T, A> From<A> for Atomic<T>

@ -0,0 +1,148 @@
/*
* Created on Sat Jan 21 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::atm::{pin_unprotected, Atomic, Guard, Owned, Shared, ORD_REL};
use core::ops::Deref;
use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData;
/// A [`TMCell`] provides atomic reads and serialized writes; the `static` is a CB hack
pub struct TMCell<T: 'static> {
a: Atomic<T>,
g: Mutex<()>,
}
impl<T: 'static> TMCell<T> {
pub fn new(v: T) -> Self {
Self {
a: Atomic::new_alloc(v),
g: Mutex::new(()),
}
}
pub fn begin_write_txn<'a, 'g>(&'a self, g: &'g Guard) -> TMCellWriteTxn<'a, 'g, T> {
let wg = self.g.lock();
let snapshot = self.a.ld_acq(g);
let data: &'g T = unsafe {
// UNSAFE(@ohsayan): first, non-null (TMCell is never null). second, the guard
snapshot.deref()
};
TMCellWriteTxn::new(data, &self.a, wg)
}
pub fn begin_read_txn<'a, 'g>(&'a self, g: &'g Guard) -> TMCellReadTxn<'a, 'g, T> {
let snapshot = self.a.ld_acq(g);
let data: &'g T = unsafe {
// UNSAFE(@ohsayan): non-null and the guard
snapshot.deref()
};
TMCellReadTxn::new(data)
}
}
impl<T> Drop for TMCell<T> {
fn drop(&mut self) {
unsafe {
// UNSAFE(@ohsayan): Sole owner with mutable access
let g = pin_unprotected();
let shptr = self.a.ld_rlx(&g);
g.defer_destroy(shptr);
}
}
}
pub struct TMCellReadTxn<'a, 'g, T: 'static> {
d: &'g T,
_m: PhantomData<&'a TMCell<T>>,
}
impl<'a, 'g, T> TMCellReadTxn<'a, 'g, T> {
#[inline(always)]
pub fn new(d: &'g T) -> Self {
Self { d, _m: PhantomData }
}
#[inline(always)]
pub fn read(&self) -> &'g T {
self.d
}
}
impl<'a, 'g, T: Copy> TMCellReadTxn<'a, 'g, T> {
fn read_copy(&self) -> T {
*self.d
}
}
impl<'a, 'g, T> Deref for TMCellReadTxn<'a, 'g, T> {
type Target = T;
fn deref(&self) -> &'g Self::Target {
self.d
}
}
pub struct TMCellWriteTxn<'a, 'g, T: 'static> {
d: &'g T,
a: &'a Atomic<T>,
g: MutexGuard<'a, ()>,
}
impl<'a, 'g, T> TMCellWriteTxn<'a, 'g, T> {
#[inline(always)]
pub fn new(d: &'g T, a: &'a Atomic<T>, g: MutexGuard<'a, ()>) -> Self {
Self { d, a, g }
}
pub fn publish_commit(self, new: T, g: &'g Guard) {
self._commit(new, g, |p| {
unsafe {
// UNSAFE(@ohsayan): Unlinked
g.defer_destroy(p);
}
})
}
fn _commit<F, R>(self, new: T, g: &'g Guard, f: F) -> R
where
F: FnOnce(Shared<T>) -> R,
{
let new = Owned::new(new);
let r = self.a.swap(new, ORD_REL, g);
f(r)
}
#[inline(always)]
pub fn read(&self) -> &'g T {
self.d
}
}
impl<'a, 'g, T: Copy> TMCellWriteTxn<'a, 'g, T> {
fn read_copy(&self) -> T {
*self.d
}
}
impl<'a, 'g, T> Deref for TMCellWriteTxn<'a, 'g, T> {
type Target = T;
fn deref(&self) -> &'g Self::Target {
self.d
}
}

@ -25,3 +25,4 @@
*/
pub(super) mod atm;
pub(super) mod cell;

Loading…
Cancel
Save