Implement and refactor to use common storage interfaces

next
Sayan Nandan 8 months ago
parent 3f2b464975
commit f769175083
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -33,7 +33,7 @@ use {
fractal::{GenericTask, GlobalInstanceLike, Task},
idx::STIndex,
ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace},
storage::v1::{loader::SEInitState, RawFSInterface},
storage::{safe_interfaces::FSInterface, v1::loader::SEInitState},
txn::gns as gnstxn,
},
std::collections::HashSet,

@ -28,7 +28,7 @@ use {
super::util,
crate::engine::{
error::RuntimeResult,
storage::v1::{data_batch::DataBatchPersistDriver, RawFSInterface},
storage::{safe_interfaces::FSInterface, v1::data_batch::DataBatchPersistDriver},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::Mutex,
@ -36,13 +36,13 @@ use {
};
/// GNS driver
pub(super) struct FractalGNSDriver<Fs: RawFSInterface> {
pub(super) struct FractalGNSDriver<Fs: FSInterface> {
#[allow(unused)]
status: util::Status,
pub(super) txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
}
impl<Fs: RawFSInterface> FractalGNSDriver<Fs> {
impl<Fs: FSInterface> FractalGNSDriver<Fs> {
pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS<Fs>) -> Self {
Self {
status: util::Status::new_okay(),
@ -55,13 +55,13 @@ impl<Fs: RawFSInterface> FractalGNSDriver<Fs> {
}
/// Model driver
pub struct FractalModelDriver<Fs: RawFSInterface> {
pub struct FractalModelDriver<Fs: FSInterface> {
#[allow(unused)]
hooks: Arc<FractalModelHooks>,
batch_driver: Mutex<DataBatchPersistDriver<Fs>>,
}
impl<Fs: RawFSInterface> FractalModelDriver<Fs> {
impl<Fs: FSInterface> FractalModelDriver<Fs> {
/// Initialize a model driver with default settings
pub fn init(batch_driver: DataBatchPersistDriver<Fs>) -> Self {
Self {

@ -33,7 +33,7 @@ use {
EntityIDRef,
},
data::uuid::Uuid,
storage::v1::LocalFS,
storage::safe_interfaces::LocalFS,
},
util::os,
},

@ -31,7 +31,7 @@ use {
data::uuid::Uuid,
storage::{
self,
v1::{LocalFS, RawFSInterface},
safe_interfaces::{FSInterface, LocalFS},
},
txn::gns::GNSTransactionDriverAnyFS,
},
@ -105,7 +105,7 @@ pub unsafe fn load_and_enable_all(
/// Something that represents the global state
pub trait GlobalInstanceLike {
type FileSystem: RawFSInterface;
type FileSystem: FSInterface;
const FS_IS_NON_NULL: bool = Self::FileSystem::NOT_NULL;
// stat
fn get_max_delta_size(&self) -> usize;

@ -28,7 +28,7 @@ use {
crate::engine::{
config::{ConfigAuth, ConfigMode},
error::{QueryError, QueryResult},
storage::v1::RawFSInterface,
storage::safe_interfaces::FSInterface,
},
parking_lot::RwLock,
std::{
@ -147,7 +147,7 @@ impl SysHostData {
}
}
impl<Fs: RawFSInterface> SystemStore<Fs> {
impl<Fs: FSInterface> SystemStore<Fs> {
pub fn _new(syscfg: SysConfig) -> Self {
Self {
syscfg,

@ -34,10 +34,7 @@ use {
data::uuid::Uuid,
storage::{
self,
v1::{
memfs::{NullFS, VirtualFS},
RawFSInterface,
},
safe_interfaces::{FSInterface, NullFS, VirtualFS},
},
txn::gns::GNSTransactionDriverAnyFS,
},
@ -46,7 +43,7 @@ use {
};
/// A `test` mode global implementation
pub struct TestGlobal<Fs: RawFSInterface = VirtualFS> {
pub struct TestGlobal<Fs: FSInterface = VirtualFS> {
gns: GlobalNS,
hp_queue: RwLock<Vec<Task<CriticalTask>>>,
lp_queue: RwLock<Vec<Task<GenericTask>>>,
@ -57,7 +54,7 @@ pub struct TestGlobal<Fs: RawFSInterface = VirtualFS> {
sys_cfg: SystemStore<Fs>,
}
impl<Fs: RawFSInterface> TestGlobal<Fs> {
impl<Fs: FSInterface> TestGlobal<Fs> {
fn new(
gns: GlobalNS,
max_delta_size: usize,
@ -75,7 +72,7 @@ impl<Fs: RawFSInterface> TestGlobal<Fs> {
}
}
impl<Fs: RawFSInterface> TestGlobal<Fs> {
impl<Fs: FSInterface> TestGlobal<Fs> {
pub fn new_with_driver_id(log_name: &str) -> Self {
let gns = GlobalNS::empty();
let driver = storage::v1::loader::open_gns_driver(log_name, &gns)
@ -100,7 +97,7 @@ impl TestGlobal<NullFS> {
}
}
impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
type FileSystem = Fs;
fn namespace(&self) -> &GlobalNS {
&self.gns
@ -162,7 +159,7 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
}
}
impl<Fs: RawFSInterface> Drop for TestGlobal<Fs> {
impl<Fs: FSInterface> Drop for TestGlobal<Fs> {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
txn_driver.__journal_mut().__close_mut().unwrap();

@ -420,3 +420,15 @@ macro_rules! local_ref {
::std::thread::LocalKey::with(&$ident, |v| _f(v, $call))
}};
}
macro_rules! var {
(let $($name:ident),* $(,)?) => {
$(let $name;)*
}
}
macro_rules! okay {
($($expr:expr),* $(,)?) => {
$(($expr) &)*true
}
}

@ -56,6 +56,12 @@ pub unsafe fn dealloc_array<T>(ptr: *mut T, l: usize) {
}
}
pub unsafe fn memcpy<const N: usize>(src: &[u8]) -> [u8; N] {
let mut dst = [0u8; N];
src.as_ptr().copy_to_nonoverlapping(dst.as_mut_ptr(), N);
dst
}
/// Native double pointer width (note, native != arch native, but host native)
pub struct NativeDword([usize; 2]);
/// Native triple pointer width (note, native != arch native, but host native)

@ -51,9 +51,9 @@ use {
context::{self, Subsystem},
sys_store::SystemStore,
},
storage::v1::{
loader::{self, SEInitState},
LocalFS,
storage::{
safe_interfaces::LocalFS,
v1::loader::{self, SEInitState},
},
},
crate::util::os::TerminationSignal,

@ -0,0 +1,57 @@
/*
* Created on Sun Sep 03 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # Checksum utils
//!
//! This module contains utils for handling checksums
//!
use crc::{Crc, Digest, CRC_64_XZ};
/*
NOTE(@ohsayan): we're currently using crc's impl. but the reason I decided to make a wrapper is because I have a
different impl in mind
*/
const CRC64: Crc<u64> = Crc::<u64>::new(&CRC_64_XZ);
pub struct SCrc64 {
digest: Digest<'static, u64>,
}
impl SCrc64 {
pub const fn new() -> Self {
Self {
digest: CRC64.digest(),
}
}
pub fn recompute_with_new_var_block(&mut self, b: &[u8]) {
self.digest.update(b)
}
pub fn finish(self) -> u64 {
self.digest.finalize()
}
}

@ -0,0 +1,205 @@
/*
* Created on Sun Jan 07 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt, FileInterfaceRead,
FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
},
crate::engine::RuntimeResult,
std::{
fs::{self, File},
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
},
};
/*
local fs impls
*/
/// A type representing the host's local filesystem (or atleast where our data directory is)
pub struct LocalFS;
fn cvt<T, E1, E2: From<E1>>(r: Result<T, E1>) -> Result<T, E2> {
r.map_err(Into::into)
}
impl FSInterface for LocalFS {
type File = File;
fn fs_remove_file(fpath: &str) -> RuntimeResult<()> {
cvt(fs::remove_file(fpath))
}
fn fs_rename(from: &str, to: &str) -> RuntimeResult<()> {
cvt(fs::rename(from, to))
}
fn fs_create_dir(fpath: &str) -> RuntimeResult<()> {
cvt(fs::create_dir(fpath))
}
fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()> {
cvt(fs::create_dir_all(fpath))
}
fn fs_delete_dir(fpath: &str) -> RuntimeResult<()> {
cvt(fs::remove_dir(fpath))
}
fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()> {
cvt(fs::remove_dir_all(fpath))
}
fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult<super::fs_traits::FileOpen<Self::File>> {
let r = || -> Result<_, std::io::Error> {
let f = File::options()
.create(true)
.read(true)
.write(true)
.open(fpath)?;
let md = f.metadata()?;
if md.len() == 0 {
Ok(FileOpen::Created(f))
} else {
Ok(FileOpen::Existing(f))
}
};
cvt(r())
}
fn fs_fopen_rw(fpath: &str) -> RuntimeResult<Self::File> {
let f = File::options().read(true).write(true).open(fpath)?;
Ok(f)
}
fn fs_fcreate_rw(fpath: &str) -> RuntimeResult<Self::File> {
let f = File::options()
.create_new(true)
.read(true)
.write(true)
.open(fpath)?;
Ok(f)
}
}
/*
common impls for files
*/
impl<R: Read> FileInterfaceRead for R {
fn fread_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
cvt(self.read_exact(buf))
}
}
impl<W: Write> FileInterfaceWrite for W {
fn fwrite(&mut self, buf: &[u8]) -> RuntimeResult<u64> {
cvt(self.write(buf).map(|v| v as _))
}
}
/*
local file impls
*/
impl FileInterface for File {
type BufReader = BufReader<Self>;
type BufWriter = BufWriter<Self>;
fn upgrade_to_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(BufReader::new(self))
}
fn upgrade_to_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(BufWriter::new(self))
}
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r.into_inner())
}
fn downgrade_writer(mut r: Self::BufWriter) -> RuntimeResult<Self> {
// TODO(@ohsayan): maybe we'll want to explicitly handle not syncing this?
r.flush()?;
let (me, err) = r.into_parts();
match err {
Ok(x) if x.is_empty() => Ok(me),
Ok(_) | Err(_) => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to flush data from buffer into sink",
)
.into())
}
}
}
}
/// A trait for handling wrappers of [`std::fs::File`]
trait AsLocalFile {
fn file(&self) -> &File;
fn file_mut(&mut self) -> &mut File;
}
impl AsLocalFile for File {
fn file(&self) -> &File {
self
}
fn file_mut(&mut self) -> &mut File {
self
}
}
impl AsLocalFile for BufReader<File> {
fn file(&self) -> &File {
self.get_ref()
}
fn file_mut(&mut self) -> &mut File {
self.get_mut()
}
}
impl AsLocalFile for BufWriter<File> {
fn file(&self) -> &File {
self.get_ref()
}
fn file_mut(&mut self) -> &mut File {
self.get_mut()
}
}
impl FileInterfaceBufWrite for BufWriter<File> {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
// TODO(@ohsayan): maybe we'll want to explicitly handle not syncing this?
cvt(self.flush())
}
}
impl<F: AsLocalFile> FileInterfaceExt for F {
fn fext_length(&self) -> RuntimeResult<u64> {
Ok(self.file().metadata()?.len())
}
fn fext_cursor(&mut self) -> RuntimeResult<u64> {
cvt(self.file_mut().stream_position())
}
fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()> {
cvt(self.file_mut().seek(SeekFrom::Start(by)).map(|_| ()))
}
}
impl FileInterfaceWriteExt for File {
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
cvt(self.set_len(to))
}
}

@ -0,0 +1,607 @@
/*
* Created on Sun Jan 07 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! File system emulation
//!
//! This directory contains some implementations of virtual file systems (either emulating `/tmp` or
//! `/dev/null`) that are directly implemented at the application level with some necessary changes
//! required for testing
//!
use {
super::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt, FileInterfaceRead,
FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
},
crate::engine::{sync::cell::Lazy, RuntimeResult},
parking_lot::RwLock,
std::{
collections::{
hash_map::{Entry, OccupiedEntry},
HashMap,
},
io::{Error, ErrorKind},
},
};
/*
vfs definitions
*/
/// # VirtualFS
///
/// A virtual file system stored entirely in the process's memory (inclusive of swap if enabled; no explicit discrimination is made)
///
/// The virtual file system is generally intended for being utilized as an in-memory filesystem, primarily for testing
/// purposes and has a lot of limitations.
///
/// It has support for:
/// - nested directories
/// - read/write permissions
/// - file position tracking and seeking
/// - directory operations
pub struct VirtualFS;
/// A virtual directory
type VDir = HashMap<Box<str>, VNode>;
/// An iterator over the components of a file path (alias)
type ComponentIter<'a> = std::iter::Take<std::vec::IntoIter<&'a str>>;
/**
vnode
---
either a vfile or a vdir
*/
#[derive(Debug)]
pub(super) enum VNode {
Dir(HashMap<Box<str>, Self>),
File(VFile),
}
impl VNode {
fn as_dir_mut(&mut self) -> Option<&mut VDir> {
match self {
Self::Dir(d) => Some(d),
Self::File(_) => None,
}
}
}
/*
vfile
*/
#[derive(Debug)]
pub struct VFile {
read: bool,
write: bool,
data: Vec<u8>,
pos: usize,
}
impl VFile {
fn new(read: bool, write: bool, data: Vec<u8>, pos: usize) -> Self {
Self {
read,
write,
data,
pos,
}
}
fn current(&self) -> &[u8] {
&self.data[self.pos..]
}
}
mod err {
//! Errors
//!
//! These are custom errors returned by the virtual file system
use {
crate::engine::RuntimeResult,
std::io::{Error, ErrorKind},
};
pub(super) fn item_is_not_file<T>() -> RuntimeResult<T> {
Err(Error::new(ErrorKind::InvalidInput, "found directory, not a file").into())
}
pub(super) fn file_in_dir_path<T>() -> RuntimeResult<T> {
Err(Error::new(ErrorKind::InvalidInput, "found file in directory path").into())
}
pub(super) fn dir_missing_in_path<T>() -> RuntimeResult<T> {
Err(Error::new(ErrorKind::InvalidInput, "could not find directory in path").into())
}
pub(super) fn could_not_find_item<T>() -> RuntimeResult<T> {
Err(Error::new(ErrorKind::NotFound, "could not find item").into())
}
}
mod util {
use {
super::{err, ComponentIter, VDir, VNode},
crate::engine::RuntimeResult,
};
pub(super) fn split_parts(fpath: &str) -> Vec<&str> {
fpath.split("/").collect()
}
pub(super) fn split_target_and_components(fpath: &str) -> (&str, ComponentIter) {
let parts = split_parts(fpath);
let target = parts.last().unwrap();
let component_len = parts.len() - 1;
(target, parts.into_iter().take(component_len))
}
pub(super) fn find_target_dir_mut<'a>(
components: ComponentIter,
mut current: &'a mut VDir,
) -> RuntimeResult<&'a mut VDir> {
for component in components {
match current.get_mut(component) {
Some(VNode::Dir(d)) => current = d,
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => return err::dir_missing_in_path(),
}
}
Ok(current)
}
pub(super) fn find_target_dir<'a>(
components: ComponentIter,
mut current: &'a VDir,
) -> RuntimeResult<&'a VDir> {
for component in components {
match current.get(component) {
Some(VNode::Dir(d)) => current = d,
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => return err::dir_missing_in_path(),
}
}
Ok(current)
}
}
/*
vfs impl:
- nested directory structure
- make parents
- make child
*/
impl VirtualFS {
/// Get a handle to the virtual filesystem
fn handle() -> &'static RwLock<VDir> {
static VFS: Lazy<RwLock<VDir>, fn() -> RwLock<VDir>> = Lazy::new(|| Default::default());
&VFS
}
fn with_file_mut<T>(
fpath: &str,
mut f: impl FnMut(&mut VFile) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let mut vfs = Self::handle().write();
let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir_mut(components, &mut vfs)?;
match target_dir.get_mut(target_file) {
Some(VNode::File(file)) => f(file),
Some(VNode::Dir(_)) => return err::item_is_not_file(),
None => return Err(Error::from(ErrorKind::NotFound).into()),
}
}
fn with_file<T>(
fpath: &str,
mut f: impl FnMut(&VFile) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let vfs = Self::handle().read();
let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir(components, &vfs)?;
match target_dir.get(target_file) {
Some(VNode::File(file)) => f(file),
Some(VNode::Dir(_)) => return err::item_is_not_file(),
None => return Err(Error::from(ErrorKind::NotFound).into()),
}
}
fn with_item_mut<T>(
fpath: &str,
f: impl Fn(OccupiedEntry<Box<str>, VNode>) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let mut vfs = Self::handle().write();
let mut current = &mut *vfs;
// process components
let (target, components) = util::split_target_and_components(fpath);
for component in components {
match current.get_mut(component) {
Some(VNode::Dir(dir)) => {
current = dir;
}
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => return err::dir_missing_in_path(),
}
}
match current.entry(target.into()) {
Entry::Occupied(item) => return f(item),
Entry::Vacant(_) => return err::could_not_find_item(),
}
}
fn delete_dir(fpath: &str, allow_if_non_empty: bool) -> RuntimeResult<()> {
Self::with_item_mut(fpath, |node| match node.get() {
VNode::Dir(d) => {
if allow_if_non_empty || d.is_empty() {
node.remove();
return Ok(());
}
return Err(Error::new(ErrorKind::InvalidInput, "directory is not empty").into());
}
VNode::File(_) => return err::file_in_dir_path(),
})
}
}
impl FSInterface for VirtualFS {
type File = VFileDescriptor;
fn fs_rename(from: &str, to: &str) -> RuntimeResult<()> {
// get file data
let data = VirtualFS::with_file(from, |f| Ok(f.data.clone()))?;
// create new file
let file = VirtualFS::fs_fopen_or_create_rw(to)?;
match file {
FileOpen::Created(mut c) => {
c.fw_write_all(&data)?;
}
FileOpen::Existing(mut e) => {
e.fwext_truncate_to(0)?;
e.fw_write_all(&data)?;
}
}
// delete old file
Self::fs_remove_file(from)
}
fn fs_remove_file(fpath: &str) -> RuntimeResult<()> {
VirtualFS::with_item_mut(fpath, |e| match e.get() {
VNode::File(_) => {
e.remove();
Ok(())
}
_ => return err::item_is_not_file(),
})
}
fn fs_create_dir(fpath: &str) -> RuntimeResult<()> {
// get vfs
let mut vfs = VirtualFS::handle().write();
// get root dir
let mut current = &mut *vfs;
// process components
let (target, mut components) = util::split_target_and_components(fpath);
while let Some(component) = components.next() {
match current.get_mut(component) {
Some(VNode::Dir(d)) => {
current = d;
}
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => return err::dir_missing_in_path(),
}
}
match current.entry(target.into()) {
Entry::Occupied(_) => return Err(Error::from(ErrorKind::AlreadyExists).into()),
Entry::Vacant(ve) => {
ve.insert(VNode::Dir(into_dict!()));
Ok(())
}
}
}
fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()> {
let mut vfs = VirtualFS::handle().write();
fn create_ahead(mut ahead: &[&str], current: &mut VDir) -> RuntimeResult<()> {
if ahead.is_empty() {
return Ok(());
}
let this = ahead[0];
ahead = &ahead[1..];
match current.get_mut(this) {
Some(VNode::Dir(d)) => {
if ahead.is_empty() {
// hmm, this was the list dir that was to be created, but it already exists
return Err(Error::from(ErrorKind::AlreadyExists).into());
}
return create_ahead(ahead, d);
}
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => {
let _ = current.insert(this.into(), VNode::Dir(into_dict!()));
let dir = current.get_mut(this).unwrap().as_dir_mut().unwrap();
return create_ahead(ahead, dir);
}
}
}
let pieces = util::split_parts(fpath);
create_ahead(&pieces, &mut *vfs)
}
fn fs_delete_dir(fpath: &str) -> RuntimeResult<()> {
VirtualFS::delete_dir(fpath, false)
}
fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()> {
VirtualFS::delete_dir(fpath, true)
}
fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult<FileOpen<Self::File>> {
let mut vfs = VirtualFS::handle().write();
// components
let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir_mut(components, &mut vfs)?;
match target_dir.entry(target_file.into()) {
Entry::Occupied(mut oe) => match oe.get_mut() {
VNode::File(f) => {
f.read = true;
f.write = true;
Ok(FileOpen::Existing(VFileDescriptor(fpath.into())))
}
VNode::Dir(_) => return err::item_is_not_file(),
},
Entry::Vacant(v) => {
v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
Ok(FileOpen::Created(VFileDescriptor(fpath.into())))
}
}
}
fn fs_fcreate_rw(fpath: &str) -> RuntimeResult<Self::File> {
let mut vfs = VirtualFS::handle().write();
let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir_mut(components, &mut vfs)?;
match target_dir.entry(target_file.into()) {
Entry::Occupied(k) => {
match k.get() {
VNode::Dir(_) => {
return Err(Error::new(
ErrorKind::AlreadyExists,
"found directory with same name where file was to be created",
)
.into());
}
VNode::File(_) => {
// the file already exists
return Err(Error::new(
ErrorKind::AlreadyExists,
"the file already exists",
)
.into());
}
}
}
Entry::Vacant(v) => {
// no file exists, we can create this
v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
Ok(VFileDescriptor(fpath.into()))
}
}
}
fn fs_fopen_rw(fpath: &str) -> RuntimeResult<Self::File> {
VirtualFS::with_file_mut(fpath, |f| {
f.read = true;
f.write = true;
Ok(VFileDescriptor(fpath.into()))
})
}
}
/*
vfile & descriptor impls
(this is our `File` but a temporary, completely in-memory file)
*/
pub struct VFileDescriptor(Box<str>);
impl Drop for VFileDescriptor {
fn drop(&mut self) {
let _ = VirtualFS::with_file_mut(&self.0, |f| {
f.read = false;
f.write = false;
f.pos = 0;
Ok(())
});
}
}
impl FileInterface for VFileDescriptor {
type BufReader = Self;
type BufWriter = Self;
fn upgrade_to_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(self)
}
fn upgrade_to_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(self)
}
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r)
}
fn downgrade_writer(r: Self::BufWriter) -> RuntimeResult<Self> {
Ok(r)
}
}
impl FileInterfaceRead for VFileDescriptor {
fn fread_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
VirtualFS::with_file_mut(&self.0, |file| {
if !file.read {
return Err(
Error::new(ErrorKind::PermissionDenied, "Read permission denied").into(),
);
}
let available_bytes = file.current().len();
if available_bytes < buf.len() {
return Err(Error::from(ErrorKind::UnexpectedEof).into());
}
buf.copy_from_slice(&file.data[file.pos..file.pos + buf.len()]);
file.pos += buf.len();
Ok(())
})
}
}
impl FileInterfaceWrite for VFileDescriptor {
fn fwrite(&mut self, bytes: &[u8]) -> RuntimeResult<u64> {
VirtualFS::with_file_mut(&self.0, |file| {
if !file.write {
return Err(
Error::new(ErrorKind::PermissionDenied, "Write permission denied").into(),
);
}
if file.pos + bytes.len() > file.data.len() {
file.data.resize(file.pos + bytes.len(), 0);
}
file.data[file.pos..file.pos + bytes.len()].copy_from_slice(bytes);
file.pos += bytes.len();
Ok(bytes.len() as _)
})
}
}
impl FileInterfaceWriteExt for VFileDescriptor {
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
VirtualFS::with_file_mut(&self.0, |file| {
if !file.write {
return Err(
Error::new(ErrorKind::PermissionDenied, "Write permission denied").into(),
);
}
if to as usize > file.data.len() {
file.data.resize(to as usize, 0);
} else {
file.data.truncate(to as usize);
}
if file.pos > file.data.len() {
file.pos = file.data.len();
}
Ok(())
})
}
}
impl FileInterfaceBufWrite for VFileDescriptor {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
Ok(())
}
}
impl FileInterfaceExt for VFileDescriptor {
fn fext_length(&self) -> RuntimeResult<u64> {
VirtualFS::with_file(&self.0, |f| Ok(f.data.len() as u64))
}
fn fext_cursor(&mut self) -> RuntimeResult<u64> {
VirtualFS::with_file(&self.0, |f| Ok(f.pos as u64))
}
fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()> {
VirtualFS::with_file_mut(&self.0, |file| {
if by > file.data.len() as u64 {
return Err(
Error::new(ErrorKind::InvalidInput, "Can't seek beyond file's end").into(),
);
}
file.pos = by as usize;
Ok(())
})
}
}
/// An application level implementation of `/dev/null` with some changes
pub struct NullFS;
/// A handle to a file in `/dev/null` (emulated)
pub struct NullFile;
impl FSInterface for NullFS {
type File = NullFile;
fn fs_remove_file(_: &str) -> RuntimeResult<()> {
Ok(())
}
fn fs_rename(_: &str, _: &str) -> RuntimeResult<()> {
Ok(())
}
fn fs_create_dir(_: &str) -> RuntimeResult<()> {
Ok(())
}
fn fs_create_dir_all(_: &str) -> RuntimeResult<()> {
Ok(())
}
fn fs_delete_dir(_: &str) -> RuntimeResult<()> {
Ok(())
}
fn fs_delete_dir_all(_: &str) -> RuntimeResult<()> {
Ok(())
}
fn fs_fopen_or_create_rw(_: &str) -> RuntimeResult<FileOpen<Self::File>> {
Ok(FileOpen::Created(NullFile))
}
fn fs_fopen_rw(_: &str) -> RuntimeResult<Self::File> {
Ok(NullFile)
}
fn fs_fcreate_rw(_: &str) -> RuntimeResult<Self::File> {
Ok(NullFile)
}
}
impl FileInterface for NullFile {
type BufReader = Self;
type BufWriter = Self;
fn upgrade_to_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(self)
}
fn upgrade_to_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(self)
}
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r)
}
fn downgrade_writer(r: Self::BufWriter) -> RuntimeResult<Self> {
Ok(r)
}
}
impl FileInterfaceWrite for NullFile {
fn fwrite(&mut self, buf: &[u8]) -> RuntimeResult<u64> {
Ok(buf.len() as _)
}
}
impl FileInterfaceWriteExt for NullFile {
fn fwext_truncate_to(&mut self, _: u64) -> RuntimeResult<()> {
Ok(())
}
}
impl FileInterfaceRead for NullFile {
fn fread_exact(&mut self, _: &mut [u8]) -> RuntimeResult<()> {
Ok(())
}
}
impl FileInterfaceExt for NullFile {
fn fext_length(&self) -> RuntimeResult<u64> {
Ok(0)
}
fn fext_cursor(&mut self) -> RuntimeResult<u64> {
Ok(0)
}
fn fext_seek_ahead_from_start_by(&mut self, _: u64) -> RuntimeResult<()> {
Ok(())
}
}
impl FileInterfaceBufWrite for NullFile {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
Ok(())
}
}

