Simplify gns event impls

next
Sayan Nandan 1 year ago
parent 7eff973b9e
commit b78824d6d0
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -55,7 +55,7 @@ pub struct GlobalNS {
}
impl GlobalNS {
pub(self) fn spaces(&self) -> &RWLIdx<Box<str>, Space> {
pub fn spaces(&self) -> &RWLIdx<Box<str>, Space> {
&self.index_space
}
pub fn empty() -> Self {

@ -45,7 +45,7 @@ use {
};
#[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)]
pub struct MapIndexSizeMD(pub(super) usize);
pub struct MapIndexSizeMD(pub usize);
/// This is more of a lazy hack than anything sensible. Just implement a spec and then use this wrapper for any enc/dec operations
pub struct PersistMapImpl<'a, M: PersistMapSpec>(PhantomData<&'a M::MapType>);

@ -1,149 +0,0 @@
/*
* Created on Sun Aug 20 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{TransactionError, TransactionResult},
crate::{
engine::{
core::{space::Space, GlobalNS},
data::DictGeneric,
storage::v1::{
inf::{obj, PersistObject},
JournalAdapter, SDSSError,
},
},
util::EndianQW,
},
std::marker::PhantomData,
};
/*
journal implementor
*/
/// the journal adapter for DDL queries on the GNS
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = TransactionError;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> {
todo!()
}
}
/*
Events
---
FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes
pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless
we have absolutely no other choice)
---
[OPC:2B][PAYLOAD]
*/
pub struct GNSSuperEvent(Box<[u8]>);
pub trait GNSEvent: PersistObject {
const OPC: u16;
type InputItem;
}
/*
create space
*/
pub struct CreateSpaceTxn<'a>(PhantomData<&'a ()>);
#[derive(Clone, Copy)]
pub struct CreateSpaceTxnCommitPL<'a> {
space_meta: &'a DictGeneric,
space_name: &'a str,
space: &'a Space,
}
pub struct CreateSpaceTxnRestorePL {
space_name: Box<str>,
space: Space,
}
pub struct CreateSpaceTxnMD {
space_name_l: u64,
space_meta: <obj::SpaceLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> GNSEvent for CreateSpaceTxn<'a> {
const OPC: u16 = 0;
type InputItem = CreateSpaceTxnCommitPL<'a>;
}
impl<'a> PersistObject for CreateSpaceTxn<'a> {
const METADATA_SIZE: usize =
<obj::SpaceLayoutRef<'static> as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = CreateSpaceTxnCommitPL<'a>;
type OutputType = CreateSpaceTxnRestorePL;
type Metadata = CreateSpaceTxnMD;
fn pretest_can_dec_object(
scanner: &crate::engine::storage::v1::BufferedScanner,
md: &Self::Metadata,
) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.len().u64_bytes_le());
<obj::SpaceLayoutRef<'a> as PersistObject>::meta_enc(
buf,
obj::SpaceLayoutRef::from((data.space, data.space_meta)),
);
}
unsafe fn meta_dec(
scanner: &mut crate::engine::storage::v1::BufferedScanner,
) -> crate::engine::storage::v1::SDSSResult<Self::Metadata> {
let space_name_l = u64::from_le_bytes(scanner.next_chunk());
let space_meta = <obj::SpaceLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateSpaceTxnMD {
space_name_l,
space_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
<obj::SpaceLayoutRef as PersistObject>::meta_enc(buf, (data.space, data.space_meta).into());
}
unsafe fn obj_dec(
s: &mut crate::engine::storage::v1::BufferedScanner,
md: Self::Metadata,
) -> crate::engine::storage::v1::SDSSResult<Self::OutputType> {
let space_name =
String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned())
.map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)?
.into_boxed_str();
let space = <obj::SpaceLayoutRef as PersistObject>::obj_dec(s, md.space_meta)?;
Ok(CreateSpaceTxnRestorePL { space_name, space })
}
}

