Initialize storage drivers on init

Also fixed an issue with NullFS where the null
flag was not set.
next
Sayan Nandan 1 year ago
parent fac2802849
commit 6df09c194c
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -27,7 +27,7 @@
use {
super::util,
crate::engine::{
storage::v1::{data_batch::DataBatchPersistDriver, LocalFS},
storage::v1::{data_batch::DataBatchPersistDriver, RawFSInterface},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::Mutex,
@ -35,39 +35,39 @@ use {
};
/// GNS driver
pub(super) struct FractalGNSDriver {
pub(super) struct FractalGNSDriver<Fs: RawFSInterface> {
status: util::Status,
txn_driver: Mutex<GNSTransactionDriverAnyFS<LocalFS>>,
txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
}
impl FractalGNSDriver {
pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS<LocalFS>) -> Self {
impl<Fs: RawFSInterface> FractalGNSDriver<Fs> {
pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS<Fs>) -> Self {
Self {
status: util::Status::new_okay(),
txn_driver: Mutex::new(txn_driver),
}
}
pub fn txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<LocalFS>> {
pub fn txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Fs>> {
&self.txn_driver
}
}
/// Model driver
pub struct FractalModelDriver {
pub struct FractalModelDriver<Fs: RawFSInterface> {
hooks: Arc<FractalModelHooks>,
batch_driver: Mutex<DataBatchPersistDriver<LocalFS>>,
batch_driver: Mutex<DataBatchPersistDriver<Fs>>,
}
impl FractalModelDriver {
impl<Fs: RawFSInterface> FractalModelDriver<Fs> {
/// Initialize a model driver with default settings
pub fn init(batch_driver: DataBatchPersistDriver<LocalFS>) -> Self {
pub fn init(batch_driver: DataBatchPersistDriver<Fs>) -> Self {
Self {
hooks: Arc::new(FractalModelHooks::new()),
batch_driver: Mutex::new(batch_driver),
}
}
/// Returns a reference to the batch persist driver
pub fn batch_driver(&self) -> &Mutex<DataBatchPersistDriver<LocalFS>> {
pub fn batch_driver(&self) -> &Mutex<DataBatchPersistDriver<Fs>> {
&self.batch_driver
}
}

@ -24,6 +24,8 @@
*
*/
use crate::engine::storage::v1::LocalFS;
use {
super::ModelUniqueID,
crate::{
@ -316,7 +318,7 @@ impl FractalMgr {
fn try_write_model_data_batch(
model: &Model,
observed_size: usize,
mdl_driver: &super::FractalModelDriver,
mdl_driver: &super::FractalModelDriver<LocalFS>,
) -> crate::engine::error::QueryResult<()> {
if observed_size == 0 {
// no changes, all good

@ -28,7 +28,10 @@ use {
super::{
core::GlobalNS,
data::uuid::Uuid,
storage::v1::{LocalFS, RawFSInterface, SDSSResult},
storage::{
self,
v1::{LocalFS, RawFSInterface, SDSSResult},
},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::{Mutex, RwLock},
@ -49,7 +52,7 @@ pub use {
util::FractalToken,
};
pub type ModelDrivers = HashMap<ModelUniqueID, drivers::FractalModelDriver>;
pub type ModelDrivers<Fs> = HashMap<ModelUniqueID, drivers::FractalModelDriver<Fs>>;
static mut GLOBAL: MaybeUninit<GlobalState> = MaybeUninit::uninit();
@ -73,7 +76,7 @@ pub unsafe fn enable_and_start_all(
gns: GlobalNS,
config: config::SysConfig,
gns_driver: GNSTransactionDriverAnyFS<LocalFS>,
model_drivers: ModelDrivers,
model_drivers: ModelDrivers<LocalFS>,
) -> GlobalStateStart {
let model_cnt_on_boot = model_drivers.len();
let gns_driver = drivers::FractalGNSDriver::new(gns_driver);
@ -163,12 +166,29 @@ impl GlobalInstanceLike for Global {
// model
fn initialize_model_driver(
&self,
_space_name: &str,
_space_uuid: Uuid,
_model_name: &str,
_model_uuid: Uuid,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> SDSSResult<()> {
todo!()
// create dir
LocalFS::fs_create_dir(&storage::v1::loader::SEInitState::model_dir(
space_name, space_uuid, model_name, model_uuid,
))?;
// init driver
let driver = storage::v1::data_batch::create(
&storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
),
self.sys_cfg().host_data().settings_version(),
self.sys_cfg().host_data().run_mode(),
self.sys_cfg().host_data().startup_counter(),
)?;
self.get_state().mdl_driver.write().insert(
ModelUniqueID::new(space_name, model_name, model_uuid),
drivers::FractalModelDriver::init(driver),
);
Ok(())
}
}
@ -221,8 +241,8 @@ impl Global {
/// The global state
struct GlobalState {
gns: GlobalNS,
gns_driver: drivers::FractalGNSDriver,
mdl_driver: RwLock<ModelDrivers>,
gns_driver: drivers::FractalGNSDriver<LocalFS>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
task_mgr: mgr::FractalMgr,
config: config::SysConfig,
}
@ -230,8 +250,8 @@ struct GlobalState {
impl GlobalState {
fn new(
gns: GlobalNS,
gns_driver: drivers::FractalGNSDriver,
mdl_driver: RwLock<ModelDrivers>,
gns_driver: drivers::FractalGNSDriver<LocalFS>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
task_mgr: mgr::FractalMgr,
config: config::SysConfig,
) -> Self {
@ -243,7 +263,7 @@ impl GlobalState {
config,
}
}
pub(self) fn get_mdl_drivers(&self) -> &RwLock<ModelDrivers> {
pub(self) fn get_mdl_drivers(&self) -> &RwLock<ModelDrivers<LocalFS>> {
&self.mdl_driver
}
pub(self) fn fractal_mgr(&self) -> &mgr::FractalMgr {

@ -25,9 +25,13 @@
*/
use {
super::{CriticalTask, GenericTask, GlobalInstanceLike, SysConfig, Task},
super::{
CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, ModelUniqueID,
SysConfig, Task,
},
crate::engine::{
core::GlobalNS,
data::uuid::Uuid,
storage::v1::{
header_meta::HostRunMode,
memfs::{NullFS, VirtualFS},
@ -36,6 +40,7 @@ use {
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::{Mutex, RwLock},
std::collections::HashMap,
};
/// A `test` mode global implementation
@ -45,6 +50,7 @@ pub struct TestGlobal<Fs: RawFSInterface = VirtualFS> {
lp_queue: RwLock<Vec<Task<GenericTask>>>,
max_delta_size: usize,
txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
model_drivers: RwLock<HashMap<ModelUniqueID, FractalModelDriver<Fs>>>,
sys_cfg: super::SysConfig,
}
@ -60,6 +66,7 @@ impl<Fs: RawFSInterface> TestGlobal<Fs> {
lp_queue: RwLock::default(),
max_delta_size,
txn_driver: Mutex::new(txn_driver),
model_drivers: RwLock::default(),
sys_cfg: SysConfig::test_default(),
}
}
@ -117,12 +124,28 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
}
fn initialize_model_driver(
&self,
_space_name: &str,
_space_uuid: crate::engine::data::uuid::Uuid,
_model_name: &str,
_model_uuid: crate::engine::data::uuid::Uuid,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) -> crate::engine::storage::v1::SDSSResult<()> {
todo!()
// create model dir
Fs::fs_create_dir(&crate::engine::storage::v1::loader::SEInitState::model_dir(
space_name, space_uuid, model_name, model_uuid,
))?;
let driver = crate::engine::storage::v1::data_batch::create(
&crate::engine::storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
),
self.sys_cfg().host_data().settings_version(),
self.sys_cfg().host_data().run_mode(),
self.sys_cfg().host_data().startup_counter(),
)?;
self.model_drivers.write().insert(
ModelUniqueID::new(space_name, model_name, model_uuid),
FractalModelDriver::init(driver),
);
Ok(())
}
}

@ -399,6 +399,15 @@ impl<K: AsKey, V: AsValue, C: Config<K, V>> IndexSTSeqDll<K, V, C> {
&(e.as_ref()).read_value().v
})
}
fn _get_mut<Q: ?Sized>(&mut self, k: &Q) -> Option<&mut V>
where
K: Borrow<Q>,
Q: AsKey,
{
self.m
.get_mut(unsafe { IndexSTSeqDllQref::from_ref(k) })
.map(|e| unsafe { &mut e.as_mut().v })
}
#[inline(always)]
fn _update<Q: ?Sized>(&mut self, k: &Q, v: V) -> Option<V>
where
@ -629,12 +638,12 @@ where
self._get(key).cloned()
}
fn st_get_mut<Q>(&mut self, _: &Q) -> Option<&mut V>
fn st_get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
where
K: AsKey + Borrow<Q>,
Q: ?Sized + AsKey,
{
todo!()
self._get_mut(k)
}
fn st_update<Q>(&mut self, key: &Q, val: V) -> bool

@ -45,26 +45,39 @@ pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch};
pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
use {
super::{
header_meta,
rw::{FileOpen, SDSSFileIO},
RawFSInterface, SDSSResult,
},
super::{header_meta, rw::SDSSFileIO, RawFSInterface, SDSSResult},
crate::engine::core::model::Model,
};
const LOG_SPECIFIER_VERSION: header_meta::FileSpecifierVersion =
header_meta::FileSpecifierVersion::__new(0);
pub fn open_or_reinit<Fs: RawFSInterface>(
/// Re-initialize an existing batch journal and read all its data into model
pub fn reinit<Fs: RawFSInterface>(
name: &str,
model: &Model,
) -> SDSSResult<DataBatchPersistDriver<Fs>> {
let (_header, f) = SDSSFileIO::<Fs>::open::<false>(
name,
header_meta::FileScope::Journal,
header_meta::FileSpecifier::TableDataBatch,
LOG_SPECIFIER_VERSION,
)?;
// restore
let mut restore_driver = DataBatchRestoreDriver::new(f)?;
restore_driver.read_data_batch_into_model(model)?;
DataBatchPersistDriver::new(restore_driver.into_file(), false)
}
/// Create a new batch journal
pub fn create<Fs: RawFSInterface>(
path: &str,
host_setting_version: u32,
host_run_mode: header_meta::HostRunMode,
host_startup_counter: u64,
) -> SDSSResult<DataBatchPersistDriver<Fs>> {
let f = SDSSFileIO::<Fs>::open_or_create_perm_rw::<false>(
name,
let f = SDSSFileIO::<Fs>::create(
path,
header_meta::FileScope::Journal,
header_meta::FileSpecifier::TableDataBatch,
LOG_SPECIFIER_VERSION,
@ -72,16 +85,5 @@ pub fn open_or_reinit<Fs: RawFSInterface>(
host_run_mode,
host_startup_counter,
)?;
match f {
FileOpen::Created(new_file) => Ok(DataBatchPersistDriver::new(new_file, true)?),
FileOpen::Existing(existing, _) => {
// restore
let mut restore_driver = DataBatchRestoreDriver::new(existing)?;
restore_driver.read_data_batch_into_model(model)?;
Ok(DataBatchPersistDriver::new(
restore_driver.into_file(),
false,
)?)
}
}
DataBatchPersistDriver::new(f, true)
}

@ -36,14 +36,14 @@ const GNS_FILE_PATH: &str = "gns.db-tlog";
pub struct SEInitState {
pub txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
pub model_drivers: ModelDrivers,
pub model_drivers: ModelDrivers<LocalFS>,
pub gns: GlobalNS,
}
impl SEInitState {
pub fn new(
txn_driver: GNSTransactionDriverAnyFS<super::LocalFS>,
model_drivers: ModelDrivers,
model_drivers: ModelDrivers<LocalFS>,
gns: GlobalNS,
) -> Self {
Self {
@ -70,13 +70,7 @@ impl SEInitState {
let space_uuid = space.get_uuid();
for (model_name, model) in space.models().read().iter() {
let path = Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = match batch_jrnl::open_or_reinit(
&path,
model,
host_setting_version,
host_run_mode,
host_startup_counter,
) {
let persist_driver = match batch_jrnl::reinit(&path, model) {
Ok(j) => j,
Err(e) => {
return Err(e.with_extra(format!(

@ -57,7 +57,7 @@ type ComponentIter<'a> = std::iter::Take<std::vec::IntoIter<&'a str>>;
*/
#[derive(Debug)]
enum VNode {
pub(super) enum VNode {
Dir(HashMap<Box<str>, Self>),
File(VFile),
}
@ -223,6 +223,44 @@ impl RawFSInterface for VirtualFS {
}
}
}
fn fs_fcreate_rw(fpath: &str) -> SDSSResult<Self::File> {
let mut vfs = VFS.write();
let (target_file, components) = split_target_and_components(fpath);
let target_dir = find_target_dir_mut(components, &mut vfs)?;
match target_dir.entry(target_file.into()) {
Entry::Occupied(k) => {
match k.get() {
VNode::Dir(_) => {
return Err(Error::new(
ErrorKind::AlreadyExists,
"found directory with same name where file was to be created",
)
.into());
}
VNode::File(_) => {
// the file already exists
return Err(Error::new(
ErrorKind::AlreadyExists,
"the file already exists",
)
.into());
}
}
}
Entry::Vacant(v) => {
// no file exists, we can create this
v.insert(VNode::File(VFile::new(true, true, vec![], 0)));
Ok(VFileDescriptor(fpath.into()))
}
}
}
fn fs_fopen_rw(fpath: &str) -> SDSSResult<Self::File> {
with_file_mut(fpath, |f| {
f.read = true;
f.write = true;
Ok(VFileDescriptor(fpath.into()))
})
}
}
fn find_target_dir_mut<'a>(
@ -477,6 +515,7 @@ impl RawFileInterfaceExt for VFileDescriptor {
pub struct NullFS;
pub struct NullFile;
impl RawFSInterface for NullFS {
const NOT_NULL: bool = false;
type File = NullFile;
fn fs_rename_file(_: &str, _: &str) -> SDSSResult<()> {
Ok(())
@ -499,6 +538,12 @@ impl RawFSInterface for NullFS {
fn fs_fopen_or_create_rw(_: &str) -> SDSSResult<RawFileOpen<Self::File>> {
Ok(RawFileOpen::Created(NullFile))
}
fn fs_fopen_rw(_: &str) -> SDSSResult<Self::File> {
Ok(NullFile)
}
fn fs_fcreate_rw(_: &str) -> SDSSResult<Self::File> {
Ok(NullFile)
}
}
impl RawFileInterfaceRead for NullFile {
fn fr_read_exact(&mut self, _: &mut [u8]) -> SDSSResult<()> {

@ -47,7 +47,7 @@ pub use {
rw::{LocalFS, RawFSInterface, SDSSFileIO},
};
pub mod data_batch {
pub use super::batch_jrnl::{DataBatchPersistDriver, DataBatchRestoreDriver};
pub use super::batch_jrnl::{create, reinit, DataBatchPersistDriver, DataBatchRestoreDriver};
}
pub mod header_meta {
pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode};

@ -71,16 +71,34 @@ pub enum RawFileOpen<F> {
Existing(F),
}
/// The specification for a file system interface (our own abstraction over the fs)
pub trait RawFSInterface {
/// asserts that the file system is not a null filesystem (like `/dev/null` for example)
const NOT_NULL: bool = true;
/// the file descriptor that is returned by the file system when a file is opened
type File: RawFileInterface;
/// Remove a file
fn fs_remove_file(fpath: &str) -> SDSSResult<()>;
/// Rename a file
fn fs_rename_file(from: &str, to: &str) -> SDSSResult<()>;
/// Create a directory
fn fs_create_dir(fpath: &str) -> SDSSResult<()>;
/// Create a directory and all corresponding path components
fn fs_create_dir_all(fpath: &str) -> SDSSResult<()>;
/// Delete a directory
fn fs_delete_dir(fpath: &str) -> SDSSResult<()>;
/// Delete a directory and recursively remove all (if any) children
fn fs_delete_dir_all(fpath: &str) -> SDSSResult<()>;
/// Open or create a file in R/W mode
///
/// This will:
/// - Create a file if it doesn't exist
/// - Open a file it it does exist
fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult<RawFileOpen<Self::File>>;
/// Open an existing file
fn fs_fopen_rw(fpath: &str) -> SDSSResult<Self::File>;
/// Create a new file
fn fs_fcreate_rw(fpath: &str) -> SDSSResult<Self::File>;
}
/// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking)
@ -174,6 +192,18 @@ impl RawFSInterface for LocalFS {
Ok(RawFileOpen::Existing(f))
}
}
fn fs_fcreate_rw(fpath: &str) -> SDSSResult<Self::File> {
let f = File::options()
.create_new(true)
.read(true)
.write(true)
.open(fpath)?;
Ok(f)
}
fn fs_fopen_rw(fpath: &str) -> SDSSResult<Self::File> {
let f = File::options().read(true).write(true).open(fpath)?;
Ok(f)
}
}
impl RawFileInterface for File {
@ -357,6 +387,95 @@ pub struct SDSSFileIO<Fs: RawFSInterface> {
}
impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
/// Open an existing SDSS file
///
/// **IMPORTANT: File position: end-of-header-section**
pub fn open<const REWRITE_MODIFY_COUNTER: bool>(
file_path: &str,
file_scope: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
) -> SDSSResult<(SDSSHeader, Self)> {
let f = Fs::fs_fopen_rw(file_path)?;
Self::_sdss_fopen::<REWRITE_MODIFY_COUNTER>(
f,
file_scope,
file_specifier,
file_specifier_version,
)
}
/// internal SDSS fopen routine
fn _sdss_fopen<const REWRITE_MODIFY_COUNTER: bool>(
mut f: <Fs as RawFSInterface>::File,
file_scope: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
) -> Result<(SDSSHeader, SDSSFileIO<Fs>), SDSSError> {
let mut header_raw = [0u8; SDSSHeaderRaw::header_size()];
f.fr_read_exact(&mut header_raw)?;
let header = SDSSHeaderRaw::decode_noverify(header_raw)
.ok_or(SDSSError::HeaderDecodeCorruptedHeader)?;
header.verify(file_scope, file_specifier, file_specifier_version)?;
let mut f = Self::_new(f);
if REWRITE_MODIFY_COUNTER {
// since we updated this file, let us update the header
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
f.seek_from_start(0)?;
f.fsynced_write(new_header.encoded().array().as_ref())?;
f.seek_from_start(SDSSHeaderRaw::header_size() as _)?;
}
Ok((header, f))
}
/// Create a new SDSS file
///
/// **IMPORTANT: File position: end-of-header-section**
pub fn create(
file_path: &str,
file_scope: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
host_setting_version: u32,
host_run_mode: HostRunMode,
host_startup_counter: u64,
) -> SDSSResult<Self> {
let f = Fs::fs_fcreate_rw(file_path)?;
Self::_sdss_fcreate(
file_scope,
file_specifier,
file_specifier_version,
host_setting_version,
host_run_mode,
host_startup_counter,
f,
)
}
/// Internal SDSS fcreate routine
fn _sdss_fcreate(
file_scope: FileScope,
file_specifier: FileSpecifier,
file_specifier_version: FileSpecifierVersion,
host_setting_version: u32,
host_run_mode: HostRunMode,
host_startup_counter: u64,
f: <Fs as RawFSInterface>::File,
) -> Result<SDSSFileIO<Fs>, SDSSError> {
let data = SDSSHeaderRaw::new_auto(
file_scope,
file_specifier,
file_specifier_version,
host_setting_version,
host_run_mode,
host_startup_counter,
0,
)
.array();
let mut f = Self::_new(f);
f.fsynced_write(&data)?;
Ok(f)
}
/// Create a new SDSS file or re-open an existing file and verify
///
/// **IMPORTANT: File position: end-of-header-section**
pub fn open_or_create_perm_rw<const REWRITE_MODIFY_COUNTER: bool>(
file_path: &str,
@ -370,39 +489,25 @@ impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
let f = Fs::fs_fopen_or_create_rw(file_path)?;
match f {
RawFileOpen::Created(f) => {
// since this file was just created, we need to append the header
let data = SDSSHeaderRaw::new_auto(
let f = Self::_sdss_fcreate(
file_scope,
file_specifier,
file_specifier_version,
host_setting_version,
host_run_mode,
host_startup_counter,
0,
)
.array();
let mut f = Self::_new(f);
f.fsynced_write(&data)?;
f,
)?;
Ok(FileOpen::Created(f))
}
RawFileOpen::Existing(mut f) => {
// this is an existing file. decoded the header
let mut header_raw = [0u8; SDSSHeaderRaw::header_size()];
f.fr_read_exact(&mut header_raw)?;
let header = SDSSHeaderRaw::decode_noverify(header_raw)
.ok_or(SDSSError::HeaderDecodeCorruptedHeader)?;
// now validate the header
header.verify(file_scope, file_specifier, file_specifier_version)?;
let mut f = Self::_new(f);
if REWRITE_MODIFY_COUNTER {
// since we updated this file, let us update the header
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
f.seek_from_start(0)?;
f.fsynced_write(new_header.encoded().array().as_ref())?;
f.seek_from_start(SDSSHeaderRaw::header_size() as _)?;
}
Ok(FileOpen::Existing(f, header))
RawFileOpen::Existing(f) => {
let (f, header) = Self::_sdss_fopen::<REWRITE_MODIFY_COUNTER>(
f,
file_scope,
file_specifier,
file_specifier_version,
)?;
Ok(FileOpen::Existing(header, f))
}
}
}

Loading…
Cancel
Save