@ -0,0 +1,201 @@
/*
* Created on Sun Jan 07 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::engine::RuntimeResult,
std::io::{Error as IoError, ErrorKind as IoErrorKind},
};
#[derive(Debug, PartialEq)]
/// Result of opening a file
/// - Created: newly created file
/// - Existing: existing file that was reopened
pub enum FileOpen<CF, EF = CF> {
/// new file
Created(CF),
/// existing file
Existing(EF),
}
#[cfg(test)]
impl<CF, EF> FileOpen<CF, EF> {
pub fn into_existing(self) -> Option<EF> {
match self {
Self::Existing(e) => Some(e),
Self::Created(_) => None,
}
}
pub fn into_created(self) -> Option<CF> {
match self {
Self::Existing(_) => None,
Self::Created(c) => Some(c),
}
}
}
#[cfg(test)]
impl<CF> FileOpen<CF> {
pub fn into_inner(self) -> CF {
match self {
Self::Created(f) | Self::Existing(f) => f,
}
}
}
pub trait FSInterface {
// settings
/// set to false if the file system is a special device like `/dev/null`
const NOT_NULL: bool = true;
// types
/// the file type that is returned by this file system
type File: FileInterface;
// functions
/// Remove a file
fn fs_remove_file(fpath: &str) -> RuntimeResult<()>;
/// Rename a file
fn fs_rename(from: &str, to: &str) -> RuntimeResult<()>;
/// Create a directory
fn fs_create_dir(fpath: &str) -> RuntimeResult<()>;
/// Create a directory and all corresponding path components
fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()>;
/// Delete a directory
fn fs_delete_dir(fpath: &str) -> RuntimeResult<()>;
/// Delete a directory and recursively remove all (if any) children
fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()>;
/// Open or create a file in R/W mode
///
/// This will:
/// - Create a file if it doesn't exist
/// - Open a file it it does exist
fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult<FileOpen<Self::File>>;
/// Open an existing file
fn fs_fopen_rw(fpath: &str) -> RuntimeResult<Self::File>;
/// Create a new file
fn fs_fcreate_rw(fpath: &str) -> RuntimeResult<Self::File>;
}
/// File interface definition
pub trait FileInterface:
FileInterfaceRead + FileInterfaceWrite + FileInterfaceWriteExt + FileInterfaceExt + Sized
{
/// A buffered reader implementation
type BufReader: FileInterfaceRead + FileInterfaceExt;
/// A buffered writer implementation
type BufWriter: FileInterfaceBufWrite;
/// Get a buffered reader for this file
fn upgrade_to_buffered_reader(self) -> RuntimeResult<Self::BufReader>;
/// Get a buffered writer for this file
fn upgrade_to_buffered_writer(self) -> RuntimeResult<Self::BufWriter>;
/// Get the file back from the buffered reader
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self>;
/// Get the file back from the buffered writer
fn downgrade_writer(r: Self::BufWriter) -> RuntimeResult<Self>;
}
pub trait FileInterfaceBufWrite: FileInterfaceWrite + FileInterfaceExt {
fn sync_write_cache(&mut self) -> RuntimeResult<()>;
}
/// Readable object
pub trait FileInterfaceRead {
/// Read in a block of the exact given length
fn fread_exact_block<const N: usize>(&mut self) -> RuntimeResult<[u8; N]> {
let mut ret = [0u8; N];
self.fread_exact(&mut ret)?;
Ok(ret)
}
/// Read in `n` bytes to fill the given buffer
fn fread_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()>;
}
/// Writable object
pub trait FileInterfaceWrite {
/// Attempt to write the buffer into this object, returning the number of bytes that were
/// written. It is **NOT GUARANTEED THAT ALL DATA WILL BE WRITTEN**
fn fwrite(&mut self, buf: &[u8]) -> RuntimeResult<u64>;
/// Attempt to write the entire buffer into this object, returning the number of bytes written
///
/// It is guaranteed that if the [`Result`] returned is [`Ok(())`], then the entire buffer was
/// written to disk.
fn fwrite_all_count(&mut self, buf: &[u8]) -> (u64, RuntimeResult<()>) {
let len = buf.len() as u64;
let mut written = 0;
while written != len {
match self.fwrite(buf) {
Ok(0) => {
return (
written,
Err(IoError::new(
IoErrorKind::WriteZero,
format!("could only write {} of {} bytes", written, buf.len()),
)
.into()),
)
}
Ok(n) => written += n,
Err(e) => return (written, Err(e)),
}
}
(written, Ok(()))
}
/// Attempt to write the entire buffer into this object
///
/// If this return [`Ok(())`] then it is guaranteed that all bytes have been written
fn fw_write_all(&mut self, buf: &[u8]) -> RuntimeResult<()> {
self.fwrite_all_count(buf).1
}
}
/// Advanced write traits
pub trait FileInterfaceWriteExt {
/// Sync data and metadata for this file
fn fwext_sync_all(&mut self) -> RuntimeResult<()> {
Ok(())
}
/// Sync data for this file
fn fwext_sync_data(&mut self) -> RuntimeResult<()> {
Ok(())
}
/// Sync meta for this file
fn fwext_sync_meta(&mut self) -> RuntimeResult<()> {
Ok(())
}
/// Truncate the size of the file to the given size
///
/// - If `to` > actual file length: the file is zero padded to fill `to - len`
/// - If `to` < actual file length: the file is trimmed to the size `to`
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()>;
}
/// Advanced file access
pub trait FileInterfaceExt {
/// Returns the length of the file
fn fext_length(&self) -> RuntimeResult<u64>;
/// Returns the current cursor position of the file
fn fext_cursor(&mut self) -> RuntimeResult<u64>;
/// Seek by `from` bytes from the start of the file
fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()>;
}

@ -0,0 +1,36 @@
/*
* Created on Sun Jan 07 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # FS abstractions
//!
//! This module defines abstractions over file systems (whether physical or virtual) and provides
//! traits that provide an unified API for all file systems irrespective of their base impl
//!
pub mod fs_imp;
#[cfg(test)]
pub mod fs_test;
pub mod fs_traits;

@ -0,0 +1,31 @@
/*
* Created on Tue Jan 09 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod checksum;
pub mod interface;
pub mod sdss;
pub mod static_meta;
pub mod versions;

@ -0,0 +1,28 @@
/*
* Created on Fri Jan 12 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod spec;
pub use spec::{FileSpecV1, HeaderV1, HeaderV1Enumeration, HeaderV1Spec, SimpleFileSpecV1};

@ -0,0 +1,485 @@
/*
* Created on Wed Jan 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*!
# SDSS spec
This module provides traits and types to deal with the SDSS spec, especially headers.
The static SDSS header block has a special segment that defines the header version which is static and will
never change across any versions. While the same isn't warranted for the rest of the header, it's exceedingly
unlikely that we'll ever change the static block.
The only header that we currently use is [`HeaderV1`].
*/
use {
super::super::{
interface::fs_traits::{FileInterfaceRead, FileInterfaceWrite},
static_meta::{HostArch, HostEndian, HostOS, HostPointerWidth, SDSS_MAGIC_8B},
versions::{self, DriverVersion, FileSpecifierVersion, HeaderVersion, ServerVersion},
},
crate::{
engine::{error::StorageError, mem::memcpy, RuntimeResult},
util::os,
},
std::{
mem::{transmute, ManuallyDrop},
ops::Range,
},
};
/*
header utils
*/
pub trait HeaderV1Enumeration {
/// the maximum value of this enumeration
const MAX: u8;
/// Create a new enumeration, given that the maximum is validated
unsafe fn new(x: u8) -> Self;
/// Return the 1B repr of the enumeration
fn repr_u8(&self) -> u8;
}
/// A trait that enables customizing the SDSS header for a specific version tuple
pub trait HeaderV1Spec {
// types
/// The file class type
type FileClass: HeaderV1Enumeration + Copy + PartialEq;
/// The file specifier type
type FileSpecifier: HeaderV1Enumeration + Copy + PartialEq;
// constants
/// The server version to use during encode
///
/// NB: This is NOT the compatible version but rather the current version
const CURRENT_SERVER_VERSION: ServerVersion;
/// The driver version to use during encode
///
/// NB: This is NOT the compatible version but rather the current version
const CURRENT_DRIVER_VERSION: DriverVersion;
/// The file class to use and verify at encode/decode time
/// check server version compatibility is valid at decode time
fn check_if_server_version_compatible(v: ServerVersion) -> bool {
v == Self::CURRENT_SERVER_VERSION
}
/// check driver version compatibility is valid at decode time
fn check_if_driver_version_compatible(v: DriverVersion) -> bool {
v == Self::CURRENT_DRIVER_VERSION
}
}
/*
Compact SDSS Header v1
---
- 1: Magic block (16B): magic + header version
- 2: Static block (40B):
- 2.1: Genesis static record (24B)
- 2.1.1: Software information (16B)
- Server version (8B)
- Driver version (8B)
- 2.1.2: Host information (4B):
- OS (1B)
- Arch (1B)
- Pointer width (1B)
- Endian (1B)
- 2.1.3: File information (4B):
- File class (1B)
- File specifier (1B)
- File specifier version (2B)
- 2.2: Genesis runtime record (16B)
- Host epoch (16B)
- 3: Padding block (8B)
*/
#[repr(align(8))]
#[derive(Debug, PartialEq)]
pub struct HeaderV1<H: HeaderV1Spec> {
// 1 magic block
magic_header_version: HeaderVersion,
// 2.1.1
genesis_static_sw_server_version: ServerVersion,
genesis_static_sw_driver_version: DriverVersion,
// 2.1.2
genesis_static_host_os: HostOS,
genesis_static_host_arch: HostArch,
genesis_static_host_ptr_width: HostPointerWidth,
genesis_static_host_endian: HostEndian,
// 2.1.3
genesis_static_file_class: H::FileClass,
genesis_static_file_specifier: H::FileSpecifier,
genesis_static_file_specifier_version: FileSpecifierVersion,
// 2.2
genesis_runtime_epoch_time: u128,
// 3
genesis_padding_block: [u8; 8],
}
impl<H: HeaderV1Spec> HeaderV1<H> {
const SEG1_MAGIC: Range<usize> = 0..8;
const SEG1_HEADER_VERSION: Range<usize> = 8..16;
const SEG2_REC1_SERVER_VERSION: Range<usize> = 16..24;
const SEG2_REC1_DRIVER_VERSION: Range<usize> = 24..32;
const SEG2_REC1_HOST_OS: usize = 32;
const SEG2_REC1_HOST_ARCH: usize = 33;
const SEG2_REC1_HOST_PTR_WIDTH: usize = 34;
const SEG2_REC1_HOST_ENDIAN: usize = 35;
const SEG2_REC1_FILE_CLASS: usize = 36;
const SEG2_REC1_FILE_SPECIFIER: usize = 37;
const SEG2_REC1_FILE_SPECIFIER_VERSION: Range<usize> = 38..40;
const SEG2_REC2_RUNTIME_EPOCH_TIME: Range<usize> = 40..56;
const SEG3_PADDING_BLK: Range<usize> = 56..64;
pub const SIZE: usize = 64;
#[inline(always)]
fn _new_auto(
file_class: H::FileClass,
file_specifier: H::FileSpecifier,
file_specifier_version: FileSpecifierVersion,
epoch_time: u128,
genesis_padding_block: [u8; 8],
) -> Self {
Self::_new(
versions::HEADER_V1,
H::CURRENT_SERVER_VERSION,
H::CURRENT_DRIVER_VERSION,
HostOS::new(),
HostArch::new(),
HostPointerWidth::new(),
HostEndian::new(),
file_class,
file_specifier,
file_specifier_version,
epoch_time,
genesis_padding_block,
)
}
#[inline(always)]
fn _new(
magic_header_version: HeaderVersion,
genesis_static_sw_server_version: ServerVersion,
genesis_static_sw_driver_version: DriverVersion,
genesis_static_host_os: HostOS,
genesis_static_host_arch: HostArch,
genesis_static_host_ptr_width: HostPointerWidth,
genesis_static_host_endian: HostEndian,
genesis_static_file_class: H::FileClass,
genesis_static_file_specifier: H::FileSpecifier,
genesis_static_file_specifier_version: FileSpecifierVersion,
genesis_runtime_epoch_time: u128,
genesis_padding_block: [u8; 8],
) -> Self {
Self {
magic_header_version,
genesis_static_sw_server_version,
genesis_static_sw_driver_version,
genesis_static_host_os,
genesis_static_host_arch,
genesis_static_host_ptr_width,
genesis_static_host_endian,
genesis_static_file_class,
genesis_static_file_specifier,
genesis_static_file_specifier_version,
genesis_runtime_epoch_time,
genesis_padding_block,
}
}
fn _encode_auto_raw(
file_class: H::FileClass,
file_specifier: H::FileSpecifier,
file_specifier_version: FileSpecifierVersion,
epoch_time: u128,
padding_block: [u8; 8],
) -> [u8; 64] {
let mut ret = [0; 64];
// 1. mgblk
ret[Self::SEG1_MAGIC].copy_from_slice(&SDSS_MAGIC_8B.to_le_bytes());
ret[Self::SEG1_HEADER_VERSION]
.copy_from_slice(&versions::v1::V1_HEADER_VERSION.little_endian_u64());
// 2.1.1
ret[Self::SEG2_REC1_SERVER_VERSION]
.copy_from_slice(&H::CURRENT_SERVER_VERSION.little_endian());
ret[Self::SEG2_REC1_DRIVER_VERSION]
.copy_from_slice(&H::CURRENT_DRIVER_VERSION.little_endian());
// 2.1.2
ret[Self::SEG2_REC1_HOST_OS] = HostOS::new().value_u8();
ret[Self::SEG2_REC1_HOST_ARCH] = HostArch::new().value_u8();
ret[Self::SEG2_REC1_HOST_PTR_WIDTH] = HostPointerWidth::new().value_u8();
ret[Self::SEG2_REC1_HOST_ENDIAN] = HostEndian::new().value_u8();
// 2.1.3
ret[Self::SEG2_REC1_FILE_CLASS] = file_class.repr_u8();
ret[Self::SEG2_REC1_FILE_SPECIFIER] = file_specifier.repr_u8();
ret[Self::SEG2_REC1_FILE_SPECIFIER_VERSION]
.copy_from_slice(&file_specifier_version.little_endian());
// 2.2
ret[Self::SEG2_REC2_RUNTIME_EPOCH_TIME].copy_from_slice(&epoch_time.to_le_bytes());
// 3
ret[Self::SEG3_PADDING_BLK].copy_from_slice(&padding_block);
ret
}
fn encode_return(
file_class: H::FileClass,
file_specifier: H::FileSpecifier,
file_specifier_version: FileSpecifierVersion,
) -> (Self, [u8; 64]) {
let epoch_time = os::get_epoch_time();
let encoded = Self::_encode_auto_raw(
file_class,
file_specifier,
file_specifier_version,
epoch_time,
[0; 8],
);
let me = Self::_new_auto(
file_class,
file_specifier,
file_specifier_version,
epoch_time,
[0; 8],
);
(me, encoded)
}
/// Decode and validate the full header block (validate ONLY; you must verify yourself)
///
/// Notes:
/// - Time might be inconsistent; verify
/// - Compatibility requires additional intervention
/// - If padding block was not zeroed, handle
/// - No file metadata is verified. Check!
///
pub fn decode(block: [u8; 64]) -> Result<Self, StorageError> {
var!(let raw_magic, raw_header_version, raw_server_version, raw_driver_version, raw_host_os, raw_host_arch,
raw_host_ptr_width, raw_host_endian, raw_file_class, raw_file_specifier, raw_file_specifier_version,
raw_runtime_epoch_time, raw_paddding_block,
);
macro_rules! u64 {
($pos:expr) => {
u64::from_le_bytes(memcpy(&block[$pos]))
};
}
unsafe {
// UNSAFE(@ohsayan): all segments are correctly accessed (aligned to u8)
raw_magic = u64!(Self::SEG1_MAGIC);
raw_header_version = HeaderVersion::__new(u64!(Self::SEG1_HEADER_VERSION));
raw_server_version = ServerVersion::__new(u64!(Self::SEG2_REC1_SERVER_VERSION));
raw_driver_version = DriverVersion::__new(u64!(Self::SEG2_REC1_DRIVER_VERSION));
raw_host_os = block[Self::SEG2_REC1_HOST_OS];
raw_host_arch = block[Self::SEG2_REC1_HOST_ARCH];
raw_host_ptr_width = block[Self::SEG2_REC1_HOST_PTR_WIDTH];
raw_host_endian = block[Self::SEG2_REC1_HOST_ENDIAN];
raw_file_class = block[Self::SEG2_REC1_FILE_CLASS];
raw_file_specifier = block[Self::SEG2_REC1_FILE_SPECIFIER];
raw_file_specifier_version = FileSpecifierVersion::__new(u16::from_le_bytes(memcpy(
&block[Self::SEG2_REC1_FILE_SPECIFIER_VERSION],
)));
raw_runtime_epoch_time =
u128::from_le_bytes(memcpy(&block[Self::SEG2_REC2_RUNTIME_EPOCH_TIME]));
raw_paddding_block = memcpy::<8>(&block[Self::SEG3_PADDING_BLK]);
}
let okay_header_version = raw_header_version == versions::HEADER_V1;
let okay_server_version = H::check_if_server_version_compatible(raw_server_version);
let okay_driver_version = H::check_if_driver_version_compatible(raw_driver_version);
let okay = okay!(
// 1.1 mgblk
raw_magic == SDSS_MAGIC_8B,
okay_header_version,
// 2.1.1
okay_server_version,
okay_driver_version,
// 2.1.2
raw_host_os <= HostOS::MAX,
raw_host_arch <= HostArch::MAX,
raw_host_ptr_width <= HostPointerWidth::MAX,
raw_host_endian <= HostEndian::MAX,
// 2.1.3
raw_file_class <= H::FileClass::MAX,
raw_file_specifier <= H::FileSpecifier::MAX,
);
if okay {
Ok(unsafe {
// UNSAFE(@ohsayan): the block ranges are very well defined
Self::_new(
// 1.1
raw_header_version,
// 2.1.1
raw_server_version,
raw_driver_version,
// 2.1.2
transmute(raw_host_os),
transmute(raw_host_arch),
transmute(raw_host_ptr_width),
transmute(raw_host_endian),
// 2.1.3
H::FileClass::new(raw_file_class),
H::FileSpecifier::new(raw_file_specifier),
raw_file_specifier_version,
// 2.2
raw_runtime_epoch_time,
// 3
raw_paddding_block,
)
})
} else {
let version_okay = okay_header_version & okay_server_version & okay_driver_version;
let md = ManuallyDrop::new([
StorageError::HeaderDecodeCorruptedHeader,
StorageError::HeaderDecodeVersionMismatch,
]);
Err(unsafe {
// UNSAFE(@ohsayan): while not needed, md for drop safety + correct index
md.as_ptr().add(!version_okay as usize).read().into()
})
}
}
}
#[allow(unused)]
impl<H: HeaderV1Spec> HeaderV1<H> {
pub fn header_version(&self) -> HeaderVersion {
self.magic_header_version
}
pub fn server_version(&self) -> ServerVersion {
self.genesis_static_sw_server_version
}
pub fn driver_version(&self) -> DriverVersion {
self.genesis_static_sw_driver_version
}
pub fn host_os(&self) -> HostOS {
self.genesis_static_host_os
}
pub fn host_arch(&self) -> HostArch {
self.genesis_static_host_arch
}
pub fn host_ptr_width(&self) -> HostPointerWidth {
self.genesis_static_host_ptr_width
}
pub fn host_endian(&self) -> HostEndian {
self.genesis_static_host_endian
}
pub fn file_class(&self) -> H::FileClass {
self.genesis_static_file_class
}
pub fn file_specifier(&self) -> H::FileSpecifier {
self.genesis_static_file_specifier
}
pub fn file_specifier_version(&self) -> FileSpecifierVersion {
self.genesis_static_file_specifier_version
}
pub fn epoch_time(&self) -> u128 {
self.genesis_runtime_epoch_time
}
pub fn padding_block(&self) -> [u8; 8] {
self.genesis_padding_block
}
}
pub trait FileSpecV1 {
type Metadata;
/// the header type
type HeaderSpec: HeaderV1Spec;
/// the args need to validate the metadata (for example, additional context)
type EncodeArgs;
type DecodeArgs;
/// validate the metadata
fn validate_metadata(
md: HeaderV1<Self::HeaderSpec>,
v_args: Self::DecodeArgs,
) -> RuntimeResult<Self::Metadata>;
/// read and validate metadata (only override if you need to)
fn read_metadata(
f: &mut impl FileInterfaceRead,
v_args: Self::DecodeArgs,
) -> RuntimeResult<Self::Metadata> {
let md = HeaderV1::decode(f.fread_exact_block()?)?;
Self::validate_metadata(md, v_args)
}
/// write metadata
fn write_metadata(
f: &mut impl FileInterfaceWrite,
args: Self::EncodeArgs,
) -> RuntimeResult<Self::Metadata>;
}
/// # Simple SDSS file specification (v1)
///
/// ## Decode and verify
/// A simple SDSS file specification that checks if:
/// - the file class,
/// - file specifier and
/// - file specifier revision
///
/// match
///
/// ## Version Compatibility
///
/// Also, the [`HeaderV1Spec`] is supposed to address compatibility across server and driver versions
pub trait SimpleFileSpecV1 {
type HeaderSpec: HeaderV1Spec;
const FILE_CLASS: <Self::HeaderSpec as HeaderV1Spec>::FileClass;
const FILE_SPECIFIER: <Self::HeaderSpec as HeaderV1Spec>::FileSpecifier;
const FILE_SPECFIER_VERSION: FileSpecifierVersion;
fn check_if_file_specifier_revision_is_compatible(
v: FileSpecifierVersion,
) -> RuntimeResult<()> {
if v == Self::FILE_SPECFIER_VERSION {
Ok(())
} else {
Err(StorageError::HeaderDecodeVersionMismatch.into())
}
}
}
impl<Sfs: SimpleFileSpecV1> FileSpecV1 for Sfs {
type Metadata = HeaderV1<Self::HeaderSpec>;
type HeaderSpec = <Self as SimpleFileSpecV1>::HeaderSpec;
type DecodeArgs = ();
type EncodeArgs = ();
fn validate_metadata(
md: HeaderV1<Self::HeaderSpec>,
_: Self::DecodeArgs,
) -> RuntimeResult<Self::Metadata> {
let okay = okay!(
md.file_class() == Self::FILE_CLASS,
md.file_specifier() == Self::FILE_SPECIFIER,
);
Self::check_if_file_specifier_revision_is_compatible(md.file_specifier_version())?;
if okay {
Ok(md)
} else {
Err(StorageError::HeaderDecodeVersionMismatch.into())
}
}
fn write_metadata(
f: &mut impl FileInterfaceWrite,
_: Self::EncodeArgs,
) -> RuntimeResult<Self::Metadata> {
let (md, block) = HeaderV1::<Self::HeaderSpec>::encode_return(
Self::FILE_CLASS,
Self::FILE_SPECIFIER,
Self::FILE_SPECFIER_VERSION,
);
f.fw_write_all(&block).map(|_| md)
}
}

