Fix preload generation and add preload decoding

next
Sayan Nandan 3 years ago
parent 87d79650ce
commit e89417cbc6

@ -65,8 +65,6 @@ const PAIR_MAP_LUT: [u8; 200] = [
/// A 32-bit integer buffer with one extra byte /// A 32-bit integer buffer with one extra byte
pub type Integer32Buffer = Integer32BufferRaw<11>; pub type Integer32Buffer = Integer32BufferRaw<11>;
/// A 32-bit integer buffer with **no extra byte**
pub type Integer32 = Integer32BufferRaw<10>;
#[derive(Debug)] #[derive(Debug)]
/// A buffer for unsigned 32-bit integers with one _extra byte_ of memory reserved for /// A buffer for unsigned 32-bit integers with one _extra byte_ of memory reserved for
@ -200,8 +198,6 @@ fn test_int32_buffer_push() {
assert_eq!(buffer, "278?"); assert_eq!(buffer, "278?");
} }
/// A 64-bit integer buffer with one extra byte
pub type Integer64Buffer = Integer64BufferRaw<21>;
/// A 64-bit integer buffer with **no extra byte** /// A 64-bit integer buffer with **no extra byte**
pub type Integer64 = Integer64BufferRaw<20>; pub type Integer64 = Integer64BufferRaw<20>;
@ -402,10 +398,7 @@ where
fn test_int64_buffer() { fn test_int64_buffer() {
assert_eq!( assert_eq!(
9348910481349849081_u64.to_string(), 9348910481349849081_u64.to_string(),
Integer64Buffer::init(9348910481349849081_u64).as_ref() Integer64::init(9348910481349849081_u64).as_ref()
);
assert_eq!(
u64::MAX.to_string(),
Integer64Buffer::init(u64::MAX).as_ref()
); );
assert_eq!(u64::MAX.to_string(), Integer64::init(u64::MAX).as_ref());
} }

