Add listmap se/de support in storage engine

next
Sayan Nandan 3 years ago
parent 0ee5f69d50
commit 4e68c4bd49

@ -28,6 +28,7 @@ use crate::corestore::htable::Coremap;
use crate::corestore::memstore::DdlError;
use crate::corestore::Data;
use crate::corestore::KeyspaceResult;
use crate::kvengine::listmap::LockedVec;
use crate::kvengine::KVTable;
use crate::kvengine::{listmap::KVEListMap, KVEngine};
use crate::storage::bytemarks;
@ -104,7 +105,7 @@ impl Table {
self.volatile
}
/// Create a new KVE Table with the provided settings
pub fn new_kve_with_data(
pub fn new_pure_kve_with_data(
data: Coremap<Data, Data>,
volatile: bool,
k_enc: bool,
@ -115,6 +116,21 @@ impl Table {
model_store: DataModel::KV(KVEngine::init_with_data(k_enc, v_enc, data)),
}
}
/// Create a new KVE listmap table from already-loaded data
///
/// The key encoding flag (`k_enc`), the payload element encoding flag
/// (`payload_enc`) and the `data` map are handed to
/// [`KVEListMap::init_with_data`]; the resulting listmap is stored as the
/// `DataModel::KVExtListmap` model of the table.
pub fn new_kve_listmap_with_data(
    data: Coremap<Data, LockedVec>,
    volatile: bool,
    k_enc: bool,
    payload_enc: bool,
) -> Self {
    let listmap = KVEListMap::init_with_data(k_enc, payload_enc, data);
    Self {
        volatile,
        model_store: DataModel::KVExtListmap(listmap),
    }
}
pub fn new_kve_with_encoding(volatile: bool, k_enc: bool, v_enc: bool) -> Self {
Self {
volatile,
@ -133,14 +149,14 @@ impl Table {
}
/// Create a new kve with default settings but with provided volatile configuration
///
/// The table starts from an empty [`Coremap`] and both encoding flags are
/// passed as `false`.
pub fn new_kve_with_volatile(volatile: bool) -> Self {
    Self::new_pure_kve_with_data(Coremap::new(), volatile, false, false)
}
/// Returns the default kve:
/// - `k_enc`: `false`
/// - `v_enc`: `false`
/// - `volatile`: `false`
///
/// The table is backed by a freshly created, empty [`Coremap`].
pub fn new_default_kve() -> Self {
    Self::new_pure_kve_with_data(Coremap::new(), false, false, false)
}
/// Returns the model code. See [`bytemarks`] for more info
pub fn get_model_code(&self) -> u8 {

@ -33,20 +33,29 @@ use crate::corestore::Data;
use crate::resp::{TSYMBOL_BINARY, TSYMBOL_UNICODE};
use parking_lot::RwLock;
pub type LockedVec = RwLock<Vec<Data>>;
#[derive(Debug)]
pub struct KVEListMap {
encoded_id: bool,
encoded_payload_element: bool,
base: Coremap<Data, RwLock<Vec<Data>>>,
base: Coremap<Data, LockedVec>,
}
impl KVEListMap {
/// Create a new, empty KVEListMap. `Encoded ID == encoded key` and
/// `encoded payload == encoded elements`
///
/// This is [`Self::init_with_data`] called with a fresh [`Coremap`].
pub fn new(encoded_id: bool, encoded_payload_element: bool) -> Self {
    let empty_base = Coremap::new();
    Self::init_with_data(encoded_id, encoded_payload_element, empty_base)
}
/// Create a KVEListMap on top of an existing `base` map
///
/// `encoded_id` and `encoded_payload_element` are stored as given; `base`
/// becomes the backing table without being copied or cleared.
pub fn init_with_data(
    encoded_id: bool,
    encoded_payload_element: bool,
    base: Coremap<Data, LockedVec>,
) -> Self {
    Self {
        encoded_id,
        encoded_payload_element,
        // keep the caller's map: initialising with `Coremap::new()` here
        // would silently drop whatever data was passed in
        base,
    }
}
/// Get an encoder instance for the payload elements
@ -77,7 +86,7 @@ impl KVEListMap {
if (self.encode_key(&listname)) {
None
} else {
Some(self.base.true_if_insert(listname, RwLock::new(Vec::new())))
Some(self.base.true_if_insert(listname, LockedVec::default()))
}
}
}

@ -47,7 +47,7 @@ pub const BYTEMARK_MODEL_KV_BINSTR_LIST_STR: u8 = 5;
/// KVE model bytemark with key:str, val: list<binstr>
pub const BYTEMARK_MODEL_KV_STR_LIST_BINSTR: u8 = 6;
/// KVE model bytemark with key:str, val: list<str>
// must differ from `BYTEMARK_MODEL_KV_STR_LIST_BINSTR` (6): the model code
// is what distinguishes the two list models
pub const BYTEMARK_MODEL_KV_STR_LIST_STR: u8 = 7;
// storage bym
/// Persistent storage bytemark

@ -33,6 +33,7 @@ use super::interface;
use crate::corestore::memstore::Keyspace;
use crate::corestore::memstore::Memstore;
use crate::corestore::memstore::ObjectID;
use crate::kvengine::KVTable;
use crate::registry;
use crate::IoResult;
@ -124,14 +125,14 @@ pub mod oneshot {
// fine, this needs to be flushed
let mut file = File::create(&$path)?;
match $table.get_model_ref() {
DataModel::KV(kve) => super::interface::serialize_map_into_slow_buffer(
DataModel::KV(kve) => super::interface::serialize_into_slow_buffer(
&mut file,
kve.__get_inner_ref(),
kve.kve_inner_ref(),
)?,
DataModel::KVExtListmap(kve) => super::interface::serialize_into_slow_buffer(
&mut file,
kve.kve_inner_ref(),
)?,
_ => {
// TODO(@ohsayan): Implement this
unimplemented!("Listmap se/de has not been implemented")
}
}
file.sync_all()?;
fs::rename(&$path, &$path[..$path.len() - 1])

@ -30,6 +30,7 @@ use crate::corestore::htable::Coremap;
use crate::corestore::htable::Data;
use crate::corestore::memstore::Keyspace;
use crate::corestore::memstore::Memstore;
use crate::kvengine::listmap::LockedVec;
use crate::registry;
use crate::IoResult;
use std::collections::HashSet;
@ -42,6 +43,22 @@ pub const DIR_RSNAPROOT: &str = "data/rsnap";
pub const DIR_BACKUPS: &str = "data/backups";
pub const DIR_ROOT: &str = "data";
/// Something that knows how to write itself into any [`Write`] sink
pub trait DiskWritable {
    /// Write `self` into `writer`, returning any I/O error that occurs
    fn write_self<W>(&self, writer: &mut W) -> IoResult<()>
    where
        W: Write;
}
/// A borrowed key/value map is written with `se::raw_serialize_map`
impl DiskWritable for &Coremap<Data, Data> {
    fn write_self<W>(&self, writer: &mut W) -> IoResult<()>
    where
        W: Write,
    {
        // `self` is a reference to the borrowed map; peel one layer off
        let map: &Coremap<Data, Data> = self;
        super::se::raw_serialize_map(map, writer)
    }
}
/// A borrowed list map is written with `se::raw_serialize_list_map`
impl DiskWritable for &Coremap<Data, LockedVec> {
    fn write_self<W>(&self, writer: &mut W) -> IoResult<()>
    where
        W: Write,
    {
        // `self` is a reference to the borrowed map; peel one layer off
        let list_map: &Coremap<Data, LockedVec> = self;
        super::se::raw_serialize_list_map(list_map, writer)
    }
}
/// This creates the root directory structure:
/// ```
/// data/
@ -124,12 +141,12 @@ pub fn cleanup_tree(memroot: &Memstore) -> IoResult<()> {
/// Uses a buffered writer under the hood to improve write performance as the provided
/// writable interface might be very slow. The buffer does flush once done, however, it
/// is important that you fsync yourself!
pub fn serialize_map_into_slow_buffer<T: Write>(
pub fn serialize_into_slow_buffer<T: Write, U: DiskWritable>(
buffer: &mut T,
map: &Coremap<Data, Data>,
writable_item: U,
) -> IoResult<()> {
let mut buffer = BufWriter::new(buffer);
super::se::raw_serialize_map(map, &mut buffer)?;
writable_item.write_self(&mut buffer)?;
buffer.flush()?;
Ok(())
}

@ -126,3 +126,11 @@ macro_rules! read_dir_to_col {
.collect()
};
}
#[cfg(test)]
/// Test helper: build a `parking_lot::RwLock` around a `Vec` whose elements
/// are produced by calling `.into()` on every given expression. At least one
/// element is required; a trailing comma is accepted.
macro_rules! lvec {
    ($($item:expr),+ $(,)?) => {
        parking_lot::RwLock::new(std::vec![$($item.into()),+])
    };
}

@ -183,6 +183,7 @@ unsafe fn raw_byte_repr<'a, T: 'a>(len: &'a T) -> &'a [u8] {
mod se {
use super::*;
use crate::corestore::memstore::Keyspace;
use crate::kvengine::listmap::LockedVec;
use crate::IoResult;
macro_rules! unsafe_sz_byte_repr {
@ -260,14 +261,9 @@ mod se {
}
Ok(())
}
pub fn raw_serialize_list_map<'a, W, T: 'a, U: 'a>(
w: &mut W,
data: &Coremap<Data, T>,
) -> IoResult<()>
pub fn raw_serialize_list_map<W>(data: &Coremap<Data, LockedVec>, w: &mut W) -> IoResult<()>
where
W: Write,
T: AsRef<[U]>,
U: AsRef<[u8]>,
{
/*
[8B: Extent]([8B: Key extent][?B: Key][8B: Max index][?B: Payload])*
@ -280,13 +276,14 @@ mod se {
// key
let k = key.key();
// list payload
let v = key.value().as_ref();
let vread = key.value().read();
let v: &Vec<Data> = &vread;
// write the key extent
w.write_all(unsafe_sz_byte_repr!(k.len()))?;
// write the key
w.write_all(k)?;
// write the list payload
self::raw_serialize_nested_list(w, v)?;
self::raw_serialize_nested_list(w, &v)?;
}
}
Ok(())
@ -322,7 +319,9 @@ mod se {
mod de {
use super::iter::{RawSliceIter, RawSliceIterBorrowed};
use super::{Array, Coremap, Data, Hash, HashSet};
use crate::kvengine::listmap::LockedVec;
use core::ptr;
use parking_lot::RwLock;
use std::collections::HashMap;
pub trait DeserializeFrom {
@ -330,6 +329,26 @@ mod de {
fn from_slice(slice: &[u8]) -> Self;
}
/// A type that can be rebuilt from a raw byte slice
pub trait DeserializeInto: Sized {
/// Attempt to build `Self` from `slice`; a `None` return signals that no
/// value could be produced from the given bytes
fn from_slice(slice: &[u8]) -> Option<Self>;
}
impl DeserializeInto for Coremap<Data, Data> {
fn from_slice(slice: &[u8]) -> Option<Self> {
self::deserialize_map(slice)
}
}
impl DeserializeInto for Coremap<Data, LockedVec> {
fn from_slice(slice: &[u8]) -> Option<Self> {
self::deserialize_list_map(slice)
}
}
pub fn deserialize_into<T: DeserializeInto>(input: &[u8]) -> Option<T> {
T::from_slice(input)
}
impl<const N: usize> DeserializeFrom for Array<u8, N> {
fn is_expected_len(clen: usize) -> bool {
clen <= N
@ -401,7 +420,7 @@ mod de {
}
/// Deserialize a file that contains a serialized map. This also returns the model code
pub fn deserialize_map(data: &[u8]) -> Option<Coremap<Data, Data>> {
let mut rawiter = RawSliceIter::new(&data);
let mut rawiter = RawSliceIter::new(data);
let len = rawiter.next_64bit_integer_to_usize()?;
let hm = Coremap::with_capacity(len);
for _ in 0..len {
@ -419,7 +438,7 @@ mod de {
}
}
pub fn deserialize_list_map(bytes: &[u8]) -> Option<Coremap<Data, Vec<Data>>> {
pub fn deserialize_list_map(bytes: &[u8]) -> Option<Coremap<Data, LockedVec>> {
let mut rawiter = RawSliceIter::new(bytes);
// get the len
let len = rawiter.next_64bit_integer_to_usize()?;
@ -433,7 +452,7 @@ mod de {
let borrowed_iter = rawiter.get_borrowed_iter();
let list = self::deserialize_nested_list(borrowed_iter)?;
// push it in
map.true_if_insert(key, list);
map.true_if_insert(key, RwLock::new(list));
}
if rawiter.end_of_allocation() {
Some(map)
@ -445,7 +464,7 @@ mod de {
/// Deserialize a nested list: `[EXTENT]([EL_EXT][EL])*`
///
pub fn deserialize_nested_list<'a>(mut iter: RawSliceIterBorrowed<'a>) -> Option<Vec<Data>> {
pub fn deserialize_nested_list(mut iter: RawSliceIterBorrowed<'_>) -> Option<Vec<Data>> {
// get list payload len
let list_payload_extent = iter.next_64bit_integer_to_usize()?;
let mut list = Vec::with_capacity(list_payload_extent);

@ -307,6 +307,9 @@ mod list_tests {
use super::iter::RawSliceIter;
use super::{de, se};
use crate::corestore::{htable::Coremap, Data};
use crate::kvengine::listmap::LockedVec;
use core::ops::Deref;
use parking_lot::RwLock;
#[test]
fn test_list_se_de() {
let mylist = vec![Data::from("a"), Data::from("b"), Data::from("c")];
@ -342,44 +345,60 @@ mod list_tests {
#[test]
fn test_list_map_monoelement_se_de() {
    // round-trip a list map holding a single key
    let mymap = Coremap::new();
    let vals = lvec!["apples", "bananas", "carrots"];
    // insert a copy so that `vals` stays available for the comparison below
    mymap.true_if_insert(Data::from("mykey"), RwLock::new(vals.read().clone()));
    let mut v = Vec::new();
    se::raw_serialize_list_map(&mymap, &mut v).unwrap();
    let de = de::deserialize_list_map(&v).unwrap();
    assert_eq!(de.len(), 1);
    // take a snapshot of the deserialized list from behind its lock
    let mykey_value = de
        .get("mykey".as_bytes())
        .unwrap()
        .value()
        .deref()
        .read()
        .clone();
    assert_eq!(
        mykey_value,
        vals.into_inner()
            .into_iter()
            .map(Data::from)
            .collect::<Vec<Data>>()
    );
}
#[test]
fn test_list_map_se_de() {
    // round-trip a list map holding two keys and check both lists
    let mymap: Coremap<Data, LockedVec> = Coremap::new();
    let key1: Data = "mykey1".into();
    let val1 = lvec!["apples", "bananas", "carrots"];
    let key2: Data = "mykey2long".into();
    let val2 = lvec!["code", "coffee", "cats"];
    // insert copies so that `val1`/`val2` stay available for the comparisons below
    mymap.true_if_insert(key1.clone(), RwLock::new(val1.read().clone()));
    mymap.true_if_insert(key2.clone(), RwLock::new(val2.read().clone()));
    let mut v = Vec::new();
    se::raw_serialize_list_map(&mymap, &mut v).unwrap();
    let de = de::deserialize_list_map(&v).unwrap();
    assert_eq!(de.len(), 2);
    assert_eq!(
        de.get(&key1).unwrap().value().deref().read().clone(),
        val1.into_inner()
            .into_iter()
            .map(Data::from)
            .collect::<Vec<Data>>()
    );
    // the second list must be looked up under `key2`; looking up `key1` here
    // would compare the first list against `val2`
    assert_eq!(
        de.get(&key2).unwrap().value().deref().read().clone(),
        val2.into_inner()
            .into_iter()
            .map(Data::from)
            .collect::<Vec<Data>>()
    );
}
#[test]
fn test_list_map_empty_se_de() {
    // an empty list map must survive a serialize/deserialize round trip
    let mymap: Coremap<Data, LockedVec> = Coremap::new();
    let mut v = Vec::new();
    se::raw_serialize_list_map(&mymap, &mut v).unwrap();
    let de = de::deserialize_list_map(&v).unwrap();
    assert_eq!(de.len(), 0)
}
@ -388,6 +407,7 @@ mod list_tests {
mod corruption_tests {
use crate::corestore::htable::Coremap;
use crate::corestore::Data;
use crate::kvengine::listmap::LockedVec;
#[test]
fn test_corruption_map_basic() {
let mymap = Coremap::new();
@ -416,24 +436,24 @@ mod corruption_tests {
}
#[test]
fn test_listmap_corruption_basic() {
    let mymap: Coremap<Data, LockedVec> = Coremap::new();
    mymap.upsert("hello".into(), lvec!("hello-1"));
    // current repr: [1u64][5u64]["hello"][1u64][7u64]["hello-1"]
    // sanity test
    let mut v = Vec::new();
    super::se::raw_serialize_list_map(&mymap, &mut v).unwrap();
    assert!(super::de::deserialize_list_map(&v).is_some());
    // now chop "hello-1"; the truncated payload must be rejected
    assert!(super::de::deserialize_list_map(&v[..v.len() - 7]).is_none());
}
#[test]
fn test_listmap_corruption_midway() {
let mymap: Coremap<Data, Vec<Data>> = Coremap::new();
mymap.upsert("hello".into(), Vec::from(["hello-1".into()]));
let mymap: Coremap<Data, LockedVec> = Coremap::new();
mymap.upsert("hello".into(), lvec!("hello-1"));
// current repr: [1u64][5u64]["hello"][1u64][7u64]["hello-1"]
// sanity test
let mut v = Vec::new();
super::se::raw_serialize_list_map(&mut v, &mymap).unwrap();
super::se::raw_serialize_list_map(&mymap, &mut v).unwrap();
assert!(super::de::deserialize_list_map(&v).is_some());
assert_eq!(v.len(), 44);
// now chop "7u64" (8+8+5+8+8+7)

@ -57,26 +57,48 @@ pub fn read_table(
model_code: u8,
) -> IoResult<Table> {
let filepath = unsafe { concat_path!(DIR_KSROOT, ksid.as_str(), tblid.as_str()) };
let data = if volatile {
// no need to read anything; table is volatile and has no file
Coremap::new()
} else {
// not volatile, so read this in
let f = fs::read(filepath)?;
super::de::deserialize_map(&f).ok_or_else(|| bad_data!())?
};
macro_rules! decode {
() => {{
let data = if volatile {
Coremap::new()
} else {
// not volatile, so read this in
let f = fs::read(filepath)?;
super::de::deserialize_into(&f).ok_or_else(|| bad_data!())?
};
data
}};
}
let tbl = match model_code {
bytemarks::BYTEMARK_MODEL_KV_BIN_BIN => {
Table::new_kve_with_data(data, volatile, false, false)
}
bytemarks::BYTEMARK_MODEL_KV_BIN_STR => {
Table::new_kve_with_data(data, volatile, false, true)
}
bytemarks::BYTEMARK_MODEL_KV_STR_STR => {
Table::new_kve_with_data(data, volatile, true, true)
0 | 1 | 2 | 3 => {
let data = decode!();
macro_rules! pkve {
($kenc:literal, $venc:literal) => {
Table::new_pure_kve_with_data(data, volatile, $kenc, $venc)
};
}
match model_code {
bytemarks::BYTEMARK_MODEL_KV_BIN_BIN => pkve!(false, false),
bytemarks::BYTEMARK_MODEL_KV_BIN_STR => pkve!(false, true),
bytemarks::BYTEMARK_MODEL_KV_STR_STR => pkve!(true, true),
bytemarks::BYTEMARK_MODEL_KV_STR_BIN => pkve!(true, false),
_ => unsafe { impossible!() },
}
}
bytemarks::BYTEMARK_MODEL_KV_STR_BIN => {
Table::new_kve_with_data(data, volatile, true, false)
4 | 5 | 6 | 7 => {
let data = decode!();
macro_rules! listmap {
($kenc:literal, $penc:literal) => {
Table::new_kve_listmap_with_data(data, volatile, $kenc, $penc)
};
}
match model_code {
bytemarks::BYTEMARK_MODEL_KV_BINSTR_LIST_BINSTR => listmap!(false, false),
bytemarks::BYTEMARK_MODEL_KV_BINSTR_LIST_STR => listmap!(false, true),
bytemarks::BYTEMARK_MODEL_KV_STR_LIST_BINSTR => listmap!(true, false),
bytemarks::BYTEMARK_MODEL_KV_STR_LIST_STR => listmap!(true, true),
_ => unsafe { impossible!() },
}
}
_ => return Err(IoError::from(ErrorKind::Unsupported)),
};

Loading…
Cancel
Save