diff --git a/server/src/coredb/array.rs b/server/src/coredb/array.rs index b89769e1..b0f785ae 100644 --- a/server/src/coredb/array.rs +++ b/server/src/coredb/array.rs @@ -41,6 +41,7 @@ use core::mem::MaybeUninit; use core::ops; use core::ptr; use core::slice; +use core::str; use serde::{de::SeqAccess, de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; /// A compile-time, fixed size array that can have unintialized memory. This array is as @@ -298,6 +299,14 @@ impl Array { } } +impl Array { + /// This isn't _unsafe_ but it can cause functions expecting pure unicode to + /// crash if the array contains invalid unicode + pub unsafe fn as_str(&self) -> &str { + str::from_utf8_unchecked(self.as_ref()) + } +} + impl ops::Deref for Array { type Target = [T]; fn deref(&self) -> &Self::Target { diff --git a/server/src/coredb/memstore.rs b/server/src/coredb/memstore.rs index 113f24ff..e6071d86 100644 --- a/server/src/coredb/memstore.rs +++ b/server/src/coredb/memstore.rs @@ -58,7 +58,6 @@ use crate::coredb::array::Array; use crate::coredb::htable::Coremap; -use crate::coredb::htable::Data; use crate::coredb::SnapshotStatus; use crate::kvengine::KVEngine; use core::mem::MaybeUninit; @@ -128,7 +127,7 @@ pub enum DdlError { /// connection-level control abilities over the keyspace pub struct Memstore { /// the keyspaces - keyspaces: Arc>>, + pub keyspaces: Arc>>, } impl Memstore { @@ -186,7 +185,7 @@ pub enum TableType { /// A keyspace houses all the other tables pub struct Keyspace { /// the tables - tables: Coremap>, + pub tables: Coremap>, /// current state of the disk flush status. if this is true, we're safe to /// go ahead with writes flush_state_healthy: AtomicBool, @@ -196,6 +195,12 @@ pub struct Keyspace { replication_strategy: cluster::ReplicationStrategy, } +macro_rules! unsafe_objectid_from_slice { + ($slice:expr) => {{ + unsafe { ObjectID::from_slice($slice) } + }}; +} + impl Keyspace { /// Create a new empty keyspace with the default tables: a `default` table and a /// `system` table @@ -205,12 +210,12 @@ impl Keyspace { let ht = Coremap::new(); // add the default table ht.true_if_insert( - Data::from("default"), + unsafe_objectid_from_slice!("default"), Arc::new(Table::KV(KVEngine::default())), ); // add the system table ht.true_if_insert( - Data::from("_system"), + unsafe_objectid_from_slice!("_system"), Arc::new(Table::KV(KVEngine::default())), ); ht @@ -230,20 +235,20 @@ impl Keyspace { } } /// Get an atomic reference to a table in this keyspace if it exists - pub fn get_table_atomic_ref(&self, table_identifier: Data) -> Option> { + pub fn get_table_atomic_ref(&self, table_identifier: ObjectID) -> Option> { self.tables.get(&table_identifier).map(|v| v.clone()) } /// Create a new table with **default encoding** - pub fn create_table(&self, table_identifier: Data, table_type: TableType) -> bool { + pub fn create_table(&self, table_identifier: ObjectID, table_type: TableType) -> bool { self.tables.true_if_insert(table_identifier, { match table_type { TableType::KeyValue => Arc::new(Table::KV(KVEngine::default())), } }) } - pub fn drop_table(&self, table_identifier: Data) -> Result<(), DdlError> { - if table_identifier.eq(&Data::from("default")) - || table_identifier.eq(&Data::from("_system")) + pub fn drop_table(&self, table_identifier: ObjectID) -> Result<(), DdlError> { + if table_identifier.eq(&unsafe_objectid_from_slice!("default")) + || table_identifier.eq(&unsafe_objectid_from_slice!("_system")) { Err(DdlError::ProtectedObject) } else if self.tables.contains_key(&table_identifier) { @@ -268,19 +273,23 @@ impl Keyspace { #[test] fn test_keyspace_drop_no_atomic_ref() { let our_keyspace = Keyspace::empty_default(); - assert!(our_keyspace.create_table(Data::from("apps"), TableType::KeyValue)); - assert!(our_keyspace.drop_table(Data::from("apps")).is_ok()); + assert!(our_keyspace.create_table(unsafe_objectid_from_slice!("apps"), TableType::KeyValue)); + assert!(our_keyspace + .drop_table(unsafe_objectid_from_slice!("apps")) + .is_ok()); } #[test] fn test_keyspace_drop_fail_with_atomic_ref() { let our_keyspace = Keyspace::empty_default(); - assert!(our_keyspace.create_table(Data::from("apps"), TableType::KeyValue)); + assert!(our_keyspace.create_table(unsafe_objectid_from_slice!("apps"), TableType::KeyValue)); let _atomic_tbl_ref = our_keyspace - .get_table_atomic_ref(Data::from("apps")) + .get_table_atomic_ref(unsafe_objectid_from_slice!("apps")) .unwrap(); assert_eq!( - our_keyspace.drop_table(Data::from("apps")).unwrap_err(), + our_keyspace + .drop_table(unsafe_objectid_from_slice!("apps")) + .unwrap_err(), DdlError::StillInUse ); } @@ -289,11 +298,15 @@ fn test_keyspace_drop_fail_with_atomic_ref() { fn test_keyspace_try_delete_protected_table() { let our_keyspace = Keyspace::empty_default(); assert_eq!( - our_keyspace.drop_table(Data::from("default")).unwrap_err(), + our_keyspace + .drop_table(unsafe_objectid_from_slice!("default")) + .unwrap_err(), DdlError::ProtectedObject ); assert_eq!( - our_keyspace.drop_table(Data::from("_system")).unwrap_err(), + our_keyspace + .drop_table(unsafe_objectid_from_slice!("_system")) + .unwrap_err(), DdlError::ProtectedObject ); } diff --git a/server/src/storage/interface.rs b/server/src/storage/interface.rs index f026a02f..a90901b3 100644 --- a/server/src/storage/interface.rs +++ b/server/src/storage/interface.rs @@ -30,10 +30,112 @@ use super::PartitionID; use crate::coredb::buffers::Integer32Buffer; use crate::coredb::htable::Coremap; use crate::coredb::htable::Data; +use crate::coredb::memstore::Memstore; use std::fs; +use std::io::ErrorKind; use std::io::Result as IoResult; use std::io::{BufWriter, Write}; -use std::thread::{self, JoinHandle}; +use std::path::PathBuf; + +const DIR_KSROOT: &str = "data/ks"; +const DIR_SNAPROOT: &str = "data/snaps"; +const DIR_BACKUPS: &str = "data/backups"; +const DIR_ROOT: &str = "data"; + +macro_rules! try_dir_ignore_existing { + ($dir:expr) => {{ + match fs::create_dir_all($dir) { + Ok(_) => Ok(()), + Err(e) => match e.kind() { + ErrorKind::AlreadyExists => Ok(()), + _ => Err(e), + }, + } + }}; + ($($dir:expr),*) => { + $(try_dir_ignore_existing!($dir)?;)* + } +} + +macro_rules! concat_path { + ($($s:expr),*) => {{ { + let mut path = PathBuf::with_capacity($(($s).len()+)*0); + $(path.push($s);)* + path + }}}; +} + +/// This creates the root directory structure: +/// ``` +/// data/ +/// ks/ +/// ks1/ +/// ks2/ +/// ks3/ +/// snaps/ +/// ks1/ +/// tbl1/ +/// tbl2/ +/// ks2/ +/// tbl1/ +/// tbl2/ +/// ks3/ +/// tbl1/ +/// tbl2/ +/// backups/ +/// ``` +/// +/// If any directories exist, they are simply ignored +pub fn create_tree(memroot: Memstore) -> IoResult<()> { + try_dir_ignore_existing!(DIR_ROOT, DIR_KSROOT, DIR_BACKUPS, DIR_SNAPROOT); + for ks in memroot.keyspaces.iter() { + unsafe { + try_dir_ignore_existing!(concat_path!(DIR_KSROOT, ks.key().as_str()))?; + for tbl in ks.value().tables.iter() { + try_dir_ignore_existing!(concat_path!( + DIR_SNAPROOT, + ks.key().as_str(), + tbl.key().as_str() + ))?; + } + } + } + Ok(()) +} + +#[test] +fn test_tree() { + create_tree(Memstore::new_default()).unwrap(); + let read_ks: Vec = fs::read_dir(DIR_KSROOT) + .unwrap() + .map(|dir| { + let v = dir.unwrap().file_name(); + v.to_string_lossy().to_string() + }) + .collect(); + assert_eq!(read_ks, vec!["default".to_owned()]); + // just read one level of the snaps dir + let read_snaps: Vec = fs::read_dir(DIR_SNAPROOT) + .unwrap() + .map(|dir| { + let v = dir.unwrap().file_name(); + v.to_string_lossy().to_string() + }) + .collect(); + assert_eq!(read_snaps, vec!["default".to_owned()]); + // now read level two: snaps/default + let read_snaps: Vec = fs::read_dir(concat_path!(DIR_SNAPROOT, "default")) + .unwrap() + .map(|dir| { + let v = dir.unwrap().file_name(); + v.to_string_lossy().to_string() + }) + .collect(); + assert_veceq!(read_snaps, vec!["_system".to_owned(), "default".to_owned()]); + assert!(PathBuf::from("data/backups").is_dir()); + // clean up + fs::remove_dir_all(DIR_ROOT).unwrap(); +} /// Uses a buffered writer under the hood to improve write performance as the provided /// writable interface might be very slow. The buffer does flush once done, however, it @@ -64,18 +166,3 @@ fn test_cowfile() { assert_eq!(cow_file, "10_".to_owned()); assert_eq!(&cow_file[..cow_file.len() - 1], "10".to_owned()); } - -/// Returns a handle to a thread that was spawned to handle this specific flush routine -pub fn threaded_se( - tblref: Coremap, - partition_id: PartitionID, -) -> JoinHandle> { - thread::spawn(move || { - let fname = cow_file(partition_id); - let mut f = fs::File::create(&*fname)?; - self::serialize_map_into_slow_buffer(&mut f, &tblref)?; - f.sync_all()?; - fs::rename(&*fname, &fname[..fname.len() - 1])?; - Ok(()) - }) -} diff --git a/server/src/util.rs b/server/src/util.rs index b0e6811b..92f4df9b 100644 --- a/server/src/util.rs +++ b/server/src/util.rs @@ -90,3 +90,18 @@ macro_rules! cfg_test { $(#[cfg(test)] $item)* }; } + +#[macro_export] +/// Compare two vectors irrespective of their elements' position +macro_rules! veceq { + ($v1:expr, $v2:expr) => { + $v1.len() == $v2.len() && $v1.iter().all(|v| $v2.contains(v)) + }; +} + +#[macro_export] +macro_rules! assert_veceq { + ($v1:expr, $v2:expr) => { + assert!(veceq!($v1, $v2)) + }; +}