diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index 78b532c1..5365f1c4 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -26,7 +26,9 @@ use { super::{Fields, Model}, - crate::engine::{core::index::Row, sync::atm::Guard, sync::queue::Queue}, + crate::engine::{ + core::index::Row, fractal::FractalToken, sync::atm::Guard, sync::queue::Queue, + }, parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}, std::{ collections::btree_map::{BTreeMap, Range}, @@ -259,13 +261,23 @@ impl DeltaState { } } +// fractal +impl DeltaState { + pub fn __fractal_take_from_data_delta(&self, cnt: usize, _token: FractalToken) { + let _ = self.data_deltas_size.fetch_sub(cnt, Ordering::Release); + } + pub fn __fractal_take_full_from_data_delta(&self, _token: FractalToken) -> usize { + self.data_deltas_size.swap(0, Ordering::AcqRel) + } +} + #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] pub struct DeltaVersion(u64); impl DeltaVersion { pub const fn genesis() -> Self { Self(0) } - pub const fn __new(v: u64) -> Self { + pub const fn __new(v: u64) -> Self { Self(v) } fn step(&self) -> Self { diff --git a/server/src/engine/data/uuid.rs b/server/src/engine/data/uuid.rs index 154d51fc..819c536c 100644 --- a/server/src/engine/data/uuid.rs +++ b/server/src/engine/data/uuid.rs @@ -24,6 +24,8 @@ * */ +use core::fmt; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct Uuid { data: uuid::Uuid, @@ -48,8 +50,8 @@ impl Uuid { } } -impl ToString for Uuid { - fn to_string(&self) -> String { - self.data.to_string() +impl fmt::Display for Uuid { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.data.fmt(f) } } diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index a302fab8..542b8785 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -24,7 +24,7 @@ * */ -use super::txn::TransactionError; +use 
super::{storage::v1::SDSSError, txn::TransactionError}; pub type LangResult = Result; pub type LexResult = Result; @@ -81,8 +81,9 @@ pub enum LangError { StmtUnknownDrop, } -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug)] #[repr(u8)] +#[cfg_attr(test, derive(PartialEq))] /// Executor errors pub enum DatabaseError { // sys @@ -136,6 +137,13 @@ pub enum DatabaseError { DmlConstraintViolationFieldTypedef, ServerError, TransactionalError, + StorageSubsystemErr(SDSSError), +} + +impl From for DatabaseError { + fn from(e: SDSSError) -> Self { + Self::StorageSubsystemErr(e) + } } impl From for DatabaseError { diff --git a/server/src/engine/fractal/config.rs b/server/src/engine/fractal/config.rs new file mode 100644 index 00000000..5b898ec0 --- /dev/null +++ b/server/src/engine/fractal/config.rs @@ -0,0 +1,47 @@ +/* + * Created on Sun Sep 10 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use crate::engine::storage::v1::header_meta::HostRunMode; + +pub struct ServerConfig { + host_settings_version: u32, + host_run_mode: HostRunMode, + host_startup_counter: u64, +} + +impl ServerConfig { + pub fn new( + host_settings_version: u32, + host_run_mode: HostRunMode, + host_startup_counter: u64, + ) -> Self { + Self { + host_settings_version, + host_run_mode, + host_startup_counter, + } + } +} diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs new file mode 100644 index 00000000..8bd42c9d --- /dev/null +++ b/server/src/engine/fractal/drivers.rs @@ -0,0 +1,88 @@ +/* + * Created on Sun Sep 10 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use { + super::util, + crate::engine::{ + storage::v1::{data_batch::DataBatchPersistDriver, LocalFS}, + txn::gns::GNSTransactionDriverAnyFS, + }, + parking_lot::Mutex, + std::sync::Arc, +}; + +/// GNS driver +pub(super) struct FractalGNSDriver { + status: util::Status, + txn_driver: Mutex>, +} + +impl FractalGNSDriver { + pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS) -> Self { + Self { + status: util::Status::new_okay(), + txn_driver: Mutex::new(txn_driver), + } + } +} + +/// Model driver +pub struct FractalModelDriver { + hooks: Arc, + batch_driver: Mutex>, +} + +impl FractalModelDriver { + /// Initialize a model driver with default settings + pub fn init(batch_driver: DataBatchPersistDriver) -> Self { + Self { + hooks: Arc::new(FractalModelHooks::new()), + batch_driver: Mutex::new(batch_driver), + } + } + /// Returns a reference to the batch persist driver + pub fn batch_driver(&self) -> &Mutex> { + &self.batch_driver + } +} + +/// Model hooks +#[derive(Debug)] +pub struct FractalModelHooks { + status: util::Status, +} + +impl FractalModelHooks { + #[cfg(test)] + pub fn test() -> Self { + Self::new() + } + fn new() -> Self { + Self { + status: util::Status::new_okay(), + } + } +} diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs new file mode 100644 index 00000000..51dc80ac --- /dev/null +++ b/server/src/engine/fractal/mgr.rs @@ -0,0 +1,303 @@ +/* + * Created on Sat Sep 09 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. 
+ * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::ModelUniqueID, + crate::{ + engine::core::model::{delta::DataDelta, Model}, + util::os, + }, + std::path::PathBuf, + tokio::{ + fs, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, + task::JoinHandle, + }, +}; + +/// A task for the [`FractalMgr`] to perform +pub struct Task { + threshold: usize, + task: T, +} + +impl Task { + const THRESHOLD: usize = 10; + /// Create a new task with the default threshold + pub fn new(task: T) -> Self { + Self::with_threshold(task, Self::THRESHOLD) + } + /// Create a task with the given threshold + fn with_threshold(task: T, threshold: usize) -> Self { + Self { threshold, task } + } +} + +/// A general task +pub enum GenericTask { + /// Delete a single file + DeleteFile(PathBuf), + /// Delete a directory (and all its children) + DeleteDirAll(PathBuf), +} + +/// A critical task +pub enum CriticalTask { + /// Write a new data batch + WriteBatch(ModelUniqueID, usize), +} + +/// The task manager +pub(super) struct FractalMgr { + hp_dispatcher: UnboundedSender>, + general_dispatcher: UnboundedSender>, + runtime_stats: FractalRTStat, +} + +pub(super) struct FractalRTStat { + mem_free_bytes: u64, + per_mdl_delta_max_size: usize, +} + +impl FractalRTStat { + fn init(model_cnt: usize) -> Self { + let mem_free_bytes = 
os::free_memory_in_bytes(); + let allowed_delta_limit = mem_free_bytes as f64 * 0.02; + let per_model_limit = allowed_delta_limit / model_cnt.max(1) as f64; + Self { + mem_free_bytes, + per_mdl_delta_max_size: per_model_limit as usize / sizeof!(DataDelta), + } + } + pub(super) fn mem_free_bytes(&self) -> u64 { + self.mem_free_bytes + } + pub(super) fn per_mdl_delta_max_size(&self) -> usize { + self.per_mdl_delta_max_size + } +} + +impl FractalMgr { + pub(super) fn new( + hp_dispatcher: UnboundedSender>, + general_dispatcher: UnboundedSender>, + model_count: usize, + ) -> Self { + Self { + hp_dispatcher, + general_dispatcher, + runtime_stats: FractalRTStat::init(model_count), + } + } + pub fn get_rt_stat(&self) -> &FractalRTStat { + &self.runtime_stats + } + /// Add a high priority task to the queue + /// + /// ## Panics + /// + /// This will panic if the high priority executor has crashed or exited + pub fn post_high_priority(&self, task: Task) { + self.hp_dispatcher.send(task).unwrap() + } + /// Add a low priority task to the queue + /// + /// ## Panics + /// + /// This will panic if the low priority executor has crashed or exited + pub fn post_low_priority(&self, task: Task) { + self.general_dispatcher.send(task).unwrap() + } +} + +/// Handles to all the services that fractal needs. 
These are spawned on the default runtime +pub struct FractalServiceHandles { + pub hp_handle: JoinHandle<()>, + pub lp_handle: JoinHandle<()>, +} + +impl FractalMgr { + /// Start all background services, and return their handles + pub(super) fn start_all( + global: super::Global, + lp_receiver: UnboundedReceiver>, + hp_receiver: UnboundedReceiver>, + ) -> FractalServiceHandles { + let fractal_mgr = global.get_state().fractal_mgr(); + let hp_handle = tokio::spawn(async move { + FractalMgr::hp_executor_svc(fractal_mgr, global, hp_receiver).await + }); + let lp_handle = tokio::spawn(async move { + FractalMgr::general_executor_svc(fractal_mgr, global, lp_receiver).await + }); + FractalServiceHandles { + hp_handle, + lp_handle, + } + } +} + +// services +impl FractalMgr { + const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60; + /// The high priority executor service runs in the background to take care of high priority tasks and take any + /// appropriate action. It will exclusively own the high priority queue since it is the only broker that is + /// allowed to perform HP tasks + pub async fn hp_executor_svc( + &'static self, + global: super::Global, + mut receiver: UnboundedReceiver>, + ) { + loop { + let Some(Task { threshold, task }) = receiver.recv().await else { + return; // all handles closed; nothing left to do + }; + // TODO(@ohsayan): check threshold and update hooks + match task { + CriticalTask::WriteBatch(model_id, observed_size) => { + let mdl_drivers = global.get_state().get_mdl_drivers().read(); + let Some(mdl_driver) = mdl_drivers.get(&model_id) else { + // because we maximize throughput, the model driver may have been already removed but this task + // was way behind in the queue + continue; + }; + let res = global.namespace().with_model( + (model_id.space(), model_id.model()), + |model| { + if model.get_uuid() != model_id.uuid() { + // once again, throughput maximization will lead to, in extremely rare cases, this + // branch returning. 
but it is okay + return Ok(()); + } + // mark that we're taking these deltas + model.delta_state().__fractal_take_from_data_delta( + observed_size, + super::FractalToken::new(), + ); + Self::try_write_model_data_batch(model, observed_size, mdl_driver) + }, + ); + match res { + Ok(()) => {} + Err(_) => { + log::error!( + "Error writing data batch for model {}. Retrying...", + model_id.uuid() + ); + // enqueue again for retrying + self.hp_dispatcher + .send(Task::with_threshold( + CriticalTask::WriteBatch(model_id, observed_size), + threshold - 1, + )) + .unwrap(); + } + } + } + } + } + } + /// The general priority task or simply the general queue takes care of low priority and other standard priority + /// tasks (such as those running on a schedule). A low priority task can be promoted to a high priority task, at the + /// discretion of the GP executor. Similarly, the executor owns the general purpose task queue since it is the sole broker + /// for such tasks + pub async fn general_executor_svc( + &'static self, + global: super::Global, + mut lpq: UnboundedReceiver>, + ) { + loop { + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(Self::GENERAL_EXECUTOR_WINDOW)) => { + let mdl_drivers = global.get_state().get_mdl_drivers().read(); + for (model_id, driver) in mdl_drivers.iter() { + let mut observed_len = 0; + let res = global.namespace().with_model((model_id.space(), model_id.model()), |model| { + if model.get_uuid() != model_id.uuid() { + // once again, throughput maximization will lead to, in extremely rare cases, this + // branch returning. 
but it is okay + return Ok(()); + } + // mark that we're taking these deltas + observed_len = model.delta_state().__fractal_take_full_from_data_delta(super::FractalToken::new()); + Self::try_write_model_data_batch(model, observed_len, driver) + }); + match res { + Ok(()) => {} + Err(_) => { + // this failure is *not* good, so we want to promote this to a critical task + self.hp_dispatcher.send(Task::new(CriticalTask::WriteBatch(model_id.clone(), observed_len))).unwrap() + } + } + } + } + task = lpq.recv() => { + let Some(Task { threshold, task }) = task else { + return; + }; + // TODO(@ohsayan): threshold + match task { + GenericTask::DeleteFile(f) => { + if let Err(_) = fs::remove_file(&f).await { + self.general_dispatcher.send( + Task::with_threshold(GenericTask::DeleteFile(f), threshold - 1) + ).unwrap(); + } + } + GenericTask::DeleteDirAll(dir) => { + if let Err(_) = fs::remove_dir_all(&dir).await { + self.general_dispatcher.send( + Task::with_threshold(GenericTask::DeleteDirAll(dir), threshold - 1) + ).unwrap(); + } + } + } + } + } + } + } +} + +// util +impl FractalMgr { + /// Attempt to write a model data batch with the observed size. 
+ /// + /// The zero check is essential + fn try_write_model_data_batch( + model: &Model, + observed_size: usize, + mdl_driver: &super::FractalModelDriver, + ) -> Result<(), crate::engine::error::DatabaseError> { + if observed_size == 0 { + // no changes, all good + return Ok(()); + } + // try flushing the batch + let mut batch_driver = mdl_driver.batch_driver().lock(); + batch_driver.write_new_batch(model, observed_size)?; + Ok(()) + } +} diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs new file mode 100644 index 00000000..5d6bb0a6 --- /dev/null +++ b/server/src/engine/fractal/mod.rs @@ -0,0 +1,204 @@ +/* + * Created on Sat Sep 09 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use { + super::{ + core::GlobalNS, data::uuid::Uuid, storage::v1::LocalFS, txn::gns::GNSTransactionDriverAnyFS, + }, + parking_lot::RwLock, + std::{collections::HashMap, mem::MaybeUninit}, + tokio::sync::mpsc::unbounded_channel, +}; + +mod config; +mod drivers; +mod mgr; +mod util; +pub use { + config::ServerConfig, + drivers::FractalModelDriver, + mgr::{CriticalTask, GenericTask, Task}, + util::FractalToken, +}; + +pub type ModelDrivers = HashMap; + +static mut GLOBAL: MaybeUninit = MaybeUninit::uninit(); + +/* + global state init +*/ + +/// Returned by [`enable_and_start_all`]. This contains a [`Global`] handle that can be used to easily access global +/// data +pub struct GlobalStateStart { + pub global: Global, + pub mgr_handles: mgr::FractalServiceHandles, +} + +/// Enable all drivers and start all engines +/// +/// ## Safety +/// +/// Must be called iff this is the only thread calling it +pub unsafe fn enable_and_start_all( + gns: GlobalNS, + config: config::ServerConfig, + gns_driver: GNSTransactionDriverAnyFS, + model_drivers: ModelDrivers, +) -> GlobalStateStart { + let model_cnt_on_boot = model_drivers.len(); + let gns_driver = drivers::FractalGNSDriver::new(gns_driver); + let mdl_driver = RwLock::new(model_drivers); + let (hp_sender, hp_recv) = unbounded_channel(); + let (lp_sender, lp_recv) = unbounded_channel(); + let global_state = GlobalState::new( + gns, + gns_driver, + mdl_driver, + mgr::FractalMgr::new(hp_sender, lp_sender, model_cnt_on_boot), + config, + ); + GLOBAL = MaybeUninit::new(global_state); + let token = Global::new(); + GlobalStateStart { + global: token, + mgr_handles: mgr::FractalMgr::start_all(token, lp_recv, hp_recv), + } +} + +/* + global access +*/ + +#[derive(Debug, Clone, Copy)] +/// A handle to the global state +pub struct Global(()); + +impl Global { + fn new() -> Self { + Self(()) + } + pub(self) fn get_state(&self) -> &'static GlobalState { + unsafe { GLOBAL.assume_init_ref() } + } + /// Returns a handle to the 
[`GlobalNS`] + pub fn namespace(&self) -> &'static GlobalNS { + &unsafe { GLOBAL.assume_init_ref() }.gns + } + /// Post an urgent task + pub fn post_high_priority_task(&self, task: Task) { + self.get_state().fractal_mgr().post_high_priority(task) + } + /// Post a task with normal priority + /// + /// NB: It is not guaranteed that the task will remain as a low priority task because the scheduler can choose + /// to promote the task to a high priority task, if it deems necessary. + pub fn post_standard_priority_task(&self, task: Task) { + self.get_state().fractal_mgr().post_low_priority(task) + } + /// Returns the maximum size a model's delta size can hit before it should immediately issue a batch write request + /// to avoid memory pressure + pub fn get_max_delta_size(&self) -> usize { + self.get_state() + .fractal_mgr() + .get_rt_stat() + .per_mdl_delta_max_size() + } +} + +/* + global state +*/ + +/// The global state +struct GlobalState { + gns: GlobalNS, + gns_driver: drivers::FractalGNSDriver, + mdl_driver: RwLock, + task_mgr: mgr::FractalMgr, + config: config::ServerConfig, +} + +impl GlobalState { + fn new( + gns: GlobalNS, + gns_driver: drivers::FractalGNSDriver, + mdl_driver: RwLock, + task_mgr: mgr::FractalMgr, + config: config::ServerConfig, + ) -> Self { + Self { + gns, + gns_driver, + mdl_driver, + task_mgr, + config, + } + } + pub(self) fn get_mdl_drivers(&self) -> &RwLock { + &self.mdl_driver + } + pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr { + &self.task_mgr + } +} + +// these impls are completely fine +unsafe impl Send for GlobalState {} +unsafe impl Sync for GlobalState {} + +/// A unique signature that identifies a model, and only that model (guaranteed by the OS's random source) +// NB(@ohsayan): if there are collisions, which I absolutely do not expect any instances of, pool in the space's UUID +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct ModelUniqueID { + space: Box, + model: Box, + uuid: Uuid, +} + +impl ModelUniqueID { 
+ /// Create a new unique model ID + pub fn new(space: &str, model: &str, uuid: Uuid) -> Self { + Self { + space: space.into(), + model: model.into(), + uuid, + } + } + /// Returns the space name + pub fn space(&self) -> &str { + self.space.as_ref() + } + /// Returns the model name + pub fn model(&self) -> &str { + self.model.as_ref() + } + /// Returns the uuid + pub fn uuid(&self) -> Uuid { + self.uuid + } +} diff --git a/server/src/engine/fractal/util.rs b/server/src/engine/fractal/util.rs new file mode 100644 index 00000000..c3bb986b --- /dev/null +++ b/server/src/engine/fractal/util.rs @@ -0,0 +1,78 @@ +/* + * Created on Sat Sep 09 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use std::sync::atomic::{AtomicBool, Ordering}; + +#[derive(Debug)] +pub struct Status { + okay: AtomicBool, +} + +impl Status { + pub const fn new_okay() -> Self { + Self::new(true) + } + pub const fn new_iffy() -> Self { + Self::new(false) + } + const fn new(v: bool) -> Self { + Self { + okay: AtomicBool::new(v), + } + } +} + +impl Status { + pub fn is_iffy(&self) -> bool { + !self._get() + } + pub fn is_healthy(&self) -> bool { + self._get() + } + fn _get(&self) -> bool { + self.okay.load(Ordering::Acquire) + } +} + +impl Status { + pub(super) fn set_okay(&self) { + self._set(true) + } + pub(super) fn set_iffy(&self) { + self._set(false) + } + fn _set(&self, v: bool) { + self.okay.store(v, Ordering::Release) + } +} + +/// A special token for fractal calls +pub struct FractalToken(()); +impl FractalToken { + pub(super) fn new() -> Self { + Self(()) + } +} diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 7bc029f4..4d43a88f 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -31,6 +31,7 @@ mod macros; mod core; mod data; mod error; +mod fractal; mod idx; mod mem; mod ql; diff --git a/server/src/engine/ql/ast/mod.rs b/server/src/engine/ql/ast/mod.rs index 9d27d045..0a306411 100644 --- a/server/src/engine/ql/ast/mod.rs +++ b/server/src/engine/ql/ast/mod.rs @@ -41,12 +41,11 @@ use { }, util::{compiler, MaybeInit}, }, - core::cmp, }; #[inline(always)] pub fn minidx(src: &[T], index: usize) -> usize { - cmp::min(src.len() - 1, index) + (src.len() - 1).min(index) } #[derive(Debug, PartialEq)] diff --git a/server/src/engine/ql/dml/ins.rs b/server/src/engine/ql/dml/ins.rs index 608ccbed..fabf4b47 100644 --- a/server/src/engine/ql/dml/ins.rs +++ b/server/src/engine/ql/dml/ins.rs @@ -36,7 +36,6 @@ use { }, util::{compiler, MaybeInit}, }, - core::cmp, std::{ collections::HashMap, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -114,7 +113,7 @@ fn hashp(key: &[u8]) -> u32 { fn ldfunc(func: Ident<'_>) -> Option { let func = 
func.as_bytes(); let ph = hashp(func) as usize; - let min = cmp::min(ph, PRODUCER_F.len() - 1); + let min = ph.min(PRODUCER_F.len() - 1); let data = PRODUCER_F[min]; if data.0 == func { Some(data.1) diff --git a/server/src/engine/ql/lex/mod.rs b/server/src/engine/ql/lex/mod.rs index b376631f..839d795e 100644 --- a/server/src/engine/ql/lex/mod.rs +++ b/server/src/engine/ql/lex/mod.rs @@ -38,7 +38,7 @@ use { }, util::compiler, }, - core::{cmp, fmt, ops::BitOr, slice, str}, + core::{fmt, ops::BitOr, slice, str}, }; pub use self::raw::{Ident, Keyword, Symbol, Token}; @@ -484,7 +484,7 @@ impl<'a> SafeQueryData<'a> { while src.len() >= 3 && okay { let tc = src[0]; okay &= tc <= nonpadded_offset; - let mx = cmp::min(ecc_offset, tc as usize); + let mx = ecc_offset.min(tc as usize); let mut i_ = 1; okay &= LITIR_TF[mx](&src[1..], &mut i_, &mut data); src = &src[i_..]; @@ -508,7 +508,7 @@ impl<'b> SafeQueryData<'b> { let src = &src[i..]; // find payload *flag &= src.len() >= payload_len; - let mx_extract = cmp::min(payload_len, src.len()); + let mx_extract = payload_len.min(src.len()); // incr cursor i += mx_extract; *cnt += i; @@ -534,7 +534,7 @@ impl<'b> SafeQueryData<'b> { #[inline(always)] pub(super) fn bool<'a>(src: Slice<'a>, cnt: &mut usize, data: &mut Vec>) -> bool { // `true\n` or `false\n` - let mx = cmp::min(6, src.len()); + let mx = 6.min(src.len()); let slice = &src[..mx]; let v_true = slice.starts_with(b"true\n"); let v_false = slice.starts_with(b"false\n"); diff --git a/server/src/engine/storage/v1/batch_jrnl/mod.rs b/server/src/engine/storage/v1/batch_jrnl/mod.rs index 68b32d4b..0632e27e 100644 --- a/server/src/engine/storage/v1/batch_jrnl/mod.rs +++ b/server/src/engine/storage/v1/batch_jrnl/mod.rs @@ -43,3 +43,45 @@ const RECOVERY_THRESHOLD: usize = 10; #[cfg(test)] pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch}; pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver}; + +use { + super::{ + header_meta, + 
rw::{FileOpen, SDSSFileIO}, + RawFSInterface, SDSSResult, + }, + crate::engine::core::model::Model, +}; + +const LOG_SPECIFIER_VERSION: header_meta::FileSpecifierVersion = + header_meta::FileSpecifierVersion::__new(0); + +pub fn open_or_reinit( + name: &str, + model: &Model, + host_setting_version: u32, + host_run_mode: header_meta::HostRunMode, + host_startup_counter: u64, +) -> SDSSResult> { + let f = SDSSFileIO::::open_or_create_perm_rw::( + name, + header_meta::FileScope::Journal, + header_meta::FileSpecifier::TableDataBatch, + LOG_SPECIFIER_VERSION, + host_setting_version, + host_run_mode, + host_startup_counter, + )?; + match f { + FileOpen::Created(new_file) => Ok(DataBatchPersistDriver::new(new_file, true)?), + FileOpen::Existing(existing, _) => { + // restore + let mut restore_driver = DataBatchRestoreDriver::new(existing)?; + restore_driver.read_data_batch_into_model(model)?; + Ok(DataBatchPersistDriver::new( + restore_driver.into_file(), + false, + )?) + } + } +} diff --git a/server/src/engine/storage/v1/inf/map.rs b/server/src/engine/storage/v1/inf/map.rs index 660a4569..17e9485a 100644 --- a/server/src/engine/storage/v1/inf/map.rs +++ b/server/src/engine/storage/v1/inf/map.rs @@ -27,7 +27,7 @@ use { super::{ obj::{self, FieldMD}, - PersistTypeDscr, PersistMapSpec, PersistObject, VecU8, + PersistMapSpec, PersistObject, PersistTypeDscr, VecU8, }, crate::{ engine::{ @@ -44,7 +44,6 @@ use { util::{copy_slice_to_array as memcpy, EndianQW}, }, core::marker::PhantomData, - std::cmp, }; #[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)] @@ -177,7 +176,7 @@ impl PersistMapSpec for GenericDictSpec { } fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool { static EXPECT_ATLEAST: [u8; 4] = [0, 1, 8, 8]; // PAD to align - let lbound_rem = md.klen + EXPECT_ATLEAST[cmp::min(md.dscr, 3) as usize] as usize; + let lbound_rem = md.klen + EXPECT_ATLEAST[md.dscr.min(3) as usize] as usize; scanner.has_left(lbound_rem) & (md.dscr <= 
PersistTypeDscr::Dict.value_u8()) } fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, _: &Self::Value) { @@ -256,11 +255,7 @@ impl PersistMapSpec for GenericDictSpec { return None; } v.push( - match decode_element( - scanner, - PersistTypeDscr::from_raw(dscr), - false, - ) { + match decode_element(scanner, PersistTypeDscr::from_raw(dscr), false) { Some(DictEntryGeneric::Data(l)) => l, None => return None, _ => unreachable!("found top-level dict item in datacell"), diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs new file mode 100644 index 00000000..2c3876e5 --- /dev/null +++ b/server/src/engine/storage/v1/loader.rs @@ -0,0 +1,100 @@ +/* + * Created on Sun Sep 10 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use crate::engine::{ + core::GlobalNS, + data::uuid::Uuid, + fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID}, + storage::v1::{batch_jrnl, header_meta::HostRunMode, LocalFS, SDSSErrorContext, SDSSResult}, + txn::gns::GNSTransactionDriverAnyFS, +}; + +const GNS_FILE_PATH: &str = "gns.db-tlog"; + +pub struct SEInitState { + pub txn_driver: GNSTransactionDriverAnyFS, + pub model_drivers: ModelDrivers, +} + +impl SEInitState { + pub fn new( + txn_driver: GNSTransactionDriverAnyFS, + model_drivers: ModelDrivers, + ) -> Self { + Self { + txn_driver, + model_drivers, + } + } + pub fn try_init( + host_setting_version: u32, + host_run_mode: HostRunMode, + host_startup_counter: u64, + ) -> SDSSResult { + let gns = GlobalNS::empty(); + let gns_txn_driver = GNSTransactionDriverAnyFS::::open_or_reinit_with_name( + &gns, + GNS_FILE_PATH, + host_setting_version, + host_run_mode, + host_startup_counter, + )?; + let mut model_drivers = ModelDrivers::new(); + for (space_name, space) in gns.spaces().read().iter() { + let space_uuid = space.get_uuid(); + for (model_name, model) in space.models().read().iter() { + let path = Self::model_path(space_name, space_uuid, model_name, model.get_uuid()); + let persist_driver = match batch_jrnl::open_or_reinit( + &path, + model, + host_setting_version, + host_run_mode, + host_startup_counter, + ) { + Ok(j) => j, + Err(e) => { + return Err(e.with_extra(format!( + "failed to restore model data from journal in `{path}`" + ))) + } + }; + let _ = model_drivers.insert( + ModelUniqueID::new(space_name, model_name, model.get_uuid()), + FractalModelDriver::init(persist_driver), + ); + } + } + Ok(SEInitState::new(gns_txn_driver, model_drivers)) + } + fn model_path( + space_name: &str, + space_uuid: Uuid, + model_name: &str, + model_uuid: Uuid, + ) -> String { + format!("data/{space_name}-{space_uuid}/{model_name}-{model_uuid}/data.db-btlog") + } +} diff --git a/server/src/engine/storage/v1/memfs.rs 
b/server/src/engine/storage/v1/memfs.rs index 7c81f548..d6480519 100644 --- a/server/src/engine/storage/v1/memfs.rs +++ b/server/src/engine/storage/v1/memfs.rs @@ -45,6 +45,8 @@ use { static VFS: Lazy, VNode>>, fn() -> RwLock, VNode>>> = Lazy::new(|| Default::default()); +type ComponentIter<'a> = std::iter::Take>; + /* vnode --- @@ -79,6 +81,17 @@ impl VNode { - make child */ +fn split_parts(fpath: &str) -> Vec<&str> { + fpath.split("/").collect() +} + +fn split_target_and_components(fpath: &str) -> (&str, ComponentIter) { + let parts = split_parts(fpath); + let target = parts.last().unwrap(); + let component_len = parts.len() - 1; + (target, parts.into_iter().take(component_len)) +} + #[derive(Debug)] pub struct VirtualFS; @@ -88,14 +101,10 @@ impl RawFSInterface for VirtualFS { // get vfs let mut vfs = VFS.write(); // get root dir - let path = fpath.split("/").collect::>(); - // get target - let target = *path.last().unwrap(); let mut current = &mut *vfs; // process components - let component_len = path.len() - 1; - let mut path = path.into_iter().take(component_len); - while let Some(component) = path.next() { + let (target, mut components) = split_target_and_components(fpath); + while let Some(component) = components.next() { match current.get_mut(component) { Some(VNode::Dir(d)) => { current = d; @@ -147,7 +156,7 @@ impl RawFSInterface for VirtualFS { } } } - let pieces: Vec<&str> = fpath.split("/").collect(); + let pieces = split_parts(fpath); create_ahead(&pieces, &mut *vfs) } fn fs_delete_dir(fpath: &str) -> super::SDSSResult<()> { @@ -159,10 +168,9 @@ impl RawFSInterface for VirtualFS { fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult> { let mut vfs = VFS.write(); // components - let components = fpath.split("/").collect::>(); - let file = components.last().unwrap().to_owned().into(); + let (target_file, components) = split_target_and_components(fpath); let target_dir = find_target_dir_mut(components, &mut vfs)?; - match target_dir.entry(file) { 
+ match target_dir.entry(target_file.into()) { Entry::Occupied(mut oe) => match oe.get_mut() { VNode::File(f) => { f.read = true; @@ -184,11 +192,10 @@ impl RawFSInterface for VirtualFS { } fn find_target_dir_mut<'a>( - components: Vec<&str>, + components: ComponentIter, mut current: &'a mut HashMap, VNode>, ) -> Result<&'a mut HashMap, VNode>, super::SDSSError> { - let path_len = components.len() - 1; - for component in components.into_iter().take(path_len) { + for component in components { match current.get_mut(component) { Some(VNode::Dir(d)) => current = d, Some(VNode::File(_)) => { @@ -205,11 +212,10 @@ fn find_target_dir_mut<'a>( } fn find_target_dir<'a>( - components: Vec<&str>, + components: ComponentIter, mut current: &'a HashMap, VNode>, ) -> Result<&'a HashMap, VNode>, super::SDSSError> { - let path_len = components.len() - 1; - for component in components.into_iter().take(path_len) { + for component in components { match current.get(component) { Some(VNode::Dir(d)) => current = d, Some(VNode::File(_)) => { @@ -229,10 +235,8 @@ fn delete_dir(fpath: &str, allow_if_non_empty: bool) -> Result<(), super::SDSSEr let mut vfs = VFS.write(); let mut current = &mut *vfs; // process components - let components = fpath.split("/").collect::>(); - let components_len = components.len() - 1; - let target = *components.last().unwrap(); - for component in components.into_iter().take(components_len) { + let (target, components) = split_target_and_components(fpath); + for component in components { match current.get_mut(component) { Some(VNode::Dir(dir)) => { current = dir; @@ -310,10 +314,9 @@ impl Drop for VFileDescriptor { fn with_file_mut(fpath: &str, mut f: impl FnMut(&mut VFile) -> SDSSResult) -> SDSSResult { let mut vfs = VFS.write(); - let components = fpath.split("/").collect::>(); - let file = *components.last().unwrap(); + let (target_file, components) = split_target_and_components(fpath); let target_dir = find_target_dir_mut(components, &mut vfs)?; - match 
target_dir.get_mut(file) { + match target_dir.get_mut(target_file) { Some(VNode::File(file)) => f(file), Some(VNode::Dir(_)) => { return Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into()) @@ -324,10 +327,9 @@ fn with_file_mut(fpath: &str, mut f: impl FnMut(&mut VFile) -> SDSSResult) fn with_file(fpath: &str, mut f: impl FnMut(&VFile) -> SDSSResult) -> SDSSResult { let vfs = VFS.read(); - let components = fpath.split("/").collect::>(); - let file = *components.last().unwrap(); + let (target_file, components) = split_target_and_components(fpath); let target_dir = find_target_dir(components, &vfs)?; - match target_dir.get(file) { + match target_dir.get(target_file) { Some(VNode::File(file)) => f(file), Some(VNode::Dir(_)) => { return Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into()) diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index ca269318..ca060411 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -29,6 +29,7 @@ mod header_impl; // impls mod batch_jrnl; mod journal; +mod loader; mod rw; // hl pub mod inf; @@ -44,11 +45,14 @@ pub use { memfs::NullFS, rw::{BufferedScanner, LocalFS, RawFSInterface, SDSSFileIO}, }; +pub mod data_batch { + pub use super::batch_jrnl::{DataBatchPersistDriver, DataBatchRestoreDriver}; +} pub mod header_meta { pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}; } -use crate::util::os::SysIOError as IoError; +use crate::{engine::txn::TransactionError, util::os::SysIOError as IoError}; pub type SDSSResult = Result; @@ -71,6 +75,14 @@ impl SDSSErrorContext for std::io::Error { } } +impl SDSSErrorContext for SDSSError { + type ExtraData = String; + + fn with_extra(self, extra: Self::ExtraData) -> SDSSError { + SDSSError::Extra(Box::new(self), extra) + } +} + #[derive(Debug)] #[cfg_attr(test, derive(PartialEq))] pub enum SDSSError { @@ -121,6 +133,16 @@ pub enum 
SDSSError { /// we failed to close the data batch DataBatchCloseError, DataBatchRestoreCorruptedBatchFile, + JournalRestoreTxnError, + /// An error with more context + // TODO(@ohsayan): avoid the box; we'll clean this up soon + Extra(Box, String), +} + +impl From for SDSSError { + fn from(_: TransactionError) -> Self { + Self::JournalRestoreTxnError + } } impl SDSSError { diff --git a/server/src/engine/txn/data.rs b/server/src/engine/txn/data.rs deleted file mode 100644 index 38c70437..00000000 --- a/server/src/engine/txn/data.rs +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Created on Mon Aug 28 2023 - * - * This file is a part of Skytable - * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source - * NoSQL database written by Sayan Nandan ("the Author") with the - * vision to provide flexibility in data modelling without compromising - * on performance, queryability or scalability. - * - * Copyright (c) 2023, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * -*/ - -use crate::{ - engine::core::{model::delta::DataDelta, GlobalNS}, - util::os, -}; - -type Buf = Vec; - -/* - memory adjustments -*/ - -/// free memory in bytes -static mut FREEMEM_BYTES: u64 = 0; -/// capacity in bytes, per linked list -static mut CAP_PER_LL_BYTES: u64 = 0; -/// maximum number of nodes in linked list -static mut MAX_NODES_IN_LL_CNT: usize = 0; - -/// Set the free memory and cap for deltas so that we don't bust through memory -/// -/// ## Safety -/// - All models must have been loaded -/// - This must be called **before** the arbiter spawns threads for connections -pub unsafe fn set_limits(gns: &GlobalNS) { - let model_cnt: usize = gns - .spaces() - .read() - .values() - .map(|space| space.models().read().len()) - .sum(); - let available_mem = os::free_memory_in_bytes(); - FREEMEM_BYTES = available_mem; - CAP_PER_LL_BYTES = - ((available_mem / core::cmp::max(1, model_cnt) as u64) as f64 * 0.002) as u64; - MAX_NODES_IN_LL_CNT = CAP_PER_LL_BYTES as usize / (sizeof!(DataDelta) + sizeof!(u64)); -} - -/// Returns the maximum number of nodes that can be stored inside a delta queue for a model -/// -/// Currently hardcoded to 0.2% of free memory after all datasets have been loaded -pub unsafe fn get_max_delta_queue_size() -> usize { - // TODO(@ohsayan): dynamically approximate this limit - MAX_NODES_IN_LL_CNT -} diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index a290f53e..73938417 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -41,7 +41,7 @@ use { }, util::EndianQW, }, - std::{fs::File, marker::PhantomData}, + std::marker::PhantomData, }; mod model; @@ -61,7 +61,6 @@ pub use { pub type GNSTransactionDriverNullZero = GNSTransactionDriverAnyFS; -pub type GNSTransactionDriver = GNSTransactionDriverAnyFS; #[cfg(test)] pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS; @@ -106,20 +105,6 @@ impl GNSTransactionDriverAnyFS { .append_journal_close_and_close() 
.map_err(|e| e.into()) } - pub fn open_or_reinit( - gns: &GlobalNS, - host_setting_version: u32, - host_run_mode: header_meta::HostRunMode, - host_startup_counter: u64, - ) -> TransactionResult { - Self::open_or_reinit_with_name( - gns, - "gns.db-tlog", - host_setting_version, - host_run_mode, - host_startup_counter, - ) - } pub fn open_or_reinit_with_name( gns: &GlobalNS, log_file_name: &str, @@ -190,7 +175,7 @@ impl JournalAdapter for GNSAdapter { // UNSAFE(@ohsayan): u16::from_le_bytes(scanner.next_chunk()) }; - match DISPATCH[core::cmp::min(opc as usize, DISPATCH.len())](&mut scanner, gs) { + match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) { Ok(()) if scanner.eof() => return Ok(()), Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes), Err(e) => Err(e), diff --git a/server/src/engine/txn/mod.rs b/server/src/engine/txn/mod.rs index fc4863e7..733839dc 100644 --- a/server/src/engine/txn/mod.rs +++ b/server/src/engine/txn/mod.rs @@ -24,7 +24,6 @@ * */ -pub mod data; pub mod gns; use super::storage::v1::SDSSError;