Merge branch 'memory/improve-cmap' into next

Sayan Nandan 3 years ago
commit 153f940ff6

Cargo.lock (generated)

@@ -2,6 +2,17 @@
# It is not intended for manual editing.
version = 3
+[[package]]
+name = "ahash"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98"
+dependencies = [
+ "getrandom",
+ "once_cell",
+ "version_check",
+]
[[package]]
name = "aho-corasick"
version = "0.7.18"
@@ -185,17 +196,6 @@ dependencies = [
 "winapi",
]
-[[package]]
-name = "dashmap"
-version = "4.0.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
-dependencies = [
- "cfg-if",
- "num_cpus",
- "serde",
-]
[[package]]
name = "devtimer"
version = "4.0.1"
@@ -400,6 +400,15 @@ dependencies = [
 "wasi",
]
+[[package]]
+name = "hashbrown"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
+dependencies = [
+ "ahash",
+]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@@ -961,14 +970,16 @@ dependencies = [
 "cc",
 "chrono",
 "clap",
- "dashmap",
 "env_logger",
+ "hashbrown",
 "jemallocator",
 "libc",
 "libsky",
 "libstress",
 "log",
+ "num_cpus",
 "openssl",
+ "parking_lot",
 "rand",
 "regex",
 "serde",
@@ -1186,6 +1197,12 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
+[[package]]
+name = "version_check"
+version = "0.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"

@@ -13,7 +13,6 @@ sky_macros = { path = "../sky-macros" }
tokio = { version = "1.9.0", features = ["full"] }
bytes = "1.0.1"
libsky = { path = "../libsky" }
-dashmap = { version = "4.0.2", features = ["serde", "raw-api"] }
serde = { version = "1.0.126", features = ["derive"] }
toml = "0.5.8"
clap = { version = "2.33.3", features = ["yaml"] }
@@ -23,6 +22,9 @@ chrono = "0.4.19"
regex = "1.5.4"
tokio-openssl = "0.6.2"
openssl = { version = "0.10.35", features = ["vendored"] }
+hashbrown = { version = "*", features = ["raw"] }
+parking_lot = "0.11.1"
+num_cpus = "1.13.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
# external deps

@@ -119,7 +119,7 @@ pub(super) fn snapshot_and_update(
// When we snapshotted, we looked at `snapshot`. If the value is still the
// same, then we'll update it. Otherwise, let it be
if let Some(mut mutable) = lowtable.mut_entry(Data::from(key)) {
-if mutable.get().eq(&snapshot) {
+if mutable.value().eq(&snapshot) {
mutable.insert(Data::from(value));
} else {
drop(mutable);

@@ -24,6 +24,11 @@
*
*/
+use crate::corestore::map::{
+bref::{Entry, OccupiedEntry, Ref, VacantEntry},
+iter::{BorrowedIter, OwnedIter},
+Skymap,
+};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
@@ -33,17 +38,7 @@ use std::hash::Hash;
use std::iter::FromIterator;
use std::ops::Deref;
-use dashmap::iter::Iter;
-pub use dashmap::lock::RwLock as MapRWL;
-pub use dashmap::lock::RwLockReadGuard as MapRWLGuard;
-pub use dashmap::mapref::entry::Entry as MapEntry;
-pub use dashmap::mapref::entry::OccupiedEntry;
-use dashmap::mapref::entry::VacantEntry;
-pub use dashmap::mapref::one::Ref as MapSingleReference;
-use dashmap::mapref::one::Ref;
-use dashmap::DashMap;
-pub use dashmap::SharedValue;
-pub type HashTable<K, V> = DashMap<K, V>;
+type HashTable<K, V> = Skymap<K, V, RandomState>;
#[derive(Debug)]
/// The Coremap contains the actual key/value pairs along with additional fields for data safety
@@ -112,8 +107,8 @@ where
self.inner.clear()
}
/// Return a non-consuming iterator
-pub fn iter(&self) -> Iter<'_, K, V> {
-self.inner.iter()
+pub fn iter(&self) -> BorrowedIter<'_, K, V> {
+self.inner.get_iter()
}
/// Get a reference to the value of a key, if it exists
pub fn get<Q>(&self, key: &Q) -> Option<Ref<'_, K, V>>
@@ -125,7 +120,7 @@ where
}
/// Returns true if the non-existent key was assigned to a value
pub fn true_if_insert(&self, k: K, v: V) -> bool {
-if let MapEntry::Vacant(ve) = self.inner.entry(k) {
+if let Entry::Vacant(ve) = self.inner.entry(k) {
ve.insert(v);
true
} else {
@@ -152,7 +147,7 @@ where
}
/// Returns true if the value was updated
pub fn true_if_update(&self, k: K, v: V) -> bool {
-if let MapEntry::Occupied(mut oe) = self.inner.entry(k) {
+if let Entry::Occupied(mut oe) = self.inner.entry(k) {
oe.insert(v);
true
} else {
@@ -160,14 +155,14 @@
}
}
pub fn mut_entry(&self, key: K) -> Option<OccupiedEntry<K, V, RandomState>> {
-if let MapEntry::Occupied(oe) = self.inner.entry(key) {
+if let Entry::Occupied(oe) = self.inner.entry(key) {
Some(oe)
} else {
None
}
}
pub fn fresh_entry(&self, key: K) -> Option<VacantEntry<K, V, RandomState>> {
-if let MapEntry::Vacant(ve) = self.inner.entry(key) {
+if let Entry::Vacant(ve) = self.inner.entry(key) {
Some(ve)
} else {
None
@@ -189,9 +184,9 @@ impl Coremap<Data, Data> {
impl<K: Eq + Hash, V> IntoIterator for Coremap<K, V> {
type Item = (K, V);
-type IntoIter = dashmap::iter::OwningIter<K, V>;
+type IntoIter = OwnedIter<K, V>;
fn into_iter(self) -> Self::IntoIter {
-self.inner.into_iter()
+self.inner.get_owned_iter()
}
}
@@ -229,7 +224,7 @@ where
T: IntoIterator<Item = (K, V)>,
{
Coremap {
-inner: DashMap::from_iter(iter),
+inner: Skymap::from_iter(iter),
}
}
}

@@ -0,0 +1,280 @@
/*
* Created on Mon Aug 09 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::LowMap;
use crate::util::compiler;
use crate::util::Unwrappable;
use core::hash::BuildHasher;
use core::hash::Hash;
use core::mem;
use core::ops::Deref;
use core::ops::DerefMut;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use std::collections::hash_map::RandomState;
use std::sync::Arc;
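/// A shared reference to an entry. The shard's read guard is held for as long
/// as this `Ref` is alive, so the key/value pair cannot be mutated or removed
/// while it is being read.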
pub struct Ref<'a, K, V> {
_guard: RwLockReadGuard<'a, LowMap<K, V>>,
k: &'a K,
v: &'a V,
}
impl<'a, K, V> Ref<'a, K, V> {
pub const fn new(_guard: RwLockReadGuard<'a, LowMap<K, V>>, k: &'a K, v: &'a V) -> Self {
Self { _guard, k, v }
}
pub const fn key(&self) -> &K {
self.k
}
pub const fn value(&self) -> &V {
self.v
}
pub const fn pair(&self) -> (&K, &V) {
let Self { k, v, .. } = self;
(k, v)
}
}
impl<'a, K, V> Deref for Ref<'a, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
self.value()
}
}
unsafe impl<'a, K: Send, V: Send> Send for Ref<'a, K, V> {}
unsafe impl<'a, K: Sync, V: Sync> Sync for Ref<'a, K, V> {}
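/// A mutable reference to an entry. The shard's write guard is held for as
/// long as this `RefMut` is alive, so no other reader or writer can touch the
/// shard; `downgrade_ref` trades it for a read-only `Ref`.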
pub struct RefMut<'a, K, V> {
guard: RwLockWriteGuard<'a, LowMap<K, V>>,
k: &'a K,
v: &'a mut V,
}
impl<'a, K, V> RefMut<'a, K, V> {
pub fn new(guard: RwLockWriteGuard<'a, LowMap<K, V>>, k: &'a K, v: &'a mut V) -> Self {
Self { guard, k, v }
}
pub const fn key(&self) -> &K {
self.k
}
pub const fn value(&self) -> &V {
self.v
}
pub fn value_mut(&mut self) -> &mut V {
self.v
}
pub fn pair(&mut self) -> (&K, &V) {
let Self { k, v, .. } = self;
(k, v)
}
pub fn downgrade_ref(self) -> Ref<'a, K, V> {
Ref::new(RwLockWriteGuard::downgrade(self.guard), self.k, self.v)
}
}
impl<'a, K, V> Deref for RefMut<'a, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
self.value()
}
}
impl<'a, K, V> DerefMut for RefMut<'a, K, V> {
fn deref_mut(&mut self) -> &mut V {
self.value_mut()
}
}
unsafe impl<'a, K: Send, V: Send> Send for RefMut<'a, K, V> {}
unsafe impl<'a, K: Sync, V: Sync> Sync for RefMut<'a, K, V> {}
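/// A view into an entry that exists in the map. The shard stays write-locked
/// while this is held, so `insert` (replace the value) and `remove` operate on
/// a stable view of the shard.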
pub struct OccupiedEntry<'a, K, V, S> {
guard: RwLockWriteGuard<'a, LowMap<K, V>>,
elem: (&'a K, &'a mut V),
key: K,
hasher: S,
}
impl<'a, K: Hash + Eq, V, S: BuildHasher> OccupiedEntry<'a, K, V, S> {
pub fn new(
guard: RwLockWriteGuard<'a, LowMap<K, V>>,
key: K,
elem: (&'a K, &'a mut V),
hasher: S,
) -> Self {
Self {
guard,
elem,
key,
hasher,
}
}
pub fn key(&self) -> &K {
self.elem.0
}
pub fn value(&self) -> &V {
self.elem.1
}
pub fn insert(&mut self, other: V) -> V {
mem::replace(self.elem.1, other)
}
pub fn remove(mut self) -> V {
let hash = super::make_hash::<K, K, S>(&self.hasher, &self.key);
unsafe {
self.guard
.remove_entry(hash, super::ceq(self.elem.0))
.unsafe_unwrap()
}
.1
}
}
unsafe impl<'a, K: Send, V: Send, S> Send for OccupiedEntry<'a, K, V, S> {}
unsafe impl<'a, K: Sync, V: Sync, S> Sync for OccupiedEntry<'a, K, V, S> {}
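/// A view into an entry that is not yet in the map. `insert` places the
/// key/value pair into the already write-locked shard and returns a `RefMut`
/// that keeps holding the same guard.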
pub struct VacantEntry<'a, K, V, S> {
guard: RwLockWriteGuard<'a, LowMap<K, V>>,
key: K,
hasher: S,
}
impl<'a, K: Hash + Eq, V, S: BuildHasher> VacantEntry<'a, K, V, S> {
pub fn new(guard: RwLockWriteGuard<'a, LowMap<K, V>>, key: K, hasher: S) -> Self {
Self { guard, key, hasher }
}
pub fn insert(mut self, value: V) -> RefMut<'a, K, V> {
unsafe {
let hash = super::make_insert_hash::<K, S>(&self.hasher, &self.key);
let &mut (ref mut k, ref mut v) = self.guard.insert_entry(
hash,
(self.key, value),
super::make_hasher::<K, _, V, S>(&self.hasher),
);
let kptr = compiler::extend_lifetime(k);
let vptr = compiler::extend_lifetime_mut(v);
RefMut::new(self.guard, kptr, vptr)
}
}
pub fn into_key(self) -> K {
self.key
}
pub fn key(&self) -> &K {
&self.key
}
}
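/// The result of an `entry` lookup: the key either already exists
/// (`Occupied`) or it does not (`Vacant`).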
pub enum Entry<'a, K, V, S = RandomState> {
Occupied(OccupiedEntry<'a, K, V, S>),
Vacant(VacantEntry<'a, K, V, S>),
}
impl<'a, K, V, S> Entry<'a, K, V, S> {
pub const fn is_occupied(&self) -> bool {
matches!(self, Self::Occupied(_))
}
pub const fn is_vacant(&self) -> bool {
matches!(self, Self::Vacant(_))
}
}
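/// A shared reference yielded by the borrowed iterators. The shard's read
/// guard is reference-counted (`Arc`), so multiple items from the same shard
/// can be alive at the same time.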
pub struct RefMulti<'a, K, V> {
_g: Arc<RwLockReadGuard<'a, LowMap<K, V>>>,
k: &'a K,
v: &'a V,
}
impl<'a, K, V> RefMulti<'a, K, V> {
pub const fn new(_g: Arc<RwLockReadGuard<'a, LowMap<K, V>>>, k: &'a K, v: &'a V) -> Self {
Self { _g, k, v }
}
pub const fn key(&self) -> &K {
self.k
}
pub const fn value(&self) -> &V {
self.v
}
pub const fn pair(&self) -> (&K, &V) {
let Self { k, v, .. } = self;
(k, v)
}
}
impl<'a, K, V> Deref for RefMulti<'a, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
self.value()
}
}
unsafe impl<'a, K: Sync, V: Sync> Sync for RefMulti<'a, K, V> {}
unsafe impl<'a, K: Send, V: Send> Send for RefMulti<'a, K, V> {}
pub struct RefMultiMut<'a, K, V> {
_g: Arc<RwLockWriteGuard<'a, LowMap<K, V>>>,
k: &'a K,
v: &'a mut V,
}
impl<'a, K, V> RefMultiMut<'a, K, V> {
pub fn new(_g: Arc<RwLockWriteGuard<'a, LowMap<K, V>>>, k: &'a K, v: &'a mut V) -> Self {
Self { _g, k, v }
}
pub const fn key(&self) -> &K {
self.k
}
pub const fn value(&self) -> &V {
self.v
}
pub fn value_mut(&mut self) -> &mut V {
self.v
}
pub fn pair(&self) -> (&K, &V) {
let Self { k, v, .. } = self;
(k, v)
}
pub fn pair_mut(&mut self) -> (&K, &mut V) {
let Self { k, v, .. } = self;
(k, v)
}
}
impl<'a, K, V> Deref for RefMultiMut<'a, K, V> {
type Target = V;
fn deref(&self) -> &Self::Target {
self.value()
}
}
impl<'a, K, V> DerefMut for RefMultiMut<'a, K, V> {
fn deref_mut(&mut self) -> &mut V {
self.value_mut()
}
}
unsafe impl<'a, K: Sync, V: Sync> Sync for RefMultiMut<'a, K, V> {}
unsafe impl<'a, K: Send, V: Send> Send for RefMultiMut<'a, K, V> {}

@@ -0,0 +1,187 @@
/*
* Created on Tue Aug 10 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::bref::{RefMulti, RefMultiMut};
use super::LowMap;
use super::Skymap;
use core::mem;
use hashbrown::raw::RawIntoIter;
use hashbrown::raw::RawIter;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use std::collections::hash_map::RandomState;
use std::sync::Arc;
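/// A consuming iterator over a `Skymap`. Shards are drained one at a time:
/// each shard is write-locked, swapped out for an empty table, and its
/// contents are yielded as owned `(K, V)` pairs.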
pub struct OwnedIter<K, V, S = RandomState> {
map: Skymap<K, V, S>,
cs: usize,
current: Option<RawIntoIter<(K, V)>>,
}
impl<K, V, S> OwnedIter<K, V, S> {
pub fn new(map: Skymap<K, V, S>) -> Self {
Self {
map,
cs: 0usize,
current: None,
}
}
}
impl<K, V, S> Iterator for OwnedIter<K, V, S> {
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(current) = self.current.as_mut() {
if let Some(bucket) = current.next() {
return Some(bucket);
}
}
if self.cs == self.map.shards().len() {
return None;
}
let mut wshard = unsafe { self.map.get_wshard_unchecked(self.cs) };
// get the next map's iterator
let current_map = mem::replace(&mut *wshard, LowMap::new());
drop(wshard);
let iter = current_map.into_iter();
self.current = Some(iter);
self.cs += 1;
}
}
}
unsafe impl<K: Send, V: Send, S> Send for OwnedIter<K, V, S> {}
unsafe impl<K: Sync, V: Sync, S> Sync for OwnedIter<K, V, S> {}
type BorrowedIterGroup<'a, K, V> = (RawIter<(K, V)>, Arc<RwLockReadGuard<'a, LowMap<K, V>>>);
type BorrowedIterGroupMut<'a, K, V> = (RawIter<(K, V)>, Arc<RwLockWriteGuard<'a, LowMap<K, V>>>);
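/// A non-consuming iterator over a `Skymap`. Each shard is read-locked in
/// turn; the raw bucket iterator is paired with an `Arc`-wrapped guard so that
/// every yielded `RefMulti` keeps its shard locked for as long as it lives.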
pub struct BorrowedIter<'a, K, V, S = RandomState> {
map: &'a Skymap<K, V, S>,
cs: usize,
citer: Option<BorrowedIterGroup<'a, K, V>>,
}
impl<'a, K, V, S> BorrowedIter<'a, K, V, S> {
pub const fn new(map: &'a Skymap<K, V, S>) -> Self {
Self {
map,
cs: 0usize,
citer: None,
}
}
}
impl<'a, K, V, S> Iterator for BorrowedIter<'a, K, V, S> {
type Item = RefMulti<'a, K, V>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(current) = self.citer.as_mut() {
if let Some(bucket) = current.0.next() {
let (kptr, vptr) = unsafe {
// we know that this is valid, and this guarantee is
// provided to us by the lifetime
bucket.as_ref()
};
let guard = current.1.clone();
return Some(RefMulti::new(guard, kptr, vptr));
}
}
if self.cs == self.map.shards().len() {
// end of shards
return None;
}
// warning: the rawiter allows us to terribly violate conditions
// you can mutate!
let rshard = unsafe { self.map.get_rshard_unchecked(self.cs) };
let iter = unsafe {
// same thing: our lt params ensure validity
rshard.iter()
};
self.citer = Some((iter, Arc::new(rshard)));
self.cs += 1;
}
}
}
unsafe impl<'a, K: Send, V: Send, S> Send for BorrowedIter<'a, K, V, S> {}
unsafe impl<'a, K: Sync, V: Sync, S> Sync for BorrowedIter<'a, K, V, S> {}
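/// The mutable counterpart of `BorrowedIter`: shards are write-locked instead,
/// and items are yielded as `RefMultiMut` so values can be modified in place.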
pub struct BorrowedIterMut<'a, K, V, S> {
map: &'a Skymap<K, V, S>,
cs: usize,
citer: Option<BorrowedIterGroupMut<'a, K, V>>,
}
impl<'a, K, V, S> BorrowedIterMut<'a, K, V, S> {
pub const fn new(map: &'a Skymap<K, V, S>) -> Self {
Self {
map,
cs: 0usize,
citer: None,
}
}
}
impl<'a, K, V, S> Iterator for BorrowedIterMut<'a, K, V, S> {
type Item = RefMultiMut<'a, K, V>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(current) = self.citer.as_mut() {
if let Some(bucket) = current.0.next() {
let (kptr, vptr) = unsafe {
// the lt guarantees that the map will outlive this
// reference
bucket.as_mut()
};
let guard = Arc::clone(&current.1);
return Some(RefMultiMut::new(guard, kptr, vptr));
}
}
if self.cs == self.map.shards().len() {
// reached end of shards
return None;
}
let wshard = unsafe { self.map.get_wshard_unchecked(self.cs) };
let iter = unsafe { wshard.iter() };
self.citer = Some((iter, Arc::new(wshard)));
self.cs += 1;
}
}
}
#[test]
fn test_iter() {
let map = Skymap::default();
map.insert("hello1", "world");
map.insert("hello2", "world");
map.insert("hello3", "world");
let collected: Vec<(&str, &str)> = map.get_owned_iter().collect();
assert!(collected.len() == 3);
assert!(collected.contains(&("hello1", "world")));
assert!(collected.contains(&("hello2", "world")));
assert!(collected.contains(&("hello3", "world")));
}

@@ -0,0 +1,385 @@
/*
* Created on Mon Aug 09 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(clippy::manual_map)] // avoid LLVM bloat
use crate::util::compiler;
use core::borrow::Borrow;
use core::fmt;
use core::hash::BuildHasher;
use core::hash::Hash;
use core::hash::Hasher;
use core::iter::FromIterator;
use core::mem;
use parking_lot::RwLock;
use parking_lot::RwLockReadGuard;
use parking_lot::RwLockWriteGuard;
use std::collections::hash_map::RandomState;
pub mod bref;
use iter::{BorrowedIter, BorrowedIterMut, OwnedIter};
pub mod iter;
use bref::{Entry, OccupiedEntry, Ref, RefMut, VacantEntry};
type LowMap<K, V> = hashbrown::raw::RawTable<(K, V)>;
type ShardSlice<K, V> = [RwLock<LowMap<K, V>>];
type SRlock<'a, K, V> = RwLockReadGuard<'a, hashbrown::raw::RawTable<(K, V)>>;
type SWlock<'a, K, V> = RwLockWriteGuard<'a, hashbrown::raw::RawTable<(K, V)>>;
const BITS_IN_USIZE: usize = mem::size_of::<usize>() * 8;
const DEFAULT_CAP: usize = 128;
fn make_hash<K, Q, S>(hash_builder: &S, val: &Q) -> u64
where
K: Borrow<Q>,
Q: Hash + ?Sized,
S: BuildHasher,
{
let mut state = hash_builder.build_hasher();
val.hash(&mut state);
state.finish()
}
fn make_insert_hash<K, S>(hash_builder: &S, val: &K) -> u64
where
K: Hash,
S: BuildHasher,
{
let mut state = hash_builder.build_hasher();
val.hash(&mut state);
state.finish()
}
fn make_hasher<K, Q, V, S>(hash_builder: &S) -> impl Fn(&(Q, V)) -> u64 + '_
where
K: Borrow<Q>,
Q: Hash,
S: BuildHasher,
{
move |val| make_hash::<K, Q, S>(hash_builder, &val.0)
}
fn ceq<Q, K, V>(k: &Q) -> impl Fn(&(K, V)) -> bool + '_
where
K: Borrow<Q>,
Q: ?Sized + Eq,
{
move |x| k.eq(x.0.borrow())
}
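// Shard count: four times the number of logical CPUs, rounded up to the next
// power of two so that a shard can be picked from hash bits with plain shifts.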
fn get_shard_count() -> usize {
(num_cpus::get() * 4).next_power_of_two()
}
const fn cttz(amount: usize) -> usize {
amount.trailing_zeros() as usize
}
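/// A sharded (lock-striped) concurrent hashmap: a boxed slice of
/// `RwLock`-protected hashbrown raw tables. A key's hash selects its shard,
/// so operations on different shards never contend on the same lock.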
pub struct Skymap<K, V, S = RandomState> {
shards: Box<ShardSlice<K, V>>,
hasher: S,
shift: usize,
}
impl<K, V> Default for Skymap<K, V, RandomState> {
fn default() -> Self {
Self::with_hasher(RandomState::default())
}
}
impl<K: fmt::Debug, V: fmt::Debug, S: BuildHasher + Default> fmt::Debug for Skymap<K, V, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut map = f.debug_map();
for s in self.get_iter() {
map.entry(s.key(), s.value());
}
map.finish()
}
}
impl<K, V, S> FromIterator<(K, V)> for Skymap<K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Default + Clone,
{
fn from_iter<T>(iter: T) -> Self
where
T: IntoIterator<Item = (K, V)>,
{
let map = Skymap::new();
iter.into_iter().for_each(|(k, v)| {
let _ = map.insert(k, v);
});
map
}
}
// basic impls
impl<K, V, S> Skymap<K, V, S>
where
S: BuildHasher + Default,
{
pub fn new() -> Self {
Self::with_hasher(S::default())
}
pub fn with_capacity(cap: usize) -> Self {
Self::with_capacity_and_hasher(cap, S::default())
}
pub fn with_capacity_and_hasher(mut cap: usize, hasher: S) -> Self {
let shard_count = get_shard_count();
let shift = BITS_IN_USIZE - cttz(shard_count);
if cap != 0 {
cap = (cap + (shard_count - 1)) & !(shard_count - 1);
}
let cap_per_shard = cap / shard_count;
Self {
shards: (0..shard_count)
.map(|_| RwLock::new(LowMap::with_capacity(cap_per_shard)))
.collect(),
hasher,
shift,
}
}
pub fn with_hasher(hasher: S) -> Self {
Self::with_capacity_and_hasher(DEFAULT_CAP, hasher)
}
pub fn len(&self) -> usize {
self.shards.iter().map(|s| s.read().len()).sum()
}
pub fn capacity(&self) -> usize {
self.shards.iter().map(|s| s.read().capacity()).sum()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn get_iter(&self) -> BorrowedIter<K, V, S> {
BorrowedIter::new(self)
}
pub fn get_iter_mut(&self) -> BorrowedIterMut<K, V, S> {
BorrowedIterMut::new(self)
}
pub fn get_owned_iter(self) -> OwnedIter<K, V, S> {
OwnedIter::new(self)
}
}
// const impls
impl<K, V, S> Skymap<K, V, S> {
const fn shards(&self) -> &ShardSlice<K, V> {
&self.shards
}
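// Map a hash to a shard index: skip the top 7 bits (hashbrown already uses the
// high bits for its control-byte tag) and keep just enough of the following
// bits to index into the shard slice.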
const fn determine_shard(&self, hash: usize) -> usize {
(hash << 7) >> self.shift
}
const fn h(&self) -> &S {
&self.hasher
}
}
// insert/get/remove impls
impl<K, V, S> Skymap<K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
pub fn insert(&self, k: K, v: V) -> Option<V> {
let hash = make_insert_hash::<K, S>(&self.hasher, &k);
let idx = self.determine_shard(hash as usize);
unsafe {
// begin critical section
let mut lowtable = self.get_wshard_unchecked(idx);
if let Some((_, item)) = lowtable.get_mut(hash, ceq(&k)) {
Some(mem::replace(item, v))
} else {
lowtable.insert(hash, (k, v), make_hasher::<K, _, V, S>(self.h()));
None
}
// end critical section
}
}
pub fn remove<Q>(&self, k: &Q) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = make_hash::<K, Q, S>(self.h(), k);
let idx = self.determine_shard(hash as usize);
unsafe {
// begin critical section
let mut lowtable = self.get_wshard_unchecked(idx);
match lowtable.remove_entry(hash, ceq(k)) {
Some(kv) => Some(kv),
None => None,
}
// end critical section
}
}
pub fn remove_if<Q>(&self, k: &Q, f: impl FnOnce(&K, &V) -> bool) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = make_hash::<K, Q, S>(self.h(), k);
let idx = self.determine_shard(hash as usize);
unsafe {
// begin critical section
let mut lowtable = self.get_wshard_unchecked(idx);
match lowtable.find(hash, ceq(k)) {
Some(bucket) => {
let (kptr, vptr) = bucket.as_ref();
if f(kptr, vptr) {
Some(lowtable.remove(bucket))
} else {
None
}
}
None => None,
}
// end critical section
}
}
}
// lt impls
impl<'a, K: 'a + Hash + Eq, V: 'a, S: BuildHasher + Clone> Skymap<K, V, S> {
pub fn get<Q>(&'a self, k: &Q) -> Option<Ref<'a, K, V>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = make_hash::<K, Q, S>(self.h(), k);
let idx = self.determine_shard(hash as usize);
unsafe {
// begin critical section
let lowtable = self.get_rshard_unchecked(idx);
match lowtable.get(hash, ceq(k)) {
Some((ref kptr, ref vptr)) => {
let kptr = compiler::extend_lifetime(kptr);
let vptr = compiler::extend_lifetime(vptr);
Some(Ref::new(lowtable, kptr, vptr))
}
None => None,
}
// end critical section
}
}
pub fn get_mut<Q>(&'a self, k: &Q) -> Option<RefMut<'a, K, V>>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = make_hash::<K, Q, S>(self.h(), k);
let idx = self.determine_shard(hash as usize);
unsafe {
// begin critical section
let mut lowtable = self.get_wshard_unchecked(idx);
match lowtable.get_mut(hash, ceq(k)) {
Some(&mut (ref kptr, ref mut vptr)) => {
let kptr = compiler::extend_lifetime(kptr);
let vptr = compiler::extend_lifetime_mut(vptr);
Some(RefMut::new(lowtable, kptr, vptr))
}
None => None,
}
// end critical section
}
}
pub fn entry(&'a self, key: K) -> Entry<'a, K, V, S> {
let hash = make_insert_hash::<K, S>(self.h(), &key);
let idx = self.determine_shard(hash as usize);
unsafe {
// begin critical section
let lowtable = self.get_wshard_unchecked(idx);
if let Some(elem) = lowtable.find(hash, ceq(&key)) {
let (kptr, vptr) = elem.as_mut();
let kptr = compiler::extend_lifetime(kptr);
let vptr = compiler::extend_lifetime_mut(vptr);
Entry::Occupied(OccupiedEntry::new(
lowtable,
key,
(kptr, vptr),
self.hasher.clone(),
))
} else {
Entry::Vacant(VacantEntry::new(lowtable, key, self.hasher.clone()))
}
// end critical section
}
}
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.get(key).is_some()
}
pub fn clear(&self) {
self.shards().iter().for_each(|shard| shard.write().clear())
}
}
// inner impls
impl<'a, K: 'a, V: 'a, S> Skymap<K, V, S> {
unsafe fn get_rshard_unchecked(&'a self, shard: usize) -> SRlock<'a, K, V> {
self.shards.get_unchecked(shard).read()
}
unsafe fn get_wshard_unchecked(&'a self, shard: usize) -> SWlock<'a, K, V> {
self.shards.get_unchecked(shard).write()
}
}
#[test]
fn test_insert_remove() {
let map = Skymap::default();
map.insert("hello", "world");
assert_eq!(map.remove("hello").unwrap().1, "world");
}
#[test]
fn test_remove_if() {
let map = Skymap::default();
map.insert("hello", "world");
assert!(map
.remove_if("hello", |_k, v| { (*v).eq("notworld") })
.is_none());
}
#[test]
fn test_insert_get() {
let map = Skymap::default();
map.insert("sayan", "likes computational dark arts");
let _ref = map.get("sayan").unwrap();
assert_eq!(*_ref, "likes computational dark arts")
}
#[test]
fn test_entry() {
let map = Skymap::default();
map.insert("hello", "world");
assert!(map.entry("hello").is_occupied());
assert!(map.entry("world").is_vacant());
}

@@ -247,11 +247,11 @@ impl Memstore {
let removed_keyspace = self.keyspaces.mut_entry(ksid);
match removed_keyspace {
Some(ks) => {
-let no_one_is_using_keyspace = Arc::strong_count(ks.get()) == 1;
-let no_tables_are_in_keyspace = ks.get().table_count() == 0;
+let no_one_is_using_keyspace = Arc::strong_count(ks.value()) == 1;
+let no_tables_are_in_keyspace = ks.value().table_count() == 0;
if no_one_is_using_keyspace && no_tables_are_in_keyspace {
// we are free to drop this
-ks.remove_entry();
+ks.remove();
// trip the preload switch
registry::get_preload_tripswitch().trip();
Ok(())
@@ -294,14 +294,14 @@ impl Memstore {
// of having not empty keyspaces. The invariant that `drop keyspace force`
// maintains is that the keyspace or any of its objects are never
// referenced to -- only then it is safe to delete the keyspace
-let no_tables_in_use = Arc::strong_count(keyspace.get()) == 1
+let no_tables_in_use = Arc::strong_count(keyspace.value()) == 1
&& keyspace
-.get()
+.value()
.tables
.iter()
.all(|table| Arc::strong_count(table.value()) == 1);
if no_tables_in_use {
-keyspace.remove_entry();
+keyspace.remove();
// trip the preload switch
registry::get_preload_tripswitch().trip();
Ok(())

@@ -52,6 +52,7 @@ pub mod htable;
pub mod iarray;
pub mod lazy;
pub mod lock;
+pub mod map;
pub mod memstore;
pub mod table;
#[cfg(test)]

@@ -26,9 +26,7 @@
use crate::corestore::htable::Coremap;
use crate::corestore::htable::Data;
-use crate::corestore::htable::MapRWLGuard;
-use crate::corestore::htable::MapSingleReference;
-use crate::corestore::htable::SharedValue;
+use crate::corestore::map::bref::Ref;
use core::borrow::Borrow;
use core::hash::Hash;
use core::sync::atomic::AtomicBool;
@@ -36,40 +34,6 @@ use core::sync::atomic::Ordering;
pub mod encoding;
const ORD_RELAXED: Ordering = Ordering::Relaxed;
-/// A shard lock
-///
-/// Our jagged or sharded or striped in-memory table is made of multiple in-memory shards
-/// and we need a convenient interface to lock down the records. This is exactly why this
-/// structure exists: it locks down the table making it resistant to any possible write
-/// operation which might give us trouble in some cases
-///
-pub struct ShardLock<'a> {
-/// A reference to the table (just for lifetime convenience)
-_tableref: &'a Coremap<Data, Data>,
-/// the shard locks
-shard_locks: Vec<MapRWLGuard<'a, std::collections::HashMap<Data, SharedValue<Data>>>>,
-}
-impl<'a> ShardLock<'a> {
-/// Initialize a shard lock from a provided table: DARN, **this is blocking** because
-/// it will wait for every writer in every stripe to exit before returning. So, know
-/// what you're doing beforehand!
-pub fn init(_tableref: &'a Coremap<Data, Data>) -> Self {
-let shard_locks = _tableref
-.inner
-.shards()
-.iter()
-.map(|lck| lck.read())
-.collect();
-// no lifetime issues here :)
-Self {
-_tableref,
-shard_locks,
-}
-}
-}
/// An arbitrary unicode/binary _double encoder_ for two byte slice inputs
pub struct DoubleEncoder {
fn_ptr: fn(&[u8], &[u8]) -> bool,
@@ -108,12 +72,6 @@ pub struct KVEngine {
encoded_v: AtomicBool,
}
-/// Errors arising from trying to modify the definition of tables
-pub enum DdlError {
-/// The table is not empty
-TableNotEmpty,
-}
impl Default for KVEngine {
fn default() -> Self {
// by default, we don't care about the encoding scheme unless explicitly
@@ -214,39 +172,6 @@ impl KVEngine {
pub fn __get_inner_ref(&self) -> &Coremap<Data, Data> {
&self.table
}
-/// Alter the table and set the key encoding switch
-///
-/// Note: this will need an empty table
-pub fn alter_table_key(&self, encoded_k: bool) -> Result<(), DdlError> {
-let _shardlock = ShardLock::init(&self.table);
-// we can now be sure random records are not being tossed around
-if self.table.len() != 0 {
-Err(DdlError::TableNotEmpty)
-} else {
-// the table is empty, roger the alter
-// relaxed memory ordering is fine because we have locked the table
-// for this specific alteration
-self.encoded_k.store(encoded_k, ORD_RELAXED);
-Ok(())
-}
-}
-/// Alter the table and set the value encoding switch
-///
-/// Note: this will need an empty table
-// TODO(@ohsayan): Figure out how exactly we will handle this at the keyspace level
-pub fn alter_table_value(&self, encoded_v: bool) -> Result<(), DdlError> {
-let _shardlock = ShardLock::init(&self.table);
-// we can now be sure random records are not being tossed around
-if self.table.len() != 0 {
-Err(DdlError::TableNotEmpty)
-} else {
-// the table is empty, roger the alter
-// relaxed memory ordering is fine because we have locked the table
-// for this specific alteration
-self.encoded_v.store(encoded_v, ORD_RELAXED);
-Ok(())
-}
-}
/// Return an owned value of the key. In most cases, the reference count is just incremented
/// unless the data itself is mutated in place
pub fn take_snapshot<Q>(&self, key: &Q) -> Option<Data>
@@ -261,7 +186,7 @@ impl KVEngine {
self.table.clear()
}
/// Get the value for a given key if it exists
-pub fn get(&self, key: impl Into<Data>) -> Result<Option<MapSingleReference<Data, Data>>, ()> {
+pub fn get(&self, key: impl Into<Data>) -> Result<Option<Ref<Data, Data>>, ()> {
Ok(self.table.get(&self._encode_key(key.into())?))
}
pub fn exists<Q>(&self, key: Q) -> Result<bool, ()>

@@ -175,7 +175,10 @@ macro_rules! action {
}
pub mod compiler {
-//! BP hints for added optim
+//! Dark compiler arts and hackery to defy the normal. Use at your own
+//! risk
+use core::mem;
#[cold]
#[inline(never)]
@@ -200,6 +203,13 @@ pub mod compiler {
pub fn cold_err<T>(v: T) -> T {
v
}
+pub unsafe fn extend_lifetime<'a, 'b, T>(inp: &'a T) -> &'b T {
+mem::transmute(inp)
+}
+pub unsafe fn extend_lifetime_mut<'a, 'b, T>(inp: &'a mut T) -> &'b mut T {
+mem::transmute(inp)
+}
}
#[macro_export]