@ -83,7 +83,7 @@ fn test_def_macro_sanity() {
/// typedef for the keyspace/table IDs. We don't need too much fancy here, /// typedef for the keyspace/table IDs. We don't need too much fancy here,
/// no atomic pointers and all. Just a nice array. With amazing gurantees /// no atomic pointers and all. Just a nice array. With amazing gurantees
type ObjectID = Array<u8, 64>; pub type ObjectID = Array<u8, 64>;
mod cluster { mod cluster {
/// This is for the future where every node will be allocated a shard /// This is for the future where every node will be allocated a shard

@ -24,20 +24,6 @@
* *
*/ */
#[cfg(target_endian = "big")]
macro_rules! endian_mark {
() => {
1_u64
};
}
#[cfg(target_endian = "little")]
macro_rules! endian_mark {
() => {
0_u64
};
}
macro_rules! little_endian { macro_rules! little_endian {
($block:block) => { ($block:block) => {
#[cfg(target_endian = "little")] #[cfg(target_endian = "little")]

@ -50,11 +50,14 @@
//! think of using them anywhere outside. This is a specialized parser built for the database. //! think of using them anywhere outside. This is a specialized parser built for the database.
//! -- Sayan (July 2021) //! -- Sayan (July 2021)
use crate::coredb::array::Array;
use crate::coredb::htable::Coremap; use crate::coredb::htable::Coremap;
use crate::coredb::Data; use crate::coredb::Data;
use core::hash::Hash;
use core::mem; use core::mem;
use core::ptr; use core::ptr;
use core::slice; use core::slice;
use std::collections::HashSet;
use std::io::Write; use std::io::Write;
// for some astronomical reasons do not mess with this // for some astronomical reasons do not mess with this
#[macro_use] #[macro_use]
@ -207,8 +210,90 @@ pub fn raw_serialize_map<W: Write>(map: &Coremap<Data, Data>, w: &mut W) -> std:
Ok(()) Ok(())
} }
/// Serialize a set and write it to a provided buffer
pub fn raw_serialize_set<W, K, V>(map: &Coremap<K, V>, w: &mut W) -> std::io::Result<()>
where
W: Write,
K: Eq + Hash + AsRef<[u8]>,
{
unsafe {
w.write_all(raw_byte_repr(&to_64bit_little_endian!(map.len())))?;
// now the keys and values
for kv in map.iter() {
let key = kv.key().as_ref();
w.write_all(raw_byte_repr(&to_64bit_little_endian!(key.len())))?;
w.write_all(key)?;
}
}
Ok(())
}
pub trait DeserializeFrom {
fn is_expected_len(current_len: usize) -> bool;
fn from_slice(slice: &[u8]) -> Self;
}
impl<const N: usize> DeserializeFrom for Array<u8, N> {
fn is_expected_len(clen: usize) -> bool {
clen <= N
}
fn from_slice(slice: &[u8]) -> Self {
unsafe { Self::from_slice(slice) }
}
}
/// Deserialize a set to a custom type
pub fn deserialize_set_ctype<T>(data: &[u8]) -> Option<HashSet<T>>
where
T: DeserializeFrom + Eq + Hash,
{
// First read the length header
if data.len() < 8 {
// so the file doesn't even have the length header? noice, just return
None
} else {
unsafe {
// so we have 8B. Just unsafe access and transmute it
let len = transmute_len(data.as_ptr());
let mut set = HashSet::with_capacity(len);
// this is what we have left: [KLEN:8B]*
// move 8 bytes ahead since we're done with len
let mut ptr = data.as_ptr().add(8);
let end_ptr = data.as_ptr().add(data.len());
for _ in 0..len {
if (ptr.add(8)) >= end_ptr {
// not enough space and even if there is a len
// there is no value. This is even true for ZSTs
return None;
}
let lenkey = transmute_len(ptr);
ptr = ptr.add(8);
if (ptr.add(lenkey)) > end_ptr {
// not enough data left
return None;
}
if !T::is_expected_len(lenkey) {
return None;
}
// get the key as a raw slice, we've already checked if end_ptr is less
let key = T::from_slice(slice::from_raw_parts(ptr, lenkey));
// move the ptr ahead; done with the key
ptr = ptr.add(lenkey);
// push it in
set.insert(key);
}
if ptr == end_ptr {
Some(set)
} else {
// nope, someone gave us more data
None
}
}
}
}
/// Deserialize a file that contains a serialized map /// Deserialize a file that contains a serialized map
pub fn deserialize(data: Vec<u8>) -> Option<Coremap<Data, Data>> { pub fn deserialize_map(data: Vec<u8>) -> Option<Coremap<Data, Data>> {
// First read the length header // First read the length header
if data.len() < 8 { if data.len() < 8 {
// so the file doesn't even have the length header? noice, just return // so the file doesn't even have the length header? noice, just return

@ -24,46 +24,56 @@
* *
*/ */
use super::raw_byte_repr;
use crate::coredb::memstore::Memstore; use crate::coredb::memstore::Memstore;
use crate::coredb::memstore::ObjectID;
use core::ptr;
use std::collections::HashSet;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult; use std::io::Result as IoResult;
use std::io::Write; use std::io::Write;
const VERSION_MARK: u64 = 1u64.swap_bytes(); // our version and endian are based on nibbles
/// Add padding bytes to align to 8B boundaries #[cfg(target_endian = "little")]
fn pad_nul_align8<W: Write>(l: usize, w: &mut W) -> IoResult<()> { const META_SEGMENT: u8 = 0b1000_0000;
// ignore handled amount
let _ = w.write(&[b'0'].repeat(64 - l))?; #[cfg(target_endian = "big")]
Ok(()) const META_SEGMENT: u8 = 0b1000_0001;
}
const VERSION: u8 = 1;
/// Generate the `PRELOAD` disk file for this instance /// Generate the `PRELOAD` disk file for this instance
/// ```text /// ```text
/// [8B: Endian Mark/Version Mark (padded)] => Meta segment /// [1B: Endian Mark/Version Mark (padded)] => Meta segment
/// [8B: Extent header] => Predata Segment /// [8B: Extent header] => Predata Segment
/// ([8B: Parition ID (nul padded)])* => Data segment /// ([8B: Partion ID len][8B: Parition ID (not padded)])* => Data segment
/// ``` /// ```
/// ///
/// The meta segment need not be 8B, but it is done for easier alignment pub fn raw_generate_preload<W: Write>(w: &mut W, store: &Memstore) -> IoResult<()> {
pub fn raw_generate_preload<W: Write>(w: &mut W, store: Memstore) -> IoResult<()> {
unsafe {
// generate the meta segment // generate the meta segment
#[allow(clippy::identity_op)] // clippy doesn't understand endian #[allow(clippy::identity_op)]
let meta_segment = endian_mark!() | VERSION_MARK; w.write_all(&[META_SEGMENT])?;
w.write_all(&raw_byte_repr(&meta_segment))?; super::raw_serialize_set(&store.keyspaces, w)?;
Ok(())
}
// generate and write the extent header (predata) pub fn read_preload(preload: Vec<u8>) -> IoResult<HashSet<ObjectID>> {
w.write_all(&raw_byte_repr(&to_64bit_little_endian!(store if preload.len() < 16 {
.keyspaces // nah, this is a bad disk file
.len())))?; return Err(IoError::from(ErrorKind::UnexpectedEof));
} }
// start writing the parition IDs // first read in the meta segment
for partition in store.keyspaces.iter() { unsafe {
let partition_id = partition.key(); let meta_segment: u8 = ptr::read(preload.as_ptr());
w.write_all(&partition_id)?; if meta_segment != META_SEGMENT {
// pad return Err(IoError::from(ErrorKind::Unsupported));
pad_nul_align8(partition_id.len(), w)?; }
}
// all checks complete; time to decode
let ret = super::deserialize_set_ctype(&preload[1..]);
match ret {
Some(ret) => Ok(ret),
_ => Err(IoError::from(ErrorKind::UnexpectedEof)),
} }
Ok(())
} }

@ -30,7 +30,7 @@ use super::*;
fn test_serialize_deserialize_empty() { fn test_serialize_deserialize_empty() {
let cmap = Coremap::new(); let cmap = Coremap::new();
let ser = serialize_map(&cmap).unwrap(); let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap(); let de = deserialize_map(ser).unwrap();
assert!(de.len() == 0); assert!(de.len() == 0);
} }
@ -40,7 +40,7 @@ fn test_ser_de_few_elements() {
cmap.upsert("sayan".into(), "writes code".into()); cmap.upsert("sayan".into(), "writes code".into());
cmap.upsert("supersayan".into(), "writes super code".into()); cmap.upsert("supersayan".into(), "writes super code".into());
let ser = serialize_map(&cmap).unwrap(); let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap(); let de = deserialize_map(ser).unwrap();
assert!(de.len() == cmap.len()); assert!(de.len() == cmap.len());
assert!(de assert!(de
.iter() .iter()
@ -65,7 +65,7 @@ cfg_test!(
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned()))) .map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect(); .collect();
let ser = serialize_map(&cmap).unwrap(); let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap(); let de = deserialize_map(ser).unwrap();
assert!(de assert!(de
.iter() .iter()
.all(|kv| cmap.get(kv.key()).unwrap().eq(kv.value()))); .all(|kv| cmap.get(kv.key()).unwrap().eq(kv.value())));
@ -90,7 +90,7 @@ cfg_test!(
// random chop // random chop
se.truncate(124); se.truncate(124);
// corrupted // corrupted
assert!(deserialize(se).is_none()); assert!(deserialize_map(se).is_none());
} }
#[test] #[test]
fn test_ser_de_excess_bytes() { fn test_ser_de_excess_bytes() {
@ -114,7 +114,7 @@ cfg_test!(
// random patch // random patch
let patch: Vec<u8> = (0u16..500u16).into_iter().map(|v| (v >> 7) as u8).collect(); let patch: Vec<u8> = (0u16..500u16).into_iter().map(|v| (v >> 7) as u8).collect();
se.extend(patch); se.extend(patch);
assert!(deserialize(se).is_none()); assert!(deserialize_map(se).is_none());
} }
); );
@ -175,3 +175,20 @@ mod interface_tests {
assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned()); assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned());
} }
} }
mod preload_tests {
use super::*;
use crate::coredb::memstore::Memstore;
#[test]
fn test_preload() {
let memstore = Memstore::new_default();
let mut v = Vec::new();
preload::raw_generate_preload(&mut v, &memstore).unwrap();
let de: Vec<String> = preload::read_preload(v)
.unwrap()
.into_iter()
.map(|each| unsafe { each.as_str().to_owned() })
.collect();
assert_veceq!(de, vec!["default".to_owned()]);
}
}

Loading…
Cancel
Save