@ -0,0 +1,90 @@
/*
* Created on Sun Aug 20 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::storage::v1::BufferedScanner;
use {
super::{TransactionError, TransactionResult},
crate::engine::{
core::GlobalNS,
storage::v1::{
inf::{self, PersistObject},
JournalAdapter,
},
},
};
mod space;
/*
journal implementor
*/
/// the journal adapter for DDL queries on the GNS
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = TransactionError;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> {
todo!()
}
}
/*
Events
---
FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes
pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless
we have absolutely no other choice)
---
[OPC:2B][PAYLOAD]
*/
pub struct GNSSuperEvent(Box<[u8]>);
pub trait GNSEvent
where
Self: PersistObject<InputType = Self::CommitType, OutputType = Self::RestoreType> + Sized,
{
const OPC: u16;
type CommitType;
type RestoreType;
fn encode_super_event(commit: Self::CommitType) -> GNSSuperEvent {
GNSSuperEvent(inf::enc::enc_full::<Self>(commit).into_boxed_slice())
}
fn decode_from_super_event(
scanner: &mut BufferedScanner,
) -> TransactionResult<Self::RestoreType> {
inf::dec::dec_full_from_scanner::<Self>(scanner).map_err(|e| e.into())
}
fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> TransactionResult<()>;
}