@ -0,0 +1,160 @@
/*
* Created on Tue Jan 09 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # Static metadata
//!
//! Compile-time metadata used by storage engine implementations
//!
/// The 8B SDSS magic block
pub const SDSS_MAGIC_8B: u64 = 0x4F48534159414E21;
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
/// Host architecture enumeration for common platforms
pub enum HostArch {
X86 = 0,
X86_64 = 1,
ARM = 2,
ARM64 = 3,
MIPS = 4,
PowerPC = 5,
}
impl HostArch {
pub const fn new() -> Self {
if cfg!(target_arch = "x86") {
HostArch::X86
} else if cfg!(target_arch = "x86_64") {
HostArch::X86_64
} else if cfg!(target_arch = "arm") {
HostArch::ARM
} else if cfg!(target_arch = "aarch64") {
HostArch::ARM64
} else if cfg!(target_arch = "mips") {
HostArch::MIPS
} else if cfg!(target_arch = "powerpc") {
HostArch::PowerPC
} else {
panic!("Unsupported target architecture")
}
}
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
/// Host OS enumeration for common operating systems
pub enum HostOS {
// T1
Linux = 0,
Windows = 1,
MacOS = 2,
// T2
Android = 3,
AppleiOS = 4,
FreeBSD = 5,
OpenBSD = 6,
NetBSD = 7,
WASI = 8,
Emscripten = 9,
// T3
Solaris = 10,
Fuchsia = 11,
Redox = 12,
DragonFly = 13,
}
impl HostOS {
pub const fn new() -> Self {
if cfg!(target_os = "linux") {
HostOS::Linux
} else if cfg!(target_os = "windows") {
HostOS::Windows
} else if cfg!(target_os = "macos") {
HostOS::MacOS
} else if cfg!(target_os = "android") {
HostOS::Android
} else if cfg!(target_os = "ios") {
HostOS::AppleiOS
} else if cfg!(target_os = "freebsd") {
HostOS::FreeBSD
} else if cfg!(target_os = "openbsd") {
HostOS::OpenBSD
} else if cfg!(target_os = "netbsd") {
HostOS::NetBSD
} else if cfg!(target_os = "dragonfly") {
HostOS::DragonFly
} else if cfg!(target_os = "redox") {
HostOS::Redox
} else if cfg!(target_os = "fuchsia") {
HostOS::Fuchsia
} else if cfg!(target_os = "solaris") {
HostOS::Solaris
} else if cfg!(target_os = "emscripten") {
HostOS::Emscripten
} else if cfg!(target_os = "wasi") {
HostOS::WASI
} else {
panic!("unknown os")
}
}
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
/// Host endian enumeration
pub enum HostEndian {
Big = 0,
Little = 1,
}
impl HostEndian {
pub const fn new() -> Self {
if cfg!(target_endian = "little") {
Self::Little
} else {
Self::Big
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
#[repr(u8)]
/// Host pointer width enumeration
pub enum HostPointerWidth {
P32 = 0,
P64 = 1,
}
impl HostPointerWidth {
pub const fn new() -> Self {
match sizeof!(usize) {
4 => Self::P32,
8 => Self::P64,
_ => panic!("unknown pointer width"),
}
}
}

@ -0,0 +1,115 @@
/*
* Created on Mon May 15 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # Versioning
//!
//! Storage engine versioning utility
//!
pub mod server_version;
pub const HEADER_V1: HeaderVersion = HeaderVersion(0);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
/// The header version
///
/// The header version is part of the static record and *barely* changes (almost like once in a light year)
pub struct HeaderVersion(u64);
impl HeaderVersion {
pub const fn __new(v: u64) -> Self {
Self(v)
}
pub const fn little_endian_u64(&self) -> [u8; 8] {
self.0.to_le_bytes()
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
/// The server version (based on tag index)
pub struct ServerVersion(u64);
impl ServerVersion {
pub const fn __new(v: u64) -> Self {
Self(v)
}
pub const fn little_endian(&self) -> [u8; 8] {
self.0.to_le_bytes()
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
/// The driver version
pub struct DriverVersion(u64);
impl DriverVersion {
pub const fn __new(v: u64) -> Self {
Self(v)
}
pub const fn little_endian(&self) -> [u8; 8] {
self.0.to_le_bytes()
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
/// The file specifier version
pub struct FileSpecifierVersion(u16);
impl FileSpecifierVersion {
pub const fn __new(v: u16) -> Self {
Self(v)
}
pub const fn little_endian(&self) -> [u8; 2] {
self.0.to_le_bytes()
}
}
pub mod v1 {
//! The first SDSS based storage engine implementation.
//! Target tag: 0.8.0 {beta.1, beta.2, beta.3}
use super::{DriverVersion, HeaderVersion, ServerVersion};
/// The SDSS header version UID
pub const V1_HEADER_VERSION: HeaderVersion = HeaderVersion(0);
/// The server version UID
pub const V1_SERVER_VERSION: ServerVersion =
ServerVersion(super::server_version::fetch_id("v0.8.0") as _);
/// The driver version UID
pub const V1_DRIVER_VERSION: DriverVersion = DriverVersion(0);
}
#[allow(unused)]
pub mod v2 {
//! The second SDSS based storage implementation
//!
//! Target tag: 0.8.0 (GA)
//!
//! Same tags as [`super::v1`] but different [`DriverVersion`]
use super::{DriverVersion, HeaderVersion, ServerVersion};
pub const V2_HEADER_VERSION: HeaderVersion = super::v1::V1_HEADER_VERSION;
pub const V2_SERVER_VERSION: ServerVersion = super::v1::V1_SERVER_VERSION;
pub const V2_DRIVER_VERSION: DriverVersion = DriverVersion(1);
}

@ -0,0 +1,103 @@
/*
* Created on Wed May 17 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
const VERSION_TAGS: [&str; 52] = [
"v0.1.0",
"v0.2.0",
"v0.3.0",
"v0.3.1",
"v0.3.2",
"v0.4.0-alpha.1",
"v0.4.0-alpha.2",
"v0.4.0",
"v0.4.1-alpha.1",
"v0.4.1",
"v0.4.2-alpha.1",
"v0.4.2",
"v0.4.3-alpha.1",
"v0.4.3",
"v0.4.4",
"v0.4.5-alpha.1",
"v0.4.5-alpha.2",
"v0.4.5",
"v0.5.0-alpha.1",
"v0.5.0-alpha.2",
"v0.5.0",
"v0.5.1-alpha.1",
"v0.5.1",
"v0.5.2",
"v0.5.3",
"v0.6.0",
"v0.6.1",
"v0.6.2-testrelease.1",
"v0.6.2",
"v0.6.3-alpha.1",
"v0.6.3",
"v0.6.4-alpha.1",
"v0.6.4",
"v0.7.0-RC.1",
"v0.7.0-alpha.1",
"v0.7.0-alpha.2",
"v0.7.0-beta.1",
"v0.7.0",
"v0.7.1-alpha.1",
"v0.7.1",
"v0.7.2-alpha.1",
"v0.7.2",
"v0.7.3-alpha.1",
"v0.7.3-alpha.2",
"v0.7.3-alpha.3",
"v0.7.3",
"v0.7.4",
"v0.7.5",
"v0.7.6",
"v0.7.7",
"v0.8.0-alpha.1",
"v0.8.0",
];
const VERSION_TAGS_LEN: usize = VERSION_TAGS.len();
pub const fn fetch_id(id: &str) -> usize {
// this is ct, so a O(n) doesn't matter
let mut i = 0;
while i < VERSION_TAGS_LEN {
let bytes = VERSION_TAGS[i].as_bytes();
let given = id.as_bytes();
let mut j = 0;
let mut eq = true;
while (j < bytes.len()) & (bytes.len() == given.len()) {
if bytes[i] != given[i] {
eq = false;
break;
}
j += 1;
}
if eq {
return i;
}
i += 1;
}
panic!("version not found")
}

@ -26,9 +26,13 @@
//! Implementations of the Skytable Disk Storage Subsystem (SDSS)
mod checksum;
mod versions;
// impls
mod common;
// driver versions
pub mod v1;
pub mod v2;
pub use checksum::SCrc;
pub mod safe_interfaces {
#[cfg(test)]
pub use super::common::interface::fs_test::{NullFS, VirtualFS};
pub use super::common::interface::{fs_imp::LocalFS, fs_traits::FSInterface};
}

@ -43,12 +43,15 @@ pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch};
pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
use {
super::{rw::SDSSFileIO, spec, RawFSInterface},
crate::engine::{core::model::Model, error::RuntimeResult},
super::{rw::SDSSFileIO, spec},
crate::engine::{
core::model::Model, error::RuntimeResult,
storage::common::interface::fs_traits::FSInterface,
},
};
/// Re-initialize an existing batch journal and read all its data into model
pub fn reinit<Fs: RawFSInterface>(
pub fn reinit<Fs: FSInterface>(
name: &str,
model: &Model,
) -> RuntimeResult<DataBatchPersistDriver<Fs>> {
@ -60,7 +63,7 @@ pub fn reinit<Fs: RawFSInterface>(
}
/// Create a new batch journal
pub fn create<Fs: RawFSInterface>(path: &str) -> RuntimeResult<DataBatchPersistDriver<Fs>> {
pub fn create<Fs: FSInterface>(path: &str) -> RuntimeResult<DataBatchPersistDriver<Fs>> {
let f = SDSSFileIO::<Fs>::create::<spec::DataBatchJournalV1>(path)?;
DataBatchPersistDriver::new(f, true)
}

@ -46,28 +46,31 @@ use {
},
error::{RuntimeResult, StorageError},
idx::STIndexSeq,
storage::v1::rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedWriter},
storage::{
common::interface::fs_traits::FSInterface,
v1::rw::{SDSSFileIO, TrackedWriter},
},
},
util::EndianQW,
},
crossbeam_epoch::pin,
};
pub struct DataBatchPersistDriver<Fs: RawFSInterface> {
f: SDSSFileTrackedWriter<Fs>,
pub struct DataBatchPersistDriver<Fs: FSInterface> {
f: TrackedWriter<Fs>,
}
impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
impl<Fs: FSInterface> DataBatchPersistDriver<Fs> {
pub fn new(mut file: SDSSFileIO<Fs>, is_new: bool) -> RuntimeResult<Self> {
if !is_new {
file.fsynced_write(&[MARKER_BATCH_REOPEN])?;
}
Ok(Self {
f: SDSSFileTrackedWriter::new(file)?,
f: TrackedWriter::new(file)?,
})
}
pub fn close(self) -> RuntimeResult<()> {
let mut slf = self.f.into_inner_file()?;
let mut slf = self.f.sync_into_inner()?;
if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() {
return Ok(());
} else {
@ -151,12 +154,12 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
col_cnt: usize,
) -> RuntimeResult<()> {
self.f
.tracked_write_unfsynced(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
.tracked_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
let observed_len_bytes = observed_len.u64_bytes_le();
self.f.tracked_write_unfsynced(&observed_len_bytes)?;
self.f.tracked_write(&observed_len_bytes)?;
self.f
.tracked_write_unfsynced(&schema_version.value_u64().to_le_bytes())?;
self.f.tracked_write_unfsynced(&col_cnt.u64_bytes_le())?;
.tracked_write(&schema_version.value_u64().to_le_bytes())?;
self.f.tracked_write(&col_cnt.u64_bytes_le())?;
Ok(())
}
/// Append a summary of this batch and most importantly, **sync everything to disk**
@ -166,9 +169,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
inconsistent_reads: usize,
) -> RuntimeResult<()> {
// [0xFD][actual_commit][checksum]
self.f.tracked_write_unfsynced(&[MARKER_END_OF_BATCH])?;
self.f.tracked_write(&[MARKER_END_OF_BATCH])?;
let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le();
self.f.tracked_write_unfsynced(&actual_commit)?;
self.f.tracked_write(&actual_commit)?;
let cs = self.f.reset_and_finish_checksum().to_le_bytes();
self.f.untracked_write(&cs)?;
// IMPORTANT: now that all data has been written, we need to actually ensure that the writes pass through the cache
@ -189,7 +192,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
}
}
impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
impl<Fs: FSInterface> DataBatchPersistDriver<Fs> {
/// encode the primary key only. this means NO TAG is encoded.
fn encode_pk_only(&mut self, pk: &PrimaryIndexKey) -> RuntimeResult<()> {
let buf = &mut self.f;
@ -200,7 +203,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pk.read_uint()
}
.to_le_bytes();
buf.tracked_write_unfsynced(&data)?;
buf.tracked_write(&data)?;
}
TagUnique::Str | TagUnique::Bin => {
let slice = unsafe {
@ -208,8 +211,8 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pk.read_bin()
};
let slice_l = slice.len().u64_bytes_le();
buf.tracked_write_unfsynced(&slice_l)?;
buf.tracked_write_unfsynced(slice)?;
buf.tracked_write(&slice_l)?;
buf.tracked_write(slice)?;
}
TagUnique::Illegal => unsafe {
// UNSAFE(@ohsayan): a pk can't be constructed with illegal
@ -222,7 +225,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
fn encode_cell(&mut self, value: &Datacell) -> RuntimeResult<()> {
let mut buf = vec![];
cell::encode(&mut buf, value);
self.f.tracked_write_unfsynced(&buf)?;
self.f.tracked_write(&buf)?;
Ok(())
}
/// Encode row data
@ -233,7 +236,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
self.encode_cell(cell)?;
}
None if field_name.as_str() == model.p_key() => {}
None => self.f.tracked_write_unfsynced(&[0])?,
None => self.f.tracked_write(&[0])?,
}
}
Ok(())
@ -241,9 +244,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
/// Write the change type and txnid
fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
let change_type = [delta.change().value_u8()];
self.f.tracked_write_unfsynced(&change_type)?;
self.f.tracked_write(&change_type)?;
let txn_id = delta.data_version().value_u64().to_le_bytes();
self.f.tracked_write_unfsynced(&txn_id)?;
self.f.tracked_write(&txn_id)?;
Ok(())
}
}

@ -42,7 +42,10 @@ use {
data::{cell::Datacell, tag::TagUnique},
error::{RuntimeResult, StorageError},
idx::{MTIndex, STIndex, STIndexSeq},
storage::v1::rw::{RawFSInterface, SDSSFileIO, SDSSFileTrackedReader},
storage::{
common::interface::fs_traits::FSInterface,
v1::rw::{SDSSFileIO, TrackedReader},
},
},
std::{
collections::{hash_map::Entry as HMEntry, HashMap},
@ -103,14 +106,14 @@ enum Batch {
BatchClosed,
}
pub struct DataBatchRestoreDriver<F: RawFSInterface> {
f: SDSSFileTrackedReader<F>,
pub struct DataBatchRestoreDriver<F: FSInterface> {
f: TrackedReader<F>,
}
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
impl<F: FSInterface> DataBatchRestoreDriver<F> {
pub fn new(f: SDSSFileIO<F>) -> RuntimeResult<Self> {
Ok(Self {
f: SDSSFileTrackedReader::new(f)?,
f: TrackedReader::new(f)?,
})
}
pub fn into_file(self) -> RuntimeResult<SDSSFileIO<F>> {
@ -138,7 +141,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
}
}
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
impl<F: FSInterface> DataBatchRestoreDriver<F> {
fn read_all_batches_and_for_each(
&mut self,
mut f: impl FnMut(NormalBatch) -> RuntimeResult<()>,
@ -206,7 +209,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
}
}
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
impl<F: FSInterface> DataBatchRestoreDriver<F> {
fn apply_batch(
m: &Model,
NormalBatch {
@ -302,7 +305,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
}
}
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
impl<F: FSInterface> DataBatchRestoreDriver<F> {
fn read_batch_summary(&mut self, finished_early: bool) -> RuntimeResult<u64> {
if !finished_early {
// we must read the batch termination signature
@ -467,7 +470,7 @@ impl BatchStartBlock {
}
}
impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
impl<F: FSInterface> DataBatchRestoreDriver<F> {
fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult<PrimaryIndexKey> {
let Some(pk_type) = TagUnique::try_from_raw(pk_type) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into());
@ -483,7 +486,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
TagUnique::Str | TagUnique::Bin => {
let len = self.f.read_u64_le()?;
let mut data = vec![0; len as usize];
self.f.read_into_buffer(&mut data)?;
self.f.tracked_read(&mut data)?;
if pk_type == TagUnique::Str {
if core::str::from_utf8(&data).is_err() {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into());
@ -505,7 +508,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into());
};
unsafe { cell::decode_element::<Datacell, SDSSFileTrackedReader<F>>(&mut self.f, dscr) }
unsafe { cell::decode_element::<Datacell, TrackedReader<F>>(&mut self.f, dscr) }
.map_err(|e| e.0)
}
}
@ -521,7 +524,7 @@ impl From<()> for ErrorHack {
Self(StorageError::DataBatchRestoreCorruptedEntry.into())
}
}
impl<F: RawFSInterface> DataSource for SDSSFileTrackedReader<F> {
impl<F: FSInterface> DataSource for TrackedReader<F> {
const RELIABLE_SOURCE: bool = false;
type Error = ErrorHack;
fn has_remaining(&self, cnt: usize) -> bool {
@ -538,7 +541,7 @@ impl<F: RawFSInterface> DataSource for SDSSFileTrackedReader<F> {
}
unsafe fn read_next_variable_block(&mut self, size: usize) -> Result<Vec<u8>, Self::Error> {
let mut buf = vec![0; size];
self.read_into_buffer(&mut buf)?;
self.tracked_read(&mut buf)?;
Ok(buf)
}
}

@ -41,13 +41,15 @@
- FIXME(@ohsayan): we will probably (naively) need to dynamically reposition the cursor in case the metadata is corrupted as well
*/
#[cfg(test)]
use crate::engine::storage::common::interface::fs_traits::FileOpen;
use {
super::{
rw::{RawFSInterface, SDSSFileIO},
spec,
},
super::rw::SDSSFileIO,
crate::{
engine::error::{RuntimeResult, StorageError},
engine::{
error::{RuntimeResult, StorageError},
storage::common::{interface::fs_traits::FSInterface, sdss},
},
util::{compiler, copy_a_into_b, copy_slice_to_array as memcpy},
},
std::marker::PhantomData,
@ -56,11 +58,14 @@ use {
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
#[cfg(test)]
pub fn open_or_create_journal<TA: JournalAdapter, Fs: RawFSInterface, F: spec::FileSpec>(
pub fn open_or_create_journal<
TA: JournalAdapter,
Fs: FSInterface,
F: sdss::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
>(
log_file_name: &str,
gs: &TA::GlobalState,
) -> RuntimeResult<super::rw::FileOpen<JournalWriter<Fs, TA>>> {
use super::rw::FileOpen;
) -> RuntimeResult<FileOpen<JournalWriter<Fs, TA>>> {
let file = match SDSSFileIO::<Fs>::open_or_create_perm_rw::<F>(log_file_name)? {
FileOpen::Created(f) => return Ok(FileOpen::Created(JournalWriter::new(f, 0, true)?)),
FileOpen::Existing((file, _header)) => file,
@ -71,13 +76,13 @@ pub fn open_or_create_journal<TA: JournalAdapter, Fs: RawFSInterface, F: spec::F
)?))
}
pub fn create_journal<TA: JournalAdapter, Fs: RawFSInterface, F: spec::FileSpec>(
pub fn create_journal<TA: JournalAdapter, Fs: FSInterface, F: sdss::FileSpecV1<EncodeArgs = ()>>(
log_file_name: &str,
) -> RuntimeResult<JournalWriter<Fs, TA>> {
JournalWriter::new(SDSSFileIO::create::<F>(log_file_name)?, 0, true)
}
pub fn load_journal<TA: JournalAdapter, Fs: RawFSInterface, F: spec::FileSpec>(
pub fn load_journal<TA: JournalAdapter, Fs: FSInterface, F: sdss::FileSpecV1<DecodeArgs = ()>>(
log_file_name: &str,
gs: &TA::GlobalState,
) -> RuntimeResult<JournalWriter<Fs, TA>> {
@ -192,7 +197,7 @@ impl JournalEntryMetadata {
}
}
pub struct JournalReader<TA, Fs: RawFSInterface> {
pub struct JournalReader<TA, Fs: FSInterface> {
log_file: SDSSFileIO<Fs>,
evid: u64,
closed: bool,
@ -200,9 +205,9 @@ pub struct JournalReader<TA, Fs: RawFSInterface> {
_m: PhantomData<TA>,
}
impl<TA: JournalAdapter, Fs: RawFSInterface> JournalReader<TA, Fs> {
impl<TA: JournalAdapter, Fs: FSInterface> JournalReader<TA, Fs> {
pub fn new(log_file: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
let log_size = log_file.file_length()? - spec::SDSSStaticHeaderV1Compact::SIZE as u64;
let log_size = log_file.file_length()? - super::Header::SIZE as u64;
Ok(Self {
log_file,
evid: 0,
@ -228,7 +233,7 @@ impl<TA: JournalAdapter, Fs: RawFSInterface> JournalReader<TA, Fs> {
// the only case when this happens is when the journal faults at runtime with a write zero (or some other error when no bytes were written)
self.remaining_bytes += JournalEntryMetadata::SIZE as u64;
// move back cursor to see if we have a recovery block
let new_cursor = self.log_file.retrieve_cursor()? - JournalEntryMetadata::SIZE as u64;
let new_cursor = self.log_file.file_cursor()? - JournalEntryMetadata::SIZE as u64;
self.log_file.seek_from_start(new_cursor)?;
return self.try_recover_journal_strategy_simple_reverse();
}
@ -297,7 +302,7 @@ impl<TA: JournalAdapter, Fs: RawFSInterface> JournalReader<TA, Fs> {
debug_assert!(TA::RECOVERY_PLUGIN, "recovery plugin not enabled");
self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length?
let mut entry_buf = [0u8; JournalEntryMetadata::SIZE];
if self.log_file.read_to_buffer(&mut entry_buf).is_err() {
if self.log_file.read_buffer(&mut entry_buf).is_err() {
return Err(StorageError::JournalCorrupted.into());
}
let entry = JournalEntryMetadata::decode(entry_buf);
@ -329,7 +334,7 @@ impl<TA: JournalAdapter, Fs: RawFSInterface> JournalReader<TA, Fs> {
}
}
impl<TA, Fs: RawFSInterface> JournalReader<TA, Fs> {
impl<TA, Fs: FSInterface> JournalReader<TA, Fs> {
fn _incr_evid(&mut self) {
self.evid += 1;
}
@ -344,19 +349,19 @@ impl<TA, Fs: RawFSInterface> JournalReader<TA, Fs> {
}
}
impl<TA, Fs: RawFSInterface> JournalReader<TA, Fs> {
impl<TA, Fs: FSInterface> JournalReader<TA, Fs> {
fn logfile_read_into_buffer(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
if !self.has_remaining_bytes(buf.len() as _) {
// do this right here to avoid another syscall
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into());
}
self.log_file.read_to_buffer(buf)?;
self.log_file.read_buffer(buf)?;
self.__record_read_bytes(buf.len());
Ok(())
}
}
pub struct JournalWriter<Fs: RawFSInterface, TA> {
pub struct JournalWriter<Fs: FSInterface, TA> {
/// the txn log file
log_file: SDSSFileIO<Fs>,
/// the id of the **next** journal
@ -365,7 +370,7 @@ pub struct JournalWriter<Fs: RawFSInterface, TA> {
closed: bool,
}
impl<Fs: RawFSInterface, TA: JournalAdapter> JournalWriter<Fs, TA> {
impl<Fs: FSInterface, TA: JournalAdapter> JournalWriter<Fs, TA> {
pub fn new(mut log_file: SDSSFileIO<Fs>, last_txn_id: u64, new: bool) -> RuntimeResult<Self> {
let log_size = log_file.file_length()?;
log_file.seek_from_start(log_size)?; // avoid jumbling with headers
@ -390,8 +395,8 @@ impl<Fs: RawFSInterface, TA: JournalAdapter> JournalWriter<Fs, TA> {
encoded.len() as u64,
)
.encoded();
self.log_file.unfsynced_write(&md)?;
self.log_file.unfsynced_write(&encoded)?;
self.log_file.write_buffer(&md)?;
self.log_file.write_buffer(&encoded)?;
self.log_file.fsync_all()?;
Ok(())
}
@ -411,7 +416,7 @@ impl<Fs: RawFSInterface, TA: JournalAdapter> JournalWriter<Fs, TA> {
}
}
impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
impl<Fs: FSInterface, TA> JournalWriter<Fs, TA> {
pub fn appendrec_journal_reverse_entry(&mut self) -> RuntimeResult<()> {
let mut entry =
JournalEntryMetadata::new(0, EventSourceMarker::RECOVERY_REVERSE_LAST_JOURNAL, 0, 0);
@ -440,7 +445,7 @@ impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
}
}
impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
impl<Fs: FSInterface, TA> JournalWriter<Fs, TA> {
fn _incr_id(&mut self) -> u64 {
let current = self.id;
self.id += 1;
@ -448,7 +453,7 @@ impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
}
}
impl<Fs: RawFSInterface, TA> Drop for JournalWriter<Fs, TA> {
impl<Fs: FSInterface, TA> Drop for JournalWriter<Fs, TA> {
fn drop(&mut self) {
assert!(self.closed, "log not closed");
}

@ -25,9 +25,9 @@
*/
#[cfg(test)]
use crate::engine::storage::v1::{
rw::{FileOpen, RawFSInterface},
JournalWriter,
use crate::engine::storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
v1::JournalWriter,
};
use crate::engine::{
core::{EntityIDRef, GlobalNS},
@ -35,7 +35,10 @@ use crate::engine::{
error::RuntimeResult,
fractal::error::ErrorContext,
fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID},
storage::v1::{batch_jrnl, journal, spec, LocalFS},
storage::{
common::interface::fs_imp::LocalFS,
v1::{batch_jrnl, journal, spec},
},
txn::gns::{GNSAdapter, GNSTransactionDriverAnyFS},
};
@ -43,14 +46,14 @@ const GNS_FILE_PATH: &str = "gns.db-tlog";
const DATA_DIR: &str = "data";
pub struct SEInitState {
pub txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
pub txn_driver: GNSTransactionDriverAnyFS<LocalFS>,
pub model_drivers: ModelDrivers<LocalFS>,
pub gns: GlobalNS,
}
impl SEInitState {
pub fn new(
txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
txn_driver: GNSTransactionDriverAnyFS<LocalFS>,
model_drivers: ModelDrivers<LocalFS>,
gns: GlobalNS,
) -> Self {
@ -140,7 +143,7 @@ impl SEInitState {
}
#[cfg(test)]
pub fn open_gns_driver<Fs: RawFSInterface>(
pub fn open_gns_driver<Fs: FSInterface>(
path: &str,
gns: &GlobalNS,
) -> RuntimeResult<FileOpen<JournalWriter<Fs, GNSAdapter>>> {

@ -24,6 +24,10 @@
*
*/
//! SDSS based storage engine driver v1 ([`versions::v1`])
//!
//! Target tags: `0.8.0-beta`, `0.8.0-beta.2`, `0.8.0-beta.3`
// impls
mod batch_jrnl;
mod journal;
@ -33,15 +37,15 @@ pub mod spec;
pub mod sysdb;
// hl
pub mod inf;
// test
pub mod memfs;
#[cfg(test)]
mod tests;
// re-exports
pub(self) use spec::Header;
pub use {
journal::{JournalAdapter, JournalWriter},
rw::{LocalFS, RawFSInterface, SDSSFileIO},
rw::SDSSFileIO,
};
pub mod data_batch {
pub use super::batch_jrnl::{create, DataBatchPersistDriver};

@ -25,360 +25,106 @@
*/
use {
super::spec::{FileSpec, Header},
crate::{
engine::{error::RuntimeResult, storage::SCrc},
engine::{
error::RuntimeResult,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt,
FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
},
sdss,
},
},
util::os::SysIOError,
},
std::{
fs::{self, File},
io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write},
marker::PhantomData,
},
std::marker::PhantomData,
};
#[derive(Debug)]
/// Log whether
pub enum FileOpen<CF, EF = CF> {
Created(CF),
Existing(EF),
}
#[cfg(test)]
impl<CF, EF> FileOpen<CF, EF> {
pub fn into_existing(self) -> Option<EF> {
match self {
Self::Existing(e) => Some(e),
Self::Created(_) => None,
}
}
pub fn into_created(self) -> Option<CF> {
match self {
Self::Created(f) => Some(f),
Self::Existing(_) => None,
}
}
}
#[cfg(test)]
impl<F> FileOpen<F> {
pub fn into_inner(self) -> F {
match self {
Self::Created(f) => f,
Self::Existing(f) => f,
}
}
}
/// The specification for a file system interface (our own abstraction over the fs)
pub trait RawFSInterface {
/// asserts that the file system is not a null filesystem (like `/dev/null` for example)
const NOT_NULL: bool = true;
/// the file descriptor that is returned by the file system when a file is opened
type File: RawFileInterface;
/// Remove a file
fn fs_remove_file(fpath: &str) -> RuntimeResult<()>;
/// Rename a file
fn fs_rename_file(from: &str, to: &str) -> RuntimeResult<()>;
/// Create a directory
fn fs_create_dir(fpath: &str) -> RuntimeResult<()>;
/// Create a directory and all corresponding path components
fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()>;
/// Delete a directory
fn fs_delete_dir(fpath: &str) -> RuntimeResult<()>;
/// Delete a directory and recursively remove all (if any) children
fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()>;
/// Open or create a file in R/W mode
///
/// This will:
/// - Create a file if it doesn't exist
/// - Open a file it it does exist
fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult<FileOpen<Self::File>>;
/// Open an existing file
fn fs_fopen_rw(fpath: &str) -> RuntimeResult<Self::File>;
/// Create a new file
fn fs_fcreate_rw(fpath: &str) -> RuntimeResult<Self::File>;
pub struct TrackedWriter<Fs: FSInterface> {
file: SDSSFileIO<Fs, <Fs::File as FileInterface>::BufWriter>,
cs: SCrc64,
}
/// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking)
pub trait RawFileInterface: Sized
where
Self: RawFileInterfaceRead
+ RawFileInterfaceWrite
+ RawFileInterfaceWriteExt
+ RawFileInterfaceExt,
{
type BufReader: RawFileInterfaceBufferedReader;
type BufWriter: RawFileInterfaceBufferedWriter;
fn into_buffered_reader(self) -> RuntimeResult<Self::BufReader>;
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self>;
fn into_buffered_writer(self) -> RuntimeResult<Self::BufWriter>;
fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult<Self>;
}
pub trait RawFileInterfaceBufferedReader: RawFileInterfaceRead + RawFileInterfaceExt {}
impl<R: RawFileInterfaceRead + RawFileInterfaceExt> RawFileInterfaceBufferedReader for R {}
pub trait RawFileInterfaceBufferedWriter: RawFileInterfaceWrite + RawFileInterfaceExt {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
Ok(())
}
}
/// A file interface that supports read operations
pub trait RawFileInterfaceRead {
fn fr_read_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()>;
}
impl<R: Read> RawFileInterfaceRead for R {
fn fr_read_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.read_exact(buf).map_err(From::from)
}
}
/// A file interface that supports write operations
pub trait RawFileInterfaceWrite {
fn fw_write_all(&mut self, buf: &[u8]) -> RuntimeResult<()>;
}
impl<W: Write> RawFileInterfaceWrite for W {
fn fw_write_all(&mut self, buf: &[u8]) -> RuntimeResult<()> {
self.write_all(buf).map_err(From::from)
}
}
/// A file interface that supports advanced write operations
pub trait RawFileInterfaceWriteExt {
fn fwext_fsync_all(&mut self) -> RuntimeResult<()>;
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()>;
}
/// A file interface that supports advanced file operations
pub trait RawFileInterfaceExt {
fn fext_file_length(&self) -> RuntimeResult<u64>;
fn fext_cursor(&mut self) -> RuntimeResult<u64>;
fn fext_seek_ahead_from_start_by(&mut self, ahead_by: u64) -> RuntimeResult<()>;
}
fn cvt<T>(v: std::io::Result<T>) -> RuntimeResult<T> {
let r = v?;
Ok(r)
}
/// The actual local host file system (as an abstraction [`fs`])
#[derive(Debug)]
pub struct LocalFS;
impl RawFSInterface for LocalFS {
type File = File;
fn fs_remove_file(fpath: &str) -> RuntimeResult<()> {
cvt(fs::remove_file(fpath))
}
fn fs_rename_file(from: &str, to: &str) -> RuntimeResult<()> {
cvt(fs::rename(from, to))
}
fn fs_create_dir(fpath: &str) -> RuntimeResult<()> {
cvt(fs::create_dir(fpath))
}
fn fs_create_dir_all(fpath: &str) -> RuntimeResult<()> {
cvt(fs::create_dir_all(fpath))
}
fn fs_delete_dir(fpath: &str) -> RuntimeResult<()> {
cvt(fs::remove_dir(fpath))
}
fn fs_delete_dir_all(fpath: &str) -> RuntimeResult<()> {
cvt(fs::remove_dir_all(fpath))
}
fn fs_fopen_or_create_rw(fpath: &str) -> RuntimeResult<FileOpen<Self::File>> {
let f = File::options()
.create(true)
.read(true)
.write(true)
.open(fpath)?;
let md = f.metadata()?;
if md.len() == 0 {
Ok(FileOpen::Created(f))
} else {
Ok(FileOpen::Existing(f))
}
}
fn fs_fcreate_rw(fpath: &str) -> RuntimeResult<Self::File> {
let f = File::options()
.create_new(true)
.read(true)
.write(true)
.open(fpath)?;
Ok(f)
}
fn fs_fopen_rw(fpath: &str) -> RuntimeResult<Self::File> {
let f = File::options().read(true).write(true).open(fpath)?;
Ok(f)
}
}
impl RawFileInterface for File {
type BufReader = BufReader<Self>;
type BufWriter = BufWriter<Self>;
fn into_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(BufReader::new(self))
}
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r.into_inner())
}
fn into_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(BufWriter::new(self))
}
fn downgrade_writer(mut w: Self::BufWriter) -> RuntimeResult<Self> {
w.flush()?; // TODO(@ohsayan): handle rare case where writer does panic
let (w, _) = w.into_parts();
Ok(w)
}
}
impl RawFileInterfaceBufferedWriter for BufWriter<File> {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
self.flush()?;
self.get_mut().sync_all()?;
Ok(())
}
}
impl RawFileInterfaceWriteExt for File {
fn fwext_fsync_all(&mut self) -> RuntimeResult<()> {
cvt(self.sync_all())
}
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
cvt(self.set_len(to))
}
}
trait LocalFSFile {
fn file_mut(&mut self) -> &mut File;
fn file(&self) -> &File;
}
impl LocalFSFile for File {
fn file_mut(&mut self) -> &mut File {
self
}
fn file(&self) -> &File {
self
}
}
impl LocalFSFile for BufReader<File> {
fn file_mut(&mut self) -> &mut File {
self.get_mut()
}
fn file(&self) -> &File {
self.get_ref()
}
}
impl LocalFSFile for BufWriter<File> {
fn file_mut(&mut self) -> &mut File {
self.get_mut()
}
fn file(&self) -> &File {
self.get_ref()
}
}
impl<F: LocalFSFile> RawFileInterfaceExt for F {
fn fext_file_length(&self) -> RuntimeResult<u64> {
Ok(self.file().metadata()?.len())
}
fn fext_cursor(&mut self) -> RuntimeResult<u64> {
cvt(self.file_mut().stream_position())
}
fn fext_seek_ahead_from_start_by(&mut self, by: u64) -> RuntimeResult<()> {
cvt(self.file_mut().seek(SeekFrom::Start(by)).map(|_| ()))
}
}
pub struct SDSSFileTrackedWriter<Fs: RawFSInterface> {
f: SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufWriter>,
cs: SCrc,
}
impl<Fs: RawFSInterface> SDSSFileTrackedWriter<Fs> {
impl<Fs: FSInterface> TrackedWriter<Fs> {
pub fn new(f: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
Ok(Self {
f: f.into_buffered_sdss_writer()?,
cs: SCrc::new(),
file: f.into_buffered_writer()?,
cs: SCrc64::new(),
})
}
pub fn tracked_write_unfsynced(&mut self, block: &[u8]) -> RuntimeResult<()> {
pub fn tracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
self.untracked_write(block)
.map(|_| self.cs.recompute_with_new_var_block(block))
}
pub fn untracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
match self.f.unfsynced_write(block) {
match self.file.write_buffer(block) {
Ok(()) => Ok(()),
e => e,
}
}
pub fn sync_writes(&mut self) -> RuntimeResult<()> {
self.f.f.sync_write_cache()
self.file.f.sync_write_cache()
}
pub fn reset_and_finish_checksum(&mut self) -> u64 {
let scrc = core::mem::replace(&mut self.cs, SCrc::new());
let scrc = core::mem::replace(&mut self.cs, SCrc64::new());
scrc.finish()
}
pub fn into_inner_file(self) -> RuntimeResult<SDSSFileIO<Fs>> {
self.f.downgrade_writer()
pub fn sync_into_inner(self) -> RuntimeResult<SDSSFileIO<Fs>> {
self.file.downgrade_writer()
}
}
/// [`SDSSFileLenTracked`] simply maintains application level length and checksum tracking to avoid frequent syscalls because we
/// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them
pub struct SDSSFileTrackedReader<Fs: RawFSInterface> {
f: SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufReader>,
pub struct TrackedReader<Fs: FSInterface> {
f: SDSSFileIO<Fs, <Fs::File as FileInterface>::BufReader>,
len: u64,
pos: u64,
cs: SCrc,
cursor: u64,
cs: SCrc64,
}
impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
impl<Fs: FSInterface> TrackedReader<Fs> {
/// Important: this will only look at the data post the current cursor!
pub fn new(mut f: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
let len = f.file_length()?;
let pos = f.retrieve_cursor()?;
let f = f.into_buffered_sdss_reader()?;
let pos = f.file_cursor()?;
let f = f.into_buffered_reader()?;
Ok(Self {
f,
len,
pos,
cs: SCrc::new(),
cursor: pos,
cs: SCrc64::new(),
})
}
pub fn remaining(&self) -> u64 {
self.len - self.pos
self.len - self.cursor
}
pub fn is_eof(&self) -> bool {
self.len == self.pos
self.len == self.cursor
}
pub fn has_left(&self, v: u64) -> bool {
self.remaining() >= v
}
pub fn read_into_buffer(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
pub fn tracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.untracked_read(buf)
.map(|_| self.cs.recompute_with_new_var_block(buf))
}
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
let mut buf = [0u8; 1];
self.read_into_buffer(&mut buf).map(|_| buf[0])
self.tracked_read(&mut buf).map(|_| buf[0])
}
pub fn __reset_checksum(&mut self) -> u64 {
let mut crc = SCrc::new();
let mut crc = SCrc64::new();
core::mem::swap(&mut crc, &mut self.cs);
crc.finish()
}
pub fn untracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
if self.remaining() >= buf.len() as u64 {
match self.f.read_to_buffer(buf) {
match self.f.read_buffer(buf) {
Ok(()) => {
self.pos += buf.len() as u64;
self.cursor += buf.len() as u64;
Ok(())
}
Err(e) => return Err(e),
@ -395,7 +141,7 @@ impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into());
}
let mut buf = [0; N];
self.read_into_buffer(&mut buf)?;
self.tracked_read(&mut buf)?;
Ok(buf)
}
pub fn read_u64_le(&mut self) -> RuntimeResult<u64> {
@ -404,66 +150,68 @@ impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
}
#[derive(Debug)]
pub struct SDSSFileIO<Fs: RawFSInterface, F = <Fs as RawFSInterface>::File> {
pub struct SDSSFileIO<Fs: FSInterface, F = <Fs as FSInterface>::File> {
f: F,
_fs: PhantomData<Fs>,
}
impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
pub fn open<F: FileSpec>(fpath: &str) -> RuntimeResult<(Self, F::Header)> {
impl<Fs: FSInterface> SDSSFileIO<Fs> {
pub fn open<F: sdss::FileSpecV1<DecodeArgs = ()>>(
fpath: &str,
) -> RuntimeResult<(Self, F::Metadata)> {
let mut f = Self::_new(Fs::fs_fopen_rw(fpath)?);
let header = F::Header::decode_verify(&mut f, F::DECODE_DATA, F::VERIFY_DATA)?;
Ok((f, header))
let v = F::read_metadata(&mut f.f, ())?;
Ok((f, v))
}
pub fn create<F: FileSpec>(fpath: &str) -> RuntimeResult<Self> {
pub fn create<F: sdss::FileSpecV1<EncodeArgs = ()>>(fpath: &str) -> RuntimeResult<Self> {
let mut f = Self::_new(Fs::fs_fcreate_rw(fpath)?);
F::Header::encode(&mut f, F::ENCODE_DATA)?;
F::write_metadata(&mut f.f, ())?;
Ok(f)
}
pub fn open_or_create_perm_rw<F: FileSpec>(
pub fn open_or_create_perm_rw<F: sdss::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>>(
fpath: &str,
) -> RuntimeResult<FileOpen<Self, (Self, F::Header)>> {
) -> RuntimeResult<FileOpen<Self, (Self, F::Metadata)>> {
match Fs::fs_fopen_or_create_rw(fpath)? {
FileOpen::Created(c) => {
let mut f = Self::_new(c);
F::Header::encode(&mut f, F::ENCODE_DATA)?;
F::write_metadata(&mut f.f, ())?;
Ok(FileOpen::Created(f))
}
FileOpen::Existing(e) => {
let mut f = Self::_new(e);
let header = F::Header::decode_verify(&mut f, F::DECODE_DATA, F::VERIFY_DATA)?;
let header = F::read_metadata(&mut f.f, ())?;
Ok(FileOpen::Existing((f, header)))
}
}
}
pub fn into_buffered_sdss_reader(
pub fn into_buffered_reader(
self,
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufReader>> {
self.f.into_buffered_reader().map(SDSSFileIO::_new)
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as FileInterface>::BufReader>> {
self.f.upgrade_to_buffered_reader().map(SDSSFileIO::_new)
}
pub fn into_buffered_sdss_writer(
pub fn into_buffered_writer(
self,
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufWriter>> {
self.f.into_buffered_writer().map(SDSSFileIO::_new)
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as FileInterface>::BufWriter>> {
self.f.upgrade_to_buffered_writer().map(SDSSFileIO::_new)
}
}
impl<Fs: RawFSInterface> SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufReader> {
impl<Fs: FSInterface> SDSSFileIO<Fs, <Fs::File as FileInterface>::BufReader> {
pub fn downgrade_reader(self) -> RuntimeResult<SDSSFileIO<Fs, Fs::File>> {
let me = <Fs::File as RawFileInterface>::downgrade_reader(self.f)?;
let me = <Fs::File as FileInterface>::downgrade_reader(self.f)?;
Ok(SDSSFileIO::_new(me))
}
}
impl<Fs: RawFSInterface> SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufWriter> {
impl<Fs: FSInterface> SDSSFileIO<Fs, <Fs::File as FileInterface>::BufWriter> {
pub fn downgrade_writer(self) -> RuntimeResult<SDSSFileIO<Fs>> {
let me = <Fs::File as RawFileInterface>::downgrade_writer(self.f)?;
let me = <Fs::File as FileInterface>::downgrade_writer(self.f)?;
Ok(SDSSFileIO::_new(me))
}
}
impl<Fs: RawFSInterface, F> SDSSFileIO<Fs, F> {
pub fn _new(f: F) -> Self {
impl<Fs: FSInterface, F> SDSSFileIO<Fs, F> {
fn _new(f: F) -> Self {
Self {
f,
_fs: PhantomData,
@ -471,46 +219,46 @@ impl<Fs: RawFSInterface, F> SDSSFileIO<Fs, F> {
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceRead> SDSSFileIO<Fs, F> {
pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> {
self.f.fr_read_exact(buffer)
impl<Fs: FSInterface, F: FileInterfaceRead> SDSSFileIO<Fs, F> {
pub fn read_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> {
self.f.fread_exact(buffer)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceExt> SDSSFileIO<Fs, F> {
pub fn retrieve_cursor(&mut self) -> RuntimeResult<u64> {
impl<Fs: FSInterface, F: FileInterfaceExt> SDSSFileIO<Fs, F> {
pub fn file_cursor(&mut self) -> RuntimeResult<u64> {
self.f.fext_cursor()
}
pub fn file_length(&self) -> RuntimeResult<u64> {
self.f.fext_file_length()
self.f.fext_length()
}
pub fn seek_from_start(&mut self, by: u64) -> RuntimeResult<()> {
self.f.fext_seek_ahead_from_start_by(by)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceRead + RawFileInterfaceExt> SDSSFileIO<Fs, F> {
pub fn load_remaining_into_buffer(&mut self) -> RuntimeResult<Vec<u8>> {
let len = self.file_length()? - self.retrieve_cursor()?;
impl<Fs: FSInterface, F: FileInterfaceRead + FileInterfaceExt> SDSSFileIO<Fs, F> {
pub fn read_full(&mut self) -> RuntimeResult<Vec<u8>> {
let len = self.file_length()? - self.file_cursor()?;
let mut buf = vec![0; len as usize];
self.read_to_buffer(&mut buf)?;
self.read_buffer(&mut buf)?;
Ok(buf)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceWrite> SDSSFileIO<Fs, F> {
pub fn unfsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
impl<Fs: FSInterface, F: FileInterfaceWrite> SDSSFileIO<Fs, F> {
pub fn write_buffer(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceWrite + RawFileInterfaceWriteExt> SDSSFileIO<Fs, F> {
impl<Fs: FSInterface, F: FileInterfaceWrite + FileInterfaceWriteExt> SDSSFileIO<Fs, F> {
pub fn fsync_all(&mut self) -> RuntimeResult<()> {
self.f.fwext_fsync_all()?;
self.f.fwext_sync_all()?;
Ok(())
}
pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)?;
self.f.fwext_fsync_all()
self.f.fwext_sync_all()
}
}

@ -24,157 +24,37 @@
*
*/
/*
Header specification
---
We utilize two different kinds of headers:
- Static header - Mostly to avoid data corruption
- Variable header - For preserving dynamic information
*/
use {
super::rw::{RawFSInterface, SDSSFileIO},
crate::{
engine::{
error::{RuntimeResult, StorageError},
storage::versions::{self, DriverVersion, HeaderVersion, ServerVersion},
},
util::os,
},
std::{
mem::{transmute, ManuallyDrop},
ops::Range,
},
use crate::engine::storage::common::{
sdss,
versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion},
};
/*
meta
*/
pub(super) type Header = sdss::HeaderV1<HeaderImplV1>;
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
/// Host architecture enumeration for common platforms
pub enum HostArch {
X86 = 0,
X86_64 = 1,
ARM = 2,
ARM64 = 3,
MIPS = 4,
PowerPC = 5,
#[derive(Debug)]
pub(super) struct HeaderImplV1;
impl sdss::HeaderV1Spec for HeaderImplV1 {
type FileClass = FileScope;
type FileSpecifier = FileSpecifier;
const CURRENT_SERVER_VERSION: ServerVersion = versions::v1::V1_SERVER_VERSION;
const CURRENT_DRIVER_VERSION: DriverVersion = versions::v1::V1_DRIVER_VERSION;
}
impl HostArch {
pub const fn new() -> Self {
if cfg!(target_arch = "x86") {
HostArch::X86
} else if cfg!(target_arch = "x86_64") {
HostArch::X86_64
} else if cfg!(target_arch = "arm") {
HostArch::ARM
} else if cfg!(target_arch = "aarch64") {
HostArch::ARM64
} else if cfg!(target_arch = "mips") {
HostArch::MIPS
} else if cfg!(target_arch = "powerpc") {
HostArch::PowerPC
} else {
panic!("Unsupported target architecture")
}
impl sdss::HeaderV1Enumeration for FileScope {
const MAX: u8 = FileScope::MAX;
unsafe fn new(x: u8) -> Self {
core::mem::transmute(x)
}
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
/// Host OS enumeration for common operating systems
pub enum HostOS {
// T1
Linux = 0,
Windows = 1,
MacOS = 2,
// T2
Android = 3,
AppleiOS = 4,
FreeBSD = 5,
OpenBSD = 6,
NetBSD = 7,
WASI = 8,
Emscripten = 9,
// T3
Solaris = 10,
Fuchsia = 11,
Redox = 12,
DragonFly = 13,
}
impl HostOS {
pub const fn new() -> Self {
if cfg!(target_os = "linux") {
HostOS::Linux
} else if cfg!(target_os = "windows") {
HostOS::Windows
} else if cfg!(target_os = "macos") {
HostOS::MacOS
} else if cfg!(target_os = "android") {
HostOS::Android
} else if cfg!(target_os = "ios") {
HostOS::AppleiOS
} else if cfg!(target_os = "freebsd") {
HostOS::FreeBSD
} else if cfg!(target_os = "openbsd") {
HostOS::OpenBSD
} else if cfg!(target_os = "netbsd") {
HostOS::NetBSD
} else if cfg!(target_os = "dragonfly") {
HostOS::DragonFly
} else if cfg!(target_os = "redox") {
HostOS::Redox
} else if cfg!(target_os = "fuchsia") {
HostOS::Fuchsia
} else if cfg!(target_os = "solaris") {
HostOS::Solaris
} else if cfg!(target_os = "emscripten") {
HostOS::Emscripten
} else if cfg!(target_os = "wasi") {
HostOS::WASI
} else {
panic!("unknown os")
}
fn repr_u8(&self) -> u8 {
FileScope::value_u8(self)
}
}
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
/// Host endian enumeration
pub enum HostEndian {
Big = 0,
Little = 1,
}
impl HostEndian {
pub const fn new() -> Self {
if cfg!(target_endian = "little") {
Self::Little
} else {
Self::Big
}
impl sdss::HeaderV1Enumeration for FileSpecifier {
const MAX: u8 = FileSpecifier::MAX;
unsafe fn new(x: u8) -> Self {
core::mem::transmute(x)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
#[repr(u8)]
/// Host pointer width enumeration
pub enum HostPointerWidth {
P32 = 0,
P64 = 1,
}
impl HostPointerWidth {
pub const fn new() -> Self {
match sizeof!(usize) {
4 => Self::P32,
8 => Self::P64,
_ => panic!("unknown pointer width"),
}
fn repr_u8(&self) -> u8 {
self.value_u8()
}
}
@ -197,414 +77,43 @@ pub enum FileSpecifier {
TestTransactionLog = 0xFF,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct FileSpecifierVersion(u16);
impl FileSpecifierVersion {
pub const fn __new(v: u16) -> Self {
Self(v)
}
}
const SDSS_MAGIC: u64 = 0x4F48534159414E21;
/// Specification for a SDSS file
pub trait FileSpec {
/// The header spec for the file
type Header: Header;
/// Encode data
const ENCODE_DATA: <Self::Header as Header>::EncodeArgs;
/// Decode data
const DECODE_DATA: <Self::Header as Header>::DecodeArgs;
/// Verify data
const VERIFY_DATA: <Self::Header as Header>::DecodeVerifyArgs;
}
/*
file spec impls
*/
#[cfg(test)]
pub struct TestFile;
pub(super) struct TestFile;
#[cfg(test)]
impl FileSpec for TestFile {
type Header = SDSSStaticHeaderV1Compact;
const ENCODE_DATA: <Self::Header as Header>::EncodeArgs = (
FileScope::FlatmapData,
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
);
const DECODE_DATA: <Self::Header as Header>::DecodeArgs = ();
const VERIFY_DATA: <Self::Header as Header>::DecodeVerifyArgs = Self::ENCODE_DATA;
impl sdss::SimpleFileSpecV1 for TestFile {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::FlatmapData;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::TestTransactionLog;
const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0);
}
/// The file specification for the GNS transaction log (impl v1)
pub struct GNSTransactionLogV1;
impl FileSpec for GNSTransactionLogV1 {
type Header = SDSSStaticHeaderV1Compact;
const ENCODE_DATA: <Self::Header as Header>::EncodeArgs = (
FileScope::Journal,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
);
const DECODE_DATA: <Self::Header as Header>::DecodeArgs = ();
const VERIFY_DATA: <Self::Header as Header>::DecodeVerifyArgs = Self::ENCODE_DATA;
pub(super) struct GNSTransactionLogV1;
impl sdss::SimpleFileSpecV1 for GNSTransactionLogV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::Journal;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::GNSTxnLog;
const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0);
}
/// The file specification for a journal batch
pub struct DataBatchJournalV1;
impl FileSpec for DataBatchJournalV1 {
type Header = SDSSStaticHeaderV1Compact;
const ENCODE_DATA: <Self::Header as Header>::EncodeArgs = (
FileScope::DataBatch,
FileSpecifier::TableDataBatch,
FileSpecifierVersion::__new(0),
);
const DECODE_DATA: <Self::Header as Header>::DecodeArgs = ();
const VERIFY_DATA: <Self::Header as Header>::DecodeVerifyArgs = Self::ENCODE_DATA;
pub(super) struct DataBatchJournalV1;
impl sdss::SimpleFileSpecV1 for DataBatchJournalV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::DataBatch;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::TableDataBatch;
const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0);
}
/// The file specification for the system db
pub struct SysDBV1;
impl FileSpec for SysDBV1 {
type Header = SDSSStaticHeaderV1Compact;
const ENCODE_DATA: <Self::Header as Header>::EncodeArgs = (
FileScope::FlatmapData,
FileSpecifier::SysDB,
FileSpecifierVersion::__new(0),
);
const DECODE_DATA: <Self::Header as Header>::DecodeArgs = ();
const VERIFY_DATA: <Self::Header as Header>::DecodeVerifyArgs = Self::ENCODE_DATA;
}
/*
header spec
*/
/// SDSS Header specification
pub trait Header: Sized {
/// Encode arguments
type EncodeArgs;
/// Decode arguments
type DecodeArgs;
/// Decode verify arguments
type DecodeVerifyArgs;
/// Encode the header
fn encode<Fs: RawFSInterface>(
f: &mut SDSSFileIO<Fs>,
args: Self::EncodeArgs,
) -> RuntimeResult<()>;
/// Decode the header
fn decode<Fs: RawFSInterface>(
f: &mut SDSSFileIO<Fs>,
args: Self::DecodeArgs,
) -> RuntimeResult<Self>;
/// Verify the header
fn verify(&self, args: Self::DecodeVerifyArgs) -> RuntimeResult<()>;
/// Decode and verify the header
fn decode_verify<Fs: RawFSInterface>(
f: &mut SDSSFileIO<Fs>,
d_args: Self::DecodeArgs,
v_args: Self::DecodeVerifyArgs,
) -> RuntimeResult<Self> {
let h = Self::decode(f, d_args)?;
h.verify(v_args)?;
Ok(h)
}
}
/*
header impls
*/
unsafe fn memcpy<const N: usize>(src: &[u8]) -> [u8; N] {
let mut dst = [0u8; N];
src.as_ptr().copy_to_nonoverlapping(dst.as_mut_ptr(), N);
dst
}
macro_rules! var {
(let $($name:ident),* $(,)?) => {
$(let $name;)*
}
}
/*
Compact SDSS Header v1
---
- 1: Magic block (16B): magic + header version
- 2: Static block (40B):
- 2.1: Genesis static record (24B)
- 2.1.1: Software information (16B)
- Server version (8B)
- Driver version (8B)
- 2.1.2: Host information (4B):
- OS (1B)
- Arch (1B)
- Pointer width (1B)
- Endian (1B)
- 2.1.3: File information (4B):
- File class (1B)
- File specifier (1B)
- File specifier version (2B)
- 2.2: Genesis runtime record (16B)
- Host epoch (16B)
- 3: Padding block (8B)
*/
#[repr(align(8))]
#[derive(Debug, PartialEq)]
pub struct SDSSStaticHeaderV1Compact {
// 1 magic block
magic_header_version: HeaderVersion,
// 2.1.1
genesis_static_sw_server_version: ServerVersion,
genesis_static_sw_driver_version: DriverVersion,
// 2.1.2
genesis_static_host_os: HostOS,
genesis_static_host_arch: HostArch,
genesis_static_host_ptr_width: HostPointerWidth,
genesis_static_host_endian: HostEndian,
// 2.1.3
genesis_static_file_class: FileScope,
genesis_static_file_specifier: FileSpecifier,
genesis_static_file_specifier_version: FileSpecifierVersion,
// 2.2
genesis_runtime_epoch_time: u128,
// 3
genesis_padding_block: [u8; 8],
}
impl SDSSStaticHeaderV1Compact {
pub const SIZE: usize = 64;
/// Decode and validate the full header block (validate ONLY; you must verify yourself)
///
/// Notes:
/// - Time might be inconsistent; verify
/// - Compatibility requires additional intervention
/// - If padding block was not zeroed, handle
/// - No file metadata and is verified. Check!
///
fn _decode(block: [u8; 64]) -> RuntimeResult<Self> {
var!(let raw_magic, raw_header_version, raw_server_version, raw_driver_version, raw_host_os, raw_host_arch,
raw_host_ptr_width, raw_host_endian, raw_file_class, raw_file_specifier, raw_file_specifier_version,
raw_runtime_epoch_time, raw_paddding_block,
);
macro_rules! u64 {
($pos:expr) => {
u64::from_le_bytes(memcpy(&block[$pos]))
};
}
unsafe {
// UNSAFE(@ohsayan): all segments are correctly accessed (aligned to u8)
raw_magic = u64!(Self::SEG1_MAGIC);
raw_header_version = HeaderVersion::__new(u64!(Self::SEG1_HEADER_VERSION));
raw_server_version = ServerVersion::__new(u64!(Self::SEG2_REC1_SERVER_VERSION));
raw_driver_version = DriverVersion::__new(u64!(Self::SEG2_REC1_DRIVER_VERSION));
raw_host_os = block[Self::SEG2_REC1_HOST_OS];
raw_host_arch = block[Self::SEG2_REC1_HOST_ARCH];
raw_host_ptr_width = block[Self::SEG2_REC1_HOST_PTR_WIDTH];
raw_host_endian = block[Self::SEG2_REC1_HOST_ENDIAN];
raw_file_class = block[Self::SEG2_REC1_FILE_CLASS];
raw_file_specifier = block[Self::SEG2_REC1_FILE_SPECIFIER];
raw_file_specifier_version = FileSpecifierVersion::__new(u16::from_le_bytes(memcpy(
&block[Self::SEG2_REC1_FILE_SPECIFIER_VERSION],
)));
raw_runtime_epoch_time =
u128::from_le_bytes(memcpy(&block[Self::SEG2_REC2_RUNTIME_EPOCH_TIME]));
raw_paddding_block = memcpy::<8>(&block[Self::SEG3_PADDING_BLK]);
}
macro_rules! okay {
($($expr:expr),* $(,)?) => {
$(($expr) &)*true
}
}
let okay_header_version = raw_header_version == versions::CURRENT_HEADER_VERSION;
let okay_server_version = raw_server_version == versions::CURRENT_SERVER_VERSION;
let okay_driver_version = raw_driver_version == versions::CURRENT_DRIVER_VERSION;
let okay = okay!(
// 1.1 mgblk
raw_magic == SDSS_MAGIC,
okay_header_version,
// 2.1.1
okay_server_version,
okay_driver_version,
// 2.1.2
raw_host_os <= HostOS::MAX,
raw_host_arch <= HostArch::MAX,
raw_host_ptr_width <= HostPointerWidth::MAX,
raw_host_endian <= HostEndian::MAX,
// 2.1.3
raw_file_class <= FileScope::MAX,
raw_file_specifier <= FileSpecifier::MAX,
);
if okay {
Ok(unsafe {
// UNSAFE(@ohsayan): the block ranges are very well defined
Self {
// 1.1
magic_header_version: raw_header_version,
// 2.1.1
genesis_static_sw_server_version: raw_server_version,
genesis_static_sw_driver_version: raw_driver_version,
// 2.1.2
genesis_static_host_os: transmute(raw_host_os),
genesis_static_host_arch: transmute(raw_host_arch),
genesis_static_host_ptr_width: transmute(raw_host_ptr_width),
genesis_static_host_endian: transmute(raw_host_endian),
// 2.1.3
genesis_static_file_class: transmute(raw_file_class),
genesis_static_file_specifier: transmute(raw_file_specifier),
genesis_static_file_specifier_version: raw_file_specifier_version,
// 2.2
genesis_runtime_epoch_time: raw_runtime_epoch_time,
// 3
genesis_padding_block: raw_paddding_block,
}
})
} else {
let version_okay = okay_header_version & okay_server_version & okay_driver_version;
let md = ManuallyDrop::new([
StorageError::HeaderDecodeCorruptedHeader,
StorageError::HeaderDecodeVersionMismatch,
]);
Err(unsafe {
// UNSAFE(@ohsayan): while not needed, md for drop safety + correct index
md.as_ptr().add(!version_okay as usize).read().into()
})
}
}
}
impl SDSSStaticHeaderV1Compact {
const SEG1_MAGIC: Range<usize> = 0..8;
const SEG1_HEADER_VERSION: Range<usize> = 8..16;
const SEG2_REC1_SERVER_VERSION: Range<usize> = 16..24;
const SEG2_REC1_DRIVER_VERSION: Range<usize> = 24..32;
const SEG2_REC1_HOST_OS: usize = 32;
const SEG2_REC1_HOST_ARCH: usize = 33;
const SEG2_REC1_HOST_PTR_WIDTH: usize = 34;
const SEG2_REC1_HOST_ENDIAN: usize = 35;
const SEG2_REC1_FILE_CLASS: usize = 36;
const SEG2_REC1_FILE_SPECIFIER: usize = 37;
const SEG2_REC1_FILE_SPECIFIER_VERSION: Range<usize> = 38..40;
const SEG2_REC2_RUNTIME_EPOCH_TIME: Range<usize> = 40..56;
const SEG3_PADDING_BLK: Range<usize> = 56..64;
fn _encode(
file_class: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
epoch_time: u128,
padding_block: [u8; 8],
) -> [u8; 64] {
let mut ret = [0; 64];
// 1. mgblk
ret[Self::SEG1_MAGIC].copy_from_slice(&SDSS_MAGIC.to_le_bytes());
ret[Self::SEG1_HEADER_VERSION]
.copy_from_slice(&versions::CURRENT_HEADER_VERSION.little_endian_u64());
// 2.1.1
ret[Self::SEG2_REC1_SERVER_VERSION]
.copy_from_slice(&versions::CURRENT_SERVER_VERSION.little_endian());
ret[Self::SEG2_REC1_DRIVER_VERSION]
.copy_from_slice(&versions::CURRENT_DRIVER_VERSION.little_endian());
// 2.1.2
ret[Self::SEG2_REC1_HOST_OS] = HostOS::new().value_u8();
ret[Self::SEG2_REC1_HOST_ARCH] = HostArch::new().value_u8();
ret[Self::SEG2_REC1_HOST_PTR_WIDTH] = HostPointerWidth::new().value_u8();
ret[Self::SEG2_REC1_HOST_ENDIAN] = HostEndian::new().value_u8();
// 2.1.3
ret[Self::SEG2_REC1_FILE_CLASS] = file_class.value_u8();
ret[Self::SEG2_REC1_FILE_SPECIFIER] = file_specifier.value_u8();
ret[Self::SEG2_REC1_FILE_SPECIFIER_VERSION]
.copy_from_slice(&file_specifier_version.0.to_le_bytes());
// 2.2
ret[Self::SEG2_REC2_RUNTIME_EPOCH_TIME].copy_from_slice(&epoch_time.to_le_bytes());
// 3
ret[Self::SEG3_PADDING_BLK].copy_from_slice(&padding_block);
ret
}
pub fn _encode_auto(
file_class: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
) -> [u8; 64] {
let epoch_time = os::get_epoch_time();
Self::_encode(
file_class,
file_specifier,
file_specifier_version,
epoch_time,
[0; 8],
)
}
}
#[allow(unused)]
impl SDSSStaticHeaderV1Compact {
pub fn header_version(&self) -> HeaderVersion {
self.magic_header_version
}
pub fn server_version(&self) -> ServerVersion {
self.genesis_static_sw_server_version
}
pub fn driver_version(&self) -> DriverVersion {
self.genesis_static_sw_driver_version
}
pub fn host_os(&self) -> HostOS {
self.genesis_static_host_os
}
pub fn host_arch(&self) -> HostArch {
self.genesis_static_host_arch
}
pub fn host_ptr_width(&self) -> HostPointerWidth {
self.genesis_static_host_ptr_width
}
pub fn host_endian(&self) -> HostEndian {
self.genesis_static_host_endian
}
pub fn file_class(&self) -> FileScope {
self.genesis_static_file_class
}
pub fn file_specifier(&self) -> FileSpecifier {
self.genesis_static_file_specifier
}
pub fn file_specifier_version(&self) -> FileSpecifierVersion {
self.genesis_static_file_specifier_version
}
pub fn epoch_time(&self) -> u128 {
self.genesis_runtime_epoch_time
}
pub fn padding_block(&self) -> [u8; 8] {
self.genesis_padding_block
}
}
impl Header for SDSSStaticHeaderV1Compact {
type EncodeArgs = (FileScope, FileSpecifier, FileSpecifierVersion);
type DecodeArgs = ();
type DecodeVerifyArgs = Self::EncodeArgs;
fn encode<Fs: RawFSInterface>(
f: &mut SDSSFileIO<Fs>,
(scope, spec, spec_v): Self::EncodeArgs,
) -> RuntimeResult<()> {
let b = Self::_encode_auto(scope, spec, spec_v);
f.fsynced_write(&b)
}
fn decode<Fs: RawFSInterface>(
f: &mut SDSSFileIO<Fs>,
_: Self::DecodeArgs,
) -> RuntimeResult<Self> {
let mut buf = [0u8; 64];
f.read_to_buffer(&mut buf)?;
Self::_decode(buf)
}
fn verify(&self, (scope, spec, spec_v): Self::DecodeVerifyArgs) -> RuntimeResult<()> {
if (self.file_class() == scope)
& (self.file_specifier() == spec)
& (self.file_specifier_version() == spec_v)
{
Ok(())
} else {
Err(StorageError::HeaderDecodeDataMismatch.into())
}
}
pub(super) struct SysDBV1;
impl sdss::SimpleFileSpecV1 for SysDBV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::FlatmapData;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::SysDB;
const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0);
}

@ -25,13 +25,15 @@
*/
use {
super::rw::FileOpen,
crate::engine::{
config::{ConfigAuth, ConfigMode},
data::{cell::Datacell, DictEntryGeneric, DictGeneric},
error::{RuntimeResult, StorageError},
fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore},
storage::v1::{inf, spec, RawFSInterface, SDSSFileIO},
storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
v1::{inf, spec, SDSSFileIO},
},
},
parking_lot::RwLock,
std::collections::HashMap,
@ -68,7 +70,7 @@ fn rkey<T>(
}
}
impl<Fs: RawFSInterface> SystemStore<Fs> {
impl<Fs: FSInterface> SystemStore<Fs> {
const SYSDB_PATH: &'static str = "sys.db";
const SYSDB_COW_PATH: &'static str = "sys.db.cow";
const SYS_KEY_AUTH: &'static str = "auth";
@ -104,7 +106,7 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
}
}
impl<Fs: RawFSInterface> SystemStore<Fs> {
impl<Fs: FSInterface> SystemStore<Fs> {
fn _sync(&self, mut f: SDSSFileIO<Fs>, auth: &SysAuth) -> RuntimeResult<()> {
let cfg = self.system_store();
// prepare our flat file
@ -141,7 +143,7 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> {
let f = SDSSFileIO::create::<spec::SysDBV1>(cow)?;
self._sync(f, auth)?;
Fs::fs_rename_file(cow, target)
Fs::fs_rename(cow, target)
}
fn restore_and_sync(
f: SDSSFileIO<Fs>,
@ -180,7 +182,7 @@ impl<Fs: RawFSInterface> SystemStore<Fs> {
}
fn _restore(mut f: SDSSFileIO<Fs>, run_mode: ConfigMode) -> RuntimeResult<SysConfig> {
let mut sysdb_data =
inf::dec::dec_dict_full::<inf::map::GenericDictSpec>(&f.load_remaining_into_buffer()?)?;
inf::dec::dec_dict_full::<inf::map::GenericDictSpec>(&f.read_full()?)?;
// get our auth and sys stores
let mut auth_store = rkey(
&mut sysdb_data,

@ -24,7 +24,7 @@
*
*/
type VirtualFS = super::memfs::VirtualFS;
type VirtualFS = crate::engine::storage::common::interface::fs_test::VirtualFS;
mod batch;
mod rw;

@ -36,14 +36,16 @@ use {
},
data::{cell::Datacell, tag::TagSelector, uuid::Uuid},
idx::MTIndex,
storage::v1::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
storage::{
common::interface::{fs_test::VirtualFS, fs_traits::FileOpen},
v1::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
},
rw::SDSSFileIO,
spec,
},
memfs::VirtualFS,
rw::{FileOpen, SDSSFileIO},
spec,
},
},
util::test_utils,
@ -57,7 +59,7 @@ fn pkey(v: impl Into<Datacell>) -> PrimaryIndexKey {
fn open_file(
fpath: &str,
) -> FileOpen<SDSSFileIO<VirtualFS>, (SDSSFileIO<VirtualFS>, spec::SDSSStaticHeaderV1Compact)> {
) -> FileOpen<SDSSFileIO<VirtualFS>, (SDSSFileIO<VirtualFS>, super::super::Header)> {
SDSSFileIO::open_or_create_perm_rw::<spec::DataBatchJournalV1>(fpath).unwrap()
}

@ -24,9 +24,9 @@
*
*/
use crate::engine::storage::v1::{
rw::{FileOpen, SDSSFileIO},
spec,
use crate::engine::storage::{
common::interface::fs_traits::FileOpen,
v1::{rw::SDSSFileIO, spec},
};
#[test]

@ -0,0 +1,27 @@
/*
* Created on Sun Jan 07 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod spec;

@ -0,0 +1,92 @@
/*
* Created on Thu Jan 11 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::engine::storage::common::{
sdss::{self, HeaderV1Spec},
versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion},
},
std::mem::transmute,
};
/// The file scope
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
pub enum FileClass {
EventLog = 0,
Batch = 1,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
#[repr(u8)]
pub enum FileSpecifier {
GlobalNS = 0,
ModelData = 1,
}
impl sdss::HeaderV1Enumeration for FileClass {
const MAX: u8 = FileClass::MAX;
unsafe fn new(x: u8) -> Self {
transmute(x)
}
fn repr_u8(&self) -> u8 {
self.value_u8()
}
}
impl sdss::HeaderV1Enumeration for FileSpecifier {
const MAX: u8 = FileSpecifier::MAX;
unsafe fn new(x: u8) -> Self {
transmute(x)
}
fn repr_u8(&self) -> u8 {
self.value_u8()
}
}
pub struct HeaderImplV2;
impl HeaderV1Spec for HeaderImplV2 {
type FileClass = FileClass;
type FileSpecifier = FileSpecifier;
const CURRENT_SERVER_VERSION: ServerVersion = versions::v2::V2_SERVER_VERSION;
const CURRENT_DRIVER_VERSION: DriverVersion = versions::v2::V2_DRIVER_VERSION;
}
pub struct SystemDatabaseV1;
impl sdss::SimpleFileSpecV1 for SystemDatabaseV1 {
type HeaderSpec = HeaderImplV2;
const FILE_CLASS: FileClass = FileClass::EventLog;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::GlobalNS;
const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0);
}
pub struct ModelDataBatchAofV1;
impl sdss::SimpleFileSpecV1 for ModelDataBatchAofV1 {
type HeaderSpec = HeaderImplV2;
const FILE_CLASS: FileClass = FileClass::Batch;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::ModelData;
const FILE_SPECFIER_VERSION: FileSpecifierVersion = FileSpecifierVersion::__new(0);
}

@ -31,9 +31,12 @@ use {
data::uuid::Uuid,
error::{RuntimeResult, TransactionError},
mem::BufferedScanner,
storage::v1::{
inf::{self, PersistObject},
JournalAdapter, JournalWriter, LocalFS, RawFSInterface,
storage::{
safe_interfaces::{FSInterface, LocalFS},
v1::{
inf::{self, PersistObject},
JournalAdapter, JournalWriter,
},
},
},
util::EndianQW,
@ -57,11 +60,11 @@ pub use {
};
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<Fs: RawFSInterface = LocalFS> {
pub struct GNSTransactionDriverAnyFS<Fs: FSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: RawFSInterface> GNSTransactionDriverAnyFS<Fs> {
impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}

Loading…
Cancel
Save