Add preliminary event impls

next
Sayan Nandan 1 year ago
parent d8cabb9761
commit df61b627b4
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -36,3 +36,4 @@ mod mem;
mod ql;
mod storage;
mod sync;
mod txn;

@ -29,4 +29,4 @@
mod header;
mod versions;
// impls
mod v1;
pub mod v1;

@ -94,10 +94,12 @@ pub trait JournalAdapter {
type JournalEvent;
/// The global state, which we want to modify on decoding the event
type GlobalState;
/// The transactional impl that makes use of this journal, should define it's error type
type Error;
/// Encode a journal event into a blob
fn encode(event: Self::JournalEvent) -> Box<[u8]>;
/// Decode a journal event and apply it to the global state
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()>;
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), Self::Error>;
}
#[derive(Debug)]
@ -232,7 +234,7 @@ impl<TA: JournalAdapter, LF: RawFileIOInterface> JournalReader<TA, LF> {
if self.evid != entry_metadata.event_id as u64 {
// the only case when this happens is when the journal faults at runtime with a write zero (or some other error when no bytes were written)
self.remaining_bytes += JournalEntryMetadata::SIZE as u64;
// move back cursor
// move back cursor to see if we have a recovery block
let new_cursor = self.log_file.retrieve_cursor()? - JournalEntryMetadata::SIZE as u64;
self.log_file.seek_from_start(new_cursor)?;
return self.try_recover_journal_strategy_simple_reverse();

@ -36,7 +36,10 @@ mod start_stop;
#[cfg(test)]
mod tests;
use std::io::Error as IoError;
// re-exports
pub use journal::JournalAdapter;
use crate::util::os::SysIOError as IoError;
pub type SDSSResult<T> = Result<T, SDSSError>;
@ -52,7 +55,15 @@ impl SDSSErrorContext for IoError {
}
}
impl SDSSErrorContext for std::io::Error {
type ExtraData = &'static str;
fn with_extra(self, extra: Self::ExtraData) -> SDSSError {
SDSSError::IoErrorExtra(self.into(), extra)
}
}
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SDSSError {
// IO errors
/// An IO err
@ -113,3 +124,9 @@ impl From<IoError> for SDSSError {
Self::IoError(e)
}
}
impl From<std::io::Error> for SDSSError {
fn from(e: std::io::Error) -> Self {
Self::IoError(e.into())
}
}

@ -265,9 +265,19 @@ mod tx {
Set(usize, u8),
}
#[derive(Debug)]
pub enum TxError {
SDSS(SDSSError),
}
direct_from! {
TxError => {
SDSSError as SDSS
}
}
#[derive(Debug)]
pub struct DatabaseTxnAdapter;
impl JournalAdapter for DatabaseTxnAdapter {
const RECOVERY_PLUGIN: bool = false;
type Error = TxError;
type JournalEvent = TxEvent;
type GlobalState = Database;
@ -294,9 +304,9 @@ mod tx {
ret.into_boxed_slice()
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()> {
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), TxError> {
if payload.len() != 10 {
return Err(SDSSError::CorruptedFile("testtxn.log"));
return Err(SDSSError::CorruptedFile("testtxn.log").into());
}
let opcode = payload[0];
let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9]));
@ -304,7 +314,7 @@ mod tx {
match opcode {
0 if index == 0 && new_value == 0 => gs.reset(),
1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value),
_ => return Err(SDSSError::JournalLogEntryCorrupted),
_ => return Err(SDSSError::JournalLogEntryCorrupted.into()),
}
Ok(())
}

@ -0,0 +1,61 @@
/*
* Created on Sun Aug 20 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{TransactionError, TransactionResult},
crate::engine::{core::GlobalNS, storage::v1::JournalAdapter},
};
/*
journal implementor
*/
/// the journal adapter for DDL queries on the GNS
pub struct GNSAdapter;
impl JournalAdapter for GNSAdapter {
const RECOVERY_PLUGIN: bool = true;
type JournalEvent = GNSSuperEvent;
type GlobalState = GlobalNS;
type Error = TransactionError;
fn encode(GNSSuperEvent(b): Self::JournalEvent) -> Box<[u8]> {
b
}
fn decode_and_update_state(_: &[u8], _: &Self::GlobalState) -> TransactionResult<()> {
todo!()
}
}
/*
Events
---
FIXME(@ohsayan): In the current impl, we unnecessarily use an intermediary buffer which we clearly don't need to (and also makes
pointless allocations). We need to fix this, but with a consistent API (and preferably not something like commit_*(...) unless
we have absolutely no other choice)
*/
// ah that stinging buffer
pub struct GNSSuperEvent(Box<[u8]>);

@ -0,0 +1,42 @@
/*
* Created on Sun Aug 20 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub mod gns;
use super::storage::v1::SDSSError;
pub type TransactionResult<T> = Result<T, TransactionError>;
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum TransactionError {
SDSSError(SDSSError),
}
direct_from! {
TransactionError => {
SDSSError as SDSSError
}
}

@ -39,6 +39,24 @@ use {
},
};
#[derive(Debug)]
#[repr(transparent)]
/// A wrapper around [`std`]'s I/O [Error](std::io::Error) type for simplicity with equality
pub struct SysIOError(std::io::Error);
impl From<std::io::Error> for SysIOError {
fn from(e: std::io::Error) -> Self {
Self(e)
}
}
#[cfg(test)]
impl PartialEq for SysIOError {
fn eq(&self, other: &Self) -> bool {
self.0.to_string() == other.0.to_string()
}
}
#[cfg(unix)]
mod unix {
use {

Loading…
Cancel
Save