From ea678e2c9d15b65a97cb0082b7cb32eae4694ff4 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sat, 10 Feb 2024 18:57:42 +0530 Subject: [PATCH] Refactor storage and txn modules --- server/src/engine/core/model/alt.rs | 19 +- server/src/engine/core/model/mod.rs | 6 +- server/src/engine/core/space.rs | 14 +- server/src/engine/fractal/drivers.rs | 6 +- server/src/engine/fractal/mod.rs | 4 +- server/src/engine/fractal/test_utils.rs | 4 +- server/src/engine/macros.rs | 4 +- .../storage/common/sdss/impls/sdss_r1/rw.rs | 2 +- server/src/engine/storage/v1/impls/gns/mod.rs | 217 ++++++ .../src/engine/storage/v1/impls/gns/model.rs | 610 +++++++++++++++++ .../src/engine/storage/v1/impls/gns/space.rs | 261 ++++++++ .../v1/impls}/gns/tests/full_chain.rs | 0 .../{txn => storage/v1/impls}/gns/tests/io.rs | 24 +- .../v1/impls}/gns/tests/mod.rs | 0 server/src/engine/storage/v1/impls/mod.rs | 27 + server/src/engine/storage/v1/loader.rs | 8 +- server/src/engine/storage/v1/mod.rs | 29 +- .../storage/v1/{ => raw}/batch_jrnl/mod.rs | 0 .../v1/{ => raw}/batch_jrnl/persist.rs | 7 +- .../v1/{ => raw}/batch_jrnl/restore.rs | 13 +- .../engine/storage/v1/{ => raw}/journal.rs | 5 +- .../src/engine/storage/v1/{ => raw}/memfs.rs | 0 server/src/engine/storage/v1/raw/mod.rs | 34 + server/src/engine/storage/v1/{ => raw}/rw.rs | 0 .../src/engine/storage/v1/{ => raw}/spec.rs | 12 +- .../src/engine/storage/v1/{ => raw}/sysdb.rs | 7 +- .../src/engine/storage/v1/{ => raw}/tests.rs | 0 .../storage/v1/{ => raw}/tests/batch.rs | 8 +- .../engine/storage/v1/{ => raw}/tests/rw.rs | 2 +- .../engine/storage/v1/{ => raw}/tests/tx.rs | 2 +- server/src/engine/storage/v2/mod.rs | 3 +- .../storage/v2/{ => raw}/journal/mod.rs | 25 + .../storage/v2/{ => raw}/journal/raw/mod.rs | 13 +- .../storage/v2/{ => raw}/journal/raw/tests.rs | 8 +- .../storage/v2/{ => raw}/journal/tests.rs | 2 +- server/src/engine/storage/v2/raw/mod.rs | 28 + .../src/engine/storage/v2/{ => raw}/spec.rs | 0 server/src/engine/txn/gns/mod.rs | 222 +------ server/src/engine/txn/gns/model.rs | 625 +----------------- server/src/engine/txn/gns/space.rs | 256 +------ server/src/engine/txn/mod.rs | 5 +- server/src/engine/txn/shared.rs | 100 +++ 42 files changed, 1468 insertions(+), 1144 deletions(-) create mode 100644 server/src/engine/storage/v1/impls/gns/mod.rs create mode 100644 server/src/engine/storage/v1/impls/gns/model.rs create mode 100644 server/src/engine/storage/v1/impls/gns/space.rs rename server/src/engine/{txn => storage/v1/impls}/gns/tests/full_chain.rs (100%) rename server/src/engine/{txn => storage/v1/impls}/gns/tests/io.rs (93%) rename server/src/engine/{txn => storage/v1/impls}/gns/tests/mod.rs (100%) create mode 100644 server/src/engine/storage/v1/impls/mod.rs rename server/src/engine/storage/v1/{ => raw}/batch_jrnl/mod.rs (100%) rename server/src/engine/storage/v1/{ => raw}/batch_jrnl/persist.rs (98%) rename server/src/engine/storage/v1/{ => raw}/batch_jrnl/restore.rs (98%) rename server/src/engine/storage/v1/{ => raw}/journal.rs (99%) rename server/src/engine/storage/v1/{ => raw}/memfs.rs (100%) create mode 100644 server/src/engine/storage/v1/raw/mod.rs rename server/src/engine/storage/v1/{ => raw}/rw.rs (100%) rename server/src/engine/storage/v1/{ => raw}/spec.rs (94%) rename server/src/engine/storage/v1/{ => raw}/sysdb.rs (98%) rename server/src/engine/storage/v1/{ => raw}/tests.rs (100%) rename server/src/engine/storage/v1/{ => raw}/tests/batch.rs (98%) rename server/src/engine/storage/v1/{ => raw}/tests/rw.rs (97%) rename server/src/engine/storage/v1/{ => raw}/tests/tx.rs (99%) rename server/src/engine/storage/v2/{ => raw}/journal/mod.rs (86%) rename server/src/engine/storage/v2/{ => raw}/journal/raw/mod.rs (98%) rename server/src/engine/storage/v2/{ => raw}/journal/raw/tests.rs (98%) rename server/src/engine/storage/v2/{ => raw}/journal/tests.rs (98%) create mode 100644 server/src/engine/storage/v2/raw/mod.rs rename server/src/engine/storage/v2/{ => raw}/spec.rs (100%) create mode 100644 server/src/engine/txn/shared.rs diff --git a/server/src/engine/core/model/alt.rs b/server/src/engine/core/model/alt.rs index 3a8dbabc..ffbe7c3b 100644 --- a/server/src/engine/core/model/alt.rs +++ b/server/src/engine/core/model/alt.rs @@ -43,7 +43,7 @@ use { }, lex::Ident, }, - txn::gns as gnstxn, + txn::{gns, ModelIDRef}, }, util, }, @@ -268,13 +268,8 @@ impl Model { // TODO(@ohsayan): this impacts lockdown duration; fix it if G::FS_IS_NON_NULL { // prepare txn - let txn = gnstxn::AlterModelAddTxn::new( - gnstxn::ModelIDRef::new_ref( - &space_name, - &space, - &model_name, - model, - ), + let txn = gns::model::AlterModelAddTxn::new( + ModelIDRef::new_ref(&space_name, &space, &model_name, model), &new_fields, ); // commit txn @@ -291,8 +286,8 @@ impl Model { AlterAction::Remove(removed) => { if G::FS_IS_NON_NULL { // prepare txn - let txn = gnstxn::AlterModelRemoveTxn::new( - gnstxn::ModelIDRef::new_ref(&space_name, space, &model_name, model), + let txn = gns::model::AlterModelRemoveTxn::new( + ModelIDRef::new_ref(&space_name, space, &model_name, model), &removed, ); // commit txn @@ -306,8 +301,8 @@ impl Model { AlterAction::Update(updated) => { if G::FS_IS_NON_NULL { // prepare txn - let txn = gnstxn::AlterModelUpdateTxn::new( - gnstxn::ModelIDRef::new_ref(&space_name, space, &model_name, model), + let txn = gns::model::AlterModelUpdateTxn::new( + ModelIDRef::new_ref(&space_name, space, &model_name, model), &updated, ); // commit txn diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index 5bf003e7..9bf6a6ba 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -44,7 +44,7 @@ use { drop::DropModel, syn::{FieldSpec, LayerSpec}, }, - txn::gns::{self as gnstxn, SpaceIDRef}, + txn::{gns, ModelIDRef, SpaceIDRef}, }, std::collections::hash_map::{Entry, HashMap}, }; @@ -281,7 +281,7 @@ impl Model { if G::FS_IS_NON_NULL { let mut txn_driver = global.namespace_txn_driver().lock(); // prepare txn - let txn = gnstxn::CreateModelTxn::new( + let txn = gns::model::CreateModelTxn::new( SpaceIDRef::new(&space_name, &space), &model_name, &model, @@ -351,7 +351,7 @@ impl Model { // okay this is looking good for us if G::FS_IS_NON_NULL { // prepare txn - let txn = gnstxn::DropModelTxn::new(gnstxn::ModelIDRef::new( + let txn = gns::model::DropModelTxn::new(ModelIDRef::new( SpaceIDRef::new(&space_name, &space), &model_name, model.get_uuid(), diff --git a/server/src/engine/core/space.rs b/server/src/engine/core/space.rs index d4e71ec2..85619d17 100644 --- a/server/src/engine/core/space.rs +++ b/server/src/engine/core/space.rs @@ -34,7 +34,7 @@ use { idx::STIndex, ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace}, storage::{safe_interfaces::FSInterface, v1::loader::SEInitState}, - txn::gns as gnstxn, + txn::{self, SpaceIDRef}, }, std::collections::HashSet, }; @@ -167,7 +167,7 @@ impl Space { // commit txn if G::FS_IS_NON_NULL { // prepare txn - let txn = gnstxn::CreateSpaceTxn::new(space.props(), &space_name, &space); + let txn = txn::gns::space::CreateSpaceTxn::new(space.props(), &space_name, &space); // try to create space for...the space G::FileSystem::fs_create_dir_all(&SEInitState::space_dir( &space_name, @@ -216,8 +216,10 @@ impl Space { }; if G::FS_IS_NON_NULL { // prepare txn - let txn = - gnstxn::AlterSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space), &patch); + let txn = txn::gns::space::AlterSpaceTxn::new( + SpaceIDRef::new(&space_name, space), + &patch, + ); // commit global.namespace_txn_driver().lock().try_commit(txn)?; } @@ -252,7 +254,7 @@ impl Space { if G::FS_IS_NON_NULL { // prepare txn let txn = - gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, &space)); + txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn global.namespace_txn_driver().lock().try_commit(txn)?; // request cleanup @@ -299,7 +301,7 @@ impl Space { if G::FS_IS_NON_NULL { // prepare txn let txn = - gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, &space)); + txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space)); // commit txn global.namespace_txn_driver().lock().try_commit(txn)?; // request cleanup diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index 6f8f43c7..c1530a64 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -28,8 +28,10 @@ use { super::util, crate::engine::{ error::RuntimeResult, - storage::{safe_interfaces::FSInterface, v1::data_batch::DataBatchPersistDriver}, - txn::gns::GNSTransactionDriverAnyFS, + storage::{ + safe_interfaces::FSInterface, + v1::{DataBatchPersistDriver, GNSTransactionDriverAnyFS}, + }, }, parking_lot::Mutex, std::sync::Arc, diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 4d3520e1..3654fa17 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -32,8 +32,8 @@ use { storage::{ self, safe_interfaces::{FSInterface, LocalFS}, + v1::GNSTransactionDriverAnyFS, }, - txn::gns::GNSTransactionDriverAnyFS, }, crate::engine::error::RuntimeResult, parking_lot::{Mutex, RwLock}, @@ -215,7 +215,7 @@ impl GlobalInstanceLike for Global { ))?; // init driver let driver = - storage::v1::data_batch::create(&storage::v1::loader::SEInitState::model_path( + storage::v1::create_batch_journal(&storage::v1::loader::SEInitState::model_path( space_name, space_uuid, model_name, model_uuid, ))?; self.get_state().mdl_driver.write().insert( diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index d2566c11..2ed870e6 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -35,8 +35,8 @@ use { storage::{ self, safe_interfaces::{FSInterface, NullFS, VirtualFS}, + v1::GNSTransactionDriverAnyFS, }, - txn::gns::GNSTransactionDriverAnyFS, }, parking_lot::{Mutex, RwLock}, std::collections::HashMap, @@ -148,7 +148,7 @@ impl GlobalInstanceLike for TestGlobal { space_name, space_uuid, model_name, model_uuid, ))?; let driver = - storage::v1::data_batch::create(&storage::v1::loader::SEInitState::model_path( + storage::v1::create_batch_journal(&storage::v1::loader::SEInitState::model_path( space_name, space_uuid, model_name, model_uuid, ))?; self.model_drivers.write().insert( diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index a56850be..eb36d00a 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -444,7 +444,7 @@ macro_rules! closure { #[cfg(test)] macro_rules! array { ($($(#[$attr:meta])* $vis:vis const $name:ident: [$ty:ty] = [$($expr:expr),* $(,)?]);* $(;)?) => { - $(#[allow(non_snake_case)] mod $name {pub(super) const LEN: usize = { let mut i = 0;$(let _ = $expr; i += 1;)*i += 0; i};} - $(#[$attr])* $vis const $name: [$ty; $name::LEN] = [$($expr),*];)* + $(#[allow(non_snake_case)]mod$name{pub(super)const LEN:usize={let mut i=0;$(let _=$expr;i+=1;)*i+=0;i};} + $(#[$attr])*$vis const$name:[$ty;$name::LEN]=[$($expr),*];)* } } diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index d048d9c5..4af88cd1 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -519,7 +519,7 @@ impl< fn check_vfs_buffering() { use crate::engine::storage::{ common::interface::fs_test::{VFileDescriptor, VirtualFS}, - v2::spec::{Header, SystemDatabaseV1}, + v2::raw::spec::{Header, SystemDatabaseV1}, }; fn rawfile() -> Vec { VirtualFS::fetch_raw_data("myfile").unwrap() diff --git a/server/src/engine/storage/v1/impls/gns/mod.rs b/server/src/engine/storage/v1/impls/gns/mod.rs new file mode 100644 index 00000000..faa60458 --- /dev/null +++ b/server/src/engine/storage/v1/impls/gns/mod.rs @@ -0,0 +1,217 @@ +/* + * Created on Sun Aug 20 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::super::raw::journal::{JournalAdapter, JournalWriter}, + crate::{ + engine::{ + core::GlobalNS, + data::uuid::Uuid, + error::{RuntimeResult, TransactionError}, + mem::BufferedScanner, + storage::{ + safe_interfaces::{FSInterface, LocalFS}, + v1::inf::{self, PersistObject}, + }, + txn::{gns, SpaceIDRef}, + }, + util::EndianQW, + }, + std::marker::PhantomData, +}; + +mod model; +mod space; +// test +#[cfg(test)] +mod tests; + +/// The GNS transaction driver is used to handle DDL transactions +pub struct GNSTransactionDriverAnyFS { + journal: JournalWriter, +} + +impl GNSTransactionDriverAnyFS { + pub fn new(journal: JournalWriter) -> Self { + Self { journal } + } + pub fn into_inner(self) -> JournalWriter { + self.journal + } + pub fn __journal_mut(&mut self) -> &mut JournalWriter { + &mut self.journal + } + /// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning + /// errors (if any) + pub fn try_commit(&mut self, gns_event: GE) -> RuntimeResult<()> { + let mut buf = vec![]; + buf.extend(GE::OPC.to_le_bytes()); + GE::encode_super_event(gns_event, &mut buf); + self.journal + .append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?; + Ok(()) + } +} + +/* + journal implementor +*/ + +/// the journal adapter for DDL queries on the GNS +#[derive(Debug)] +pub struct GNSAdapter; + +impl JournalAdapter for GNSAdapter { + const RECOVERY_PLUGIN: bool = true; + type JournalEvent = GNSSuperEvent; + type GlobalState = GlobalNS; + type Error = crate::engine::fractal::error::Error; + fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> { + b + } + fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> { + if payload.len() < 2 { + return Err(TransactionError::DecodedUnexpectedEof.into()); + } + macro_rules! dispatch { + ($($item:ty),* $(,)?) => { + [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())] + }; + } + static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!( + gns::space::CreateSpaceTxn, + gns::space::AlterSpaceTxn, + gns::space::DropSpaceTxn, + gns::model::CreateModelTxn, + gns::model::AlterModelAddTxn, + gns::model::AlterModelRemoveTxn, + gns::model::AlterModelUpdateTxn, + gns::model::DropModelTxn + ); + let mut scanner = BufferedScanner::new(&payload); + let opc = unsafe { + // UNSAFE(@ohsayan): + u16::from_le_bytes(scanner.next_chunk()) + }; + match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) { + Ok(()) if scanner.eof() => return Ok(()), + Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()), + Err(e) => Err(e), + } + } +} + +/* + Events + --- + FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes + pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless + we have absolutely no other choice) + --- + [OPC:2B][PAYLOAD] +*/ + +pub struct GNSSuperEvent(Box<[u8]>); + +/// Definition for an event in the GNS (DDL queries) +pub trait GNSEvent +where + Self: PersistObject + Sized, +{ + /// OPC for the event (unique) + const OPC: u16; + /// Expected type for a commit + type CommitType; + /// Expected type for a restore + type RestoreType; + /// Encodes the event into the given buffer + fn encode_super_event(commit: Self, buf: &mut Vec) { + inf::enc::enc_full_into_buffer::(buf, commit) + } + fn decode_and_update_global_state( + scanner: &mut BufferedScanner, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + Self::update_global_state(Self::decode(scanner)?, gns) + } + /// Attempts to decode the event using the given scanner + fn decode(scanner: &mut BufferedScanner) -> RuntimeResult { + inf::dec::dec_full_from_scanner::(scanner).map_err(|e| e.into()) + } + /// Update the global state from the restored event + fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>; +} + +#[derive(Debug, PartialEq)] +pub struct SpaceIDRes { + uuid: Uuid, + name: Box, +} + +impl SpaceIDRes { + #[cfg(test)] + pub fn new(uuid: Uuid, name: Box) -> Self { + Self { uuid, name } + } +} +struct SpaceID<'a>(PhantomData>); +pub struct SpaceIDMD { + uuid: Uuid, + space_name_l: u64, +} + +impl<'a> PersistObject for SpaceID<'a> { + const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); + type InputType = SpaceIDRef<'a>; + type OutputType = SpaceIDRes; + type Metadata = SpaceIDMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.uuid().to_le_bytes()); + buf.extend(data.name().len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + Ok(SpaceIDMD { + uuid: Uuid::from_bytes(scanner.next_chunk()), + space_name_l: scanner.next_u64_le(), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.name().as_bytes()); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let str = inf::dec::utils::decode_string(s, md.space_name_l as usize)?; + Ok(SpaceIDRes { + uuid: md.uuid, + name: str.into_boxed_str(), + }) + } +} diff --git a/server/src/engine/storage/v1/impls/gns/model.rs b/server/src/engine/storage/v1/impls/gns/model.rs new file mode 100644 index 00000000..d2adc78b --- /dev/null +++ b/server/src/engine/storage/v1/impls/gns/model.rs @@ -0,0 +1,610 @@ +/* + * Created on Wed Aug 23 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::GNSEvent, + crate::{ + engine::{ + core::{ + model::{Field, Model}, + space::Space, + EntityID, EntityIDRef, GlobalNS, + }, + data::uuid::Uuid, + error::{RuntimeResult, StorageError, TransactionError}, + idx::{IndexSTSeqCns, STIndex, STIndexSeq}, + mem::BufferedScanner, + storage::v1::inf::{self, map, obj, PersistObject}, + txn::{ + gns::model::{ + AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn, + DropModelTxn, + }, + ModelIDRef, + }, + }, + util::EndianQW, + }, + core::marker::PhantomData, +}; + +pub struct ModelID<'a>(PhantomData<&'a ()>); + +#[derive(Debug, PartialEq)] +pub struct ModelIDRes { + space_id: super::SpaceIDRes, + model_name: Box, + model_uuid: Uuid, + model_version: u64, +} + +impl ModelIDRes { + #[cfg(test)] + pub fn new( + space_id: super::SpaceIDRes, + model_name: Box, + model_uuid: Uuid, + model_version: u64, + ) -> Self { + Self { + space_id, + model_name, + model_uuid, + model_version, + } + } +} +pub struct ModelIDMD { + space_id: super::SpaceIDMD, + model_name_l: u64, + model_version: u64, + model_uuid: Uuid, +} + +impl<'a> PersistObject for ModelID<'a> { + const METADATA_SIZE: usize = + sizeof!(u64, 2) + sizeof!(u128) + ::METADATA_SIZE; + type InputType = ModelIDRef<'a>; + type OutputType = ModelIDRes; + type Metadata = ModelIDMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.model_name_l as usize + md.space_id.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id()); + buf.extend(data.model_name().len().u64_bytes_le()); + buf.extend(data.model_version().to_le_bytes()); + buf.extend(data.model_uuid().to_le_bytes()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + Ok(ModelIDMD { + space_id: ::meta_dec(scanner)?, + model_name_l: scanner.next_u64_le(), + model_version: scanner.next_u64_le(), + model_uuid: Uuid::from_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id()); + buf.extend(data.model_name().as_bytes()); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + Ok(ModelIDRes { + space_id: ::obj_dec(s, md.space_id)?, + model_name: inf::dec::utils::decode_string(s, md.model_name_l as usize)? + .into_boxed_str(), + model_uuid: md.model_uuid, + model_version: md.model_version, + }) + } +} + +fn with_space( + gns: &GlobalNS, + space_id: &super::SpaceIDRes, + f: impl FnOnce(&Space) -> RuntimeResult, +) -> RuntimeResult { + let spaces = gns.idx().read(); + let Some(space) = spaces.st_get(&space_id.name) else { + return Err(TransactionError::OnRestoreDataMissing.into()); + }; + if space.get_uuid() != space_id.uuid { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + f(&space) +} + +fn with_space_mut( + gns: &GlobalNS, + space_id: &super::SpaceIDRes, + mut f: impl FnMut(&mut Space) -> RuntimeResult, +) -> RuntimeResult { + let mut spaces = gns.idx().write(); + let Some(space) = spaces.st_get_mut(&space_id.name) else { + return Err(TransactionError::OnRestoreDataMissing.into()); + }; + if space.get_uuid() != space_id.uuid { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + f(space) +} + +fn with_model_mut( + gns: &GlobalNS, + space_id: &super::SpaceIDRes, + model_id: &ModelIDRes, + f: impl FnOnce(&mut Model) -> RuntimeResult, +) -> RuntimeResult { + with_space(gns, space_id, |_| { + let mut models = gns.idx_models().write(); + let Some(model) = models.get_mut(&EntityIDRef::new(&space_id.name, &model_id.model_name)) + else { + return Err(TransactionError::OnRestoreDataMissing.into()); + }; + if model.get_uuid() != model_id.model_uuid { + // this should have been handled by an earlier transaction + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + f(model) + }) +} + +/* + create model +*/ + +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct CreateModelTxnRestorePL { + pub(super) space_id: super::SpaceIDRes, + pub(super) model_name: Box, + pub(super) model: Model, +} + +pub struct CreateModelTxnMD { + space_id_meta: super::SpaceIDMD, + model_name_l: u64, + model_meta: as PersistObject>::Metadata, +} + +impl<'a> PersistObject for CreateModelTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + + sizeof!(u64) + + as PersistObject>::METADATA_SIZE; + type InputType = CreateModelTxn<'a>; + type OutputType = CreateModelTxnRestorePL; + type Metadata = CreateModelTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left((md.model_meta.p_key_len() + md.model_name_l) as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + // space ID + ::meta_enc(buf, data.space_id()); + // model name + buf.extend(data.model_name().len().u64_bytes_le()); + // model meta dump + ::meta_enc( + buf, + obj::ModelLayoutRef::from(data.model()), + ) + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + let space_id = ::meta_dec(scanner)?; + let model_name_l = scanner.next_u64_le(); + let model_meta = ::meta_dec(scanner)?; + Ok(CreateModelTxnMD { + space_id_meta: space_id, + model_name_l, + model_meta, + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + // space id dump + ::obj_enc(buf, data.space_id()); + // model name + buf.extend(data.model_name().as_bytes()); + // model dump + ::obj_enc( + buf, + obj::ModelLayoutRef::from(data.model()), + ) + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let space_id = ::obj_dec(s, md.space_id_meta)?; + let model_name = + inf::dec::utils::decode_string(s, md.model_name_l as usize)?.into_boxed_str(); + let model = ::obj_dec(s, md.model_meta)?; + Ok(CreateModelTxnRestorePL { + space_id, + model_name, + model, + }) + } +} + +impl<'a> GNSEvent for CreateModelTxn<'a> { + const OPC: u16 = 3; + type CommitType = CreateModelTxn<'a>; + type RestoreType = CreateModelTxnRestorePL; + fn update_global_state( + CreateModelTxnRestorePL { + space_id, + model_name, + model, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + /* + NOTE(@ohsayan): + A jump to the second branch is practically impossible and should be caught long before we actually end up + here (due to mismatched checksums), but might be theoretically possible because the cosmic rays can be wild + (or well magnetic stuff arounding spinning disks). But we just want to be extra sure. Don't let the aliens (or + rather, radiation) from the cosmos deter us! + */ + let mut spaces = gns.idx().write(); + let mut models = gns.idx_models().write(); + let Some(space) = spaces.get_mut(&space_id.name) else { + return Err(TransactionError::OnRestoreDataMissing.into()); + }; + if space.models().contains(&model_name) { + return Err(TransactionError::OnRestoreDataConflictAlreadyExists.into()); + } + if models + .insert(EntityID::new(&space_id.name, &model_name), model) + .is_some() + { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + space.models_mut().insert(model_name); + Ok(()) + } +} + +/* + alter model add +*/ + +pub struct AlterModelAddTxnMD { + model_id_meta: ModelIDMD, + new_field_c: u64, +} +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct AlterModelAddTxnRestorePL { + pub(super) model_id: ModelIDRes, + pub(super) new_fields: IndexSTSeqCns, Field>, +} +impl<'a> PersistObject for AlterModelAddTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + sizeof!(u64); + type InputType = AlterModelAddTxn<'a>; + type OutputType = AlterModelAddTxnRestorePL; + type Metadata = AlterModelAddTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left( + (md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize, + ) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.model_id()); + buf.extend(data.new_fields().st_len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + let model_id_meta = ::meta_dec(scanner)?; + let new_field_c = scanner.next_u64_le(); + Ok(AlterModelAddTxnMD { + model_id_meta, + new_field_c, + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.model_id()); + > as PersistObject>::obj_enc( + buf, + data.new_fields(), + ); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let model_id = ::obj_dec(s, md.model_id_meta)?; + let new_fields = , _>>> as PersistObject>::obj_dec( + s, + map::MapIndexSizeMD(md.new_field_c as usize), + )?; + Ok(AlterModelAddTxnRestorePL { + model_id, + new_fields, + }) + } +} + +impl<'a> GNSEvent for AlterModelAddTxn<'a> { + const OPC: u16 = 4; + type CommitType = AlterModelAddTxn<'a>; + type RestoreType = AlterModelAddTxnRestorePL; + fn update_global_state( + AlterModelAddTxnRestorePL { + model_id, + new_fields, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + with_model_mut(gns, &model_id.space_id, &model_id, |model| { + let mut mutator = model.model_mutator(); + for (field_name, field) in new_fields.stseq_owned_kv() { + if !mutator.add_field(field_name, field) { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + } + Ok(()) + }) + } +} + +/* + alter model remove +*/ + +pub struct AlterModelRemoveTxnMD { + model_id_meta: ModelIDMD, + remove_field_c: u64, +} +#[derive(Debug, PartialEq)] +pub struct AlterModelRemoveTxnRestorePL { + pub(super) model_id: ModelIDRes, + pub(super) removed_fields: Box<[Box]>, +} + +impl<'a> PersistObject for AlterModelRemoveTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + sizeof!(u64); + type InputType = AlterModelRemoveTxn<'a>; + type OutputType = AlterModelRemoveTxnRestorePL; + type Metadata = AlterModelRemoveTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left( + (md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize, + ) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.model_id()); + buf.extend(data.removed_fields().len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + let model_id_meta = ::meta_dec(scanner)?; + Ok(AlterModelRemoveTxnMD { + model_id_meta, + remove_field_c: scanner.next_u64_le(), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.model_id()); + for field in data.removed_fields() { + buf.extend(field.len().u64_bytes_le()); + buf.extend(field.as_bytes()); + } + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let model_id = ::obj_dec(s, md.model_id_meta)?; + let mut removed_fields = Vec::with_capacity(md.remove_field_c as usize); + while !s.eof() + & (removed_fields.len() as u64 != md.remove_field_c) + & s.has_left(sizeof!(u64)) + { + let len = s.next_u64_le() as usize; + if !s.has_left(len) { + break; + } + removed_fields.push(inf::dec::utils::decode_string(s, len)?.into_boxed_str()); + } + if removed_fields.len() as u64 != md.remove_field_c { + return Err(StorageError::InternalDecodeStructureCorruptedPayload.into()); + } + Ok(AlterModelRemoveTxnRestorePL { + model_id, + removed_fields: removed_fields.into_boxed_slice(), + }) + } +} + +impl<'a> GNSEvent for AlterModelRemoveTxn<'a> { + const OPC: u16 = 5; + type CommitType = AlterModelRemoveTxn<'a>; + type RestoreType = AlterModelRemoveTxnRestorePL; + fn update_global_state( + AlterModelRemoveTxnRestorePL { + model_id, + removed_fields, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + with_model_mut(gns, &model_id.space_id, &model_id, |model| { + let mut mutator = model.model_mutator(); + for removed_field in removed_fields.iter() { + if !mutator.remove_field(&removed_field) { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + } + Ok(()) + }) + } +} + +/* + alter model update +*/ + +pub struct AlterModelUpdateTxnMD { + model_id_md: ModelIDMD, + updated_field_c: u64, +} +#[derive(Debug, PartialEq)] +pub struct AlterModelUpdateTxnRestorePL { + pub(super) model_id: ModelIDRes, + pub(super) updated_fields: IndexSTSeqCns, Field>, +} + +impl<'a> PersistObject for AlterModelUpdateTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE + sizeof!(u64); + type InputType = AlterModelUpdateTxn<'a>; + type OutputType = AlterModelUpdateTxnRestorePL; + type Metadata = AlterModelUpdateTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left( + md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize, + ) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.model_id()); + buf.extend(data.updated_fields().st_len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + let model_id_md = ::meta_dec(scanner)?; + Ok(AlterModelUpdateTxnMD { + model_id_md, + updated_field_c: scanner.next_u64_le(), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.model_id()); + > as PersistObject>::obj_enc( + buf, + data.updated_fields(), + ); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let model_id = ::obj_dec(s, md.model_id_md)?; + let updated_fields = + , _>>> as PersistObject>::obj_dec( + s, + map::MapIndexSizeMD(md.updated_field_c as usize), + )?; + Ok(AlterModelUpdateTxnRestorePL { + model_id, + updated_fields, + }) + } +} + +impl<'a> GNSEvent for AlterModelUpdateTxn<'a> { + const OPC: u16 = 6; + type CommitType = AlterModelUpdateTxn<'a>; + type RestoreType = AlterModelUpdateTxnRestorePL; + fn update_global_state( + AlterModelUpdateTxnRestorePL { + model_id, + updated_fields, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + with_model_mut(gns, &model_id.space_id, &model_id, |model| { + let mut mutator = model.model_mutator(); + for (field_id, field) in updated_fields.stseq_owned_kv() { + if !mutator.update_field(&field_id, field) { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + } + Ok(()) + }) + } +} + +/* + drop model +*/ + +pub struct DropModelTxnMD { + model_id_md: ModelIDMD, +} +impl<'a> PersistObject for DropModelTxn<'a> { + const METADATA_SIZE: usize = ::METADATA_SIZE; + type InputType = DropModelTxn<'a>; + type OutputType = ModelIDRes; + type Metadata = DropModelTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left( + md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize, + ) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.model_id()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + let model_id_md = ::meta_dec(scanner)?; + Ok(DropModelTxnMD { model_id_md }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.model_id()); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + ::obj_dec(s, md.model_id_md) + } +} + +impl<'a> GNSEvent for DropModelTxn<'a> { + const OPC: u16 = 7; + type CommitType = DropModelTxn<'a>; + type RestoreType = ModelIDRes; + fn update_global_state( + ModelIDRes { + space_id, + model_name, + model_uuid, + model_version: _, + }: Self::RestoreType, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + with_space_mut(gns, &space_id, |space| { + let mut models = gns.idx_models().write(); + if !space.models_mut().remove(&model_name) { + return Err(TransactionError::OnRestoreDataMissing.into()); + } + let Some(removed_model) = models.remove(&EntityIDRef::new(&space_id.name, &model_name)) + else { + return Err(TransactionError::OnRestoreDataMissing.into()); + }; + if removed_model.get_uuid() != model_uuid { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + Ok(()) + }) + } +} diff --git a/server/src/engine/storage/v1/impls/gns/space.rs b/server/src/engine/storage/v1/impls/gns/space.rs new file mode 100644 index 00000000..c90fef42 --- /dev/null +++ b/server/src/engine/storage/v1/impls/gns/space.rs @@ -0,0 +1,261 @@ +/* + * Created on Wed Aug 23 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::GNSEvent, + crate::{ + engine::{ + core::{space::Space, EntityIDRef, GlobalNS}, + data::DictGeneric, + error::{RuntimeResult, TransactionError}, + idx::STIndex, + mem::BufferedScanner, + storage::v1::inf::{self, map, obj, PersistObject}, + txn::gns::space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn}, + }, + util::EndianQW, + }, +}; + +/* + create space +*/ + +#[derive(Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct CreateSpaceTxnRestorePL { + pub(super) space_name: Box, + pub(super) space: Space, +} + +pub struct CreateSpaceTxnMD { + pub(super) space_name_l: u64, + pub(super) space_meta: as PersistObject>::Metadata, +} + +impl<'a> PersistObject for CreateSpaceTxn<'a> { + const METADATA_SIZE: usize = + as PersistObject>::METADATA_SIZE + sizeof!(u64); + type InputType = CreateSpaceTxn<'a>; + type OutputType = CreateSpaceTxnRestorePL; + type Metadata = CreateSpaceTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name().len().u64_bytes_le()); + ::meta_enc( + buf, + obj::SpaceLayoutRef::from((data.space(), data.space_meta())), + ); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + let space_name_l = scanner.next_u64_le(); + let space_meta = ::meta_dec(scanner)?; + Ok(CreateSpaceTxnMD { + space_name_l, + space_meta, + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name().as_bytes()); + ::obj_enc( + buf, + (data.space(), data.space_meta()).into(), + ); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let space_name = + inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str(); + let space = ::obj_dec(s, md.space_meta)?; + Ok(CreateSpaceTxnRestorePL { space_name, space }) + } +} + +impl<'a> GNSEvent for CreateSpaceTxn<'a> { + const OPC: u16 = 0; + type CommitType = CreateSpaceTxn<'a>; + type RestoreType = CreateSpaceTxnRestorePL; + fn update_global_state( + CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL, + gns: &crate::engine::core::GlobalNS, + ) -> RuntimeResult<()> { + let mut spaces = gns.idx().write(); + if spaces.st_insert(space_name, space.into()) { + Ok(()) + } else { + Err(TransactionError::OnRestoreDataConflictAlreadyExists.into()) + } + } +} + +/* + alter space + --- + for now dump the entire meta +*/ + +pub struct AlterSpaceTxnMD { + space_id_meta: super::SpaceIDMD, + dict_len: u64, +} + +#[derive(Debug, PartialEq)] +pub struct AlterSpaceTxnRestorePL { + pub(super) space_id: super::SpaceIDRes, + pub(super) space_meta: DictGeneric, +} + +impl<'a> PersistObject for AlterSpaceTxn<'a> { + const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128); + type InputType = AlterSpaceTxn<'a>; + type OutputType = AlterSpaceTxnRestorePL; + type Metadata = AlterSpaceTxnMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.space_id_meta.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id()); + buf.extend(data.updated_props().len().u64_bytes_le()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + Ok(AlterSpaceTxnMD { + space_id_meta: ::meta_dec(scanner)?, + dict_len: scanner.next_u64_le(), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id()); + as PersistObject>::obj_enc( + buf, + data.updated_props(), + ); + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + let space_id = ::obj_dec(s, md.space_id_meta)?; + let space_meta = as PersistObject>::obj_dec( + s, + map::MapIndexSizeMD(md.dict_len as usize), + )?; + Ok(AlterSpaceTxnRestorePL { + space_id, + space_meta, + }) + } +} + +impl<'a> GNSEvent for AlterSpaceTxn<'a> { + const OPC: u16 = 1; + type CommitType = AlterSpaceTxn<'a>; + type RestoreType = AlterSpaceTxnRestorePL; + + fn update_global_state( + AlterSpaceTxnRestorePL { + space_id, + space_meta, + }: Self::RestoreType, + gns: &crate::engine::core::GlobalNS, + ) -> RuntimeResult<()> { + let mut gns = gns.idx().write(); + match gns.st_get_mut(&space_id.name) { + Some(space) => { + if !crate::engine::data::dict::rmerge_metadata(space.props_mut(), space_meta) { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + } + None => return Err(TransactionError::OnRestoreDataMissing.into()), + } + Ok(()) + } +} + +/* + drop space +*/ + +impl<'a> PersistObject for DropSpaceTxn<'a> { + const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); + type InputType = DropSpaceTxn<'a>; + type OutputType = super::SpaceIDRes; + type Metadata = super::SpaceIDMD; + fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + ::meta_enc(buf, data.space_id()); + } + unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { + ::meta_dec(scanner) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + ::obj_enc(buf, data.space_id()) + } + unsafe fn obj_dec( + s: &mut BufferedScanner, + md: Self::Metadata, + ) -> RuntimeResult { + ::obj_dec(s, md) + } +} + +impl<'a> GNSEvent for DropSpaceTxn<'a> { + const OPC: u16 = 2; + type CommitType = DropSpaceTxn<'a>; + type RestoreType = super::SpaceIDRes; + fn update_global_state( + super::SpaceIDRes { uuid, name }: Self::RestoreType, + gns: &GlobalNS, + ) -> RuntimeResult<()> { + let mut wgns = gns.idx().write(); + let mut wmodel = gns.idx_models().write(); + match wgns.entry(name) { + std::collections::hash_map::Entry::Occupied(oe) => { + if oe.get().get_uuid() == uuid { + for model in oe.get().models() { + let id: EntityIDRef<'static> = unsafe { + // UNSAFE(@ohsayan): I really need a pack of what the borrow checker has been reveling on + core::mem::transmute(EntityIDRef::new(oe.key(), &model)) + }; + let _ = wmodel.st_delete(&id); + } + oe.remove_entry(); + Ok(()) + } else { + return Err(TransactionError::OnRestoreDataConflictMismatch.into()); + } + } + std::collections::hash_map::Entry::Vacant(_) => { + return Err(TransactionError::OnRestoreDataMissing.into()) + } + } + } +} diff --git a/server/src/engine/txn/gns/tests/full_chain.rs b/server/src/engine/storage/v1/impls/gns/tests/full_chain.rs similarity index 100% rename from server/src/engine/txn/gns/tests/full_chain.rs rename to server/src/engine/storage/v1/impls/gns/tests/full_chain.rs diff --git a/server/src/engine/txn/gns/tests/io.rs b/server/src/engine/storage/v1/impls/gns/tests/io.rs similarity index 93% rename from server/src/engine/txn/gns/tests/io.rs rename to server/src/engine/storage/v1/impls/gns/tests/io.rs index 9a03c5dd..8dc85535 100644 --- a/server/src/engine/txn/gns/tests/io.rs +++ b/server/src/engine/storage/v1/impls/gns/tests/io.rs @@ -26,23 +26,24 @@ use { super::super::{ - model::{self, ModelIDRef, ModelIDRes}, + model::{self, ModelIDRes}, space, SpaceIDRef, SpaceIDRes, }, crate::engine::{ core::{model::Model, space::Space}, storage::v1::inf::{dec, enc}, + txn::ModelIDRef, }, }; mod space_tests { - use super::{ - dec, enc, - space::{ - AlterSpaceTxn, AlterSpaceTxnRestorePL, CreateSpaceTxn, CreateSpaceTxnRestorePL, - DropSpaceTxn, + use { + super::{ + dec, enc, + space::{AlterSpaceTxnRestorePL, CreateSpaceTxnRestorePL}, + Space, SpaceIDRef, }, - Space, SpaceIDRef, + crate::engine::txn::gns::space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn}, }; #[test] fn create() { @@ -91,15 +92,18 @@ mod model_tests { use { super::{ model::{ - AlterModelAddTxn, AlterModelAddTxnRestorePL, AlterModelRemoveTxn, - AlterModelRemoveTxnRestorePL, AlterModelUpdateTxn, AlterModelUpdateTxnRestorePL, - CreateModelTxn, CreateModelTxnRestorePL, DropModelTxn, + AlterModelAddTxnRestorePL, AlterModelRemoveTxnRestorePL, + AlterModelUpdateTxnRestorePL, CreateModelTxnRestorePL, }, Model, Space, }, crate::engine::{ core::model::{Field, Layer}, data::{tag::TagSelector, uuid::Uuid}, + txn::gns::model::{ + AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn, + DropModelTxn, + }, }, }; fn default_space_model() -> (Space, Model) { diff --git a/server/src/engine/txn/gns/tests/mod.rs b/server/src/engine/storage/v1/impls/gns/tests/mod.rs similarity index 100% rename from server/src/engine/txn/gns/tests/mod.rs rename to server/src/engine/storage/v1/impls/gns/tests/mod.rs diff --git a/server/src/engine/storage/v1/impls/mod.rs b/server/src/engine/storage/v1/impls/mod.rs new file mode 100644 index 00000000..94b33f77 --- /dev/null +++ b/server/src/engine/storage/v1/impls/mod.rs @@ -0,0 +1,27 @@ +/* + * Created on Sat Feb 10 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +pub mod gns; diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs index 87e3af65..10609fd2 100644 --- a/server/src/engine/storage/v1/loader.rs +++ b/server/src/engine/storage/v1/loader.rs @@ -27,7 +27,7 @@ #[cfg(test)] use crate::engine::storage::{ common::interface::fs_traits::{FSInterface, FileOpen}, - v1::JournalWriter, + v1::raw::journal::JournalWriter, }; use crate::engine::{ core::{EntityIDRef, GlobalNS}, @@ -37,9 +37,11 @@ use crate::engine::{ fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID}, storage::{ common::interface::fs_imp::LocalFS, - v1::{batch_jrnl, journal, spec}, + v1::{ + impls::gns::{GNSAdapter, GNSTransactionDriverAnyFS}, + raw::{batch_jrnl, journal, spec}, + }, }, - txn::gns::{GNSAdapter, GNSTransactionDriverAnyFS}, }; const GNS_FILE_PATH: &str = "gns.db-tlog"; diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index 1ce27ee6..5292316e 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -1,5 +1,5 @@ /* - * Created on Mon May 15 2023 + * Created on Sat Feb 10 2024 * * This file is a part of Skytable * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source @@ -7,7 +7,7 @@ * vision to provide flexibility in data modelling without compromising * on performance, queryability or scalability. * - * Copyright (c) 2023, Sayan Nandan + * Copyright (c) 2024, Sayan Nandan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by @@ -28,25 +28,12 @@ //! //! Target tags: `0.8.0-beta`, `0.8.0-beta.2`, `0.8.0-beta.3` -// impls -mod batch_jrnl; -mod journal; -pub(in crate::engine) mod loader; -mod rw; -pub mod spec; -pub mod sysdb; -// hl +mod impls; pub mod inf; -#[cfg(test)] -mod tests; +pub mod loader; +pub mod raw; -// re-exports -pub(self) use spec::Header; - -pub use { - journal::{JournalAdapter, JournalWriter}, - rw::SDSSFileIO, +pub use self::{ + impls::gns::GNSTransactionDriverAnyFS, raw::batch_jrnl::create as create_batch_journal, + raw::batch_jrnl::DataBatchPersistDriver, }; -pub mod data_batch { - pub use super::batch_jrnl::{create, DataBatchPersistDriver}; -} diff --git a/server/src/engine/storage/v1/batch_jrnl/mod.rs b/server/src/engine/storage/v1/raw/batch_jrnl/mod.rs similarity index 100% rename from server/src/engine/storage/v1/batch_jrnl/mod.rs rename to server/src/engine/storage/v1/raw/batch_jrnl/mod.rs diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs similarity index 98% rename from server/src/engine/storage/v1/batch_jrnl/persist.rs rename to server/src/engine/storage/v1/raw/batch_jrnl/persist.rs index 3a6f654b..21b9f313 100644 --- a/server/src/engine/storage/v1/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs @@ -24,8 +24,6 @@ * */ -use crate::engine::storage::v1::inf::obj::cell; - use { super::{ MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH, @@ -48,7 +46,10 @@ use { idx::STIndexSeq, storage::{ common::interface::fs_traits::FSInterface, - v1::rw::{SDSSFileIO, TrackedWriter}, + v1::{ + inf::obj::cell, + raw::rw::{SDSSFileIO, TrackedWriter}, + }, }, }, util::EndianQW, diff --git a/server/src/engine/storage/v1/batch_jrnl/restore.rs b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs similarity index 98% rename from server/src/engine/storage/v1/batch_jrnl/restore.rs rename to server/src/engine/storage/v1/raw/batch_jrnl/restore.rs index 862d0e42..d033ae4e 100644 --- a/server/src/engine/storage/v1/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs @@ -24,11 +24,6 @@ * */ -use crate::engine::storage::v1::inf::{ - obj::cell::{self, StorageCellTypeID}, - DataSource, -}; - use { super::{ MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH, @@ -44,7 +39,13 @@ use { idx::{MTIndex, STIndex, STIndexSeq}, storage::{ common::interface::fs_traits::FSInterface, - v1::rw::{SDSSFileIO, TrackedReader}, + v1::{ + inf::{ + obj::cell::{self, StorageCellTypeID}, + DataSource, + }, + raw::rw::{SDSSFileIO, TrackedReader}, + }, }, }, std::{ diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/raw/journal.rs similarity index 99% rename from server/src/engine/storage/v1/journal.rs rename to server/src/engine/storage/v1/raw/journal.rs index fbe36f75..386dd23d 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/raw/journal.rs @@ -43,8 +43,9 @@ #[cfg(test)] use crate::engine::storage::common::interface::fs_traits::FileOpen; + use { - super::rw::SDSSFileIO, + super::{rw::SDSSFileIO, spec::Header}, crate::{ engine::{ error::{RuntimeResult, StorageError}, @@ -215,7 +216,7 @@ pub struct JournalReader { impl JournalReader { pub fn new(log_file: SDSSFileIO) -> RuntimeResult { - let log_size = log_file.file_length()? - super::Header::SIZE as u64; + let log_size = log_file.file_length()? - Header::SIZE as u64; Ok(Self { log_file, evid: 0, diff --git a/server/src/engine/storage/v1/memfs.rs b/server/src/engine/storage/v1/raw/memfs.rs similarity index 100% rename from server/src/engine/storage/v1/memfs.rs rename to server/src/engine/storage/v1/raw/memfs.rs diff --git a/server/src/engine/storage/v1/raw/mod.rs b/server/src/engine/storage/v1/raw/mod.rs new file mode 100644 index 00000000..9523b43f --- /dev/null +++ b/server/src/engine/storage/v1/raw/mod.rs @@ -0,0 +1,34 @@ +/* + * Created on Mon May 15 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +// impls +pub(super) mod batch_jrnl; +pub(super) mod journal; +pub(super) mod rw; +pub mod spec; +pub mod sysdb; +#[cfg(test)] +mod tests; diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/raw/rw.rs similarity index 100% rename from server/src/engine/storage/v1/rw.rs rename to server/src/engine/storage/v1/raw/rw.rs diff --git a/server/src/engine/storage/v1/spec.rs b/server/src/engine/storage/v1/raw/spec.rs similarity index 94% rename from server/src/engine/storage/v1/spec.rs rename to server/src/engine/storage/v1/raw/spec.rs index 03aa63a1..d6f72b0a 100644 --- a/server/src/engine/storage/v1/spec.rs +++ b/server/src/engine/storage/v1/raw/spec.rs @@ -29,10 +29,10 @@ use crate::engine::storage::common::{ versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion}, }; -pub(super) type Header = sdss::sdss_r1::HeaderV1; +pub type Header = sdss::sdss_r1::HeaderV1; #[derive(Debug)] -pub(super) struct HeaderImplV1; +pub struct HeaderImplV1; impl sdss::sdss_r1::HeaderV1Spec for HeaderImplV1 { type FileClass = FileScope; type FileSpecifier = FileSpecifier; @@ -82,7 +82,7 @@ pub enum FileSpecifier { */ #[cfg(test)] -pub(super) struct TestFile; +pub struct TestFile; #[cfg(test)] impl sdss::sdss_r1::SimpleFileSpecV1 for TestFile { type HeaderSpec = HeaderImplV1; @@ -92,7 +92,7 @@ impl sdss::sdss_r1::SimpleFileSpecV1 for TestFile { } /// The file specification for the GNS transaction log (impl v1) -pub(super) struct GNSTransactionLogV1; +pub struct GNSTransactionLogV1; impl sdss::sdss_r1::SimpleFileSpecV1 for GNSTransactionLogV1 { type HeaderSpec = HeaderImplV1; const FILE_CLASS: FileScope = FileScope::Journal; @@ -101,7 +101,7 @@ impl sdss::sdss_r1::SimpleFileSpecV1 for GNSTransactionLogV1 { } /// The file specification for a journal batch -pub(super) struct DataBatchJournalV1; +pub struct DataBatchJournalV1; impl sdss::sdss_r1::SimpleFileSpecV1 for DataBatchJournalV1 { type HeaderSpec = HeaderImplV1; const FILE_CLASS: FileScope = FileScope::DataBatch; @@ -110,7 +110,7 @@ impl sdss::sdss_r1::SimpleFileSpecV1 for DataBatchJournalV1 { } /// The file specification for the system db -pub(super) struct SysDBV1; +pub struct SysDBV1; impl sdss::sdss_r1::SimpleFileSpecV1 for SysDBV1 { type HeaderSpec = HeaderImplV1; const FILE_CLASS: FileScope = FileScope::FlatmapData; diff --git a/server/src/engine/storage/v1/sysdb.rs b/server/src/engine/storage/v1/raw/sysdb.rs similarity index 98% rename from server/src/engine/storage/v1/sysdb.rs rename to server/src/engine/storage/v1/raw/sysdb.rs index 9a578f44..11789720 100644 --- a/server/src/engine/storage/v1/sysdb.rs +++ b/server/src/engine/storage/v1/raw/sysdb.rs @@ -32,7 +32,10 @@ use { fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore}, storage::{ common::interface::fs_traits::{FSInterface, FileOpen}, - v1::{inf, spec, SDSSFileIO}, + v1::{ + inf, + raw::{rw::SDSSFileIO, spec}, + }, }, }, parking_lot::RwLock, @@ -137,7 +140,7 @@ impl SystemStore { ), ); // write - let buf = super::inf::enc::enc_dict_full::(&map); + let buf = inf::enc::enc_dict_full::(&map); f.fsynced_write(&buf) } fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> { diff --git a/server/src/engine/storage/v1/tests.rs b/server/src/engine/storage/v1/raw/tests.rs similarity index 100% rename from server/src/engine/storage/v1/tests.rs rename to server/src/engine/storage/v1/raw/tests.rs diff --git a/server/src/engine/storage/v1/tests/batch.rs b/server/src/engine/storage/v1/raw/tests/batch.rs similarity index 98% rename from server/src/engine/storage/v1/tests/batch.rs rename to server/src/engine/storage/v1/raw/tests/batch.rs index 2a4a9067..23e7089c 100644 --- a/server/src/engine/storage/v1/tests/batch.rs +++ b/server/src/engine/storage/v1/raw/tests/batch.rs @@ -38,13 +38,13 @@ use { idx::MTIndex, storage::{ common::interface::{fs_test::VirtualFS, fs_traits::FileOpen}, - v1::{ + v1::raw::{ batch_jrnl::{ DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent, DecodedBatchEventKind, NormalBatch, }, rw::SDSSFileIO, - spec, + spec::{self, Header}, }, }, }, @@ -57,9 +57,7 @@ fn pkey(v: impl Into) -> PrimaryIndexKey { PrimaryIndexKey::try_from_dc(v.into()).unwrap() } -fn open_file( - fpath: &str, -) -> FileOpen, (SDSSFileIO, super::super::Header)> { +fn open_file(fpath: &str) -> FileOpen, (SDSSFileIO, Header)> { SDSSFileIO::open_or_create_perm_rw::(fpath).unwrap() } diff --git a/server/src/engine/storage/v1/tests/rw.rs b/server/src/engine/storage/v1/raw/tests/rw.rs similarity index 97% rename from server/src/engine/storage/v1/tests/rw.rs rename to server/src/engine/storage/v1/raw/tests/rw.rs index 8cd30794..89fe32ba 100644 --- a/server/src/engine/storage/v1/tests/rw.rs +++ b/server/src/engine/storage/v1/raw/tests/rw.rs @@ -26,7 +26,7 @@ use crate::engine::storage::{ common::interface::fs_traits::FileOpen, - v1::{rw::SDSSFileIO, spec}, + v1::raw::{rw::SDSSFileIO, spec}, }; #[test] diff --git a/server/src/engine/storage/v1/tests/tx.rs b/server/src/engine/storage/v1/raw/tests/tx.rs similarity index 99% rename from server/src/engine/storage/v1/tests/tx.rs rename to server/src/engine/storage/v1/raw/tests/tx.rs index 6258b40c..b458d6db 100644 --- a/server/src/engine/storage/v1/tests/tx.rs +++ b/server/src/engine/storage/v1/raw/tests/tx.rs @@ -28,7 +28,7 @@ use { crate::{ engine::{ error::{RuntimeResult, StorageError}, - storage::v1::{ + storage::v1::raw::{ journal::{self, JournalAdapter, JournalWriter}, spec, }, diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 433b52df..846e466c 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -24,5 +24,4 @@ * */ -pub mod journal; -pub mod spec; +pub(in crate::engine::storage) mod raw; diff --git a/server/src/engine/storage/v2/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs similarity index 86% rename from server/src/engine/storage/v2/journal/mod.rs rename to server/src/engine/storage/v2/raw/journal/mod.rs index 278ea34a..a5b87407 100644 --- a/server/src/engine/storage/v2/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -48,22 +48,41 @@ use { std::marker::PhantomData, }; +/* + Event log adapter +*/ + +/// A journal based on an [`EventLog`] pub type EventLogJournal = raw::RawJournalWriter, Fs>; +/// An [`EventLog`] is a standard, append-only, sequential journal with per-event and per-cycle integrity protection pub struct EventLog(PhantomData); +/// An adapter that provides the specification for an event log pub trait EventLogAdapter { + /// the SDSS file spec type SdssSpec: FileSpecV1; + /// global state type GlobalState; + /// the event type type Event<'a>; + /// the decoded event type type DecodedEvent; + /// event metadata type EventMeta: Copy; + /// the error type type Error: Into; + /// the maximum value for the event discriminant const EV_MAX: u8; + /// get metadata from the raw value unsafe fn meta_from_raw(m: u64) -> Self::EventMeta; + /// get metadata from the event fn event_md<'a>(event: &Self::Event<'a>) -> u64; + /// encode an event fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]>; + /// decode an event fn decode(block: Vec, kind: Self::EventMeta) -> Result; + /// apply the event fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error>; } @@ -71,12 +90,18 @@ impl RawJournalAdapter for EventLog { const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct; type Spec = ::SdssSpec; type GlobalState = ::GlobalState; + type Context<'a> = () where Self: 'a; type Event<'a> = ::Event<'a>; type DecodedEvent = ::DecodedEvent; type EventMeta = ::EventMeta; fn initialize(_: &JournalInitializer) -> Self { Self(PhantomData) } + fn enter_context<'a, Fs: FSInterface>( + _: &'a mut raw::RawJournalWriter, + ) -> Self::Context<'a> { + () + } fn parse_event_meta(meta: u64) -> Option { if meta > ::EV_MAX as u64 { return None; diff --git a/server/src/engine/storage/v2/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs similarity index 98% rename from server/src/engine/storage/v2/journal/raw/mod.rs rename to server/src/engine/storage/v2/raw/journal/raw/mod.rs index e5777551..98d0f940 100644 --- a/server/src/engine/storage/v2/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -209,7 +209,7 @@ macro_rules! jtrace_reader { */ /// An adapter defining the low-level structure of a log file -pub trait RawJournalAdapter { +pub trait RawJournalAdapter: Sized { /// event size buffer const EVENT_SIZE_BUFFER: usize = 128; /// Set to true if the journal writer should automatically flush the buffer and fsync after writing an event @@ -220,6 +220,10 @@ pub trait RawJournalAdapter { type Spec: FileSpecV1; /// the global state that is used by this journal type GlobalState; + /// Writer context + type Context<'a> + where + Self: 'a; /// a journal event type Event<'a>; /// the decoded event @@ -228,6 +232,10 @@ pub trait RawJournalAdapter { type EventMeta: Copy; /// initialize this adapter fn initialize(j_: &JournalInitializer) -> Self; + /// get a write context + fn enter_context<'a, Fs: FSInterface>( + adapter: &'a mut RawJournalWriter, + ) -> Self::Context<'a>; /// parse event metadata fn parse_event_meta(meta: u64) -> Option; /// get event metadata as an [`u64`] @@ -479,6 +487,9 @@ impl RawJournalWriter { } Ok(me) } + pub fn context(&mut self) -> J::Context<'_> { + J::enter_context(self) + } /// Commit a new event to the journal /// /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns, diff --git a/server/src/engine/storage/v2/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests.rs similarity index 98% rename from server/src/engine/storage/v2/journal/raw/tests.rs rename to server/src/engine/storage/v2/raw/journal/raw/tests.rs index ff196c69..5de4a230 100644 --- a/server/src/engine/storage/v2/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests.rs @@ -40,7 +40,7 @@ use { }, sdss::sdss_r1::rw::TrackedReader, }, - v2::{ + v2::raw::{ journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent}, spec::SystemDatabaseV1, }, @@ -123,9 +123,15 @@ impl RawJournalAdapter for SimpleDBJournal { type Event<'a> = DbEventEncoded<'a>; type DecodedEvent = DbEventRestored; type EventMeta = EventMeta; + type Context<'a> = () where Self: 'a; fn initialize(_: &JournalInitializer) -> Self { Self } + fn enter_context<'a, Fs: FSInterface>( + _: &'a mut RawJournalWriter, + ) -> Self::Context<'a> { + () + } fn parse_event_meta(meta: u64) -> Option { Some(match meta { 0 => EventMeta::NewKey, diff --git a/server/src/engine/storage/v2/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs similarity index 98% rename from server/src/engine/storage/v2/journal/tests.rs rename to server/src/engine/storage/v2/raw/journal/tests.rs index ee3e7b2e..c2e4893e 100644 --- a/server/src/engine/storage/v2/journal/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/tests.rs @@ -36,7 +36,7 @@ use { mem::unsafe_apis, storage::{ common::interface::fs_test::VirtualFS, - v2::{journal::raw::open_journal, spec::SystemDatabaseV1}, + v2::raw::{journal::raw::open_journal, spec::SystemDatabaseV1}, }, RuntimeResult, }, diff --git a/server/src/engine/storage/v2/raw/mod.rs b/server/src/engine/storage/v2/raw/mod.rs new file mode 100644 index 00000000..8a92b870 --- /dev/null +++ b/server/src/engine/storage/v2/raw/mod.rs @@ -0,0 +1,28 @@ +/* + * Created on Sat Feb 10 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +pub mod journal; +pub mod spec; diff --git a/server/src/engine/storage/v2/spec.rs b/server/src/engine/storage/v2/raw/spec.rs similarity index 100% rename from server/src/engine/storage/v2/spec.rs rename to server/src/engine/storage/v2/raw/spec.rs diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index f741d01e..a44b262c 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -1,5 +1,5 @@ /* - * Created on Sun Aug 20 2023 + * Created on Sat Feb 10 2024 * * This file is a part of Skytable * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source @@ -7,7 +7,7 @@ * vision to provide flexibility in data modelling without compromising * on performance, queryability or scalability. * - * Copyright (c) 2023, Sayan Nandan + * Copyright (c) 2024, Sayan Nandan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by @@ -24,219 +24,5 @@ * */ -use { - crate::{ - engine::{ - core::{space::Space, GlobalNS}, - data::uuid::Uuid, - error::{RuntimeResult, TransactionError}, - mem::BufferedScanner, - storage::{ - safe_interfaces::{FSInterface, LocalFS}, - v1::{ - inf::{self, PersistObject}, - JournalAdapter, JournalWriter, - }, - }, - }, - util::EndianQW, - }, - std::marker::PhantomData, -}; - -mod model; -mod space; -// test -#[cfg(test)] -mod tests; - -// re-exports -pub use { - model::{ - AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn, DropModelTxn, - ModelIDRef, - }, - space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn}, -}; - -/// The GNS transaction driver is used to handle DDL transactions -pub struct GNSTransactionDriverAnyFS { - journal: JournalWriter, -} - -impl GNSTransactionDriverAnyFS { - pub fn new(journal: JournalWriter) -> Self { - Self { journal } - } - pub fn into_inner(self) -> JournalWriter { - self.journal - } - pub fn __journal_mut(&mut self) -> &mut JournalWriter { - &mut self.journal - } - /// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning - /// errors (if any) - pub fn try_commit(&mut self, gns_event: GE) -> RuntimeResult<()> { - let mut buf = vec![]; - buf.extend(GE::OPC.to_le_bytes()); - GE::encode_super_event(gns_event, &mut buf); - self.journal - .append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?; - Ok(()) - } -} - -/* - journal implementor -*/ - -/// the journal adapter for DDL queries on the GNS -#[derive(Debug)] -pub struct GNSAdapter; - -impl JournalAdapter for GNSAdapter { - const RECOVERY_PLUGIN: bool = true; - type JournalEvent = GNSSuperEvent; - type GlobalState = GlobalNS; - type Error = crate::engine::fractal::error::Error; - fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> { - b - } - fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> { - if payload.len() < 2 { - return Err(TransactionError::DecodedUnexpectedEof.into()); - } - macro_rules! dispatch { - ($($item:ty),* $(,)?) => { - [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())] - }; - } - static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!( - CreateSpaceTxn, - AlterSpaceTxn, - DropSpaceTxn, - CreateModelTxn, - AlterModelAddTxn, - AlterModelRemoveTxn, - AlterModelUpdateTxn, - DropModelTxn - ); - let mut scanner = BufferedScanner::new(&payload); - let opc = unsafe { - // UNSAFE(@ohsayan): - u16::from_le_bytes(scanner.next_chunk()) - }; - match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) { - Ok(()) if scanner.eof() => return Ok(()), - Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()), - Err(e) => Err(e), - } - } -} - -/* - Events - --- - FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes - pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless - we have absolutely no other choice) - --- - [OPC:2B][PAYLOAD] -*/ - -pub struct GNSSuperEvent(Box<[u8]>); - -/// Definition for an event in the GNS (DDL queries) -pub trait GNSEvent -where - Self: PersistObject + Sized, -{ - /// OPC for the event (unique) - const OPC: u16; - /// Expected type for a commit - type CommitType; - /// Expected type for a restore - type RestoreType; - /// Encodes the event into the given buffer - fn encode_super_event(commit: Self, buf: &mut Vec) { - inf::enc::enc_full_into_buffer::(buf, commit) - } - fn decode_and_update_global_state( - scanner: &mut BufferedScanner, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - Self::update_global_state(Self::decode(scanner)?, gns) - } - /// Attempts to decode the event using the given scanner - fn decode(scanner: &mut BufferedScanner) -> RuntimeResult { - inf::dec::dec_full_from_scanner::(scanner).map_err(|e| e.into()) - } - /// Update the global state from the restored event - fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>; -} - -#[derive(Debug, Clone, Copy)] -pub struct SpaceIDRef<'a> { - uuid: Uuid, - name: &'a str, -} - -impl<'a> SpaceIDRef<'a> { - pub fn new(name: &'a str, space: &Space) -> Self { - Self { - uuid: space.get_uuid(), - name, - } - } -} - -#[derive(Debug, PartialEq)] -pub struct SpaceIDRes { - uuid: Uuid, - name: Box, -} - -impl SpaceIDRes { - #[cfg(test)] - pub fn new(uuid: Uuid, name: Box) -> Self { - Self { uuid, name } - } -} -struct SpaceID<'a>(PhantomData>); -pub struct SpaceIDMD { - uuid: Uuid, - space_name_l: u64, -} - -impl<'a> PersistObject for SpaceID<'a> { - const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); - type InputType = SpaceIDRef<'a>; - type OutputType = SpaceIDRes; - type Metadata = SpaceIDMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left(md.space_name_l as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.uuid.to_le_bytes()); - buf.extend(data.name.len().u64_bytes_le()); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - Ok(SpaceIDMD { - uuid: Uuid::from_bytes(scanner.next_chunk()), - space_name_l: scanner.next_u64_le(), - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.name.as_bytes()); - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let str = inf::dec::utils::decode_string(s, md.space_name_l as usize)?; - Ok(SpaceIDRes { - uuid: md.uuid, - name: str.into_boxed_str(), - }) - } -} +pub mod space; +pub mod model; diff --git a/server/src/engine/txn/gns/model.rs b/server/src/engine/txn/gns/model.rs index fc0f6c8d..64cf555d 100644 --- a/server/src/engine/txn/gns/model.rs +++ b/server/src/engine/txn/gns/model.rs @@ -1,5 +1,5 @@ /* - * Created on Wed Aug 23 2023 + * Created on Sat Feb 10 2024 * * This file is a part of Skytable * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source @@ -7,7 +7,7 @@ * vision to provide flexibility in data modelling without compromising * on performance, queryability or scalability. * - * Copyright (c) 2023, Sayan Nandan + * Copyright (c) 2024, Sayan Nandan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by @@ -24,321 +24,40 @@ * */ -use { - super::GNSEvent, - crate::{ - engine::{ - core::{ - model::{Field, Model}, - space::Space, - GlobalNS, {EntityID, EntityIDRef}, - }, - data::uuid::Uuid, - error::TransactionError, - error::{RuntimeResult, StorageError}, - idx::{IndexST, IndexSTSeqCns, STIndex, STIndexSeq}, - mem::BufferedScanner, - ql::lex::Ident, - storage::v1::inf::{self, map, obj, PersistObject}, - }, - util::EndianQW, - }, - std::marker::PhantomData, +use crate::engine::{ + core::model::{Field, Model}, + idx::{IndexST, IndexSTSeqCns}, + ql::lex::Ident, + txn::{ModelIDRef, SpaceIDRef}, }; -pub struct ModelID<'a>(PhantomData<&'a ()>); -#[derive(Debug, Clone, Copy)] -pub struct ModelIDRef<'a> { - space_id: super::SpaceIDRef<'a>, - model_name: &'a str, - model_uuid: Uuid, - model_version: u64, -} - -impl<'a> ModelIDRef<'a> { - pub fn new_ref( - space_name: &'a str, - space: &'a Space, - model_name: &'a str, - model: &'a Model, - ) -> ModelIDRef<'a> { - ModelIDRef::new( - super::SpaceIDRef::new(space_name, space), - model_name, - model.get_uuid(), - model.delta_state().schema_current_version().value_u64(), - ) - } - pub fn new( - space_id: super::SpaceIDRef<'a>, - model_name: &'a str, - model_uuid: Uuid, - model_version: u64, - ) -> Self { - Self { - space_id, - model_name, - model_uuid, - model_version, - } - } -} -#[derive(Debug, PartialEq)] -pub struct ModelIDRes { - space_id: super::SpaceIDRes, - model_name: Box, - model_uuid: Uuid, - model_version: u64, -} - -impl ModelIDRes { - #[cfg(test)] - pub fn new( - space_id: super::SpaceIDRes, - model_name: Box, - model_uuid: Uuid, - model_version: u64, - ) -> Self { - Self { - space_id, - model_name, - model_uuid, - model_version, - } - } -} -pub struct ModelIDMD { - space_id: super::SpaceIDMD, - model_name_l: u64, - model_version: u64, - model_uuid: Uuid, -} - -impl<'a> PersistObject for ModelID<'a> { - const METADATA_SIZE: usize = - sizeof!(u64, 2) + sizeof!(u128) + ::METADATA_SIZE; - type InputType = ModelIDRef<'a>; - type OutputType = ModelIDRes; - type Metadata = ModelIDMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left(md.model_name_l as usize + md.space_id.space_name_l as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.space_id); - buf.extend(data.model_name.len().u64_bytes_le()); - buf.extend(data.model_version.to_le_bytes()); - buf.extend(data.model_uuid.to_le_bytes()); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - Ok(ModelIDMD { - space_id: ::meta_dec(scanner)?, - model_name_l: scanner.next_u64_le(), - model_version: scanner.next_u64_le(), - model_uuid: Uuid::from_bytes(scanner.next_chunk()), - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.space_id); - buf.extend(data.model_name.as_bytes()); - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - Ok(ModelIDRes { - space_id: ::obj_dec(s, md.space_id)?, - model_name: inf::dec::utils::decode_string(s, md.model_name_l as usize)? - .into_boxed_str(), - model_uuid: md.model_uuid, - model_version: md.model_version, - }) - } -} - -fn with_space( - gns: &GlobalNS, - space_id: &super::SpaceIDRes, - f: impl FnOnce(&Space) -> RuntimeResult, -) -> RuntimeResult { - let spaces = gns.idx().read(); - let Some(space) = spaces.st_get(&space_id.name) else { - return Err(TransactionError::OnRestoreDataMissing.into()); - }; - if space.get_uuid() != space_id.uuid { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - f(&space) -} - -fn with_space_mut( - gns: &GlobalNS, - space_id: &super::SpaceIDRes, - mut f: impl FnMut(&mut Space) -> RuntimeResult, -) -> RuntimeResult { - let mut spaces = gns.idx().write(); - let Some(space) = spaces.st_get_mut(&space_id.name) else { - return Err(TransactionError::OnRestoreDataMissing.into()); - }; - if space.get_uuid() != space_id.uuid { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - f(space) -} - -fn with_model_mut( - gns: &GlobalNS, - space_id: &super::SpaceIDRes, - model_id: &ModelIDRes, - f: impl FnOnce(&mut Model) -> RuntimeResult, -) -> RuntimeResult { - with_space(gns, space_id, |_| { - let mut models = gns.idx_models().write(); - let Some(model) = models.get_mut(&EntityIDRef::new(&space_id.name, &model_id.model_name)) - else { - return Err(TransactionError::OnRestoreDataMissing.into()); - }; - if model.get_uuid() != model_id.model_uuid { - // this should have been handled by an earlier transaction - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - f(model) - }) -} - -/* - create model -*/ - #[derive(Debug, Clone, Copy)] /// The commit payload for a `create model ... (...) with {...}` txn pub struct CreateModelTxn<'a> { - space_id: super::SpaceIDRef<'a>, + space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a Model, } impl<'a> CreateModelTxn<'a> { - pub const fn new( - space_id: super::SpaceIDRef<'a>, - model_name: &'a str, - model: &'a Model, - ) -> Self { + pub const fn new(space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a Model) -> Self { Self { space_id, model_name, model, } } -} - -#[derive(Debug)] -#[cfg_attr(test, derive(PartialEq))] -pub struct CreateModelTxnRestorePL { - pub(super) space_id: super::SpaceIDRes, - pub(super) model_name: Box, - pub(super) model: Model, -} - -pub struct CreateModelTxnMD { - space_id_meta: super::SpaceIDMD, - model_name_l: u64, - model_meta: as PersistObject>::Metadata, -} - -impl<'a> PersistObject for CreateModelTxn<'a> { - const METADATA_SIZE: usize = ::METADATA_SIZE - + sizeof!(u64) - + as PersistObject>::METADATA_SIZE; - type InputType = CreateModelTxn<'a>; - type OutputType = CreateModelTxnRestorePL; - type Metadata = CreateModelTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left((md.model_meta.p_key_len() + md.model_name_l) as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - // space ID - ::meta_enc(buf, data.space_id); - // model name - buf.extend(data.model_name.len().u64_bytes_le()); - // model meta dump - ::meta_enc(buf, obj::ModelLayoutRef::from(data.model)) + pub fn space_id(&self) -> SpaceIDRef<'_> { + self.space_id } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - let space_id = ::meta_dec(scanner)?; - let model_name_l = scanner.next_u64_le(); - let model_meta = ::meta_dec(scanner)?; - Ok(CreateModelTxnMD { - space_id_meta: space_id, - model_name_l, - model_meta, - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - // space id dump - ::obj_enc(buf, data.space_id); - // model name - buf.extend(data.model_name.as_bytes()); - // model dump - ::obj_enc(buf, obj::ModelLayoutRef::from(data.model)) - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let space_id = ::obj_dec(s, md.space_id_meta)?; - let model_name = - inf::dec::utils::decode_string(s, md.model_name_l as usize)?.into_boxed_str(); - let model = ::obj_dec(s, md.model_meta)?; - Ok(CreateModelTxnRestorePL { - space_id, - model_name, - model, - }) + pub fn model_name(&self) -> &str { + self.model_name } -} - -impl<'a> GNSEvent for CreateModelTxn<'a> { - const OPC: u16 = 3; - type CommitType = CreateModelTxn<'a>; - type RestoreType = CreateModelTxnRestorePL; - fn update_global_state( - CreateModelTxnRestorePL { - space_id, - model_name, - model, - }: Self::RestoreType, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - /* - NOTE(@ohsayan): - A jump to the second branch is practically impossible and should be caught long before we actually end up - here (due to mismatched checksums), but might be theoretically possible because the cosmic rays can be wild - (or well magnetic stuff arounding spinning disks). But we just want to be extra sure. Don't let the aliens (or - rather, radiation) from the cosmos deter us! - */ - let mut spaces = gns.idx().write(); - let mut models = gns.idx_models().write(); - let Some(space) = spaces.get_mut(&space_id.name) else { - return Err(TransactionError::OnRestoreDataMissing.into()); - }; - if space.models().contains(&model_name) { - return Err(TransactionError::OnRestoreDataConflictAlreadyExists.into()); - } - if models - .insert(EntityID::new(&space_id.name, &model_name), model) - .is_some() - { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - space.models_mut().insert(model_name); - Ok(()) + pub fn model(&self) -> &Model { + self.model } } -/* - alter model add -*/ - #[derive(Debug, Clone, Copy)] /// Transaction commit payload for an `alter model add ...` query pub struct AlterModelAddTxn<'a> { @@ -356,86 +75,14 @@ impl<'a> AlterModelAddTxn<'a> { new_fields, } } -} -pub struct AlterModelAddTxnMD { - model_id_meta: ModelIDMD, - new_field_c: u64, -} -#[derive(Debug)] -#[cfg_attr(test, derive(PartialEq))] -pub struct AlterModelAddTxnRestorePL { - pub(super) model_id: ModelIDRes, - pub(super) new_fields: IndexSTSeqCns, Field>, -} -impl<'a> PersistObject for AlterModelAddTxn<'a> { - const METADATA_SIZE: usize = ::METADATA_SIZE + sizeof!(u64); - type InputType = AlterModelAddTxn<'a>; - type OutputType = AlterModelAddTxnRestorePL; - type Metadata = AlterModelAddTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left( - (md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize, - ) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.model_id); - buf.extend(data.new_fields.st_len().u64_bytes_le()); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - let model_id_meta = ::meta_dec(scanner)?; - let new_field_c = scanner.next_u64_le(); - Ok(AlterModelAddTxnMD { - model_id_meta, - new_field_c, - }) + pub fn model_id(&self) -> ModelIDRef<'_> { + self.model_id } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.model_id); - > as PersistObject>::obj_enc(buf, data.new_fields); - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let model_id = ::obj_dec(s, md.model_id_meta)?; - let new_fields = , _>>> as PersistObject>::obj_dec( - s, - map::MapIndexSizeMD(md.new_field_c as usize), - )?; - Ok(AlterModelAddTxnRestorePL { - model_id, - new_fields, - }) - } -} - -impl<'a> GNSEvent for AlterModelAddTxn<'a> { - const OPC: u16 = 4; - type CommitType = AlterModelAddTxn<'a>; - type RestoreType = AlterModelAddTxnRestorePL; - fn update_global_state( - AlterModelAddTxnRestorePL { - model_id, - new_fields, - }: Self::RestoreType, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - with_model_mut(gns, &model_id.space_id, &model_id, |model| { - let mut mutator = model.model_mutator(); - for (field_name, field) in new_fields.stseq_owned_kv() { - if !mutator.add_field(field_name, field) { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - } - Ok(()) - }) + pub fn new_fields(&self) -> &IndexSTSeqCns, Field> { + self.new_fields } } -/* - alter model remove -*/ - #[derive(Debug, Clone, Copy)] /// Transaction commit payload for an `alter model remove` transaction pub struct AlterModelRemoveTxn<'a> { @@ -449,98 +96,14 @@ impl<'a> AlterModelRemoveTxn<'a> { removed_fields, } } -} -pub struct AlterModelRemoveTxnMD { - model_id_meta: ModelIDMD, - remove_field_c: u64, -} -#[derive(Debug, PartialEq)] -pub struct AlterModelRemoveTxnRestorePL { - pub(super) model_id: ModelIDRes, - pub(super) removed_fields: Box<[Box]>, -} - -impl<'a> PersistObject for AlterModelRemoveTxn<'a> { - const METADATA_SIZE: usize = ::METADATA_SIZE + sizeof!(u64); - type InputType = AlterModelRemoveTxn<'a>; - type OutputType = AlterModelRemoveTxnRestorePL; - type Metadata = AlterModelRemoveTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left( - (md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize, - ) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.model_id); - buf.extend(data.removed_fields.len().u64_bytes_le()); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - let model_id_meta = ::meta_dec(scanner)?; - Ok(AlterModelRemoveTxnMD { - model_id_meta, - remove_field_c: scanner.next_u64_le(), - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.model_id); - for field in data.removed_fields { - buf.extend(field.len().u64_bytes_le()); - buf.extend(field.as_bytes()); - } - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let model_id = ::obj_dec(s, md.model_id_meta)?; - let mut removed_fields = Vec::with_capacity(md.remove_field_c as usize); - while !s.eof() - & (removed_fields.len() as u64 != md.remove_field_c) - & s.has_left(sizeof!(u64)) - { - let len = s.next_u64_le() as usize; - if !s.has_left(len) { - break; - } - removed_fields.push(inf::dec::utils::decode_string(s, len)?.into_boxed_str()); - } - if removed_fields.len() as u64 != md.remove_field_c { - return Err(StorageError::InternalDecodeStructureCorruptedPayload.into()); - } - Ok(AlterModelRemoveTxnRestorePL { - model_id, - removed_fields: removed_fields.into_boxed_slice(), - }) + pub fn model_id(&self) -> ModelIDRef<'_> { + self.model_id } -} - -impl<'a> GNSEvent for AlterModelRemoveTxn<'a> { - const OPC: u16 = 5; - type CommitType = AlterModelRemoveTxn<'a>; - type RestoreType = AlterModelRemoveTxnRestorePL; - fn update_global_state( - AlterModelRemoveTxnRestorePL { - model_id, - removed_fields, - }: Self::RestoreType, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - with_model_mut(gns, &model_id.space_id, &model_id, |model| { - let mut mutator = model.model_mutator(); - for removed_field in removed_fields.iter() { - if !mutator.remove_field(&removed_field) { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - } - Ok(()) - }) + pub fn removed_fields(&self) -> &[Ident<'_>] { + self.removed_fields } } -/* - alter model update -*/ - #[derive(Debug, Clone, Copy)] /// Transaction commit payload for an `alter model update ...` query pub struct AlterModelUpdateTxn<'a> { @@ -558,89 +121,16 @@ impl<'a> AlterModelUpdateTxn<'a> { updated_fields, } } -} -pub struct AlterModelUpdateTxnMD { - model_id_md: ModelIDMD, - updated_field_c: u64, -} -#[derive(Debug, PartialEq)] -pub struct AlterModelUpdateTxnRestorePL { - pub(super) model_id: ModelIDRes, - pub(super) updated_fields: IndexSTSeqCns, Field>, -} -impl<'a> PersistObject for AlterModelUpdateTxn<'a> { - const METADATA_SIZE: usize = ::METADATA_SIZE + sizeof!(u64); - type InputType = AlterModelUpdateTxn<'a>; - type OutputType = AlterModelUpdateTxnRestorePL; - type Metadata = AlterModelUpdateTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left( - md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize, - ) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.model_id); - buf.extend(data.updated_fields.st_len().u64_bytes_le()); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - let model_id_md = ::meta_dec(scanner)?; - Ok(AlterModelUpdateTxnMD { - model_id_md, - updated_field_c: scanner.next_u64_le(), - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.model_id); - > as PersistObject>::obj_enc( - buf, - data.updated_fields, - ); + pub fn model_id(&self) -> ModelIDRef<'_> { + self.model_id } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let model_id = ::obj_dec(s, md.model_id_md)?; - let updated_fields = - , _>>> as PersistObject>::obj_dec( - s, - map::MapIndexSizeMD(md.updated_field_c as usize), - )?; - Ok(AlterModelUpdateTxnRestorePL { - model_id, - updated_fields, - }) - } -} -impl<'a> GNSEvent for AlterModelUpdateTxn<'a> { - const OPC: u16 = 6; - type CommitType = AlterModelUpdateTxn<'a>; - type RestoreType = AlterModelUpdateTxnRestorePL; - fn update_global_state( - AlterModelUpdateTxnRestorePL { - model_id, - updated_fields, - }: Self::RestoreType, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - with_model_mut(gns, &model_id.space_id, &model_id, |model| { - let mut mutator = model.model_mutator(); - for (field_id, field) in updated_fields.stseq_owned_kv() { - if !mutator.update_field(&field_id, field) { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - } - Ok(()) - }) + pub fn updated_fields(&self) -> &IndexST, Field> { + self.updated_fields } } -/* - drop model -*/ - #[derive(Debug, Clone, Copy)] /// Transaction commit payload for a `drop model ...` query pub struct DropModelTxn<'a> { @@ -651,64 +141,7 @@ impl<'a> DropModelTxn<'a> { pub const fn new(model_id: ModelIDRef<'a>) -> Self { Self { model_id } } -} -pub struct DropModelTxnMD { - model_id_md: ModelIDMD, -} -impl<'a> PersistObject for DropModelTxn<'a> { - const METADATA_SIZE: usize = ::METADATA_SIZE; - type InputType = DropModelTxn<'a>; - type OutputType = ModelIDRes; - type Metadata = DropModelTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left( - md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize, - ) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.model_id); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - let model_id_md = ::meta_dec(scanner)?; - Ok(DropModelTxnMD { model_id_md }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.model_id); - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - ::obj_dec(s, md.model_id_md) - } -} - -impl<'a> GNSEvent for DropModelTxn<'a> { - const OPC: u16 = 7; - type CommitType = DropModelTxn<'a>; - type RestoreType = ModelIDRes; - fn update_global_state( - ModelIDRes { - space_id, - model_name, - model_uuid, - model_version: _, - }: Self::RestoreType, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - with_space_mut(gns, &space_id, |space| { - let mut models = gns.idx_models().write(); - if !space.models_mut().remove(&model_name) { - return Err(TransactionError::OnRestoreDataMissing.into()); - } - let Some(removed_model) = models.remove(&EntityIDRef::new(&space_id.name, &model_name)) - else { - return Err(TransactionError::OnRestoreDataMissing.into()); - }; - if removed_model.get_uuid() != model_uuid { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - Ok(()) - }) + pub fn model_id(&self) -> ModelIDRef<'_> { + self.model_id } } diff --git a/server/src/engine/txn/gns/space.rs b/server/src/engine/txn/gns/space.rs index c4199463..984db529 100644 --- a/server/src/engine/txn/gns/space.rs +++ b/server/src/engine/txn/gns/space.rs @@ -1,5 +1,5 @@ /* - * Created on Wed Aug 23 2023 + * Created on Sat Feb 10 2024 * * This file is a part of Skytable * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source @@ -7,7 +7,7 @@ * vision to provide flexibility in data modelling without compromising * on performance, queryability or scalability. * - * Copyright (c) 2023, Sayan Nandan + * Copyright (c) 2024, Sayan Nandan * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by @@ -24,31 +24,14 @@ * */ -use { - super::GNSEvent, - crate::{ - engine::{ - core::{space::Space, EntityIDRef, GlobalNS}, - data::DictGeneric, - error::{RuntimeResult, TransactionError}, - idx::STIndex, - mem::BufferedScanner, - storage::v1::inf::{self, map, obj, PersistObject}, - }, - util::EndianQW, - }, -}; - -/* - create space -*/ +use crate::engine::{core::space::Space, data::DictGeneric, txn::SpaceIDRef}; #[derive(Clone, Copy)] /// Transaction commit payload for a `create space ...` query pub struct CreateSpaceTxn<'a> { - pub(super) space_meta: &'a DictGeneric, - pub(super) space_name: &'a str, - pub(super) space: &'a Space, + space_meta: &'a DictGeneric, + space_name: &'a str, + space: &'a Space, } impl<'a> CreateSpaceTxn<'a> { @@ -59,245 +42,50 @@ impl<'a> CreateSpaceTxn<'a> { space, } } -} - -#[derive(Debug)] -#[cfg_attr(test, derive(PartialEq))] -pub struct CreateSpaceTxnRestorePL { - pub(super) space_name: Box, - pub(super) space: Space, -} - -pub struct CreateSpaceTxnMD { - pub(super) space_name_l: u64, - pub(super) space_meta: as PersistObject>::Metadata, -} - -impl<'a> PersistObject for CreateSpaceTxn<'a> { - const METADATA_SIZE: usize = - as PersistObject>::METADATA_SIZE + sizeof!(u64); - type InputType = CreateSpaceTxn<'a>; - type OutputType = CreateSpaceTxnRestorePL; - type Metadata = CreateSpaceTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left(md.space_name_l as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.len().u64_bytes_le()); - as PersistObject>::meta_enc( - buf, - obj::SpaceLayoutRef::from((data.space, data.space_meta)), - ); + pub fn space_meta(&self) -> &DictGeneric { + self.space_meta } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - let space_name_l = scanner.next_u64_le(); - let space_meta = ::meta_dec(scanner)?; - Ok(CreateSpaceTxnMD { - space_name_l, - space_meta, - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.as_bytes()); - ::obj_enc(buf, (data.space, data.space_meta).into()); + pub fn space_name(&self) -> &str { + self.space_name } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let space_name = - inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str(); - let space = ::obj_dec(s, md.space_meta)?; - Ok(CreateSpaceTxnRestorePL { space_name, space }) + pub fn space(&self) -> &Space { + self.space } } -impl<'a> GNSEvent for CreateSpaceTxn<'a> { - const OPC: u16 = 0; - type CommitType = CreateSpaceTxn<'a>; - type RestoreType = CreateSpaceTxnRestorePL; - fn update_global_state( - CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL, - gns: &crate::engine::core::GlobalNS, - ) -> RuntimeResult<()> { - let mut spaces = gns.idx().write(); - if spaces.st_insert(space_name, space.into()) { - Ok(()) - } else { - Err(TransactionError::OnRestoreDataConflictAlreadyExists.into()) - } - } -} - -/* - alter space - --- - for now dump the entire meta -*/ - #[derive(Clone, Copy)] /// Transaction payload for an `alter space ...` query pub struct AlterSpaceTxn<'a> { - space_id: super::SpaceIDRef<'a>, + space_id: SpaceIDRef<'a>, updated_props: &'a DictGeneric, } impl<'a> AlterSpaceTxn<'a> { - pub const fn new(space_id: super::SpaceIDRef<'a>, updated_props: &'a DictGeneric) -> Self { + pub const fn new(space_id: SpaceIDRef<'a>, updated_props: &'a DictGeneric) -> Self { Self { space_id, updated_props, } } -} - -pub struct AlterSpaceTxnMD { - space_id_meta: super::SpaceIDMD, - dict_len: u64, -} - -#[derive(Debug, PartialEq)] -pub struct AlterSpaceTxnRestorePL { - pub(super) space_id: super::SpaceIDRes, - pub(super) space_meta: DictGeneric, -} - -impl<'a> PersistObject for AlterSpaceTxn<'a> { - const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128); - type InputType = AlterSpaceTxn<'a>; - type OutputType = AlterSpaceTxnRestorePL; - type Metadata = AlterSpaceTxnMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left(md.space_id_meta.space_name_l as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.space_id); - buf.extend(data.updated_props.len().u64_bytes_le()); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - Ok(AlterSpaceTxnMD { - space_id_meta: ::meta_dec(scanner)?, - dict_len: scanner.next_u64_le(), - }) + pub fn space_id(&self) -> SpaceIDRef<'_> { + self.space_id } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.space_id); - as PersistObject>::obj_enc( - buf, - data.updated_props, - ); - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - let space_id = ::obj_dec(s, md.space_id_meta)?; - let space_meta = as PersistObject>::obj_dec( - s, - map::MapIndexSizeMD(md.dict_len as usize), - )?; - Ok(AlterSpaceTxnRestorePL { - space_id, - space_meta, - }) + pub fn updated_props(&self) -> &DictGeneric { + self.updated_props } } -impl<'a> GNSEvent for AlterSpaceTxn<'a> { - const OPC: u16 = 1; - type CommitType = AlterSpaceTxn<'a>; - type RestoreType = AlterSpaceTxnRestorePL; - - fn update_global_state( - AlterSpaceTxnRestorePL { - space_id, - space_meta, - }: Self::RestoreType, - gns: &crate::engine::core::GlobalNS, - ) -> RuntimeResult<()> { - let mut gns = gns.idx().write(); - match gns.st_get_mut(&space_id.name) { - Some(space) => { - if !crate::engine::data::dict::rmerge_metadata(space.props_mut(), space_meta) { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - } - None => return Err(TransactionError::OnRestoreDataMissing.into()), - } - Ok(()) - } -} - -/* - drop space -*/ - #[derive(Clone, Copy)] /// Transaction commit payload for a `drop space ...` query pub struct DropSpaceTxn<'a> { - space_id: super::SpaceIDRef<'a>, + space_id: SpaceIDRef<'a>, } impl<'a> DropSpaceTxn<'a> { - pub const fn new(space_id: super::SpaceIDRef<'a>) -> Self { + pub const fn new(space_id: SpaceIDRef<'a>) -> Self { Self { space_id } } -} - -impl<'a> PersistObject for DropSpaceTxn<'a> { - const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); - type InputType = DropSpaceTxn<'a>; - type OutputType = super::SpaceIDRes; - type Metadata = super::SpaceIDMD; - fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool { - scanner.has_left(md.space_name_l as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - ::meta_enc(buf, data.space_id); - } - unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult { - ::meta_dec(scanner) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - ::obj_enc(buf, data.space_id) - } - unsafe fn obj_dec( - s: &mut BufferedScanner, - md: Self::Metadata, - ) -> RuntimeResult { - ::obj_dec(s, md) - } -} - -impl<'a> GNSEvent for DropSpaceTxn<'a> { - const OPC: u16 = 2; - type CommitType = DropSpaceTxn<'a>; - type RestoreType = super::SpaceIDRes; - fn update_global_state( - super::SpaceIDRes { uuid, name }: Self::RestoreType, - gns: &GlobalNS, - ) -> RuntimeResult<()> { - let mut wgns = gns.idx().write(); - let mut wmodel = gns.idx_models().write(); - match wgns.entry(name) { - std::collections::hash_map::Entry::Occupied(oe) => { - if oe.get().get_uuid() == uuid { - for model in oe.get().models() { - let id: EntityIDRef<'static> = unsafe { - // UNSAFE(@ohsayan): I really need a pack of what the borrow checker has been reveling on - core::mem::transmute(EntityIDRef::new(oe.key(), &model)) - }; - let _ = wmodel.st_delete(&id); - } - oe.remove_entry(); - Ok(()) - } else { - return Err(TransactionError::OnRestoreDataConflictMismatch.into()); - } - } - std::collections::hash_map::Entry::Vacant(_) => { - return Err(TransactionError::OnRestoreDataMissing.into()) - } - } + pub fn space_id(&self) -> SpaceIDRef<'_> { + self.space_id } } diff --git a/server/src/engine/txn/mod.rs b/server/src/engine/txn/mod.rs index 3c258b82..27f89fce 100644 --- a/server/src/engine/txn/mod.rs +++ b/server/src/engine/txn/mod.rs @@ -23,5 +23,8 @@ * along with this program. If not, see . * */ - pub mod gns; +mod shared; + +// re-export +pub use shared::{ModelIDRef, SpaceIDRef}; diff --git a/server/src/engine/txn/shared.rs b/server/src/engine/txn/shared.rs new file mode 100644 index 00000000..bb72d00c --- /dev/null +++ b/server/src/engine/txn/shared.rs @@ -0,0 +1,100 @@ +/* + * Created on Sat Feb 10 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use crate::engine::{ + core::{model::Model, space::Space}, + data::uuid::Uuid, +}; + +#[derive(Debug, Clone, Copy)] +pub struct SpaceIDRef<'a> { + uuid: Uuid, + name: &'a str, +} + +impl<'a> SpaceIDRef<'a> { + pub fn new(name: &'a str, space: &Space) -> Self { + Self { + uuid: space.get_uuid(), + name, + } + } + pub fn name(&self) -> &str { + self.name + } + pub fn uuid(&self) -> Uuid { + self.uuid + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ModelIDRef<'a> { + space_id: SpaceIDRef<'a>, + model_name: &'a str, + model_uuid: Uuid, + model_version: u64, +} + +impl<'a> ModelIDRef<'a> { + pub fn new_ref( + space_name: &'a str, + space: &'a Space, + model_name: &'a str, + model: &'a Model, + ) -> ModelIDRef<'a> { + ModelIDRef::new( + SpaceIDRef::new(space_name, space), + model_name, + model.get_uuid(), + model.delta_state().schema_current_version().value_u64(), + ) + } + pub fn new( + space_id: SpaceIDRef<'a>, + model_name: &'a str, + model_uuid: Uuid, + model_version: u64, + ) -> Self { + Self { + space_id, + model_name, + model_uuid, + model_version, + } + } + pub fn space_id(&self) -> SpaceIDRef { + self.space_id + } + pub fn model_name(&self) -> &str { + self.model_name + } + pub fn model_uuid(&self) -> Uuid { + self.model_uuid + } + pub fn model_version(&self) -> u64 { + self.model_version + } +}