Implement services

next
Sayan Nandan 1 year ago
parent e309e35b63
commit 5cafc61231
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -26,7 +26,9 @@
use {
super::{Fields, Model},
crate::engine::{core::index::Row, sync::atm::Guard, sync::queue::Queue},
crate::engine::{
core::index::Row, fractal::FractalToken, sync::atm::Guard, sync::queue::Queue,
},
parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
std::{
collections::btree_map::{BTreeMap, Range},
@ -259,13 +261,23 @@ impl DeltaState {
}
}
// fractal
impl DeltaState {
    /// Subtract `cnt` from the data delta size counter.
    ///
    /// The [`FractalToken`] argument is not read; it only has to be supplied by the caller.
    pub fn __fractal_take_from_data_delta(&self, cnt: usize, _token: FractalToken) {
        // the previous counter value is not needed here
        let _previous = self.data_deltas_size.fetch_sub(cnt, Ordering::Release);
    }
    /// Reset the data delta size counter to zero and return the value it held before the reset.
    ///
    /// The [`FractalToken`] argument is not read; it only has to be supplied by the caller.
    pub fn __fractal_take_full_from_data_delta(&self, _token: FractalToken) -> usize {
        self.data_deltas_size.swap(0, Ordering::AcqRel)
    }
}
/// A delta version: a thin wrapper around a `u64` that is comparable, orderable and `Copy`
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
pub struct DeltaVersion(u64);
impl DeltaVersion {
pub const fn genesis() -> Self {
Self(0)
}
pub const fn __new(v: u64) -> Self {
pub const fn __new(v: u64) -> Self {
Self(v)
}
fn step(&self) -> Self {

@ -24,6 +24,8 @@
*
*/
use core::fmt;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Uuid {
data: uuid::Uuid,
@ -48,8 +50,8 @@ impl Uuid {
}
}
impl ToString for Uuid {
fn to_string(&self) -> String {
self.data.to_string()
impl fmt::Display for Uuid {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.data.fmt(f)
}
}

@ -24,7 +24,7 @@
*
*/
use super::txn::TransactionError;
use super::{storage::v1::SDSSError, txn::TransactionError};
pub type LangResult<T> = Result<T, LangError>;
pub type LexResult<T> = Result<T, LexError>;
@ -81,8 +81,9 @@ pub enum LangError {
StmtUnknownDrop,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug)]
#[repr(u8)]
#[cfg_attr(test, derive(PartialEq))]
/// Executor errors
pub enum DatabaseError {
// sys
@ -136,6 +137,13 @@ pub enum DatabaseError {
DmlConstraintViolationFieldTypedef,
ServerError,
TransactionalError,
StorageSubsystemErr(SDSSError),
}
impl From<SDSSError> for DatabaseError {
fn from(e: SDSSError) -> Self {
Self::StorageSubsystemErr(e)
}
}
impl From<TransactionError> for DatabaseError {

@ -0,0 +1,47 @@
/*
* Created on Sun Sep 10 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::storage::v1::header_meta::HostRunMode;
/// Host parameters that are handed to the engine when it is started
pub struct ServerConfig {
    /// the host settings version
    host_settings_version: u32,
    /// the mode the host runs in
    host_run_mode: HostRunMode,
    /// the host startup counter
    host_startup_counter: u64,
}

impl ServerConfig {
    /// Create a new configuration; the three values are stored exactly as given
    pub fn new(
        host_settings_version: u32,
        host_run_mode: HostRunMode,
        host_startup_counter: u64,
    ) -> Self {
        ServerConfig {
            host_startup_counter,
            host_run_mode,
            host_settings_version,
        }
    }
}

@ -0,0 +1,88 @@
/*
* Created on Sun Sep 10 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::util,
crate::engine::{
storage::v1::{data_batch::DataBatchPersistDriver, LocalFS},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::Mutex,
std::sync::Arc,
};
/// The GNS driver: a health status flag plus the GNS transaction driver behind a mutex
pub(super) struct FractalGNSDriver {
    /// health flag; starts out as "okay"
    status: util::Status,
    /// the transaction driver, guarded by a mutex
    txn_driver: Mutex<GNSTransactionDriverAnyFS<LocalFS>>,
}

impl FractalGNSDriver {
    /// Wrap the given transaction driver in a mutex and mark the driver as okay
    pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS<LocalFS>) -> Self {
        let txn_driver = Mutex::new(txn_driver);
        let status = util::Status::new_okay();
        FractalGNSDriver { status, txn_driver }
    }
}
/// The model driver: shared model hooks plus the data batch persist driver behind a mutex
pub struct FractalModelDriver {
    /// shared hooks for this model
    hooks: Arc<FractalModelHooks>,
    /// the batch persist driver, guarded by a mutex
    batch_driver: Mutex<DataBatchPersistDriver<LocalFS>>,
}

impl FractalModelDriver {
    /// Create a model driver around `batch_driver`, using freshly created hooks
    pub fn init(batch_driver: DataBatchPersistDriver<LocalFS>) -> Self {
        let hooks = Arc::new(FractalModelHooks::new());
        let batch_driver = Mutex::new(batch_driver);
        FractalModelDriver {
            hooks,
            batch_driver,
        }
    }
    /// Borrow the mutex that guards the batch persist driver
    pub fn batch_driver(&self) -> &Mutex<DataBatchPersistDriver<LocalFS>> {
        let Self { batch_driver, .. } = self;
        batch_driver
    }
}
/// Hooks attached to a model driver; currently this is only a health status flag
#[derive(Debug)]
pub struct FractalModelHooks {
    /// health flag; starts out as "okay"
    status: util::Status,
}

impl FractalModelHooks {
    /// Test-only constructor, identical to the private constructor
    #[cfg(test)]
    pub fn test() -> Self {
        Self::new()
    }
    /// Create hooks whose status is okay
    fn new() -> Self {
        let status = util::Status::new_okay();
        FractalModelHooks { status }
    }
}

@ -0,0 +1,303 @@
/*
* Created on Sat Sep 09 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::ModelUniqueID,
crate::{
engine::core::model::{delta::DataDelta, Model},
util::os,
},
std::path::PathBuf,
tokio::{
fs,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinHandle,
},
};
/// A unit of work for the [`FractalMgr`], carried together with a threshold value.
///
/// The executors pass on `threshold - 1` whenever they put a failed task back on its queue.
pub struct Task<T> {
    /// the threshold that travels with the task
    threshold: usize,
    /// the actual work item
    task: T,
}

impl<T> Task<T> {
    /// The threshold given to tasks created through [`Task::new`]
    const THRESHOLD: usize = 10;
    /// Wrap `task` using the default threshold
    pub fn new(task: T) -> Self {
        Task {
            task,
            threshold: Self::THRESHOLD,
        }
    }
    /// Wrap `task` using an explicitly chosen threshold
    fn with_threshold(task: T, threshold: usize) -> Self {
        Task { task, threshold }
    }
}
/// A general task. These are received and executed by the general executor service
pub enum GenericTask {
/// Delete the single file at the given path (executed with `fs::remove_file`)
DeleteFile(PathBuf),
/// Delete the directory at the given path and all its children (executed with `fs::remove_dir_all`)
DeleteDirAll(PathBuf),
}
/// A critical task. These are received and executed by the high priority executor service
pub enum CriticalTask {
/// Write a new data batch for the model identified by the [`ModelUniqueID`]. The `usize` is the
/// observed delta size that is handed to the batch writer
WriteBatch(ModelUniqueID, usize),
}
/// The task manager: holds the sending halves of the high priority and general task queues along with
/// the runtime statistics that are computed when the manager is created
pub(super) struct FractalMgr {
/// sender for the high priority (critical) task queue
hp_dispatcher: UnboundedSender<Task<CriticalTask>>,
/// sender for the general task queue
general_dispatcher: UnboundedSender<Task<GenericTask>>,
/// statistics computed once in [`FractalMgr::new`]
runtime_stats: FractalRTStat,
}
/// Runtime statistics computed once, when the task manager is created
pub(super) struct FractalRTStat {
    /// free memory (in bytes) as reported at init time
    mem_free_bytes: u64,
    /// the per-model delta limit, expressed as a number of `DataDelta` items
    per_mdl_delta_max_size: usize,
}

impl FractalRTStat {
    /// Compute the statistics for `model_cnt` models.
    ///
    /// 2% of the reported free memory is split evenly between the models (a count of zero is
    /// treated as one) and the per-model share is then divided by the size of a `DataDelta`.
    fn init(model_cnt: usize) -> Self {
        let free_bytes = os::free_memory_in_bytes();
        let model_share_divisor = model_cnt.max(1) as f64;
        let per_model_bytes = (free_bytes as f64 * 0.02) / model_share_divisor;
        let per_mdl_delta_max_size = per_model_bytes as usize / sizeof!(DataDelta);
        FractalRTStat {
            mem_free_bytes: free_bytes,
            per_mdl_delta_max_size,
        }
    }
    /// Returns the free memory (in bytes) recorded at init time
    pub(super) fn mem_free_bytes(&self) -> u64 {
        let Self { mem_free_bytes, .. } = self;
        *mem_free_bytes
    }
    /// Returns the per-model delta limit computed at init time
    pub(super) fn per_mdl_delta_max_size(&self) -> usize {
        let Self {
            per_mdl_delta_max_size,
            ..
        } = self;
        *per_mdl_delta_max_size
    }
}
impl FractalMgr {
pub(super) fn new(
hp_dispatcher: UnboundedSender<Task<CriticalTask>>,
general_dispatcher: UnboundedSender<Task<GenericTask>>,
model_count: usize,
) -> Self {
Self {
hp_dispatcher,
general_dispatcher,
runtime_stats: FractalRTStat::init(model_count),
}
}
pub fn get_rt_stat(&self) -> &FractalRTStat {
&self.runtime_stats
}
/// Add a high priority task to the queue
///
/// ## Panics
///
/// This will panic if the high priority executor has crashed or exited
pub fn post_high_priority(&self, task: Task<CriticalTask>) {
self.hp_dispatcher.send(task).unwrap()
}
/// Add a low priority task to the queue
///
/// ## Panics
///
/// This will panic if the low priority executor has crashed or exited
pub fn post_low_priority(&self, task: Task<GenericTask>) {
self.general_dispatcher.send(task).unwrap()
}
}
/// Handles to all the services that fractal needs. These are spawned on the default runtime
pub struct FractalServiceHandles {
/// join handle of the high priority executor task
pub hp_handle: JoinHandle<()>,
/// join handle of the general (low priority) executor task
pub lp_handle: JoinHandle<()>,
}
impl FractalMgr {
    /// Spawn the high priority and the general executor services with `tokio::spawn` and return
    /// the join handles of both tasks.
    ///
    /// Each service takes ownership of the receiving half of its queue.
    pub(super) fn start_all(
        global: super::Global,
        lp_receiver: UnboundedReceiver<Task<GenericTask>>,
        hp_receiver: UnboundedReceiver<Task<CriticalTask>>,
    ) -> FractalServiceHandles {
        let fractal_mgr = global.get_state().fractal_mgr();
        FractalServiceHandles {
            hp_handle: tokio::spawn(fractal_mgr.hp_executor_svc(global, hp_receiver)),
            lp_handle: tokio::spawn(fractal_mgr.general_executor_svc(global, lp_receiver)),
        }
    }
}
// services
impl FractalMgr {
const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
/// The high priority executor service runs in the background to take care of high priority tasks and take any
/// appropriate action. It will exclusively own the high priority queue since it is the only broker that is
/// allowed to perform HP tasks
pub async fn hp_executor_svc(
&'static self,
global: super::Global,
mut receiver: UnboundedReceiver<Task<CriticalTask>>,
) {
loop {
let Some(Task { threshold, task }) = receiver.recv().await else {
return; // all handles closed; nothing left to do
};
// TODO(@ohsayan): check threshold and update hooks
match task {
CriticalTask::WriteBatch(model_id, observed_size) => {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
let Some(mdl_driver) = mdl_drivers.get(&model_id) else {
// because we maximize throughput, the model driver may have been already removed but this task
// was way behind in the queue
continue;
};
let res = global.namespace().with_model(
(model_id.space(), model_id.model()),
|model| {
if model.get_uuid() != model_id.uuid() {
// once again, throughput maximization will lead to, in extremely rare cases, this
// branch returning. but it is okay
return Ok(());
}
// mark that we're taking these deltas
model.delta_state().__fractal_take_from_data_delta(
observed_size,
super::FractalToken::new(),
);
Self::try_write_model_data_batch(model, observed_size, mdl_driver)
},
);
match res {
Ok(()) => {}
Err(_) => {
log::error!(
"Error writing data batch for model {}. Retrying...",
model_id.uuid()
);
// enqueue again for retrying
self.hp_dispatcher
.send(Task::with_threshold(
CriticalTask::WriteBatch(model_id, observed_size),
threshold - 1,
))
.unwrap();
}
}
}
}
}
}
/// The general priority task or simply the general queue takes of care of low priority and other standard priority
/// tasks (such as those running on a schedule). A low priority task can be promoted to a high priority task, and the
/// discretion of the GP executor. Similarly, the executor owns the general purpose task queue since it is the sole broker
/// for such tasks
pub async fn general_executor_svc(
&'static self,
global: super::Global,
mut lpq: UnboundedReceiver<Task<GenericTask>>,
) {
loop {
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(Self::GENERAL_EXECUTOR_WINDOW)) => {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
for (model_id, driver) in mdl_drivers.iter() {
let mut observed_len = 0;
let res = global.namespace().with_model((model_id.space(), model_id.model()), |model| {
if model.get_uuid() != model_id.uuid() {
// once again, throughput maximization will lead to, in extremely rare cases, this
// branch returning. but it is okay
return Ok(());
}
// mark that we're taking these deltas
observed_len = model.delta_state().__fractal_take_full_from_data_delta(super::FractalToken::new());
Self::try_write_model_data_batch(model, observed_len, driver)
});
match res {
Ok(()) => {}
Err(_) => {
// this failure is *not* good, so we want to promote this to a critical task
self.hp_dispatcher.send(Task::new(CriticalTask::WriteBatch(model_id.clone(), observed_len))).unwrap()
}
}
}
}
task = lpq.recv() => {
let Some(Task { threshold, task }) = task else {
return;
};
// TODO(@ohsayan): threshold
match task {
GenericTask::DeleteFile(f) => {
if let Err(_) = fs::remove_file(&f).await {
self.general_dispatcher.send(
Task::with_threshold(GenericTask::DeleteFile(f), threshold - 1)
).unwrap();
}
}
GenericTask::DeleteDirAll(dir) => {
if let Err(_) = fs::remove_dir_all(&dir).await {
self.general_dispatcher.send(
Task::with_threshold(GenericTask::DeleteDirAll(dir), threshold - 1)
).unwrap();
}
}
}
}
}
}
}
}
// util
impl FractalMgr {
    /// Try to write a new data batch of `observed_size` for `model` through the batch driver of
    /// `mdl_driver`.
    ///
    /// An observed size of zero means that there is nothing to write: in that case the batch
    /// driver is not locked at all and `Ok(())` is returned. This zero check is essential.
    fn try_write_model_data_batch(
        model: &Model,
        observed_size: usize,
        mdl_driver: &super::FractalModelDriver,
    ) -> Result<(), crate::engine::error::DatabaseError> {
        if observed_size != 0 {
            // lock the driver only for the duration of the write
            mdl_driver
                .batch_driver()
                .lock()
                .write_new_batch(model, observed_size)?;
        }
        Ok(())
    }
}

@ -0,0 +1,204 @@
/*
* Created on Sat Sep 09 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{
core::GlobalNS, data::uuid::Uuid, storage::v1::LocalFS, txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::RwLock,
std::{collections::HashMap, mem::MaybeUninit},
tokio::sync::mpsc::unbounded_channel,
};
mod config;
mod drivers;
mod mgr;
mod util;
pub use {
config::ServerConfig,
drivers::FractalModelDriver,
mgr::{CriticalTask, GenericTask, Task},
util::FractalToken,
};
pub type ModelDrivers = HashMap<ModelUniqueID, drivers::FractalModelDriver>;
static mut GLOBAL: MaybeUninit<GlobalState> = MaybeUninit::uninit();
/*
global state init
*/
/// Returned by [`enable_and_start_all`]. This contains a [`Global`] handle that can be used to easily access global
/// data
pub struct GlobalStateStart {
/// handle used to access the global state
pub global: Global,
/// join handles of the background services started by the task manager
pub mgr_handles: mgr::FractalServiceHandles,
}
/// Enable all drivers and start all engines.
///
/// This builds the global state from the given parts, stores it in the `GLOBAL` static and then
/// starts the background services of the task manager. The number of model drivers passed in is
/// used as the model count for the task manager's runtime statistics.
///
/// ## Safety
///
/// Must be called iff this is the only thread calling it
///
/// ## Panics
///
/// The background services are started with `tokio::spawn`, which panics when it is called
/// outside of a Tokio runtime
pub unsafe fn enable_and_start_all(
    gns: GlobalNS,
    config: config::ServerConfig,
    gns_driver: GNSTransactionDriverAnyFS<LocalFS>,
    model_drivers: ModelDrivers,
) -> GlobalStateStart {
    // one unbounded queue per priority level
    let (hp_sender, hp_recv) = unbounded_channel();
    let (lp_sender, lp_recv) = unbounded_channel();
    let task_mgr = mgr::FractalMgr::new(hp_sender, lp_sender, model_drivers.len());
    GLOBAL = MaybeUninit::new(GlobalState::new(
        gns,
        drivers::FractalGNSDriver::new(gns_driver),
        RwLock::new(model_drivers),
        task_mgr,
        config,
    ));
    // the state is in place; hand out the handle and start the services
    let global = Global::new();
    let mgr_handles = mgr::FractalMgr::start_all(global, lp_recv, hp_recv);
    GlobalStateStart {
        global,
        mgr_handles,
    }
}
/*
global access
*/
/// A copyable, zero-sized handle to the global state
#[derive(Debug, Clone, Copy)]
pub struct Global(());

impl Global {
    /// Create a handle
    fn new() -> Self {
        Global(())
    }
    /// Returns the global state
    pub(self) fn get_state(&self) -> &'static GlobalState {
        // NOTE(review): this assumes `GLOBAL` has been initialized, which happens in
        // `enable_and_start_all` before a handle is created there -- confirm no other path
        // creates a handle
        unsafe { GLOBAL.assume_init_ref() }
    }
    /// Returns a handle to the [`GlobalNS`]
    pub fn namespace(&self) -> &'static GlobalNS {
        let state = self.get_state();
        &state.gns
    }
    /// Post an urgent task (it is put on the high priority queue)
    pub fn post_high_priority_task(&self, task: Task<CriticalTask>) {
        let mgr = self.get_state().fractal_mgr();
        mgr.post_high_priority(task)
    }
    /// Post a task with normal priority
    ///
    /// NB: It is not guaranteed that the task will remain as a low priority task because the scheduler can choose
    /// to promote the task to a high priority task, if it deems necessary.
    pub fn post_standard_priority_task(&self, task: Task<GenericTask>) {
        let mgr = self.get_state().fractal_mgr();
        mgr.post_low_priority(task)
    }
    /// Returns the maximum size a model's delta size can hit before it should immediately issue a batch write request
    /// to avoid memory pressure
    pub fn get_max_delta_size(&self) -> usize {
        let rt_stat = self.get_state().fractal_mgr().get_rt_stat();
        rt_stat.per_mdl_delta_max_size()
    }
}
/*
global state
*/
/// The global state: everything that is reachable through a [`Global`] handle
struct GlobalState {
    /// the global namespace
    gns: GlobalNS,
    /// the GNS driver
    gns_driver: drivers::FractalGNSDriver,
    /// the model drivers, behind a read-write lock
    mdl_driver: RwLock<ModelDrivers>,
    /// the task manager
    task_mgr: mgr::FractalMgr,
    /// the server configuration
    config: config::ServerConfig,
}

impl GlobalState {
    /// Assemble the global state from its parts; nothing is started here
    fn new(
        gns: GlobalNS,
        gns_driver: drivers::FractalGNSDriver,
        mdl_driver: RwLock<ModelDrivers>,
        task_mgr: mgr::FractalMgr,
        config: config::ServerConfig,
    ) -> Self {
        GlobalState {
            config,
            task_mgr,
            mdl_driver,
            gns_driver,
            gns,
        }
    }
    /// Borrow the lock that guards the model drivers
    pub(self) fn get_mdl_drivers(&self) -> &RwLock<ModelDrivers> {
        let Self { mdl_driver, .. } = self;
        mdl_driver
    }
    /// Borrow the task manager
    pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr {
        let Self { task_mgr, .. } = self;
        task_mgr
    }
}
// these impls are completely fine
// NOTE(review): the justification is not spelled out here. These impls assert that `GlobalState`
// (and hence every one of its fields) may be sent to and shared between threads -- confirm this
// for each field type
unsafe impl Send for GlobalState {}
unsafe impl Sync for GlobalState {}
/// A unique signature that identifies a model, and only that model (guaranteed by the OS's random source).
///
/// It is made up of the space name, the model name and the model's UUID.
// NB(@ohsayan): if there are collisions, which I absolutely do not expect any instances of, pool in the space's UUID
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct ModelUniqueID {
    /// name of the space
    space: Box<str>,
    /// name of the model
    model: Box<str>,
    /// UUID of the model
    uuid: Uuid,
}

impl ModelUniqueID {
    /// Create a new unique model ID; the names are copied into owned strings
    pub fn new(space: &str, model: &str, uuid: Uuid) -> Self {
        ModelUniqueID {
            space: Box::from(space),
            model: Box::from(model),
            uuid,
        }
    }
    /// Returns the space name
    pub fn space(&self) -> &str {
        &self.space
    }
    /// Returns the model name
    pub fn model(&self) -> &str {
        &self.model
    }
    /// Returns the uuid
    pub fn uuid(&self) -> Uuid {
        let Self { uuid, .. } = self;
        *uuid
    }
}

@ -0,0 +1,78 @@
/*
* Created on Sat Sep 09 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use std::sync::atomic::{AtomicBool, Ordering};
/// A health flag backed by an atomic boolean: `true` means "okay", `false` means "iffy"
#[derive(Debug)]
pub struct Status {
    okay: AtomicBool,
}

impl Status {
    /// Create a status that starts out as okay
    pub const fn new_okay() -> Self {
        Self::new(true)
    }
    /// Create a status that starts out as iffy
    pub const fn new_iffy() -> Self {
        Self::new(false)
    }
    /// Create a status with the given initial flag
    const fn new(v: bool) -> Self {
        Status {
            okay: AtomicBool::new(v),
        }
    }
    /// Returns true if the status is *not* okay
    pub fn is_iffy(&self) -> bool {
        !self.is_healthy()
    }
    /// Returns true if the status is okay
    pub fn is_healthy(&self) -> bool {
        self._get()
    }
    /// Read the flag (acquire load)
    fn _get(&self) -> bool {
        self.okay.load(Ordering::Acquire)
    }
}
impl Status {
    /// Mark the status as okay
    pub(super) fn set_okay(&self) {
        self._set(true);
    }
    /// Mark the status as iffy
    pub(super) fn set_iffy(&self) {
        self._set(false);
    }
    /// Overwrite the flag (release store)
    fn _set(&self, v: bool) {
        let Self { okay } = self;
        okay.store(v, Ordering::Release);
    }
}
/// A special token for fractal calls.
///
/// The inner field is private and the constructor is only visible to the parent module, so other
/// modules can accept a token but cannot create one.
pub struct FractalToken(());

impl FractalToken {
    /// Create a token
    pub(super) fn new() -> Self {
        FractalToken(())
    }
}

@ -31,6 +31,7 @@ mod macros;
mod core;
mod data;
mod error;
mod fractal;
mod idx;
mod mem;
mod ql;

@ -41,12 +41,11 @@ use {
},
util::{compiler, MaybeInit},
},
core::cmp,
};
/// Clamp `index` to the last valid index of `src`.
///
/// Returns `index` if it is within bounds, else the index of the last element. For an empty
/// slice this returns `0` instead of underflowing `src.len() - 1`; note that `0` is still not a
/// valid index into an empty slice, so callers must not index with the result in that case.
#[inline(always)]
pub fn minidx<T>(src: &[T], index: usize) -> usize {
    src.len().saturating_sub(1).min(index)
}
#[derive(Debug, PartialEq)]

@ -36,7 +36,6 @@ use {
},
util::{compiler, MaybeInit},
},
core::cmp,
std::{
collections::HashMap,
time::{Duration, SystemTime, UNIX_EPOCH},
@ -114,7 +113,7 @@ fn hashp(key: &[u8]) -> u32 {
fn ldfunc(func: Ident<'_>) -> Option<ProducerFn> {
let func = func.as_bytes();
let ph = hashp(func) as usize;
let min = cmp::min(ph, PRODUCER_F.len() - 1);
let min = ph.min(PRODUCER_F.len() - 1);
let data = PRODUCER_F[min];
if data.0 == func {
Some(data.1)

@ -38,7 +38,7 @@ use {
},
util::compiler,
},
core::{cmp, fmt, ops::BitOr, slice, str},
core::{fmt, ops::BitOr, slice, str},
};
pub use self::raw::{Ident, Keyword, Symbol, Token};
@ -484,7 +484,7 @@ impl<'a> SafeQueryData<'a> {
while src.len() >= 3 && okay {
let tc = src[0];
okay &= tc <= nonpadded_offset;
let mx = cmp::min(ecc_offset, tc as usize);
let mx = ecc_offset.min(tc as usize);
let mut i_ = 1;
okay &= LITIR_TF[mx](&src[1..], &mut i_, &mut data);
src = &src[i_..];
@ -508,7 +508,7 @@ impl<'b> SafeQueryData<'b> {
let src = &src[i..];
// find payload
*flag &= src.len() >= payload_len;
let mx_extract = cmp::min(payload_len, src.len());
let mx_extract = payload_len.min(src.len());
// incr cursor
i += mx_extract;
*cnt += i;
@ -534,7 +534,7 @@ impl<'b> SafeQueryData<'b> {
#[inline(always)]
pub(super) fn bool<'a>(src: Slice<'a>, cnt: &mut usize, data: &mut Vec<LitIR<'a>>) -> bool {
// `true\n` or `false\n`
let mx = cmp::min(6, src.len());
let mx = 6.min(src.len());
let slice = &src[..mx];
let v_true = slice.starts_with(b"true\n");
let v_false = slice.starts_with(b"false\n");

@ -43,3 +43,45 @@ const RECOVERY_THRESHOLD: usize = 10;
#[cfg(test)]
pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch};
pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
use {
super::{
header_meta,
rw::{FileOpen, SDSSFileIO},
RawFSInterface, SDSSResult,
},
crate::engine::core::model::Model,
};
const LOG_SPECIFIER_VERSION: header_meta::FileSpecifierVersion =
header_meta::FileSpecifierVersion::__new(0);
/// Open the data batch journal `name`, or create it if it does not exist, and return a persist
/// driver for it.
///
/// - If the file was newly created, the persist driver is created directly on it
/// - If the file already existed, its data batches are first read into `model` through a restore
///   driver, and the persist driver is then created on the file handed back by the restore driver
///
/// The three `host_*` values are passed on to the file open call as they are.
///
/// ## Errors
///
/// Any error from opening the file, from restoring the batches or from creating the persist
/// driver is returned to the caller
pub fn open_or_reinit<Fs: RawFSInterface>(
    name: &str,
    model: &Model,
    host_setting_version: u32,
    host_run_mode: header_meta::HostRunMode,
    host_startup_counter: u64,
) -> SDSSResult<DataBatchPersistDriver<Fs>> {
    let opened = SDSSFileIO::<Fs>::open_or_create_perm_rw::<false>(
        name,
        header_meta::FileScope::Journal,
        header_meta::FileSpecifier::TableDataBatch,
        LOG_SPECIFIER_VERSION,
        host_setting_version,
        host_run_mode,
        host_startup_counter,
    )?;
    let persist_driver = match opened {
        FileOpen::Created(fresh) => DataBatchPersistDriver::new(fresh, true)?,
        FileOpen::Existing(existing, _) => {
            // replay what is already on disk before accepting new batches
            let mut restorer = DataBatchRestoreDriver::new(existing)?;
            restorer.read_data_batch_into_model(model)?;
            DataBatchPersistDriver::new(restorer.into_file(), false)?
        }
    };
    Ok(persist_driver)
}

@ -27,7 +27,7 @@
use {
super::{
obj::{self, FieldMD},
PersistTypeDscr, PersistMapSpec, PersistObject, VecU8,
PersistMapSpec, PersistObject, PersistTypeDscr, VecU8,
},
crate::{
engine::{
@ -44,7 +44,6 @@ use {
util::{copy_slice_to_array as memcpy, EndianQW},
},
core::marker::PhantomData,
std::cmp,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)]
@ -177,7 +176,7 @@ impl PersistMapSpec for GenericDictSpec {
}
/// Cheap sanity check that runs before an entry's data is decoded.
///
/// Returns true only if the scanner has at least `klen` plus the minimum payload size for the
/// entry's type descriptor left, and the descriptor is not greater than the `Dict` descriptor.
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool {
    static EXPECT_ATLEAST: [u8; 4] = [0, 1, 8, 8]; // PAD to align
    // descriptors above 3 are clamped to the last table entry, so the lookup cannot go out of bounds
    let lbound_rem = md.klen + EXPECT_ATLEAST[md.dscr.min(3) as usize] as usize;
    scanner.has_left(lbound_rem) & (md.dscr <= PersistTypeDscr::Dict.value_u8())
}
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, _: &Self::Value) {
@ -256,11 +255,7 @@ impl PersistMapSpec for GenericDictSpec {
return None;
}
v.push(
match decode_element(
scanner,
PersistTypeDscr::from_raw(dscr),
false,
) {
match decode_element(scanner, PersistTypeDscr::from_raw(dscr), false) {
Some(DictEntryGeneric::Data(l)) => l,
None => return None,
_ => unreachable!("found top-level dict item in datacell"),

@ -0,0 +1,100 @@
/*
* Created on Sun Sep 10 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::{
core::GlobalNS,
data::uuid::Uuid,
fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID},
storage::v1::{batch_jrnl, header_meta::HostRunMode, LocalFS, SDSSErrorContext, SDSSResult},
txn::gns::GNSTransactionDriverAnyFS,
};
const GNS_FILE_PATH: &str = "gns.db-tlog";
/// The result of initializing the storage engine: the GNS transaction driver and one model driver
/// per restored model
pub struct SEInitState {
    /// the GNS transaction driver
    pub txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
    /// the model drivers, keyed by unique model ID
    pub model_drivers: ModelDrivers,
}

impl SEInitState {
    /// Bundle an already created transaction driver and set of model drivers
    pub fn new(
        txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
        model_drivers: ModelDrivers,
    ) -> Self {
        SEInitState {
            model_drivers,
            txn_driver,
        }
    }
    /// Try to initialize the storage engine.
    ///
    /// The GNS transaction log is opened (or reinitialized) against an empty [`GlobalNS`]. Then,
    /// for every model in every space of that namespace, the model's data batch journal is opened
    /// (or reinitialized) and a model driver is registered under the model's unique ID.
    ///
    /// ## Errors
    ///
    /// Returns the error from opening the GNS log, or the error from opening a model's journal
    /// with the journal path attached as extra context
    pub fn try_init(
        host_setting_version: u32,
        host_run_mode: HostRunMode,
        host_startup_counter: u64,
    ) -> SDSSResult<Self> {
        // NOTE(review): `gns` is local to this function and is not part of the returned state --
        // confirm that dropping it at the end of this call is intended
        let gns = GlobalNS::empty();
        let gns_txn_driver = GNSTransactionDriverAnyFS::<LocalFS>::open_or_reinit_with_name(
            &gns,
            GNS_FILE_PATH,
            host_setting_version,
            host_run_mode,
            host_startup_counter,
        )?;
        let mut model_drivers = ModelDrivers::new();
        for (space_name, space) in gns.spaces().read().iter() {
            let space_uuid = space.get_uuid();
            for (model_name, model) in space.models().read().iter() {
                let model_uuid = model.get_uuid();
                let path = Self::model_path(space_name, space_uuid, model_name, model_uuid);
                let persist_driver = batch_jrnl::open_or_reinit(
                    &path,
                    model,
                    host_setting_version,
                    host_run_mode,
                    host_startup_counter,
                )
                .map_err(|e| {
                    e.with_extra(format!(
                        "failed to restore model data from journal in `{path}`"
                    ))
                })?;
                // the returned previous value (if any) is deliberately ignored
                let _ = model_drivers.insert(
                    ModelUniqueID::new(space_name, model_name, model_uuid),
                    FractalModelDriver::init(persist_driver),
                );
            }
        }
        Ok(Self::new(gns_txn_driver, model_drivers))
    }
    /// Build the path of a model's data batch journal from the names and UUIDs of its space and
    /// of the model itself
    fn model_path(
        space_name: &str,
        space_uuid: Uuid,
        model_name: &str,
        model_uuid: Uuid,
    ) -> String {
        format!("data/{space_name}-{space_uuid}/{model_name}-{model_uuid}/data.db-btlog")
    }
}

@ -45,6 +45,8 @@ use {
static VFS: Lazy<RwLock<HashMap<Box<str>, VNode>>, fn() -> RwLock<HashMap<Box<str>, VNode>>> =
Lazy::new(|| Default::default());
type ComponentIter<'a> = std::iter::Take<std::vec::IntoIter<&'a str>>;
/*
vnode
---
@ -79,6 +81,17 @@ impl VNode {
- make child
*/
/// Split a path on `/` into its parts. Empty parts are kept, so the result always has at least
/// one element
fn split_parts(fpath: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    parts.extend(fpath.split('/'));
    parts
}
/// Split a path into its target (the last part) and an iterator over all the parts that come
/// before the target
fn split_target_and_components(fpath: &str) -> (&str, ComponentIter) {
    let parts = split_parts(fpath);
    // everything except the last part is a component leading up to the target
    let component_len = parts.len() - 1;
    let target = parts[component_len];
    (target, parts.into_iter().take(component_len))
}
#[derive(Debug)]
pub struct VirtualFS;
@ -88,14 +101,10 @@ impl RawFSInterface for VirtualFS {
// get vfs
let mut vfs = VFS.write();
// get root dir
let path = fpath.split("/").collect::<Vec<&str>>();
// get target
let target = *path.last().unwrap();
let mut current = &mut *vfs;
// process components
let component_len = path.len() - 1;
let mut path = path.into_iter().take(component_len);
while let Some(component) = path.next() {
let (target, mut components) = split_target_and_components(fpath);
while let Some(component) = components.next() {
match current.get_mut(component) {
Some(VNode::Dir(d)) => {
current = d;
@ -147,7 +156,7 @@ impl RawFSInterface for VirtualFS {
}
}
}
let pieces: Vec<&str> = fpath.split("/").collect();
let pieces = split_parts(fpath);
create_ahead(&pieces, &mut *vfs)
}
fn fs_delete_dir(fpath: &str) -> super::SDSSResult<()> {
@ -159,10 +168,9 @@ impl RawFSInterface for VirtualFS {
fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult<super::rw::RawFileOpen<Self::File>> {
let mut vfs = VFS.write();
// components
let components = fpath.split("/").collect::<Vec<&str>>();
let file = components.last().unwrap().to_owned().into();
let (target_file, components) = split_target_and_components(fpath);
let target_dir = find_target_dir_mut(components, &mut vfs)?;
match target_dir.entry(file) {
match target_dir.entry(target_file.into()) {
Entry::Occupied(mut oe) => match oe.get_mut() {
VNode::File(f) => {
f.read = true;
@ -184,11 +192,10 @@ impl RawFSInterface for VirtualFS {
}
fn find_target_dir_mut<'a>(
components: Vec<&str>,
components: ComponentIter,
mut current: &'a mut HashMap<Box<str>, VNode>,
) -> Result<&'a mut HashMap<Box<str>, VNode>, super::SDSSError> {
let path_len = components.len() - 1;
for component in components.into_iter().take(path_len) {
for component in components {
match current.get_mut(component) {
Some(VNode::Dir(d)) => current = d,
Some(VNode::File(_)) => {
@ -205,11 +212,10 @@ fn find_target_dir_mut<'a>(
}
fn find_target_dir<'a>(
components: Vec<&str>,
components: ComponentIter,
mut current: &'a HashMap<Box<str>, VNode>,
) -> Result<&'a HashMap<Box<str>, VNode>, super::SDSSError> {
let path_len = components.len() - 1;
for component in components.into_iter().take(path_len) {
for component in components {
match current.get(component) {
Some(VNode::Dir(d)) => current = d,
Some(VNode::File(_)) => {
@ -229,10 +235,8 @@ fn delete_dir(fpath: &str, allow_if_non_empty: bool) -> Result<(), super::SDSSEr
let mut vfs = VFS.write();
let mut current = &mut *vfs;
// process components
let components = fpath.split("/").collect::<Vec<&str>>();
let components_len = components.len() - 1;
let target = *components.last().unwrap();
for component in components.into_iter().take(components_len) {
let (target, components) = split_target_and_components(fpath);
for component in components {
match current.get_mut(component) {
Some(VNode::Dir(dir)) => {
current = dir;
@ -310,10 +314,9 @@ impl Drop for VFileDescriptor {
fn with_file_mut<T>(fpath: &str, mut f: impl FnMut(&mut VFile) -> SDSSResult<T>) -> SDSSResult<T> {
let mut vfs = VFS.write();
let components = fpath.split("/").collect::<Vec<&str>>();
let file = *components.last().unwrap();
let (target_file, components) = split_target_and_components(fpath);
let target_dir = find_target_dir_mut(components, &mut vfs)?;
match target_dir.get_mut(file) {
match target_dir.get_mut(target_file) {
Some(VNode::File(file)) => f(file),
Some(VNode::Dir(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into())
@ -324,10 +327,9 @@ fn with_file_mut<T>(fpath: &str, mut f: impl FnMut(&mut VFile) -> SDSSResult<T>)
fn with_file<T>(fpath: &str, mut f: impl FnMut(&VFile) -> SDSSResult<T>) -> SDSSResult<T> {
let vfs = VFS.read();
let components = fpath.split("/").collect::<Vec<&str>>();
let file = *components.last().unwrap();
let (target_file, components) = split_target_and_components(fpath);
let target_dir = find_target_dir(components, &vfs)?;
match target_dir.get(file) {
match target_dir.get(target_file) {
Some(VNode::File(file)) => f(file),
Some(VNode::Dir(_)) => {
return Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into())

@ -29,6 +29,7 @@ mod header_impl;
// impls
mod batch_jrnl;
mod journal;
mod loader;
mod rw;
// hl
pub mod inf;
@ -44,11 +45,14 @@ pub use {
memfs::NullFS,
rw::{BufferedScanner, LocalFS, RawFSInterface, SDSSFileIO},
};
/// Re-exports the data batch persist and restore drivers implemented in `batch_jrnl`
pub mod data_batch {
pub use super::batch_jrnl::{DataBatchPersistDriver, DataBatchRestoreDriver};
}
/// Re-exports the file and host metadata types implemented in `header_impl`
pub mod header_meta {
pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode};
}
use crate::util::os::SysIOError as IoError;
use crate::{engine::txn::TransactionError, util::os::SysIOError as IoError};
pub type SDSSResult<T> = Result<T, SDSSError>;
@ -71,6 +75,14 @@ impl SDSSErrorContext for std::io::Error {
}
}
/// Lets an already constructed [`SDSSError`] carry an additional context message
impl SDSSErrorContext for SDSSError {
    /// The extra context is a plain message
    type ExtraData = String;
    /// Boxes `self` and pairs it with `extra` inside [`SDSSError::Extra`]
    fn with_extra(self, extra: Self::ExtraData) -> SDSSError {
        let source = Box::new(self);
        Self::Extra(source, extra)
    }
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SDSSError {
@ -121,6 +133,16 @@ pub enum SDSSError {
/// we failed to close the data batch
DataBatchCloseError,
DataBatchRestoreCorruptedBatchFile,
JournalRestoreTxnError,
/// An error with more context
// TODO(@ohsayan): avoid the box; we'll clean this up soon
Extra(Box<Self>, String),
}
impl From<TransactionError> for SDSSError {
fn from(_: TransactionError) -> Self {
Self::JournalRestoreTxnError
}
}
impl SDSSError {

@ -1,70 +0,0 @@
/*
* Created on Mon Aug 28 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::{
engine::core::{model::delta::DataDelta, GlobalNS},
util::os,
};
type Buf = Vec<u8>;
/*
memory adjustments
*/
/// free memory in bytes, as sampled by `set_limits` (0 until `set_limits` has run)
static mut FREEMEM_BYTES: u64 = 0;
/// capacity in bytes, per linked list: 0.2% of an equal per-model share of the free memory
static mut CAP_PER_LL_BYTES: u64 = 0;
/// maximum number of nodes in linked list: `CAP_PER_LL_BYTES` divided by the per-node size
static mut MAX_NODES_IN_LL_CNT: usize = 0;
/// Record the free memory and derive the per-model delta caps so that deltas don't bust
/// through memory
///
/// ## Safety
/// - All models must have been loaded
/// - This must be called **before** the arbiter spawns threads for connections
pub unsafe fn set_limits(gns: &GlobalNS) {
    // total number of models across every space
    let mut model_cnt: usize = 0;
    for space in gns.spaces().read().values() {
        model_cnt += space.models().read().len();
    }
    let available_mem = os::free_memory_in_bytes();
    // treat "no models" as a single share so that we never divide by zero
    let share_cnt = core::cmp::max(1, model_cnt) as u64;
    // every list is capped at 0.2% of its equal share of the free memory
    let cap_per_ll = ((available_mem / share_cnt) as f64 * 0.002) as u64;
    // a node is costed as one `DataDelta` plus one `u64`
    let node_size = sizeof!(DataDelta) + sizeof!(u64);
    FREEMEM_BYTES = available_mem;
    CAP_PER_LL_BYTES = cap_per_ll;
    MAX_NODES_IN_LL_CNT = cap_per_ll as usize / node_size;
}
/// Returns the maximum number of nodes that can be stored inside a delta queue for a model
///
/// Currently hardcoded: `set_limits` takes 0.2% of each model's equal share of the free memory
/// (sampled after all datasets have been loaded) and divides it by the per-node size. Until
/// `set_limits` has been called this returns 0.
///
/// ## Safety
/// - This reads a `static mut` that `set_limits` writes; the caller must ensure that
///   `set_limits` is not running concurrently
pub unsafe fn get_max_delta_queue_size() -> usize {
// TODO(@ohsayan): dynamically approximate this limit
MAX_NODES_IN_LL_CNT
}

@ -41,7 +41,7 @@ use {
},
util::EndianQW,
},
std::{fs::File, marker::PhantomData},
std::marker::PhantomData,
};
mod model;
@ -61,7 +61,6 @@ pub use {
pub type GNSTransactionDriverNullZero =
GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullFS>;
pub type GNSTransactionDriver = GNSTransactionDriverAnyFS<File>;
#[cfg(test)]
pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS<VirtualFS>;
@ -106,20 +105,6 @@ impl<F: GNSTransactionDriverLLInterface> GNSTransactionDriverAnyFS<F> {
.append_journal_close_and_close()
.map_err(|e| e.into())
}
/// Calls [`Self::open_or_reinit_with_name`] with the default log file name, `gns.db-tlog`
///
/// All other arguments are forwarded unchanged.
pub fn open_or_reinit(
    gns: &GlobalNS,
    host_setting_version: u32,
    host_run_mode: header_meta::HostRunMode,
    host_startup_counter: u64,
) -> TransactionResult<Self> {
    // name of the log file used when the caller does not pick one
    const DEFAULT_LOG_FILE_NAME: &str = "gns.db-tlog";
    Self::open_or_reinit_with_name(
        gns,
        DEFAULT_LOG_FILE_NAME,
        host_setting_version,
        host_run_mode,
        host_startup_counter,
    )
}
pub fn open_or_reinit_with_name(
gns: &GlobalNS,
log_file_name: &str,
@ -190,7 +175,7 @@ impl JournalAdapter for GNSAdapter {
// UNSAFE(@ohsayan):
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[core::cmp::min(opc as usize, DISPATCH.len())](&mut scanner, gs) {
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes),
Err(e) => Err(e),

@ -24,7 +24,6 @@
*
*/
pub mod data;
pub mod gns;
use super::storage::v1::SDSSError;

Loading…
Cancel
Save