Add create space txn

next
Sayan Nandan 1 year ago
parent 97a9471529
commit ea072f281c
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -31,6 +31,7 @@ use {
error::{DatabaseError, DatabaseResult},
idx::{IndexST, STIndex},
ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace},
txn::gns as gnstxn,
},
parking_lot::RwLock,
};
@ -168,15 +169,34 @@ impl Space {
}
impl Space {
/// Execute a `create` stmt
pub fn exec_create(gns: &super::GlobalNS, space: CreateSpace) -> DatabaseResult<()> {
pub fn transactional_exec_create<TI: gnstxn::GNSTransactionDriverLLInterface>(
gns: &super::GlobalNS,
txn_driver: &mut gnstxn::GNSTransactionDriverAnyFS<TI>,
space: CreateSpace,
) -> DatabaseResult<()> {
// process create
let ProcedureCreate { space_name, space } = Self::process_create(space)?;
// acquire access
let mut wl = gns.spaces().write();
if wl.st_insert(space_name, space) {
Ok(())
} else {
Err(DatabaseError::DdlSpaceAlreadyExists)
if wl.st_contains(&space_name) {
return Err(DatabaseError::DdlSpaceAlreadyExists);
}
// commit txn
if TI::NONNULL {
// prepare and commit txn
let s_read = space.metadata().env().read();
txn_driver.try_commit(gnstxn::CreateSpaceTxn::new(&s_read, &space_name, &space))?;
}
// update global state
let _ = wl.st_insert(space_name, space);
Ok(())
}
/// Execute a `create` stmt
#[cfg(test)]
pub fn exec_create(gns: &super::GlobalNS, space: CreateSpace) -> DatabaseResult<()> {
gnstxn::GNSTransactionDriverNullZero::nullzero_create_exec(gns, move |driver| {
Self::transactional_exec_create(gns, driver, space)
})
}
/// Execute a `alter` stmt
pub fn exec_alter(

@ -57,10 +57,33 @@ use {
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
const RECOVERY_BLOCK_AUTO_THRESHOLD: usize = 5;
pub fn open_journal<
TA: JournalAdapter + core::fmt::Debug,
LF: RawFileIOInterface + core::fmt::Debug,
>(
/// A journal to `/dev/null` (app. level impl)
#[cfg(test)]
pub fn null_journal<TA: JournalAdapter>(
log_file_name: &str,
log_kind: FileSpecifier,
log_kind_version: FileSpecifierVersion,
host_setting_version: u32,
host_run_mode: HostRunMode,
host_startup_counter: u64,
_: &TA::GlobalState,
) -> JournalWriter<super::rw::NullZero, TA> {
let FileOpen::Created(journal) = SDSSFileIO::<super::rw::NullZero>::open_or_create_perm_rw(
log_file_name,
FileScope::Journal,
log_kind,
log_kind_version,
host_setting_version,
host_run_mode,
host_startup_counter,
)
.unwrap() else {
panic!()
};
JournalWriter::new(journal, 0, true).unwrap()
}
pub fn open_journal<TA: JournalAdapter, LF: RawFileIOInterface>(
log_file_name: &str,
log_kind: FileSpecifier,
log_kind_version: FileSpecifierVersion,

@ -41,8 +41,11 @@ mod tests;
// re-exports
pub use {
journal::{open_journal, JournalAdapter, JournalWriter},
rw::BufferedScanner,
rw::{BufferedScanner, NullZero, RawFileIOInterface, SDSSFileIO},
};
pub mod header_meta {
pub use super::header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode};
}
use crate::util::os::SysIOError as IoError;

@ -53,6 +53,8 @@ pub enum RawFileOpen<F> {
}
pub trait RawFileIOInterface: Sized {
/// Indicates that the interface is not a `/dev/null` (or related) implementation
const NOTNULL: bool = true;
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>>;
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>;
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()>;
@ -63,6 +65,37 @@ pub trait RawFileIOInterface: Sized {
fn fcursor(&mut self) -> SDSSResult<u64>;
}
/// This is a kind of file like `/dev/null`. It exists in ... nothing!
pub struct NullZero;
impl RawFileIOInterface for NullZero {
const NOTNULL: bool = false;
fn fopen_or_create_rw(_: &str) -> SDSSResult<RawFileOpen<Self>> {
Ok(RawFileOpen::Created(Self))
}
fn fread_exact(&mut self, _: &mut [u8]) -> SDSSResult<()> {
Ok(())
}
fn fwrite_all(&mut self, _: &[u8]) -> SDSSResult<()> {
Ok(())
}
fn fsync_all(&mut self) -> SDSSResult<()> {
Ok(())
}
fn fseek_ahead(&mut self, _: u64) -> SDSSResult<()> {
Ok(())
}
fn flen(&self) -> SDSSResult<u64> {
Ok(0)
}
fn flen_set(&mut self, _: u64) -> SDSSResult<()> {
Ok(())
}
fn fcursor(&mut self) -> SDSSResult<u64> {
Ok(0)
}
}
impl RawFileIOInterface for File {
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>> {
let f = File::options()

@ -24,10 +24,14 @@
*
*/
#[cfg(test)]
use super::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
};
use {
super::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, RawFileIOInterface, RawFileOpen, SDSSFileIO},
rw::{RawFileIOInterface, RawFileOpen},
SDSSResult,
},
crate::engine::sync::cell::Lazy,

@ -24,6 +24,8 @@
*
*/
#[cfg(test)]
use crate::engine::storage::v1::test_util::VirtualFS;
use {
super::{TransactionError, TransactionResult},
crate::{
@ -31,8 +33,9 @@ use {
core::{space::Space, GlobalNS},
data::uuid::Uuid,
storage::v1::{
self, header_meta,
inf::{self, PersistObject},
BufferedScanner, JournalAdapter, JournalWriter, SDSSResult,
BufferedScanner, JournalAdapter, JournalWriter, RawFileIOInterface, SDSSResult,
},
},
util::EndianQW,
@ -54,13 +57,70 @@ pub use {
space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
};
pub type GNSTransactionDriverNullZero =
GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullZero>;
pub type GNSTransactionDriver = GNSTransactionDriverAnyFS<File>;
#[cfg(test)]
pub type GNSTransactionDriverVFS = GNSTransactionDriverAnyFS<VirtualFS>;
const CURRENT_LOG_VERSION: u32 = 0;
pub trait GNSTransactionDriverLLInterface: RawFileIOInterface {
const NONNULL: bool = <Self as RawFileIOInterface>::NOTNULL;
}
impl<T: RawFileIOInterface> GNSTransactionDriverLLInterface for T {}
#[derive(Debug)]
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriver {
journal: JournalWriter<File, GNSAdapter>,
pub struct GNSTransactionDriverAnyFS<F = File> {
journal: JournalWriter<F, GNSAdapter>,
}
impl GNSTransactionDriverAnyFS<crate::engine::storage::v1::NullZero> {
pub fn nullzero(gns: &GlobalNS) -> Self {
let journal = v1::open_journal(
"gns.db-tlog",
header_meta::FileSpecifier::GNSTxnLog,
header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION),
0,
header_meta::HostRunMode::Dev,
0,
gns,
)
.unwrap();
Self { journal }
}
pub fn nullzero_create_exec<T>(gns: &GlobalNS, f: impl FnOnce(&mut Self) -> T) -> T {
let mut j = Self::nullzero(gns);
let r = f(&mut j);
j.close().unwrap();
r
}
}
impl GNSTransactionDriver {
impl<F: GNSTransactionDriverLLInterface> GNSTransactionDriverAnyFS<F> {
pub fn close(self) -> TransactionResult<()> {
self.journal
.append_journal_close_and_close()
.map_err(|e| e.into())
}
pub fn open_or_reinit(
gns: &GlobalNS,
host_setting_version: u32,
host_run_mode: header_meta::HostRunMode,
host_startup_counter: u64,
) -> TransactionResult<Self> {
let journal = v1::open_journal(
"gns.db-tlog",
header_meta::FileSpecifier::GNSTxnLog,
header_meta::FileSpecifierVersion::__new(CURRENT_LOG_VERSION),
host_setting_version,
host_run_mode,
host_startup_counter,
gns,
)?;
Ok(Self { journal })
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> TransactionResult<()> {
@ -89,8 +149,35 @@ impl JournalAdapter for GNSAdapter {
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> {
todo!()
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> TransactionResult<()> {
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof);
}
macro_rules! dispatch {
($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp)]
};
}
static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> TransactionResult<()>; 9] = dispatch!(
CreateSpaceTxn,
AlterSpaceTxn,
DropSpaceTxn,
CreateModelTxn,
AlterModelAddTxn,
AlterModelRemoveTxn,
AlterModelUpdateTxn,
DropModelTxn
);
let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe {
// UNSAFE(@ohsayan):
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[core::cmp::min(opc as usize, DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes),
Err(e) => Err(e),
}
}
}
@ -121,10 +208,14 @@ where
fn encode_super_event(commit: Self, buf: &mut Vec<u8>) {
inf::enc::enc_full_into_buffer::<Self>(buf, commit)
}
/// Attempts to decode the event using the given scanner
fn decode_from_super_event(
fn decode_and_update_global_state(
scanner: &mut BufferedScanner,
) -> TransactionResult<Self::RestoreType> {
gns: &GlobalNS,
) -> TransactionResult<()> {
Self::update_global_state(Self::decode(scanner)?, gns)
}
/// Attempts to decode the event using the given scanner
fn decode(scanner: &mut BufferedScanner) -> TransactionResult<Self::RestoreType> {
inf::dec::dec_full_from_scanner::<Self>(scanner).map_err(|e| e.into())
}
/// Update the global state from the restored event

@ -32,7 +32,12 @@ pub type TransactionResult<T> = Result<T, TransactionError>;
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum TransactionError {
SDSSError(SDSSError),
/// corrupted txn payload. has more bytes than expected
DecodeCorruptedPayloadMoreBytes,
/// transaction payload is corrupted. has lesser bytes than expected
DecodedUnexpectedEof,
/// unknown transaction operation. usually indicates a corrupted payload
DecodeUnknownTxnOp,
/// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was
/// already present (when it was expected to not be present)
OnRestoreDataConflictAlreadyExists,
@ -40,6 +45,7 @@ pub enum TransactionError {
OnRestoreDataMissing,
/// On restore, a certain item that was expected to match a certain value, has a different value
OnRestoreDataConflictMismatch,
SDSSError(SDSSError),
}
direct_from! {

Loading…
Cancel
Save