From 614e71cbef46e55d76012b49276950b6217b4247 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Fri, 1 Mar 2024 22:34:27 +0530 Subject: [PATCH] Default to `pwd` auth plugin and fix fs testing Also added misc documentation changes --- README.md | 6 +- server/src/engine/config.rs | 17 +++-- server/src/engine/fractal/test_utils.rs | 35 +++++++---- .../src/engine/storage/common/interface/fs.rs | 39 ++++++------ .../engine/storage/common/interface/vfs.rs | 63 +++++++++---------- .../storage/v2/impls/tests/model_driver.rs | 3 - 6 files changed, 81 insertions(+), 82 deletions(-) diff --git a/README.md b/README.md index 716e5f6b..84417521 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,9 @@ A modern NoSQL database, powered by BlueQL.
Skytable is a **modern NoSQL database** that focuses on **performance, flexibility and scalability**. Skytable is a primarily in-memory, wide-column based database with support for additional data models* that uses its own storage engine (structured like n-ary records with optimized delayed-durability transactions) and -enables querying with its own query language *BlueQL* that is based on SQL which builds on SQL to improve -security and flexibility. +enables querying with its own query language *BlueQL* that builds on top of SQL to improve security and flexibility. -Skytable is best-suited for applications that need to store large-scale data, need high-performance and low -latencies. +Skytable is best-suited for applications that need to store large-scale data, need high-performance and low latencies. > **You can read more about Skytable's architecture, including information on the clustering and HA implementation that we're currently working on, and limitations [on this page](https://docs.skytable.io/architecture).** diff --git a/server/src/engine/config.rs b/server/src/engine/config.rs index cef98899..9c1e8ee5 100644 --- a/server/src/engine/config.rs +++ b/server/src/engine/config.rs @@ -525,24 +525,23 @@ fn arg_decode_auth( src_args: &mut ParsedRawArgs, config: &mut ModifyGuard, ) -> RuntimeResult<()> { - let (Some(auth_driver), Some(mut root_key)) = ( - src_args.remove(CS::KEY_AUTH_DRIVER), - src_args.remove(CS::KEY_AUTH_ROOT_PASSWORD), - ) else { + let auth_driver = src_args.remove(CS::KEY_AUTH_DRIVER); + let Some(mut root_key) = src_args.remove(CS::KEY_AUTH_ROOT_PASSWORD) else { return Err(ConfigError::with_src( CS::SOURCE, ConfigErrorKind::ErrorString(format!( - "to enable auth, you must provide values for both {} and {}", + "to enable auth, you must provide values for {}", CS::KEY_AUTH_DRIVER, - CS::KEY_AUTH_ROOT_PASSWORD )), ) .into()); }; - argck_duplicate_values::(&auth_driver, CS::KEY_AUTH_DRIVER)?; + if let Some(ref adrv) = auth_driver { + argck_duplicate_values::(&adrv, CS::KEY_AUTH_DRIVER)?; + } argck_duplicate_values::(&root_key, CS::KEY_AUTH_DRIVER)?; - let auth_plugin = match auth_driver[0].as_str() { - "pwd" => AuthDriver::Pwd, + let auth_plugin = match auth_driver.as_ref().map(|v| v[0].as_str()) { + Some("pwd") | None => AuthDriver::Pwd, _ => return Err(CS::err_invalid_value_for(CS::KEY_AUTH_DRIVER).into()), }; config.auth = Some(DecodedAuth { diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 1d38d3bf..6445599a 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -46,26 +46,23 @@ use { pub struct TestGlobal { gns: GlobalNS, lp_queue: RwLock>>, - #[allow(unused)] max_delta_size: usize, - max_data_pressure: usize, } impl TestGlobal { - fn new(gns: GlobalNS, max_delta_size: usize) -> Self { + fn new(gns: GlobalNS) -> Self { Self { gns, lp_queue: RwLock::default(), - max_delta_size, - max_data_pressure: usize::MAX, + max_delta_size: usize::MAX, } } pub fn set_max_data_pressure(&mut self, max_data_pressure: usize) { - self.max_data_pressure = max_data_pressure; + self.max_delta_size = max_data_pressure; } /// Normally, model drivers are not loaded on startup because of shared global state. Calling this will attempt to load /// all model drivers - pub fn load_model_drivers(&self) -> RuntimeResult<()> { + fn load_model_drivers(&self) -> RuntimeResult<()> { let space_idx = self.gns.namespace().idx().read(); for (model_name, model) in self.gns.namespace().idx_models().read().iter() { let model_data = model.data(); @@ -106,7 +103,9 @@ impl TestGlobal { }, } .unwrap(); - Self::new(GlobalNS::new(data, FractalGNSDriver::new(driver)), 0) + let me = Self::new(GlobalNS::new(data, FractalGNSDriver::new(driver))); + me.load_model_drivers().unwrap(); + me } } @@ -134,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal { self.lp_queue.write().push(task) } fn get_max_delta_size(&self) -> usize { - self.max_data_pressure + self.max_delta_size } fn purge_model_driver( &self, @@ -169,8 +168,22 @@ impl Drop for TestGlobal { fn drop(&mut self) { let mut txn_driver = self.gns.gns_driver().txn_driver.lock(); GNSDriver::close_driver(&mut txn_driver).unwrap(); - for (_, model_driver) in self.gns.namespace().idx_models().write().drain() { - model_driver.into_driver().close().unwrap(); + for (_, model) in self.gns.namespace().idx_models().write().drain() { + let delta_count = model + .data() + .delta_state() + .__fractal_take_full_from_data_delta(super::FractalToken::new()); + if delta_count != 0 { + let mut drv = model.driver().batch_driver().lock(); + drv.as_mut() + .unwrap() + .commit_with_ctx( + StdModelBatch::new(model.data(), delta_count), + BatchStats::new(), + ) + .unwrap(); + } + model.into_driver().close().unwrap(); } } } diff --git a/server/src/engine/storage/common/interface/fs.rs b/server/src/engine/storage/common/interface/fs.rs index 952c9151..58551822 100644 --- a/server/src/engine/storage/common/interface/fs.rs +++ b/server/src/engine/storage/common/interface/fs.rs @@ -42,12 +42,6 @@ use { pub struct FileSystem {} -impl Default for FileSystem { - fn default() -> Self { - Self::instance() - } -} - #[derive(Debug, PartialEq, Clone, Copy)] pub enum FSContext { Local, @@ -55,9 +49,6 @@ pub enum FSContext { } impl FileSystem { - pub fn instance() -> Self { - Self {} - } fn context() -> FSContext { local! { static CTX: FSContext = FSContext::Virtual; } local_ref!(CTX, |ctx| *ctx) @@ -308,7 +299,9 @@ impl FileWrite for AnyFile { fn fwrite(&mut self, buf: &[u8]) -> IoResult { match self { Self::Local(lf) => lf.fwrite(buf), - Self::Virtual(vf) => vf.get_mut(&mut VirtualFS::instance().write()).fwrite(buf), + Self::Virtual(vf) => VirtualFS::instance() + .read() + .with_file_mut(&vf.0, |f| f.fwrite(buf)), } } } @@ -318,9 +311,9 @@ impl FileRead for AnyFile { fn fread_exact(&mut self, buf: &mut [u8]) -> IoResult<()> { match self { Self::Local(lf) => lf.fread_exact(buf), - Self::Virtual(vf) => vf - .get_mut(&mut VirtualFS::instance().write()) - .fread_exact(buf), + Self::Virtual(vf) => VirtualFS::instance() + .read() + .with_file_mut(&vf.0, |f| f.fread_exact(buf)), } } } @@ -342,9 +335,9 @@ impl FileWriteExt for AnyFile { fn f_truncate(&mut self, new_size: u64) -> IoResult<()> { match self { Self::Local(lf) => lf.f_truncate(new_size), - Self::Virtual(vf) => vf - .get_mut(&mut VirtualFS::instance().write()) - .truncate(new_size), + Self::Virtual(vf) => VirtualFS::instance() + .read() + .with_file_mut(&vf.0, |f| f.truncate(new_size)), } } } @@ -354,21 +347,25 @@ impl FileExt for AnyFile { fn f_len(&self) -> IoResult { match self { Self::Local(lf) => lf.f_len(), - Self::Virtual(vf) => vf.get_ref(&VirtualFS::instance().read()).length(), + Self::Virtual(vf) => VirtualFS::instance() + .read() + .with_file(&vf.0, |f| f.length()), } } fn f_cursor(&mut self) -> IoResult { match self { Self::Local(lf) => lf.f_cursor(), - Self::Virtual(vf) => vf.get_ref(&VirtualFS::instance().read()).cursor(), + Self::Virtual(vf) => VirtualFS::instance() + .read() + .with_file(&vf.0, |f| f.cursor()), } } fn f_seek_start(&mut self, offset: u64) -> IoResult<()> { match self { Self::Local(lf) => lf.f_seek_start(offset), - Self::Virtual(vf) => vf - .get_mut(&mut VirtualFS::instance().write()) - .seek_from_start(offset), + Self::Virtual(vf) => VirtualFS::instance() + .read() + .with_file_mut(&vf.0, |f| f.seek_from_start(offset)), } } } diff --git a/server/src/engine/storage/common/interface/vfs.rs b/server/src/engine/storage/common/interface/vfs.rs index 393c1757..428933a4 100644 --- a/server/src/engine/storage/common/interface/vfs.rs +++ b/server/src/engine/storage/common/interface/vfs.rs @@ -59,7 +59,7 @@ pub struct VirtualFS { #[derive(Debug)] enum VNode { Dir(HashMap, Self>), - File(VFile), + File(RwLock), } #[derive(Debug)] @@ -84,15 +84,6 @@ pub enum FileOpen { #[derive(Debug)] pub struct VFileDescriptor(pub(super) Box); -impl VFileDescriptor { - pub(super) fn get_ref<'a>(&self, vfs: &'a VirtualFS) -> &'a VFile { - vfs.with_file(&self.0, |f| Ok(f)).unwrap() - } - pub(super) fn get_mut<'a>(&self, vfs: &'a mut VirtualFS) -> &'a mut VFile { - vfs.with_file_mut(&self.0, |f| Ok(f)).unwrap() - } -} - impl Drop for VFileDescriptor { fn drop(&mut self) { VirtualFS::instance() @@ -176,9 +167,6 @@ impl VFile { fn current(&self) -> &[u8] { &self.data[self.pos..] } - fn fw_write_all(&mut self, bytes: &[u8]) -> IoResult<()> { - self.fwrite(bytes).map(|_| ()) - } } impl VNode { @@ -224,7 +212,7 @@ impl VirtualFS { } Entry::Vacant(v) => { // no file exists, we can create this - v.insert(VNode::File(VFile::new(true, true, vec![], 0))); + v.insert(VNode::File(RwLock::new(VFile::new(true, true, vec![], 0)))); Ok(VFileDescriptor(fpath.into())) } } @@ -242,14 +230,14 @@ impl VirtualFS { // create new file let file = self.fs_fopen_or_create_rw(to)?; match file { - FileOpen::Created(c) => { - c.get_mut(self).fw_write_all(&data)?; - } - FileOpen::Existing(c) => { - let file = c.get_mut(self); - file.truncate(0)?; - file.fw_write_all(&data)?; - } + FileOpen::Created(c) => self.with_file_mut(&c.0, |f| Ok(f.data = data))?, + FileOpen::Existing(c) => self.with_file_mut(&c.0, |f| { + f.data = data; + f.pos = 0; + f.read = false; + f.write = false; + Ok(()) + })?, } // delete old file self.fs_remove_file(from) @@ -330,8 +318,9 @@ impl VirtualFS { let (target_file, components) = util::split_target_and_components(fpath); let target_dir = util::find_target_dir_mut(components, &mut self.root)?; match target_dir.entry(target_file.into()) { - Entry::Occupied(mut oe) => match oe.get_mut() { + Entry::Occupied(oe) => match oe.get() { VNode::File(f) => { + let mut f = f.write(); f.read = true; f.write = true; Ok(FileOpen::Existing(VFileDescriptor(fpath.into()))) @@ -339,33 +328,39 @@ impl VirtualFS { VNode::Dir(_) => return err::item_is_not_file(), }, Entry::Vacant(v) => { - v.insert(VNode::File(VFile::new(true, true, vec![], 0))); + v.insert(VNode::File(RwLock::new(VFile::new(true, true, vec![], 0)))); Ok(FileOpen::Created(VFileDescriptor(fpath.into()))) } } } - fn with_file_mut<'a, T>( - &'a mut self, + pub(super) fn with_file_mut( + &self, fpath: &str, - mut f: impl FnMut(&'a mut VFile) -> IoResult, + f: impl FnOnce(&mut VFile) -> IoResult, ) -> IoResult { let (target_file, components) = util::split_target_and_components(fpath); - let target_dir = util::find_target_dir_mut(components, &mut self.root)?; - match target_dir.get_mut(target_file) { - Some(VNode::File(file)) => f(file), + let target_dir = util::find_target_dir(components, &self.root)?; + match target_dir.get(target_file) { + Some(VNode::File(file)) => { + let mut file = file.write(); + f(&mut file) + } Some(VNode::Dir(_)) => return err::item_is_not_file(), None => return Err(Error::from(ErrorKind::NotFound).into()), } } - fn with_file<'a, T>( - &'a self, + pub(super) fn with_file( + &self, fpath: &str, - mut f: impl FnMut(&'a VFile) -> IoResult, + f: impl FnOnce(&VFile) -> IoResult, ) -> IoResult { let (target_file, components) = util::split_target_and_components(fpath); let target_dir = util::find_target_dir(components, &self.root)?; match target_dir.get(target_file) { - Some(VNode::File(file)) => f(file), + Some(VNode::File(file)) => { + let f_ = file.read(); + f(&f_) + } Some(VNode::Dir(_)) => return err::item_is_not_file(), None => return Err(Error::from(ErrorKind::NotFound).into()), } diff --git a/server/src/engine/storage/v2/impls/tests/model_driver.rs b/server/src/engine/storage/v2/impls/tests/model_driver.rs index 6eab6347..baa9b366 100644 --- a/server/src/engine/storage/v2/impls/tests/model_driver.rs +++ b/server/src/engine/storage/v2/impls/tests/model_driver.rs @@ -141,7 +141,6 @@ fn run_sample_inserts( // reopen and verify 100 times test_utils::multi_run(100, || { let global = TestGlobal::new_with_driver_id(log_name); - global.load_model_drivers().unwrap(); global .state() .namespace() @@ -203,7 +202,6 @@ fn run_sample_updates( for _ in 0..reopen_count { let mut global = TestGlobal::new_with_driver_id(log_name); global.set_max_data_pressure(changes_per_cycle); - global.load_model_drivers().unwrap(); let mut j = 0; for _ in 0..changes_per_cycle { let (username, pass) = &key_values[actual_position]; @@ -218,7 +216,6 @@ fn run_sample_updates( } { let global = TestGlobal::new_with_driver_id(log_name); - global.load_model_drivers().unwrap(); for (txn_id, (username, password)) in key_values .iter() .enumerate()