Add methods to create directory tree

next
Sayan Nandan 3 years ago
parent 058b1ef1c6
commit bca8df5863

@@ -41,6 +41,7 @@ use core::mem::MaybeUninit;
 use core::ops;
 use core::ptr;
 use core::slice;
+use core::str;
 use serde::{de::SeqAccess, de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
 /// A compile-time, fixed size array that can have uninitialized memory. This array is as
@@ -298,6 +299,14 @@ impl<T, const N: usize> Array<T, N> {
     }
 }
+
+impl<const N: usize> Array<u8, N> {
+    /// This isn't _unsafe_ in the memory-safety sense, but it can cause callers
+    /// expecting valid unicode to misbehave if the array contains invalid UTF-8
+    pub unsafe fn as_str(&self) -> &str {
+        str::from_utf8_unchecked(self.as_ref())
+    }
+}
 impl<T, const N: usize> ops::Deref for Array<T, N> {
     type Target = [T];
     fn deref(&self) -> &Self::Target {
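The new `as_str` helper is intended for byte arrays that are already known to hold valid UTF-8, such as the keyspace and table identifiers introduced below. A minimal usage sketch (illustrative only, not part of this commit; it assumes `ObjectID` is the `Array<u8, N>` alias used in `memstore` and that `Array::from_slice` copies the given bytes without validation):

```rust
// Both calls are unsafe: from_slice presumably skips length/content checks,
// and as_str skips UTF-8 validation (str::from_utf8_unchecked).
let id = unsafe { ObjectID::from_slice("default") };
let name: &str = unsafe { id.as_str() };
assert_eq!(name, "default");
```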

@@ -58,7 +58,6 @@
 use crate::coredb::array::Array;
 use crate::coredb::htable::Coremap;
-use crate::coredb::htable::Data;
 use crate::coredb::SnapshotStatus;
 use crate::kvengine::KVEngine;
 use core::mem::MaybeUninit;
@@ -128,7 +127,7 @@ pub enum DdlError {
 /// connection-level control abilities over the keyspace
 pub struct Memstore {
     /// the keyspaces
-    keyspaces: Arc<Coremap<ObjectID, Arc<Keyspace>>>,
+    pub keyspaces: Arc<Coremap<ObjectID, Arc<Keyspace>>>,
 }

 impl Memstore {
@@ -186,7 +185,7 @@ pub enum TableType {
 /// A keyspace houses all the other tables
 pub struct Keyspace {
     /// the tables
-    tables: Coremap<Data, Arc<Table>>,
+    pub tables: Coremap<ObjectID, Arc<Table>>,
     /// current state of the disk flush status. if this is true, we're safe to
     /// go ahead with writes
     flush_state_healthy: AtomicBool,
@@ -196,6 +195,12 @@ pub struct Keyspace {
     replication_strategy: cluster::ReplicationStrategy,
 }

+macro_rules! unsafe_objectid_from_slice {
+    ($slice:expr) => {{
+        unsafe { ObjectID::from_slice($slice) }
+    }};
+}
+
 impl Keyspace {
     /// Create a new empty keyspace with the default tables: a `default` table and a
     /// `system` table
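Every `unsafe_objectid_from_slice!` call in the hunks below is shorthand for the unsafe `ObjectID::from_slice` constructor. An illustrative expansion (the function name here is hypothetical; only the body mirrors the macro):

```rust
fn default_table_id() -> ObjectID {
    // unsafe_objectid_from_slice!("default") expands to exactly this;
    // it is unsafe because from_slice presumably does not validate the slice
    unsafe { ObjectID::from_slice("default") }
}
```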
@@ -205,12 +210,12 @@ impl Keyspace {
         let ht = Coremap::new();
         // add the default table
         ht.true_if_insert(
-            Data::from("default"),
+            unsafe_objectid_from_slice!("default"),
             Arc::new(Table::KV(KVEngine::default())),
         );
         // add the system table
         ht.true_if_insert(
-            Data::from("_system"),
+            unsafe_objectid_from_slice!("_system"),
             Arc::new(Table::KV(KVEngine::default())),
         );
         ht
@@ -230,20 +235,20 @@ impl Keyspace {
         }
     }
     /// Get an atomic reference to a table in this keyspace if it exists
-    pub fn get_table_atomic_ref(&self, table_identifier: Data) -> Option<Arc<Table>> {
+    pub fn get_table_atomic_ref(&self, table_identifier: ObjectID) -> Option<Arc<Table>> {
         self.tables.get(&table_identifier).map(|v| v.clone())
     }
     /// Create a new table with **default encoding**
-    pub fn create_table(&self, table_identifier: Data, table_type: TableType) -> bool {
+    pub fn create_table(&self, table_identifier: ObjectID, table_type: TableType) -> bool {
         self.tables.true_if_insert(table_identifier, {
             match table_type {
                 TableType::KeyValue => Arc::new(Table::KV(KVEngine::default())),
             }
         })
     }
-    pub fn drop_table(&self, table_identifier: Data) -> Result<(), DdlError> {
-        if table_identifier.eq(&Data::from("default"))
-            || table_identifier.eq(&Data::from("_system"))
+    pub fn drop_table(&self, table_identifier: ObjectID) -> Result<(), DdlError> {
+        if table_identifier.eq(&unsafe_objectid_from_slice!("default"))
+            || table_identifier.eq(&unsafe_objectid_from_slice!("_system"))
         {
             Err(DdlError::ProtectedObject)
         } else if self.tables.contains_key(&table_identifier) {
@@ -268,19 +273,23 @@ impl Keyspace {
 #[test]
 fn test_keyspace_drop_no_atomic_ref() {
     let our_keyspace = Keyspace::empty_default();
-    assert!(our_keyspace.create_table(Data::from("apps"), TableType::KeyValue));
-    assert!(our_keyspace.drop_table(Data::from("apps")).is_ok());
+    assert!(our_keyspace.create_table(unsafe_objectid_from_slice!("apps"), TableType::KeyValue));
+    assert!(our_keyspace
+        .drop_table(unsafe_objectid_from_slice!("apps"))
+        .is_ok());
 }

 #[test]
 fn test_keyspace_drop_fail_with_atomic_ref() {
     let our_keyspace = Keyspace::empty_default();
-    assert!(our_keyspace.create_table(Data::from("apps"), TableType::KeyValue));
+    assert!(our_keyspace.create_table(unsafe_objectid_from_slice!("apps"), TableType::KeyValue));
     let _atomic_tbl_ref = our_keyspace
-        .get_table_atomic_ref(Data::from("apps"))
+        .get_table_atomic_ref(unsafe_objectid_from_slice!("apps"))
         .unwrap();
     assert_eq!(
-        our_keyspace.drop_table(Data::from("apps")).unwrap_err(),
+        our_keyspace
+            .drop_table(unsafe_objectid_from_slice!("apps"))
+            .unwrap_err(),
         DdlError::StillInUse
     );
 }
@@ -289,11 +298,15 @@ fn test_keyspace_drop_fail_with_atomic_ref() {
 fn test_keyspace_try_delete_protected_table() {
     let our_keyspace = Keyspace::empty_default();
     assert_eq!(
-        our_keyspace.drop_table(Data::from("default")).unwrap_err(),
+        our_keyspace
+            .drop_table(unsafe_objectid_from_slice!("default"))
+            .unwrap_err(),
         DdlError::ProtectedObject
     );
     assert_eq!(
-        our_keyspace.drop_table(Data::from("_system")).unwrap_err(),
+        our_keyspace
+            .drop_table(unsafe_objectid_from_slice!("_system"))
+            .unwrap_err(),
         DdlError::ProtectedObject
     );
 }
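Making `Memstore::keyspaces` and `Keyspace::tables` public is what allows the storage layer (next file) to walk the whole in-memory tree. A rough sketch of that traversal pattern, assuming `Coremap::iter()` yields entries with the `key()`/`value()` accessors used by `create_tree` below (the function name here is hypothetical):

```rust
fn print_tree(store: &Memstore) {
    for ks in store.keyspaces.iter() {
        for tbl in ks.value().tables.iter() {
            // as_str is unsafe because identifier bytes are not re-validated as UTF-8
            let (ks_name, tbl_name) = unsafe { (ks.key().as_str(), tbl.key().as_str()) };
            println!("{}/{}", ks_name, tbl_name);
        }
    }
}
```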

@@ -30,10 +30,112 @@ use super::PartitionID;
 use crate::coredb::buffers::Integer32Buffer;
 use crate::coredb::htable::Coremap;
 use crate::coredb::htable::Data;
+use crate::coredb::memstore::Memstore;
 use std::fs;
+use std::io::ErrorKind;
 use std::io::Result as IoResult;
 use std::io::{BufWriter, Write};
-use std::thread::{self, JoinHandle};
+use std::path::PathBuf;
+
+const DIR_KSROOT: &str = "data/ks";
+const DIR_SNAPROOT: &str = "data/snaps";
+const DIR_BACKUPS: &str = "data/backups";
+const DIR_ROOT: &str = "data";
+
+macro_rules! try_dir_ignore_existing {
+    ($dir:expr) => {{
+        match fs::create_dir_all($dir) {
+            Ok(_) => Ok(()),
+            Err(e) => match e.kind() {
+                ErrorKind::AlreadyExists => Ok(()),
+                _ => Err(e),
+            },
+        }
+    }};
+    ($($dir:expr),*) => {
+        $(try_dir_ignore_existing!($dir)?;)*
+    }
+}
+
+macro_rules! concat_path {
+    ($($s:expr),*) => {{ {
+        let mut path = PathBuf::with_capacity($(($s).len()+)*0);
+        $(path.push($s);)*
+        path
+    }}};
+}
+/// This creates the root directory structure:
+/// ```
+/// data/
+///     ks/
+///         ks1/
+///         ks2/
+///         ks3/
+///     snaps/
+///         ks1/
+///             tbl1/
+///             tbl2/
+///         ks2/
+///             tbl1/
+///             tbl2/
+///         ks3/
+///             tbl1/
+///             tbl2/
+///     backups/
+/// ```
+///
+/// If any directories exist, they are simply ignored
+pub fn create_tree(memroot: Memstore) -> IoResult<()> {
+    try_dir_ignore_existing!(DIR_ROOT, DIR_KSROOT, DIR_BACKUPS, DIR_SNAPROOT);
+    for ks in memroot.keyspaces.iter() {
+        unsafe {
+            try_dir_ignore_existing!(concat_path!(DIR_KSROOT, ks.key().as_str()))?;
+            for tbl in ks.value().tables.iter() {
+                try_dir_ignore_existing!(concat_path!(
+                    DIR_SNAPROOT,
+                    ks.key().as_str(),
+                    tbl.key().as_str()
+                ))?;
+            }
+        }
+    }
+    Ok(())
+}
+
+#[test]
+fn test_tree() {
+    create_tree(Memstore::new_default()).unwrap();
+    let read_ks: Vec<String> = fs::read_dir(DIR_KSROOT)
+        .unwrap()
+        .map(|dir| {
+            let v = dir.unwrap().file_name();
+            v.to_string_lossy().to_string()
+        })
+        .collect();
+    assert_eq!(read_ks, vec!["default".to_owned()]);
+    // just read one level of the snaps dir
+    let read_snaps: Vec<String> = fs::read_dir(DIR_SNAPROOT)
+        .unwrap()
+        .map(|dir| {
+            let v = dir.unwrap().file_name();
+            v.to_string_lossy().to_string()
+        })
+        .collect();
+    assert_eq!(read_snaps, vec!["default".to_owned()]);
+    // now read level two: snaps/default
+    let read_snaps: Vec<String> = fs::read_dir(concat_path!(DIR_SNAPROOT, "default"))
+        .unwrap()
+        .map(|dir| {
+            let v = dir.unwrap().file_name();
+            v.to_string_lossy().to_string()
+        })
+        .collect();
+    assert_veceq!(read_snaps, vec!["_system".to_owned(), "default".to_owned()]);
+    assert!(PathBuf::from("data/backups").is_dir());
+    // clean up
+    fs::remove_dir_all(DIR_ROOT).unwrap();
+}
 /// Uses a buffered writer under the hood to improve write performance as the provided
 /// writable interface might be very slow. The buffer does flush once done, however, it
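For readers skimming the new storage code: `try_dir_ignore_existing!` and `concat_path!` do nothing beyond their expansions above. A plain-function equivalent of the single-directory arm and of a multi-segment path concatenation might look roughly like this (`ensure_dir` and `snap_dir_for` are hypothetical names, not part of this commit):

```rust
use std::io::{ErrorKind, Result as IoResult};
use std::path::{Path, PathBuf};

// Roughly what try_dir_ignore_existing!(dir) does for one directory:
// create it (and its parents) and treat "already exists" as success.
fn ensure_dir(dir: impl AsRef<Path>) -> IoResult<()> {
    match std::fs::create_dir_all(dir) {
        Ok(_) => Ok(()),
        Err(e) if e.kind() == ErrorKind::AlreadyExists => Ok(()),
        Err(e) => Err(e),
    }
}

// And concat_path!("data/snaps", ks, tbl) simply builds a PathBuf segment by segment.
fn snap_dir_for(ks: &str, tbl: &str) -> PathBuf {
    let mut path = PathBuf::new();
    path.push("data/snaps");
    path.push(ks);
    path.push(tbl);
    path
}
```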
@@ -64,18 +166,3 @@ fn test_cowfile() {
     assert_eq!(cow_file, "10_".to_owned());
     assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned());
 }
-
-/// Returns a handle to a thread that was spawned to handle this specific flush routine
-pub fn threaded_se(
-    tblref: Coremap<Data, Data>,
-    partition_id: PartitionID,
-) -> JoinHandle<IoResult<()>> {
-    thread::spawn(move || {
-        let fname = cow_file(partition_id);
-        let mut f = fs::File::create(&*fname)?;
-        self::serialize_map_into_slow_buffer(&mut f, &tblref)?;
-        f.sync_all()?;
-        fs::rename(&*fname, &fname[..fname.len() - 1])?;
-        Ok(())
-    })
-}

@@ -90,3 +90,18 @@ macro_rules! cfg_test {
         $(#[cfg(test)] $item)*
     };
 }
+
+#[macro_export]
+/// Compare two vectors irrespective of their elements' position
+macro_rules! veceq {
+    ($v1:expr, $v2:expr) => {
+        $v1.len() == $v2.len() && $v1.iter().all(|v| $v2.contains(v))
+    };
+}
+
+#[macro_export]
+macro_rules! assert_veceq {
+    ($v1:expr, $v2:expr) => {
+        assert!(veceq!($v1, $v2))
+    };
+}
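A quick usage sketch for the new comparison macros (a hypothetical test, not part of this commit). Note that `veceq!` checks equal length plus membership, so it effectively compares the vectors as sets; vectors that differ only in how often an element repeats can still compare equal:

```rust
#[test]
fn test_veceq_order_insensitive() {
    let a = vec!["default".to_owned(), "_system".to_owned()];
    let b = vec!["_system".to_owned(), "default".to_owned()];
    // same elements, different order: passes
    assert_veceq!(a, b);
    // different lengths: the length check makes this false
    assert!(!veceq!(a, vec!["default".to_owned()]));
}
```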
