diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs index 7e036e22..b9fca120 100644 --- a/server/src/engine/core/mod.rs +++ b/server/src/engine/core/mod.rs @@ -55,7 +55,7 @@ pub struct GlobalNS { } impl GlobalNS { - pub(self) fn spaces(&self) -> &RWLIdx, Space> { + pub fn spaces(&self) -> &RWLIdx, Space> { &self.index_space } pub fn empty() -> Self { diff --git a/server/src/engine/storage/v1/inf/map.rs b/server/src/engine/storage/v1/inf/map.rs index 5ea9e05e..6cc9c1aa 100644 --- a/server/src/engine/storage/v1/inf/map.rs +++ b/server/src/engine/storage/v1/inf/map.rs @@ -45,7 +45,7 @@ use { }; #[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)] -pub struct MapIndexSizeMD(pub(super) usize); +pub struct MapIndexSizeMD(pub usize); /// This is more of a lazy hack than anything sensible. Just implement a spec and then use this wrapper for any enc/dec operations pub struct PersistMapImpl<'a, M: PersistMapSpec>(PhantomData<&'a M::MapType>); diff --git a/server/src/engine/txn/gns.rs b/server/src/engine/txn/gns.rs deleted file mode 100644 index ce0d1cd8..00000000 --- a/server/src/engine/txn/gns.rs +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Created on Sun Aug 20 2023 - * - * This file is a part of Skytable - * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source - * NoSQL database written by Sayan Nandan ("the Author") with the - * vision to provide flexibility in data modelling without compromising - * on performance, queryability or scalability. - * - * Copyright (c) 2023, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ - -use { - super::{TransactionError, TransactionResult}, - crate::{ - engine::{ - core::{space::Space, GlobalNS}, - data::DictGeneric, - storage::v1::{ - inf::{obj, PersistObject}, - JournalAdapter, SDSSError, - }, - }, - util::EndianQW, - }, - std::marker::PhantomData, -}; - -/* - journal implementor -*/ - -/// the journal adapter for DDL queries on the GNS -pub struct GNSAdapter; - -impl JournalAdapter for GNSAdapter { - const RECOVERY_PLUGIN: bool = true; - type JournalEvent = GNSSuperEvent; - type GlobalState = GlobalNS; - type Error = TransactionError; - fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> { - b - } - fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> { - todo!() - } -} - -/* - Events - --- - FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes - pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless - we have absolutely no other choice) - --- - [OPC:2B][PAYLOAD] -*/ - -pub struct GNSSuperEvent(Box<[u8]>); - -pub trait GNSEvent: PersistObject { - const OPC: u16; - type InputItem; -} - -/* - create space -*/ - -pub struct CreateSpaceTxn<'a>(PhantomData<&'a ()>); -#[derive(Clone, Copy)] -pub struct CreateSpaceTxnCommitPL<'a> { - space_meta: &'a DictGeneric, - space_name: &'a str, - space: &'a Space, -} -pub struct CreateSpaceTxnRestorePL { - space_name: Box, - space: Space, -} -pub struct CreateSpaceTxnMD { - space_name_l: u64, - space_meta: as PersistObject>::Metadata, -} - -impl<'a> GNSEvent for CreateSpaceTxn<'a> { - const OPC: u16 = 0; - type InputItem = CreateSpaceTxnCommitPL<'a>; -} - -impl<'a> PersistObject for CreateSpaceTxn<'a> { - const METADATA_SIZE: usize = - as PersistObject>::METADATA_SIZE + sizeof!(u64); - type InputType = CreateSpaceTxnCommitPL<'a>; - type OutputType = CreateSpaceTxnRestorePL; - type Metadata = CreateSpaceTxnMD; - fn pretest_can_dec_object( - scanner: &crate::engine::storage::v1::BufferedScanner, - md: &Self::Metadata, - ) -> bool { - scanner.has_left(md.space_name_l as usize) - } - fn meta_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.len().u64_bytes_le()); - as PersistObject>::meta_enc( - buf, - obj::SpaceLayoutRef::from((data.space, data.space_meta)), - ); - } - unsafe fn meta_dec( - scanner: &mut crate::engine::storage::v1::BufferedScanner, - ) -> crate::engine::storage::v1::SDSSResult { - let space_name_l = u64::from_le_bytes(scanner.next_chunk()); - let space_meta = ::meta_dec(scanner)?; - Ok(CreateSpaceTxnMD { - space_name_l, - space_meta, - }) - } - fn obj_enc(buf: &mut Vec, data: Self::InputType) { - buf.extend(data.space_name.as_bytes()); - ::meta_enc(buf, (data.space, data.space_meta).into()); - } - unsafe fn obj_dec( - s: &mut crate::engine::storage::v1::BufferedScanner, - md: Self::Metadata, - ) -> crate::engine::storage::v1::SDSSResult { - let space_name = - String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned()) - .map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)? - .into_boxed_str(); - let space = ::obj_dec(s, md.space_meta)?; - Ok(CreateSpaceTxnRestorePL { space_name, space }) - } -} diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs new file mode 100644 index 00000000..a3d7135a --- /dev/null +++ b/server/src/engine/txn/gns/mod.rs @@ -0,0 +1,90 @@ +/* + * Created on Sun Aug 20 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use crate::engine::storage::v1::BufferedScanner; + +use { + super::{TransactionError, TransactionResult}, + crate::engine::{ + core::GlobalNS, + storage::v1::{ + inf::{self, PersistObject}, + JournalAdapter, + }, + }, +}; + +mod space; + +/* + journal implementor +*/ + +/// the journal adapter for DDL queries on the GNS +pub struct GNSAdapter; + +impl JournalAdapter for GNSAdapter { + const RECOVERY_PLUGIN: bool = true; + type JournalEvent = GNSSuperEvent; + type GlobalState = GlobalNS; + type Error = TransactionError; + fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> { + b + } + fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> { + todo!() + } +} + +/* + Events + --- + FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes + pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless + we have absolutely no other choice) + --- + [OPC:2B][PAYLOAD] +*/ + +pub struct GNSSuperEvent(Box<[u8]>); + +pub trait GNSEvent +where + Self: PersistObject + Sized, +{ + const OPC: u16; + type CommitType; + type RestoreType; + fn encode_super_event(commit: Self::CommitType) -> GNSSuperEvent { + GNSSuperEvent(inf::enc::enc_full::(commit).into_boxed_slice()) + } + fn decode_from_super_event( + scanner: &mut BufferedScanner, + ) -> TransactionResult { + inf::dec::dec_full_from_scanner::(scanner).map_err(|e| e.into()) + } + fn update_global_state(restore: Self::RestoreType, gns: &GlobalNS) -> TransactionResult<()>; +} diff --git a/server/src/engine/txn/gns/space.rs b/server/src/engine/txn/gns/space.rs new file mode 100644 index 00000000..6dd39694 --- /dev/null +++ b/server/src/engine/txn/gns/space.rs @@ -0,0 +1,260 @@ +/* + * Created on Wed Aug 23 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use crate::engine::{idx::STIndex, txn::TransactionError}; + +use { + super::GNSEvent, + crate::{ + engine::{ + core::space::Space, + data::{uuid::Uuid, DictGeneric}, + storage::v1::{ + inf::{map, obj, PersistObject}, + SDSSError, + }, + }, + util::EndianQW, + }, + std::marker::PhantomData, +}; + +/* + create space +*/ + +pub struct CreateSpaceTxn<'a>(PhantomData<&'a ()>); + +#[derive(Clone, Copy)] +pub struct CreateSpaceTxnCommitPL<'a> { + pub(crate) space_meta: &'a DictGeneric, + pub(crate) space_name: &'a str, + pub(crate) space: &'a Space, +} + +pub struct CreateSpaceTxnRestorePL { + pub(crate) space_name: Box, + pub(crate) space: Space, +} + +pub struct CreateSpaceTxnMD { + pub(crate) space_name_l: u64, + pub(crate) space_meta: as PersistObject>::Metadata, +} + +impl<'a> PersistObject for CreateSpaceTxn<'a> { + const METADATA_SIZE: usize = + as PersistObject>::METADATA_SIZE + sizeof!(u64); + type InputType = CreateSpaceTxnCommitPL<'a>; + type OutputType = CreateSpaceTxnRestorePL; + type Metadata = CreateSpaceTxnMD; + fn pretest_can_dec_object( + scanner: &crate::engine::storage::v1::BufferedScanner, + md: &Self::Metadata, + ) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name.len().u64_bytes_le()); + as PersistObject>::meta_enc( + buf, + obj::SpaceLayoutRef::from((data.space, data.space_meta)), + ); + } + unsafe fn meta_dec( + scanner: &mut crate::engine::storage::v1::BufferedScanner, + ) -> crate::engine::storage::v1::SDSSResult { + let space_name_l = u64::from_le_bytes(scanner.next_chunk()); + let space_meta = ::meta_dec(scanner)?; + Ok(CreateSpaceTxnMD { + space_name_l, + space_meta, + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name.as_bytes()); + ::meta_enc(buf, (data.space, data.space_meta).into()); + } + unsafe fn obj_dec( + s: &mut crate::engine::storage::v1::BufferedScanner, + md: Self::Metadata, + ) -> crate::engine::storage::v1::SDSSResult { + let space_name = + String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned()) + .map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)? + .into_boxed_str(); + let space = ::obj_dec(s, md.space_meta)?; + Ok(CreateSpaceTxnRestorePL { space_name, space }) + } +} + +impl<'a> GNSEvent for CreateSpaceTxn<'a> { + const OPC: u16 = 0; + type CommitType = CreateSpaceTxnCommitPL<'a>; + type RestoreType = CreateSpaceTxnRestorePL; + fn update_global_state( + CreateSpaceTxnRestorePL { space_name, space }: CreateSpaceTxnRestorePL, + gns: &crate::engine::core::GlobalNS, + ) -> crate::engine::txn::TransactionResult<()> { + let mut wgns = gns.spaces().write(); + if wgns.st_insert(space_name, space) { + Ok(()) + } else { + Err(TransactionError::OnRestoreDataConflictAlreadyExists) + } + } +} + +/* + alter space + --- + for now dump the entire meta +*/ + +pub struct AlterSpaceTxn<'a>(PhantomData<&'a ()>); +pub struct AlterSpaceTxnMD { + uuid: Uuid, + space_name_l: u64, + dict_len: u64, +} +#[derive(Clone, Copy)] +pub struct AlterSpaceTxnCommitPL<'a> { + space_uuid: Uuid, + space_name: &'a str, + space_meta: &'a DictGeneric, +} +pub struct AlterSpaceTxnRestorePL { + space_name: Box, + space_meta: DictGeneric, +} + +impl<'a> PersistObject for AlterSpaceTxn<'a> { + const METADATA_SIZE: usize = sizeof!(u64, 2) + sizeof!(u128); + type InputType = AlterSpaceTxnCommitPL<'a>; + type OutputType = AlterSpaceTxnRestorePL; + type Metadata = AlterSpaceTxnMD; + fn pretest_can_dec_object( + scanner: &crate::engine::storage::v1::BufferedScanner, + md: &Self::Metadata, + ) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_uuid.to_le_bytes()); + buf.extend(data.space_name.len().u64_bytes_le()); + buf.extend(data.space_meta.len().u64_bytes_le()); + } + unsafe fn meta_dec( + scanner: &mut crate::engine::storage::v1::BufferedScanner, + ) -> crate::engine::storage::v1::SDSSResult { + Ok(AlterSpaceTxnMD { + uuid: Uuid::from_bytes(scanner.next_chunk()), + space_name_l: u64::from_le_bytes(scanner.next_chunk()), + dict_len: u64::from_le_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name.as_bytes()); + as PersistObject>::obj_enc(buf, data.space_meta); + } + unsafe fn obj_dec( + s: &mut crate::engine::storage::v1::BufferedScanner, + md: Self::Metadata, + ) -> crate::engine::storage::v1::SDSSResult { + let space_name = + String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned()) + .map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)? + .into_boxed_str(); + let space_meta = as PersistObject>::obj_dec( + s, + map::MapIndexSizeMD(md.dict_len as usize), + )?; + Ok(AlterSpaceTxnRestorePL { + space_name, + space_meta, + }) + } +} + +/* + drop space +*/ + +pub struct DropSpace<'a>(PhantomData<&'a ()>); +pub struct DropSpaceTxnMD { + space_name_l: u64, + uuid: Uuid, +} +#[derive(Clone, Copy)] +pub struct DropSpaceTxnCommitPL<'a> { + space_name: &'a str, + uuid: Uuid, +} +pub struct DropSpaceTxnRestorePL { + uuid: Uuid, + space_name: Box, +} + +impl<'a> PersistObject for DropSpace<'a> { + const METADATA_SIZE: usize = sizeof!(u128) + sizeof!(u64); + type InputType = DropSpaceTxnCommitPL<'a>; + type OutputType = DropSpaceTxnRestorePL; + type Metadata = DropSpaceTxnMD; + fn pretest_can_dec_object( + scanner: &crate::engine::storage::v1::BufferedScanner, + md: &Self::Metadata, + ) -> bool { + scanner.has_left(md.space_name_l as usize) + } + fn meta_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name.len().u64_bytes_le()); + buf.extend(data.uuid.to_le_bytes()); + } + unsafe fn meta_dec( + scanner: &mut crate::engine::storage::v1::BufferedScanner, + ) -> crate::engine::storage::v1::SDSSResult { + Ok(DropSpaceTxnMD { + space_name_l: u64::from_le_bytes(scanner.next_chunk()), + uuid: Uuid::from_bytes(scanner.next_chunk()), + }) + } + fn obj_enc(buf: &mut Vec, data: Self::InputType) { + buf.extend(data.space_name.as_bytes()); + } + unsafe fn obj_dec( + s: &mut crate::engine::storage::v1::BufferedScanner, + md: Self::Metadata, + ) -> crate::engine::storage::v1::SDSSResult { + let space_name = + String::from_utf8(s.next_chunk_variable(md.space_name_l as usize).to_owned()) + .map_err(|_| SDSSError::InternalDecodeStructureCorruptedPayload)? + .into_boxed_str(); + Ok(DropSpaceTxnRestorePL { + uuid: md.uuid, + space_name, + }) + } +} diff --git a/server/src/engine/txn/mod.rs b/server/src/engine/txn/mod.rs index 62745ada..c7ba9657 100644 --- a/server/src/engine/txn/mod.rs +++ b/server/src/engine/txn/mod.rs @@ -33,6 +33,9 @@ pub type TransactionResult = Result; #[cfg_attr(test, derive(PartialEq))] pub enum TransactionError { SDSSError(SDSSError), + /// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was + /// already present (when it was expected to not be present) + OnRestoreDataConflictAlreadyExists, } direct_from! {