Refactor storage and txn modules

next
Sayan Nandan 7 months ago
parent 85c40b60b2
commit ea678e2c9d
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -43,7 +43,7 @@ use {
},
lex::Ident,
},
txn::gns as gnstxn,
txn::{gns, ModelIDRef},
},
util,
},
@ -268,13 +268,8 @@ impl Model {
// TODO(@ohsayan): this impacts lockdown duration; fix it
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelAddTxn::new(
gnstxn::ModelIDRef::new_ref(
&space_name,
&space,
&model_name,
model,
),
let txn = gns::model::AlterModelAddTxn::new(
ModelIDRef::new_ref(&space_name, &space, &model_name, model),
&new_fields,
);
// commit txn
@ -291,8 +286,8 @@ impl Model {
AlterAction::Remove(removed) => {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelRemoveTxn::new(
gnstxn::ModelIDRef::new_ref(&space_name, space, &model_name, model),
let txn = gns::model::AlterModelRemoveTxn::new(
ModelIDRef::new_ref(&space_name, space, &model_name, model),
&removed,
);
// commit txn
@ -306,8 +301,8 @@ impl Model {
AlterAction::Update(updated) => {
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::AlterModelUpdateTxn::new(
gnstxn::ModelIDRef::new_ref(&space_name, space, &model_name, model),
let txn = gns::model::AlterModelUpdateTxn::new(
ModelIDRef::new_ref(&space_name, space, &model_name, model),
&updated,
);
// commit txn

@ -44,7 +44,7 @@ use {
drop::DropModel,
syn::{FieldSpec, LayerSpec},
},
txn::gns::{self as gnstxn, SpaceIDRef},
txn::{gns, ModelIDRef, SpaceIDRef},
},
std::collections::hash_map::{Entry, HashMap},
};
@ -281,7 +281,7 @@ impl Model {
if G::FS_IS_NON_NULL {
let mut txn_driver = global.namespace_txn_driver().lock();
// prepare txn
let txn = gnstxn::CreateModelTxn::new(
let txn = gns::model::CreateModelTxn::new(
SpaceIDRef::new(&space_name, &space),
&model_name,
&model,
@ -351,7 +351,7 @@ impl Model {
// okay this is looking good for us
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::DropModelTxn::new(gnstxn::ModelIDRef::new(
let txn = gns::model::DropModelTxn::new(ModelIDRef::new(
SpaceIDRef::new(&space_name, &space),
&model_name,
model.get_uuid(),

@ -34,7 +34,7 @@ use {
idx::STIndex,
ql::ddl::{alt::AlterSpace, crt::CreateSpace, drop::DropSpace},
storage::{safe_interfaces::FSInterface, v1::loader::SEInitState},
txn::gns as gnstxn,
txn::{self, SpaceIDRef},
},
std::collections::HashSet,
};
@ -167,7 +167,7 @@ impl Space {
// commit txn
if G::FS_IS_NON_NULL {
// prepare txn
let txn = gnstxn::CreateSpaceTxn::new(space.props(), &space_name, &space);
let txn = txn::gns::space::CreateSpaceTxn::new(space.props(), &space_name, &space);
// try to create space for...the space
G::FileSystem::fs_create_dir_all(&SEInitState::space_dir(
&space_name,
@ -216,8 +216,10 @@ impl Space {
};
if G::FS_IS_NON_NULL {
// prepare txn
let txn =
gnstxn::AlterSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, space), &patch);
let txn = txn::gns::space::AlterSpaceTxn::new(
SpaceIDRef::new(&space_name, space),
&patch,
);
// commit
global.namespace_txn_driver().lock().try_commit(txn)?;
}
@ -252,7 +254,7 @@ impl Space {
if G::FS_IS_NON_NULL {
// prepare txn
let txn =
gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, &space));
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
// request cleanup
@ -299,7 +301,7 @@ impl Space {
if G::FS_IS_NON_NULL {
// prepare txn
let txn =
gnstxn::DropSpaceTxn::new(gnstxn::SpaceIDRef::new(&space_name, &space));
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
// request cleanup

@ -28,8 +28,10 @@ use {
super::util,
crate::engine::{
error::RuntimeResult,
storage::{safe_interfaces::FSInterface, v1::data_batch::DataBatchPersistDriver},
txn::gns::GNSTransactionDriverAnyFS,
storage::{
safe_interfaces::FSInterface,
v1::{DataBatchPersistDriver, GNSTransactionDriverAnyFS},
},
},
parking_lot::Mutex,
std::sync::Arc,

@ -32,8 +32,8 @@ use {
storage::{
self,
safe_interfaces::{FSInterface, LocalFS},
v1::GNSTransactionDriverAnyFS,
},
txn::gns::GNSTransactionDriverAnyFS,
},
crate::engine::error::RuntimeResult,
parking_lot::{Mutex, RwLock},
@ -215,7 +215,7 @@ impl GlobalInstanceLike for Global {
))?;
// init driver
let driver =
storage::v1::data_batch::create(&storage::v1::loader::SEInitState::model_path(
storage::v1::create_batch_journal(&storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.get_state().mdl_driver.write().insert(

@ -35,8 +35,8 @@ use {
storage::{
self,
safe_interfaces::{FSInterface, NullFS, VirtualFS},
v1::GNSTransactionDriverAnyFS,
},
txn::gns::GNSTransactionDriverAnyFS,
},
parking_lot::{Mutex, RwLock},
std::collections::HashMap,
@ -148,7 +148,7 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
space_name, space_uuid, model_name, model_uuid,
))?;
let driver =
storage::v1::data_batch::create(&storage::v1::loader::SEInitState::model_path(
storage::v1::create_batch_journal(&storage::v1::loader::SEInitState::model_path(
space_name, space_uuid, model_name, model_uuid,
))?;
self.model_drivers.write().insert(

@ -444,7 +444,7 @@ macro_rules! closure {
#[cfg(test)]
macro_rules! array {
($($(#[$attr:meta])* $vis:vis const $name:ident: [$ty:ty] = [$($expr:expr),* $(,)?]);* $(;)?) => {
$(#[allow(non_snake_case)] mod $name {pub(super) const LEN: usize = { let mut i = 0;$(let _ = $expr; i += 1;)*i += 0; i};}
$(#[$attr])* $vis const $name: [$ty; $name::LEN] = [$($expr),*];)*
$(#[allow(non_snake_case)]mod$name{pub(super)const LEN:usize={let mut i=0;$(let _=$expr;i+=1;)*i+=0;i};}
$(#[$attr])*$vis const$name:[$ty;$name::LEN]=[$($expr),*];)*
}
}

@ -519,7 +519,7 @@ impl<
fn check_vfs_buffering() {
use crate::engine::storage::{
common::interface::fs_test::{VFileDescriptor, VirtualFS},
v2::spec::{Header, SystemDatabaseV1},
v2::raw::spec::{Header, SystemDatabaseV1},
};
fn rawfile() -> Vec<u8> {
VirtualFS::fetch_raw_data("myfile").unwrap()

@ -0,0 +1,217 @@
/*
* Created on Sun Aug 20 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::super::raw::journal::{JournalAdapter, JournalWriter},
crate::{
engine::{
core::GlobalNS,
data::uuid::Uuid,
error::{RuntimeResult, TransactionError},
mem::BufferedScanner,
storage::{
safe_interfaces::{FSInterface, LocalFS},
v1::inf::{self, PersistObject},
},
txn::{gns, SpaceIDRef},
},
util::EndianQW,
},
std::marker::PhantomData,
};
mod model;
mod space;
// test
#[cfg(test)]
mod tests;
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<Fs: FSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
pub fn into_inner(self) -> JournalWriter<Fs, GNSAdapter> {
self.journal
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> RuntimeResult<()> {
let mut buf = vec![];
buf.extend(GE::OPC.to_le_bytes());
GE::encode_super_event(gns_event, &mut buf);
self.journal
.append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?;
Ok(())
}
}
/*
journal implementor
*/
/// the journal adapter for DDL queries on the GNS
#[derive(Debug)]
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = crate::engine::fractal::error::Error;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> {
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into());
}
macro_rules! dispatch {
($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())]
};
}
static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!(
gns::space::CreateSpaceTxn,
gns::space::AlterSpaceTxn,
gns::space::DropSpaceTxn,
gns::model::CreateModelTxn,
gns::model::AlterModelAddTxn,
gns::model::AlterModelRemoveTxn,
gns::model::AlterModelUpdateTxn,
gns::model::DropModelTxn
);
let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe {
// UNSAFE(@ohsayan):
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()),
Err(e) => Err(e),
}
}
}
/*
Events
---
FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes
pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless
we have absolutely no other choice)
---
[OPC:2B][PAYLOAD]
*/
pub struct GNSSuperEvent(Box<[u8]>);
/// Definition for an event in the GNS (DDL queries)
pub trait GNSEvent
where
Self: PersistObject<InputType = Self, OutputType = Self::RestoreType> + Sized,
{
/// OPC for the event (unique)
const OPC: u16;
/// Expected type for a commit
type CommitType;
/// Expected type for a restore
type RestoreType;
/// Encodes the event into the given buffer
fn encode_super_event(commit: Self, buf: &mut Vec<u8>) {
inf::enc::enc_full_into_buffer::<Self>(buf, commit)
}
fn decode_and_update_global_state(
scanner: &mut BufferedScanner,
gns: &GlobalNS,
) -> RuntimeResult<()> {
Self::update_global_state(Self::decode(scanner)?, gns)
}
/// Attempts to decode the event using the given scanner
fn decode(scanner: &mut BufferedScanner) -> RuntimeResult<Self::RestoreType> {
inf::dec::dec_full_from_scanner::<Self>(scanner).map_err(|e| e.into())
}
/// Update the global state from the restored event
fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>;
}
#[derive(Debug, PartialEq)]
pub struct SpaceIDRes {
uuid: Uuid,
name: Box<str>,
}
impl SpaceIDRes {
#[cfg(test)]
pub fn new(uuid: Uuid, name: Box<str>) -> Self {
Self { uuid, name }
}
}
struct SpaceID<'a>(PhantomData<SpaceIDRef<'a>>);
pub struct SpaceIDMD {
uuid: Uuid,
space_name_l: u64,
}
impl<'a> PersistObject for SpaceID<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = SpaceIDRef<'a>;
type OutputType = SpaceIDRes;
type Metadata = SpaceIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.uuid().to_le_bytes());
buf.extend(data.name().len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(SpaceIDMD {
uuid: Uuid::from_bytes(scanner.next_chunk()),
space_name_l: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.name().as_bytes());
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let str = inf::dec::utils::decode_string(s, md.space_name_l as usize)?;
Ok(SpaceIDRes {
uuid: md.uuid,
name: str.into_boxed_str(),
})
}
}

@ -0,0 +1,610 @@
/*
* Created on Wed Aug 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::GNSEvent,
crate::{
engine::{
core::{
model::{Field, Model},
space::Space,
EntityID, EntityIDRef, GlobalNS,
},
data::uuid::Uuid,
error::{RuntimeResult, StorageError, TransactionError},
idx::{IndexSTSeqCns, STIndex, STIndexSeq},
mem::BufferedScanner,
storage::v1::inf::{self, map, obj, PersistObject},
txn::{
gns::model::{
AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn,
DropModelTxn,
},
ModelIDRef,
},
},
util::EndianQW,
},
core::marker::PhantomData,
};
pub struct ModelID<'a>(PhantomData<&'a ()>);
#[derive(Debug, PartialEq)]
pub struct ModelIDRes {
space_id: super::SpaceIDRes,
model_name: Box<str>,
model_uuid: Uuid,
model_version: u64,
}
impl ModelIDRes {
#[cfg(test)]
pub fn new(
space_id: super::SpaceIDRes,
model_name: Box<str>,
model_uuid: Uuid,
model_version: u64,
) -> Self {
Self {
space_id,
model_name,
model_uuid,
model_version,
}
}
}
pub struct ModelIDMD {
space_id: super::SpaceIDMD,
model_name_l: u64,
model_version: u64,
model_uuid: Uuid,
}
impl<'a> PersistObject for ModelID<'a> {
const METADATA_SIZE: usize =
sizeof!(u64, 2) + sizeof!(u128) + <super::SpaceID as PersistObject>::METADATA_SIZE;
type InputType = ModelIDRef<'a>;
type OutputType = ModelIDRes;
type Metadata = ModelIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.model_name_l as usize + md.space_id.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id());
buf.extend(data.model_name().len().u64_bytes_le());
buf.extend(data.model_version().to_le_bytes());
buf.extend(data.model_uuid().to_le_bytes());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(ModelIDMD {
space_id: <super::SpaceID as PersistObject>::meta_dec(scanner)?,
model_name_l: scanner.next_u64_le(),
model_version: scanner.next_u64_le(),
model_uuid: Uuid::from_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id());
buf.extend(data.model_name().as_bytes());
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
Ok(ModelIDRes {
space_id: <super::SpaceID as PersistObject>::obj_dec(s, md.space_id)?,
model_name: inf::dec::utils::decode_string(s, md.model_name_l as usize)?
.into_boxed_str(),
model_uuid: md.model_uuid,
model_version: md.model_version,
})
}
}
fn with_space<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
f: impl FnOnce(&Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let spaces = gns.idx().read();
let Some(space) = spaces.st_get(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if space.get_uuid() != space_id.uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(&space)
}
fn with_space_mut<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
mut f: impl FnMut(&mut Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let mut spaces = gns.idx().write();
let Some(space) = spaces.st_get_mut(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if space.get_uuid() != space_id.uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(space)
}
fn with_model_mut<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
model_id: &ModelIDRes,
f: impl FnOnce(&mut Model) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
with_space(gns, space_id, |_| {
let mut models = gns.idx_models().write();
let Some(model) = models.get_mut(&EntityIDRef::new(&space_id.name, &model_id.model_name))
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if model.get_uuid() != model_id.model_uuid {
// this should have been handled by an earlier transaction
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(model)
})
}
/*
create model
*/
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct CreateModelTxnRestorePL {
pub(super) space_id: super::SpaceIDRes,
pub(super) model_name: Box<str>,
pub(super) model: Model,
}
pub struct CreateModelTxnMD {
space_id_meta: super::SpaceIDMD,
model_name_l: u64,
model_meta: <obj::ModelLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> PersistObject for CreateModelTxn<'a> {
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ sizeof!(u64)
+ <obj::ModelLayoutRef<'a> as PersistObject>::METADATA_SIZE;
type InputType = CreateModelTxn<'a>;
type OutputType = CreateModelTxnRestorePL;
type Metadata = CreateModelTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left((md.model_meta.p_key_len() + md.model_name_l) as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
// space ID
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id());
// model name
buf.extend(data.model_name().len().u64_bytes_le());
// model meta dump
<obj::ModelLayoutRef as PersistObject>::meta_enc(
buf,
obj::ModelLayoutRef::from(data.model()),
)
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let space_id = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_name_l = scanner.next_u64_le();
let model_meta = <obj::ModelLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateModelTxnMD {
space_id_meta: space_id,
model_name_l,
model_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
// space id dump
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id());
// model name
buf.extend(data.model_name().as_bytes());
// model dump
<obj::ModelLayoutRef as PersistObject>::obj_enc(
buf,
obj::ModelLayoutRef::from(data.model()),
)
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let model_name =
inf::dec::utils::decode_string(s, md.model_name_l as usize)?.into_boxed_str();
let model = <obj::ModelLayoutRef as PersistObject>::obj_dec(s, md.model_meta)?;
Ok(CreateModelTxnRestorePL {
space_id,
model_name,
model,
})
}
}
impl<'a> GNSEvent for CreateModelTxn<'a> {
const OPC: u16 = 3;
type CommitType = CreateModelTxn<'a>;
type RestoreType = CreateModelTxnRestorePL;
fn update_global_state(
CreateModelTxnRestorePL {
space_id,
model_name,
model,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
/*
NOTE(@ohsayan):
A jump to the second branch is practically impossible and should be caught long before we actually end up
here (due to mismatched checksums), but might be theoretically possible because the cosmic rays can be wild
(or well magnetic stuff arounding spinning disks). But we just want to be extra sure. Don't let the aliens (or
rather, radiation) from the cosmos deter us!
*/
let mut spaces = gns.idx().write();
let mut models = gns.idx_models().write();
let Some(space) = spaces.get_mut(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if space.models().contains(&model_name) {
return Err(TransactionError::OnRestoreDataConflictAlreadyExists.into());
}
if models
.insert(EntityID::new(&space_id.name, &model_name), model)
.is_some()
{
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
space.models_mut().insert(model_name);
Ok(())
}
}
/*
alter model add
*/
pub struct AlterModelAddTxnMD {
model_id_meta: ModelIDMD,
new_field_c: u64,
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct AlterModelAddTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) new_fields: IndexSTSeqCns<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelAddTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = AlterModelAddTxn<'a>;
type OutputType = AlterModelAddTxnRestorePL;
type Metadata = AlterModelAddTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
(md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id());
buf.extend(data.new_fields().st_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_meta = <ModelID as PersistObject>::meta_dec(scanner)?;
let new_field_c = scanner.next_u64_le();
Ok(AlterModelAddTxnMD {
model_id_meta,
new_field_c,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id());
<map::PersistMapImpl<map::FieldMapSpec<_>> as PersistObject>::obj_enc(
buf,
data.new_fields(),
);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let new_fields = <map::PersistMapImpl<map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.new_field_c as usize),
)?;
Ok(AlterModelAddTxnRestorePL {
model_id,
new_fields,
})
}
}
impl<'a> GNSEvent for AlterModelAddTxn<'a> {
const OPC: u16 = 4;
type CommitType = AlterModelAddTxn<'a>;
type RestoreType = AlterModelAddTxnRestorePL;
fn update_global_state(
AlterModelAddTxnRestorePL {
model_id,
new_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
for (field_name, field) in new_fields.stseq_owned_kv() {
if !mutator.add_field(field_name, field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())
})
}
}
/*
alter model remove
*/
pub struct AlterModelRemoveTxnMD {
model_id_meta: ModelIDMD,
remove_field_c: u64,
}
#[derive(Debug, PartialEq)]
pub struct AlterModelRemoveTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) removed_fields: Box<[Box<str>]>,
}
impl<'a> PersistObject for AlterModelRemoveTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = AlterModelRemoveTxn<'a>;
type OutputType = AlterModelRemoveTxnRestorePL;
type Metadata = AlterModelRemoveTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
(md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id());
buf.extend(data.removed_fields().len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_meta = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(AlterModelRemoveTxnMD {
model_id_meta,
remove_field_c: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id());
for field in data.removed_fields() {
buf.extend(field.len().u64_bytes_le());
buf.extend(field.as_bytes());
}
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let mut removed_fields = Vec::with_capacity(md.remove_field_c as usize);
while !s.eof()
& (removed_fields.len() as u64 != md.remove_field_c)
& s.has_left(sizeof!(u64))
{
let len = s.next_u64_le() as usize;
if !s.has_left(len) {
break;
}
removed_fields.push(inf::dec::utils::decode_string(s, len)?.into_boxed_str());
}
if removed_fields.len() as u64 != md.remove_field_c {
return Err(StorageError::InternalDecodeStructureCorruptedPayload.into());
}
Ok(AlterModelRemoveTxnRestorePL {
model_id,
removed_fields: removed_fields.into_boxed_slice(),
})
}
}
impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
const OPC: u16 = 5;
type CommitType = AlterModelRemoveTxn<'a>;
type RestoreType = AlterModelRemoveTxnRestorePL;
fn update_global_state(
AlterModelRemoveTxnRestorePL {
model_id,
removed_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
for removed_field in removed_fields.iter() {
if !mutator.remove_field(&removed_field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())
})
}
}
/*
alter model update
*/
pub struct AlterModelUpdateTxnMD {
model_id_md: ModelIDMD,
updated_field_c: u64,
}
#[derive(Debug, PartialEq)]
pub struct AlterModelUpdateTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) updated_fields: IndexSTSeqCns<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = AlterModelUpdateTxn<'a>;
type OutputType = AlterModelUpdateTxnRestorePL;
type Metadata = AlterModelUpdateTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id());
buf.extend(data.updated_fields().st_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_md = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(AlterModelUpdateTxnMD {
model_id_md,
updated_field_c: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id());
<map::PersistMapImpl<map::FieldMapSpec<_>> as PersistObject>::obj_enc(
buf,
data.updated_fields(),
);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_md)?;
let updated_fields =
<map::PersistMapImpl<map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.updated_field_c as usize),
)?;
Ok(AlterModelUpdateTxnRestorePL {
model_id,
updated_fields,
})
}
}
impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
const OPC: u16 = 6;
type CommitType = AlterModelUpdateTxn<'a>;
type RestoreType = AlterModelUpdateTxnRestorePL;
fn update_global_state(
AlterModelUpdateTxnRestorePL {
model_id,
updated_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
for (field_id, field) in updated_fields.stseq_owned_kv() {
if !mutator.update_field(&field_id, field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())
})
}
}
/*
drop model
*/
pub struct DropModelTxnMD {
model_id_md: ModelIDMD,
}
impl<'a> PersistObject for DropModelTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE;
type InputType = DropModelTxn<'a>;
type OutputType = ModelIDRes;
type Metadata = DropModelTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_md = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(DropModelTxnMD { model_id_md })
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id());
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
<ModelID as PersistObject>::obj_dec(s, md.model_id_md)
}
}
impl<'a> GNSEvent for DropModelTxn<'a> {
const OPC: u16 = 7;
type CommitType = DropModelTxn<'a>;
type RestoreType = ModelIDRes;
fn update_global_state(
ModelIDRes {
space_id,
model_name,
model_uuid,
model_version: _,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_space_mut(gns, &space_id, |space| {
let mut models = gns.idx_models().write();
if !space.models_mut().remove(&model_name) {
return Err(TransactionError::OnRestoreDataMissing.into());
}
let Some(removed_model) = models.remove(&EntityIDRef::new(&space_id.name, &model_name))
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if removed_model.get_uuid() != model_uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
Ok(())
})
}
}

@ -0,0 +1,261 @@
/*
* Created on Wed Aug 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::GNSEvent,
crate::{
engine::{
core::{space::Space, EntityIDRef, GlobalNS},
data::DictGeneric,
error::{RuntimeResult, TransactionError},
idx::STIndex,
mem::BufferedScanner,
storage::v1::inf::{self, map, obj, PersistObject},
txn::gns::space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
},
util::EndianQW,
},
};
/*
create space
*/
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct CreateSpaceTxnRestorePL {
pub(super) space_name: Box<str>,
pub(super) space: Space,
}
pub struct CreateSpaceTxnMD {
pub(super) space_name_l: u64,
pub(super) space_meta: <obj::SpaceLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> PersistObject for CreateSpaceTxn<'a> {
const METADATA_SIZE: usize =
<obj::SpaceLayoutRef<'static> as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = CreateSpaceTxn<'a>;
type OutputType = CreateSpaceTxnRestorePL;
type Metadata = CreateSpaceTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name().len().u64_bytes_le());
<obj::SpaceLayoutRef as PersistObject>::meta_enc(
buf,
obj::SpaceLayoutRef::from((data.space(), data.space_meta())),
);
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let space_name_l = scanner.next_u64_le();
let space_meta = <obj::SpaceLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateSpaceTxnMD {
space_name_l,
space_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name().as_bytes());
<obj::SpaceLayoutRef as PersistObject>::obj_enc(
buf,
(data.space(), data.space_meta()).into(),
);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let space_name =
inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str();
let space = <obj::SpaceLayoutRef as PersistObject>::obj_dec(s, md.space_meta)?;
Ok(CreateSpaceTxnRestorePL { space_name, space })
}
}
impl<'a> GNSEvent for CreateSpaceTxn<'a> {
const OPC: u16 = 0;
type CommitType = CreateSpaceTxn<'a>;
type RestoreType = CreateSpaceTxnRestorePL;
fn update_global_state(
CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL,
gns: &crate::engine::core::GlobalNS,
) -> RuntimeResult<()> {
let mut spaces = gns.idx().write();
if spaces.st_insert(space_name, space.into()) {
Ok(())
} else {
Err(TransactionError::OnRestoreDataConflictAlreadyExists.into())
}
}
}
/*
alter space
---
for now dump the entire meta
*/
pub struct AlterSpaceTxnMD {
space_id_meta: super::SpaceIDMD,
dict_len: u64,
}
#[derive(Debug, PartialEq)]
pub struct AlterSpaceTxnRestorePL {
pub(super) space_id: super::SpaceIDRes,
pub(super) space_meta: DictGeneric,
}
impl<'a> PersistObject for AlterSpaceTxn<'a> {
const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128);
type InputType = AlterSpaceTxn<'a>;
type OutputType = AlterSpaceTxnRestorePL;
type Metadata = AlterSpaceTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_id_meta.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id());
buf.extend(data.updated_props().len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(AlterSpaceTxnMD {
space_id_meta: <super::SpaceID as PersistObject>::meta_dec(scanner)?,
dict_len: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id());
<map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_enc(
buf,
data.updated_props(),
);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let space_meta = <map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.dict_len as usize),
)?;
Ok(AlterSpaceTxnRestorePL {
space_id,
space_meta,
})
}
}
impl<'a> GNSEvent for AlterSpaceTxn<'a> {
const OPC: u16 = 1;
type CommitType = AlterSpaceTxn<'a>;
type RestoreType = AlterSpaceTxnRestorePL;
fn update_global_state(
AlterSpaceTxnRestorePL {
space_id,
space_meta,
}: Self::RestoreType,
gns: &crate::engine::core::GlobalNS,
) -> RuntimeResult<()> {
let mut gns = gns.idx().write();
match gns.st_get_mut(&space_id.name) {
Some(space) => {
if !crate::engine::data::dict::rmerge_metadata(space.props_mut(), space_meta) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
None => return Err(TransactionError::OnRestoreDataMissing.into()),
}
Ok(())
}
}
/*
drop space
*/
impl<'a> PersistObject for DropSpaceTxn<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = DropSpaceTxn<'a>;
type OutputType = super::SpaceIDRes;
type Metadata = super::SpaceIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
<super::SpaceID as PersistObject>::meta_dec(scanner)
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id())
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
<super::SpaceID as PersistObject>::obj_dec(s, md)
}
}
impl<'a> GNSEvent for DropSpaceTxn<'a> {
const OPC: u16 = 2;
type CommitType = DropSpaceTxn<'a>;
type RestoreType = super::SpaceIDRes;
fn update_global_state(
super::SpaceIDRes { uuid, name }: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
let mut wgns = gns.idx().write();
let mut wmodel = gns.idx_models().write();
match wgns.entry(name) {
std::collections::hash_map::Entry::Occupied(oe) => {
if oe.get().get_uuid() == uuid {
for model in oe.get().models() {
let id: EntityIDRef<'static> = unsafe {
// UNSAFE(@ohsayan): I really need a pack of what the borrow checker has been reveling on
core::mem::transmute(EntityIDRef::new(oe.key(), &model))
};
let _ = wmodel.st_delete(&id);
}
oe.remove_entry();
Ok(())
} else {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
std::collections::hash_map::Entry::Vacant(_) => {
return Err(TransactionError::OnRestoreDataMissing.into())
}
}
}
}

@ -26,23 +26,24 @@
use {
super::super::{
model::{self, ModelIDRef, ModelIDRes},
model::{self, ModelIDRes},
space, SpaceIDRef, SpaceIDRes,
},
crate::engine::{
core::{model::Model, space::Space},
storage::v1::inf::{dec, enc},
txn::ModelIDRef,
},
};
mod space_tests {
use super::{
dec, enc,
space::{
AlterSpaceTxn, AlterSpaceTxnRestorePL, CreateSpaceTxn, CreateSpaceTxnRestorePL,
DropSpaceTxn,
use {
super::{
dec, enc,
space::{AlterSpaceTxnRestorePL, CreateSpaceTxnRestorePL},
Space, SpaceIDRef,
},
Space, SpaceIDRef,
crate::engine::txn::gns::space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
};
#[test]
fn create() {
@ -91,15 +92,18 @@ mod model_tests {
use {
super::{
model::{
AlterModelAddTxn, AlterModelAddTxnRestorePL, AlterModelRemoveTxn,
AlterModelRemoveTxnRestorePL, AlterModelUpdateTxn, AlterModelUpdateTxnRestorePL,
CreateModelTxn, CreateModelTxnRestorePL, DropModelTxn,
AlterModelAddTxnRestorePL, AlterModelRemoveTxnRestorePL,
AlterModelUpdateTxnRestorePL, CreateModelTxnRestorePL,
},
Model, Space,
},
crate::engine::{
core::model::{Field, Layer},
data::{tag::TagSelector, uuid::Uuid},
txn::gns::model::{
AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn,
DropModelTxn,
},
},
};
fn default_space_model() -> (Space, Model) {

@ -0,0 +1,27 @@
/*
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod gns;

@ -27,7 +27,7 @@
#[cfg(test)]
use crate::engine::storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
v1::JournalWriter,
v1::raw::journal::JournalWriter,
};
use crate::engine::{
core::{EntityIDRef, GlobalNS},
@ -37,9 +37,11 @@ use crate::engine::{
fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID},
storage::{
common::interface::fs_imp::LocalFS,
v1::{batch_jrnl, journal, spec},
v1::{
impls::gns::{GNSAdapter, GNSTransactionDriverAnyFS},
raw::{batch_jrnl, journal, spec},
},
},
txn::gns::{GNSAdapter, GNSTransactionDriverAnyFS},
};
const GNS_FILE_PATH: &str = "gns.db-tlog";

@ -1,5 +1,5 @@
/*
* Created on Mon May 15 2023
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -28,25 +28,12 @@
//!
//! Target tags: `0.8.0-beta`, `0.8.0-beta.2`, `0.8.0-beta.3`
// impls
mod batch_jrnl;
mod journal;
pub(in crate::engine) mod loader;
mod rw;
pub mod spec;
pub mod sysdb;
// hl
mod impls;
pub mod inf;
#[cfg(test)]
mod tests;
pub mod loader;
pub mod raw;
// re-exports
pub(self) use spec::Header;
pub use {
journal::{JournalAdapter, JournalWriter},
rw::SDSSFileIO,
pub use self::{
impls::gns::GNSTransactionDriverAnyFS, raw::batch_jrnl::create as create_batch_journal,
raw::batch_jrnl::DataBatchPersistDriver,
};
pub mod data_batch {
pub use super::batch_jrnl::{create, DataBatchPersistDriver};
}

@ -24,8 +24,6 @@
*
*/
use crate::engine::storage::v1::inf::obj::cell;
use {
super::{
MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH,
@ -48,7 +46,10 @@ use {
idx::STIndexSeq,
storage::{
common::interface::fs_traits::FSInterface,
v1::rw::{SDSSFileIO, TrackedWriter},
v1::{
inf::obj::cell,
raw::rw::{SDSSFileIO, TrackedWriter},
},
},
},
util::EndianQW,

@ -24,11 +24,6 @@
*
*/
use crate::engine::storage::v1::inf::{
obj::cell::{self, StorageCellTypeID},
DataSource,
};
use {
super::{
MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH,
@ -44,7 +39,13 @@ use {
idx::{MTIndex, STIndex, STIndexSeq},
storage::{
common::interface::fs_traits::FSInterface,
v1::rw::{SDSSFileIO, TrackedReader},
v1::{
inf::{
obj::cell::{self, StorageCellTypeID},
DataSource,
},
raw::rw::{SDSSFileIO, TrackedReader},
},
},
},
std::{

@ -43,8 +43,9 @@
#[cfg(test)]
use crate::engine::storage::common::interface::fs_traits::FileOpen;
use {
super::rw::SDSSFileIO,
super::{rw::SDSSFileIO, spec::Header},
crate::{
engine::{
error::{RuntimeResult, StorageError},
@ -215,7 +216,7 @@ pub struct JournalReader<TA, Fs: FSInterface> {
impl<TA: JournalAdapter, Fs: FSInterface> JournalReader<TA, Fs> {
pub fn new(log_file: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
let log_size = log_file.file_length()? - super::Header::SIZE as u64;
let log_size = log_file.file_length()? - Header::SIZE as u64;
Ok(Self {
log_file,
evid: 0,

@ -0,0 +1,34 @@
/*
* Created on Mon May 15 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
// impls
pub(super) mod batch_jrnl;
pub(super) mod journal;
pub(super) mod rw;
pub mod spec;
pub mod sysdb;
#[cfg(test)]
mod tests;

@ -29,10 +29,10 @@ use crate::engine::storage::common::{
versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion},
};
pub(super) type Header = sdss::sdss_r1::HeaderV1<HeaderImplV1>;
pub type Header = sdss::sdss_r1::HeaderV1<HeaderImplV1>;
#[derive(Debug)]
pub(super) struct HeaderImplV1;
pub struct HeaderImplV1;
impl sdss::sdss_r1::HeaderV1Spec for HeaderImplV1 {
type FileClass = FileScope;
type FileSpecifier = FileSpecifier;
@ -82,7 +82,7 @@ pub enum FileSpecifier {
*/
#[cfg(test)]
pub(super) struct TestFile;
pub struct TestFile;
#[cfg(test)]
impl sdss::sdss_r1::SimpleFileSpecV1 for TestFile {
type HeaderSpec = HeaderImplV1;
@ -92,7 +92,7 @@ impl sdss::sdss_r1::SimpleFileSpecV1 for TestFile {
}
/// The file specification for the GNS transaction log (impl v1)
pub(super) struct GNSTransactionLogV1;
pub struct GNSTransactionLogV1;
impl sdss::sdss_r1::SimpleFileSpecV1 for GNSTransactionLogV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::Journal;
@ -101,7 +101,7 @@ impl sdss::sdss_r1::SimpleFileSpecV1 for GNSTransactionLogV1 {
}
/// The file specification for a journal batch
pub(super) struct DataBatchJournalV1;
pub struct DataBatchJournalV1;
impl sdss::sdss_r1::SimpleFileSpecV1 for DataBatchJournalV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::DataBatch;
@ -110,7 +110,7 @@ impl sdss::sdss_r1::SimpleFileSpecV1 for DataBatchJournalV1 {
}
/// The file specification for the system db
pub(super) struct SysDBV1;
pub struct SysDBV1;
impl sdss::sdss_r1::SimpleFileSpecV1 for SysDBV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::FlatmapData;

@ -32,7 +32,10 @@ use {
fractal::sys_store::{SysAuth, SysAuthUser, SysConfig, SysHostData, SystemStore},
storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
v1::{inf, spec, SDSSFileIO},
v1::{
inf,
raw::{rw::SDSSFileIO, spec},
},
},
},
parking_lot::RwLock,
@ -137,7 +140,7 @@ impl<Fs: FSInterface> SystemStore<Fs> {
),
);
// write
let buf = super::inf::enc::enc_dict_full::<super::inf::map::GenericDictSpec>(&map);
let buf = inf::enc::enc_dict_full::<inf::map::GenericDictSpec>(&map);
f.fsynced_write(&buf)
}
fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> {

@ -38,13 +38,13 @@ use {
idx::MTIndex,
storage::{
common::interface::{fs_test::VirtualFS, fs_traits::FileOpen},
v1::{
v1::raw::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
},
rw::SDSSFileIO,
spec,
spec::{self, Header},
},
},
},
@ -57,9 +57,7 @@ fn pkey(v: impl Into<Datacell>) -> PrimaryIndexKey {
PrimaryIndexKey::try_from_dc(v.into()).unwrap()
}
fn open_file(
fpath: &str,
) -> FileOpen<SDSSFileIO<VirtualFS>, (SDSSFileIO<VirtualFS>, super::super::Header)> {
fn open_file(fpath: &str) -> FileOpen<SDSSFileIO<VirtualFS>, (SDSSFileIO<VirtualFS>, Header)> {
SDSSFileIO::open_or_create_perm_rw::<spec::DataBatchJournalV1>(fpath).unwrap()
}

@ -26,7 +26,7 @@
use crate::engine::storage::{
common::interface::fs_traits::FileOpen,
v1::{rw::SDSSFileIO, spec},
v1::raw::{rw::SDSSFileIO, spec},
};
#[test]

@ -28,7 +28,7 @@ use {
crate::{
engine::{
error::{RuntimeResult, StorageError},
storage::v1::{
storage::v1::raw::{
journal::{self, JournalAdapter, JournalWriter},
spec,
},

@ -24,5 +24,4 @@
*
*/
pub mod journal;
pub mod spec;
pub(in crate::engine::storage) mod raw;

@ -48,22 +48,41 @@ use {
std::marker::PhantomData,
};
/*
Event log adapter
*/
/// A journal based on an [`EventLog`]
pub type EventLogJournal<E, Fs> = raw::RawJournalWriter<EventLog<E>, Fs>;
/// An [`EventLog`] is a standard, append-only, sequential journal with per-event and per-cycle integrity protection
pub struct EventLog<E: EventLogAdapter>(PhantomData<E>);
/// An adapter that provides the specification for an event log
pub trait EventLogAdapter {
/// the SDSS file spec
type SdssSpec: FileSpecV1;
/// global state
type GlobalState;
/// the event type
type Event<'a>;
/// the decoded event type
type DecodedEvent;
/// event metadata
type EventMeta: Copy;
/// the error type
type Error: Into<fractal::error::Error>;
/// the maximum value for the event discriminant
const EV_MAX: u8;
/// get metadata from the raw value
unsafe fn meta_from_raw(m: u64) -> Self::EventMeta;
/// get metadata from the event
fn event_md<'a>(event: &Self::Event<'a>) -> u64;
/// encode an event
fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]>;
/// decode an event
fn decode(block: Vec<u8>, kind: Self::EventMeta) -> Result<Self::DecodedEvent, Self::Error>;
/// apply the event
fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error>;
}
@ -71,12 +90,18 @@ impl<E: EventLogAdapter> RawJournalAdapter for EventLog<E> {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct;
type Spec = <E as EventLogAdapter>::SdssSpec;
type GlobalState = <E as EventLogAdapter>::GlobalState;
type Context<'a> = () where Self: 'a;
type Event<'a> = <E as EventLogAdapter>::Event<'a>;
type DecodedEvent = <E as EventLogAdapter>::DecodedEvent;
type EventMeta = <E as EventLogAdapter>::EventMeta;
fn initialize(_: &JournalInitializer) -> Self {
Self(PhantomData)
}
fn enter_context<'a, Fs: FSInterface>(
_: &'a mut raw::RawJournalWriter<Self, Fs>,
) -> Self::Context<'a> {
()
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
if meta > <E as EventLogAdapter>::EV_MAX as u64 {
return None;

@ -209,7 +209,7 @@ macro_rules! jtrace_reader {
*/
/// An adapter defining the low-level structure of a log file
pub trait RawJournalAdapter {
pub trait RawJournalAdapter: Sized {
/// event size buffer
const EVENT_SIZE_BUFFER: usize = 128;
/// Set to true if the journal writer should automatically flush the buffer and fsync after writing an event
@ -220,6 +220,10 @@ pub trait RawJournalAdapter {
type Spec: FileSpecV1;
/// the global state that is used by this journal
type GlobalState;
/// Writer context
type Context<'a>
where
Self: 'a;
/// a journal event
type Event<'a>;
/// the decoded event
@ -228,6 +232,10 @@ pub trait RawJournalAdapter {
type EventMeta: Copy;
/// initialize this adapter
fn initialize(j_: &JournalInitializer) -> Self;
/// get a write context
fn enter_context<'a, Fs: FSInterface>(
adapter: &'a mut RawJournalWriter<Self, Fs>,
) -> Self::Context<'a>;
/// parse event metadata
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta>;
/// get event metadata as an [`u64`]
@ -479,6 +487,9 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
}
Ok(me)
}
pub fn context(&mut self) -> J::Context<'_> {
J::enter_context(self)
}
/// Commit a new event to the journal
///
/// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns,

@ -40,7 +40,7 @@ use {
},
sdss::sdss_r1::rw::TrackedReader,
},
v2::{
v2::raw::{
journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent},
spec::SystemDatabaseV1,
},
@ -123,9 +123,15 @@ impl RawJournalAdapter for SimpleDBJournal {
type Event<'a> = DbEventEncoded<'a>;
type DecodedEvent = DbEventRestored;
type EventMeta = EventMeta;
type Context<'a> = () where Self: 'a;
fn initialize(_: &JournalInitializer) -> Self {
Self
}
fn enter_context<'a, Fs: FSInterface>(
_: &'a mut RawJournalWriter<Self, Fs>,
) -> Self::Context<'a> {
()
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
Some(match meta {
0 => EventMeta::NewKey,

@ -36,7 +36,7 @@ use {
mem::unsafe_apis,
storage::{
common::interface::fs_test::VirtualFS,
v2::{journal::raw::open_journal, spec::SystemDatabaseV1},
v2::raw::{journal::raw::open_journal, spec::SystemDatabaseV1},
},
RuntimeResult,
},

@ -0,0 +1,28 @@
/*
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod journal;
pub mod spec;

@ -1,5 +1,5 @@
/*
* Created on Sun Aug 20 2023
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -24,219 +24,5 @@
*
*/
use {
crate::{
engine::{
core::{space::Space, GlobalNS},
data::uuid::Uuid,
error::{RuntimeResult, TransactionError},
mem::BufferedScanner,
storage::{
safe_interfaces::{FSInterface, LocalFS},
v1::{
inf::{self, PersistObject},
JournalAdapter, JournalWriter,
},
},
},
util::EndianQW,
},
std::marker::PhantomData,
};
mod model;
mod space;
// test
#[cfg(test)]
mod tests;
// re-exports
pub use {
model::{
AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn, DropModelTxn,
ModelIDRef,
},
space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
};
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<Fs: FSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
pub fn into_inner(self) -> JournalWriter<Fs, GNSAdapter> {
self.journal
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> RuntimeResult<()> {
let mut buf = vec![];
buf.extend(GE::OPC.to_le_bytes());
GE::encode_super_event(gns_event, &mut buf);
self.journal
.append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?;
Ok(())
}
}
/*
journal implementor
*/
/// the journal adapter for DDL queries on the GNS
#[derive(Debug)]
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = crate::engine::fractal::error::Error;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> {
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into());
}
macro_rules! dispatch {
($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())]
};
}
static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!(
CreateSpaceTxn,
AlterSpaceTxn,
DropSpaceTxn,
CreateModelTxn,
AlterModelAddTxn,
AlterModelRemoveTxn,
AlterModelUpdateTxn,
DropModelTxn
);
let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe {
// UNSAFE(@ohsayan):
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()),
Err(e) => Err(e),
}
}
}
/*
Events
---
FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes
pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless
we have absolutely no other choice)
---
[OPC:2B][PAYLOAD]
*/
pub struct GNSSuperEvent(Box<[u8]>);
/// Definition for an event in the GNS (DDL queries)
pub trait GNSEvent
where
Self: PersistObject<InputType = Self, OutputType = Self::RestoreType> + Sized,
{
/// OPC for the event (unique)
const OPC: u16;
/// Expected type for a commit
type CommitType;
/// Expected type for a restore
type RestoreType;
/// Encodes the event into the given buffer
fn encode_super_event(commit: Self, buf: &mut Vec<u8>) {
inf::enc::enc_full_into_buffer::<Self>(buf, commit)
}
fn decode_and_update_global_state(
scanner: &mut BufferedScanner,
gns: &GlobalNS,
) -> RuntimeResult<()> {
Self::update_global_state(Self::decode(scanner)?, gns)
}
/// Attempts to decode the event using the given scanner
fn decode(scanner: &mut BufferedScanner) -> RuntimeResult<Self::RestoreType> {
inf::dec::dec_full_from_scanner::<Self>(scanner).map_err(|e| e.into())
}
/// Update the global state from the restored event
fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>;
}
#[derive(Debug, Clone, Copy)]
pub struct SpaceIDRef<'a> {
uuid: Uuid,
name: &'a str,
}
impl<'a> SpaceIDRef<'a> {
pub fn new(name: &'a str, space: &Space) -> Self {
Self {
uuid: space.get_uuid(),
name,
}
}
}
#[derive(Debug, PartialEq)]
pub struct SpaceIDRes {
uuid: Uuid,
name: Box<str>,
}
impl SpaceIDRes {
#[cfg(test)]
pub fn new(uuid: Uuid, name: Box<str>) -> Self {
Self { uuid, name }
}
}
struct SpaceID<'a>(PhantomData<SpaceIDRef<'a>>);
pub struct SpaceIDMD {
uuid: Uuid,
space_name_l: u64,
}
impl<'a> PersistObject for SpaceID<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = SpaceIDRef<'a>;
type OutputType = SpaceIDRes;
type Metadata = SpaceIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.uuid.to_le_bytes());
buf.extend(data.name.len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(SpaceIDMD {
uuid: Uuid::from_bytes(scanner.next_chunk()),
space_name_l: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.name.as_bytes());
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let str = inf::dec::utils::decode_string(s, md.space_name_l as usize)?;
Ok(SpaceIDRes {
uuid: md.uuid,
name: str.into_boxed_str(),
})
}
}
pub mod space;
pub mod model;

@ -1,5 +1,5 @@
/*
* Created on Wed Aug 23 2023
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -24,321 +24,40 @@
*
*/
use {
super::GNSEvent,
crate::{
engine::{
core::{
model::{Field, Model},
space::Space,
GlobalNS, {EntityID, EntityIDRef},
},
data::uuid::Uuid,
error::TransactionError,
error::{RuntimeResult, StorageError},
idx::{IndexST, IndexSTSeqCns, STIndex, STIndexSeq},
mem::BufferedScanner,
ql::lex::Ident,
storage::v1::inf::{self, map, obj, PersistObject},
},
util::EndianQW,
},
std::marker::PhantomData,
use crate::engine::{
core::model::{Field, Model},
idx::{IndexST, IndexSTSeqCns},
ql::lex::Ident,
txn::{ModelIDRef, SpaceIDRef},
};
pub struct ModelID<'a>(PhantomData<&'a ()>);
#[derive(Debug, Clone, Copy)]
pub struct ModelIDRef<'a> {
space_id: super::SpaceIDRef<'a>,
model_name: &'a str,
model_uuid: Uuid,
model_version: u64,
}
impl<'a> ModelIDRef<'a> {
pub fn new_ref(
space_name: &'a str,
space: &'a Space,
model_name: &'a str,
model: &'a Model,
) -> ModelIDRef<'a> {
ModelIDRef::new(
super::SpaceIDRef::new(space_name, space),
model_name,
model.get_uuid(),
model.delta_state().schema_current_version().value_u64(),
)
}
pub fn new(
space_id: super::SpaceIDRef<'a>,
model_name: &'a str,
model_uuid: Uuid,
model_version: u64,
) -> Self {
Self {
space_id,
model_name,
model_uuid,
model_version,
}
}
}
#[derive(Debug, PartialEq)]
pub struct ModelIDRes {
space_id: super::SpaceIDRes,
model_name: Box<str>,
model_uuid: Uuid,
model_version: u64,
}
impl ModelIDRes {
#[cfg(test)]
pub fn new(
space_id: super::SpaceIDRes,
model_name: Box<str>,
model_uuid: Uuid,
model_version: u64,
) -> Self {
Self {
space_id,
model_name,
model_uuid,
model_version,
}
}
}
pub struct ModelIDMD {
space_id: super::SpaceIDMD,
model_name_l: u64,
model_version: u64,
model_uuid: Uuid,
}
impl<'a> PersistObject for ModelID<'a> {
const METADATA_SIZE: usize =
sizeof!(u64, 2) + sizeof!(u128) + <super::SpaceID as PersistObject>::METADATA_SIZE;
type InputType = ModelIDRef<'a>;
type OutputType = ModelIDRes;
type Metadata = ModelIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.model_name_l as usize + md.space_id.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
buf.extend(data.model_name.len().u64_bytes_le());
buf.extend(data.model_version.to_le_bytes());
buf.extend(data.model_uuid.to_le_bytes());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(ModelIDMD {
space_id: <super::SpaceID as PersistObject>::meta_dec(scanner)?,
model_name_l: scanner.next_u64_le(),
model_version: scanner.next_u64_le(),
model_uuid: Uuid::from_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
buf.extend(data.model_name.as_bytes());
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
Ok(ModelIDRes {
space_id: <super::SpaceID as PersistObject>::obj_dec(s, md.space_id)?,
model_name: inf::dec::utils::decode_string(s, md.model_name_l as usize)?
.into_boxed_str(),
model_uuid: md.model_uuid,
model_version: md.model_version,
})
}
}
fn with_space<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
f: impl FnOnce(&Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let spaces = gns.idx().read();
let Some(space) = spaces.st_get(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if space.get_uuid() != space_id.uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(&space)
}
fn with_space_mut<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
mut f: impl FnMut(&mut Space) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
let mut spaces = gns.idx().write();
let Some(space) = spaces.st_get_mut(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if space.get_uuid() != space_id.uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(space)
}
fn with_model_mut<T>(
gns: &GlobalNS,
space_id: &super::SpaceIDRes,
model_id: &ModelIDRes,
f: impl FnOnce(&mut Model) -> RuntimeResult<T>,
) -> RuntimeResult<T> {
with_space(gns, space_id, |_| {
let mut models = gns.idx_models().write();
let Some(model) = models.get_mut(&EntityIDRef::new(&space_id.name, &model_id.model_name))
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if model.get_uuid() != model_id.model_uuid {
// this should have been handled by an earlier transaction
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
f(model)
})
}
/*
create model
*/
#[derive(Debug, Clone, Copy)]
/// The commit payload for a `create model ... (...) with {...}` txn
pub struct CreateModelTxn<'a> {
space_id: super::SpaceIDRef<'a>,
space_id: SpaceIDRef<'a>,
model_name: &'a str,
model: &'a Model,
}
impl<'a> CreateModelTxn<'a> {
pub const fn new(
space_id: super::SpaceIDRef<'a>,
model_name: &'a str,
model: &'a Model,
) -> Self {
pub const fn new(space_id: SpaceIDRef<'a>, model_name: &'a str, model: &'a Model) -> Self {
Self {
space_id,
model_name,
model,
}
}
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct CreateModelTxnRestorePL {
pub(super) space_id: super::SpaceIDRes,
pub(super) model_name: Box<str>,
pub(super) model: Model,
}
pub struct CreateModelTxnMD {
space_id_meta: super::SpaceIDMD,
model_name_l: u64,
model_meta: <obj::ModelLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> PersistObject for CreateModelTxn<'a> {
const METADATA_SIZE: usize = <super::SpaceID as PersistObject>::METADATA_SIZE
+ sizeof!(u64)
+ <obj::ModelLayoutRef<'a> as PersistObject>::METADATA_SIZE;
type InputType = CreateModelTxn<'a>;
type OutputType = CreateModelTxnRestorePL;
type Metadata = CreateModelTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left((md.model_meta.p_key_len() + md.model_name_l) as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
// space ID
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
// model name
buf.extend(data.model_name.len().u64_bytes_le());
// model meta dump
<obj::ModelLayoutRef as PersistObject>::meta_enc(buf, obj::ModelLayoutRef::from(data.model))
pub fn space_id(&self) -> SpaceIDRef<'_> {
self.space_id
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let space_id = <super::SpaceID as PersistObject>::meta_dec(scanner)?;
let model_name_l = scanner.next_u64_le();
let model_meta = <obj::ModelLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateModelTxnMD {
space_id_meta: space_id,
model_name_l,
model_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
// space id dump
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
// model name
buf.extend(data.model_name.as_bytes());
// model dump
<obj::ModelLayoutRef as PersistObject>::obj_enc(buf, obj::ModelLayoutRef::from(data.model))
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let model_name =
inf::dec::utils::decode_string(s, md.model_name_l as usize)?.into_boxed_str();
let model = <obj::ModelLayoutRef as PersistObject>::obj_dec(s, md.model_meta)?;
Ok(CreateModelTxnRestorePL {
space_id,
model_name,
model,
})
pub fn model_name(&self) -> &str {
self.model_name
}
}
impl<'a> GNSEvent for CreateModelTxn<'a> {
const OPC: u16 = 3;
type CommitType = CreateModelTxn<'a>;
type RestoreType = CreateModelTxnRestorePL;
fn update_global_state(
CreateModelTxnRestorePL {
space_id,
model_name,
model,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
/*
NOTE(@ohsayan):
A jump to the second branch is practically impossible and should be caught long before we actually end up
here (due to mismatched checksums), but might be theoretically possible because the cosmic rays can be wild
(or well magnetic stuff arounding spinning disks). But we just want to be extra sure. Don't let the aliens (or
rather, radiation) from the cosmos deter us!
*/
let mut spaces = gns.idx().write();
let mut models = gns.idx_models().write();
let Some(space) = spaces.get_mut(&space_id.name) else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if space.models().contains(&model_name) {
return Err(TransactionError::OnRestoreDataConflictAlreadyExists.into());
}
if models
.insert(EntityID::new(&space_id.name, &model_name), model)
.is_some()
{
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
space.models_mut().insert(model_name);
Ok(())
pub fn model(&self) -> &Model {
self.model
}
}
/*
alter model add
*/
#[derive(Debug, Clone, Copy)]
/// Transaction commit payload for an `alter model add ...` query
pub struct AlterModelAddTxn<'a> {
@ -356,86 +75,14 @@ impl<'a> AlterModelAddTxn<'a> {
new_fields,
}
}
}
pub struct AlterModelAddTxnMD {
model_id_meta: ModelIDMD,
new_field_c: u64,
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct AlterModelAddTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) new_fields: IndexSTSeqCns<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelAddTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = AlterModelAddTxn<'a>;
type OutputType = AlterModelAddTxnRestorePL;
type Metadata = AlterModelAddTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
(md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
buf.extend(data.new_fields.st_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_meta = <ModelID as PersistObject>::meta_dec(scanner)?;
let new_field_c = scanner.next_u64_le();
Ok(AlterModelAddTxnMD {
model_id_meta,
new_field_c,
})
pub fn model_id(&self) -> ModelIDRef<'_> {
self.model_id
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
<map::PersistMapImpl<map::FieldMapSpec<_>> as PersistObject>::obj_enc(buf, data.new_fields);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let new_fields = <map::PersistMapImpl<map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.new_field_c as usize),
)?;
Ok(AlterModelAddTxnRestorePL {
model_id,
new_fields,
})
}
}
impl<'a> GNSEvent for AlterModelAddTxn<'a> {
const OPC: u16 = 4;
type CommitType = AlterModelAddTxn<'a>;
type RestoreType = AlterModelAddTxnRestorePL;
fn update_global_state(
AlterModelAddTxnRestorePL {
model_id,
new_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
for (field_name, field) in new_fields.stseq_owned_kv() {
if !mutator.add_field(field_name, field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())
})
pub fn new_fields(&self) -> &IndexSTSeqCns<Box<str>, Field> {
self.new_fields
}
}
/*
alter model remove
*/
#[derive(Debug, Clone, Copy)]
/// Transaction commit payload for an `alter model remove` transaction
pub struct AlterModelRemoveTxn<'a> {
@ -449,98 +96,14 @@ impl<'a> AlterModelRemoveTxn<'a> {
removed_fields,
}
}
}
pub struct AlterModelRemoveTxnMD {
model_id_meta: ModelIDMD,
remove_field_c: u64,
}
#[derive(Debug, PartialEq)]
pub struct AlterModelRemoveTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) removed_fields: Box<[Box<str>]>,
}
impl<'a> PersistObject for AlterModelRemoveTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = AlterModelRemoveTxn<'a>;
type OutputType = AlterModelRemoveTxnRestorePL;
type Metadata = AlterModelRemoveTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
(md.model_id_meta.space_id.space_name_l + md.model_id_meta.model_name_l) as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
buf.extend(data.removed_fields.len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_meta = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(AlterModelRemoveTxnMD {
model_id_meta,
remove_field_c: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
for field in data.removed_fields {
buf.extend(field.len().u64_bytes_le());
buf.extend(field.as_bytes());
}
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_meta)?;
let mut removed_fields = Vec::with_capacity(md.remove_field_c as usize);
while !s.eof()
& (removed_fields.len() as u64 != md.remove_field_c)
& s.has_left(sizeof!(u64))
{
let len = s.next_u64_le() as usize;
if !s.has_left(len) {
break;
}
removed_fields.push(inf::dec::utils::decode_string(s, len)?.into_boxed_str());
}
if removed_fields.len() as u64 != md.remove_field_c {
return Err(StorageError::InternalDecodeStructureCorruptedPayload.into());
}
Ok(AlterModelRemoveTxnRestorePL {
model_id,
removed_fields: removed_fields.into_boxed_slice(),
})
pub fn model_id(&self) -> ModelIDRef<'_> {
self.model_id
}
}
impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
const OPC: u16 = 5;
type CommitType = AlterModelRemoveTxn<'a>;
type RestoreType = AlterModelRemoveTxnRestorePL;
fn update_global_state(
AlterModelRemoveTxnRestorePL {
model_id,
removed_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
for removed_field in removed_fields.iter() {
if !mutator.remove_field(&removed_field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())
})
pub fn removed_fields(&self) -> &[Ident<'_>] {
self.removed_fields
}
}
/*
alter model update
*/
#[derive(Debug, Clone, Copy)]
/// Transaction commit payload for an `alter model update ...` query
pub struct AlterModelUpdateTxn<'a> {
@ -558,89 +121,16 @@ impl<'a> AlterModelUpdateTxn<'a> {
updated_fields,
}
}
}
pub struct AlterModelUpdateTxnMD {
model_id_md: ModelIDMD,
updated_field_c: u64,
}
#[derive(Debug, PartialEq)]
pub struct AlterModelUpdateTxnRestorePL {
pub(super) model_id: ModelIDRes,
pub(super) updated_fields: IndexSTSeqCns<Box<str>, Field>,
}
impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = AlterModelUpdateTxn<'a>;
type OutputType = AlterModelUpdateTxnRestorePL;
type Metadata = AlterModelUpdateTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
buf.extend(data.updated_fields.st_len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_md = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(AlterModelUpdateTxnMD {
model_id_md,
updated_field_c: scanner.next_u64_le(),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
<map::PersistMapImpl<map::FieldMapSpec<_>> as PersistObject>::obj_enc(
buf,
data.updated_fields,
);
pub fn model_id(&self) -> ModelIDRef<'_> {
self.model_id
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let model_id = <ModelID as PersistObject>::obj_dec(s, md.model_id_md)?;
let updated_fields =
<map::PersistMapImpl<map::FieldMapSpec<IndexSTSeqCns<Box<str>, _>>> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.updated_field_c as usize),
)?;
Ok(AlterModelUpdateTxnRestorePL {
model_id,
updated_fields,
})
}
}
impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
const OPC: u16 = 6;
type CommitType = AlterModelUpdateTxn<'a>;
type RestoreType = AlterModelUpdateTxnRestorePL;
fn update_global_state(
AlterModelUpdateTxnRestorePL {
model_id,
updated_fields,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_model_mut(gns, &model_id.space_id, &model_id, |model| {
let mut mutator = model.model_mutator();
for (field_id, field) in updated_fields.stseq_owned_kv() {
if !mutator.update_field(&field_id, field) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
Ok(())
})
pub fn updated_fields(&self) -> &IndexST<Box<str>, Field> {
self.updated_fields
}
}
/*
drop model
*/
#[derive(Debug, Clone, Copy)]
/// Transaction commit payload for a `drop model ...` query
pub struct DropModelTxn<'a> {
@ -651,64 +141,7 @@ impl<'a> DropModelTxn<'a> {
pub const fn new(model_id: ModelIDRef<'a>) -> Self {
Self { model_id }
}
}
pub struct DropModelTxnMD {
model_id_md: ModelIDMD,
}
impl<'a> PersistObject for DropModelTxn<'a> {
const METADATA_SIZE: usize = <ModelID as PersistObject>::METADATA_SIZE;
type InputType = DropModelTxn<'a>;
type OutputType = ModelIDRes;
type Metadata = DropModelTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(
md.model_id_md.space_id.space_name_l as usize + md.model_id_md.model_name_l as usize,
)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::meta_enc(buf, data.model_id);
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let model_id_md = <ModelID as PersistObject>::meta_dec(scanner)?;
Ok(DropModelTxnMD { model_id_md })
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<ModelID as PersistObject>::obj_enc(buf, data.model_id);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
<ModelID as PersistObject>::obj_dec(s, md.model_id_md)
}
}
impl<'a> GNSEvent for DropModelTxn<'a> {
const OPC: u16 = 7;
type CommitType = DropModelTxn<'a>;
type RestoreType = ModelIDRes;
fn update_global_state(
ModelIDRes {
space_id,
model_name,
model_uuid,
model_version: _,
}: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
with_space_mut(gns, &space_id, |space| {
let mut models = gns.idx_models().write();
if !space.models_mut().remove(&model_name) {
return Err(TransactionError::OnRestoreDataMissing.into());
}
let Some(removed_model) = models.remove(&EntityIDRef::new(&space_id.name, &model_name))
else {
return Err(TransactionError::OnRestoreDataMissing.into());
};
if removed_model.get_uuid() != model_uuid {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
Ok(())
})
pub fn model_id(&self) -> ModelIDRef<'_> {
self.model_id
}
}

@ -1,5 +1,5 @@
/*
* Created on Wed Aug 23 2023
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -24,31 +24,14 @@
*
*/
use {
super::GNSEvent,
crate::{
engine::{
core::{space::Space, EntityIDRef, GlobalNS},
data::DictGeneric,
error::{RuntimeResult, TransactionError},
idx::STIndex,
mem::BufferedScanner,
storage::v1::inf::{self, map, obj, PersistObject},
},
util::EndianQW,
},
};
/*
create space
*/
use crate::engine::{core::space::Space, data::DictGeneric, txn::SpaceIDRef};
#[derive(Clone, Copy)]
/// Transaction commit payload for a `create space ...` query
pub struct CreateSpaceTxn<'a> {
pub(super) space_meta: &'a DictGeneric,
pub(super) space_name: &'a str,
pub(super) space: &'a Space,
space_meta: &'a DictGeneric,
space_name: &'a str,
space: &'a Space,
}
impl<'a> CreateSpaceTxn<'a> {
@ -59,245 +42,50 @@ impl<'a> CreateSpaceTxn<'a> {
space,
}
}
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct CreateSpaceTxnRestorePL {
pub(super) space_name: Box<str>,
pub(super) space: Space,
}
pub struct CreateSpaceTxnMD {
pub(super) space_name_l: u64,
pub(super) space_meta: <obj::SpaceLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> PersistObject for CreateSpaceTxn<'a> {
const METADATA_SIZE: usize =
<obj::SpaceLayoutRef<'static> as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = CreateSpaceTxn<'a>;
type OutputType = CreateSpaceTxnRestorePL;
type Metadata = CreateSpaceTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.len().u64_bytes_le());
<obj::SpaceLayoutRef<'a> as PersistObject>::meta_enc(
buf,
obj::SpaceLayoutRef::from((data.space, data.space_meta)),
);
pub fn space_meta(&self) -> &DictGeneric {
self.space_meta
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
let space_name_l = scanner.next_u64_le();
let space_meta = <obj::SpaceLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateSpaceTxnMD {
space_name_l,
space_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
<obj::SpaceLayoutRef as PersistObject>::obj_enc(buf, (data.space, data.space_meta).into());
pub fn space_name(&self) -> &str {
self.space_name
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let space_name =
inf::dec::utils::decode_string(s, md.space_name_l as usize)?.into_boxed_str();
let space = <obj::SpaceLayoutRef as PersistObject>::obj_dec(s, md.space_meta)?;
Ok(CreateSpaceTxnRestorePL { space_name, space })
pub fn space(&self) -> &Space {
self.space
}
}
impl<'a> GNSEvent for CreateSpaceTxn<'a> {
const OPC: u16 = 0;
type CommitType = CreateSpaceTxn<'a>;
type RestoreType = CreateSpaceTxnRestorePL;
fn update_global_state(
CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL,
gns: &crate::engine::core::GlobalNS,
) -> RuntimeResult<()> {
let mut spaces = gns.idx().write();
if spaces.st_insert(space_name, space.into()) {
Ok(())
} else {
Err(TransactionError::OnRestoreDataConflictAlreadyExists.into())
}
}
}
/*
alter space
---
for now dump the entire meta
*/
#[derive(Clone, Copy)]
/// Transaction payload for an `alter space ...` query
pub struct AlterSpaceTxn<'a> {
space_id: super::SpaceIDRef<'a>,
space_id: SpaceIDRef<'a>,
updated_props: &'a DictGeneric,
}
impl<'a> AlterSpaceTxn<'a> {
pub const fn new(space_id: super::SpaceIDRef<'a>, updated_props: &'a DictGeneric) -> Self {
pub const fn new(space_id: SpaceIDRef<'a>, updated_props: &'a DictGeneric) -> Self {
Self {
space_id,
updated_props,
}
}
}
pub struct AlterSpaceTxnMD {
space_id_meta: super::SpaceIDMD,
dict_len: u64,
}
#[derive(Debug, PartialEq)]
pub struct AlterSpaceTxnRestorePL {
pub(super) space_id: super::SpaceIDRes,
pub(super) space_meta: DictGeneric,
}
impl<'a> PersistObject for AlterSpaceTxn<'a> {
const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128);
type InputType = AlterSpaceTxn<'a>;
type OutputType = AlterSpaceTxnRestorePL;
type Metadata = AlterSpaceTxnMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_id_meta.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
buf.extend(data.updated_props.len().u64_bytes_le());
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
Ok(AlterSpaceTxnMD {
space_id_meta: <super::SpaceID as PersistObject>::meta_dec(scanner)?,
dict_len: scanner.next_u64_le(),
})
pub fn space_id(&self) -> SpaceIDRef<'_> {
self.space_id
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id);
<map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_enc(
buf,
data.updated_props,
);
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
let space_id = <super::SpaceID as PersistObject>::obj_dec(s, md.space_id_meta)?;
let space_meta = <map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.dict_len as usize),
)?;
Ok(AlterSpaceTxnRestorePL {
space_id,
space_meta,
})
pub fn updated_props(&self) -> &DictGeneric {
self.updated_props
}
}
impl<'a> GNSEvent for AlterSpaceTxn<'a> {
const OPC: u16 = 1;
type CommitType = AlterSpaceTxn<'a>;
type RestoreType = AlterSpaceTxnRestorePL;
fn update_global_state(
AlterSpaceTxnRestorePL {
space_id,
space_meta,
}: Self::RestoreType,
gns: &crate::engine::core::GlobalNS,
) -> RuntimeResult<()> {
let mut gns = gns.idx().write();
match gns.st_get_mut(&space_id.name) {
Some(space) => {
if !crate::engine::data::dict::rmerge_metadata(space.props_mut(), space_meta) {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
None => return Err(TransactionError::OnRestoreDataMissing.into()),
}
Ok(())
}
}
/*
drop space
*/
#[derive(Clone, Copy)]
/// Transaction commit payload for a `drop space ...` query
pub struct DropSpaceTxn<'a> {
space_id: super::SpaceIDRef<'a>,
space_id: SpaceIDRef<'a>,
}
impl<'a> DropSpaceTxn<'a> {
pub const fn new(space_id: super::SpaceIDRef<'a>) -> Self {
pub const fn new(space_id: SpaceIDRef<'a>) -> Self {
Self { space_id }
}
}
impl<'a> PersistObject for DropSpaceTxn<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = DropSpaceTxn<'a>;
type OutputType = super::SpaceIDRes;
type Metadata = super::SpaceIDMD;
fn pretest_can_dec_object(scanner: &BufferedScanner, md: &Self::Metadata) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::meta_enc(buf, data.space_id);
}
unsafe fn meta_dec(scanner: &mut BufferedScanner) -> RuntimeResult<Self::Metadata> {
<super::SpaceID as PersistObject>::meta_dec(scanner)
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
<super::SpaceID as PersistObject>::obj_enc(buf, data.space_id)
}
unsafe fn obj_dec(
s: &mut BufferedScanner,
md: Self::Metadata,
) -> RuntimeResult<Self::OutputType> {
<super::SpaceID as PersistObject>::obj_dec(s, md)
}
}
impl<'a> GNSEvent for DropSpaceTxn<'a> {
const OPC: u16 = 2;
type CommitType = DropSpaceTxn<'a>;
type RestoreType = super::SpaceIDRes;
fn update_global_state(
super::SpaceIDRes { uuid, name }: Self::RestoreType,
gns: &GlobalNS,
) -> RuntimeResult<()> {
let mut wgns = gns.idx().write();
let mut wmodel = gns.idx_models().write();
match wgns.entry(name) {
std::collections::hash_map::Entry::Occupied(oe) => {
if oe.get().get_uuid() == uuid {
for model in oe.get().models() {
let id: EntityIDRef<'static> = unsafe {
// UNSAFE(@ohsayan): I really need a pack of what the borrow checker has been reveling on
core::mem::transmute(EntityIDRef::new(oe.key(), &model))
};
let _ = wmodel.st_delete(&id);
}
oe.remove_entry();
Ok(())
} else {
return Err(TransactionError::OnRestoreDataConflictMismatch.into());
}
}
std::collections::hash_map::Entry::Vacant(_) => {
return Err(TransactionError::OnRestoreDataMissing.into())
}
}
pub fn space_id(&self) -> SpaceIDRef<'_> {
self.space_id
}
}

@ -23,5 +23,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod gns;
mod shared;
// re-export
pub use shared::{ModelIDRef, SpaceIDRef};

@ -0,0 +1,100 @@
/*
* Created on Sat Feb 10 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::{
core::{model::Model, space::Space},
data::uuid::Uuid,
};
#[derive(Debug, Clone, Copy)]
pub struct SpaceIDRef<'a> {
uuid: Uuid,
name: &'a str,
}
impl<'a> SpaceIDRef<'a> {
pub fn new(name: &'a str, space: &Space) -> Self {
Self {
uuid: space.get_uuid(),
name,
}
}
pub fn name(&self) -> &str {
self.name
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
}
#[derive(Debug, Clone, Copy)]
pub struct ModelIDRef<'a> {
space_id: SpaceIDRef<'a>,
model_name: &'a str,
model_uuid: Uuid,
model_version: u64,
}
impl<'a> ModelIDRef<'a> {
pub fn new_ref(
space_name: &'a str,
space: &'a Space,
model_name: &'a str,
model: &'a Model,
) -> ModelIDRef<'a> {
ModelIDRef::new(
SpaceIDRef::new(space_name, space),
model_name,
model.get_uuid(),
model.delta_state().schema_current_version().value_u64(),
)
}
pub fn new(
space_id: SpaceIDRef<'a>,
model_name: &'a str,
model_uuid: Uuid,
model_version: u64,
) -> Self {
Self {
space_id,
model_name,
model_uuid,
model_version,
}
}
pub fn space_id(&self) -> SpaceIDRef {
self.space_id
}
pub fn model_name(&self) -> &str {
self.model_name
}
pub fn model_uuid(&self) -> Uuid {
self.model_uuid
}
pub fn model_version(&self) -> u64 {
self.model_version
}
}
Loading…
Cancel
Save