Add preload generation

next
Sayan Nandan 3 years ago
parent bca8df5863
commit 87d79650ce

@ -128,6 +128,9 @@ pub enum DdlError {
pub struct Memstore {
/// the keyspaces
pub keyspaces: Arc<Coremap<ObjectID, Arc<Keyspace>>>,
/// current state of the disk flush status. if this is true, we're safe to
/// go ahead with writes
flush_state_healthy: AtomicBool,
}
impl Memstore {
@ -135,6 +138,7 @@ impl Memstore {
pub fn new_empty() -> Self {
Self {
keyspaces: Arc::new(Coremap::new()),
flush_state_healthy: AtomicBool::new(true),
}
}
/// Create a new in-memory table with the default keyspace and the default
@ -159,6 +163,7 @@ impl Memstore {
n.true_if_insert(DEFAULT, Arc::new(Keyspace::empty_default()));
Arc::new(n)
},
flush_state_healthy: AtomicBool::new(true),
}
}
/// Get an atomic reference to a keyspace
@ -186,9 +191,6 @@ pub enum TableType {
pub struct Keyspace {
/// the tables
pub tables: Coremap<ObjectID, Arc<Table>>,
/// current state of the disk flush status. if this is true, we're safe to
/// go ahead with writes
flush_state_healthy: AtomicBool,
/// the snapshot configuration for this keyspace
snap_config: Option<SnapshotStatus>,
/// the replication strategy for this keyspace
@ -220,7 +222,6 @@ impl Keyspace {
);
ht
},
flush_state_healthy: AtomicBool::new(true),
snap_config: None,
replication_strategy: cluster::ReplicationStrategy::default(),
}
@ -229,7 +230,6 @@ impl Keyspace {
pub fn empty() -> Self {
Self {
tables: Coremap::new(),
flush_state_healthy: AtomicBool::new(true),
snap_config: None,
replication_strategy: cluster::ReplicationStrategy::default(),
}

@ -31,39 +31,13 @@ use crate::coredb::buffers::Integer32Buffer;
use crate::coredb::htable::Coremap;
use crate::coredb::htable::Data;
use crate::coredb::memstore::Memstore;
use std::fs;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
-const DIR_KSROOT: &str = "data/ks";
+pub const DIR_KSROOT: &str = "data/ks";
-const DIR_SNAPROOT: &str = "data/snaps";
+pub const DIR_SNAPROOT: &str = "data/snaps";
-const DIR_BACKUPS: &str = "data/backups";
+pub const DIR_BACKUPS: &str = "data/backups";
-const DIR_ROOT: &str = "data";
+pub const DIR_ROOT: &str = "data";
macro_rules! try_dir_ignore_existing {
($dir:expr) => {{
match fs::create_dir_all($dir) {
Ok(_) => Ok(()),
Err(e) => match e.kind() {
ErrorKind::AlreadyExists => Ok(()),
_ => Err(e),
},
}
}};
($($dir:expr),*) => {
$(try_dir_ignore_existing!($dir)?;)*
}
}
macro_rules! concat_path {
($($s:expr),*) => {{ {
let mut path = PathBuf::with_capacity($(($s).len()+)*0);
$(path.push($s);)*
path
}}};
}
/// This creates the root directory structure:
/// ```
@ -103,40 +77,6 @@ pub fn create_tree(memroot: Memstore) -> IoResult<()> {
Ok(())
}
#[test]
fn test_tree() {
create_tree(Memstore::new_default()).unwrap();
let read_ks: Vec<String> = fs::read_dir(DIR_KSROOT)
.unwrap()
.map(|dir| {
let v = dir.unwrap().file_name();
v.to_string_lossy().to_string()
})
.collect();
assert_eq!(read_ks, vec!["default".to_owned()]);
// just read one level of the snaps dir
let read_snaps: Vec<String> = fs::read_dir(DIR_SNAPROOT)
.unwrap()
.map(|dir| {
let v = dir.unwrap().file_name();
v.to_string_lossy().to_string()
})
.collect();
assert_eq!(read_snaps, vec!["default".to_owned()]);
// now read level two: snaps/default
let read_snaps: Vec<String> = fs::read_dir(concat_path!(DIR_SNAPROOT, "default"))
.unwrap()
.map(|dir| {
let v = dir.unwrap().file_name();
v.to_string_lossy().to_string()
})
.collect();
assert_veceq!(read_snaps, vec!["_system".to_owned(), "default".to_owned()]);
assert!(PathBuf::from("data/backups").is_dir());
// clean up
fs::remove_dir_all(DIR_ROOT).unwrap();
}
/// Uses a buffered writer under the hood to improve write performance as the provided
/// writable interface might be very slow. The buffer does flush once done, however, it
/// is important that you fsync yourself!
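The comment above leaves the fsync to the caller. A minimal std-only sketch of that calling pattern (the file name is illustrative, not one the codebase uses):

use std::fs::File;
use std::io::{BufWriter, Result as IoResult, Write};

fn write_then_fsync(bytes: &[u8]) -> IoResult<()> {
    let file = File::create("partmap.example")?;
    let mut buffered = BufWriter::new(&file);
    buffered.write_all(bytes)?;
    buffered.flush()?; // empty the userspace buffer into the OS
    file.sync_all()?; // the fsync the doc comment asks for: force the OS cache to disk
    Ok(())
}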
@ -151,7 +91,7 @@ pub fn serialize_map_into_slow_buffer<T: Write>(
}
/// Get the file for COW. If the partition ID is 0000
-fn cow_file(id: PartitionID) -> Integer32Buffer {
+pub(super) fn cow_file(id: PartitionID) -> Integer32Buffer {
let mut buffer = Integer32Buffer::init(id);
unsafe {
// UNSAFE(@ohsayan): We know we're just pushing in one thing
@ -159,10 +99,3 @@ fn cow_file(id: PartitionID) -> Integer32Buffer {
}
buffer
}
#[test]
fn test_cowfile() {
let cow_file = cow_file(10);
assert_eq!(cow_file, "10_".to_owned());
assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned());
}

@ -0,0 +1,113 @@
/*
* Created on Sat Jul 17 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#[cfg(target_endian = "big")]
macro_rules! endian_mark {
() => {
1_u64
};
}
#[cfg(target_endian = "little")]
macro_rules! endian_mark {
() => {
0_u64
};
}
macro_rules! little_endian {
($block:block) => {
#[cfg(target_endian = "little")]
{
$block
}
};
}
macro_rules! big_endian {
($block:block) => {
#[cfg(target_endian = "big")]
{
$block
}
};
}
macro_rules! not_64_bit {
($block:block) => {
#[cfg(not(target_pointer_width = "64"))]
{
$block
}
};
}
macro_rules! is_64_bit {
($block:block) => {
#[cfg(target_pointer_width = "64")]
{
$block
}
};
}
#[cfg(target_endian = "big")]
macro_rules! to_64bit_little_endian {
($e:expr) => {
($e as u64).swap_bytes()
};
}
#[cfg(target_endian = "little")]
macro_rules! to_64bit_little_endian {
($e:expr) => {
($e as u64)
};
}
macro_rules! try_dir_ignore_existing {
($dir:expr) => {{
match std::fs::create_dir_all($dir) {
Ok(_) => Ok(()),
Err(e) => match e.kind() {
std::io::ErrorKind::AlreadyExists => Ok(()),
_ => Err(e),
},
}
}};
($($dir:expr),*) => {
$(try_dir_ignore_existing!($dir)?;)*
}
}
#[macro_export]
macro_rules! concat_path {
($($s:expr),*) => {{ {
let mut path = std::path::PathBuf::with_capacity($(($s).len()+)*0);
$(path.push($s);)*
path
}}};
}
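To see what the endian machinery above buys: `to_64bit_little_endian!` is equivalent to `u64::to_le`, so the bytes written for a length are identical on every host. A standalone sketch (not part of the commit):

fn main() {
    let len: usize = 2; // e.g. a keyspace count
    // what to_64bit_little_endian!(len) evaluates to on this host:
    // a no-op on little-endian targets, swap_bytes() on big-endian ones
    let normalized = (len as u64).to_le();
    // the native byte representation now matches the little-endian encoding everywhere
    assert_eq!(normalized.to_ne_bytes(), (len as u64).to_le_bytes());
}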

@ -56,7 +56,14 @@ use core::mem;
use core::ptr;
use core::slice;
use std::io::Write;
// for some astronomical reasons do not mess with this
#[macro_use]
mod macros;
// endof do not mess
pub mod interface;
pub mod preload;
#[cfg(test)]
mod tests;
/// The ID of the partition in a keyspace. Using too many keyspaces is an absolute anti-pattern
/// on Skytable, something that it has inherited from prior experience in large scale systems. As
@ -64,56 +71,6 @@ pub mod interface;
/// you should never hit that limit.
pub type PartitionID = u32;
macro_rules! little_endian {
($block:block) => {
#[cfg(target_endian = "little")]
{
$block
}
};
}
macro_rules! big_endian {
($block:block) => {
#[cfg(target_endian = "big")]
{
$block
}
};
}
macro_rules! not_64_bit {
($block:block) => {
#[cfg(not(target_pointer_width = "64"))]
{
$block
}
};
}
macro_rules! is_64_bit {
($block:block) => {
#[cfg(target_pointer_width = "64")]
{
$block
}
};
}
#[cfg(target_endian = "big")]
macro_rules! to_64bit_little_endian {
($e:expr) => {
($e as u64).swap_bytes()
};
}
#[cfg(target_endian = "little")]
macro_rules! to_64bit_little_endian {
($e:expr) => {
($e as u64)
};
}
/*
Endian and pointer "appendix":
We assume a fixed size of 1 for all the cases. All sizes don't hit over isize::MAX as
@ -352,105 +309,3 @@ unsafe fn transmute_len(start_ptr: *const u8) -> usize {
});
});
}
#[test]
fn test_serialize_deserialize_empty() {
let cmap = Coremap::new();
let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap();
assert!(de.len() == 0);
}
#[test]
fn test_ser_de_few_elements() {
let cmap = Coremap::new();
cmap.upsert("sayan".into(), "writes code".into());
cmap.upsert("supersayan".into(), "writes super code".into());
let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap();
assert!(de.len() == cmap.len());
assert!(de
.iter()
.all(|kv| cmap.get(kv.key()).unwrap().eq(kv.value())));
}
cfg_test!(
use libstress::utils::generate_random_string_vector;
use rand::thread_rng;
#[test]
fn roast_the_serializer() {
const COUNT: usize = 1000_usize;
const LEN: usize = 8_usize;
let mut rng = thread_rng();
let (keys, values) = (
generate_random_string_vector(COUNT, LEN, &mut rng, true),
generate_random_string_vector(COUNT, LEN, &mut rng, false),
);
let cmap: Coremap<Data, Data> = keys
.iter()
.zip(values.iter())
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect();
let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap();
assert!(de
.iter()
.all(|kv| cmap.get(kv.key()).unwrap().eq(kv.value())));
assert!(de.len() == cmap.len());
}
#[test]
fn test_ser_de_safety() {
const COUNT: usize = 1000_usize;
const LEN: usize = 8_usize;
let mut rng = thread_rng();
let (keys, values) = (
generate_random_string_vector(COUNT, LEN, &mut rng, true),
generate_random_string_vector(COUNT, LEN, &mut rng, false),
);
let cmap: Coremap<Data, Data> = keys
.iter()
.zip(values.iter())
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect();
let mut se = serialize_map(&cmap).unwrap();
// random chop
se.truncate(124);
// corrupted
assert!(deserialize(se).is_none());
}
#[test]
fn test_ser_de_excess_bytes() {
// this test needs a lot of auxiliary space
// we can approximate this to be: 100,000 x 30 bytes = 3,000,000 bytes
// and then we may have a clone overhead + heap allocation by the map
// so ~9,000,000 bytes or ~9MB
const COUNT: usize = 1000_usize;
const LEN: usize = 8_usize;
let mut rng = thread_rng();
let (keys, values) = (
generate_random_string_vector(COUNT, LEN, &mut rng, true),
generate_random_string_vector(COUNT, LEN, &mut rng, false),
);
let cmap: Coremap<Data, Data> = keys
.iter()
.zip(values.iter())
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect();
let mut se = serialize_map(&cmap).unwrap();
// random patch
let patch: Vec<u8> = (0u16..500u16).into_iter().map(|v| (v >> 7) as u8).collect();
se.extend(patch);
assert!(deserialize(se).is_none());
}
);
#[cfg(target_pointer_width = "32")]
#[test]
#[should_panic]
fn test_runtime_panic_32bit_or_lower() {
let max = u64::MAX;
let byte_stream = unsafe { raw_byte_repr(&max).to_owned() };
let ptr = byte_stream.as_ptr();
unsafe { transmute_len(ptr) };
}

@ -0,0 +1,69 @@
/*
* Created on Sat Jul 17 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::raw_byte_repr;
use crate::coredb::memstore::Memstore;
use std::io::Result as IoResult;
use std::io::Write;
const VERSION_MARK: u64 = 1u64.swap_bytes();
/// Add padding bytes to align to 8B boundaries
fn pad_nul_align8<W: Write>(l: usize, w: &mut W) -> IoResult<()> {
// write the nul padding; write_all ensures no bytes are silently dropped
w.write_all(&[0u8].repeat(64 - l))?;
Ok(())
}
/// Generate the `PRELOAD` disk file for this instance
/// ```text
/// [8B: Endian Mark/Version Mark (padded)] => Meta segment
/// [8B: Extent header] => Predata Segment
/// ([8B: Partition ID (nul padded)])* => Data segment
/// ```
///
/// The meta segment need not be 8B, but it is done for easier alignment
pub fn raw_generate_preload<W: Write>(w: &mut W, store: Memstore) -> IoResult<()> {
unsafe {
// generate the meta segment
#[allow(clippy::identity_op)] // clippy doesn't understand endian
let meta_segment = endian_mark!() | VERSION_MARK;
w.write_all(&raw_byte_repr(&meta_segment))?;
// generate and write the extent header (predata)
w.write_all(&raw_byte_repr(&to_64bit_little_endian!(store
.keyspaces
.len())))?;
}
// start writing the partition IDs
for partition in store.keyspaces.iter() {
let partition_id = partition.key();
w.write_all(&partition_id)?;
// pad
pad_nul_align8(partition_id.len(), w)?;
}
Ok(())
}
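A sketch (not part of this commit) of how the generator might be exercised from a unit test; it assumes, as the tree test elsewhere in this diff does, that Memstore::new_default creates exactly one keyspace ("default"):

#[cfg(test)]
mod preload_sketch {
    use super::raw_generate_preload;
    use crate::coredb::memstore::Memstore;
    #[test]
    fn keyspace_count_lands_in_predata_segment() {
        let mut buf: Vec<u8> = Vec::new();
        raw_generate_preload(&mut buf, Memstore::new_default()).unwrap();
        // bytes 0..8: meta segment (endian + version mark); bytes 8..16: extent
        // header, i.e. the keyspace count as a 64-bit little-endian integer
        let mut count = [0u8; 8];
        count.copy_from_slice(&buf[8..16]);
        assert_eq!(u64::from_le_bytes(count), 1);
    }
}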

@ -0,0 +1,177 @@
/*
* Created on Sat Jul 17 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::*;
#[test]
fn test_serialize_deserialize_empty() {
let cmap = Coremap::new();
let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap();
assert!(de.len() == 0);
}
#[test]
fn test_ser_de_few_elements() {
let cmap = Coremap::new();
cmap.upsert("sayan".into(), "writes code".into());
cmap.upsert("supersayan".into(), "writes super code".into());
let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap();
assert!(de.len() == cmap.len());
assert!(de
.iter()
.all(|kv| cmap.get(kv.key()).unwrap().eq(kv.value())));
}
cfg_test!(
use libstress::utils::generate_random_string_vector;
use rand::thread_rng;
#[test]
fn roast_the_serializer() {
const COUNT: usize = 1000_usize;
const LEN: usize = 8_usize;
let mut rng = thread_rng();
let (keys, values) = (
generate_random_string_vector(COUNT, LEN, &mut rng, true),
generate_random_string_vector(COUNT, LEN, &mut rng, false),
);
let cmap: Coremap<Data, Data> = keys
.iter()
.zip(values.iter())
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect();
let ser = serialize_map(&cmap).unwrap();
let de = deserialize(ser).unwrap();
assert!(de
.iter()
.all(|kv| cmap.get(kv.key()).unwrap().eq(kv.value())));
assert!(de.len() == cmap.len());
}
#[test]
fn test_ser_de_safety() {
const COUNT: usize = 1000_usize;
const LEN: usize = 8_usize;
let mut rng = thread_rng();
let (keys, values) = (
generate_random_string_vector(COUNT, LEN, &mut rng, true),
generate_random_string_vector(COUNT, LEN, &mut rng, false),
);
let cmap: Coremap<Data, Data> = keys
.iter()
.zip(values.iter())
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect();
let mut se = serialize_map(&cmap).unwrap();
// random chop
se.truncate(124);
// corrupted
assert!(deserialize(se).is_none());
}
#[test]
fn test_ser_de_excess_bytes() {
// this test needs a lot of auxiliary space
// we can approximate this to be: 100,000 x 30 bytes = 3,000,000 bytes
// and then we may have a clone overhead + heap allocation by the map
// so ~9,000,000 bytes or ~9MB
const COUNT: usize = 1000_usize;
const LEN: usize = 8_usize;
let mut rng = thread_rng();
let (keys, values) = (
generate_random_string_vector(COUNT, LEN, &mut rng, true),
generate_random_string_vector(COUNT, LEN, &mut rng, false),
);
let cmap: Coremap<Data, Data> = keys
.iter()
.zip(values.iter())
.map(|(k, v)| (Data::from(k.to_owned()), Data::from(v.to_owned())))
.collect();
let mut se = serialize_map(&cmap).unwrap();
// random patch
let patch: Vec<u8> = (0u16..500u16).into_iter().map(|v| (v >> 7) as u8).collect();
se.extend(patch);
assert!(deserialize(se).is_none());
}
);
#[cfg(target_pointer_width = "32")]
#[test]
#[should_panic]
fn test_runtime_panic_32bit_or_lower() {
let max = u64::MAX;
let byte_stream = unsafe { raw_byte_repr(&max).to_owned() };
let ptr = byte_stream.as_ptr();
unsafe { transmute_len(ptr) };
}
mod interface_tests {
use super::interface::{cow_file, create_tree, DIR_KSROOT, DIR_ROOT, DIR_SNAPROOT};
use crate::concat_path;
use crate::coredb::memstore::Memstore;
use std::fs;
use std::path::PathBuf;
#[test]
fn test_tree() {
create_tree(Memstore::new_default()).unwrap();
let read_ks: Vec<String> = fs::read_dir(DIR_KSROOT)
.unwrap()
.map(|dir| {
let v = dir.unwrap().file_name();
v.to_string_lossy().to_string()
})
.collect();
assert_eq!(read_ks, vec!["default".to_owned()]);
// just read one level of the snaps dir
let read_snaps: Vec<String> = fs::read_dir(DIR_SNAPROOT)
.unwrap()
.map(|dir| {
let v = dir.unwrap().file_name();
v.to_string_lossy().to_string()
})
.collect();
assert_eq!(read_snaps, vec!["default".to_owned()]);
// now read level two: snaps/default
let read_snaps: Vec<String> = fs::read_dir(concat_path!(DIR_SNAPROOT, "default"))
.unwrap()
.map(|dir| {
let v = dir.unwrap().file_name();
v.to_string_lossy().to_string()
})
.collect();
assert_veceq!(read_snaps, vec!["_system".to_owned(), "default".to_owned()]);
assert!(PathBuf::from("data/backups").is_dir());
// clean up
fs::remove_dir_all(DIR_ROOT).unwrap();
}
#[test]
fn test_cowfile() {
let cow_file = cow_file(10);
assert_eq!(cow_file, "10_".to_owned());
assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned());
}
}