diff --git a/README.md b/README.md
index 716e5f6b..84417521 100644
--- a/README.md
+++ b/README.md
@@ -9,11 +9,9 @@ A modern NoSQL database, powered by BlueQL.
Skytable is a **modern NoSQL database** that focuses on **performance, flexibility and scalability**. Skytable
is a primarily in-memory, wide-column based database with support for additional data models* that
uses its own storage engine (structured like n-ary records with optimized delayed-durability transactions) and
-enables querying with its own query language *BlueQL* that is based on SQL which builds on SQL to improve
-security and flexibility.
+enables querying with its own query language *BlueQL* that builds on top of SQL to improve security and flexibility.
-Skytable is best-suited for applications that need to store large-scale data, need high-performance and low
-latencies.
+Skytable is best-suited for applications that need to store large-scale data, need high-performance and low latencies.
> **You can read more about Skytable's architecture, including information on the clustering and HA implementation that we're currently working on, and limitations [on this page](https://docs.skytable.io/architecture).**
diff --git a/server/src/engine/config.rs b/server/src/engine/config.rs
index cef98899..9c1e8ee5 100644
--- a/server/src/engine/config.rs
+++ b/server/src/engine/config.rs
@@ -525,24 +525,23 @@ fn arg_decode_auth(
src_args: &mut ParsedRawArgs,
config: &mut ModifyGuard,
) -> RuntimeResult<()> {
- let (Some(auth_driver), Some(mut root_key)) = (
- src_args.remove(CS::KEY_AUTH_DRIVER),
- src_args.remove(CS::KEY_AUTH_ROOT_PASSWORD),
- ) else {
+ let auth_driver = src_args.remove(CS::KEY_AUTH_DRIVER);
+ let Some(mut root_key) = src_args.remove(CS::KEY_AUTH_ROOT_PASSWORD) else {
return Err(ConfigError::with_src(
CS::SOURCE,
ConfigErrorKind::ErrorString(format!(
- "to enable auth, you must provide values for both {} and {}",
+ "to enable auth, you must provide values for {}",
CS::KEY_AUTH_DRIVER,
- CS::KEY_AUTH_ROOT_PASSWORD
)),
)
.into());
};
- argck_duplicate_values::(&auth_driver, CS::KEY_AUTH_DRIVER)?;
+ if let Some(ref adrv) = auth_driver {
+ argck_duplicate_values::(&adrv, CS::KEY_AUTH_DRIVER)?;
+ }
argck_duplicate_values::(&root_key, CS::KEY_AUTH_DRIVER)?;
- let auth_plugin = match auth_driver[0].as_str() {
- "pwd" => AuthDriver::Pwd,
+ let auth_plugin = match auth_driver.as_ref().map(|v| v[0].as_str()) {
+ Some("pwd") | None => AuthDriver::Pwd,
_ => return Err(CS::err_invalid_value_for(CS::KEY_AUTH_DRIVER).into()),
};
config.auth = Some(DecodedAuth {
diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs
index 1d38d3bf..6445599a 100644
--- a/server/src/engine/fractal/test_utils.rs
+++ b/server/src/engine/fractal/test_utils.rs
@@ -46,26 +46,23 @@ use {
pub struct TestGlobal {
gns: GlobalNS,
lp_queue: RwLock>>,
- #[allow(unused)]
max_delta_size: usize,
- max_data_pressure: usize,
}
impl TestGlobal {
- fn new(gns: GlobalNS, max_delta_size: usize) -> Self {
+ fn new(gns: GlobalNS) -> Self {
Self {
gns,
lp_queue: RwLock::default(),
- max_delta_size,
- max_data_pressure: usize::MAX,
+ max_delta_size: usize::MAX,
}
}
pub fn set_max_data_pressure(&mut self, max_data_pressure: usize) {
- self.max_data_pressure = max_data_pressure;
+ self.max_delta_size = max_data_pressure;
}
/// Normally, model drivers are not loaded on startup because of shared global state. Calling this will attempt to load
/// all model drivers
- pub fn load_model_drivers(&self) -> RuntimeResult<()> {
+ fn load_model_drivers(&self) -> RuntimeResult<()> {
let space_idx = self.gns.namespace().idx().read();
for (model_name, model) in self.gns.namespace().idx_models().read().iter() {
let model_data = model.data();
@@ -106,7 +103,9 @@ impl TestGlobal {
},
}
.unwrap();
- Self::new(GlobalNS::new(data, FractalGNSDriver::new(driver)), 0)
+ let me = Self::new(GlobalNS::new(data, FractalGNSDriver::new(driver)));
+ me.load_model_drivers().unwrap();
+ me
}
}
@@ -134,7 +133,7 @@ impl GlobalInstanceLike for TestGlobal {
self.lp_queue.write().push(task)
}
fn get_max_delta_size(&self) -> usize {
- self.max_data_pressure
+ self.max_delta_size
}
fn purge_model_driver(
&self,
@@ -169,8 +168,22 @@ impl Drop for TestGlobal {
fn drop(&mut self) {
let mut txn_driver = self.gns.gns_driver().txn_driver.lock();
GNSDriver::close_driver(&mut txn_driver).unwrap();
- for (_, model_driver) in self.gns.namespace().idx_models().write().drain() {
- model_driver.into_driver().close().unwrap();
+ for (_, model) in self.gns.namespace().idx_models().write().drain() {
+ let delta_count = model
+ .data()
+ .delta_state()
+ .__fractal_take_full_from_data_delta(super::FractalToken::new());
+ if delta_count != 0 {
+ let mut drv = model.driver().batch_driver().lock();
+ drv.as_mut()
+ .unwrap()
+ .commit_with_ctx(
+ StdModelBatch::new(model.data(), delta_count),
+ BatchStats::new(),
+ )
+ .unwrap();
+ }
+ model.into_driver().close().unwrap();
}
}
}
diff --git a/server/src/engine/storage/common/interface/fs.rs b/server/src/engine/storage/common/interface/fs.rs
index 952c9151..58551822 100644
--- a/server/src/engine/storage/common/interface/fs.rs
+++ b/server/src/engine/storage/common/interface/fs.rs
@@ -42,12 +42,6 @@ use {
pub struct FileSystem {}
-impl Default for FileSystem {
- fn default() -> Self {
- Self::instance()
- }
-}
-
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum FSContext {
Local,
@@ -55,9 +49,6 @@ pub enum FSContext {
}
impl FileSystem {
- pub fn instance() -> Self {
- Self {}
- }
fn context() -> FSContext {
local! { static CTX: FSContext = FSContext::Virtual; }
local_ref!(CTX, |ctx| *ctx)
@@ -308,7 +299,9 @@ impl FileWrite for AnyFile {
fn fwrite(&mut self, buf: &[u8]) -> IoResult {
match self {
Self::Local(lf) => lf.fwrite(buf),
- Self::Virtual(vf) => vf.get_mut(&mut VirtualFS::instance().write()).fwrite(buf),
+ Self::Virtual(vf) => VirtualFS::instance()
+ .read()
+ .with_file_mut(&vf.0, |f| f.fwrite(buf)),
}
}
}
@@ -318,9 +311,9 @@ impl FileRead for AnyFile {
fn fread_exact(&mut self, buf: &mut [u8]) -> IoResult<()> {
match self {
Self::Local(lf) => lf.fread_exact(buf),
- Self::Virtual(vf) => vf
- .get_mut(&mut VirtualFS::instance().write())
- .fread_exact(buf),
+ Self::Virtual(vf) => VirtualFS::instance()
+ .read()
+ .with_file_mut(&vf.0, |f| f.fread_exact(buf)),
}
}
}
@@ -342,9 +335,9 @@ impl FileWriteExt for AnyFile {
fn f_truncate(&mut self, new_size: u64) -> IoResult<()> {
match self {
Self::Local(lf) => lf.f_truncate(new_size),
- Self::Virtual(vf) => vf
- .get_mut(&mut VirtualFS::instance().write())
- .truncate(new_size),
+ Self::Virtual(vf) => VirtualFS::instance()
+ .read()
+ .with_file_mut(&vf.0, |f| f.truncate(new_size)),
}
}
}
@@ -354,21 +347,25 @@ impl FileExt for AnyFile {
fn f_len(&self) -> IoResult {
match self {
Self::Local(lf) => lf.f_len(),
- Self::Virtual(vf) => vf.get_ref(&VirtualFS::instance().read()).length(),
+ Self::Virtual(vf) => VirtualFS::instance()
+ .read()
+ .with_file(&vf.0, |f| f.length()),
}
}
fn f_cursor(&mut self) -> IoResult {
match self {
Self::Local(lf) => lf.f_cursor(),
- Self::Virtual(vf) => vf.get_ref(&VirtualFS::instance().read()).cursor(),
+ Self::Virtual(vf) => VirtualFS::instance()
+ .read()
+ .with_file(&vf.0, |f| f.cursor()),
}
}
fn f_seek_start(&mut self, offset: u64) -> IoResult<()> {
match self {
Self::Local(lf) => lf.f_seek_start(offset),
- Self::Virtual(vf) => vf
- .get_mut(&mut VirtualFS::instance().write())
- .seek_from_start(offset),
+ Self::Virtual(vf) => VirtualFS::instance()
+ .read()
+ .with_file_mut(&vf.0, |f| f.seek_from_start(offset)),
}
}
}
diff --git a/server/src/engine/storage/common/interface/vfs.rs b/server/src/engine/storage/common/interface/vfs.rs
index 393c1757..428933a4 100644
--- a/server/src/engine/storage/common/interface/vfs.rs
+++ b/server/src/engine/storage/common/interface/vfs.rs
@@ -59,7 +59,7 @@ pub struct VirtualFS {
#[derive(Debug)]
enum VNode {
Dir(HashMap, Self>),
- File(VFile),
+ File(RwLock),
}
#[derive(Debug)]
@@ -84,15 +84,6 @@ pub enum FileOpen {
#[derive(Debug)]
pub struct VFileDescriptor(pub(super) Box);
-impl VFileDescriptor {
- pub(super) fn get_ref<'a>(&self, vfs: &'a VirtualFS) -> &'a VFile {
- vfs.with_file(&self.0, |f| Ok(f)).unwrap()
- }
- pub(super) fn get_mut<'a>(&self, vfs: &'a mut VirtualFS) -> &'a mut VFile {
- vfs.with_file_mut(&self.0, |f| Ok(f)).unwrap()
- }
-}
-
impl Drop for VFileDescriptor {
fn drop(&mut self) {
VirtualFS::instance()
@@ -176,9 +167,6 @@ impl VFile {
fn current(&self) -> &[u8] {
&self.data[self.pos..]
}
- fn fw_write_all(&mut self, bytes: &[u8]) -> IoResult<()> {
- self.fwrite(bytes).map(|_| ())
- }
}
impl VNode {
@@ -224,7 +212,7 @@ impl VirtualFS {
}
Entry::Vacant(v) => {
// no file exists, we can create this
- v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
+ v.insert(VNode::File(RwLock::new(VFile::new(true, true, vec![], 0))));
Ok(VFileDescriptor(fpath.into()))
}
}
@@ -242,14 +230,14 @@ impl VirtualFS {
// create new file
let file = self.fs_fopen_or_create_rw(to)?;
match file {
- FileOpen::Created(c) => {
- c.get_mut(self).fw_write_all(&data)?;
- }
- FileOpen::Existing(c) => {
- let file = c.get_mut(self);
- file.truncate(0)?;
- file.fw_write_all(&data)?;
- }
+ FileOpen::Created(c) => self.with_file_mut(&c.0, |f| Ok(f.data = data))?,
+ FileOpen::Existing(c) => self.with_file_mut(&c.0, |f| {
+ f.data = data;
+ f.pos = 0;
+ f.read = false;
+ f.write = false;
+ Ok(())
+ })?,
}
// delete old file
self.fs_remove_file(from)
@@ -330,8 +318,9 @@ impl VirtualFS {
let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir_mut(components, &mut self.root)?;
match target_dir.entry(target_file.into()) {
- Entry::Occupied(mut oe) => match oe.get_mut() {
+ Entry::Occupied(oe) => match oe.get() {
VNode::File(f) => {
+ let mut f = f.write();
f.read = true;
f.write = true;
Ok(FileOpen::Existing(VFileDescriptor(fpath.into())))
@@ -339,33 +328,39 @@ impl VirtualFS {
VNode::Dir(_) => return err::item_is_not_file(),
},
Entry::Vacant(v) => {
- v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
+ v.insert(VNode::File(RwLock::new(VFile::new(true, true, vec![], 0))));
Ok(FileOpen::Created(VFileDescriptor(fpath.into())))
}
}
}
- fn with_file_mut<'a, T>(
- &'a mut self,
+ pub(super) fn with_file_mut(
+ &self,
fpath: &str,
- mut f: impl FnMut(&'a mut VFile) -> IoResult,
+ f: impl FnOnce(&mut VFile) -> IoResult,
) -> IoResult {
let (target_file, components) = util::split_target_and_components(fpath);
- let target_dir = util::find_target_dir_mut(components, &mut self.root)?;
- match target_dir.get_mut(target_file) {
- Some(VNode::File(file)) => f(file),
+ let target_dir = util::find_target_dir(components, &self.root)?;
+ match target_dir.get(target_file) {
+ Some(VNode::File(file)) => {
+ let mut file = file.write();
+ f(&mut file)
+ }
Some(VNode::Dir(_)) => return err::item_is_not_file(),
None => return Err(Error::from(ErrorKind::NotFound).into()),
}
}
- fn with_file<'a, T>(
- &'a self,
+ pub(super) fn with_file(
+ &self,
fpath: &str,
- mut f: impl FnMut(&'a VFile) -> IoResult,
+ f: impl FnOnce(&VFile) -> IoResult,
) -> IoResult {
let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir(components, &self.root)?;
match target_dir.get(target_file) {
- Some(VNode::File(file)) => f(file),
+ Some(VNode::File(file)) => {
+ let f_ = file.read();
+ f(&f_)
+ }
Some(VNode::Dir(_)) => return err::item_is_not_file(),
None => return Err(Error::from(ErrorKind::NotFound).into()),
}
diff --git a/server/src/engine/storage/v2/impls/tests/model_driver.rs b/server/src/engine/storage/v2/impls/tests/model_driver.rs
index 6eab6347..baa9b366 100644
--- a/server/src/engine/storage/v2/impls/tests/model_driver.rs
+++ b/server/src/engine/storage/v2/impls/tests/model_driver.rs
@@ -141,7 +141,6 @@ fn run_sample_inserts(
// reopen and verify 100 times
test_utils::multi_run(100, || {
let global = TestGlobal::new_with_driver_id(log_name);
- global.load_model_drivers().unwrap();
global
.state()
.namespace()
@@ -203,7 +202,6 @@ fn run_sample_updates(
for _ in 0..reopen_count {
let mut global = TestGlobal::new_with_driver_id(log_name);
global.set_max_data_pressure(changes_per_cycle);
- global.load_model_drivers().unwrap();
let mut j = 0;
for _ in 0..changes_per_cycle {
let (username, pass) = &key_values[actual_position];
@@ -218,7 +216,6 @@ fn run_sample_updates(
}
{
let global = TestGlobal::new_with_driver_id(log_name);
- global.load_model_drivers().unwrap();
for (txn_id, (username, password)) in key_values
.iter()
.enumerate()