Refactor to isolate common txn storage encodings

next
Sayan Nandan 7 months ago
parent 4735fd4c1d
commit 794d4e3806
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -273,7 +273,7 @@ impl Model {
&new_fields,
);
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
}
let mut mutator = model.model_mutator();
new_fields
@ -291,7 +291,7 @@ impl Model {
&removed,
);
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
}
let mut mutator = model.model_mutator();
removed.iter().for_each(|field_id| {
@ -306,7 +306,7 @@ impl Model {
&updated,
);
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
}
let mut mutator = model.model_mutator();
updated.into_iter().for_each(|(field_id, field)| {

@ -279,7 +279,7 @@ impl Model {
}
// since we've locked this down, no one else can parallely create another model in the same space (or remove)
if G::FS_IS_NON_NULL {
let mut txn_driver = global.namespace_txn_driver().lock();
let mut txn_driver = global.gns_driver().lock();
// prepare txn
let txn = gns::model::CreateModelTxn::new(
SpaceIDRef::new(&space_name, &space),
@ -294,7 +294,7 @@ impl Model {
model.get_uuid(),
)?;
// commit txn
match txn_driver.try_commit(txn) {
match txn_driver.gns_driver().try_commit(txn) {
Ok(()) => {}
Err(e) => {
// failed to commit, request cleanup
@ -358,7 +358,7 @@ impl Model {
model.delta_state().schema_current_version().value_u64(),
));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
// request cleanup
global.purge_model_driver(
space_name,

@ -174,7 +174,7 @@ impl Space {
space.get_uuid(),
))?;
// commit txn
match global.namespace_txn_driver().lock().try_commit(txn) {
match global.gns_driver().lock().gns_driver().try_commit(txn) {
Ok(()) => {}
Err(e) => {
// tell fractal to clean it up sometime
@ -221,7 +221,7 @@ impl Space {
&patch,
);
// commit
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
}
// merge
dict::rmerge_data_with_patch(space.props_mut(), patch);
@ -256,7 +256,7 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),
@ -303,7 +303,7 @@ impl Space {
let txn =
txn::gns::space::DropSpaceTxn::new(SpaceIDRef::new(&space_name, &space));
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
global.gns_driver().lock().gns_driver().try_commit(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(
GenericTask::delete_space_dir(&space_name, space.get_uuid()),

@ -38,21 +38,21 @@ use {
};
/// GNS driver
pub(super) struct FractalGNSDriver<Fs: FSInterface> {
pub struct FractalGNSDriver<Fs: FSInterface> {
#[allow(unused)]
status: util::Status,
pub(super) txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
pub(super) txn_driver: GNSTransactionDriverAnyFS<Fs>,
}
impl<Fs: FSInterface> FractalGNSDriver<Fs> {
pub(super) fn new(txn_driver: GNSTransactionDriverAnyFS<Fs>) -> Self {
Self {
status: util::Status::new_okay(),
txn_driver: Mutex::new(txn_driver),
txn_driver: txn_driver,
}
}
pub fn txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Fs>> {
&self.txn_driver
pub fn gns_driver(&mut self) -> &mut GNSTransactionDriverAnyFS<Fs> {
&mut self.txn_driver
}
}

@ -111,7 +111,7 @@ pub trait GlobalInstanceLike {
fn get_max_delta_size(&self) -> usize;
// global namespace
fn namespace(&self) -> &GlobalNS;
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>>;
fn gns_driver(&self) -> &Mutex<drivers::FractalGNSDriver<Self::FileSystem>>;
// model drivers
fn initialize_model_driver(
&self,
@ -163,8 +163,8 @@ impl GlobalInstanceLike for Global {
fn namespace(&self) -> &GlobalNS {
self._namespace()
}
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>> {
self.get_state().gns_driver.txn_driver()
fn gns_driver(&self) -> &Mutex<drivers::FractalGNSDriver<Self::FileSystem>> {
&self.get_state().gns_driver
}
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
@ -274,7 +274,7 @@ impl Global {
mdl_driver,
..
} = Self::__gref_raw().assume_init_read();
let gns_driver = gns_driver.txn_driver.into_inner().into_inner();
let gns_driver = gns_driver.into_inner().txn_driver.into_inner();
let mdl_drivers = mdl_driver.into_inner();
gns_driver.close().unwrap();
for (_, driver) in mdl_drivers {
@ -290,7 +290,7 @@ impl Global {
/// The global state
struct GlobalState {
gns: GlobalNS,
gns_driver: drivers::FractalGNSDriver<LocalFS>,
gns_driver: Mutex<drivers::FractalGNSDriver<LocalFS>>,
mdl_driver: RwLock<ModelDrivers<LocalFS>>,
task_mgr: mgr::FractalMgr,
config: SystemStore<LocalFS>,
@ -306,7 +306,7 @@ impl GlobalState {
) -> Self {
Self {
gns,
gns_driver,
gns_driver: Mutex::new(gns_driver),
mdl_driver,
task_mgr,
config,

@ -26,6 +26,7 @@
use {
super::{
drivers::FractalGNSDriver,
sys_store::{SysConfig, SystemStore},
CriticalTask, FractalModelDriver, GenericTask, GlobalInstanceLike, ModelUniqueID, Task,
},
@ -49,7 +50,7 @@ pub struct TestGlobal<Fs: FSInterface = VirtualFS> {
lp_queue: RwLock<Vec<Task<GenericTask>>>,
#[allow(unused)]
max_delta_size: usize,
txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
txn_driver: Mutex<FractalGNSDriver<Fs>>,
model_drivers: RwLock<HashMap<ModelUniqueID, FractalModelDriver<Fs>>>,
sys_cfg: SystemStore<Fs>,
}
@ -65,7 +66,7 @@ impl<Fs: FSInterface> TestGlobal<Fs> {
hp_queue: RwLock::default(),
lp_queue: RwLock::default(),
max_delta_size,
txn_driver: Mutex::new(txn_driver),
txn_driver: Mutex::new(FractalGNSDriver::new(txn_driver)),
model_drivers: RwLock::default(),
sys_cfg: SystemStore::_new(SysConfig::test_default()),
}
@ -102,7 +103,7 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
fn namespace(&self) -> &GlobalNS {
&self.gns
}
fn namespace_txn_driver(&self) -> &Mutex<GNSTransactionDriverAnyFS<Self::FileSystem>> {
fn gns_driver(&self) -> &Mutex<FractalGNSDriver<Self::FileSystem>> {
&self.txn_driver
}
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>) {
@ -162,6 +163,10 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
impl<Fs: FSInterface> Drop for TestGlobal<Fs> {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
txn_driver.__journal_mut().__close_mut().unwrap();
txn_driver
.gns_driver()
.__journal_mut()
.__close_mut()
.unwrap();
}
}

@ -25,105 +25,26 @@
*/
use {
super::super::raw::journal::{JournalAdapter, JournalWriter},
crate::{
engine::{
core::GlobalNS,
data::uuid::Uuid,
error::{RuntimeResult, TransactionError},
error::RuntimeResult,
mem::BufferedScanner,
storage::{
common_encoding::r1::{self, PersistObject},
safe_interfaces::{FSInterface, LocalFS},
},
txn::{gns, SpaceIDRef},
storage::common_encoding::r1::{self, PersistObject},
txn::SpaceIDRef,
},
util::EndianQW,
},
std::marker::PhantomData,
};
mod model;
mod space;
pub mod model;
pub mod space;
// test
#[cfg(test)]
mod tests;
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<Fs: FSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
pub fn into_inner(self) -> JournalWriter<Fs, GNSAdapter> {
self.journal
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> RuntimeResult<()> {
let mut buf = vec![];
buf.extend(GE::OPC.to_le_bytes());
GE::encode_super_event(gns_event, &mut buf);
self.journal
.append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?;
Ok(())
}
}
/*
journal implementor
*/
/// the journal adapter for DDL queries on the GNS
#[derive(Debug)]
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = crate::engine::fractal::error::Error;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> {
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into());
}
macro_rules! dispatch {
($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())]
};
}
static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!(
gns::space::CreateSpaceTxn,
gns::space::AlterSpaceTxn,
gns::space::DropSpaceTxn,
gns::model::CreateModelTxn,
gns::model::AlterModelAddTxn,
gns::model::AlterModelRemoveTxn,
gns::model::AlterModelUpdateTxn,
gns::model::DropModelTxn
);
let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe {
// UNSAFE(@ohsayan):
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()),
Err(e) => Err(e),
}
}
}
/*
Events
---
@ -134,8 +55,6 @@ impl JournalAdapter for GNSAdapter {
[OPC:2B][PAYLOAD]
*/
pub struct GNSSuperEvent(Box<[u8]>);
/// Definition for an event in the GNS (DDL queries)
pub trait GNSEvent
where

@ -29,6 +29,7 @@
//! This is revision 1 of high-level interface encoding.
//!
pub mod impls;
pub mod map;
pub mod obj;
// tests

@ -27,19 +27,19 @@
#[cfg(test)]
use crate::engine::storage::{
common::interface::fs_traits::{FSInterface, FileOpen},
v1::raw::journal::JournalWriter,
v1::raw::journal::raw::JournalWriter,
};
use crate::engine::{
core::{EntityIDRef, GlobalNS},
data::uuid::Uuid,
error::RuntimeResult,
fractal::error::ErrorContext,
fractal::{FractalModelDriver, ModelDrivers, ModelUniqueID},
fractal::{error::ErrorContext, FractalModelDriver, ModelDrivers, ModelUniqueID},
storage::{
common::interface::fs_imp::LocalFS,
v1::{
impls::gns::{GNSAdapter, GNSTransactionDriverAnyFS},
raw::{batch_jrnl, journal, spec},
v1::raw::{
batch_jrnl,
journal::{raw as raw_journal, GNSAdapter, GNSTransactionDriverAnyFS},
spec,
},
},
};
@ -68,9 +68,11 @@ impl SEInitState {
pub fn try_init(is_new: bool) -> RuntimeResult<Self> {
let gns = GlobalNS::empty();
let gns_txn_driver = if is_new {
journal::create_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(GNS_FILE_PATH)
raw_journal::create_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(
GNS_FILE_PATH,
)
} else {
journal::load_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(
raw_journal::load_journal::<GNSAdapter, LocalFS, spec::GNSTransactionLogV1>(
GNS_FILE_PATH,
&gns,
)
@ -149,5 +151,5 @@ pub fn open_gns_driver<Fs: FSInterface>(
path: &str,
gns: &GlobalNS,
) -> RuntimeResult<FileOpen<JournalWriter<Fs, GNSAdapter>>> {
journal::open_or_create_journal::<GNSAdapter, Fs, spec::GNSTransactionLogV1>(path, gns)
raw_journal::open_or_create_journal::<GNSAdapter, Fs, spec::GNSTransactionLogV1>(path, gns)
}

@ -28,11 +28,10 @@
//!
//! Target tags: `0.8.0-beta`, `0.8.0-beta.2`, `0.8.0-beta.3`
mod impls;
pub mod loader;
pub mod raw;
pub use self::{
impls::gns::GNSTransactionDriverAnyFS, raw::batch_jrnl::create as create_batch_journal,
raw::batch_jrnl::DataBatchPersistDriver,
raw::batch_jrnl::create as create_batch_journal, raw::batch_jrnl::DataBatchPersistDriver,
raw::journal::GNSTransactionDriverAnyFS,
};

@ -0,0 +1,119 @@
/*
* Created on Tue Feb 13 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
self::raw::{JournalAdapter, JournalWriter},
crate::engine::{
core::GlobalNS,
error::TransactionError,
mem::BufferedScanner,
storage::{
common_encoding::r1::impls::gns::GNSEvent,
safe_interfaces::{FSInterface, LocalFS},
},
txn::gns,
RuntimeResult,
},
};
pub mod raw;
/// The GNS transaction driver is used to handle DDL transactions
pub struct GNSTransactionDriverAnyFS<Fs: FSInterface = LocalFS> {
journal: JournalWriter<Fs, GNSAdapter>,
}
impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
pub fn into_inner(self) -> JournalWriter<Fs, GNSAdapter> {
self.journal
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}
/// Attempts to commit the given event into the journal, handling any possible recovery triggers and returning
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> RuntimeResult<()> {
let mut buf = vec![];
buf.extend(GE::OPC.to_le_bytes());
GE::encode_super_event(gns_event, &mut buf);
self.journal
.append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?;
Ok(())
}
}
/*
journal implementor
*/
pub struct GNSSuperEvent(Box<[u8]>);
/// the journal adapter for DDL queries on the GNS
#[derive(Debug)]
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = crate::engine::fractal::error::Error;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> {
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into());
}
macro_rules! dispatch {
($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())]
};
}
static DISPATCH: [fn(&mut BufferedScanner, &GlobalNS) -> RuntimeResult<()>; 9] = dispatch!(
gns::space::CreateSpaceTxn,
gns::space::AlterSpaceTxn,
gns::space::DropSpaceTxn,
gns::model::CreateModelTxn,
gns::model::AlterModelAddTxn,
gns::model::AlterModelRemoveTxn,
gns::model::AlterModelUpdateTxn,
gns::model::DropModelTxn
);
let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe {
// UNSAFE(@ohsayan):
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()),
Err(e) => Err(e),
}
}
}

@ -45,7 +45,7 @@
use crate::engine::storage::common::interface::fs_traits::FileOpen;
use {
super::{rw::SDSSFileIO, spec::Header},
super::super::{rw::SDSSFileIO, spec::Header},
crate::{
engine::{
error::{RuntimeResult, StorageError},

@ -29,7 +29,7 @@ use {
engine::{
error::{RuntimeResult, StorageError},
storage::v1::raw::{
journal::{self, JournalAdapter, JournalWriter},
journal::raw::{self, JournalAdapter, JournalWriter},
spec,
},
},
@ -130,7 +130,7 @@ fn open_log(
log_name: &str,
db: &Database,
) -> RuntimeResult<JournalWriter<super::VirtualFS, DatabaseTxnAdapter>> {
journal::open_or_create_journal::<DatabaseTxnAdapter, super::VirtualFS, spec::TestFile>(
raw::open_or_create_journal::<DatabaseTxnAdapter, super::VirtualFS, spec::TestFile>(
log_name, db,
)
.map(|v| v.into_inner())

Loading…
Cancel
Save