@ -0,0 +1,260 @@
/*
* Created on Wed Aug 23 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::{idx::STIndex, txn::TransactionError};
use {
super::GNSEvent,
crate::{
engine::{
core::space::Space,
data::{uuid::Uuid, DictGeneric},
storage::v1::{
inf::{map, obj, PersistObject},
SDSSError,
},
},
util::EndianQW,
},
std::marker::PhantomData,
};
/*
create space
*/
pub struct CreateSpaceTxn<'a>(PhantomData<&'a ()>);
#[derive(Clone, Copy)]
pub struct CreateSpaceTxnCommitPL<'a> {
pub(crate) space_meta: &'a DictGeneric,
pub(crate) space_name: &'a str,
pub(crate) space: &'a Space,
}
pub struct CreateSpaceTxnRestorePL {
pub(crate) space_name: Box<str>,
pub(crate) space: Space,
}
pub struct CreateSpaceTxnMD {
pub(crate) space_name_l: u64,
pub(crate) space_meta: <obj::SpaceLayoutRef<'static> as PersistObject>::Metadata,
}
impl<'a> PersistObject for CreateSpaceTxn<'a> {
const METADATA_SIZE: usize =
<obj::SpaceLayoutRef<'static> as PersistObject>::METADATA_SIZE + sizeof!(u64);
type InputType = CreateSpaceTxnCommitPL<'a>;
type OutputType = CreateSpaceTxnRestorePL;
type Metadata = CreateSpaceTxnMD;
fn pretest_can_dec_object(
scanner: &crate::engine::storage::v1::BufferedScanner,
md: &Self::Metadata,
) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.len().u64_bytes_le());
<obj::SpaceLayoutRef<'a> as PersistObject>::meta_enc(
buf,
obj::SpaceLayoutRef::from((data.space, data.space_meta)),
);
}
unsafe fn meta_dec(
scanner: &mut crate::engine::storage::v1::BufferedScanner,
) -> crate::engine::storage::v1::SDSSResult<Self::Metadata> {
let space_name_l = u64::from_le_bytes(scanner.next_chunk());
let space_meta = <obj::SpaceLayoutRef as PersistObject>::meta_dec(scanner)?;
Ok(CreateSpaceTxnMD {
space_name_l,
space_meta,
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
<obj::SpaceLayoutRef as PersistObject>::meta_enc(buf, (data.space, data.space_meta).into());
}
unsafe fn obj_dec(
s: &mut crate::engine::storage::v1::BufferedScanner,
md: Self::Metadata,
) -> crate::engine::storage::v1::SDSSResult<Self::OutputType> {
let space_name =
String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned())
.map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)?
.into_boxed_str();
let space = <obj::SpaceLayoutRef as PersistObject>::obj_dec(s, md.space_meta)?;
Ok(CreateSpaceTxnRestorePL { space_name, space })
}
}
impl<'a> GNSEvent for CreateSpaceTxn<'a> {
const OPC: u16 = 0;
type CommitType = CreateSpaceTxnCommitPL<'a>;
type RestoreType = CreateSpaceTxnRestorePL;
fn update_global_state(
CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL,
gns: &crate::engine::core::GlobalNS,
) -> crate::engine::txn::TransactionResult<()> {
let mut wgns = gns.spaces().write();
if wgns.st_insert(space_name, space) {
Ok(())
} else {
Err(TransactionError::OnRestoreDataConflictAlreadyExists)
}
}
}
/*
alter space
---
for now dump the entire meta
*/
pub struct AlterSpaceTxn<'a>(PhantomData<&'a ()>);
pub struct AlterSpaceTxnMD {
uuid: Uuid,
space_name_l: u64,
dict_len: u64,
}
#[derive(Clone, Copy)]
pub struct AlterSpaceTxnCommitPL<'a> {
space_uuid: Uuid,
space_name: &'a str,
space_meta: &'a DictGeneric,
}
pub struct AlterSpaceTxnRestorePL {
space_name: Box<str>,
space_meta: DictGeneric,
}
impl<'a> PersistObject for AlterSpaceTxn<'a> {
const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128);
type InputType = AlterSpaceTxnCommitPL<'a>;
type OutputType = AlterSpaceTxnRestorePL;
type Metadata = AlterSpaceTxnMD;
fn pretest_can_dec_object(
scanner: &crate::engine::storage::v1::BufferedScanner,
md: &Self::Metadata,
) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_uuid.to_le_bytes());
buf.extend(data.space_name.len().u64_bytes_le());
buf.extend(data.space_meta.len().u64_bytes_le());
}
unsafe fn meta_dec(
scanner: &mut crate::engine::storage::v1::BufferedScanner,
) -> crate::engine::storage::v1::SDSSResult<Self::Metadata> {
Ok(AlterSpaceTxnMD {
uuid: Uuid::from_bytes(scanner.next_chunk()),
space_name_l: u64::from_le_bytes(scanner.next_chunk()),
dict_len: u64::from_le_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
<map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_enc(buf, data.space_meta);
}
unsafe fn obj_dec(
s: &mut crate::engine::storage::v1::BufferedScanner,
md: Self::Metadata,
) -> crate::engine::storage::v1::SDSSResult<Self::OutputType> {
let space_name =
String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned())
.map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)?
.into_boxed_str();
let space_meta = <map::PersistMapImpl<map::GenericDictSpec> as PersistObject>::obj_dec(
s,
map::MapIndexSizeMD(md.dict_len as usize),
)?;
Ok(AlterSpaceTxnRestorePL {
space_name,
space_meta,
})
}
}
/*
drop space
*/
pub struct DropSpace<'a>(PhantomData<&'a ()>);
pub struct DropSpaceTxnMD {
space_name_l: u64,
uuid: Uuid,
}
#[derive(Clone, Copy)]
pub struct DropSpaceTxnCommitPL<'a> {
space_name: &'a str,
uuid: Uuid,
}
pub struct DropSpaceTxnRestorePL {
uuid: Uuid,
space_name: Box<str>,
}
impl<'a> PersistObject for DropSpace<'a> {
const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64);
type InputType = DropSpaceTxnCommitPL<'a>;
type OutputType = DropSpaceTxnRestorePL;
type Metadata = DropSpaceTxnMD;
fn pretest_can_dec_object(
scanner: &crate::engine::storage::v1::BufferedScanner,
md: &Self::Metadata,
) -> bool {
scanner.has_left(md.space_name_l as usize)
}
fn meta_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.len().u64_bytes_le());
buf.extend(data.uuid.to_le_bytes());
}
unsafe fn meta_dec(
scanner: &mut crate::engine::storage::v1::BufferedScanner,
) -> crate::engine::storage::v1::SDSSResult<Self::Metadata> {
Ok(DropSpaceTxnMD {
space_name_l: u64::from_le_bytes(scanner.next_chunk()),
uuid: Uuid::from_bytes(scanner.next_chunk()),
})
}
fn obj_enc(buf: &mut Vec<u8>, data: Self::InputType) {
buf.extend(data.space_name.as_bytes());
}
unsafe fn obj_dec(
s: &mut crate::engine::storage::v1::BufferedScanner,
md: Self::Metadata,
) -> crate::engine::storage::v1::SDSSResult<Self::OutputType> {
let space_name =
String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned())
.map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)?
.into_boxed_str();
Ok(DropSpaceTxnRestorePL {
uuid: md.uuid,
space_name,
})
}
}

@ -33,6 +33,9 @@ pub type TransactionResult<T> = Result<T, TransactionError>;
#[cfg_attr(test, derive(PartialEq))]
pub enum TransactionError {
SDSSError(SDSSError),
/// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was
/// already present (when it was expected to not be present)
OnRestoreDataConflictAlreadyExists,
}
direct_from! {

Loading…
Cancel
Save