diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 69e52c42..018ee678 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -32,10 +32,13 @@ use { crate::engine::{ core::GlobalNS, data::uuid::Uuid, - storage::v1::{ - header_meta::HostRunMode, - memfs::{NullFS, VirtualFS}, - RawFSInterface, + storage::{ + self, + v1::{ + header_meta::HostRunMode, + memfs::{NullFS, VirtualFS}, + RawFSInterface, + }, }, txn::gns::GNSTransactionDriverAnyFS, }, @@ -75,15 +78,10 @@ impl TestGlobal { impl TestGlobal { pub fn new_with_driver_id(log_name: &str) -> Self { let gns = GlobalNS::empty(); - let driver = GNSTransactionDriverAnyFS::open_or_reinit_with_name( - &gns, - log_name, - 0, - HostRunMode::Prod, - 0, - ) - .unwrap(); - Self::new(gns, 0, driver) + let driver = storage::v1::loader::open_gns_driver(log_name, 0, HostRunMode::Dev, 0, &gns) + .unwrap() + .into_inner(); + Self::new(gns, 0, GNSTransactionDriverAnyFS::new(driver)) } } @@ -128,13 +126,13 @@ impl GlobalInstanceLike for TestGlobal { space_uuid: Uuid, model_name: &str, model_uuid: Uuid, - ) -> crate::engine::storage::v1::SDSSResult<()> { + ) -> storage::v1::SDSSResult<()> { // create model dir - Fs::fs_create_dir(&crate::engine::storage::v1::loader::SEInitState::model_dir( + Fs::fs_create_dir(&storage::v1::loader::SEInitState::model_dir( space_name, space_uuid, model_name, model_uuid, ))?; - let driver = crate::engine::storage::v1::data_batch::create( - &crate::engine::storage::v1::loader::SEInitState::model_path( + let driver = storage::v1::data_batch::create( + &storage::v1::loader::SEInitState::model_path( space_name, space_uuid, model_name, model_uuid, ), self.sys_cfg().host_data().settings_version(), diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/journal.rs index 7336b932..07a4bc3e 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/journal.rs @@ -41,12 +41,10 @@ - FIXME(@ohsayan): we will probably (naively) need to dynamically reposition the cursor in case the metadata is corrupted as well */ -use super::rw::RawFSInterface; - use { super::{ header_impl::{FileSpecifierVersion, HostRunMode, SDSSHeaderRaw}, - rw::{FileOpen, SDSSFileIO}, + rw::{FileOpen, RawFSInterface, SDSSFileIO}, SDSSError, SDSSResult, }, crate::{ @@ -59,34 +57,6 @@ use { const CRC: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); const RECOVERY_BLOCK_AUTO_THRESHOLD: usize = 5; -/// A journal to `/dev/null` (app. level impl) -#[cfg(test)] -pub fn null_journal( - log_file_name: &str, - log_kind: FileSpecifier, - log_kind_version: FileSpecifierVersion, - host_setting_version: u32, - host_run_mode: HostRunMode, - host_startup_counter: u64, - _: &TA::GlobalState, -) -> JournalWriter { - let FileOpen::Created(journal) = - SDSSFileIO::::open_or_create_perm_rw::( - log_file_name, - FileScope::Journal, - log_kind, - log_kind_version, - host_setting_version, - host_run_mode, - host_startup_counter, - ) - .unwrap() - else { - panic!() - }; - JournalWriter::new(journal, 0, true).unwrap() -} - pub fn open_journal( log_file_name: &str, log_kind: FileSpecifier, @@ -95,7 +65,7 @@ pub fn open_journal( host_run_mode: HostRunMode, host_startup_counter: u64, gs: &TA::GlobalState, -) -> SDSSResult> { +) -> SDSSResult>> { macro_rules! open_file { ($modify:literal) => { SDSSFileIO::::open_or_create_perm_rw::<$modify>( @@ -116,11 +86,13 @@ pub fn open_journal( open_file!(true) }?; let file = match f { - FileOpen::Created(f) => return JournalWriter::new(f, 0, true), - FileOpen::Existing(file, _) => file, + FileOpen::Created(f) => return Ok(FileOpen::Created(JournalWriter::new(f, 0, true)?)), + FileOpen::Existing((file, _header)) => file, }; let (file, last_txn) = JournalReader::::scroll(file, gs)?; - JournalWriter::new(file, last_txn, false) + Ok(FileOpen::Existing(JournalWriter::new( + file, last_txn, false, + )?)) } /// The journal adapter diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs index 1fad9678..b73c6fef 100644 --- a/server/src/engine/storage/v1/loader.rs +++ b/server/src/engine/storage/v1/loader.rs @@ -28,13 +28,20 @@ use crate::engine::{ core::GlobalNS, data::uuid::Uuid, fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID}, - storage::v1::{batch_jrnl, header_meta::HostRunMode, LocalFS, SDSSErrorContext, SDSSResult}, - txn::gns::GNSTransactionDriverAnyFS, + storage::v1::{ + batch_jrnl, header_meta, + journal::{self, JournalWriter}, + rw::{FileOpen, RawFSInterface}, + LocalFS, SDSSErrorContext, SDSSResult, + }, + txn::gns::{GNSAdapter, GNSTransactionDriverAnyFS}, }; const GNS_FILE_PATH: &str = "gns.db-tlog"; +const GNS_LOG_VERSION_CODE: u32 = 0; pub struct SEInitState { + pub new_instance: bool, pub txn_driver: GNSTransactionDriverAnyFS, pub model_drivers: ModelDrivers, pub gns: GlobalNS, @@ -42,11 +49,13 @@ pub struct SEInitState { impl SEInitState { pub fn new( + new_instance: bool, txn_driver: GNSTransactionDriverAnyFS, model_drivers: ModelDrivers, gns: GlobalNS, ) -> Self { Self { + new_instance, txn_driver, model_drivers, gns, @@ -54,37 +63,47 @@ impl SEInitState { } pub fn try_init( host_setting_version: u32, - host_run_mode: HostRunMode, + host_run_mode: header_meta::HostRunMode, host_startup_counter: u64, ) -> SDSSResult { let gns = GlobalNS::empty(); - let gns_txn_driver = GNSTransactionDriverAnyFS::::open_or_reinit_with_name( - &gns, + let gns_txn_driver = open_gns_driver( GNS_FILE_PATH, host_setting_version, host_run_mode, host_startup_counter, + &gns, )?; + let new_instance = gns_txn_driver.is_created(); let mut model_drivers = ModelDrivers::new(); - for (space_name, space) in gns.spaces().read().iter() { - let space_uuid = space.get_uuid(); - for (model_name, model) in space.models().read().iter() { - let path = Self::model_path(space_name, space_uuid, model_name, model.get_uuid()); - let persist_driver = match batch_jrnl::reinit(&path, model) { - Ok(j) => j, - Err(e) => { - return Err(e.with_extra(format!( - "failed to restore model data from journal in `{path}`" - ))) - } - }; - let _ = model_drivers.insert( - ModelUniqueID::new(space_name, model_name, model.get_uuid()), - FractalModelDriver::init(persist_driver), - ); + if !new_instance { + // this is an existing instance, so read in all data + for (space_name, space) in gns.spaces().read().iter() { + let space_uuid = space.get_uuid(); + for (model_name, model) in space.models().read().iter() { + let path = + Self::model_path(space_name, space_uuid, model_name, model.get_uuid()); + let persist_driver = match batch_jrnl::reinit(&path, model) { + Ok(j) => j, + Err(e) => { + return Err(e.with_extra(format!( + "failed to restore model data from journal in `{path}`" + ))) + } + }; + let _ = model_drivers.insert( + ModelUniqueID::new(space_name, model_name, model.get_uuid()), + FractalModelDriver::init(persist_driver), + ); + } } } - Ok(SEInitState::new(gns_txn_driver, model_drivers, gns)) + Ok(SEInitState::new( + new_instance, + GNSTransactionDriverAnyFS::new(gns_txn_driver.into_inner()), + model_drivers, + gns, + )) } pub fn model_path( space_name: &str, @@ -109,3 +128,21 @@ impl SEInitState { format!("data/{space_name}-{space_uuid}") } } + +pub fn open_gns_driver( + path: &str, + host_setting_version: u32, + host_run_mode: header_meta::HostRunMode, + host_startup_counter: u64, + gns: &GlobalNS, +) -> SDSSResult>> { + journal::open_journal::( + path, + header_meta::FileSpecifier::GNSTxnLog, + header_meta::FileSpecifierVersion::__new(GNS_LOG_VERSION_CODE), + host_setting_version, + host_run_mode, + host_startup_counter, + gns, + ) +} diff --git a/server/src/engine/storage/v1/memfs.rs b/server/src/engine/storage/v1/memfs.rs index adce7539..9dd11fa9 100644 --- a/server/src/engine/storage/v1/memfs.rs +++ b/server/src/engine/storage/v1/memfs.rs @@ -28,8 +28,8 @@ use { crate::engine::{ storage::v1::{ rw::{ - RawFSInterface, RawFileInterface, RawFileInterfaceExt, RawFileInterfaceRead, - RawFileInterfaceWrite, RawFileInterfaceWriteExt, RawFileOpen, + FileOpen, RawFSInterface, RawFileInterface, RawFileInterfaceExt, + RawFileInterfaceRead, RawFileInterfaceWrite, RawFileInterfaceWriteExt, }, SDSSResult, }, @@ -123,10 +123,10 @@ impl RawFSInterface for VirtualFS { // create new file let file = VirtualFS::fs_fopen_or_create_rw(to)?; match file { - RawFileOpen::Created(mut c) => { + FileOpen::Created(mut c) => { c.fw_write_all(&data)?; } - RawFileOpen::Existing(mut e) => { + FileOpen::Existing(mut e) => { e.fw_truncate_to(0)?; e.fw_write_all(&data)?; } @@ -203,7 +203,7 @@ impl RawFSInterface for VirtualFS { fn fs_delete_dir_all(fpath: &str) -> super::SDSSResult<()> { delete_dir(fpath, true) } - fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult> { + fn fs_fopen_or_create_rw(fpath: &str) -> super::SDSSResult> { let mut vfs = VFS.write(); // components let (target_file, components) = split_target_and_components(fpath); @@ -213,13 +213,13 @@ impl RawFSInterface for VirtualFS { VNode::File(f) => { f.read = true; f.write = true; - Ok(RawFileOpen::Existing(VFileDescriptor(fpath.into()))) + Ok(FileOpen::Existing(VFileDescriptor(fpath.into()))) } VNode::Dir(_) => return err_item_is_not_file(), }, Entry::Vacant(v) => { v.insert(VNode::File(VFile::new(true, true, vec![], 0))); - Ok(RawFileOpen::Created(VFileDescriptor(fpath.into()))) + Ok(FileOpen::Created(VFileDescriptor(fpath.into()))) } } } @@ -535,8 +535,8 @@ impl RawFSInterface for NullFS { fn fs_delete_dir_all(_: &str) -> SDSSResult<()> { Ok(()) } - fn fs_fopen_or_create_rw(_: &str) -> SDSSResult> { - Ok(RawFileOpen::Created(NullFile)) + fn fs_fopen_or_create_rw(_: &str) -> SDSSResult> { + Ok(FileOpen::Created(NullFile)) } fn fs_fopen_rw(_: &str) -> SDSSResult { Ok(NullFile) diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs index b040f142..fbabc1b8 100644 --- a/server/src/engine/storage/v1/rw.rs +++ b/server/src/engine/storage/v1/rw.rs @@ -24,8 +24,6 @@ * */ -use std::marker::PhantomData; - use { super::{ header_impl::{ @@ -40,35 +38,45 @@ use { std::{ fs::{self, File}, io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}, + marker::PhantomData, }, }; #[derive(Debug)] /// Log whether -pub enum FileOpen { - Created(F), - Existing(F, SDSSHeader), +pub enum FileOpen { + Created(CF), + Existing(EF), } -impl FileOpen { - pub fn into_existing(self) -> Option<(F, SDSSHeader)> { +impl FileOpen { + pub const fn is_created(&self) -> bool { + matches!(self, Self::Created(_)) + } + pub const fn is_existing(&self) -> bool { + !self.is_created() + } + pub fn into_existing(self) -> Option { match self { - Self::Existing(f, h) => Some((f, h)), + Self::Existing(e) => Some(e), Self::Created(_) => None, } } - pub fn into_created(self) -> Option { + pub fn into_created(self) -> Option { match self { Self::Created(f) => Some(f), - Self::Existing(_, _) => None, + Self::Existing(_) => None, } } } -#[derive(Debug)] -pub enum RawFileOpen { - Created(F), - Existing(F), +impl FileOpen { + pub fn into_inner(self) -> F { + match self { + Self::Created(f) => f, + Self::Existing(f) => f, + } + } } /// The specification for a file system interface (our own abstraction over the fs) @@ -94,7 +102,7 @@ pub trait RawFSInterface { /// This will: /// - Create a file if it doesn't exist /// - Open a file it it does exist - fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult>; + fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult>; /// Open an existing file fn fs_fopen_rw(fpath: &str) -> SDSSResult; /// Create a new file @@ -179,7 +187,7 @@ impl RawFSInterface for LocalFS { fn fs_delete_dir_all(fpath: &str) -> SDSSResult<()> { cvt(fs::remove_dir_all(fpath)) } - fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult> { + fn fs_fopen_or_create_rw(fpath: &str) -> SDSSResult> { let f = File::options() .create(true) .read(true) @@ -187,9 +195,9 @@ impl RawFSInterface for LocalFS { .open(fpath)?; let md = f.metadata()?; if md.len() == 0 { - Ok(RawFileOpen::Created(f)) + Ok(FileOpen::Created(f)) } else { - Ok(RawFileOpen::Existing(f)) + Ok(FileOpen::Existing(f)) } } fn fs_fcreate_rw(fpath: &str) -> SDSSResult { @@ -485,10 +493,10 @@ impl SDSSFileIO { host_setting_version: u32, host_run_mode: HostRunMode, host_startup_counter: u64, - ) -> SDSSResult> { + ) -> SDSSResult> { let f = Fs::fs_fopen_or_create_rw(file_path)?; match f { - RawFileOpen::Created(f) => { + FileOpen::Created(f) => { let f = Self::_sdss_fcreate( file_scope, file_specifier, @@ -500,14 +508,14 @@ impl SDSSFileIO { )?; Ok(FileOpen::Created(f)) } - RawFileOpen::Existing(f) => { + FileOpen::Existing(f) => { let (f, header) = Self::_sdss_fopen::( f, file_scope, file_specifier, file_specifier_version, )?; - Ok(FileOpen::Existing(header, f)) + Ok(FileOpen::Existing((header, f))) } } } diff --git a/server/src/engine/storage/v1/tests/batch.rs b/server/src/engine/storage/v1/tests/batch.rs index 5f63a1ef..e38d4113 100644 --- a/server/src/engine/storage/v1/tests/batch.rs +++ b/server/src/engine/storage/v1/tests/batch.rs @@ -55,7 +55,10 @@ fn pkey(v: impl Into) -> PrimaryIndexKey { PrimaryIndexKey::try_from_dc(v.into()).unwrap() } -fn open_file(fpath: &str) -> FileOpen> { +fn open_file( + fpath: &str, +) -> FileOpen, (SDSSFileIO, super::super::header_impl::SDSSHeader)> +{ SDSSFileIO::open_or_create_perm_rw::( fpath, FileScope::DataBatch, @@ -71,7 +74,7 @@ fn open_file(fpath: &str) -> FileOpen> { fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver { match open_file(fpath) { FileOpen::Created(f) => DataBatchPersistDriver::new(f, true), - FileOpen::Existing(f, _) => { + FileOpen::Existing((f, _header)) => { let mut dbr = DataBatchRestoreDriver::new(f).unwrap(); dbr.read_data_batch_into_model(mdl).unwrap(); DataBatchPersistDriver::new(dbr.into_file(), false) diff --git a/server/src/engine/storage/v1/tests/rw.rs b/server/src/engine/storage/v1/tests/rw.rs index ff70d03a..4df4ce87 100644 --- a/server/src/engine/storage/v1/tests/rw.rs +++ b/server/src/engine/storage/v1/tests/rw.rs @@ -43,7 +43,7 @@ fn create_delete() { ) .unwrap(); match f { - FileOpen::Existing(_, _) => panic!(), + FileOpen::Existing(_) => panic!(), FileOpen::Created(_) => {} }; } @@ -58,7 +58,7 @@ fn create_delete() { ) .unwrap(); let h = match open { - FileOpen::Existing(_, header) => header, + FileOpen::Existing((_, header)) => header, _ => panic!(), }; assert_eq!(h.gr_mdr().file_scope(), FileScope::Journal); diff --git a/server/src/engine/storage/v1/tests/tx.rs b/server/src/engine/storage/v1/tests/tx.rs index 01c24403..5bf01ec0 100644 --- a/server/src/engine/storage/v1/tests/tx.rs +++ b/server/src/engine/storage/v1/tests/tx.rs @@ -143,6 +143,7 @@ fn open_log( 1, &db, ) + .map(|v| v.into_inner()) } #[test] diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index 85905315..07479d46 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -34,7 +34,6 @@ use { data::uuid::Uuid, mem::BufferedScanner, storage::v1::{ - self, header_meta, inf::{self, PersistObject}, JournalAdapter, JournalWriter, LocalFS, RawFSInterface, SDSSResult, }, @@ -64,35 +63,18 @@ pub type GNSTransactionDriverNullZero = #[cfg(test)] pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS; -const CURRENT_LOG_VERSION: u32 = 0; - /// The GNS transaction driver is used to handle DDL transactions -pub struct GNSTransactionDriverAnyFS { - journal: JournalWriter, +pub struct GNSTransactionDriverAnyFS { + journal: JournalWriter, } impl GNSTransactionDriverAnyFS { + pub fn new(journal: JournalWriter) -> Self { + Self { journal } + } pub fn __journal_mut(&mut self) -> &mut JournalWriter { &mut self.journal } - pub fn open_or_reinit_with_name( - gns: &GlobalNS, - log_file_name: &str, - host_setting_version: u32, - host_run_mode: header_meta::HostRunMode, - host_startup_counter: u64, - ) -> TransactionResult { - let journal = v1::open_journal( - log_file_name, - header_meta::FileSpecifier::GNSTxnLog, - header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION), - host_setting_version, - host_run_mode, - host_startup_counter, - gns, - )?; - Ok(Self { journal }) - } /// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning /// errors (if any) pub fn try_commit(&mut self, gns_event: GE) -> TransactionResult<()> {