Further abstract batch storage driver

next
Sayan Nandan 7 months ago
parent a188ccb60d
commit 102a4b4b40
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -29,10 +29,10 @@ use {
engine::{
core::GlobalNS,
data::uuid::Uuid,
error::RuntimeResult,
error::{RuntimeResult, StorageError},
mem::BufferedScanner,
storage::common_encoding::r1::{self, PersistObject},
txn::SpaceIDRef,
txn::{gns::GNSTransaction, SpaceIDRef},
},
util::EndianQW,
},
@ -58,17 +58,24 @@ mod tests;
/// Definition for an event in the GNS (DDL queries)
pub trait GNSEvent
where
Self: PersistObject<InputType = Self, OutputType = Self::RestoreType> + Sized,
Self: PersistObject<InputType = Self, OutputType = Self::RestoreType> + Sized + GNSTransaction,
{
/// OPC for the event (unique)
const OPC: u16;
/// Expected type for a commit
type CommitType;
/// Expected type for a restore
type RestoreType;
/// Encodes the event into the given buffer
fn encode_super_event(commit: Self, buf: &mut Vec<u8>) {
r1::enc::enc_full_into_buffer::<Self>(buf, commit)
fn encode_event(commit: Self, buf: &mut Vec<u8>) {
r1::enc::full_into_buffer::<Self>(buf, commit)
}
fn decode_apply(gns: &GlobalNS, data: Vec<u8>) -> RuntimeResult<()> {
let mut scanner = BufferedScanner::new(&data);
Self::decode_and_update_global_state(&mut scanner, gns)?;
if scanner.eof() {
Ok(())
} else {
Err(StorageError::JournalLogEntryCorrupted.into())
}
}
fn decode_and_update_global_state(
scanner: &mut BufferedScanner,
@ -78,7 +85,7 @@ where
}
/// Attempts to decode the event using the given scanner
fn decode(scanner: &mut BufferedScanner) -> RuntimeResult<Self::RestoreType> {
r1::dec::dec_full_from_scanner::<Self>(scanner).map_err(|e| e.into())
r1::dec::full_from_scanner::<Self>(scanner).map_err(|e| e.into())
}
/// Update the global state from the restored event
fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> RuntimeResult<()>;

@ -252,7 +252,6 @@ impl<'a> PersistObject for CreateModelTxn<'a> {
}
impl<'a> GNSEvent for CreateModelTxn<'a> {
const OPC: u16 = 3;
type CommitType = CreateModelTxn<'a>;
type RestoreType = CreateModelTxnRestorePL;
fn update_global_state(
@ -349,7 +348,6 @@ impl<'a> PersistObject for AlterModelAddTxn<'a> {
}
impl<'a> GNSEvent for AlterModelAddTxn<'a> {
const OPC: u16 = 4;
type CommitType = AlterModelAddTxn<'a>;
type RestoreType = AlterModelAddTxnRestorePL;
fn update_global_state(
@ -440,7 +438,6 @@ impl<'a> PersistObject for AlterModelRemoveTxn<'a> {
}
impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
const OPC: u16 = 5;
type CommitType = AlterModelRemoveTxn<'a>;
type RestoreType = AlterModelRemoveTxnRestorePL;
fn update_global_state(
@ -522,7 +519,6 @@ impl<'a> PersistObject for AlterModelUpdateTxn<'a> {
}
impl<'a> GNSEvent for AlterModelUpdateTxn<'a> {
const OPC: u16 = 6;
type CommitType = AlterModelUpdateTxn<'a>;
type RestoreType = AlterModelUpdateTxnRestorePL;
fn update_global_state(
@ -580,7 +576,6 @@ impl<'a> PersistObject for DropModelTxn<'a> {
}
impl<'a> GNSEvent for DropModelTxn<'a> {
const OPC: u16 = 7;
type CommitType = DropModelTxn<'a>;
type RestoreType = ModelIDRes;
fn update_global_state(

@ -98,7 +98,6 @@ impl<'a> PersistObject for CreateSpaceTxn<'a> {
}
impl<'a> GNSEvent for CreateSpaceTxn<'a> {
const OPC: u16 = 0;
type CommitType = CreateSpaceTxn<'a>;
type RestoreType = CreateSpaceTxnRestorePL;
fn update_global_state(
@ -173,7 +172,6 @@ impl<'a> PersistObject for AlterSpaceTxn<'a> {
}
impl<'a> GNSEvent for AlterSpaceTxn<'a> {
const OPC: u16 = 1;
type CommitType = AlterSpaceTxn<'a>;
type RestoreType = AlterSpaceTxnRestorePL;
@ -227,7 +225,6 @@ impl<'a> PersistObject for DropSpaceTxn<'a> {
}
impl<'a> GNSEvent for DropSpaceTxn<'a> {
const OPC: u16 = 2;
type CommitType = DropSpaceTxn<'a>;
type RestoreType = super::SpaceIDRes;
fn update_global_state(

@ -50,8 +50,8 @@ mod space_tests {
let orig_space = Space::new_auto_all();
let space_r = orig_space.props();
let txn = CreateSpaceTxn::new(&space_r, "myspace", &orig_space);
let encoded = enc::enc_full_self(txn);
let decoded = dec::dec_full::<CreateSpaceTxn>(&encoded).unwrap();
let encoded = enc::full_self(txn);
let decoded = dec::full::<CreateSpaceTxn>(&encoded).unwrap();
assert_eq!(
CreateSpaceTxnRestorePL {
space_name: "myspace".into(),
@ -65,8 +65,8 @@ mod space_tests {
let space = Space::new_auto_all();
let space_r = space.props();
let txn = AlterSpaceTxn::new(SpaceIDRef::new("myspace", &space), &space_r);
let encoded = enc::enc_full_self(txn);
let decoded = dec::dec_full::<AlterSpaceTxn>(&encoded).unwrap();
let encoded = enc::full_self(txn);
let decoded = dec::full::<AlterSpaceTxn>(&encoded).unwrap();
assert_eq!(
AlterSpaceTxnRestorePL {
space_id: super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
@ -79,8 +79,8 @@ mod space_tests {
fn drop() {
let space = Space::new_auto_all();
let txn = DropSpaceTxn::new(super::SpaceIDRef::new("myspace", &space));
let encoded = enc::enc_full_self(txn);
let decoded = dec::dec_full::<DropSpaceTxn>(&encoded).unwrap();
let encoded = enc::full_self(txn);
let decoded = dec::full::<DropSpaceTxn>(&encoded).unwrap();
assert_eq!(
super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
decoded
@ -123,8 +123,8 @@ mod model_tests {
fn create() {
let (space, model) = default_space_model();
let txn = CreateModelTxn::new(super::SpaceIDRef::new("myspace", &space), "mymodel", &model);
let encoded = super::enc::enc_full_self(txn);
let decoded = super::dec::dec_full::<CreateModelTxn>(&encoded).unwrap();
let encoded = super::enc::full_self(txn);
let decoded = super::dec::full::<CreateModelTxn>(&encoded).unwrap();
assert_eq!(
CreateModelTxnRestorePL {
space_id: super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),
@ -149,8 +149,8 @@ mod model_tests {
),
&new_fields,
);
let encoded = super::enc::enc_full_self(txn);
let decoded = super::dec::dec_full::<AlterModelAddTxn>(&encoded).unwrap();
let encoded = super::enc::full_self(txn);
let decoded = super::dec::full::<AlterModelAddTxn>(&encoded).unwrap();
assert_eq!(
AlterModelAddTxnRestorePL {
model_id: super::ModelIDRes::new(
@ -179,8 +179,8 @@ mod model_tests {
),
&removed_fields,
);
let encoded = super::enc::enc_full_self(txn);
let decoded = super::dec::dec_full::<AlterModelRemoveTxn>(&encoded).unwrap();
let encoded = super::enc::full_self(txn);
let decoded = super::dec::full::<AlterModelRemoveTxn>(&encoded).unwrap();
assert_eq!(
AlterModelRemoveTxnRestorePL {
model_id: super::ModelIDRes::new(
@ -214,8 +214,8 @@ mod model_tests {
),
&updated_fields,
);
let encoded = super::enc::enc_full_self(txn);
let decoded = super::dec::dec_full::<AlterModelUpdateTxn>(&encoded).unwrap();
let encoded = super::enc::full_self(txn);
let decoded = super::dec::full::<AlterModelUpdateTxn>(&encoded).unwrap();
assert_eq!(
AlterModelUpdateTxnRestorePL {
model_id: super::ModelIDRes::new(
@ -238,8 +238,8 @@ mod model_tests {
model.get_uuid(),
model.delta_state().schema_current_version().value_u64(),
));
let encoded = super::enc::enc_full_self(txn);
let decoded = super::dec::dec_full::<DropModelTxn>(&encoded).unwrap();
let encoded = super::enc::full_self(txn);
let decoded = super::dec::full::<DropModelTxn>(&encoded).unwrap();
assert_eq!(
super::ModelIDRes::new(
super::SpaceIDRes::new(space.get_uuid(), "myspace".into()),

@ -216,25 +216,25 @@ pub mod enc {
use super::{map, MapStorageSpec, PersistObject, VecU8};
// obj
#[cfg(test)]
pub fn enc_full<Obj: PersistObject>(obj: Obj::InputType) -> Vec<u8> {
pub fn full<Obj: PersistObject>(obj: Obj::InputType) -> Vec<u8> {
let mut v = vec![];
enc_full_into_buffer::<Obj>(&mut v, obj);
full_into_buffer::<Obj>(&mut v, obj);
v
}
pub fn enc_full_into_buffer<Obj: PersistObject>(buf: &mut VecU8, obj: Obj::InputType) {
pub fn full_into_buffer<Obj: PersistObject>(buf: &mut VecU8, obj: Obj::InputType) {
Obj::default_full_enc(buf, obj)
}
#[cfg(test)]
pub fn enc_full_self<Obj: PersistObject<InputType = Obj>>(obj: Obj) -> Vec<u8> {
enc_full::<Obj>(obj)
pub fn full_self<Obj: PersistObject<InputType = Obj>>(obj: Obj) -> Vec<u8> {
full::<Obj>(obj)
}
// dict
pub fn enc_dict_full<PM: MapStorageSpec>(dict: &PM::InMemoryMap) -> Vec<u8> {
pub fn full_dict<PM: MapStorageSpec>(dict: &PM::InMemoryMap) -> Vec<u8> {
let mut v = vec![];
enc_dict_full_into_buffer::<PM>(&mut v, dict);
full_dict_into_buffer::<PM>(&mut v, dict);
v
}
pub fn enc_dict_full_into_buffer<PM: MapStorageSpec>(buf: &mut VecU8, dict: &PM::InMemoryMap) {
pub fn full_dict_into_buffer<PM: MapStorageSpec>(buf: &mut VecU8, dict: &PM::InMemoryMap) {
<map::PersistMapImpl<PM> as PersistObject>::default_full_enc(buf, dict)
}
}
@ -247,21 +247,21 @@ pub mod dec {
};
// obj
#[cfg(test)]
pub fn dec_full<Obj: PersistObject>(data: &[u8]) -> RuntimeResult<Obj::OutputType> {
pub fn full<Obj: PersistObject>(data: &[u8]) -> RuntimeResult<Obj::OutputType> {
let mut scanner = BufferedScanner::new(data);
dec_full_from_scanner::<Obj>(&mut scanner)
full_from_scanner::<Obj>(&mut scanner)
}
pub fn dec_full_from_scanner<Obj: PersistObject>(
pub fn full_from_scanner<Obj: PersistObject>(
scanner: &mut BufferedScanner,
) -> RuntimeResult<Obj::OutputType> {
Obj::default_full_dec(scanner)
}
// dec
pub fn dec_dict_full<PM: MapStorageSpec>(data: &[u8]) -> RuntimeResult<PM::RestoredMap> {
pub fn dict_full<PM: MapStorageSpec>(data: &[u8]) -> RuntimeResult<PM::RestoredMap> {
let mut scanner = BufferedScanner::new(data);
dec_dict_full_from_scanner::<PM>(&mut scanner)
dict_full_from_scanner::<PM>(&mut scanner)
}
fn dec_dict_full_from_scanner<PM: MapStorageSpec>(
fn dict_full_from_scanner<PM: MapStorageSpec>(
scanner: &mut BufferedScanner,
) -> RuntimeResult<PM::RestoredMap> {
<map::PersistMapImpl<PM> as PersistObject>::default_full_dec(scanner)

@ -51,24 +51,24 @@ fn dict() {
"and a null" => Datacell::null(),
))
};
let encoded = super::enc::enc_dict_full::<super::map::GenericDictSpec>(&dict);
let decoded = super::dec::dec_dict_full::<super::map::GenericDictSpec>(&encoded).unwrap();
let encoded = super::enc::full_dict::<super::map::GenericDictSpec>(&dict);
let decoded = super::dec::dict_full::<super::map::GenericDictSpec>(&encoded).unwrap();
assert_eq!(dict, decoded);
}
#[test]
fn layer() {
let layer = Layer::list();
let encoded = super::enc::enc_full::<obj::LayerRef>(obj::LayerRef(&layer));
let dec = super::dec::dec_full::<obj::LayerRef>(&encoded).unwrap();
let encoded = super::enc::full::<obj::LayerRef>(obj::LayerRef(&layer));
let dec = super::dec::full::<obj::LayerRef>(&encoded).unwrap();
assert_eq!(layer, dec);
}
#[test]
fn field() {
let field = Field::new([Layer::list(), Layer::uint64()].into(), true);
let encoded = super::enc::enc_full::<obj::FieldRef>((&field).into());
let dec = super::dec::dec_full::<obj::FieldRef>(&encoded).unwrap();
let encoded = super::enc::full::<obj::FieldRef>((&field).into());
let dec = super::dec::full::<obj::FieldRef>(&encoded).unwrap();
assert_eq!(field, dec);
}
@ -80,8 +80,8 @@ fn fieldmap() {
"profile_pic".into(),
Field::new([Layer::bin()].into(), true),
);
let enc = super::enc::enc_dict_full::<super::map::FieldMapSpec<_>>(&fields);
let dec = super::dec::dec_dict_full::<
let enc = super::enc::full_dict::<super::map::FieldMapSpec<_>>(&fields);
let dec = super::dec::dict_full::<
super::map::FieldMapSpec<crate::engine::idx::IndexSTSeqCns<Box<str>, _>>,
>(&enc)
.unwrap();
@ -105,8 +105,8 @@ fn model() {
"profile_pic" => Field::new([Layer::bin()].into(), true),
},
);
let enc = super::enc::enc_full::<obj::ModelLayoutRef>(obj::ModelLayoutRef(&model));
let dec = super::dec::dec_full::<obj::ModelLayoutRef>(&enc).unwrap();
let enc = super::enc::full::<obj::ModelLayoutRef>(obj::ModelLayoutRef(&model));
let dec = super::dec::full::<obj::ModelLayoutRef>(&enc).unwrap();
assert_eq!(model, dec);
}
@ -114,10 +114,8 @@ fn model() {
fn space() {
let uuid = Uuid::new();
let space = Space::new_restore_empty(uuid, Default::default());
let enc = super::enc::enc_full::<obj::SpaceLayoutRef>(obj::SpaceLayoutRef::from((
&space,
space.props(),
)));
let dec = super::dec::dec_full::<obj::SpaceLayoutRef>(&enc).unwrap();
let enc =
super::enc::full::<obj::SpaceLayoutRef>(obj::SpaceLayoutRef::from((&space, space.props())));
let dec = super::dec::full::<obj::SpaceLayoutRef>(&enc).unwrap();
assert_eq!(space, dec);
}

@ -60,8 +60,8 @@ impl<Fs: FSInterface> GNSTransactionDriverAnyFS<Fs> {
/// errors (if any)
pub fn try_commit<GE: GNSEvent>(&mut self, gns_event: GE) -> RuntimeResult<()> {
let mut buf = vec![];
buf.extend(GE::OPC.to_le_bytes());
GE::encode_super_event(gns_event, &mut buf);
buf.extend((GE::CODE as u16).to_le_bytes());
GE::encode_event(gns_event, &mut buf);
self.journal
.append_event_with_recovery_plugin(GNSSuperEvent(buf.into_boxed_slice()))?;
Ok(())
@ -87,9 +87,6 @@ impl JournalAdapter for GNSAdapter {
b
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> {
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into());
}
macro_rules! dispatch {
($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())]
@ -105,9 +102,12 @@ impl JournalAdapter for GNSAdapter {
gns::model::AlterModelUpdateTxn,
gns::model::DropModelTxn
);
if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into());
}
let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe {
// UNSAFE(@ohsayan):
// UNSAFE(@ohsayan): first branch ensures atleast two bytes
u16::from_le_bytes(scanner.next_chunk())
};
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {

@ -138,7 +138,7 @@ impl<Fs: FSInterface> SystemStore<Fs> {
),
);
// write
let buf = r1::enc::enc_dict_full::<r1::map::GenericDictSpec>(&map);
let buf = r1::enc::full_dict::<r1::map::GenericDictSpec>(&map);
f.fsynced_write(&buf)
}
fn _sync_with(&self, target: &str, cow: &str, auth: &SysAuth) -> RuntimeResult<()> {
@ -182,7 +182,7 @@ impl<Fs: FSInterface> SystemStore<Fs> {
Ok((slf, state))
}
fn _restore(mut f: SDSSFileIO<Fs>, run_mode: ConfigMode) -> RuntimeResult<SysConfig> {
let mut sysdb_data = r1::dec::dec_dict_full::<r1::map::GenericDictSpec>(&f.read_full()?)?;
let mut sysdb_data = r1::dec::dict_full::<r1::map::GenericDictSpec>(&f.read_full()?)?;
// get our auth and sys stores
let mut auth_store = rkey(
&mut sysdb_data,

@ -0,0 +1,83 @@
/*
* Created on Sun Feb 18 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::super::raw::{
journal::{EventLogAdapter, EventLogSpec},
spec::SystemDatabaseV1,
},
crate::{
engine::{
core::GlobalNS,
storage::{
common_encoding::r1::impls::gns::GNSEvent, v2::raw::journal::JournalAdapterEvent,
},
txn::gns::{
model::{
AlterModelAddTxn, AlterModelRemoveTxn, AlterModelUpdateTxn, CreateModelTxn,
DropModelTxn,
},
space::{AlterSpaceTxn, CreateSpaceTxn, DropSpaceTxn},
GNSTransaction, GNSTransactionCode,
},
RuntimeResult,
},
util::compiler::TaggedEnum,
},
};
/*
GNS event log impl
*/
/// Marker type that plugs GNS (global namespace) DDL transactions into the
/// generic event-log driver. It is stateless: all behavior is declared
/// through the `EventLogSpec` implementation below.
pub struct GNSEventLog;
impl EventLogSpec for GNSEventLog {
    // on-disk SDSS file specification used for this log
    type Spec = SystemDatabaseV1;
    // global state that decoded events are applied to on recovery
    type GlobalState = GlobalNS;
    // per-event metadata: the GNS transaction opcode
    type EventMeta = GNSTransactionCode;
    // one decode handler per transaction code variant; each takes the global
    // state and the raw event payload
    type DecodeDispatch =
        [fn(&GlobalNS, Vec<u8>) -> RuntimeResult<()>; GNSTransactionCode::VARIANT_COUNT];
    // NOTE: the order of entries here must match the discriminant order of
    // `GNSTransactionCode`, since the stored code is used to index this table
    // (space events first, then model events — TODO confirm against the enum's
    // declaration order)
    const DECODE_DISPATCH: Self::DecodeDispatch = [
        <CreateSpaceTxn as GNSEvent>::decode_apply,
        <AlterSpaceTxn as GNSEvent>::decode_apply,
        <DropSpaceTxn as GNSEvent>::decode_apply,
        <CreateModelTxn as GNSEvent>::decode_apply,
        <AlterModelAddTxn as GNSEvent>::decode_apply,
        <AlterModelRemoveTxn as GNSEvent>::decode_apply,
        <AlterModelUpdateTxn as GNSEvent>::decode_apply,
        <DropModelTxn as GNSEvent>::decode_apply,
    ];
}
/// Blanket impl: any GNS event can be committed to the GNS event log.
impl<T: GNSEvent> JournalAdapterEvent<EventLogAdapter<GNSEventLog>> for T {
    /// Event metadata written to the journal: the transaction code's
    /// discriminant (later parsed back via `GNSTransactionCode`)
    fn md(&self) -> u64 {
        <T as GNSTransaction>::CODE.dscr_u64()
    }
    /// Serializes the event payload into the journal's write buffer using the
    /// event's own encoder
    fn write_buffered(self, b: &mut Vec<u8>) {
        T::encode_event(self, b)
    }
}

@ -0,0 +1,25 @@
/*
* Created on Sun Feb 18 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/

@ -0,0 +1,28 @@
/*
* Created on Sun Feb 18 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod gns_log;
mod mdl_journal;

@ -24,4 +24,5 @@
*
*/
mod impls;
pub(in crate::engine::storage) mod raw;

@ -49,10 +49,42 @@ use {
mod raw;
#[cfg(test)]
mod tests;
pub use raw::RawJournalAdapterEvent as JournalAdapterEvent;
pub type EventLogDriver<EL, Fs> = RawJournalWriter<EventLog<EL>, Fs>;
pub struct EventLog<EL: EventLogAdapter>(PhantomData<EL>);
impl<EL: EventLogAdapter> EventLog<EL> {
/*
implementation of a blanket event log
---
1. linear
2. append-only
3. single-file
4. multi-stage integrity checked
*/
/// An event log driver
pub type EventLogDriver<EL, Fs> = RawJournalWriter<EventLogAdapter<EL>, Fs>;
/// The event log adapter
pub struct EventLogAdapter<EL: EventLogSpec>(PhantomData<EL>);
impl<EL: EventLogSpec> EventLogAdapter<EL> {
/// Open a new event log
pub fn open<Fs: FSInterface>(
name: &str,
gs: &EL::GlobalState,
) -> RuntimeResult<EventLogDriver<EL, Fs>>
where
EL::Spec: FileSpecV1<DecodeArgs = ()>,
{
raw::open_journal::<EventLogAdapter<EL>, Fs>(name, gs)
}
/// Create a new event log
pub fn create<Fs: FSInterface>(name: &str) -> RuntimeResult<EventLogDriver<EL, Fs>>
where
EL::Spec: FileSpecV1<EncodeArgs = ()>,
{
raw::create_journal::<EventLogAdapter<EL>, Fs>(name)
}
/// Close an event log
pub fn close<Fs: FSInterface>(me: &mut EventLogDriver<EL, Fs>) -> RuntimeResult<()> {
RawJournalWriter::close_driver(me)
}
@ -60,9 +92,13 @@ impl<EL: EventLogAdapter> EventLog<EL> {
type DispatchFn<G> = fn(&G, Vec<u8>) -> RuntimeResult<()>;
pub trait EventLogAdapter {
/// Specification for an event log
pub trait EventLogSpec {
/// the SDSS spec for this log
type Spec: FileSpecV1;
/// the global state for this log
type GlobalState;
/// event metadata
type EventMeta: TaggedEnum<Dscr = u8>;
type DecodeDispatch: Index<usize, Output = DispatchFn<Self::GlobalState>>;
const DECODE_DISPATCH: Self::DecodeDispatch;
@ -72,15 +108,15 @@ pub trait EventLogAdapter {
);
}
impl<EL: EventLogAdapter> RawJournalAdapter for EventLog<EL> {
impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
const COMMIT_PREFERENCE: CommitPreference = {
let _ = EL::ENSURE;
CommitPreference::Direct
};
type Spec = <EL as EventLogAdapter>::Spec;
type GlobalState = <EL as EventLogAdapter>::GlobalState;
type Spec = <EL as EventLogSpec>::Spec;
type GlobalState = <EL as EventLogSpec>::GlobalState;
type Context<'a> = () where Self: 'a;
type EventMeta = <EL as EventLogAdapter>::EventMeta;
type EventMeta = <EL as EventLogSpec>::EventMeta;
fn initialize(_: &raw::JournalInitializer) -> Self {
Self(PhantomData)
}
@ -89,7 +125,7 @@ impl<EL: EventLogAdapter> RawJournalAdapter for EventLog<EL> {
) -> Self::Context<'a> {
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
<<EL as EventLogAdapter>::EventMeta as TaggedEnum>::try_from_raw(meta as u8)
<<EL as EventLogSpec>::EventMeta as TaggedEnum>::try_from_raw(meta as u8)
}
fn commit_direct<'a, Fs: FSInterface, E>(
&mut self,
@ -131,42 +167,93 @@ impl<EL: EventLogAdapter> RawJournalAdapter for EventLog<EL> {
if this_checksum.finish() != expected_checksum {
return Err(StorageError::RawJournalCorrupted.into());
}
<EL as EventLogAdapter>::DECODE_DISPATCH
[<<EL as EventLogAdapter>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize](
<EL as EventLogSpec>::DECODE_DISPATCH
[<<EL as EventLogSpec>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize](
gs, pl
)
}
}
pub type BatchJournalDriver<BA, Fs> = RawJournalWriter<BatchJournal<BA>, Fs>;
pub struct BatchJournal<BA: BatchAdapter>(PhantomData<BA>);
/*
implementation of a batch journal
---
impl<BA: BatchAdapter> BatchJournal<BA> {
pub fn close<Fs: FSInterface>(me: &mut BatchJournalDriver<BA, Fs>) -> RuntimeResult<()> {
1. linear
2. append-only
3. event batches
4. integrity checked
*/
/// Batch journal driver
pub type BatchDriver<BA, Fs> = RawJournalWriter<BatchAdapter<BA>, Fs>;
/// Batch journal adapter
pub struct BatchAdapter<BA: BatchAdapterSpec>(PhantomData<BA>);
impl<BA: BatchAdapterSpec> BatchAdapter<BA> {
/// Open a new batch journal
pub fn open<Fs: FSInterface>(
name: &str,
gs: &BA::GlobalState,
) -> RuntimeResult<BatchDriver<BA, Fs>>
where
BA::Spec: FileSpecV1<DecodeArgs = ()>,
{
raw::open_journal::<BatchAdapter<BA>, Fs>(name, gs)
}
/// Create a new batch journal
pub fn create<Fs: FSInterface>(name: &str) -> RuntimeResult<BatchDriver<BA, Fs>>
where
BA::Spec: FileSpecV1<EncodeArgs = ()>,
{
raw::create_journal::<BatchAdapter<BA>, Fs>(name)
}
/// Close a batch journal
pub fn close<Fs: FSInterface>(me: &mut BatchDriver<BA, Fs>) -> RuntimeResult<()> {
RawJournalWriter::close_driver(me)
}
}
pub trait BatchAdapter {
/// A specification for a batch journal
///
/// NB: This trait's impl is fairly complex and is going to require careful handling to get it right. Also, the event has to have
/// a specific on-disk layout: `[EXPECTED COMMIT][ANY ADDITIONAL METADATA][BATCH BODY][ACTUAL COMMIT]`
pub trait BatchAdapterSpec {
type Spec: FileSpecV1;
type GlobalState;
type BatchMeta: TaggedEnum<Dscr = u8>;
fn decode_batch<Fs: FSInterface>(
type BatchType: TaggedEnum<Dscr = u8>;
type EventType: TaggedEnum<Dscr = u8> + PartialEq;
type BatchMetadata;
type BatchState;
fn is_early_exit(event_type: &Self::EventType) -> bool;
fn initialize_batch_state(gs: &Self::GlobalState) -> Self::BatchState;
fn decode_batch_metadata<Fs: FSInterface>(
gs: &Self::GlobalState,
f: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
meta: Self::BatchType,
) -> RuntimeResult<Self::BatchMetadata>;
fn update_state_for_new_event<Fs: FSInterface>(
gs: &Self::GlobalState,
bs: &mut Self::BatchState,
f: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
meta: Self::BatchMeta,
batch_info: &Self::BatchMetadata,
event_type: Self::EventType,
) -> RuntimeResult<()>;
fn finish(bs: Self::BatchState, gs: &Self::GlobalState) -> RuntimeResult<()>;
}
impl<BA: BatchAdapter> RawJournalAdapter for BatchJournal<BA> {
impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct;
type Spec = <BA as BatchAdapter>::Spec;
type GlobalState = <BA as BatchAdapter>::GlobalState;
type Context<'a> = () where BA: 'a;
type EventMeta = <BA as BatchAdapter>::BatchMeta;
type Spec = <BA as BatchAdapterSpec>::Spec;
type GlobalState = <BA as BatchAdapterSpec>::GlobalState;
type Context<'a> = () where Self: 'a;
type EventMeta = <BA as BatchAdapterSpec>::BatchType;
fn initialize(_: &raw::JournalInitializer) -> Self {
Self(PhantomData)
}
@ -175,7 +262,7 @@ impl<BA: BatchAdapter> RawJournalAdapter for BatchJournal<BA> {
) -> Self::Context<'a> {
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
<<BA as BatchAdapter>::BatchMeta as TaggedEnum>::try_from_raw(meta as u8)
<<BA as BatchAdapterSpec>::BatchType as TaggedEnum>::try_from_raw(meta as u8)
}
fn commit_direct<'a, Fs: FSInterface, E>(
&mut self,
@ -192,14 +279,51 @@ impl<BA: BatchAdapter> RawJournalAdapter for BatchJournal<BA> {
fn decode_apply<'a, Fs: FSInterface>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
f: &mut TrackedReader<<<Fs as FSInterface>::File as FileInterface>::BufReader, Self::Spec>,
) -> RuntimeResult<()> {
let mut reader_ctx = file.context();
<BA as BatchAdapter>::decode_batch::<Fs>(gs, &mut reader_ctx, meta)?;
let (real_checksum, file) = reader_ctx.finish();
let mut f = f.context();
{
// get metadata
// read batch size
let _stored_expected_commit_size = u64::from_le_bytes(f.read_block()?);
// read custom metadata
let batch_md = <BA as BatchAdapterSpec>::decode_batch_metadata::<Fs>(gs, &mut f, meta)?;
// now read in every event
let mut real_commit_size = 0;
let mut batch_state = <BA as BatchAdapterSpec>::initialize_batch_state(gs);
loop {
if real_commit_size == _stored_expected_commit_size {
break;
}
let event_type = f.read_block::<1>().and_then(|b| {
<<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw(b[0])
.ok_or(StorageError::RawJournalCorrupted.into())
})?;
// is this an early exit marker? if so, exit
if <BA as BatchAdapterSpec>::is_early_exit(&event_type) {
break;
}
// update batch state
BA::update_state_for_new_event::<Fs>(
gs,
&mut batch_state,
&mut f,
&batch_md,
event_type,
)?;
real_commit_size += 1;
}
// read actual commit size
let _stored_actual_commit_size = u64::from_le_bytes(f.read_block()?);
if _stored_actual_commit_size == real_commit_size {
// finish applying batch
BA::finish(batch_state, gs)?;
} else {
return Err(StorageError::RawJournalCorrupted.into());
}
}
// and finally, verify checksum
let (real_checksum, file) = f.finish();
let stored_checksum = u64::from_le_bytes(file.read_block()?);
if real_checksum == stored_checksum {
Ok(())

@ -30,9 +30,8 @@
use {
super::{
raw::{RawJournalAdapter, RawJournalAdapterEvent},
BatchAdapter, BatchJournal, BatchJournalDriver, DispatchFn, EventLog, EventLogAdapter,
EventLogDriver,
raw::RawJournalAdapterEvent, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn,
EventLogAdapter, EventLogDriver, EventLogSpec,
},
crate::{
engine::{
@ -96,7 +95,7 @@ impl_test_event!(
EventClear as TestEvent::Clear,
);
impl<TE: IsTestEvent> RawJournalAdapterEvent<EventLog<TestDBAdapter>> for TE {
impl<TE: IsTestEvent> RawJournalAdapterEvent<EventLogAdapter<TestDBAdapter>> for TE {
fn md(&self) -> u64 {
Self::EVCODE.dscr_u64()
}
@ -108,7 +107,7 @@ impl<TE: IsTestEvent> RawJournalAdapterEvent<EventLog<TestDBAdapter>> for TE {
// adapter
pub struct TestDBAdapter;
impl EventLogAdapter for TestDBAdapter {
impl EventLogSpec for TestDBAdapter {
type Spec = SystemDatabaseV1;
type GlobalState = TestDB;
type EventMeta = TestEvent;
@ -177,7 +176,7 @@ impl TestDB {
fn open_log() -> (
TestDB,
super::raw::RawJournalWriter<EventLog<TestDBAdapter>, VirtualFS>,
super::raw::RawJournalWriter<EventLogAdapter<TestDBAdapter>, VirtualFS>,
) {
let db = TestDB::default();
let log = open_journal("jrnl", &db).unwrap();
@ -198,30 +197,30 @@ fn test_this_data() {
for key in DATA1 {
db.push(&mut log, key).unwrap();
}
EventLog::close(&mut log).unwrap();
EventLogAdapter::close(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA1);
db.push(&mut log, DATA2[3]).unwrap();
EventLog::close(&mut log).unwrap();
EventLogAdapter::close(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA2);
db.pop(&mut log).unwrap();
EventLog::close(&mut log).unwrap();
EventLogAdapter::close(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA3);
db.push(&mut log, DATA4[3]).unwrap();
EventLog::close(&mut log).unwrap();
EventLogAdapter::close(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA4);
EventLog::close(&mut log).unwrap();
EventLogAdapter::close(&mut log).unwrap();
}
}
@ -229,111 +228,148 @@ fn test_this_data() {
batch test
*/
struct BatchDB {
data: RefCell<BatchDBInner>,
#[derive(Debug, PartialEq, Clone, Copy, TaggedEnum)]
#[repr(u8)]
pub enum BatchType {
GenericBatch = 0,
}
struct BatchDBInner {
data: Vec<String>,
changed: usize,
last_idx: usize,
#[derive(Debug, PartialEq, Clone, Copy, TaggedEnum)]
#[repr(u8)]
pub enum BatchEventType {
Push = 0,
EarlyExit = 1,
}
pub struct BatchState {
pending_inserts: Vec<String>,
}
#[derive(Debug, Default)]
pub struct BatchDB {
inner: RefCell<BatchDBInner>,
}
impl BatchDB {
const THRESHOLD: usize = 1;
fn new() -> Self {
Self {
data: RefCell::new(BatchDBInner {
data: vec![],
changed: 0,
last_idx: 0,
}),
}
Self::default()
}
fn _mut(&self) -> RefMut<BatchDBInner> {
self.data.borrow_mut()
self.inner.borrow_mut()
}
fn _ref(&self) -> Ref<BatchDBInner> {
self.data.borrow()
self.inner.borrow()
}
/// As soon as two changes occur, we sync to disk
fn push(
&self,
log: &mut BatchJournalDriver<BatchDBAdapter, VirtualFS>,
driver: &mut BatchDriver<BatchDBAdapter, VirtualFS>,
key: &str,
) -> RuntimeResult<()> {
let mut me = self._mut();
me.data.push(key.into());
if me.changed == Self::THRESHOLD {
me.changed += 1;
log.commit_event(FlushBatch::new(&me, me.last_idx, me.changed))?;
me.changed = 0;
me.last_idx = me.data.len();
Ok(())
} else {
me.changed += 1;
Ok(())
let changed = me.data.len() - me.last_flushed_at;
if changed == 2 {
// this is the second change about to happen, so flush it!
driver.commit_event(BatchDBFlush(&me, me.data.len()))?;
me.last_flushed_at = me.data.len();
}
Ok(())
}
}
struct BatchDBAdapter;
#[derive(Debug, Clone, Copy, TaggedEnum, PartialEq)]
#[repr(u8)]
enum BatchEvent {
NewBatch = 0,
#[derive(Debug, Default)]
struct BatchDBInner {
data: Vec<String>,
last_flushed_at: usize,
}
impl BatchAdapter for BatchDBAdapter {
type Spec = ModelDataBatchAofV1;
type GlobalState = BatchDB;
type BatchMeta = BatchEvent;
fn decode_batch<Fs: FSInterface>(
gs: &Self::GlobalState,
f: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
struct BatchDBFlush<'a>(&'a BatchDBInner, usize);
impl<'a> RawJournalAdapterEvent<BatchAdapter<BatchDBAdapter>> for BatchDBFlush<'a> {
fn md(&self) -> u64 {
BatchType::GenericBatch.dscr_u64()
}
fn write_direct<Fs: FSInterface>(
self,
f: &mut TrackedWriter<
Fs::File,
<BatchAdapter<BatchDBAdapter> as super::raw::RawJournalAdapter>::Spec,
>,
meta: Self::BatchMeta,
) -> RuntimeResult<()> {
let mut gs = gs._mut();
assert_eq!(meta, BatchEvent::NewBatch);
let mut batch_size = u64::from_le_bytes(f.read_block()?);
while batch_size != 0 {
let keylen = u64::from_le_bytes(f.read_block()?);
let mut key = vec![0; keylen as usize];
f.read(&mut key)?;
gs.data.push(String::from_utf8(key).unwrap());
gs.last_idx += 1;
batch_size -= 1;
// write: [expected commit][body][actual commit]
// for this dummy impl, we're expecting to write the full dataset but we're going to actually write the part
// that has actually changed, enabling us to test the underlying impl
let expected_commit = self.1 as u64;
f.dtrack_write(&expected_commit.to_le_bytes())?;
// now write all the keys
let change_cnt = self.1 - self.0.last_flushed_at;
let actual = &self.0.data[self.0.last_flushed_at..self.0.last_flushed_at + change_cnt];
for key in actual {
f.dtrack_write(&[BatchEventType::Push.dscr()])?;
f.dtrack_write(&(key.len() as u64).to_le_bytes())?;
f.dtrack_write(key.as_bytes())?;
}
// did we do something at all?
if self.1 != actual.len() {
// early exit!
f.dtrack_write(&[BatchEventType::EarlyExit.dscr()])?;
}
// actual commit
f.dtrack_write(&(actual.len() as u64).to_le_bytes())?;
Ok(())
}
}
// NOTE(review): this type belongs to the pre-refactor batch driver and appears to
// be diff-merge residue — the new equivalent is the `BatchDBFlush` tuple struct
struct FlushBatch<'a> {
    data: &'a BatchDBInner,
    start: usize,
    cnt: usize,
}
impl<'a> FlushBatch<'a> {
fn new(data: &'a BatchDBInner, start: usize, cnt: usize) -> Self {
Self { data, start, cnt }
/// Marker type implementing the batch adapter spec for the test batch database
pub struct BatchDBAdapter;
impl BatchAdapterSpec for BatchDBAdapter {
type Spec = ModelDataBatchAofV1;
type GlobalState = BatchDB;
type BatchType = BatchType;
type EventType = BatchEventType;
type BatchMetadata = ();
type BatchState = BatchState;
fn initialize_batch_state(_: &Self::GlobalState) -> Self::BatchState {
BatchState {
pending_inserts: vec![],
}
}
}
impl<'a> RawJournalAdapterEvent<BatchJournal<BatchDBAdapter>> for FlushBatch<'a> {
fn md(&self) -> u64 {
BatchEvent::NewBatch.dscr_u64()
fn is_early_exit(ev: &Self::EventType) -> bool {
BatchEventType::EarlyExit.eq(ev)
}
fn write_direct<Fs: FSInterface>(
self,
w: &mut TrackedWriter<Fs::File, <BatchJournal<BatchDBAdapter> as RawJournalAdapter>::Spec>,
fn decode_batch_metadata<Fs: FSInterface>(
_: &Self::GlobalState,
_: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
_: Self::BatchType,
) -> RuntimeResult<Self::BatchMetadata> {
Ok(())
}
fn update_state_for_new_event<Fs: FSInterface>(
_: &Self::GlobalState,
bs: &mut Self::BatchState,
f: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
_: &Self::BatchMetadata,
event_type: Self::EventType,
) -> RuntimeResult<()> {
// length
w.dtrack_write(&(self.cnt as u64).to_le_bytes())?;
// now write all the new keys
for key in &self.data.data[self.start..self.start + self.cnt] {
w.dtrack_write(&(key.len() as u64).to_le_bytes())?;
w.dtrack_write(key.as_bytes())?;
match event_type {
BatchEventType::EarlyExit => unreachable!(),
BatchEventType::Push => {}
}
let key_len = u64::from_le_bytes(f.read_block()?);
let mut key = vec![0; key_len as usize];
f.read(&mut key)?;
bs.pending_inserts.push(String::from_utf8(key).unwrap());
Ok(())
}
fn finish(bs: Self::BatchState, gs: &Self::GlobalState) -> RuntimeResult<()> {
for event in bs.pending_inserts {
gs._mut().data.push(event);
gs._mut().last_flushed_at += 1;
}
Ok(())
}
@ -342,31 +378,39 @@ impl<'a> RawJournalAdapterEvent<BatchJournal<BatchDBAdapter>> for FlushBatch<'a>
/// End-to-end check of the batch driver: push keys across three sessions
/// (each push pair triggers a flush), then reopen a fourth time and verify
/// that replaying the journal restores the full dataset.
#[test]
fn batch_simple() {
    {
        // session 1: fresh journal
        let mut batch_drv = BatchAdapter::create("mybatch").unwrap();
        let db = BatchDB::new();
        db.push(&mut batch_drv, "key1").unwrap();
        db.push(&mut batch_drv, "key2").unwrap();
        BatchAdapter::close(&mut batch_drv).unwrap();
    }
    {
        // session 2: reopen restores key1..key2, then append
        let db = BatchDB::new();
        let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap();
        db.push(&mut batch_drv, "key3").unwrap();
        db.push(&mut batch_drv, "key4").unwrap();
        assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]);
        BatchAdapter::close(&mut batch_drv).unwrap();
    }
    {
        // session 3: reopen restores key1..key4, then append
        let db = BatchDB::new();
        let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap();
        db.push(&mut batch_drv, "key5").unwrap();
        db.push(&mut batch_drv, "key6").unwrap();
        assert_eq!(
            db._ref().data,
            ["key1", "key2", "key3", "key4", "key5", "key6"]
        );
        BatchAdapter::close(&mut batch_drv).unwrap();
    }
    {
        // session 4: pure replay, no writes
        let db = BatchDB::new();
        let mut batch_drv =
            BatchAdapter::<BatchDBAdapter>::open::<VirtualFS>("mybatch", &db).unwrap();
        assert_eq!(
            db._ref().data,
            ["key1", "key2", "key3", "key4", "key5", "key6"]
        );
        BatchAdapter::close(&mut batch_drv).unwrap();
    }
}

@ -24,5 +24,28 @@
*
*/
/// Implements `GNSTransaction` for each listed type, binding it to the given
/// `GNSTransactionCode` variant (usage: `impl_gns_event!(Type<'_> = Variant, ...)`)
macro_rules! impl_gns_event {
    ($($item:ty = $variant:ident),* $(,)?) => {
        $(impl crate::engine::txn::gns::GNSTransaction for $item { const CODE: crate::engine::txn::gns::GNSTransactionCode = crate::engine::txn::gns::GNSTransactionCode::$variant;})*
    }
}
pub mod model;
pub mod space;
/// Unique code identifying each GNS (DDL) transaction kind.
/// NOTE: these discriminants are persisted; do not reorder or renumber variants.
#[derive(Debug, PartialEq, Clone, Copy, sky_macros::TaggedEnum)]
#[repr(u8)]
pub enum GNSTransactionCode {
    CreateSpace = 0,
    AlterSpace = 1,
    DropSpace = 2,
    CreateModel = 3,
    AlterModelAdd = 4,
    AlterModelRemove = 5,
    AlterModelUpdate = 6,
    DropModel = 7,
}
/// Trait implemented by every GNS (DDL) transaction type, assigning it a
/// unique transaction code (implemented via the `impl_gns_event!` macro)
pub trait GNSTransaction {
    /// the unique code for this transaction kind
    const CODE: GNSTransactionCode;
}

@ -31,6 +31,14 @@ use crate::engine::{
txn::{ModelIDRef, SpaceIDRef},
};
// bind each model DDL transaction type to its GNS transaction code
impl_gns_event!(
    CreateModelTxn<'_> = CreateModel,
    AlterModelAddTxn<'_> = AlterModelAdd,
    AlterModelRemoveTxn<'_> = AlterModelRemove,
    AlterModelUpdateTxn<'_> = AlterModelUpdate,
    DropModelTxn<'_> = DropModel
);
#[derive(Debug, Clone, Copy)]
/// The commit payload for a `create model ... (...) with {...}` txn
pub struct CreateModelTxn<'a> {

@ -26,6 +26,8 @@
use crate::engine::{core::space::Space, data::DictGeneric, txn::SpaceIDRef};
// bind each space DDL transaction type to its GNS transaction code
impl_gns_event!(CreateSpaceTxn<'_> = CreateSpace, AlterSpaceTxn<'_> = AlterSpace, DropSpaceTxn<'_> = DropSpace);
#[derive(Clone, Copy)]
/// Transaction commit payload for a `create space ...` query
pub struct CreateSpaceTxn<'a> {

Loading…
Cancel
Save