From f769175083d2b07f395dd4bd126e6fafd69a40d1 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 18 Jan 2024 22:05:55 +0530 Subject: [PATCH] Implement and refactor to use common storage interfaces --- server/src/engine/core/space.rs | 2 +- server/src/engine/fractal/drivers.rs | 10 +- server/src/engine/fractal/mgr.rs | 2 +- server/src/engine/fractal/mod.rs | 4 +- server/src/engine/fractal/sys_store.rs | 4 +- server/src/engine/fractal/test_utils.rs | 15 +- server/src/engine/macros.rs | 12 + server/src/engine/mem/mod.rs | 6 + server/src/engine/mod.rs | 6 +- server/src/engine/storage/common/checksum.rs | 57 ++ .../engine/storage/common/interface/fs_imp.rs | 205 ++++++ .../storage/common/interface/fs_test.rs | 607 ++++++++++++++++++ .../storage/common/interface/fs_traits.rs | 201 ++++++ .../engine/storage/common/interface/mod.rs | 36 ++ server/src/engine/storage/common/mod.rs | 31 + server/src/engine/storage/common/sdss/mod.rs | 28 + server/src/engine/storage/common/sdss/spec.rs | 485 ++++++++++++++ .../src/engine/storage/common/static_meta.rs | 160 +++++ .../src/engine/storage/common/versions/mod.rs | 115 ++++ .../storage/common/versions/server_version.rs | 103 +++ server/src/engine/storage/mod.rs | 12 +- .../src/engine/storage/v1/batch_jrnl/mod.rs | 11 +- .../engine/storage/v1/batch_jrnl/persist.rs | 43 +- .../engine/storage/v1/batch_jrnl/restore.rs | 29 +- server/src/engine/storage/v1/journal.rs | 55 +- server/src/engine/storage/v1/loader.rs | 17 +- server/src/engine/storage/v1/mod.rs | 10 +- server/src/engine/storage/v1/rw.rs | 414 +++--------- server/src/engine/storage/v1/spec.rs | 585 ++--------------- server/src/engine/storage/v1/sysdb.rs | 14 +- server/src/engine/storage/v1/tests.rs | 2 +- server/src/engine/storage/v1/tests/batch.rs | 18 +- server/src/engine/storage/v1/tests/rw.rs | 6 +- server/src/engine/storage/v2/mod.rs | 27 + server/src/engine/storage/v2/spec.rs | 92 +++ server/src/engine/txn/gns/mod.rs | 13 +- 36 files changed, 2444 insertions(+), 993 deletions(-) create mode 100644 server/src/engine/storage/common/checksum.rs create mode 100644 server/src/engine/storage/common/interface/fs_imp.rs create mode 100644 server/src/engine/storage/common/interface/fs_test.rs create mode 100644 server/src/engine/storage/common/interface/fs_traits.rs create mode 100644 server/src/engine/storage/common/interface/mod.rs create mode 100644 server/src/engine/storage/common/mod.rs create mode 100644 server/src/engine/storage/common/sdss/mod.rs create mode 100644 server/src/engine/storage/common/sdss/spec.rs create mode 100644 server/src/engine/storage/common/static_meta.rs create mode 100644 server/src/engine/storage/common/versions/mod.rs create mode 100644 server/src/engine/storage/common/versions/server_version.rs create mode 100644 server/src/engine/storage/v2/mod.rs create mode 100644 server/src/engine/storage/v2/spec.rs diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index 5567d7ed..d4e71ec2 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -33,7 +33,7 @@ use { fractal::{GenericTask, GlobalInstanceLike, Task}, idx::STIndex, ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace}, - storage::v1::{loader::SEInitState, RawFSInterface}, + storage::{safe_interfaces::FSInterface, v1::loader::SEInitState}, txn::gns as gnstxn, }, std::collections::HashSet, diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index 6a633660..6f8f43c7 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -28,7 +28,7 @@ use { super::util, crate::engine::{ error::RuntimeResult, - storage::v1::{data_batch::DataBatchPersistDriver, RawFSInterface}, + storage::{safe_interfaces::FSInterface, v1::data_batch::DataBatchPersistDriver}, txn::gns::GNSTransactionDriverAnyFS, }, parking_lot::Mutex, @@ -36,13 +36,13 @@ use { }; /// GNS driver -pub(super) struct FractalGNSDriver { +pub(super) struct FractalGNSDriver { #[allow(unused)] status: util::Status, pub(super) txn_driver: Mutex>, } -impl FractalGNSDriver { +impl FractalGNSDriver { pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS) -> Self { Self { status: util::Status::new_okay(), @@ -55,13 +55,13 @@ impl FractalGNSDriver { } /// Model driver -pub struct FractalModelDriver { +pub struct FractalModelDriver { #[allow(unused)] hooks: Arc, batch_driver: Mutex>, } -impl FractalModelDriver { +impl FractalModelDriver { /// Initialize a model driver with default settings pub fn init(batch_driver: DataBatchPersistDriver) -> Self { Self { diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index ea9332ab..3ae209ab 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -33,7 +33,7 @@ use { EntityIDRef, }, data::uuid::Uuid, - storage::v1::LocalFS, + storage::safe_interfaces::LocalFS, }, util::os, }, diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index cc8321b5..4d3520e1 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -31,7 +31,7 @@ use { data::uuid::Uuid, storage::{ self, - v1::{LocalFS, RawFSInterface}, + safe_interfaces::{FSInterface, LocalFS}, }, txn::gns::GNSTransactionDriverAnyFS, }, @@ -105,7 +105,7 @@ pub unsafe fn load_and_enable_all( /// Something that represents the global state pub trait GlobalInstanceLike { - type FileSystem: RawFSInterface; + type FileSystem: FSInterface; const FS_IS_NON_NULL: bool = Self::FileSystem::NOT_NULL; // stat fn get_max_delta_size(&self) -> usize; diff --git a/server/src/engine/fractal/sys_store.rs b/server/src/engine/fractal/sys_store.rs index f20820fa..7f4f9bfa 100644 --- a/server/src/engine/fractal/sys_store.rs +++ b/server/src/engine/fractal/sys_store.rs @@ -28,7 +28,7 @@ use { crate::engine::{ config::{ConfigAuth, ConfigMode}, error::{QueryError, QueryResult}, - storage::v1::RawFSInterface, + storage::safe_interfaces::FSInterface, }, parking_lot::RwLock, std::{ @@ -147,7 +147,7 @@ impl SysHostData { } } -impl SystemStore { +impl SystemStore { pub fn _new(syscfg: SysConfig) -> Self { Self { syscfg, diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index ed555cd1..d2566c11 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -34,10 +34,7 @@ use { data::uuid::Uuid, storage::{ self, - v1::{ - memfs::{NullFS, VirtualFS}, - RawFSInterface, - }, + safe_interfaces::{FSInterface, NullFS, VirtualFS}, }, txn::gns::GNSTransactionDriverAnyFS, }, @@ -46,7 +43,7 @@ use { }; /// A `test` mode global implementation -pub struct TestGlobal { +pub struct TestGlobal { gns: GlobalNS, hp_queue: RwLock>>, lp_queue: RwLock>>, @@ -57,7 +54,7 @@ pub struct TestGlobal { sys_cfg: SystemStore, } -impl TestGlobal { +impl TestGlobal { fn new( gns: GlobalNS, max_delta_size: usize, @@ -75,7 +72,7 @@ impl TestGlobal { } } -impl TestGlobal { +impl TestGlobal { pub fn new_with_driver_id(log_name: &str) -> Self { let gns = GlobalNS::empty(); let driver = storage::v1::loader::open_gns_driver(log_name, &gns) @@ -100,7 +97,7 @@ impl TestGlobal { } } -impl GlobalInstanceLike for TestGlobal { +impl GlobalInstanceLike for TestGlobal { type FileSystem = Fs; fn namespace(&self) -> &GlobalNS { &self.gns @@ -162,7 +159,7 @@ impl GlobalInstanceLike for TestGlobal { } } -impl Drop for TestGlobal { +impl Drop for TestGlobal { fn drop(&mut self) { let mut txn_driver = self.txn_driver.lock(); txn_driver.__journal_mut().__close_mut().unwrap(); diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index 81fe8ff6..a774ae2a 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -420,3 +420,15 @@ macro_rules! local_ref { ::std::thread::LocalKey::with(&$ident, |v| _f(v, $call)) }}; } + +macro_rules! var { + (let $($name:ident),* $(,)?) => { + $(let $name;)* + } +} + +macro_rules! okay { + ($($expr:expr),* $(,)?) => { + $(($expr) &)*true + } +} diff --git a/server/src/engine/mem/mod.rs b/server/src/engine/mem/mod.rs index df24fe0d..4bb47e43 100644 --- a/server/src/engine/mem/mod.rs +++ b/server/src/engine/mem/mod.rs @@ -56,6 +56,12 @@ pub unsafe fn dealloc_array(ptr: *mut T, l: usize) { } } +pub unsafe fn memcpy(src: &[u8]) -> [u8; N] { + let mut dst = [0u8; N]; + src.as_ptr().copy_to_nonoverlapping(dst.as_mut_ptr(), N); + dst +} + /// Native double pointer width (note, native != arch native, but host native) pub struct NativeDword([usize; 2]); /// Native triple pointer width (note, native != arch native, but host native) diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 73f75835..c59331d7 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -51,9 +51,9 @@ use { context::{self, Subsystem}, sys_store::SystemStore, }, - storage::v1::{ - loader::{self, SEInitState}, - LocalFS, + storage::{ + safe_interfaces::LocalFS, + v1::loader::{self, SEInitState}, }, }, crate::util::os::TerminationSignal, diff --git a/server/src/engine/storage/common/checksum.rs b/server/src/engine/storage/common/checksum.rs new file mode 100644 index 00000000..8dc37473 --- /dev/null +++ b/server/src/engine/storage/common/checksum.rs @@ -0,0 +1,57 @@ +/* + * Created on Sun Sep 03 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! # Checksum utils +//! +//! This module contains utils for handling checksums +//! + +use crc::{Crc, Digest, CRC_64_XZ}; + +/* + NOTE(@ohsayan): we're currently using crc's impl. but the reason I decided to make a wrapper is because I have a + different impl in mind +*/ + +const CRC64: Crc = Crc::::new(&CRC_64_XZ); + +pub struct SCrc64 { + digest: Digest<'static, u64>, +} + +impl SCrc64 { + pub const fn new() -> Self { + Self { + digest: CRC64.digest(), + } + } + pub fn recompute_with_new_var_block(&mut self, b: &[u8]) { + self.digest.update(b) + } + pub fn finish(self) -> u64 { + self.digest.finalize() + } +} diff --git a/server/src/engine/storage/common/interface/fs_imp.rs b/server/src/engine/storage/common/interface/fs_imp.rs new file mode 100644 index 00000000..4e56785f --- /dev/null +++ b/server/src/engine/storage/common/interface/fs_imp.rs @@ -0,0 +1,205 @@ +/* + * Created on Sun Jan 07 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::fs_traits::{ + FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt, FileInterfaceRead, + FileInterfaceWrite, FileInterfaceWriteExt, FileOpen, + }, + crate::engine::RuntimeResult, + std::{ + fs::{self, File}, + io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}, + }, +}; + +/* + local fs impls +*/ + +/// A type representing the host's local filesystem (or atleast where our data directory is) +pub struct LocalFS; + +fn cvt>(r: Result) -> Result { + r.map_err(Into::into) +} + +impl FSInterface for LocalFS { + type File = File; + fn fs_remove_file(fpath: &str) -> RuntimeResult<()> { + cvt(fs::remove_file(fpath)) + } + fn fs_rename(from: &str, to: &str) -> RuntimeResult<()> { + cvt(fs::rename(from, to)) + } + fn fs_create_dir(fpath: &str) -> RuntimeResult<()> { + cvt(fs::create_dir(fpath)) + } + fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()> { + cvt(fs::create_dir_all(fpath)) + } + fn fs_delete_dir(fpath: &str) -> RuntimeResult<()> { + cvt(fs::remove_dir(fpath)) + } + fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()> { + cvt(fs::remove_dir_all(fpath)) + } + fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult> { + let r = || -> Result<_, std::io::Error> { + let f = File::options() + .create(true) + .read(true) + .write(true) + .open(fpath)?; + let md = f.metadata()?; + if md.len() == 0 { + Ok(FileOpen::Created(f)) + } else { + Ok(FileOpen::Existing(f)) + } + }; + cvt(r()) + } + fn fs_fopen_rw(fpath: &str) -> RuntimeResult { + let f = File::options().read(true).write(true).open(fpath)?; + Ok(f) + } + fn fs_fcreate_rw(fpath: &str) -> RuntimeResult { + let f = File::options() + .create_new(true) + .read(true) + .write(true) + .open(fpath)?; + Ok(f) + } +} + +/* + common impls for files +*/ + +impl FileInterfaceRead for R { + fn fread_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { + cvt(self.read_exact(buf)) + } +} + +impl FileInterfaceWrite for W { + fn fwrite(&mut self, buf: &[u8]) -> RuntimeResult { + cvt(self.write(buf).map(|v| v as _)) + } +} + +/* + local file impls +*/ + +impl FileInterface for File { + type BufReader = BufReader; + type BufWriter = BufWriter; + fn upgrade_to_buffered_reader(self) -> RuntimeResult { + Ok(BufReader::new(self)) + } + fn upgrade_to_buffered_writer(self) -> RuntimeResult { + Ok(BufWriter::new(self)) + } + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { + Ok(r.into_inner()) + } + fn downgrade_writer(mut r: Self::BufWriter) -> RuntimeResult { + // TODO(@ohsayan): maybe we'll want to explicitly handle not syncing this? + r.flush()?; + let (me, err) = r.into_parts(); + match err { + Ok(x) if x.is_empty() => Ok(me), + Ok(_) | Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to flush data from buffer into sink", + ) + .into()) + } + } + } +} + +/// A trait for handling wrappers of [`std::fs::File`] +trait AsLocalFile { + fn file(&self) -> &File; + fn file_mut(&mut self) -> &mut File; +} + +impl AsLocalFile for File { + fn file(&self) -> &File { + self + } + fn file_mut(&mut self) -> &mut File { + self + } +} + +impl AsLocalFile for BufReader { + fn file(&self) -> &File { + self.get_ref() + } + fn file_mut(&mut self) -> &mut File { + self.get_mut() + } +} + +impl AsLocalFile for BufWriter { + fn file(&self) -> &File { + self.get_ref() + } + fn file_mut(&mut self) -> &mut File { + self.get_mut() + } +} + +impl FileInterfaceBufWrite for BufWriter { + fn sync_write_cache(&mut self) -> RuntimeResult<()> { + // TODO(@ohsayan): maybe we'll want to explicitly handle not syncing this? + cvt(self.flush()) + } +} + +impl FileInterfaceExt for F { + fn fext_length(&self) -> RuntimeResult { + Ok(self.file().metadata()?.len()) + } + fn fext_cursor(&mut self) -> RuntimeResult { + cvt(self.file_mut().stream_position()) + } + fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()> { + cvt(self.file_mut().seek(SeekFrom::Start(by)).map(|_| ())) + } +} + +impl FileInterfaceWriteExt for File { + fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { + cvt(self.set_len(to)) + } +} diff --git a/server/src/engine/storage/common/interface/fs_test.rs b/server/src/engine/storage/common/interface/fs_test.rs new file mode 100644 index 00000000..d36cbf30 --- /dev/null +++ b/server/src/engine/storage/common/interface/fs_test.rs @@ -0,0 +1,607 @@ +/* + * Created on Sun Jan 07 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! File system emulation +//! +//! This directory contains some implementations of virtual file systems (either emulating `/tmp` or +//! `/dev/null`) that are directly implemented at the application level with some necessary changes +//! required for testing +//! + +use { + super::fs_traits::{ + FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt, FileInterfaceRead, + FileInterfaceWrite, FileInterfaceWriteExt, FileOpen, + }, + crate::engine::{sync::cell::Lazy, RuntimeResult}, + parking_lot::RwLock, + std::{ + collections::{ + hash_map::{Entry, OccupiedEntry}, + HashMap, + }, + io::{Error, ErrorKind}, + }, +}; + +/* + vfs definitions +*/ + +/// # VirtualFS +/// +/// A virtual file system stored entirely in the process's memory (inclusive of swap if enabled; no explicit discrimination is made) +/// +/// The virtual file system is generally intended for being utilized as an in-memory filesystem, primarily for testing +/// purposes and has a lot of limitations. +/// +/// It has support for: +/// - nested directories +/// - read/write permissions +/// - file position tracking and seeking +/// - directory operations +pub struct VirtualFS; + +/// A virtual directory +type VDir = HashMap, VNode>; +/// An iterator over the components of a file path (alias) +type ComponentIter<'a> = std::iter::Take>; + +/** + vnode + --- + either a vfile or a vdir +*/ +#[derive(Debug)] +pub(super) enum VNode { + Dir(HashMap, Self>), + File(VFile), +} + +impl VNode { + fn as_dir_mut(&mut self) -> Option<&mut VDir> { + match self { + Self::Dir(d) => Some(d), + Self::File(_) => None, + } + } +} + +/* + vfile +*/ + +#[derive(Debug)] +pub struct VFile { + read: bool, + write: bool, + data: Vec, + pos: usize, +} + +impl VFile { + fn new(read: bool, write: bool, data: Vec, pos: usize) -> Self { + Self { + read, + write, + data, + pos, + } + } + fn current(&self) -> &[u8] { + &self.data[self.pos..] + } +} + +mod err { + //! Errors + //! + //! These are custom errors returned by the virtual file system + use { + crate::engine::RuntimeResult, + std::io::{Error, ErrorKind}, + }; + pub(super) fn item_is_not_file() -> RuntimeResult { + Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into()) + } + pub(super) fn file_in_dir_path() -> RuntimeResult { + Err(Error::new(ErrorKind::InvalidInput, "found file in directory path").into()) + } + pub(super) fn dir_missing_in_path() -> RuntimeResult { + Err(Error::new(ErrorKind::InvalidInput, "could not find directory in path").into()) + } + pub(super) fn could_not_find_item() -> RuntimeResult { + Err(Error::new(ErrorKind::NotFound, "could not find item").into()) + } +} + +mod util { + use { + super::{err, ComponentIter, VDir, VNode}, + crate::engine::RuntimeResult, + }; + pub(super) fn split_parts(fpath: &str) -> Vec<&str> { + fpath.split("/").collect() + } + pub(super) fn split_target_and_components(fpath: &str) -> (&str, ComponentIter) { + let parts = split_parts(fpath); + let target = parts.last().unwrap(); + let component_len = parts.len() - 1; + (target, parts.into_iter().take(component_len)) + } + pub(super) fn find_target_dir_mut<'a>( + components: ComponentIter, + mut current: &'a mut VDir, + ) -> RuntimeResult<&'a mut VDir> { + for component in components { + match current.get_mut(component) { + Some(VNode::Dir(d)) => current = d, + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + Ok(current) + } + pub(super) fn find_target_dir<'a>( + components: ComponentIter, + mut current: &'a VDir, + ) -> RuntimeResult<&'a VDir> { + for component in components { + match current.get(component) { + Some(VNode::Dir(d)) => current = d, + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + Ok(current) + } +} + +/* + vfs impl: + - nested directory structure + - make parents + - make child +*/ + +impl VirtualFS { + /// Get a handle to the virtual filesystem + fn handle() -> &'static RwLock { + static VFS: Lazy, fn() -> RwLock> = Lazy::new(|| Default::default()); + &VFS + } + fn with_file_mut( + fpath: &str, + mut f: impl FnMut(&mut VFile) -> RuntimeResult, + ) -> RuntimeResult { + let mut vfs = Self::handle().write(); + let (target_file, components) = util::split_target_and_components(fpath); + let target_dir = util::find_target_dir_mut(components, &mut vfs)?; + match target_dir.get_mut(target_file) { + Some(VNode::File(file)) => f(file), + Some(VNode::Dir(_)) => return err::item_is_not_file(), + None => return Err(Error::from(ErrorKind::NotFound).into()), + } + } + fn with_file( + fpath: &str, + mut f: impl FnMut(&VFile) -> RuntimeResult, + ) -> RuntimeResult { + let vfs = Self::handle().read(); + let (target_file, components) = util::split_target_and_components(fpath); + let target_dir = util::find_target_dir(components, &vfs)?; + match target_dir.get(target_file) { + Some(VNode::File(file)) => f(file), + Some(VNode::Dir(_)) => return err::item_is_not_file(), + None => return Err(Error::from(ErrorKind::NotFound).into()), + } + } + fn with_item_mut( + fpath: &str, + f: impl Fn(OccupiedEntry, VNode>) -> RuntimeResult, + ) -> RuntimeResult { + let mut vfs = Self::handle().write(); + let mut current = &mut *vfs; + // process components + let (target, components) = util::split_target_and_components(fpath); + for component in components { + match current.get_mut(component) { + Some(VNode::Dir(dir)) => { + current = dir; + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + match current.entry(target.into()) { + Entry::Occupied(item) => return f(item), + Entry::Vacant(_) => return err::could_not_find_item(), + } + } + fn delete_dir(fpath: &str, allow_if_non_empty: bool) -> RuntimeResult<()> { + Self::with_item_mut(fpath, |node| match node.get() { + VNode::Dir(d) => { + if allow_if_non_empty || d.is_empty() { + node.remove(); + return Ok(()); + } + return Err(Error::new(ErrorKind::InvalidInput, "directory is not empty").into()); + } + VNode::File(_) => return err::file_in_dir_path(), + }) + } +} + +impl FSInterface for VirtualFS { + type File = VFileDescriptor; + fn fs_rename(from: &str, to: &str) -> RuntimeResult<()> { + // get file data + let data = VirtualFS::with_file(from, |f| Ok(f.data.clone()))?; + // create new file + let file = VirtualFS::fs_fopen_or_create_rw(to)?; + match file { + FileOpen::Created(mut c) => { + c.fw_write_all(&data)?; + } + FileOpen::Existing(mut e) => { + e.fwext_truncate_to(0)?; + e.fw_write_all(&data)?; + } + } + // delete old file + Self::fs_remove_file(from) + } + fn fs_remove_file(fpath: &str) -> RuntimeResult<()> { + VirtualFS::with_item_mut(fpath, |e| match e.get() { + VNode::File(_) => { + e.remove(); + Ok(()) + } + _ => return err::item_is_not_file(), + }) + } + fn fs_create_dir(fpath: &str) -> RuntimeResult<()> { + // get vfs + let mut vfs = VirtualFS::handle().write(); + // get root dir + let mut current = &mut *vfs; + // process components + let (target, mut components) = util::split_target_and_components(fpath); + while let Some(component) = components.next() { + match current.get_mut(component) { + Some(VNode::Dir(d)) => { + current = d; + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + match current.entry(target.into()) { + Entry::Occupied(_) => return Err(Error::from(ErrorKind::AlreadyExists).into()), + Entry::Vacant(ve) => { + ve.insert(VNode::Dir(into_dict!())); + Ok(()) + } + } + } + fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()> { + let mut vfs = VirtualFS::handle().write(); + fn create_ahead(mut ahead: &[&str], current: &mut VDir) -> RuntimeResult<()> { + if ahead.is_empty() { + return Ok(()); + } + let this = ahead[0]; + ahead = &ahead[1..]; + match current.get_mut(this) { + Some(VNode::Dir(d)) => { + if ahead.is_empty() { + // hmm, this was the list dir that was to be created, but it already exists + return Err(Error::from(ErrorKind::AlreadyExists).into()); + } + return create_ahead(ahead, d); + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => { + let _ = current.insert(this.into(), VNode::Dir(into_dict!())); + let dir = current.get_mut(this).unwrap().as_dir_mut().unwrap(); + return create_ahead(ahead, dir); + } + } + } + let pieces = util::split_parts(fpath); + create_ahead(&pieces, &mut *vfs) + } + fn fs_delete_dir(fpath: &str) -> RuntimeResult<()> { + VirtualFS::delete_dir(fpath, false) + } + fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()> { + VirtualFS::delete_dir(fpath, true) + } + fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult> { + let mut vfs = VirtualFS::handle().write(); + // components + let (target_file, components) = util::split_target_and_components(fpath); + let target_dir = util::find_target_dir_mut(components, &mut vfs)?; + match target_dir.entry(target_file.into()) { + Entry::Occupied(mut oe) => match oe.get_mut() { + VNode::File(f) => { + f.read = true; + f.write = true; + Ok(FileOpen::Existing(VFileDescriptor(fpath.into()))) + } + VNode::Dir(_) => return err::item_is_not_file(), + }, + Entry::Vacant(v) => { + v.insert(VNode::File(VFile::new(true, true, vec![], 0))); + Ok(FileOpen::Created(VFileDescriptor(fpath.into()))) + } + } + } + fn fs_fcreate_rw(fpath: &str) -> RuntimeResult { + let mut vfs = VirtualFS::handle().write(); + let (target_file, components) = util::split_target_and_components(fpath); + let target_dir = util::find_target_dir_mut(components, &mut vfs)?; + match target_dir.entry(target_file.into()) { + Entry::Occupied(k) => { + match k.get() { + VNode::Dir(_) => { + return Err(Error::new( + ErrorKind::AlreadyExists, + "found directory with same name where file was to be created", + ) + .into()); + } + VNode::File(_) => { + // the file already exists + return Err(Error::new( + ErrorKind::AlreadyExists, + "the file already exists", + ) + .into()); + } + } + } + Entry::Vacant(v) => { + // no file exists, we can create this + v.insert(VNode::File(VFile::new(true, true, vec![], 0))); + Ok(VFileDescriptor(fpath.into())) + } + } + } + fn fs_fopen_rw(fpath: &str) -> RuntimeResult { + VirtualFS::with_file_mut(fpath, |f| { + f.read = true; + f.write = true; + Ok(VFileDescriptor(fpath.into())) + }) + } +} + +/* + vfile & descriptor impls + (this is our `File` but a temporary, completely in-memory file) +*/ + +pub struct VFileDescriptor(Box); +impl Drop for VFileDescriptor { + fn drop(&mut self) { + let _ = VirtualFS::with_file_mut(&self.0, |f| { + f.read = false; + f.write = false; + f.pos = 0; + Ok(()) + }); + } +} + +impl FileInterface for VFileDescriptor { + type BufReader = Self; + type BufWriter = Self; + fn upgrade_to_buffered_reader(self) -> RuntimeResult { + Ok(self) + } + fn upgrade_to_buffered_writer(self) -> RuntimeResult { + Ok(self) + } + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { + Ok(r) + } + fn downgrade_writer(r: Self::BufWriter) -> RuntimeResult { + Ok(r) + } +} + +impl FileInterfaceRead for VFileDescriptor { + fn fread_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { + VirtualFS::with_file_mut(&self.0, |file| { + if !file.read { + return Err( + Error::new(ErrorKind::PermissionDenied, "Read permission denied").into(), + ); + } + let available_bytes = file.current().len(); + if available_bytes < buf.len() { + return Err(Error::from(ErrorKind::UnexpectedEof).into()); + } + buf.copy_from_slice(&file.data[file.pos..file.pos + buf.len()]); + file.pos += buf.len(); + Ok(()) + }) + } +} + +impl FileInterfaceWrite for VFileDescriptor { + fn fwrite(&mut self, bytes: &[u8]) -> RuntimeResult { + VirtualFS::with_file_mut(&self.0, |file| { + if !file.write { + return Err( + Error::new(ErrorKind::PermissionDenied, "Write permission denied").into(), + ); + } + if file.pos + bytes.len() > file.data.len() { + file.data.resize(file.pos + bytes.len(), 0); + } + file.data[file.pos..file.pos + bytes.len()].copy_from_slice(bytes); + file.pos += bytes.len(); + Ok(bytes.len() as _) + }) + } +} + +impl FileInterfaceWriteExt for VFileDescriptor { + fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { + VirtualFS::with_file_mut(&self.0, |file| { + if !file.write { + return Err( + Error::new(ErrorKind::PermissionDenied, "Write permission denied").into(), + ); + } + if to as usize > file.data.len() { + file.data.resize(to as usize, 0); + } else { + file.data.truncate(to as usize); + } + if file.pos > file.data.len() { + file.pos = file.data.len(); + } + Ok(()) + }) + } +} + +impl FileInterfaceBufWrite for VFileDescriptor { + fn sync_write_cache(&mut self) -> RuntimeResult<()> { + Ok(()) + } +} + +impl FileInterfaceExt for VFileDescriptor { + fn fext_length(&self) -> RuntimeResult { + VirtualFS::with_file(&self.0, |f| Ok(f.data.len() as u64)) + } + fn fext_cursor(&mut self) -> RuntimeResult { + VirtualFS::with_file(&self.0, |f| Ok(f.pos as u64)) + } + fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()> { + VirtualFS::with_file_mut(&self.0, |file| { + if by > file.data.len() as u64 { + return Err( + Error::new(ErrorKind::InvalidInput, "Can't seek beyond file's end").into(), + ); + } + file.pos = by as usize; + Ok(()) + }) + } +} + +/// An application level implementation of `/dev/null` with some changes +pub struct NullFS; +/// A handle to a file in `/dev/null` (emulated) +pub struct NullFile; + +impl FSInterface for NullFS { + type File = NullFile; + fn fs_remove_file(_: &str) -> RuntimeResult<()> { + Ok(()) + } + fn fs_rename(_: &str, _: &str) -> RuntimeResult<()> { + Ok(()) + } + fn fs_create_dir(_: &str) -> RuntimeResult<()> { + Ok(()) + } + fn fs_create_dir_all(_: &str) -> RuntimeResult<()> { + Ok(()) + } + fn fs_delete_dir(_: &str) -> RuntimeResult<()> { + Ok(()) + } + fn fs_delete_dir_all(_: &str) -> RuntimeResult<()> { + Ok(()) + } + fn fs_fopen_or_create_rw(_: &str) -> RuntimeResult> { + Ok(FileOpen::Created(NullFile)) + } + fn fs_fopen_rw(_: &str) -> RuntimeResult { + Ok(NullFile) + } + fn fs_fcreate_rw(_: &str) -> RuntimeResult { + Ok(NullFile) + } +} + +impl FileInterface for NullFile { + type BufReader = Self; + type BufWriter = Self; + fn upgrade_to_buffered_reader(self) -> RuntimeResult { + Ok(self) + } + fn upgrade_to_buffered_writer(self) -> RuntimeResult { + Ok(self) + } + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { + Ok(r) + } + fn downgrade_writer(r: Self::BufWriter) -> RuntimeResult { + Ok(r) + } +} + +impl FileInterfaceWrite for NullFile { + fn fwrite(&mut self, buf: &[u8]) -> RuntimeResult { + Ok(buf.len() as _) + } +} + +impl FileInterfaceWriteExt for NullFile { + fn fwext_truncate_to(&mut self, _: u64) -> RuntimeResult<()> { + Ok(()) + } +} + +impl FileInterfaceRead for NullFile { + fn fread_exact(&mut self, _: &mut [u8]) -> RuntimeResult<()> { + Ok(()) + } +} + +impl FileInterfaceExt for NullFile { + fn fext_length(&self) -> RuntimeResult { + Ok(0) + } + fn fext_cursor(&mut self) -> RuntimeResult { + Ok(0) + } + fn fext_seek_ahead_from_start_by(&mut self, _: u64) -> RuntimeResult<()> { + Ok(()) + } +} +impl FileInterfaceBufWrite for NullFile { + fn sync_write_cache(&mut self) -> RuntimeResult<()> { + Ok(()) + } +} diff --git a/server/src/engine/storage/common/interface/fs_traits.rs b/server/src/engine/storage/common/interface/fs_traits.rs new file mode 100644 index 00000000..4e1ae40a --- /dev/null +++ b/server/src/engine/storage/common/interface/fs_traits.rs @@ -0,0 +1,201 @@ +/* + * Created on Sun Jan 07 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + crate::engine::RuntimeResult, + std::io::{Error as IoError, ErrorKind as IoErrorKind}, +}; + +#[derive(Debug, PartialEq)] +/// Result of opening a file +/// - Created: newly created file +/// - Existing: existing file that was reopened +pub enum FileOpen { + /// new file + Created(CF), + /// existing file + Existing(EF), +} + +#[cfg(test)] +impl FileOpen { + pub fn into_existing(self) -> Option { + match self { + Self::Existing(e) => Some(e), + Self::Created(_) => None, + } + } + pub fn into_created(self) -> Option { + match self { + Self::Existing(_) => None, + Self::Created(c) => Some(c), + } + } +} + +#[cfg(test)] +impl FileOpen { + pub fn into_inner(self) -> CF { + match self { + Self::Created(f) | Self::Existing(f) => f, + } + } +} + +pub trait FSInterface { + // settings + /// set to false if the file system is a special device like `/dev/null` + const NOT_NULL: bool = true; + // types + /// the file type that is returned by this file system + type File: FileInterface; + // functions + /// Remove a file + fn fs_remove_file(fpath: &str) -> RuntimeResult<()>; + /// Rename a file + fn fs_rename(from: &str, to: &str) -> RuntimeResult<()>; + /// Create a directory + fn fs_create_dir(fpath: &str) -> RuntimeResult<()>; + /// Create a directory and all corresponding path components + fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()>; + /// Delete a directory + fn fs_delete_dir(fpath: &str) -> RuntimeResult<()>; + /// Delete a directory and recursively remove all (if any) children + fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()>; + /// Open or create a file in R/W mode + /// + /// This will: + /// - Create a file if it doesn't exist + /// - Open a file it it does exist + fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult>; + /// Open an existing file + fn fs_fopen_rw(fpath: &str) -> RuntimeResult; + /// Create a new file + fn fs_fcreate_rw(fpath: &str) -> RuntimeResult; +} + +/// File interface definition +pub trait FileInterface: + FileInterfaceRead + FileInterfaceWrite + FileInterfaceWriteExt + FileInterfaceExt + Sized +{ + /// A buffered reader implementation + type BufReader: FileInterfaceRead + FileInterfaceExt; + /// A buffered writer implementation + type BufWriter: FileInterfaceBufWrite; + /// Get a buffered reader for this file + fn upgrade_to_buffered_reader(self) -> RuntimeResult; + /// Get a buffered writer for this file + fn upgrade_to_buffered_writer(self) -> RuntimeResult; + /// Get the file back from the buffered reader + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult; + /// Get the file back from the buffered writer + fn downgrade_writer(r: Self::BufWriter) -> RuntimeResult; +} + +pub trait FileInterfaceBufWrite: FileInterfaceWrite + FileInterfaceExt { + fn sync_write_cache(&mut self) -> RuntimeResult<()>; +} + +/// Readable object +pub trait FileInterfaceRead { + /// Read in a block of the exact given length + fn fread_exact_block(&mut self) -> RuntimeResult<[u8; N]> { + let mut ret = [0u8; N]; + self.fread_exact(&mut ret)?; + Ok(ret) + } + /// Read in `n` bytes to fill the given buffer + fn fread_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()>; +} + +/// Writable object +pub trait FileInterfaceWrite { + /// Attempt to write the buffer into this object, returning the number of bytes that were + /// written. It is **NOT GUARANTEED THAT ALL DATA WILL BE WRITTEN** + fn fwrite(&mut self, buf: &[u8]) -> RuntimeResult; + /// Attempt to write the entire buffer into this object, returning the number of bytes written + /// + /// It is guaranteed that if the [`Result`] returned is [`Ok(())`], then the entire buffer was + /// written to disk. + fn fwrite_all_count(&mut self, buf: &[u8]) -> (u64, RuntimeResult<()>) { + let len = buf.len() as u64; + let mut written = 0; + while written != len { + match self.fwrite(buf) { + Ok(0) => { + return ( + written, + Err(IoError::new( + IoErrorKind::WriteZero, + format!("could only write {} of {} bytes", written, buf.len()), + ) + .into()), + ) + } + Ok(n) => written += n, + Err(e) => return (written, Err(e)), + } + } + (written, Ok(())) + } + /// Attempt to write the entire buffer into this object + /// + /// If this return [`Ok(())`] then it is guaranteed that all bytes have been written + fn fw_write_all(&mut self, buf: &[u8]) -> RuntimeResult<()> { + self.fwrite_all_count(buf).1 + } +} + +/// Advanced write traits +pub trait FileInterfaceWriteExt { + /// Sync data and metadata for this file + fn fwext_sync_all(&mut self) -> RuntimeResult<()> { + Ok(()) + } + /// Sync data for this file + fn fwext_sync_data(&mut self) -> RuntimeResult<()> { + Ok(()) + } + /// Sync meta for this file + fn fwext_sync_meta(&mut self) -> RuntimeResult<()> { + Ok(()) + } + /// Truncate the size of the file to the given size + /// + /// - If `to` > actual file length: the file is zero padded to fill `to - len` + /// - If `to` < actual file length: the file is trimmed to the size `to` + fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()>; +} + +/// Advanced file access +pub trait FileInterfaceExt { + /// Returns the length of the file + fn fext_length(&self) -> RuntimeResult; + /// Returns the current cursor position of the file + fn fext_cursor(&mut self) -> RuntimeResult; + /// Seek by `from` bytes from the start of the file + fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()>; +} diff --git a/server/src/engine/storage/common/interface/mod.rs b/server/src/engine/storage/common/interface/mod.rs new file mode 100644 index 00000000..237a1d51 --- /dev/null +++ b/server/src/engine/storage/common/interface/mod.rs @@ -0,0 +1,36 @@ +/* + * Created on Sun Jan 07 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! # FS abstractions +//! +//! This module defines abstractions over file systems (whether physical or virtual) and provides +//! traits that provide an unified API for all file systems irrespective of their base impl +//! + +pub mod fs_imp; +#[cfg(test)] +pub mod fs_test; +pub mod fs_traits; diff --git a/server/src/engine/storage/common/mod.rs b/server/src/engine/storage/common/mod.rs new file mode 100644 index 00000000..57e8015e --- /dev/null +++ b/server/src/engine/storage/common/mod.rs @@ -0,0 +1,31 @@ +/* + * Created on Tue Jan 09 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +pub mod checksum; +pub mod interface; +pub mod sdss; +pub mod static_meta; +pub mod versions; diff --git a/server/src/engine/storage/common/sdss/mod.rs b/server/src/engine/storage/common/sdss/mod.rs new file mode 100644 index 00000000..56c9f396 --- /dev/null +++ b/server/src/engine/storage/common/sdss/mod.rs @@ -0,0 +1,28 @@ +/* + * Created on Fri Jan 12 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +mod spec; +pub use spec::{FileSpecV1, HeaderV1, HeaderV1Enumeration, HeaderV1Spec, SimpleFileSpecV1}; diff --git a/server/src/engine/storage/common/sdss/spec.rs b/server/src/engine/storage/common/sdss/spec.rs new file mode 100644 index 00000000..6d65717a --- /dev/null +++ b/server/src/engine/storage/common/sdss/spec.rs @@ -0,0 +1,485 @@ +/* + * Created on Wed Jan 10 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +/*! + # SDSS spec + + This module provides traits and types to deal with the SDSS spec, especially headers. + + The static SDSS header block has a special segment that defines the header version which is static and will + never change across any versions. While the same isn't warranted for the rest of the header, it's exceedingly + unlikely that we'll ever change the static block. + + The only header that we currently use is [`HeaderV1`]. +*/ + +use { + super::super::{ + interface::fs_traits::{FileInterfaceRead, FileInterfaceWrite}, + static_meta::{HostArch, HostEndian, HostOS, HostPointerWidth, SDSS_MAGIC_8B}, + versions::{self, DriverVersion, FileSpecifierVersion, HeaderVersion, ServerVersion}, + }, + crate::{ + engine::{error::StorageError, mem::memcpy, RuntimeResult}, + util::os, + }, + std::{ + mem::{transmute, ManuallyDrop}, + ops::Range, + }, +}; + +/* + header utils +*/ + +pub trait HeaderV1Enumeration { + /// the maximum value of this enumeration + const MAX: u8; + /// Create a new enumeration, given that the maximum is validated + unsafe fn new(x: u8) -> Self; + /// Return the 1B repr of the enumeration + fn repr_u8(&self) -> u8; +} + +/// A trait that enables customizing the SDSS header for a specific version tuple +pub trait HeaderV1Spec { + // types + /// The file class type + type FileClass: HeaderV1Enumeration + Copy + PartialEq; + /// The file specifier type + type FileSpecifier: HeaderV1Enumeration + Copy + PartialEq; + // constants + /// The server version to use during encode + /// + /// NB: This is NOT the compatible version but rather the current version + const CURRENT_SERVER_VERSION: ServerVersion; + /// The driver version to use during encode + /// + /// NB: This is NOT the compatible version but rather the current version + const CURRENT_DRIVER_VERSION: DriverVersion; + /// The file class to use and verify at encode/decode time + /// check server version compatibility is valid at decode time + fn check_if_server_version_compatible(v: ServerVersion) -> bool { + v == Self::CURRENT_SERVER_VERSION + } + /// check driver version compatibility is valid at decode time + fn check_if_driver_version_compatible(v: DriverVersion) -> bool { + v == Self::CURRENT_DRIVER_VERSION + } +} + +/* + Compact SDSS Header v1 + --- + - 1: Magic block (16B): magic + header version + - 2: Static block (40B): + - 2.1: Genesis static record (24B) + - 2.1.1: Software information (16B) + - Server version (8B) + - Driver version (8B) + - 2.1.2: Host information (4B): + - OS (1B) + - Arch (1B) + - Pointer width (1B) + - Endian (1B) + - 2.1.3: File information (4B): + - File class (1B) + - File specifier (1B) + - File specifier version (2B) + - 2.2: Genesis runtime record (16B) + - Host epoch (16B) + - 3: Padding block (8B) +*/ + +#[repr(align(8))] +#[derive(Debug, PartialEq)] +pub struct HeaderV1 { + // 1 magic block + magic_header_version: HeaderVersion, + // 2.1.1 + genesis_static_sw_server_version: ServerVersion, + genesis_static_sw_driver_version: DriverVersion, + // 2.1.2 + genesis_static_host_os: HostOS, + genesis_static_host_arch: HostArch, + genesis_static_host_ptr_width: HostPointerWidth, + genesis_static_host_endian: HostEndian, + // 2.1.3 + genesis_static_file_class: H::FileClass, + genesis_static_file_specifier: H::FileSpecifier, + genesis_static_file_specifier_version: FileSpecifierVersion, + // 2.2 + genesis_runtime_epoch_time: u128, + // 3 + genesis_padding_block: [u8; 8], +} + +impl HeaderV1 { + const SEG1_MAGIC: Range = 0..8; + const SEG1_HEADER_VERSION: Range = 8..16; + const SEG2_REC1_SERVER_VERSION: Range = 16..24; + const SEG2_REC1_DRIVER_VERSION: Range = 24..32; + const SEG2_REC1_HOST_OS: usize = 32; + const SEG2_REC1_HOST_ARCH: usize = 33; + const SEG2_REC1_HOST_PTR_WIDTH: usize = 34; + const SEG2_REC1_HOST_ENDIAN: usize = 35; + const SEG2_REC1_FILE_CLASS: usize = 36; + const SEG2_REC1_FILE_SPECIFIER: usize = 37; + const SEG2_REC1_FILE_SPECIFIER_VERSION: Range = 38..40; + const SEG2_REC2_RUNTIME_EPOCH_TIME: Range = 40..56; + const SEG3_PADDING_BLK: Range = 56..64; + pub const SIZE: usize = 64; + #[inline(always)] + fn _new_auto( + file_class: H::FileClass, + file_specifier: H::FileSpecifier, + file_specifier_version: FileSpecifierVersion, + epoch_time: u128, + genesis_padding_block: [u8; 8], + ) -> Self { + Self::_new( + versions::HEADER_V1, + H::CURRENT_SERVER_VERSION, + H::CURRENT_DRIVER_VERSION, + HostOS::new(), + HostArch::new(), + HostPointerWidth::new(), + HostEndian::new(), + file_class, + file_specifier, + file_specifier_version, + epoch_time, + genesis_padding_block, + ) + } + #[inline(always)] + fn _new( + magic_header_version: HeaderVersion, + genesis_static_sw_server_version: ServerVersion, + genesis_static_sw_driver_version: DriverVersion, + genesis_static_host_os: HostOS, + genesis_static_host_arch: HostArch, + genesis_static_host_ptr_width: HostPointerWidth, + genesis_static_host_endian: HostEndian, + genesis_static_file_class: H::FileClass, + genesis_static_file_specifier: H::FileSpecifier, + genesis_static_file_specifier_version: FileSpecifierVersion, + genesis_runtime_epoch_time: u128, + genesis_padding_block: [u8; 8], + ) -> Self { + Self { + magic_header_version, + genesis_static_sw_server_version, + genesis_static_sw_driver_version, + genesis_static_host_os, + genesis_static_host_arch, + genesis_static_host_ptr_width, + genesis_static_host_endian, + genesis_static_file_class, + genesis_static_file_specifier, + genesis_static_file_specifier_version, + genesis_runtime_epoch_time, + genesis_padding_block, + } + } + fn _encode_auto_raw( + file_class: H::FileClass, + file_specifier: H::FileSpecifier, + file_specifier_version: FileSpecifierVersion, + epoch_time: u128, + padding_block: [u8; 8], + ) -> [u8; 64] { + let mut ret = [0; 64]; + // 1. mgblk + ret[Self::SEG1_MAGIC].copy_from_slice(&SDSS_MAGIC_8B.to_le_bytes()); + ret[Self::SEG1_HEADER_VERSION] + .copy_from_slice(&versions::v1::V1_HEADER_VERSION.little_endian_u64()); + // 2.1.1 + ret[Self::SEG2_REC1_SERVER_VERSION] + .copy_from_slice(&H::CURRENT_SERVER_VERSION.little_endian()); + ret[Self::SEG2_REC1_DRIVER_VERSION] + .copy_from_slice(&H::CURRENT_DRIVER_VERSION.little_endian()); + // 2.1.2 + ret[Self::SEG2_REC1_HOST_OS] = HostOS::new().value_u8(); + ret[Self::SEG2_REC1_HOST_ARCH] = HostArch::new().value_u8(); + ret[Self::SEG2_REC1_HOST_PTR_WIDTH] = HostPointerWidth::new().value_u8(); + ret[Self::SEG2_REC1_HOST_ENDIAN] = HostEndian::new().value_u8(); + // 2.1.3 + ret[Self::SEG2_REC1_FILE_CLASS] = file_class.repr_u8(); + ret[Self::SEG2_REC1_FILE_SPECIFIER] = file_specifier.repr_u8(); + ret[Self::SEG2_REC1_FILE_SPECIFIER_VERSION] + .copy_from_slice(&file_specifier_version.little_endian()); + // 2.2 + ret[Self::SEG2_REC2_RUNTIME_EPOCH_TIME].copy_from_slice(&epoch_time.to_le_bytes()); + // 3 + ret[Self::SEG3_PADDING_BLK].copy_from_slice(&padding_block); + ret + } + fn encode_return( + file_class: H::FileClass, + file_specifier: H::FileSpecifier, + file_specifier_version: FileSpecifierVersion, + ) -> (Self, [u8; 64]) { + let epoch_time = os::get_epoch_time(); + let encoded = Self::_encode_auto_raw( + file_class, + file_specifier, + file_specifier_version, + epoch_time, + [0; 8], + ); + let me = Self::_new_auto( + file_class, + file_specifier, + file_specifier_version, + epoch_time, + [0; 8], + ); + (me, encoded) + } + /// Decode and validate the full header block (validate ONLY; you must verify yourself) + /// + /// Notes: + /// - Time might be inconsistent; verify + /// - Compatibility requires additional intervention + /// - If padding block was not zeroed, handle + /// - No file metadata is verified. Check! + /// + pub fn decode(block: [u8; 64]) -> Result { + var!(let raw_magic, raw_header_version, raw_server_version, raw_driver_version, raw_host_os, raw_host_arch, + raw_host_ptr_width, raw_host_endian, raw_file_class, raw_file_specifier, raw_file_specifier_version, + raw_runtime_epoch_time, raw_paddding_block, + ); + macro_rules! u64 { + ($pos:expr) => { + u64::from_le_bytes(memcpy(&block[$pos])) + }; + } + unsafe { + // UNSAFE(@ohsayan): all segments are correctly accessed (aligned to u8) + raw_magic = u64!(Self::SEG1_MAGIC); + raw_header_version = HeaderVersion::__new(u64!(Self::SEG1_HEADER_VERSION)); + raw_server_version = ServerVersion::__new(u64!(Self::SEG2_REC1_SERVER_VERSION)); + raw_driver_version = DriverVersion::__new(u64!(Self::SEG2_REC1_DRIVER_VERSION)); + raw_host_os = block[Self::SEG2_REC1_HOST_OS]; + raw_host_arch = block[Self::SEG2_REC1_HOST_ARCH]; + raw_host_ptr_width = block[Self::SEG2_REC1_HOST_PTR_WIDTH]; + raw_host_endian = block[Self::SEG2_REC1_HOST_ENDIAN]; + raw_file_class = block[Self::SEG2_REC1_FILE_CLASS]; + raw_file_specifier = block[Self::SEG2_REC1_FILE_SPECIFIER]; + raw_file_specifier_version = FileSpecifierVersion::__new(u16::from_le_bytes(memcpy( + &block[Self::SEG2_REC1_FILE_SPECIFIER_VERSION], + ))); + raw_runtime_epoch_time = + u128::from_le_bytes(memcpy(&block[Self::SEG2_REC2_RUNTIME_EPOCH_TIME])); + raw_paddding_block = memcpy::<8>(&block[Self::SEG3_PADDING_BLK]); + } + let okay_header_version = raw_header_version == versions::HEADER_V1; + let okay_server_version = H::check_if_server_version_compatible(raw_server_version); + let okay_driver_version = H::check_if_driver_version_compatible(raw_driver_version); + let okay = okay!( + // 1.1 mgblk + raw_magic == SDSS_MAGIC_8B, + okay_header_version, + // 2.1.1 + okay_server_version, + okay_driver_version, + // 2.1.2 + raw_host_os <= HostOS::MAX, + raw_host_arch <= HostArch::MAX, + raw_host_ptr_width <= HostPointerWidth::MAX, + raw_host_endian <= HostEndian::MAX, + // 2.1.3 + raw_file_class <= H::FileClass::MAX, + raw_file_specifier <= H::FileSpecifier::MAX, + ); + if okay { + Ok(unsafe { + // UNSAFE(@ohsayan): the block ranges are very well defined + Self::_new( + // 1.1 + raw_header_version, + // 2.1.1 + raw_server_version, + raw_driver_version, + // 2.1.2 + transmute(raw_host_os), + transmute(raw_host_arch), + transmute(raw_host_ptr_width), + transmute(raw_host_endian), + // 2.1.3 + H::FileClass::new(raw_file_class), + H::FileSpecifier::new(raw_file_specifier), + raw_file_specifier_version, + // 2.2 + raw_runtime_epoch_time, + // 3 + raw_paddding_block, + ) + }) + } else { + let version_okay = okay_header_version & okay_server_version & okay_driver_version; + let md = ManuallyDrop::new([ + StorageError::HeaderDecodeCorruptedHeader, + StorageError::HeaderDecodeVersionMismatch, + ]); + Err(unsafe { + // UNSAFE(@ohsayan): while not needed, md for drop safety + correct index + md.as_ptr().add(!version_okay as usize).read().into() + }) + } + } +} + +#[allow(unused)] +impl HeaderV1 { + pub fn header_version(&self) -> HeaderVersion { + self.magic_header_version + } + pub fn server_version(&self) -> ServerVersion { + self.genesis_static_sw_server_version + } + pub fn driver_version(&self) -> DriverVersion { + self.genesis_static_sw_driver_version + } + pub fn host_os(&self) -> HostOS { + self.genesis_static_host_os + } + pub fn host_arch(&self) -> HostArch { + self.genesis_static_host_arch + } + pub fn host_ptr_width(&self) -> HostPointerWidth { + self.genesis_static_host_ptr_width + } + pub fn host_endian(&self) -> HostEndian { + self.genesis_static_host_endian + } + pub fn file_class(&self) -> H::FileClass { + self.genesis_static_file_class + } + pub fn file_specifier(&self) -> H::FileSpecifier { + self.genesis_static_file_specifier + } + pub fn file_specifier_version(&self) -> FileSpecifierVersion { + self.genesis_static_file_specifier_version + } + pub fn epoch_time(&self) -> u128 { + self.genesis_runtime_epoch_time + } + pub fn padding_block(&self) -> [u8; 8] { + self.genesis_padding_block + } +} + +pub trait FileSpecV1 { + type Metadata; + /// the header type + type HeaderSpec: HeaderV1Spec; + /// the args need to validate the metadata (for example, additional context) + type EncodeArgs; + type DecodeArgs; + /// validate the metadata + fn validate_metadata( + md: HeaderV1, + v_args: Self::DecodeArgs, + ) -> RuntimeResult; + /// read and validate metadata (only override if you need to) + fn read_metadata( + f: &mut impl FileInterfaceRead, + v_args: Self::DecodeArgs, + ) -> RuntimeResult { + let md = HeaderV1::decode(f.fread_exact_block()?)?; + Self::validate_metadata(md, v_args) + } + /// write metadata + fn write_metadata( + f: &mut impl FileInterfaceWrite, + args: Self::EncodeArgs, + ) -> RuntimeResult; +} + +/// # Simple SDSS file specification (v1) +/// +/// ## Decode and verify +/// A simple SDSS file specification that checks if: +/// - the file class, +/// - file specifier and +/// - file specifier revision +/// +/// match +/// +/// ## Version Compatibility +/// +/// Also, the [`HeaderV1Spec`] is supposed to address compatibility across server and driver versions +pub trait SimpleFileSpecV1 { + type HeaderSpec: HeaderV1Spec; + const FILE_CLASS: ::FileClass; + const FILE_SPECIFIER: ::FileSpecifier; + const FILE_SPECFIER_VERSION: FileSpecifierVersion; + fn check_if_file_specifier_revision_is_compatible( + v: FileSpecifierVersion, + ) -> RuntimeResult<()> { + if v == Self::FILE_SPECFIER_VERSION { + Ok(()) + } else { + Err(StorageError::HeaderDecodeVersionMismatch.into()) + } + } +} + +impl FileSpecV1 for Sfs { + type Metadata = HeaderV1; + type HeaderSpec = ::HeaderSpec; + type DecodeArgs = (); + type EncodeArgs = (); + fn validate_metadata( + md: HeaderV1, + _: Self::DecodeArgs, + ) -> RuntimeResult { + let okay = okay!( + md.file_class() == Self::FILE_CLASS, + md.file_specifier() == Self::FILE_SPECIFIER, + ); + Self::check_if_file_specifier_revision_is_compatible(md.file_specifier_version())?; + if okay { + Ok(md) + } else { + Err(StorageError::HeaderDecodeVersionMismatch.into()) + } + } + fn write_metadata( + f: &mut impl FileInterfaceWrite, + _: Self::EncodeArgs, + ) -> RuntimeResult { + let (md, block) = HeaderV1::::encode_return( + Self::FILE_CLASS, + Self::FILE_SPECIFIER, + Self::FILE_SPECFIER_VERSION, + ); + f.fw_write_all(&block).map(|_| md) + } +} diff --git a/server/src/engine/storage/common/static_meta.rs b/server/src/engine/storage/common/static_meta.rs new file mode 100644 index 00000000..8d338d8c --- /dev/null +++ b/server/src/engine/storage/common/static_meta.rs @@ -0,0 +1,160 @@ +/* + * Created on Tue Jan 09 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! # Static metadata +//! +//! Compile-time metadata used by storage engine implementations +//! + +/// The 8B SDSS magic block +pub const SDSS_MAGIC_8B: u64 = 0x4F48534159414E21; + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] +/// Host architecture enumeration for common platforms +pub enum HostArch { + X86 = 0, + X86_64 = 1, + ARM = 2, + ARM64 = 3, + MIPS = 4, + PowerPC = 5, +} + +impl HostArch { + pub const fn new() -> Self { + if cfg!(target_arch = "x86") { + HostArch::X86 + } else if cfg!(target_arch = "x86_64") { + HostArch::X86_64 + } else if cfg!(target_arch = "arm") { + HostArch::ARM + } else if cfg!(target_arch = "aarch64") { + HostArch::ARM64 + } else if cfg!(target_arch = "mips") { + HostArch::MIPS + } else if cfg!(target_arch = "powerpc") { + HostArch::PowerPC + } else { + panic!("Unsupported target architecture") + } + } +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] +/// Host OS enumeration for common operating systems +pub enum HostOS { + // T1 + Linux = 0, + Windows = 1, + MacOS = 2, + // T2 + Android = 3, + AppleiOS = 4, + FreeBSD = 5, + OpenBSD = 6, + NetBSD = 7, + WASI = 8, + Emscripten = 9, + // T3 + Solaris = 10, + Fuchsia = 11, + Redox = 12, + DragonFly = 13, +} + +impl HostOS { + pub const fn new() -> Self { + if cfg!(target_os = "linux") { + HostOS::Linux + } else if cfg!(target_os = "windows") { + HostOS::Windows + } else if cfg!(target_os = "macos") { + HostOS::MacOS + } else if cfg!(target_os = "android") { + HostOS::Android + } else if cfg!(target_os = "ios") { + HostOS::AppleiOS + } else if cfg!(target_os = "freebsd") { + HostOS::FreeBSD + } else if cfg!(target_os = "openbsd") { + HostOS::OpenBSD + } else if cfg!(target_os = "netbsd") { + HostOS::NetBSD + } else if cfg!(target_os = "dragonfly") { + HostOS::DragonFly + } else if cfg!(target_os = "redox") { + HostOS::Redox + } else if cfg!(target_os = "fuchsia") { + HostOS::Fuchsia + } else if cfg!(target_os = "solaris") { + HostOS::Solaris + } else if cfg!(target_os = "emscripten") { + HostOS::Emscripten + } else if cfg!(target_os = "wasi") { + HostOS::WASI + } else { + panic!("unknown os") + } + } +} + +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] +/// Host endian enumeration +pub enum HostEndian { + Big = 0, + Little = 1, +} + +impl HostEndian { + pub const fn new() -> Self { + if cfg!(target_endian = "little") { + Self::Little + } else { + Self::Big + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] +#[repr(u8)] +/// Host pointer width enumeration +pub enum HostPointerWidth { + P32 = 0, + P64 = 1, +} + +impl HostPointerWidth { + pub const fn new() -> Self { + match sizeof!(usize) { + 4 => Self::P32, + 8 => Self::P64, + _ => panic!("unknown pointer width"), + } + } +} diff --git a/server/src/engine/storage/common/versions/mod.rs b/server/src/engine/storage/common/versions/mod.rs new file mode 100644 index 00000000..201684ad --- /dev/null +++ b/server/src/engine/storage/common/versions/mod.rs @@ -0,0 +1,115 @@ +/* + * Created on Mon May 15 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! # Versioning +//! +//! Storage engine versioning utility +//! + +pub mod server_version; + +pub const HEADER_V1: HeaderVersion = HeaderVersion(0); + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] +/// The header version +/// +/// The header version is part of the static record and *barely* changes (almost like once in a light year) +pub struct HeaderVersion(u64); + +impl HeaderVersion { + pub const fn __new(v: u64) -> Self { + Self(v) + } + pub const fn little_endian_u64(&self) -> [u8; 8] { + self.0.to_le_bytes() + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] +/// The server version (based on tag index) +pub struct ServerVersion(u64); + +impl ServerVersion { + pub const fn __new(v: u64) -> Self { + Self(v) + } + pub const fn little_endian(&self) -> [u8; 8] { + self.0.to_le_bytes() + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] +/// The driver version +pub struct DriverVersion(u64); + +impl DriverVersion { + pub const fn __new(v: u64) -> Self { + Self(v) + } + pub const fn little_endian(&self) -> [u8; 8] { + self.0.to_le_bytes() + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] +/// The file specifier version +pub struct FileSpecifierVersion(u16); + +impl FileSpecifierVersion { + pub const fn __new(v: u16) -> Self { + Self(v) + } + pub const fn little_endian(&self) -> [u8; 2] { + self.0.to_le_bytes() + } +} + +pub mod v1 { + //! The first SDSS based storage engine implementation. + //! Target tag: 0.8.0 {beta.1, beta.2, beta.3} + use super::{DriverVersion, HeaderVersion, ServerVersion}; + + /// The SDSS header version UID + pub const V1_HEADER_VERSION: HeaderVersion = HeaderVersion(0); + /// The server version UID + pub const V1_SERVER_VERSION: ServerVersion = + ServerVersion(super::server_version::fetch_id("v0.8.0") as _); + /// The driver version UID + pub const V1_DRIVER_VERSION: DriverVersion = DriverVersion(0); +} + +#[allow(unused)] +pub mod v2 { + //! The second SDSS based storage implementation + //! + //! Target tag: 0.8.0 (GA) + //! + //! Same tags as [`super::v1`] but different [`DriverVersion`] + use super::{DriverVersion, HeaderVersion, ServerVersion}; + pub const V2_HEADER_VERSION: HeaderVersion = super::v1::V1_HEADER_VERSION; + pub const V2_SERVER_VERSION: ServerVersion = super::v1::V1_SERVER_VERSION; + pub const V2_DRIVER_VERSION: DriverVersion = DriverVersion(1); +} diff --git a/server/src/engine/storage/common/versions/server_version.rs b/server/src/engine/storage/common/versions/server_version.rs new file mode 100644 index 00000000..257cde4e --- /dev/null +++ b/server/src/engine/storage/common/versions/server_version.rs @@ -0,0 +1,103 @@ +/* + * Created on Wed May 17 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +const VERSION_TAGS: [&str; 52] = [ + "v0.1.0", + "v0.2.0", + "v0.3.0", + "v0.3.1", + "v0.3.2", + "v0.4.0-alpha.1", + "v0.4.0-alpha.2", + "v0.4.0", + "v0.4.1-alpha.1", + "v0.4.1", + "v0.4.2-alpha.1", + "v0.4.2", + "v0.4.3-alpha.1", + "v0.4.3", + "v0.4.4", + "v0.4.5-alpha.1", + "v0.4.5-alpha.2", + "v0.4.5", + "v0.5.0-alpha.1", + "v0.5.0-alpha.2", + "v0.5.0", + "v0.5.1-alpha.1", + "v0.5.1", + "v0.5.2", + "v0.5.3", + "v0.6.0", + "v0.6.1", + "v0.6.2-testrelease.1", + "v0.6.2", + "v0.6.3-alpha.1", + "v0.6.3", + "v0.6.4-alpha.1", + "v0.6.4", + "v0.7.0-RC.1", + "v0.7.0-alpha.1", + "v0.7.0-alpha.2", + "v0.7.0-beta.1", + "v0.7.0", + "v0.7.1-alpha.1", + "v0.7.1", + "v0.7.2-alpha.1", + "v0.7.2", + "v0.7.3-alpha.1", + "v0.7.3-alpha.2", + "v0.7.3-alpha.3", + "v0.7.3", + "v0.7.4", + "v0.7.5", + "v0.7.6", + "v0.7.7", + "v0.8.0-alpha.1", + "v0.8.0", +]; +const VERSION_TAGS_LEN: usize = VERSION_TAGS.len(); +pub const fn fetch_id(id: &str) -> usize { + // this is ct, so a O(n) doesn't matter + let mut i = 0; + while i < VERSION_TAGS_LEN { + let bytes = VERSION_TAGS[i].as_bytes(); + let given = id.as_bytes(); + let mut j = 0; + let mut eq = true; + while (j < bytes.len()) & (bytes.len() == given.len()) { + if bytes[i] != given[i] { + eq = false; + break; + } + j += 1; + } + if eq { + return i; + } + i += 1; + } + panic!("version not found") +} diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs index 540fb990..1c083088 100644 --- a/server/src/engine/storage/mod.rs +++ b/server/src/engine/storage/mod.rs @@ -26,9 +26,13 @@ //! Implementations of the Skytable Disk Storage Subsystem (SDSS) -mod checksum; -mod versions; -// impls +mod common; +// driver versions pub mod v1; +pub mod v2; -pub use checksum::SCrc; +pub mod safe_interfaces { + #[cfg(test)] + pub use super::common::interface::fs_test::{NullFS, VirtualFS}; + pub use super::common::interface::{fs_imp::LocalFS, fs_traits::FSInterface}; +} diff --git a/server/src/engine/storage/v1/batch_jrnl/mod.rs b/server/src/engine/storage/v1/batch_jrnl/mod.rs index 2013acbf..05ba5e7b 100644 --- a/server/src/engine/storage/v1/batch_jrnl/mod.rs +++ b/server/src/engine/storage/v1/batch_jrnl/mod.rs @@ -43,12 +43,15 @@ pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch}; pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver}; use { - super::{rw::SDSSFileIO, spec, RawFSInterface}, - crate::engine::{core::model::Model, error::RuntimeResult}, + super::{rw::SDSSFileIO, spec}, + crate::engine::{ + core::model::Model, error::RuntimeResult, + storage::common::interface::fs_traits::FSInterface, + }, }; /// Re-initialize an existing batch journal and read all its data into model -pub fn reinit( +pub fn reinit( name: &str, model: &Model, ) -> RuntimeResult> { @@ -60,7 +63,7 @@ pub fn reinit( } /// Create a new batch journal -pub fn create(path: &str) -> RuntimeResult> { +pub fn create(path: &str) -> RuntimeResult> { let f = SDSSFileIO::::create::(path)?; DataBatchPersistDriver::new(f, true) } diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/batch_jrnl/persist.rs index 852411d1..3a6f654b 100644 --- a/server/src/engine/storage/v1/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/batch_jrnl/persist.rs @@ -46,28 +46,31 @@ use { }, error::{RuntimeResult, StorageError}, idx::STIndexSeq, - storage::v1::rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedWriter}, + storage::{ + common::interface::fs_traits::FSInterface, + v1::rw::{SDSSFileIO, TrackedWriter}, + }, }, util::EndianQW, }, crossbeam_epoch::pin, }; -pub struct DataBatchPersistDriver { - f: SDSSFileTrackedWriter, +pub struct DataBatchPersistDriver { + f: TrackedWriter, } -impl DataBatchPersistDriver { +impl DataBatchPersistDriver { pub fn new(mut file: SDSSFileIO, is_new: bool) -> RuntimeResult { if !is_new { file.fsynced_write(&[MARKER_BATCH_REOPEN])?; } Ok(Self { - f: SDSSFileTrackedWriter::new(file)?, + f: TrackedWriter::new(file)?, }) } pub fn close(self) -> RuntimeResult<()> { - let mut slf = self.f.into_inner_file()?; + let mut slf = self.f.sync_into_inner()?; if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() { return Ok(()); } else { @@ -151,12 +154,12 @@ impl DataBatchPersistDriver { col_cnt: usize, ) -> RuntimeResult<()> { self.f - .tracked_write_unfsynced(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?; + .tracked_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?; let observed_len_bytes = observed_len.u64_bytes_le(); - self.f.tracked_write_unfsynced(&observed_len_bytes)?; + self.f.tracked_write(&observed_len_bytes)?; self.f - .tracked_write_unfsynced(&schema_version.value_u64().to_le_bytes())?; - self.f.tracked_write_unfsynced(&col_cnt.u64_bytes_le())?; + .tracked_write(&schema_version.value_u64().to_le_bytes())?; + self.f.tracked_write(&col_cnt.u64_bytes_le())?; Ok(()) } /// Append a summary of this batch and most importantly, **sync everything to disk** @@ -166,9 +169,9 @@ impl DataBatchPersistDriver { inconsistent_reads: usize, ) -> RuntimeResult<()> { // [0xFD][actual_commit][checksum] - self.f.tracked_write_unfsynced(&[MARKER_END_OF_BATCH])?; + self.f.tracked_write(&[MARKER_END_OF_BATCH])?; let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le(); - self.f.tracked_write_unfsynced(&actual_commit)?; + self.f.tracked_write(&actual_commit)?; let cs = self.f.reset_and_finish_checksum().to_le_bytes(); self.f.untracked_write(&cs)?; // IMPORTANT: now that all data has been written, we need to actually ensure that the writes pass through the cache @@ -189,7 +192,7 @@ impl DataBatchPersistDriver { } } -impl DataBatchPersistDriver { +impl DataBatchPersistDriver { /// encode the primary key only. this means NO TAG is encoded. fn encode_pk_only(&mut self, pk: &PrimaryIndexKey) -> RuntimeResult<()> { let buf = &mut self.f; @@ -200,7 +203,7 @@ impl DataBatchPersistDriver { pk.read_uint() } .to_le_bytes(); - buf.tracked_write_unfsynced(&data)?; + buf.tracked_write(&data)?; } TagUnique::Str | TagUnique::Bin => { let slice = unsafe { @@ -208,8 +211,8 @@ impl DataBatchPersistDriver { pk.read_bin() }; let slice_l = slice.len().u64_bytes_le(); - buf.tracked_write_unfsynced(&slice_l)?; - buf.tracked_write_unfsynced(slice)?; + buf.tracked_write(&slice_l)?; + buf.tracked_write(slice)?; } TagUnique::Illegal => unsafe { // UNSAFE(@ohsayan): a pk can't be constructed with illegal @@ -222,7 +225,7 @@ impl DataBatchPersistDriver { fn encode_cell(&mut self, value: &Datacell) -> RuntimeResult<()> { let mut buf = vec![]; cell::encode(&mut buf, value); - self.f.tracked_write_unfsynced(&buf)?; + self.f.tracked_write(&buf)?; Ok(()) } /// Encode row data @@ -233,7 +236,7 @@ impl DataBatchPersistDriver { self.encode_cell(cell)?; } None if field_name.as_str() == model.p_key() => {} - None => self.f.tracked_write_unfsynced(&[0])?, + None => self.f.tracked_write(&[0])?, } } Ok(()) @@ -241,9 +244,9 @@ impl DataBatchPersistDriver { /// Write the change type and txnid fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> RuntimeResult<()> { let change_type = [delta.change().value_u8()]; - self.f.tracked_write_unfsynced(&change_type)?; + self.f.tracked_write(&change_type)?; let txn_id = delta.data_version().value_u64().to_le_bytes(); - self.f.tracked_write_unfsynced(&txn_id)?; + self.f.tracked_write(&txn_id)?; Ok(()) } } diff --git a/server/src/engine/storage/v1/batch_jrnl/restore.rs b/server/src/engine/storage/v1/batch_jrnl/restore.rs index bcc99534..862d0e42 100644 --- a/server/src/engine/storage/v1/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/batch_jrnl/restore.rs @@ -42,7 +42,10 @@ use { data::{cell::Datacell, tag::TagUnique}, error::{RuntimeResult, StorageError}, idx::{MTIndex, STIndex, STIndexSeq}, - storage::v1::rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedReader}, + storage::{ + common::interface::fs_traits::FSInterface, + v1::rw::{SDSSFileIO, TrackedReader}, + }, }, std::{ collections::{hash_map::Entry as HMEntry, HashMap}, @@ -103,14 +106,14 @@ enum Batch { BatchClosed, } -pub struct DataBatchRestoreDriver { - f: SDSSFileTrackedReader, +pub struct DataBatchRestoreDriver { + f: TrackedReader, } -impl DataBatchRestoreDriver { +impl DataBatchRestoreDriver { pub fn new(f: SDSSFileIO) -> RuntimeResult { Ok(Self { - f: SDSSFileTrackedReader::new(f)?, + f: TrackedReader::new(f)?, }) } pub fn into_file(self) -> RuntimeResult> { @@ -138,7 +141,7 @@ impl DataBatchRestoreDriver { } } -impl DataBatchRestoreDriver { +impl DataBatchRestoreDriver { fn read_all_batches_and_for_each( &mut self, mut f: impl FnMut(NormalBatch) -> RuntimeResult<()>, @@ -206,7 +209,7 @@ impl DataBatchRestoreDriver { } } -impl DataBatchRestoreDriver { +impl DataBatchRestoreDriver { fn apply_batch( m: &Model, NormalBatch { @@ -302,7 +305,7 @@ impl DataBatchRestoreDriver { } } -impl DataBatchRestoreDriver { +impl DataBatchRestoreDriver { fn read_batch_summary(&mut self, finished_early: bool) -> RuntimeResult { if !finished_early { // we must read the batch termination signature @@ -467,7 +470,7 @@ impl BatchStartBlock { } } -impl DataBatchRestoreDriver { +impl DataBatchRestoreDriver { fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult { let Some(pk_type) = TagUnique::try_from_raw(pk_type) else { return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); @@ -483,7 +486,7 @@ impl DataBatchRestoreDriver { TagUnique::Str | TagUnique::Bin => { let len = self.f.read_u64_le()?; let mut data = vec![0; len as usize]; - self.f.read_into_buffer(&mut data)?; + self.f.tracked_read(&mut data)?; if pk_type == TagUnique::Str { if core::str::from_utf8(&data).is_err() { return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); @@ -505,7 +508,7 @@ impl DataBatchRestoreDriver { let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else { return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); }; - unsafe { cell::decode_element::>(&mut self.f, dscr) } + unsafe { cell::decode_element::>(&mut self.f, dscr) } .map_err(|e| e.0) } } @@ -521,7 +524,7 @@ impl From<()> for ErrorHack { Self(StorageError::DataBatchRestoreCorruptedEntry.into()) } } -impl DataSource for SDSSFileTrackedReader { +impl DataSource for TrackedReader { const RELIABLE_SOURCE: bool = false; type Error = ErrorHack; fn has_remaining(&self, cnt: usize) -> bool { @@ -538,7 +541,7 @@ impl DataSource for SDSSFileTrackedReader { } unsafe fn read_next_variable_block(&mut self, size: usize) -> Result, Self::Error> { let mut buf = vec![0; size]; - self.read_into_buffer(&mut buf)?; + self.tracked_read(&mut buf)?; Ok(buf) } } diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/journal.rs index a1815b6c..4b2045a4 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/journal.rs @@ -41,13 +41,15 @@ - FIXME(@ohsayan): we will probably (naively) need to dynamically reposition the cursor in case the metadata is corrupted as well */ +#[cfg(test)] +use crate::engine::storage::common::interface::fs_traits::FileOpen; use { - super::{ - rw::{RawFSInterface, SDSSFileIO}, - spec, - }, + super::rw::SDSSFileIO, crate::{ - engine::error::{RuntimeResult, StorageError}, + engine::{ + error::{RuntimeResult, StorageError}, + storage::common::{interface::fs_traits::FSInterface, sdss}, + }, util::{compiler, copy_a_into_b, copy_slice_to_array as memcpy}, }, std::marker::PhantomData, @@ -56,11 +58,14 @@ use { const CRC: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); #[cfg(test)] -pub fn open_or_create_journal( +pub fn open_or_create_journal< + TA: JournalAdapter, + Fs: FSInterface, + F: sdss::FileSpecV1, +>( log_file_name: &str, gs: &TA::GlobalState, -) -> RuntimeResult>> { - use super::rw::FileOpen; +) -> RuntimeResult>> { let file = match SDSSFileIO::::open_or_create_perm_rw::(log_file_name)? { FileOpen::Created(f) => return Ok(FileOpen::Created(JournalWriter::new(f, 0, true)?)), FileOpen::Existing((file, _header)) => file, @@ -71,13 +76,13 @@ pub fn open_or_create_journal( +pub fn create_journal>( log_file_name: &str, ) -> RuntimeResult> { JournalWriter::new(SDSSFileIO::create::(log_file_name)?, 0, true) } -pub fn load_journal( +pub fn load_journal>( log_file_name: &str, gs: &TA::GlobalState, ) -> RuntimeResult> { @@ -192,7 +197,7 @@ impl JournalEntryMetadata { } } -pub struct JournalReader { +pub struct JournalReader { log_file: SDSSFileIO, evid: u64, closed: bool, @@ -200,9 +205,9 @@ pub struct JournalReader { _m: PhantomData, } -impl JournalReader { +impl JournalReader { pub fn new(log_file: SDSSFileIO) -> RuntimeResult { - let log_size = log_file.file_length()? - spec::SDSSStaticHeaderV1Compact::SIZE as u64; + let log_size = log_file.file_length()? - super::Header::SIZE as u64; Ok(Self { log_file, evid: 0, @@ -228,7 +233,7 @@ impl JournalReader { // the only case when this happens is when the journal faults at runtime with a write zero (or some other error when no bytes were written) self.remaining_bytes += JournalEntryMetadata::SIZE as u64; // move back cursor to see if we have a recovery block - let new_cursor = self.log_file.retrieve_cursor()? - JournalEntryMetadata::SIZE as u64; + let new_cursor = self.log_file.file_cursor()? - JournalEntryMetadata::SIZE as u64; self.log_file.seek_from_start(new_cursor)?; return self.try_recover_journal_strategy_simple_reverse(); } @@ -297,7 +302,7 @@ impl JournalReader { debug_assert!(TA::RECOVERY_PLUGIN, "recovery plugin not enabled"); self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length? let mut entry_buf = [0u8; JournalEntryMetadata::SIZE]; - if self.log_file.read_to_buffer(&mut entry_buf).is_err() { + if self.log_file.read_buffer(&mut entry_buf).is_err() { return Err(StorageError::JournalCorrupted.into()); } let entry = JournalEntryMetadata::decode(entry_buf); @@ -329,7 +334,7 @@ impl JournalReader { } } -impl JournalReader { +impl JournalReader { fn _incr_evid(&mut self) { self.evid += 1; } @@ -344,19 +349,19 @@ impl JournalReader { } } -impl JournalReader { +impl JournalReader { fn logfile_read_into_buffer(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { if !self.has_remaining_bytes(buf.len() as _) { // do this right here to avoid another syscall return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into()); } - self.log_file.read_to_buffer(buf)?; + self.log_file.read_buffer(buf)?; self.__record_read_bytes(buf.len()); Ok(()) } } -pub struct JournalWriter { +pub struct JournalWriter { /// the txn log file log_file: SDSSFileIO, /// the id of the **next** journal @@ -365,7 +370,7 @@ pub struct JournalWriter { closed: bool, } -impl JournalWriter { +impl JournalWriter { pub fn new(mut log_file: SDSSFileIO, last_txn_id: u64, new: bool) -> RuntimeResult { let log_size = log_file.file_length()?; log_file.seek_from_start(log_size)?; // avoid jumbling with headers @@ -390,8 +395,8 @@ impl JournalWriter { encoded.len() as u64, ) .encoded(); - self.log_file.unfsynced_write(&md)?; - self.log_file.unfsynced_write(&encoded)?; + self.log_file.write_buffer(&md)?; + self.log_file.write_buffer(&encoded)?; self.log_file.fsync_all()?; Ok(()) } @@ -411,7 +416,7 @@ impl JournalWriter { } } -impl JournalWriter { +impl JournalWriter { pub fn appendrec_journal_reverse_entry(&mut self) -> RuntimeResult<()> { let mut entry = JournalEntryMetadata::new(0, EventSourceMarker::RECOVERY_REVERSE_LAST_JOURNAL, 0, 0); @@ -440,7 +445,7 @@ impl JournalWriter { } } -impl JournalWriter { +impl JournalWriter { fn _incr_id(&mut self) -> u64 { let current = self.id; self.id += 1; @@ -448,7 +453,7 @@ impl JournalWriter { } } -impl Drop for JournalWriter { +impl Drop for JournalWriter { fn drop(&mut self) { assert!(self.closed, "log not closed"); } diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs index 065a3f49..87e3af65 100644 --- a/server/src/engine/storage/v1/loader.rs +++ b/server/src/engine/storage/v1/loader.rs @@ -25,9 +25,9 @@ */ #[cfg(test)] -use crate::engine::storage::v1::{ - rw::{FileOpen, RawFSInterface}, - JournalWriter, +use crate::engine::storage::{ + common::interface::fs_traits::{FSInterface, FileOpen}, + v1::JournalWriter, }; use crate::engine::{ core::{EntityIDRef, GlobalNS}, @@ -35,7 +35,10 @@ use crate::engine::{ error::RuntimeResult, fractal::error::ErrorContext, fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID}, - storage::v1::{batch_jrnl, journal, spec, LocalFS}, + storage::{ + common::interface::fs_imp::LocalFS, + v1::{batch_jrnl, journal, spec}, + }, txn::gns::{GNSAdapter, GNSTransactionDriverAnyFS}, }; @@ -43,14 +46,14 @@ const GNS_FILE_PATH: &str = "gns.db-tlog"; const DATA_DIR: &str = "data"; pub struct SEInitState { - pub txn_driver: GNSTransactionDriverAnyFS, + pub txn_driver: GNSTransactionDriverAnyFS, pub model_drivers: ModelDrivers, pub gns: GlobalNS, } impl SEInitState { pub fn new( - txn_driver: GNSTransactionDriverAnyFS, + txn_driver: GNSTransactionDriverAnyFS, model_drivers: ModelDrivers, gns: GlobalNS, ) -> Self { @@ -140,7 +143,7 @@ impl SEInitState { } #[cfg(test)] -pub fn open_gns_driver( +pub fn open_gns_driver( path: &str, gns: &GlobalNS, ) -> RuntimeResult>> { diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index 3a1bb406..1ce27ee6 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -24,6 +24,10 @@ * */ +//! SDSS based storage engine driver v1 ([`versions::v1`]) +//! +//! Target tags: `0.8.0-beta`, `0.8.0-beta.2`, `0.8.0-beta.3` + // impls mod batch_jrnl; mod journal; @@ -33,15 +37,15 @@ pub mod spec; pub mod sysdb; // hl pub mod inf; -// test -pub mod memfs; #[cfg(test)] mod tests; // re-exports +pub(self) use spec::Header; + pub use { journal::{JournalAdapter, JournalWriter}, - rw::{LocalFS, RawFSInterface, SDSSFileIO}, + rw::SDSSFileIO, }; pub mod data_batch { pub use super::batch_jrnl::{create, DataBatchPersistDriver}; diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs index 83b6d087..f2b21443 100644 --- a/server/src/engine/storage/v1/rw.rs +++ b/server/src/engine/storage/v1/rw.rs @@ -25,360 +25,106 @@ */ use { - super::spec::{FileSpec, Header}, crate::{ - engine::{error::RuntimeResult, storage::SCrc}, + engine::{ + error::RuntimeResult, + storage::common::{ + checksum::SCrc64, + interface::fs_traits::{ + FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt, + FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen, + }, + sdss, + }, + }, util::os::SysIOError, }, - std::{ - fs::{self, File}, - io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}, - marker::PhantomData, - }, + std::marker::PhantomData, }; -#[derive(Debug)] -/// Log whether -pub enum FileOpen { - Created(CF), - Existing(EF), -} - -#[cfg(test)] -impl FileOpen { - pub fn into_existing(self) -> Option { - match self { - Self::Existing(e) => Some(e), - Self::Created(_) => None, - } - } - pub fn into_created(self) -> Option { - match self { - Self::Created(f) => Some(f), - Self::Existing(_) => None, - } - } -} - -#[cfg(test)] -impl FileOpen { - pub fn into_inner(self) -> F { - match self { - Self::Created(f) => f, - Self::Existing(f) => f, - } - } -} - -/// The specification for a file system interface (our own abstraction over the fs) -pub trait RawFSInterface { - /// asserts that the file system is not a null filesystem (like `/dev/null` for example) - const NOT_NULL: bool = true; - /// the file descriptor that is returned by the file system when a file is opened - type File: RawFileInterface; - /// Remove a file - fn fs_remove_file(fpath: &str) -> RuntimeResult<()>; - /// Rename a file - fn fs_rename_file(from: &str, to: &str) -> RuntimeResult<()>; - /// Create a directory - fn fs_create_dir(fpath: &str) -> RuntimeResult<()>; - /// Create a directory and all corresponding path components - fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()>; - /// Delete a directory - fn fs_delete_dir(fpath: &str) -> RuntimeResult<()>; - /// Delete a directory and recursively remove all (if any) children - fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()>; - /// Open or create a file in R/W mode - /// - /// This will: - /// - Create a file if it doesn't exist - /// - Open a file it it does exist - fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult>; - /// Open an existing file - fn fs_fopen_rw(fpath: &str) -> RuntimeResult; - /// Create a new file - fn fs_fcreate_rw(fpath: &str) -> RuntimeResult; +pub struct TrackedWriter { + file: SDSSFileIO::BufWriter>, + cs: SCrc64, } -/// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking) -pub trait RawFileInterface: Sized -where - Self: RawFileInterfaceRead - + RawFileInterfaceWrite - + RawFileInterfaceWriteExt - + RawFileInterfaceExt, -{ - type BufReader: RawFileInterfaceBufferedReader; - type BufWriter: RawFileInterfaceBufferedWriter; - fn into_buffered_reader(self) -> RuntimeResult; - fn downgrade_reader(r: Self::BufReader) -> RuntimeResult; - fn into_buffered_writer(self) -> RuntimeResult; - fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult; -} - -pub trait RawFileInterfaceBufferedReader: RawFileInterfaceRead + RawFileInterfaceExt {} -impl RawFileInterfaceBufferedReader for R {} - -pub trait RawFileInterfaceBufferedWriter: RawFileInterfaceWrite + RawFileInterfaceExt { - fn sync_write_cache(&mut self) -> RuntimeResult<()> { - Ok(()) - } -} - -/// A file interface that supports read operations -pub trait RawFileInterfaceRead { - fn fr_read_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()>; -} - -impl RawFileInterfaceRead for R { - fn fr_read_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { - self.read_exact(buf).map_err(From::from) - } -} - -/// A file interface that supports write operations -pub trait RawFileInterfaceWrite { - fn fw_write_all(&mut self, buf: &[u8]) -> RuntimeResult<()>; -} - -impl RawFileInterfaceWrite for W { - fn fw_write_all(&mut self, buf: &[u8]) -> RuntimeResult<()> { - self.write_all(buf).map_err(From::from) - } -} - -/// A file interface that supports advanced write operations -pub trait RawFileInterfaceWriteExt { - fn fwext_fsync_all(&mut self) -> RuntimeResult<()>; - fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()>; -} - -/// A file interface that supports advanced file operations -pub trait RawFileInterfaceExt { - fn fext_file_length(&self) -> RuntimeResult; - fn fext_cursor(&mut self) -> RuntimeResult; - fn fext_seek_ahead_from_start_by(&mut self, ahead_by: u64) -> RuntimeResult<()>; -} - -fn cvt(v: std::io::Result) -> RuntimeResult { - let r = v?; - Ok(r) -} - -/// The actual local host file system (as an abstraction [`fs`]) -#[derive(Debug)] -pub struct LocalFS; - -impl RawFSInterface for LocalFS { - type File = File; - fn fs_remove_file(fpath: &str) -> RuntimeResult<()> { - cvt(fs::remove_file(fpath)) - } - fn fs_rename_file(from: &str, to: &str) -> RuntimeResult<()> { - cvt(fs::rename(from, to)) - } - fn fs_create_dir(fpath: &str) -> RuntimeResult<()> { - cvt(fs::create_dir(fpath)) - } - fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()> { - cvt(fs::create_dir_all(fpath)) - } - fn fs_delete_dir(fpath: &str) -> RuntimeResult<()> { - cvt(fs::remove_dir(fpath)) - } - fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()> { - cvt(fs::remove_dir_all(fpath)) - } - fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult> { - let f = File::options() - .create(true) - .read(true) - .write(true) - .open(fpath)?; - let md = f.metadata()?; - if md.len() == 0 { - Ok(FileOpen::Created(f)) - } else { - Ok(FileOpen::Existing(f)) - } - } - fn fs_fcreate_rw(fpath: &str) -> RuntimeResult { - let f = File::options() - .create_new(true) - .read(true) - .write(true) - .open(fpath)?; - Ok(f) - } - fn fs_fopen_rw(fpath: &str) -> RuntimeResult { - let f = File::options().read(true).write(true).open(fpath)?; - Ok(f) - } -} - -impl RawFileInterface for File { - type BufReader = BufReader; - type BufWriter = BufWriter; - fn into_buffered_reader(self) -> RuntimeResult { - Ok(BufReader::new(self)) - } - fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { - Ok(r.into_inner()) - } - fn into_buffered_writer(self) -> RuntimeResult { - Ok(BufWriter::new(self)) - } - fn downgrade_writer(mut w: Self::BufWriter) -> RuntimeResult { - w.flush()?; // TODO(@ohsayan): handle rare case where writer does panic - let (w, _) = w.into_parts(); - Ok(w) - } -} - -impl RawFileInterfaceBufferedWriter for BufWriter { - fn sync_write_cache(&mut self) -> RuntimeResult<()> { - self.flush()?; - self.get_mut().sync_all()?; - Ok(()) - } -} - -impl RawFileInterfaceWriteExt for File { - fn fwext_fsync_all(&mut self) -> RuntimeResult<()> { - cvt(self.sync_all()) - } - fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { - cvt(self.set_len(to)) - } -} - -trait LocalFSFile { - fn file_mut(&mut self) -> &mut File; - fn file(&self) -> &File; -} - -impl LocalFSFile for File { - fn file_mut(&mut self) -> &mut File { - self - } - fn file(&self) -> &File { - self - } -} - -impl LocalFSFile for BufReader { - fn file_mut(&mut self) -> &mut File { - self.get_mut() - } - fn file(&self) -> &File { - self.get_ref() - } -} - -impl LocalFSFile for BufWriter { - fn file_mut(&mut self) -> &mut File { - self.get_mut() - } - fn file(&self) -> &File { - self.get_ref() - } -} - -impl RawFileInterfaceExt for F { - fn fext_file_length(&self) -> RuntimeResult { - Ok(self.file().metadata()?.len()) - } - fn fext_cursor(&mut self) -> RuntimeResult { - cvt(self.file_mut().stream_position()) - } - fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()> { - cvt(self.file_mut().seek(SeekFrom::Start(by)).map(|_| ())) - } -} - -pub struct SDSSFileTrackedWriter { - f: SDSSFileIO::BufWriter>, - cs: SCrc, -} - -impl SDSSFileTrackedWriter { +impl TrackedWriter { pub fn new(f: SDSSFileIO) -> RuntimeResult { Ok(Self { - f: f.into_buffered_sdss_writer()?, - cs: SCrc::new(), + file: f.into_buffered_writer()?, + cs: SCrc64::new(), }) } - pub fn tracked_write_unfsynced(&mut self, block: &[u8]) -> RuntimeResult<()> { + pub fn tracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> { self.untracked_write(block) .map(|_| self.cs.recompute_with_new_var_block(block)) } pub fn untracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> { - match self.f.unfsynced_write(block) { + match self.file.write_buffer(block) { Ok(()) => Ok(()), e => e, } } pub fn sync_writes(&mut self) -> RuntimeResult<()> { - self.f.f.sync_write_cache() + self.file.f.sync_write_cache() } pub fn reset_and_finish_checksum(&mut self) -> u64 { - let scrc = core::mem::replace(&mut self.cs, SCrc::new()); + let scrc = core::mem::replace(&mut self.cs, SCrc64::new()); scrc.finish() } - pub fn into_inner_file(self) -> RuntimeResult> { - self.f.downgrade_writer() + pub fn sync_into_inner(self) -> RuntimeResult> { + self.file.downgrade_writer() } } /// [`SDSSFileLenTracked`] simply maintains application level length and checksum tracking to avoid frequent syscalls because we /// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them -pub struct SDSSFileTrackedReader { - f: SDSSFileIO::BufReader>, +pub struct TrackedReader { + f: SDSSFileIO::BufReader>, len: u64, - pos: u64, - cs: SCrc, + cursor: u64, + cs: SCrc64, } -impl SDSSFileTrackedReader { +impl TrackedReader { /// Important: this will only look at the data post the current cursor! pub fn new(mut f: SDSSFileIO) -> RuntimeResult { let len = f.file_length()?; - let pos = f.retrieve_cursor()?; - let f = f.into_buffered_sdss_reader()?; + let pos = f.file_cursor()?; + let f = f.into_buffered_reader()?; Ok(Self { f, len, - pos, - cs: SCrc::new(), + cursor: pos, + cs: SCrc64::new(), }) } pub fn remaining(&self) -> u64 { - self.len - self.pos + self.len - self.cursor } pub fn is_eof(&self) -> bool { - self.len == self.pos + self.len == self.cursor } pub fn has_left(&self, v: u64) -> bool { self.remaining() >= v } - pub fn read_into_buffer(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { + pub fn tracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { self.untracked_read(buf) .map(|_| self.cs.recompute_with_new_var_block(buf)) } pub fn read_byte(&mut self) -> RuntimeResult { let mut buf = [0u8; 1]; - self.read_into_buffer(&mut buf).map(|_| buf[0]) + self.tracked_read(&mut buf).map(|_| buf[0]) } pub fn __reset_checksum(&mut self) -> u64 { - let mut crc = SCrc::new(); + let mut crc = SCrc64::new(); core::mem::swap(&mut crc, &mut self.cs); crc.finish() } pub fn untracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { if self.remaining() >= buf.len() as u64 { - match self.f.read_to_buffer(buf) { + match self.f.read_buffer(buf) { Ok(()) => { - self.pos += buf.len() as u64; + self.cursor += buf.len() as u64; Ok(()) } Err(e) => return Err(e), @@ -395,7 +141,7 @@ impl SDSSFileTrackedReader { return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into()); } let mut buf = [0; N]; - self.read_into_buffer(&mut buf)?; + self.tracked_read(&mut buf)?; Ok(buf) } pub fn read_u64_le(&mut self) -> RuntimeResult { @@ -404,66 +150,68 @@ impl SDSSFileTrackedReader { } #[derive(Debug)] -pub struct SDSSFileIO::File> { +pub struct SDSSFileIO::File> { f: F, _fs: PhantomData, } -impl SDSSFileIO { - pub fn open(fpath: &str) -> RuntimeResult<(Self, F::Header)> { +impl SDSSFileIO { + pub fn open>( + fpath: &str, + ) -> RuntimeResult<(Self, F::Metadata)> { let mut f = Self::_new(Fs::fs_fopen_rw(fpath)?); - let header = F::Header::decode_verify(&mut f, F::DECODE_DATA, F::VERIFY_DATA)?; - Ok((f, header)) + let v = F::read_metadata(&mut f.f, ())?; + Ok((f, v)) } - pub fn create(fpath: &str) -> RuntimeResult { + pub fn create>(fpath: &str) -> RuntimeResult { let mut f = Self::_new(Fs::fs_fcreate_rw(fpath)?); - F::Header::encode(&mut f, F::ENCODE_DATA)?; + F::write_metadata(&mut f.f, ())?; Ok(f) } - pub fn open_or_create_perm_rw( + pub fn open_or_create_perm_rw>( fpath: &str, - ) -> RuntimeResult> { + ) -> RuntimeResult> { match Fs::fs_fopen_or_create_rw(fpath)? { FileOpen::Created(c) => { let mut f = Self::_new(c); - F::Header::encode(&mut f, F::ENCODE_DATA)?; + F::write_metadata(&mut f.f, ())?; Ok(FileOpen::Created(f)) } FileOpen::Existing(e) => { let mut f = Self::_new(e); - let header = F::Header::decode_verify(&mut f, F::DECODE_DATA, F::VERIFY_DATA)?; + let header = F::read_metadata(&mut f.f, ())?; Ok(FileOpen::Existing((f, header))) } } } - pub fn into_buffered_sdss_reader( + pub fn into_buffered_reader( self, - ) -> RuntimeResult::BufReader>> { - self.f.into_buffered_reader().map(SDSSFileIO::_new) + ) -> RuntimeResult::BufReader>> { + self.f.upgrade_to_buffered_reader().map(SDSSFileIO::_new) } - pub fn into_buffered_sdss_writer( + pub fn into_buffered_writer( self, - ) -> RuntimeResult::BufWriter>> { - self.f.into_buffered_writer().map(SDSSFileIO::_new) + ) -> RuntimeResult::BufWriter>> { + self.f.upgrade_to_buffered_writer().map(SDSSFileIO::_new) } } -impl SDSSFileIO::BufReader> { +impl SDSSFileIO::BufReader> { pub fn downgrade_reader(self) -> RuntimeResult> { - let me = ::downgrade_reader(self.f)?; + let me = ::downgrade_reader(self.f)?; Ok(SDSSFileIO::_new(me)) } } -impl SDSSFileIO::BufWriter> { +impl SDSSFileIO::BufWriter> { pub fn downgrade_writer(self) -> RuntimeResult> { - let me = ::downgrade_writer(self.f)?; + let me = ::downgrade_writer(self.f)?; Ok(SDSSFileIO::_new(me)) } } -impl SDSSFileIO { - pub fn _new(f: F) -> Self { +impl SDSSFileIO { + fn _new(f: F) -> Self { Self { f, _fs: PhantomData, @@ -471,46 +219,46 @@ impl SDSSFileIO { } } -impl SDSSFileIO { - pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> { - self.f.fr_read_exact(buffer) +impl SDSSFileIO { + pub fn read_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> { + self.f.fread_exact(buffer) } } -impl SDSSFileIO { - pub fn retrieve_cursor(&mut self) -> RuntimeResult { +impl SDSSFileIO { + pub fn file_cursor(&mut self) -> RuntimeResult { self.f.fext_cursor() } pub fn file_length(&self) -> RuntimeResult { - self.f.fext_file_length() + self.f.fext_length() } pub fn seek_from_start(&mut self, by: u64) -> RuntimeResult<()> { self.f.fext_seek_ahead_from_start_by(by) } } -impl SDSSFileIO { - pub fn load_remaining_into_buffer(&mut self) -> RuntimeResult> { - let len = self.file_length()? - self.retrieve_cursor()?; +impl SDSSFileIO { + pub fn read_full(&mut self) -> RuntimeResult> { + let len = self.file_length()? - self.file_cursor()?; let mut buf = vec![0; len as usize]; - self.read_to_buffer(&mut buf)?; + self.read_buffer(&mut buf)?; Ok(buf) } } -impl SDSSFileIO { - pub fn unfsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> { +impl SDSSFileIO { + pub fn write_buffer(&mut self, data: &[u8]) -> RuntimeResult<()> { self.f.fw_write_all(data) } } -impl SDSSFileIO { +impl SDSSFileIO { pub fn fsync_all(&mut self) -> RuntimeResult<()> { - self.f.fwext_fsync_all()?; + self.f.fwext_sync_all()?; Ok(()) } pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> { self.f.fw_write_all(data)?; - self.f.fwext_fsync_all() + self.f.fwext_sync_all() } } diff --git a/server/src/engine/storage/v1/spec.rs b/server/src/engine/storage/v1/spec.rs index 9f556c1d..07645d0d 100644 --- a/server/src/engine/storage/v1/spec.rs +++ b/server/src/engine/storage/v1/spec.rs @@ -24,157 +24,37 @@ * */ -/* - Header specification - --- - We utilize two different kinds of headers: - - Static header - Mostly to avoid data corruption - - Variable header - For preserving dynamic information -*/ - -use { - super::rw::{RawFSInterface, SDSSFileIO}, - crate::{ - engine::{ - error::{RuntimeResult, StorageError}, - storage::versions::{self, DriverVersion, HeaderVersion, ServerVersion}, - }, - util::os, - }, - std::{ - mem::{transmute, ManuallyDrop}, - ops::Range, - }, +use crate::engine::storage::common::{ + sdss, + versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion}, }; -/* - meta -*/ +pub(super) type Header = sdss::HeaderV1; -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] -/// Host architecture enumeration for common platforms -pub enum HostArch { - X86 = 0, - X86_64 = 1, - ARM = 2, - ARM64 = 3, - MIPS = 4, - PowerPC = 5, +#[derive(Debug)] +pub(super) struct HeaderImplV1; +impl sdss::HeaderV1Spec for HeaderImplV1 { + type FileClass = FileScope; + type FileSpecifier = FileSpecifier; + const CURRENT_SERVER_VERSION: ServerVersion = versions::v1::V1_SERVER_VERSION; + const CURRENT_DRIVER_VERSION: DriverVersion = versions::v1::V1_DRIVER_VERSION; } - -impl HostArch { - pub const fn new() -> Self { - if cfg!(target_arch = "x86") { - HostArch::X86 - } else if cfg!(target_arch = "x86_64") { - HostArch::X86_64 - } else if cfg!(target_arch = "arm") { - HostArch::ARM - } else if cfg!(target_arch = "aarch64") { - HostArch::ARM64 - } else if cfg!(target_arch = "mips") { - HostArch::MIPS - } else if cfg!(target_arch = "powerpc") { - HostArch::PowerPC - } else { - panic!("Unsupported target architecture") - } +impl sdss::HeaderV1Enumeration for FileScope { + const MAX: u8 = FileScope::MAX; + unsafe fn new(x: u8) -> Self { + core::mem::transmute(x) } -} - -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] -/// Host OS enumeration for common operating systems -pub enum HostOS { - // T1 - Linux = 0, - Windows = 1, - MacOS = 2, - // T2 - Android = 3, - AppleiOS = 4, - FreeBSD = 5, - OpenBSD = 6, - NetBSD = 7, - WASI = 8, - Emscripten = 9, - // T3 - Solaris = 10, - Fuchsia = 11, - Redox = 12, - DragonFly = 13, -} - -impl HostOS { - pub const fn new() -> Self { - if cfg!(target_os = "linux") { - HostOS::Linux - } else if cfg!(target_os = "windows") { - HostOS::Windows - } else if cfg!(target_os = "macos") { - HostOS::MacOS - } else if cfg!(target_os = "android") { - HostOS::Android - } else if cfg!(target_os = "ios") { - HostOS::AppleiOS - } else if cfg!(target_os = "freebsd") { - HostOS::FreeBSD - } else if cfg!(target_os = "openbsd") { - HostOS::OpenBSD - } else if cfg!(target_os = "netbsd") { - HostOS::NetBSD - } else if cfg!(target_os = "dragonfly") { - HostOS::DragonFly - } else if cfg!(target_os = "redox") { - HostOS::Redox - } else if cfg!(target_os = "fuchsia") { - HostOS::Fuchsia - } else if cfg!(target_os = "solaris") { - HostOS::Solaris - } else if cfg!(target_os = "emscripten") { - HostOS::Emscripten - } else if cfg!(target_os = "wasi") { - HostOS::WASI - } else { - panic!("unknown os") - } + fn repr_u8(&self) -> u8 { + FileScope::value_u8(self) } } - -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] -/// Host endian enumeration -pub enum HostEndian { - Big = 0, - Little = 1, -} - -impl HostEndian { - pub const fn new() -> Self { - if cfg!(target_endian = "little") { - Self::Little - } else { - Self::Big - } +impl sdss::HeaderV1Enumeration for FileSpecifier { + const MAX: u8 = FileSpecifier::MAX; + unsafe fn new(x: u8) -> Self { + core::mem::transmute(x) } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] -#[repr(u8)] -/// Host pointer width enumeration -pub enum HostPointerWidth { - P32 = 0, - P64 = 1, -} - -impl HostPointerWidth { - pub const fn new() -> Self { - match sizeof!(usize) { - 4 => Self::P32, - 8 => Self::P64, - _ => panic!("unknown pointer width"), - } + fn repr_u8(&self) -> u8 { + self.value_u8() } } @@ -197,414 +77,43 @@ pub enum FileSpecifier { TestTransactionLog = 0xFF, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub struct FileSpecifierVersion(u16); -impl FileSpecifierVersion { - pub const fn __new(v: u16) -> Self { - Self(v) - } -} - -const SDSS_MAGIC: u64 = 0x4F48534159414E21; - -/// Specification for a SDSS file -pub trait FileSpec { - /// The header spec for the file - type Header: Header; - /// Encode data - const ENCODE_DATA: ::EncodeArgs; - /// Decode data - const DECODE_DATA: ::DecodeArgs; - /// Verify data - const VERIFY_DATA: ::DecodeVerifyArgs; -} - /* file spec impls */ #[cfg(test)] -pub struct TestFile; +pub(super) struct TestFile; #[cfg(test)] -impl FileSpec for TestFile { - type Header = SDSSStaticHeaderV1Compact; - const ENCODE_DATA: ::EncodeArgs = ( - FileScope::FlatmapData, - FileSpecifier::TestTransactionLog, - FileSpecifierVersion::__new(0), - ); - const DECODE_DATA: ::DecodeArgs = (); - const VERIFY_DATA: ::DecodeVerifyArgs = Self::ENCODE_DATA; +impl sdss::SimpleFileSpecV1 for TestFile { + type HeaderSpec = HeaderImplV1; + const FILE_CLASS: FileScope = FileScope::FlatmapData; + const FILE_SPECIFIER: FileSpecifier = FileSpecifier::TestTransactionLog; + const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0); } /// The file specification for the GNS transaction log (impl v1) -pub struct GNSTransactionLogV1; -impl FileSpec for GNSTransactionLogV1 { - type Header = SDSSStaticHeaderV1Compact; - const ENCODE_DATA: ::EncodeArgs = ( - FileScope::Journal, - FileSpecifier::GNSTxnLog, - FileSpecifierVersion::__new(0), - ); - const DECODE_DATA: ::DecodeArgs = (); - const VERIFY_DATA: ::DecodeVerifyArgs = Self::ENCODE_DATA; +pub(super) struct GNSTransactionLogV1; +impl sdss::SimpleFileSpecV1 for GNSTransactionLogV1 { + type HeaderSpec = HeaderImplV1; + const FILE_CLASS: FileScope = FileScope::Journal; + const FILE_SPECIFIER: FileSpecifier = FileSpecifier::GNSTxnLog; + const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0); } /// The file specification for a journal batch -pub struct DataBatchJournalV1; -impl FileSpec for DataBatchJournalV1 { - type Header = SDSSStaticHeaderV1Compact; - const ENCODE_DATA: ::EncodeArgs = ( - FileScope::DataBatch, - FileSpecifier::TableDataBatch, - FileSpecifierVersion::__new(0), - ); - const DECODE_DATA: ::DecodeArgs = (); - const VERIFY_DATA: ::DecodeVerifyArgs = Self::ENCODE_DATA; +pub(super) struct DataBatchJournalV1; +impl sdss::SimpleFileSpecV1 for DataBatchJournalV1 { + type HeaderSpec = HeaderImplV1; + const FILE_CLASS: FileScope = FileScope::DataBatch; + const FILE_SPECIFIER: FileSpecifier = FileSpecifier::TableDataBatch; + const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0); } /// The file specification for the system db -pub struct SysDBV1; -impl FileSpec for SysDBV1 { - type Header = SDSSStaticHeaderV1Compact; - const ENCODE_DATA: ::EncodeArgs = ( - FileScope::FlatmapData, - FileSpecifier::SysDB, - FileSpecifierVersion::__new(0), - ); - const DECODE_DATA: ::DecodeArgs = (); - const VERIFY_DATA: ::DecodeVerifyArgs = Self::ENCODE_DATA; -} - -/* - header spec -*/ - -/// SDSS Header specification -pub trait Header: Sized { - /// Encode arguments - type EncodeArgs; - /// Decode arguments - type DecodeArgs; - /// Decode verify arguments - type DecodeVerifyArgs; - /// Encode the header - fn encode( - f: &mut SDSSFileIO, - args: Self::EncodeArgs, - ) -> RuntimeResult<()>; - /// Decode the header - fn decode( - f: &mut SDSSFileIO, - args: Self::DecodeArgs, - ) -> RuntimeResult; - /// Verify the header - fn verify(&self, args: Self::DecodeVerifyArgs) -> RuntimeResult<()>; - /// Decode and verify the header - fn decode_verify( - f: &mut SDSSFileIO, - d_args: Self::DecodeArgs, - v_args: Self::DecodeVerifyArgs, - ) -> RuntimeResult { - let h = Self::decode(f, d_args)?; - h.verify(v_args)?; - Ok(h) - } -} - -/* - header impls -*/ - -unsafe fn memcpy(src: &[u8]) -> [u8; N] { - let mut dst = [0u8; N]; - src.as_ptr().copy_to_nonoverlapping(dst.as_mut_ptr(), N); - dst -} - -macro_rules! var { - (let $($name:ident),* $(,)?) => { - $(let $name;)* - } -} - -/* - Compact SDSS Header v1 - --- - - 1: Magic block (16B): magic + header version - - 2: Static block (40B): - - 2.1: Genesis static record (24B) - - 2.1.1: Software information (16B) - - Server version (8B) - - Driver version (8B) - - 2.1.2: Host information (4B): - - OS (1B) - - Arch (1B) - - Pointer width (1B) - - Endian (1B) - - 2.1.3: File information (4B): - - File class (1B) - - File specifier (1B) - - File specifier version (2B) - - 2.2: Genesis runtime record (16B) - - Host epoch (16B) - - 3: Padding block (8B) -*/ - -#[repr(align(8))] -#[derive(Debug, PartialEq)] -pub struct SDSSStaticHeaderV1Compact { - // 1 magic block - magic_header_version: HeaderVersion, - // 2.1.1 - genesis_static_sw_server_version: ServerVersion, - genesis_static_sw_driver_version: DriverVersion, - // 2.1.2 - genesis_static_host_os: HostOS, - genesis_static_host_arch: HostArch, - genesis_static_host_ptr_width: HostPointerWidth, - genesis_static_host_endian: HostEndian, - // 2.1.3 - genesis_static_file_class: FileScope, - genesis_static_file_specifier: FileSpecifier, - genesis_static_file_specifier_version: FileSpecifierVersion, - // 2.2 - genesis_runtime_epoch_time: u128, - // 3 - genesis_padding_block: [u8; 8], -} - -impl SDSSStaticHeaderV1Compact { - pub const SIZE: usize = 64; - /// Decode and validate the full header block (validate ONLY; you must verify yourself) - /// - /// Notes: - /// - Time might be inconsistent; verify - /// - Compatibility requires additional intervention - /// - If padding block was not zeroed, handle - /// - No file metadata and is verified. Check! - /// - fn _decode(block: [u8; 64]) -> RuntimeResult { - var!(let raw_magic, raw_header_version, raw_server_version, raw_driver_version, raw_host_os, raw_host_arch, - raw_host_ptr_width, raw_host_endian, raw_file_class, raw_file_specifier, raw_file_specifier_version, - raw_runtime_epoch_time, raw_paddding_block, - ); - macro_rules! u64 { - ($pos:expr) => { - u64::from_le_bytes(memcpy(&block[$pos])) - }; - } - unsafe { - // UNSAFE(@ohsayan): all segments are correctly accessed (aligned to u8) - raw_magic = u64!(Self::SEG1_MAGIC); - raw_header_version = HeaderVersion::__new(u64!(Self::SEG1_HEADER_VERSION)); - raw_server_version = ServerVersion::__new(u64!(Self::SEG2_REC1_SERVER_VERSION)); - raw_driver_version = DriverVersion::__new(u64!(Self::SEG2_REC1_DRIVER_VERSION)); - raw_host_os = block[Self::SEG2_REC1_HOST_OS]; - raw_host_arch = block[Self::SEG2_REC1_HOST_ARCH]; - raw_host_ptr_width = block[Self::SEG2_REC1_HOST_PTR_WIDTH]; - raw_host_endian = block[Self::SEG2_REC1_HOST_ENDIAN]; - raw_file_class = block[Self::SEG2_REC1_FILE_CLASS]; - raw_file_specifier = block[Self::SEG2_REC1_FILE_SPECIFIER]; - raw_file_specifier_version = FileSpecifierVersion::__new(u16::from_le_bytes(memcpy( - &block[Self::SEG2_REC1_FILE_SPECIFIER_VERSION], - ))); - raw_runtime_epoch_time = - u128::from_le_bytes(memcpy(&block[Self::SEG2_REC2_RUNTIME_EPOCH_TIME])); - raw_paddding_block = memcpy::<8>(&block[Self::SEG3_PADDING_BLK]); - } - macro_rules! okay { - ($($expr:expr),* $(,)?) => { - $(($expr) &)*true - } - } - let okay_header_version = raw_header_version == versions::CURRENT_HEADER_VERSION; - let okay_server_version = raw_server_version == versions::CURRENT_SERVER_VERSION; - let okay_driver_version = raw_driver_version == versions::CURRENT_DRIVER_VERSION; - let okay = okay!( - // 1.1 mgblk - raw_magic == SDSS_MAGIC, - okay_header_version, - // 2.1.1 - okay_server_version, - okay_driver_version, - // 2.1.2 - raw_host_os <= HostOS::MAX, - raw_host_arch <= HostArch::MAX, - raw_host_ptr_width <= HostPointerWidth::MAX, - raw_host_endian <= HostEndian::MAX, - // 2.1.3 - raw_file_class <= FileScope::MAX, - raw_file_specifier <= FileSpecifier::MAX, - ); - if okay { - Ok(unsafe { - // UNSAFE(@ohsayan): the block ranges are very well defined - Self { - // 1.1 - magic_header_version: raw_header_version, - // 2.1.1 - genesis_static_sw_server_version: raw_server_version, - genesis_static_sw_driver_version: raw_driver_version, - // 2.1.2 - genesis_static_host_os: transmute(raw_host_os), - genesis_static_host_arch: transmute(raw_host_arch), - genesis_static_host_ptr_width: transmute(raw_host_ptr_width), - genesis_static_host_endian: transmute(raw_host_endian), - // 2.1.3 - genesis_static_file_class: transmute(raw_file_class), - genesis_static_file_specifier: transmute(raw_file_specifier), - genesis_static_file_specifier_version: raw_file_specifier_version, - // 2.2 - genesis_runtime_epoch_time: raw_runtime_epoch_time, - // 3 - genesis_padding_block: raw_paddding_block, - } - }) - } else { - let version_okay = okay_header_version & okay_server_version & okay_driver_version; - let md = ManuallyDrop::new([ - StorageError::HeaderDecodeCorruptedHeader, - StorageError::HeaderDecodeVersionMismatch, - ]); - Err(unsafe { - // UNSAFE(@ohsayan): while not needed, md for drop safety + correct index - md.as_ptr().add(!version_okay as usize).read().into() - }) - } - } -} - -impl SDSSStaticHeaderV1Compact { - const SEG1_MAGIC: Range = 0..8; - const SEG1_HEADER_VERSION: Range = 8..16; - const SEG2_REC1_SERVER_VERSION: Range = 16..24; - const SEG2_REC1_DRIVER_VERSION: Range = 24..32; - const SEG2_REC1_HOST_OS: usize = 32; - const SEG2_REC1_HOST_ARCH: usize = 33; - const SEG2_REC1_HOST_PTR_WIDTH: usize = 34; - const SEG2_REC1_HOST_ENDIAN: usize = 35; - const SEG2_REC1_FILE_CLASS: usize = 36; - const SEG2_REC1_FILE_SPECIFIER: usize = 37; - const SEG2_REC1_FILE_SPECIFIER_VERSION: Range = 38..40; - const SEG2_REC2_RUNTIME_EPOCH_TIME: Range = 40..56; - const SEG3_PADDING_BLK: Range = 56..64; - fn _encode( - file_class: FileScope, - file_specifier: FileSpecifier, - file_specifier_version: FileSpecifierVersion, - epoch_time: u128, - padding_block: [u8; 8], - ) -> [u8; 64] { - let mut ret = [0; 64]; - // 1. mgblk - ret[Self::SEG1_MAGIC].copy_from_slice(&SDSS_MAGIC.to_le_bytes()); - ret[Self::SEG1_HEADER_VERSION] - .copy_from_slice(&versions::CURRENT_HEADER_VERSION.little_endian_u64()); - // 2.1.1 - ret[Self::SEG2_REC1_SERVER_VERSION] - .copy_from_slice(&versions::CURRENT_SERVER_VERSION.little_endian()); - ret[Self::SEG2_REC1_DRIVER_VERSION] - .copy_from_slice(&versions::CURRENT_DRIVER_VERSION.little_endian()); - // 2.1.2 - ret[Self::SEG2_REC1_HOST_OS] = HostOS::new().value_u8(); - ret[Self::SEG2_REC1_HOST_ARCH] = HostArch::new().value_u8(); - ret[Self::SEG2_REC1_HOST_PTR_WIDTH] = HostPointerWidth::new().value_u8(); - ret[Self::SEG2_REC1_HOST_ENDIAN] = HostEndian::new().value_u8(); - // 2.1.3 - ret[Self::SEG2_REC1_FILE_CLASS] = file_class.value_u8(); - ret[Self::SEG2_REC1_FILE_SPECIFIER] = file_specifier.value_u8(); - ret[Self::SEG2_REC1_FILE_SPECIFIER_VERSION] - .copy_from_slice(&file_specifier_version.0.to_le_bytes()); - // 2.2 - ret[Self::SEG2_REC2_RUNTIME_EPOCH_TIME].copy_from_slice(&epoch_time.to_le_bytes()); - // 3 - ret[Self::SEG3_PADDING_BLK].copy_from_slice(&padding_block); - ret - } - pub fn _encode_auto( - file_class: FileScope, - file_specifier: FileSpecifier, - file_specifier_version: FileSpecifierVersion, - ) -> [u8; 64] { - let epoch_time = os::get_epoch_time(); - Self::_encode( - file_class, - file_specifier, - file_specifier_version, - epoch_time, - [0; 8], - ) - } -} - -#[allow(unused)] -impl SDSSStaticHeaderV1Compact { - pub fn header_version(&self) -> HeaderVersion { - self.magic_header_version - } - pub fn server_version(&self) -> ServerVersion { - self.genesis_static_sw_server_version - } - pub fn driver_version(&self) -> DriverVersion { - self.genesis_static_sw_driver_version - } - pub fn host_os(&self) -> HostOS { - self.genesis_static_host_os - } - pub fn host_arch(&self) -> HostArch { - self.genesis_static_host_arch - } - pub fn host_ptr_width(&self) -> HostPointerWidth { - self.genesis_static_host_ptr_width - } - pub fn host_endian(&self) -> HostEndian { - self.genesis_static_host_endian - } - pub fn file_class(&self) -> FileScope { - self.genesis_static_file_class - } - pub fn file_specifier(&self) -> FileSpecifier { - self.genesis_static_file_specifier - } - pub fn file_specifier_version(&self) -> FileSpecifierVersion { - self.genesis_static_file_specifier_version - } - pub fn epoch_time(&self) -> u128 { - self.genesis_runtime_epoch_time - } - pub fn padding_block(&self) -> [u8; 8] { - self.genesis_padding_block - } -} - -impl Header for SDSSStaticHeaderV1Compact { - type EncodeArgs = (FileScope, FileSpecifier, FileSpecifierVersion); - type DecodeArgs = (); - type DecodeVerifyArgs = Self::EncodeArgs; - fn encode( - f: &mut SDSSFileIO, - (scope, spec, spec_v): Self::EncodeArgs, - ) -> RuntimeResult<()> { - let b = Self::_encode_auto(scope, spec, spec_v); - f.fsynced_write(&b) - } - fn decode( - f: &mut SDSSFileIO, - _: Self::DecodeArgs, - ) -> RuntimeResult { - let mut buf = [0u8; 64]; - f.read_to_buffer(&mut buf)?; - Self::_decode(buf) - } - fn verify(&self, (scope, spec, spec_v): Self::DecodeVerifyArgs) -> RuntimeResult<()> { - if (self.file_class() == scope) - & (self.file_specifier() == spec) - & (self.file_specifier_version() == spec_v) - { - Ok(()) - } else { - Err(StorageError::HeaderDecodeDataMismatch.into()) - } - } +pub(super) struct SysDBV1; +impl sdss::SimpleFileSpecV1 for SysDBV1 { + type HeaderSpec = HeaderImplV1; + const FILE_CLASS: FileScope = FileScope::FlatmapData; + const FILE_SPECIFIER: FileSpecifier = FileSpecifier::SysDB; + const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0); } diff --git a/server/src/engine/storage/v1/sysdb.rs b/server/src/engine/storage/v1/sysdb.rs index a0a4eace..3b6d49d9 100644 --- a/server/src/engine/storage/v1/sysdb.rs +++ b/server/src/engine/storage/v1/sysdb.rs @@ -25,13 +25,15 @@ */ use { - super::rw::FileOpen, crate::engine::{ config::{ConfigAuth, ConfigMode}, data::{cell::Datacell, DictEntryGeneric, DictGeneric}, error::{RuntimeResult, StorageError}, fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore}, - storage::v1::{inf, spec, RawFSInterface, SDSSFileIO}, + storage::{ + common::interface::fs_traits::{FSInterface, FileOpen}, + v1::{inf, spec, SDSSFileIO}, + }, }, parking_lot::RwLock, std::collections::HashMap, @@ -68,7 +70,7 @@ fn rkey( } } -impl SystemStore { +impl SystemStore { const SYSDB_PATH: &'static str = "sys.db"; const SYSDB_COW_PATH: &'static str = "sys.db.cow"; const SYS_KEY_AUTH: &'static str = "auth"; @@ -104,7 +106,7 @@ impl SystemStore { } } -impl SystemStore { +impl SystemStore { fn _sync(&self, mut f: SDSSFileIO, auth: &SysAuth) -> RuntimeResult<()> { let cfg = self.system_store(); // prepare our flat file @@ -141,7 +143,7 @@ impl SystemStore { fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> { let f = SDSSFileIO::create::(cow)?; self._sync(f, auth)?; - Fs::fs_rename_file(cow, target) + Fs::fs_rename(cow, target) } fn restore_and_sync( f: SDSSFileIO, @@ -180,7 +182,7 @@ impl SystemStore { } fn _restore(mut f: SDSSFileIO, run_mode: ConfigMode) -> RuntimeResult { let mut sysdb_data = - inf::dec::dec_dict_full::(&f.load_remaining_into_buffer()?)?; + inf::dec::dec_dict_full::(&f.read_full()?)?; // get our auth and sys stores let mut auth_store = rkey( &mut sysdb_data, diff --git a/server/src/engine/storage/v1/tests.rs b/server/src/engine/storage/v1/tests.rs index 49700f1b..d5f8937e 100644 --- a/server/src/engine/storage/v1/tests.rs +++ b/server/src/engine/storage/v1/tests.rs @@ -24,7 +24,7 @@ * */ -type VirtualFS = super::memfs::VirtualFS; +type VirtualFS = crate::engine::storage::common::interface::fs_test::VirtualFS; mod batch; mod rw; diff --git a/server/src/engine/storage/v1/tests/batch.rs b/server/src/engine/storage/v1/tests/batch.rs index 75ab5578..2a4a9067 100644 --- a/server/src/engine/storage/v1/tests/batch.rs +++ b/server/src/engine/storage/v1/tests/batch.rs @@ -36,14 +36,16 @@ use { }, data::{cell::Datacell, tag::TagSelector, uuid::Uuid}, idx::MTIndex, - storage::v1::{ - batch_jrnl::{ - DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent, - DecodedBatchEventKind, NormalBatch, + storage::{ + common::interface::{fs_test::VirtualFS, fs_traits::FileOpen}, + v1::{ + batch_jrnl::{ + DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent, + DecodedBatchEventKind, NormalBatch, + }, + rw::SDSSFileIO, + spec, }, - memfs::VirtualFS, - rw::{FileOpen, SDSSFileIO}, - spec, }, }, util::test_utils, @@ -57,7 +59,7 @@ fn pkey(v: impl Into) -> PrimaryIndexKey { fn open_file( fpath: &str, -) -> FileOpen, (SDSSFileIO, spec::SDSSStaticHeaderV1Compact)> { +) -> FileOpen, (SDSSFileIO, super::super::Header)> { SDSSFileIO::open_or_create_perm_rw::(fpath).unwrap() } diff --git a/server/src/engine/storage/v1/tests/rw.rs b/server/src/engine/storage/v1/tests/rw.rs index d964cf81..8cd30794 100644 --- a/server/src/engine/storage/v1/tests/rw.rs +++ b/server/src/engine/storage/v1/tests/rw.rs @@ -24,9 +24,9 @@ * */ -use crate::engine::storage::v1::{ - rw::{FileOpen, SDSSFileIO}, - spec, +use crate::engine::storage::{ + common::interface::fs_traits::FileOpen, + v1::{rw::SDSSFileIO, spec}, }; #[test] diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs new file mode 100644 index 00000000..4893a0f7 --- /dev/null +++ b/server/src/engine/storage/v2/mod.rs @@ -0,0 +1,27 @@ +/* + * Created on Sun Jan 07 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +mod spec; diff --git a/server/src/engine/storage/v2/spec.rs b/server/src/engine/storage/v2/spec.rs new file mode 100644 index 00000000..8e7ff45a --- /dev/null +++ b/server/src/engine/storage/v2/spec.rs @@ -0,0 +1,92 @@ +/* + * Created on Thu Jan 11 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + crate::engine::storage::common::{ + sdss::{self, HeaderV1Spec}, + versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion}, + }, + std::mem::transmute, +}; + +/// The file scope +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] +pub enum FileClass { + EventLog = 0, + Batch = 1, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] +#[repr(u8)] +pub enum FileSpecifier { + GlobalNS = 0, + ModelData = 1, +} + +impl sdss::HeaderV1Enumeration for FileClass { + const MAX: u8 = FileClass::MAX; + unsafe fn new(x: u8) -> Self { + transmute(x) + } + fn repr_u8(&self) -> u8 { + self.value_u8() + } +} + +impl sdss::HeaderV1Enumeration for FileSpecifier { + const MAX: u8 = FileSpecifier::MAX; + unsafe fn new(x: u8) -> Self { + transmute(x) + } + fn repr_u8(&self) -> u8 { + self.value_u8() + } +} + +pub struct HeaderImplV2; +impl HeaderV1Spec for HeaderImplV2 { + type FileClass = FileClass; + type FileSpecifier = FileSpecifier; + const CURRENT_SERVER_VERSION: ServerVersion = versions::v2::V2_SERVER_VERSION; + const CURRENT_DRIVER_VERSION: DriverVersion = versions::v2::V2_DRIVER_VERSION; +} + +pub struct SystemDatabaseV1; +impl sdss::SimpleFileSpecV1 for SystemDatabaseV1 { + type HeaderSpec = HeaderImplV2; + const FILE_CLASS: FileClass = FileClass::EventLog; + const FILE_SPECIFIER: FileSpecifier = FileSpecifier::GlobalNS; + const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0); +} + +pub struct ModelDataBatchAofV1; +impl sdss::SimpleFileSpecV1 for ModelDataBatchAofV1 { + type HeaderSpec = HeaderImplV2; + const FILE_CLASS: FileClass = FileClass::Batch; + const FILE_SPECIFIER: FileSpecifier = FileSpecifier::ModelData; + const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0); +} diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index 09f33f5c..f741d01e 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -31,9 +31,12 @@ use { data::uuid::Uuid, error::{RuntimeResult, TransactionError}, mem::BufferedScanner, - storage::v1::{ - inf::{self, PersistObject}, - JournalAdapter, JournalWriter, LocalFS, RawFSInterface, + storage::{ + safe_interfaces::{FSInterface, LocalFS}, + v1::{ + inf::{self, PersistObject}, + JournalAdapter, JournalWriter, + }, }, }, util::EndianQW, @@ -57,11 +60,11 @@ pub use { }; /// The GNS transaction driver is used to handle DDL transactions -pub struct GNSTransactionDriverAnyFS { +pub struct GNSTransactionDriverAnyFS { journal: JournalWriter, } -impl GNSTransactionDriverAnyFS { +impl GNSTransactionDriverAnyFS { pub fn new(journal: JournalWriter) -> Self { Self { journal } }