From 6df09c194c78e31e2fce151f69c062346dceaac1 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 24 Sep 2023 05:33:13 +0000 Subject: [PATCH] Initialize storage drivers on init Also fixed an issue with NullFS where the null flag was not set. --- server/src/engine/fractal/drivers.rs | 22 +-- server/src/engine/fractal/mgr.rs | 4 +- server/src/engine/fractal/mod.rs | 46 ++++-- server/src/engine/fractal/test_utils.rs | 35 +++- server/src/engine/idx/stord/mod.rs | 13 +- .../src/engine/storage/v1/batch_jrnl/mod.rs | 42 ++--- server/src/engine/storage/v1/loader.rs | 12 +- server/src/engine/storage/v1/memfs.rs | 47 +++++- server/src/engine/storage/v1/mod.rs | 2 +- server/src/engine/storage/v1/rw.rs | 155 +++++++++++++++--- 10 files changed, 289 insertions(+), 89 deletions(-) diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index c003c8c2..87ac096a 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -27,7 +27,7 @@ use { super::util, crate::engine::{ - storage::v1::{data_batch::DataBatchPersistDriver, LocalFS}, + storage::v1::{data_batch::DataBatchPersistDriver, RawFSInterface}, txn::gns::GNSTransactionDriverAnyFS, }, parking_lot::Mutex, @@ -35,39 +35,39 @@ use { }; /// GNS driver -pub(super) struct FractalGNSDriver { +pub(super) struct FractalGNSDriver { status: util::Status, - txn_driver: Mutex>, + txn_driver: Mutex>, } -impl FractalGNSDriver { - pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS) -> Self { +impl FractalGNSDriver { + pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS) -> Self { Self { status: util::Status::new_okay(), txn_driver: Mutex::new(txn_driver), } } - pub fn txn_driver(&self) -> &Mutex> { + pub fn txn_driver(&self) -> &Mutex> { &self.txn_driver } } /// Model driver -pub struct FractalModelDriver { +pub struct FractalModelDriver { hooks: Arc, - batch_driver: Mutex>, + batch_driver: Mutex>, } -impl FractalModelDriver { +impl FractalModelDriver { /// Initialize a model driver with default settings - pub fn init(batch_driver: DataBatchPersistDriver) -> Self { + pub fn init(batch_driver: DataBatchPersistDriver) -> Self { Self { hooks: Arc::new(FractalModelHooks::new()), batch_driver: Mutex::new(batch_driver), } } /// Returns a reference to the batch persist driver - pub fn batch_driver(&self) -> &Mutex> { + pub fn batch_driver(&self) -> &Mutex> { &self.batch_driver } } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 38cd532f..72f46d29 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -24,6 +24,8 @@ * */ +use crate::engine::storage::v1::LocalFS; + use { super::ModelUniqueID, crate::{ @@ -316,7 +318,7 @@ impl FractalMgr { fn try_write_model_data_batch( model: &Model, observed_size: usize, - mdl_driver: &super::FractalModelDriver, + mdl_driver: &super::FractalModelDriver, ) -> crate::engine::error::QueryResult<()> { if observed_size == 0 { // no changes, all good diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 090f88de..c01e3fcb 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -28,7 +28,10 @@ use { super::{ core::GlobalNS, data::uuid::Uuid, - storage::v1::{LocalFS, RawFSInterface, SDSSResult}, + storage::{ + self, + v1::{LocalFS, RawFSInterface, SDSSResult}, + }, txn::gns::GNSTransactionDriverAnyFS, }, parking_lot::{Mutex, RwLock}, @@ -49,7 +52,7 @@ pub use { util::FractalToken, }; -pub type ModelDrivers = HashMap; +pub type ModelDrivers = HashMap>; static mut GLOBAL: MaybeUninit = MaybeUninit::uninit(); @@ -73,7 +76,7 @@ pub unsafe fn enable_and_start_all( gns: GlobalNS, config: config::SysConfig, gns_driver: GNSTransactionDriverAnyFS, - model_drivers: ModelDrivers, + model_drivers: ModelDrivers, ) -> GlobalStateStart { let model_cnt_on_boot = model_drivers.len(); let gns_driver = drivers::FractalGNSDriver::new(gns_driver); @@ -163,12 +166,29 @@ impl GlobalInstanceLike for Global { // model fn initialize_model_driver( &self, - _space_name: &str, - _space_uuid: Uuid, - _model_name: &str, - _model_uuid: Uuid, + space_name: &str, + space_uuid: Uuid, + model_name: &str, + model_uuid: Uuid, ) -> SDSSResult<()> { - todo!() + // create dir + LocalFS::fs_create_dir(&storage::v1::loader::SEInitState::model_dir( + space_name, space_uuid, model_name, model_uuid, + ))?; + // init driver + let driver = storage::v1::data_batch::create( + &storage::v1::loader::SEInitState::model_path( + space_name, space_uuid, model_name, model_uuid, + ), + self.sys_cfg().host_data().settings_version(), + self.sys_cfg().host_data().run_mode(), + self.sys_cfg().host_data().startup_counter(), + )?; + self.get_state().mdl_driver.write().insert( + ModelUniqueID::new(space_name, model_name, model_uuid), + drivers::FractalModelDriver::init(driver), + ); + Ok(()) } } @@ -221,8 +241,8 @@ impl Global { /// The global state struct GlobalState { gns: GlobalNS, - gns_driver: drivers::FractalGNSDriver, - mdl_driver: RwLock, + gns_driver: drivers::FractalGNSDriver, + mdl_driver: RwLock>, task_mgr: mgr::FractalMgr, config: config::SysConfig, } @@ -230,8 +250,8 @@ struct GlobalState { impl GlobalState { fn new( gns: GlobalNS, - gns_driver: drivers::FractalGNSDriver, - mdl_driver: RwLock, + gns_driver: drivers::FractalGNSDriver, + mdl_driver: RwLock>, task_mgr: mgr::FractalMgr, config: config::SysConfig, ) -> Self { @@ -243,7 +263,7 @@ impl GlobalState { config, } } - pub(self) fn get_mdl_drivers(&self) -> &RwLock { + pub(self) fn get_mdl_drivers(&self) -> &RwLock> { &self.mdl_driver } pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr { diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 75043e23..69e52c42 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -25,9 +25,13 @@ */ use { - super::{CriticalTask, GenericTask, GlobalInstanceLike, SysConfig, Task}, + super::{ + CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, ModelUniqueID, + SysConfig, Task, + }, crate::engine::{ core::GlobalNS, + data::uuid::Uuid, storage::v1::{ header_meta::HostRunMode, memfs::{NullFS, VirtualFS}, @@ -36,6 +40,7 @@ use { txn::gns::GNSTransactionDriverAnyFS, }, parking_lot::{Mutex, RwLock}, + std::collections::HashMap, }; /// A `test` mode global implementation @@ -45,6 +50,7 @@ pub struct TestGlobal { lp_queue: RwLock>>, max_delta_size: usize, txn_driver: Mutex>, + model_drivers: RwLock>>, sys_cfg: super::SysConfig, } @@ -60,6 +66,7 @@ impl TestGlobal { lp_queue: RwLock::default(), max_delta_size, txn_driver: Mutex::new(txn_driver), + model_drivers: RwLock::default(), sys_cfg: SysConfig::test_default(), } } @@ -117,12 +124,28 @@ impl GlobalInstanceLike for TestGlobal { } fn initialize_model_driver( &self, - _space_name: &str, - _space_uuid: crate::engine::data::uuid::Uuid, - _model_name: &str, - _model_uuid: crate::engine::data::uuid::Uuid, + space_name: &str, + space_uuid: Uuid, + model_name: &str, + model_uuid: Uuid, ) -> crate::engine::storage::v1::SDSSResult<()> { - todo!() + // create model dir + Fs::fs_create_dir(&crate::engine::storage::v1::loader::SEInitState::model_dir( + space_name, space_uuid, model_name, model_uuid, + ))?; + let driver = crate::engine::storage::v1::data_batch::create( + &crate::engine::storage::v1::loader::SEInitState::model_path( + space_name, space_uuid, model_name, model_uuid, + ), + self.sys_cfg().host_data().settings_version(), + self.sys_cfg().host_data().run_mode(), + self.sys_cfg().host_data().startup_counter(), + )?; + self.model_drivers.write().insert( + ModelUniqueID::new(space_name, model_name, model_uuid), + FractalModelDriver::init(driver), + ); + Ok(()) } } diff --git a/server/src/engine/idx/stord/mod.rs b/server/src/engine/idx/stord/mod.rs index d053caf5..22400507 100644 --- a/server/src/engine/idx/stord/mod.rs +++ b/server/src/engine/idx/stord/mod.rs @@ -399,6 +399,15 @@ impl> IndexSTSeqDll { &(e.as_ref()).read_value().v }) } + fn _get_mut(&mut self, k: &Q) -> Option<&mut V> + where + K: Borrow, + Q: AsKey, + { + self.m + .get_mut(unsafe { IndexSTSeqDllQref::from_ref(k) }) + .map(|e| unsafe { &mut e.as_mut().v }) + } #[inline(always)] fn _update(&mut self, k: &Q, v: V) -> Option where @@ -629,12 +638,12 @@ where self._get(key).cloned() } - fn st_get_mut(&mut self, _: &Q) -> Option<&mut V> + fn st_get_mut(&mut self, k: &Q) -> Option<&mut V> where K: AsKey + Borrow, Q: ?Sized + AsKey, { - todo!() + self._get_mut(k) } fn st_update(&mut self, key: &Q, val: V) -> bool diff --git a/server/src/engine/storage/v1/batch_jrnl/mod.rs b/server/src/engine/storage/v1/batch_jrnl/mod.rs index 0632e27e..b5a3de3c 100644 --- a/server/src/engine/storage/v1/batch_jrnl/mod.rs +++ b/server/src/engine/storage/v1/batch_jrnl/mod.rs @@ -45,26 +45,39 @@ pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch}; pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver}; use { - super::{ - header_meta, - rw::{FileOpen, SDSSFileIO}, - RawFSInterface, SDSSResult, - }, + super::{header_meta, rw::SDSSFileIO, RawFSInterface, SDSSResult}, crate::engine::core::model::Model, }; const LOG_SPECIFIER_VERSION: header_meta::FileSpecifierVersion = header_meta::FileSpecifierVersion::__new(0); -pub fn open_or_reinit( +/// Re-initialize an existing batch journal and read all its data into model +pub fn reinit( name: &str, model: &Model, +) -> SDSSResult> { + let (_header, f) = SDSSFileIO::::open::( + name, + header_meta::FileScope::Journal, + header_meta::FileSpecifier::TableDataBatch, + LOG_SPECIFIER_VERSION, + )?; + // restore + let mut restore_driver = DataBatchRestoreDriver::new(f)?; + restore_driver.read_data_batch_into_model(model)?; + DataBatchPersistDriver::new(restore_driver.into_file(), false) +} + +/// Create a new batch journal +pub fn create( + path: &str, host_setting_version: u32, host_run_mode: header_meta::HostRunMode, host_startup_counter: u64, ) -> SDSSResult> { - let f = SDSSFileIO::::open_or_create_perm_rw::( - name, + let f = SDSSFileIO::::create( + path, header_meta::FileScope::Journal, header_meta::FileSpecifier::TableDataBatch, LOG_SPECIFIER_VERSION, @@ -72,16 +85,5 @@ pub fn open_or_reinit( host_run_mode, host_startup_counter, )?; - match f { - FileOpen::Created(new_file) => Ok(DataBatchPersistDriver::new(new_file, true)?), - FileOpen::Existing(existing, _) => { - // restore - let mut restore_driver = DataBatchRestoreDriver::new(existing)?; - restore_driver.read_data_batch_into_model(model)?; - Ok(DataBatchPersistDriver::new( - restore_driver.into_file(), - false, - )?) - } - } + DataBatchPersistDriver::new(f, true) } diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs index 0a21b218..1fad9678 100644 --- a/server/src/engine/storage/v1/loader.rs +++ b/server/src/engine/storage/v1/loader.rs @@ -36,14 +36,14 @@ const GNS_FILE_PATH: &str = "gns.db-tlog"; pub struct SEInitState { pub txn_driver: GNSTransactionDriverAnyFS, - pub model_drivers: ModelDrivers, + pub model_drivers: ModelDrivers, pub gns: GlobalNS, } impl SEInitState { pub fn new( txn_driver: GNSTransactionDriverAnyFS, - model_drivers: ModelDrivers, + model_drivers: ModelDrivers, gns: GlobalNS, ) -> Self { Self { @@ -70,13 +70,7 @@ impl SEInitState { let space_uuid = space.get_uuid(); for (model_name, model) in space.models().read().iter() { let path = Self::model_path(space_name, space_uuid, model_name, model.get_uuid()); - let persist_driver = match batch_jrnl::open_or_reinit( - &path, - model, - host_setting_version, - host_run_mode, - host_startup_counter, - ) { + let persist_driver = match batch_jrnl::reinit(&path, model) { Ok(j) => j, Err(e) => { return Err(e.with_extra(format!( diff --git a/server/src/engine/storage/v1/memfs.rs b/server/src/engine/storage/v1/memfs.rs index 15a0a2a6..adce7539 100644 --- a/server/src/engine/storage/v1/memfs.rs +++ b/server/src/engine/storage/v1/memfs.rs @@ -57,7 +57,7 @@ type ComponentIter<'a> = std::iter::Take>; */ #[derive(Debug)] -enum VNode { +pub(super) enum VNode { Dir(HashMap, Self>), File(VFile), } @@ -223,6 +223,44 @@ impl RawFSInterface for VirtualFS { } } } + fn fs_fcreate_rw(fpath: &str) -> SDSSResult { + let mut vfs = VFS.write(); + let (target_file, components) = split_target_and_components(fpath); + let target_dir = find_target_dir_mut(components, &mut vfs)?; + match target_dir.entry(target_file.into()) { + Entry::Occupied(k) => { + match k.get() { + VNode::Dir(_) => { + return Err(Error::new( + ErrorKind::AlreadyExists, + "found directory with same name where file was to be created", + ) + .into()); + } + VNode::File(_) => { + // the file already exists + return Err(Error::new( + ErrorKind::AlreadyExists, + "the file already exists", + ) + .into()); + } + } + } + Entry::Vacant(v) => { + // no file exists, we can create this + v.insert(VNode::File(VFile::new(true, true, vec![], 0))); + Ok(VFileDescriptor(fpath.into())) + } + } + } + fn fs_fopen_rw(fpath: &str) -> SDSSResult { + with_file_mut(fpath, |f| { + f.read = true; + f.write = true; + Ok(VFileDescriptor(fpath.into())) + }) + } } fn find_target_dir_mut<'a>( @@ -477,6 +515,7 @@ impl RawFileInterfaceExt for VFileDescriptor { pub struct NullFS; pub struct NullFile; impl RawFSInterface for NullFS { + const NOT_NULL: bool = false; type File = NullFile; fn fs_rename_file(_: &str, _: &str) -> SDSSResult<()> { Ok(()) @@ -499,6 +538,12 @@ impl RawFSInterface for NullFS { fn fs_fopen_or_create_rw(_: &str) -> SDSSResult> { Ok(RawFileOpen::Created(NullFile)) } + fn fs_fopen_rw(_: &str) -> SDSSResult { + Ok(NullFile) + } + fn fs_fcreate_rw(_: &str) -> SDSSResult { + Ok(NullFile) + } } impl RawFileInterfaceRead for NullFile { fn fr_read_exact(&mut self, _: &mut [u8]) -> SDSSResult<()> { diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index c84f3f21..0dbb413c 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -47,7 +47,7 @@ pub use { rw::{LocalFS, RawFSInterface, SDSSFileIO}, }; pub mod data_batch { - pub use super::batch_jrnl::{DataBatchPersistDriver, DataBatchRestoreDriver}; + pub use super::batch_jrnl::{create, reinit, DataBatchPersistDriver, DataBatchRestoreDriver}; } pub mod header_meta { pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}; diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs index 8174f4f4..b040f142 100644 --- a/server/src/engine/storage/v1/rw.rs +++ b/server/src/engine/storage/v1/rw.rs @@ -71,16 +71,34 @@ pub enum RawFileOpen { Existing(F), } +/// The specification for a file system interface (our own abstraction over the fs) pub trait RawFSInterface { + /// asserts that the file system is not a null filesystem (like `/dev/null` for example) const NOT_NULL: bool = true; + /// the file descriptor that is returned by the file system when a file is opened type File: RawFileInterface; + /// Remove a file fn fs_remove_file(fpath: &str) -> SDSSResult<()>; + /// Rename a file fn fs_rename_file(from: &str, to: &str) -> SDSSResult<()>; + /// Create a directory fn fs_create_dir(fpath: &str) -> SDSSResult<()>; + /// Create a directory and all corresponding path components fn fs_create_dir_all(fpath: &str) -> SDSSResult<()>; + /// Delete a directory fn fs_delete_dir(fpath: &str) -> SDSSResult<()>; + /// Delete a directory and recursively remove all (if any) children fn fs_delete_dir_all(fpath: &str) -> SDSSResult<()>; + /// Open or create a file in R/W mode + /// + /// This will: + /// - Create a file if it doesn't exist + /// - Open a file it it does exist fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult>; + /// Open an existing file + fn fs_fopen_rw(fpath: &str) -> SDSSResult; + /// Create a new file + fn fs_fcreate_rw(fpath: &str) -> SDSSResult; } /// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking) @@ -174,6 +192,18 @@ impl RawFSInterface for LocalFS { Ok(RawFileOpen::Existing(f)) } } + fn fs_fcreate_rw(fpath: &str) -> SDSSResult { + let f = File::options() + .create_new(true) + .read(true) + .write(true) + .open(fpath)?; + Ok(f) + } + fn fs_fopen_rw(fpath: &str) -> SDSSResult { + let f = File::options().read(true).write(true).open(fpath)?; + Ok(f) + } } impl RawFileInterface for File { @@ -357,6 +387,95 @@ pub struct SDSSFileIO { } impl SDSSFileIO { + /// Open an existing SDSS file + /// + /// **IMPORTANT: File position: end-of-header-section** + pub fn open( + file_path: &str, + file_scope: FileScope, + file_specifier: FileSpecifier, + file_specifier_version: FileSpecifierVersion, + ) -> SDSSResult<(SDSSHeader, Self)> { + let f = Fs::fs_fopen_rw(file_path)?; + Self::_sdss_fopen::( + f, + file_scope, + file_specifier, + file_specifier_version, + ) + } + /// internal SDSS fopen routine + fn _sdss_fopen( + mut f: ::File, + file_scope: FileScope, + file_specifier: FileSpecifier, + file_specifier_version: FileSpecifierVersion, + ) -> Result<(SDSSHeader, SDSSFileIO), SDSSError> { + let mut header_raw = [0u8; SDSSHeaderRaw::header_size()]; + f.fr_read_exact(&mut header_raw)?; + let header = SDSSHeaderRaw::decode_noverify(header_raw) + .ok_or(SDSSError::HeaderDecodeCorruptedHeader)?; + header.verify(file_scope, file_specifier, file_specifier_version)?; + let mut f = Self::_new(f); + if REWRITE_MODIFY_COUNTER { + // since we updated this file, let us update the header + let mut new_header = header.clone(); + new_header.dr_rs_mut().bump_modify_count(); + f.seek_from_start(0)?; + f.fsynced_write(new_header.encoded().array().as_ref())?; + f.seek_from_start(SDSSHeaderRaw::header_size() as _)?; + } + Ok((header, f)) + } + /// Create a new SDSS file + /// + /// **IMPORTANT: File position: end-of-header-section** + pub fn create( + file_path: &str, + file_scope: FileScope, + file_specifier: FileSpecifier, + file_specifier_version: FileSpecifierVersion, + host_setting_version: u32, + host_run_mode: HostRunMode, + host_startup_counter: u64, + ) -> SDSSResult { + let f = Fs::fs_fcreate_rw(file_path)?; + Self::_sdss_fcreate( + file_scope, + file_specifier, + file_specifier_version, + host_setting_version, + host_run_mode, + host_startup_counter, + f, + ) + } + /// Internal SDSS fcreate routine + fn _sdss_fcreate( + file_scope: FileScope, + file_specifier: FileSpecifier, + file_specifier_version: FileSpecifierVersion, + host_setting_version: u32, + host_run_mode: HostRunMode, + host_startup_counter: u64, + f: ::File, + ) -> Result, SDSSError> { + let data = SDSSHeaderRaw::new_auto( + file_scope, + file_specifier, + file_specifier_version, + host_setting_version, + host_run_mode, + host_startup_counter, + 0, + ) + .array(); + let mut f = Self::_new(f); + f.fsynced_write(&data)?; + Ok(f) + } + /// Create a new SDSS file or re-open an existing file and verify + /// /// **IMPORTANT: File position: end-of-header-section** pub fn open_or_create_perm_rw( file_path: &str, @@ -370,39 +489,25 @@ impl SDSSFileIO { let f = Fs::fs_fopen_or_create_rw(file_path)?; match f { RawFileOpen::Created(f) => { - // since this file was just created, we need to append the header - let data = SDSSHeaderRaw::new_auto( + let f = Self::_sdss_fcreate( file_scope, file_specifier, file_specifier_version, host_setting_version, host_run_mode, host_startup_counter, - 0, - ) - .array(); - let mut f = Self::_new(f); - f.fsynced_write(&data)?; + f, + )?; Ok(FileOpen::Created(f)) } - RawFileOpen::Existing(mut f) => { - // this is an existing file. decoded the header - let mut header_raw = [0u8; SDSSHeaderRaw::header_size()]; - f.fr_read_exact(&mut header_raw)?; - let header = SDSSHeaderRaw::decode_noverify(header_raw) - .ok_or(SDSSError::HeaderDecodeCorruptedHeader)?; - // now validate the header - header.verify(file_scope, file_specifier, file_specifier_version)?; - let mut f = Self::_new(f); - if REWRITE_MODIFY_COUNTER { - // since we updated this file, let us update the header - let mut new_header = header.clone(); - new_header.dr_rs_mut().bump_modify_count(); - f.seek_from_start(0)?; - f.fsynced_write(new_header.encoded().array().as_ref())?; - f.seek_from_start(SDSSHeaderRaw::header_size() as _)?; - } - Ok(FileOpen::Existing(f, header)) + RawFileOpen::Existing(f) => { + let (f, header) = Self::_sdss_fopen::( + f, + file_scope, + file_specifier, + file_specifier_version, + )?; + Ok(FileOpen::Existing(header, f)) } } }