diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index d282d837..ab686bc7 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -31,6 +31,7 @@ use { error::{DatabaseError, DatabaseResult}, idx::{IndexST, STIndex}, ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace}, + txn::gns as gnstxn, }, parking_lot::RwLock, }; @@ -168,15 +169,34 @@ impl Space { } impl Space { - /// Execute a `create` stmt - pub fn exec_create(gns: &super::GlobalNS, space: CreateSpace) -> DatabaseResult<()> { + pub fn transactional_exec_create( + gns: &super::GlobalNS, + txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS, + space: CreateSpace, + ) -> DatabaseResult<()> { + // process create let ProcedureCreate { space_name, space } = Self::process_create(space)?; + // acquire access let mut wl = gns.spaces().write(); - if wl.st_insert(space_name, space) { - Ok(()) - } else { - Err(DatabaseError::DdlSpaceAlreadyExists) + if wl.st_contains(&space_name) { + return Err(DatabaseError::DdlSpaceAlreadyExists); } + // commit txn + if TI::NONNULL { + // prepare and commit txn + let s_read = space.metadata().env().read(); + txn_driver.try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?; + } + // update global state + let _ = wl.st_insert(space_name, space); + Ok(()) + } + /// Execute a `create` stmt + #[cfg(test)] + pub fn exec_create(gns: &super::GlobalNS, space: CreateSpace) -> DatabaseResult<()> { + gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| { + Self::transactional_exec_create(gns, driver, space) + }) } /// Execute a `alter` stmt pub fn exec_alter( diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/journal.rs index d993f61e..eba132c3 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/journal.rs @@ -57,10 +57,33 @@ use { const CRC: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); const RECOVERY_BLOCK_AUTO_THRESHOLD: usize = 5; -pub fn open_journal< - TA: JournalAdapter + core::fmt::Debug, - LF: RawFileIOInterface + core::fmt::Debug, ->( +/// A journal to `/dev/null` (app. level impl) +#[cfg(test)] +pub fn null_journal( + log_file_name: &str, + log_kind: FileSpecifier, + log_kind_version: FileSpecifierVersion, + host_setting_version: u32, + host_run_mode: HostRunMode, + host_startup_counter: u64, + _: &TA::GlobalState, +) -> JournalWriter { + let FileOpen::Created(journal) = SDSSFileIO::::open_or_create_perm_rw( + log_file_name, + FileScope::Journal, + log_kind, + log_kind_version, + host_setting_version, + host_run_mode, + host_startup_counter, + ) + .unwrap() else { + panic!() + }; + JournalWriter::new(journal, 0, true).unwrap() +} + +pub fn open_journal( log_file_name: &str, log_kind: FileSpecifier, log_kind_version: FileSpecifierVersion, diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index 0031839c..b4113c78 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -41,8 +41,11 @@ mod tests; // re-exports pub use { journal::{open_journal, JournalAdapter, JournalWriter}, - rw::BufferedScanner, + rw::{BufferedScanner, NullZero, RawFileIOInterface, SDSSFileIO}, }; +pub mod header_meta { + pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}; +} use crate::util::os::SysIOError as IoError; diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs index e8f8f856..969f8cbc 100644 --- a/server/src/engine/storage/v1/rw.rs +++ b/server/src/engine/storage/v1/rw.rs @@ -53,6 +53,8 @@ pub enum RawFileOpen { } pub trait RawFileIOInterface: Sized { + /// Indicates that the interface is not a `/dev/null` (or related) implementation + const NOTNULL: bool = true; fn fopen_or_create_rw(file_path: &str) -> SDSSResult>; fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>; fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()>; @@ -63,6 +65,37 @@ pub trait RawFileIOInterface: Sized { fn fcursor(&mut self) -> SDSSResult; } +/// This is a kind of file like `/dev/null`. It exists in ... nothing! +pub struct NullZero; + +impl RawFileIOInterface for NullZero { + const NOTNULL: bool = false; + fn fopen_or_create_rw(_: &str) -> SDSSResult> { + Ok(RawFileOpen::Created(Self)) + } + fn fread_exact(&mut self, _: &mut [u8]) -> SDSSResult<()> { + Ok(()) + } + fn fwrite_all(&mut self, _: &[u8]) -> SDSSResult<()> { + Ok(()) + } + fn fsync_all(&mut self) -> SDSSResult<()> { + Ok(()) + } + fn fseek_ahead(&mut self, _: u64) -> SDSSResult<()> { + Ok(()) + } + fn flen(&self) -> SDSSResult { + Ok(0) + } + fn flen_set(&mut self, _: u64) -> SDSSResult<()> { + Ok(()) + } + fn fcursor(&mut self) -> SDSSResult { + Ok(0) + } +} + impl RawFileIOInterface for File { fn fopen_or_create_rw(file_path: &str) -> SDSSResult> { let f = File::options() diff --git a/server/src/engine/storage/v1/test_util.rs b/server/src/engine/storage/v1/test_util.rs index a2df59fe..54452d20 100644 --- a/server/src/engine/storage/v1/test_util.rs +++ b/server/src/engine/storage/v1/test_util.rs @@ -24,10 +24,14 @@ * */ +#[cfg(test)] +use super::{ + header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}, + rw::{FileOpen, SDSSFileIO}, +}; use { super::{ - header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}, - rw::{FileOpen, RawFileIOInterface, RawFileOpen, SDSSFileIO}, + rw::{RawFileIOInterface, RawFileOpen}, SDSSResult, }, crate::engine::sync::cell::Lazy, diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index 449edfab..eca3b8f6 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -24,6 +24,8 @@ * */ +#[cfg(test)] +use crate::engine::storage::v1::test_util::VirtualFS; use { super::{TransactionError, TransactionResult}, crate::{ @@ -31,8 +33,9 @@ use { core::{space::Space, GlobalNS}, data::uuid::Uuid, storage::v1::{ + self, header_meta, inf::{self, PersistObject}, - BufferedScanner, JournalAdapter, JournalWriter, SDSSResult, + BufferedScanner, JournalAdapter, JournalWriter, RawFileIOInterface, SDSSResult, }, }, util::EndianQW, @@ -54,13 +57,70 @@ pub use { space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn}, }; +pub type GNSTransactionDriverNullZero = + GNSTransactionDriverAnyFS; +pub type GNSTransactionDriver = GNSTransactionDriverAnyFS; +#[cfg(test)] +pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS; + +const CURRENT_LOG_VERSION: u32 = 0; + +pub trait GNSTransactionDriverLLInterface: RawFileIOInterface { + const NONNULL: bool = ::NOTNULL; +} +impl GNSTransactionDriverLLInterface for T {} + #[derive(Debug)] /// The GNS transaction driver is used to handle DDL transactions -pub struct GNSTransactionDriver { - journal: JournalWriter, +pub struct GNSTransactionDriverAnyFS { + journal: JournalWriter, +} + +impl GNSTransactionDriverAnyFS { + pub fn nullzero(gns: &GlobalNS) -> Self { + let journal = v1::open_journal( + "gns.db-tlog", + header_meta::FileSpecifier::GNSTxnLog, + header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION), + 0, + header_meta::HostRunMode::Dev, + 0, + gns, + ) + .unwrap(); + Self { journal } + } + pub fn nullzero_create_exec(gns: &GlobalNS, f: impl FnOnce(&mut Self) -> T) -> T { + let mut j = Self::nullzero(gns); + let r = f(&mut j); + j.close().unwrap(); + r + } } -impl GNSTransactionDriver { +impl GNSTransactionDriverAnyFS { + pub fn close(self) -> TransactionResult<()> { + self.journal + .append_journal_close_and_close() + .map_err(|e| e.into()) + } + pub fn open_or_reinit( + gns: &GlobalNS, + host_setting_version: u32, + host_run_mode: header_meta::HostRunMode, + host_startup_counter: u64, + ) -> TransactionResult { + let journal = v1::open_journal( + "gns.db-tlog", + header_meta::FileSpecifier::GNSTxnLog, + header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION), + host_setting_version, + host_run_mode, + host_startup_counter, + gns, + )?; + Ok(Self { journal }) + } /// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning /// errors (if any) pub fn try_commit(&mut self, gns_event: GE) -> TransactionResult<()> { @@ -89,8 +149,35 @@ impl JournalAdapter for GNSAdapter { fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> { b } - fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> { - todo!() + fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> TransactionResult<()> { + if payload.len() < 2 { + return Err(TransactionError::DecodedUnexpectedEof); + } + macro_rules! dispatch { + ($($item:ty),* $(,)?) => { + [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp)] + }; + } + static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> TransactionResult<()>; 9] = dispatch!( + CreateSpaceTxn, + AlterSpaceTxn, + DropSpaceTxn, + CreateModelTxn, + AlterModelAddTxn, + AlterModelRemoveTxn, + AlterModelUpdateTxn, + DropModelTxn + ); + let mut scanner = BufferedScanner::new(&payload); + let opc = unsafe { + // UNSAFE(@ohsayan): + u16::from_le_bytes(scanner.next_chunk()) + }; + match DISPATCH[core::cmp::min(opc as usize, DISPATCH.len())](&mut scanner, gs) { + Ok(()) if scanner.eof() => return Ok(()), + Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes), + Err(e) => Err(e), + } } } @@ -121,10 +208,14 @@ where fn encode_super_event(commit: Self, buf: &mut Vec) { inf::enc::enc_full_into_buffer::(buf, commit) } - /// Attempts to decode the event using the given scanner - fn decode_from_super_event( + fn decode_and_update_global_state( scanner: &mut BufferedScanner, - ) -> TransactionResult { + gns: &GlobalNS, + ) -> TransactionResult<()> { + Self::update_global_state(Self::decode(scanner)?, gns) + } + /// Attempts to decode the event using the given scanner + fn decode(scanner: &mut BufferedScanner) -> TransactionResult { inf::dec::dec_full_from_scanner::(scanner).map_err(|e| e.into()) } /// Update the global state from the restored event diff --git a/server/src/engine/txn/gns/tests.rs b/server/src/engine/txn/gns/tests/mod.rs similarity index 100% rename from server/src/engine/txn/gns/tests.rs rename to server/src/engine/txn/gns/tests/mod.rs diff --git a/server/src/engine/txn/mod.rs b/server/src/engine/txn/mod.rs index 8f1e5f99..3d9f9e31 100644 --- a/server/src/engine/txn/mod.rs +++ b/server/src/engine/txn/mod.rs @@ -32,7 +32,12 @@ pub type TransactionResult = Result; #[derive(Debug)] #[cfg_attr(test, derive(PartialEq))] pub enum TransactionError { - SDSSError(SDSSError), + /// corrupted txn payload. has more bytes than expected + DecodeCorruptedPayloadMoreBytes, + /// transaction payload is corrupted. has lesser bytes than expected + DecodedUnexpectedEof, + /// unknown transaction operation. usually indicates a corrupted payload + DecodeUnknownTxnOp, /// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was /// already present (when it was expected to not be present) OnRestoreDataConflictAlreadyExists, @@ -40,6 +45,7 @@ pub enum TransactionError { OnRestoreDataMissing, /// On restore, a certain item that was expected to match a certain value, has a different value OnRestoreDataConflictMismatch, + SDSSError(SDSSError), } direct_from! {