@ -26,8 +26,6 @@
use bytes ::Bytes ;
use libsky ::TResult ;
use parking_lot ::Condvar ;
use parking_lot ::Mutex ;
use serde ::{ Deserialize , Serialize } ;
use std ::borrow ::Borrow ;
use std ::fmt ;
@ -35,12 +33,8 @@ use std::hash::Hash;
use std ::iter ::FromIterator ;
use std ::marker ::PhantomData ;
use std ::ops ::Deref ;
use std ::sync ::atomic ::AtomicBool ;
use std ::sync ::atomic ::Ordering ;
use std ::sync ::Arc ;
const ORDERING_RELAXED : Ordering = Ordering ::Relaxed ;
#[ derive(Debug) ]
/// A thread-safe in-memory hashtable
///
@ -87,52 +81,6 @@ where
}
}
/// A [`CVar`] is a conditional variable that uses zero CPU time while waiting on a condition
///
/// This Condvar was specifically built for use with [`Coremap`] which uses a [`TableLockstateGuard`]
/// object to temporarily deny all writes
#[ derive(Debug) ]
struct Cvar {
c : Condvar ,
m : Mutex < ( ) > ,
}
impl Cvar {
fn new ( ) -> Self {
Self {
c : Condvar ::new ( ) ,
m : Mutex ::new ( ( ) ) ,
}
}
/// Notify all the threads waiting on this condvar that the state has changed
fn notify_all ( & self ) {
let _ = self . c . notify_all ( ) ;
}
/// Wait for a notification on the conditional variable
fn wait ( & self , locked_state : & AtomicBool ) {
while locked_state . load ( ORDERING_RELAXED ) {
// only wait if locked_state is true
let guard = self . m . lock ( ) ;
let mut owned_guard = guard ;
self . c . wait ( & mut owned_guard ) ;
}
}
/// Wait for a notification and then immediately run a closure as soon as the `locked_state`
/// is false
fn wait_and_then_immediately < T , F > ( & self , locked_state : & AtomicBool , and_then : F ) -> T
where
F : Fn ( ) -> T ,
{
while locked_state . load ( ORDERING_RELAXED ) {
// only wait if locked_state is true
let guard = self . m . lock ( ) ;
let mut owned_guard = guard ;
self . c . wait ( & mut owned_guard ) ;
}
and_then ( )
}
}
use dashmap ::iter ::Iter ;
use dashmap ::mapref ::entry ::Entry ;
use dashmap ::mapref ::one ::Ref ;
@ -147,8 +95,6 @@ where
K : Eq + Hash ,
{
inner : HashTable < K , V > ,
state_lock : AtomicBool ,
state_condvar : Cvar ,
}
impl < K : Eq + Hash , V > Deref for HTable < K , V > {
@ -158,43 +104,6 @@ impl<K: Eq + Hash, V> Deref for HTable<K, V> {
}
}
/// A table lock state guard
///
/// This object holds a locked [`Coremap`] object. The locked state corresponds to the internal `state_lock`
/// `AtomicBool`'s value. You can use the [`TableLockStateGuard`] to reference the actual table and do any operations
/// on it. It is recommended that whenever you're about to do a BGSAVE operation, call [`Coremap::lock_writes()`]
/// and you'll get this object. Use this object to mutate/read the data of the inner hashtable and then as soon
/// as this lock state goes out of scope, you can be sure that all threads waiting to write will get access.
///
/// ## Undefined Behavior (UB)
///
/// It is **absolutely undefined behavior to hold two lock states** for the same table because each one will
/// attempt to notify the other waiting threads. This will never happen unless you explicitly attempt to do it
/// as [`Coremap`] will wait for a [`TableLockStateGuard`] to be available before it gives you one
pub struct TableLockStateGuard < ' a , K , V >
where
K : Eq + Hash + Serialize ,
V : Serialize ,
{
inner : & ' a Coremap < K , V > ,
}
impl < ' a , K : Eq + Hash + Serialize , V : Serialize > Deref for TableLockStateGuard < ' a , K , V > {
type Target = Coremap < K , V > ;
fn deref ( & self ) -> & Self ::Target {
& self . inner
}
}
impl < ' a , K : Hash + Eq + Serialize , V : Serialize > Drop for TableLockStateGuard < ' a , K , V > {
fn drop ( & mut self ) {
unsafe {
// UNSAFE(@ohsayan): we know that no such guards exist, so indicate that the guards has been released
self . inner . _force_unlock_writes ( ) ;
}
}
}
impl < K , V > Coremap < K , V >
where
K : Eq + Hash + Serialize ,
@ -204,8 +113,6 @@ where
pub fn new ( ) -> Self {
Coremap {
inner : HashTable ::new ( ) ,
state_lock : AtomicBool ::new ( false ) ,
state_condvar : Cvar ::new ( ) ,
}
}
/// Returns the total number of key value pairs
@ -218,7 +125,6 @@ where
K : Borrow < Q > ,
Q : Hash + Eq + ? Sized ,
{
self . wait_for_write_unlock ( ) ;
self . inner . remove ( key )
}
/// Returns true if an existent key was removed
@ -227,7 +133,6 @@ where
K : Borrow < Q > ,
Q : Hash + Eq + ? Sized ,
{
self . wait_for_write_unlock ( ) ;
self . inner . remove ( key ) . is_some ( )
}
/// Check if a table contains a key
@ -240,7 +145,6 @@ where
}
/// Clears the inner table!
pub fn clear ( & self ) {
self . wait_for_write_unlock ( ) ;
self . inner . clear ( )
}
/// Return a non-consuming iterator
@ -257,7 +161,6 @@ where
}
/// Returns true if the non-existent key was assigned to a value
pub fn true_if_insert ( & self , k : K , v : V ) -> bool {
self . wait_for_write_unlock ( ) ;
if let Entry ::Vacant ( ve ) = self . inner . entry ( k ) {
ve . insert ( v ) ;
true
@ -267,12 +170,10 @@ where
}
/// Update or insert
pub fn upsert ( & self , k : K , v : V ) {
self . wait_for_write_unlock ( ) ;
let _ = self . inner . insert ( k , v ) ;
}
/// Returns true if the value was updated
pub fn true_if_update ( & self , k : K , v : V ) -> bool {
self . wait_for_write_unlock ( ) ;
if let Entry ::Occupied ( mut oe ) = self . inner . entry ( k ) {
oe . insert ( v ) ;
true
@ -284,62 +185,13 @@ where
pub fn serialize ( & self ) -> TResult < Vec < u8 > > {
bincode ::serialize ( & self . inner ) . map_err ( | e | e . into ( ) )
}
/// Force lock writes on the underlying table
///
/// ## Safety
/// This function is unsafe to be called directly and may result in undefined behavior (UB).
/// Instead, call [`Coremap::lock_writes`]
unsafe fn _force_lock_writes ( & self ) -> TableLockStateGuard < ' _ , K , V > {
self . state_lock . store ( true , ORDERING_RELAXED ) ;
self . state_condvar . notify_all ( ) ;
TableLockStateGuard { inner : & self }
}
/// Force unlock writes on the underlying table
///
/// ## Safety
/// This function is unsafe to be called directly and may result in undefined behavior (UB).
/// Instead, call [`Coremap::lock_writes`] and then drop the [`TableLockStateGuard`] to unlock
/// writes on the table (will be dropped as soon as it goes out of scope)
unsafe fn _force_unlock_writes ( & self ) {
self . state_lock . store ( false , ORDERING_RELAXED ) ;
self . state_condvar . notify_all ( ) ;
}
/// Blocks the current thread, waiting for an unlock on writes
fn wait_for_write_unlock ( & self ) {
self . state_condvar . wait ( & self . state_lock ) ;
}
/// Wait for an unlock on writes and then immediately run the provided closure (`then`)
fn wait_for_write_unlock_and_then < T , F > ( & self , then : F ) -> T
where
F : Fn ( ) -> T ,
{
self . state_condvar
. wait_and_then_immediately ( & self . state_lock , then )
}
/// Lock writes on the table
///
/// This will immediately return a [`TableLockStateGuard`] if the table is in an unlocked state,
/// but however **will block if the table is already locked** and then return when a guard is available
pub fn lock_writes ( & self ) -> TableLockStateGuard < ' _ , K , V > {
self . wait_for_write_unlock_and_then ( | | unsafe {
// UNSAFE(@ohsayan): This is safe because we're running it exactly after acquiring a lock
// since we've got a write unlock at this exact point, we're free to lock the table
// so this _should be_ safe
// FIXME: UB/race condition here? What if exactly after the write unlock another thread does a lock_writes?
self . _force_lock_writes ( )
} )
}
}
impl Coremap < Data , Data > {
/// Returns a `Coremap<Data, Data>` from the provided file (as a `Vec<u8>`)
pub fn deserialize ( src : Vec < u8 > ) -> TResult < Self > {
let h : HashTable < Data , Data > = bincode ::deserialize ( & src ) ? ;
Ok ( Self {
inner : h ,
state_lock : AtomicBool ::new ( false ) ,
state_condvar : Cvar ::new ( ) ,
} )
Ok ( Self { inner : h } )
}
/// Returns atleast `count` number of keys from the hashtable
pub fn get_keys ( & self , count : usize ) -> Vec < Bytes > {
@ -389,8 +241,6 @@ where
Self {
inner : Arc ::new ( Coremap {
inner : DashMap ::from_iter ( iter ) ,
state_lock : AtomicBool ::new ( false ) ,
state_condvar : Cvar ::new ( ) ,
} ) ,
_marker_value : PhantomData ,
_marker_key : PhantomData ,
@ -408,8 +258,6 @@ where
{
Coremap {
inner : DashMap ::from_iter ( iter ) ,
state_lock : AtomicBool ::new ( false ) ,
state_condvar : Cvar ::new ( ) ,
}
}
}
@ -511,194 +359,3 @@ fn test_de() {
hmap . upsert ( Data ::from ( "sayan" ) , Data ::from ( "writes code" ) ) ;
assert! ( hmap . get ( "sayan" . as_bytes ( ) ) . is_some ( ) ) ;
}
#[ cfg(test) ]
mod concurrency_tests {
use super ::HTable ;
#[ test ]
fn test_race_and_multiple_table_lock_state_guards ( ) {
// this will test for a race condition and should take approximately 40 seconds to complete
// although that doesn't include any possible delays involved
// Uncomment the `println`s for seeing the thing in action (or to debug)
use std ::sync ::mpsc ;
use std ::thread ;
use std ::time ::Duration ;
let skymap : HTable < & str , & str > = HTable ::new ( ) ;
for _ in 0 .. 1000 {
let c1 = skymap . clone ( ) ;
let c2 = skymap . clone ( ) ;
let c3 = skymap . clone ( ) ;
let c4 = skymap . clone ( ) ;
// all producers will send a +1 on acquiring a lock and -1 on releasing a lock
let ( tx , rx ) = mpsc ::channel ::< isize > ( ) ;
// this variable maintains the number of table wide write locks that are currently held
let mut number_of_table_wide_locks = 0 ;
let thread_2_sender = tx . clone ( ) ;
let thread_3_sender = tx . clone ( ) ;
let thread_4_sender = tx . clone ( ) ;
let ( h1 , h2 , h3 , h4 ) = (
thread ::spawn ( move | | {
// println!("[T1] attempting acquire/waiting on lock");
let lck = c1 . lock_writes ( ) ;
tx . send ( 1 ) . unwrap ( ) ;
// println!("[T1] Acquired lock now");
for _i in 0 .. 10 {
// println!("[T1] Sleeping for {}/10ms", i + 1);
thread ::sleep ( Duration ::from_millis ( 1 ) ) ;
}
drop ( lck ) ;
tx . send ( - 1 ) . unwrap ( ) ;
drop ( tx ) ;
// println!("[T1] Dropped lock");
} ) ,
thread ::spawn ( move | | {
let tx = thread_2_sender ;
// println!("[T2] attempting acquire/waiting on lock");
let lck = c2 . lock_writes ( ) ;
tx . send ( 1 ) . unwrap ( ) ;
// println!("[T2] Acquired lock now");
for _i in 0 .. 10 {
// println!("[T2] Sleeping for {}/10ms", i + 1);
thread ::sleep ( Duration ::from_millis ( 1 ) ) ;
}
drop ( lck ) ;
tx . send ( - 1 ) . unwrap ( ) ;
drop ( tx ) ;
// println!("[T2] Dropped lock")
} ) ,
thread ::spawn ( move | | {
let tx = thread_3_sender ;
// println!("[T3] attempting acquire/waiting on lock");
let lck = c3 . lock_writes ( ) ;
tx . send ( 1 ) . unwrap ( ) ;
// println!("[T3] Acquired lock now");
for _i in 0 .. 10 {
// println!("[T3] Sleeping for {}/10ms", i + 1);
thread ::sleep ( Duration ::from_millis ( 1 ) ) ;
}
drop ( lck ) ;
tx . send ( - 1 ) . unwrap ( ) ;
drop ( tx ) ;
// println!("[T3] Dropped lock");
} ) ,
thread ::spawn ( move | | {
let tx = thread_4_sender ;
// println!("[T4] attempting acquire/waiting on lock");
let lck = c4 . lock_writes ( ) ;
tx . send ( 1 ) . unwrap ( ) ;
// println!("[T4] Acquired lock now");
for _i in 0 .. 10 {
// println!("[T4] Sleeping for {}/10ms", i + 1);
thread ::sleep ( Duration ::from_millis ( 1 ) ) ;
}
drop ( lck ) ;
tx . send ( - 1 ) . unwrap ( ) ;
drop ( tx ) ;
// println!("[T4] Dropped lock");
} ) ,
) ;
// allow this because we're just trying to make sure that all threads are terminate at the same time
#[ allow(clippy::drop_copy) ]
drop ( (
h1 . join ( ) . unwrap ( ) ,
h2 . join ( ) . unwrap ( ) ,
h3 . join ( ) . unwrap ( ) ,
h4 . join ( ) . unwrap ( ) ,
) ) ;
// allow this lint because this is a test where we just want to keep things simple
#[ allow(clippy::for_loops_over_fallibles) ]
// wait in a loop to receive notifications on this mpsc channel
// all received messages are in the same order as they were produced
for msg in rx . recv ( ) {
// add the sent isize to the counter of number of table wide write locks
number_of_table_wide_locks + = msg ;
if number_of_table_wide_locks > = 2 {
// if there are more than/same as 2 writes at the same time, then that's trouble
// for us
panic! ( "Two threads acquired lock" ) ;
}
}
}
}
#[ test ]
fn test_wait_on_one_thread_insert_others ( ) {
use devtimer ::DevTime ;
use std ::thread ;
use std ::time ::Duration ;
let skymap : HTable < & str , & str > = HTable ::new ( ) ;
assert! ( skymap . true_if_insert ( "sayan" , "wrote some dumb stuff" ) ) ;
let c1 = skymap . clone ( ) ;
let c2 = skymap . clone ( ) ;
let c3 = skymap . clone ( ) ;
let c4 = skymap . clone ( ) ;
let c5 = skymap . clone ( ) ;
let h1 = thread ::spawn ( move | | {
let x = c1 . lock_writes ( ) ;
for _i in 0 .. 10 {
// println!("Waiting to unlock write: {}/10", i + 1);
thread ::sleep ( Duration ::from_secs ( 1 ) ) ;
}
drop ( x ) ;
} ) ;
/*
wait for h1 to start up ; 2 s wait
the other threads will have to wait atleast 7.5 x10 ^ 9 nanoseconds before
they can do anything useful . Atleast because thread ::sleep can essentially sleep for
longer but not lesser . So let ' s say the sleep is actually 2 s , then each thread will have to wait for 8 s ,
if the sleep is longer , say 2.5 ms ( we ' ll * * assume * * a maximum delay of 500 ms in the sleep duration )
then each thread will have to wait for ~ 7.5 s . This is the basis of this test , to ensure that the waiting
threads are notified in a timely fashion , approximate of course . The only exception is the get that
doesn ' t need to mutate anything . Uncomment the ` println ` s for seeing the thing in action ( or to debug )
If anyone sees too many test failures with this duration , adjust it one the basis of the knowledge
that you have acquired here .
* /
thread ::sleep ( Duration ::from_millis ( 2000 ) ) ;
let h2 = thread ::spawn ( move | | {
let mut dt = DevTime ::new_simple ( ) ;
// println!("[T2] Waiting to insert value");
dt . start ( ) ;
c2 . true_if_insert ( "sayan1" , "writes-code" ) ;
dt . stop ( ) ;
assert! ( dt . time_in_nanos ( ) . unwrap ( ) > = 7_500_000_000 ) ;
// println!("[T2] Finished inserting");
} ) ;
let h3 = thread ::spawn ( move | | {
let mut dt = DevTime ::new_simple ( ) ;
// println!("[T3] Waiting to insert value");
dt . start ( ) ;
c3 . true_if_insert ( "sayan2" , "writes-code" ) ;
dt . stop ( ) ;
assert! ( dt . time_in_nanos ( ) . unwrap ( ) > = 7_500_000_000 ) ;
// println!("[T3] Finished inserting");
} ) ;
let h4 = thread ::spawn ( move | | {
let mut dt = DevTime ::new_simple ( ) ;
// println!("[T4] Waiting to insert value");
dt . start ( ) ;
c4 . true_if_insert ( "sayan3" , "writes-code" ) ;
dt . stop ( ) ;
assert! ( dt . time_in_nanos ( ) . unwrap ( ) > = 7_500_000_000 ) ;
// println!("[T4] Finished inserting");
} ) ;
let h5 = thread ::spawn ( move | | {
let mut dt = DevTime ::new_simple ( ) ;
// println!("[T3] Waiting to get value");
dt . start ( ) ;
let _got = c5 . get ( "sayan" ) . map ( | v | * v ) . unwrap_or ( "<none>" ) ;
dt . stop ( ) ;
assert! ( dt . time_in_nanos ( ) . unwrap ( ) < = 1_000_000_000 ) ;
// println!("Got: '{:?}'", got);
// println!("[T3] Finished reading. Returned immediately from now");
} ) ;
// allow this because we're just trying to make sure that all threads are terminate at the same time
#[ allow(clippy::drop_copy) ]
drop ( (
h1 . join ( ) . unwrap ( ) ,
h2 . join ( ) . unwrap ( ) ,
h3 . join ( ) . unwrap ( ) ,
h4 . join ( ) . unwrap ( ) ,
h5 . join ( ) . unwrap ( ) ,
) ) ;
}
}