Add new journal framework

next
Sayan Nandan 8 months ago
parent c724678446
commit e8d968ea4d
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -215,5 +215,10 @@ enumerate_err! {
DataBatchRestoreCorruptedBatchFile = "batch-corrupted-file",
/// the system database is corrupted
SysDBCorrupted = "sysdb-corrupted",
// raw journal errors
RawJournalEventCorruptedMetadata = "journal-event-metadata-corrupted",
RawJournalEventCorrupted = "journal-invalid-event",
RawJournalCorrupted = "journal-corrupted",
RawJournalInvalidEvent = "journal-invalid-event-order",
}
}

@ -158,6 +158,11 @@ pub trait ErrorContext<T> {
fn set_origin(self, origin: Subsystem) -> Result<T, Error>;
/// set the dmsg (do not inherit parent or local)
fn set_dmsg(self, dmsg: impl Into<Dmsg>) -> Result<T, Error>;
fn set_dmsg_fn<F, M>(self, d: F) -> Result<T, Error>
where
F: Fn() -> M,
M: Into<Dmsg>,
Self: Sized;
/// set the origin and dmsg (do not inherit)
fn set_ctx(self, origin: Subsystem, dmsg: impl Into<Dmsg>) -> Result<T, Error>;
// inherit parent
@ -197,6 +202,14 @@ where
fn set_dmsg(self, dmsg: impl Into<Dmsg>) -> Result<T, Error> {
self.map_err(|e| e.err_noinherit().add_dmsg(dmsg))
}
fn set_dmsg_fn<F, M>(self, d: F) -> Result<T, Error>
where
F: Fn() -> M,
M: Into<Dmsg>,
Self: Sized,
{
self.map_err(|e| e.err_noinherit().add_dmsg(d().into()))
}
fn set_ctx(self, origin: Subsystem, dmsg: impl Into<Dmsg>) -> Result<T, Error> {
self.map_err(|e| Error::new(e.err_noinherit().kind, origin, dmsg))
}

@ -397,7 +397,7 @@ macro_rules! sizeof {
}
macro_rules! local {
($($vis:vis static$ident:ident:$ty:ty=$expr:expr;)*)=> {::std::thread_local!{$($vis static $ident: ::std::cell::RefCell::<$ty> = ::std::cell::RefCell::new($expr);)*}};
($($(#[$attr:meta])*$vis:vis static$ident:ident:$ty:ty=$expr:expr;)*)=> {::std::thread_local!{$($(#[$attr])*$vis static $ident: ::std::cell::RefCell::<$ty> = ::std::cell::RefCell::new($expr);)*}};
}
macro_rules! local_mut {

@ -29,7 +29,10 @@
//! This module contains utils for handling checksums
//!
use crc::{Crc, Digest, CRC_64_XZ};
use {
crc::{Crc, Digest, CRC_64_XZ},
std::fmt,
};
/*
NOTE(@ohsayan): we're currently using crc's impl. but the reason I decided to make a wrapper is because I have a
@ -43,13 +46,21 @@ pub struct SCrc64 {
digest: Digest<'static, u64>,
}
/// Manual [`fmt::Debug`] impl for [`SCrc64`]: reports the checksum value the
/// digest would finalize to at this point. The digest is cloned because
/// `finalize` consumes it by value.
impl fmt::Debug for SCrc64 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SCrc64")
            .field("digest", &self.digest.clone().finalize())
            .finish()
    }
}
impl SCrc64 {
pub const fn new() -> Self {
Self {
digest: CRC64.digest(),
}
}
pub fn recompute_with_new_var_block(&mut self, b: &[u8]) {
pub fn update(&mut self, b: &[u8]) {
self.digest.update(b)
}
pub fn finish(self) -> u64 {

@ -36,4 +36,4 @@
The only header that we currently use is [`v1::HeaderV1`].
*/
pub mod v1;
pub mod sdss_r1;

@ -30,7 +30,7 @@
//! and functions are defined here to deal with SDSSv1 files.
//!
pub(super) mod rw;
pub mod rw;
use {
super::super::super::{
@ -398,6 +398,7 @@ impl<H: HeaderV1Spec> HeaderV1<H> {
}
pub trait FileSpecV1 {
const SIZE: usize = HeaderV1::<Self::HeaderSpec>::SIZE;
type Metadata;
/// the header type
type HeaderSpec: HeaderV1Spec;

@ -35,7 +35,7 @@ use crate::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt,
FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
},
sdss::v1::FileSpecV1,
sdss::sdss_r1::FileSpecV1,
},
RuntimeResult,
},
@ -209,8 +209,13 @@ pub struct TrackedReader<F, S: FileSpecV1> {
impl<F: FileInterface, S: FileSpecV1> TrackedReader<F, S> {
/// Create a new [`TrackedReader`]. This needs to retrieve file position and length
pub fn new(mut f: SdssFile<F, S>) -> RuntimeResult<TrackedReader<F::BufReader, S>> {
f.file_cursor().and_then(|c| Self::with_cursor(f, c))
}
pub fn with_cursor(
f: SdssFile<F, S>,
cursor: u64,
) -> RuntimeResult<TrackedReader<F::BufReader, S>> {
let len = f.file_length()?;
let cursor = f.file_cursor()?;
let f = f.into_buffered_reader()?;
Ok(TrackedReader {
f,
@ -224,8 +229,7 @@ impl<F: FileInterface, S: FileSpecV1> TrackedReader<F, S> {
impl<F: FileInterfaceRead, S: FileSpecV1> TrackedReader<F, S> {
/// Attempt to fill the buffer. This read is tracked.
pub fn tracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.untracked_read(buf)
.map(|_| self.cs.recompute_with_new_var_block(buf))
self.untracked_read(buf).map(|_| self.cs.update(buf))
}
/// Attempt to read a byte. This read is also tracked.
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
@ -267,6 +271,15 @@ impl<F: FileInterfaceRead, S: FileSpecV1> TrackedReader<F, S> {
pub fn read_u64_le(&mut self) -> RuntimeResult<u64> {
Ok(u64::from_le_bytes(self.read_block()?))
}
pub fn current_checksum(&self) -> u64 {
self.cs.clone().finish()
}
pub fn checksum(&self) -> SCrc64 {
self.cs.clone()
}
pub fn cursor(&self) -> u64 {
self.cursor
}
}
impl<F, S: FileSpecV1> TrackedReader<F, S> {
@ -336,6 +349,10 @@ impl<
pub fn cursor_usize(&self) -> usize {
self.cursor() as _
}
/// Returns true if not all data has been synced
pub fn is_dirty(&self) -> bool {
!self.buf.is_empty()
}
}
impl<
@ -380,6 +397,27 @@ impl<
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
/// Don't write to the buffer, instead directly write to the file
///
/// NB:
/// - If errored, the number of bytes written are still tracked
/// - If errored, the checksum is updated to reflect the number of bytes written (unless otherwise configured)
pub fn tracked_write_through_buffer(&mut self, buf: &[u8]) -> RuntimeResult<()> {
debug_assert!(self.buf.is_empty());
match self.f_d.fwrite_all_count(buf) {
(cnt, r) => {
self.t_cursor += cnt;
if r.is_err() {
if CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
self.t_checksum.update(&buf[..cnt as usize]);
}
} else {
self.t_checksum.update(buf);
}
r
}
}
}
/// Do a tracked write
///
/// On error, if block error checksumming is set then whatever part of the block was written
@ -388,14 +426,13 @@ impl<
let cursor_start = self.cursor_usize();
match self.untracked_write(buf) {
Ok(()) => {
self.t_checksum.recompute_with_new_var_block(buf);
self.t_checksum.update(buf);
Ok(())
}
Err(e) => {
if CHECKSUM_WRITTEN_IF_BLOCK_ERROR {
let cursor_now = self.cursor_usize();
self.t_checksum
.recompute_with_new_var_block(&buf[..cursor_now - cursor_start]);
self.t_checksum.update(&buf[..cursor_now - cursor_start]);
}
Err(e)
}
@ -490,9 +527,9 @@ fn check_vfs_buffering() {
let compiled_header = SystemDatabaseV1::metadata_to_block(()).unwrap();
let expected_checksum = {
let mut crc = SCrc64::new();
crc.recompute_with_new_var_block(&vec![0; 8192]);
crc.recompute_with_new_var_block(&[0]);
crc.recompute_with_new_var_block(&vec![0xFF; 8192]);
crc.update(&vec![0; 8192]);
crc.update(&[0]);
crc.update(&vec![0xFF; 8192]);
crc.finish()
};
closure! {

@ -25,4 +25,4 @@
*/
mod impls;
pub use impls::v1;
pub use impls::sdss_r1;

@ -61,7 +61,7 @@ const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
pub fn open_or_create_journal<
TA: JournalAdapter,
Fs: FSInterface,
F: sdss::v1::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
F: sdss::sdss_r1::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
>(
log_file_name: &str,
gs: &TA::GlobalState,
@ -79,7 +79,7 @@ pub fn open_or_create_journal<
pub fn create_journal<
TA: JournalAdapter,
Fs: FSInterface,
F: sdss::v1::FileSpecV1<EncodeArgs = ()>,
F: sdss::sdss_r1::FileSpecV1<EncodeArgs = ()>,
>(
log_file_name: &str,
) -> RuntimeResult<JournalWriter<Fs, TA>> {
@ -89,7 +89,7 @@ pub fn create_journal<
pub fn load_journal<
TA: JournalAdapter,
Fs: FSInterface,
F: sdss::v1::FileSpecV1<DecodeArgs = ()>,
F: sdss::sdss_r1::FileSpecV1<DecodeArgs = ()>,
>(
log_file_name: &str,
gs: &TA::GlobalState,

@ -55,8 +55,7 @@ impl<Fs: FSInterface> TrackedWriter<Fs> {
})
}
pub fn tracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
self.untracked_write(block)
.map(|_| self.cs.recompute_with_new_var_block(block))
self.untracked_write(block).map(|_| self.cs.update(block))
}
pub fn untracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
match self.file.write_buffer(block) {
@ -108,8 +107,7 @@ impl<Fs: FSInterface> TrackedReader<Fs> {
self.remaining() >= v
}
pub fn tracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.untracked_read(buf)
.map(|_| self.cs.recompute_with_new_var_block(buf))
self.untracked_read(buf).map(|_| self.cs.update(buf))
}
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
let mut buf = [0u8; 1];
@ -156,19 +154,23 @@ pub struct SDSSFileIO<Fs: FSInterface, F = <Fs as FSInterface>::File> {
}
impl<Fs: FSInterface> SDSSFileIO<Fs> {
pub fn open<F: sdss::v1::FileSpecV1<DecodeArgs = ()>>(
pub fn open<F: sdss::sdss_r1::FileSpecV1<DecodeArgs = ()>>(
fpath: &str,
) -> RuntimeResult<(Self, F::Metadata)> {
let mut f = Self::_new(Fs::fs_fopen_rw(fpath)?);
let v = F::read_metadata(&mut f.f, ())?;
Ok((f, v))
}
pub fn create<F: sdss::v1::FileSpecV1<EncodeArgs = ()>>(fpath: &str) -> RuntimeResult<Self> {
pub fn create<F: sdss::sdss_r1::FileSpecV1<EncodeArgs = ()>>(
fpath: &str,
) -> RuntimeResult<Self> {
let mut f = Self::_new(Fs::fs_fcreate_rw(fpath)?);
F::write_metadata(&mut f.f, ())?;
Ok(f)
}
pub fn open_or_create_perm_rw<F: sdss::v1::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>>(
pub fn open_or_create_perm_rw<
F: sdss::sdss_r1::FileSpecV1<DecodeArgs = (), EncodeArgs = ()>,
>(
fpath: &str,
) -> RuntimeResult<FileOpen<Self, (Self, F::Metadata)>> {
match Fs::fs_fopen_or_create_rw(fpath)? {

@ -29,17 +29,17 @@ use crate::engine::storage::common::{
versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion},
};
pub(super) type Header = sdss::v1::HeaderV1<HeaderImplV1>;
pub(super) type Header = sdss::sdss_r1::HeaderV1<HeaderImplV1>;
#[derive(Debug)]
pub(super) struct HeaderImplV1;
impl sdss::v1::HeaderV1Spec for HeaderImplV1 {
impl sdss::sdss_r1::HeaderV1Spec for HeaderImplV1 {
type FileClass = FileScope;
type FileSpecifier = FileSpecifier;
const CURRENT_SERVER_VERSION: ServerVersion = versions::v1::V1_SERVER_VERSION;
const CURRENT_DRIVER_VERSION: DriverVersion = versions::v1::V1_DRIVER_VERSION;
}
impl sdss::v1::HeaderV1Enumeration for FileScope {
impl sdss::sdss_r1::HeaderV1Enumeration for FileScope {
const MAX: u8 = FileScope::MAX;
unsafe fn new(x: u8) -> Self {
core::mem::transmute(x)
@ -48,7 +48,7 @@ impl sdss::v1::HeaderV1Enumeration for FileScope {
FileScope::value_u8(self)
}
}
impl sdss::v1::HeaderV1Enumeration for FileSpecifier {
impl sdss::sdss_r1::HeaderV1Enumeration for FileSpecifier {
const MAX: u8 = FileSpecifier::MAX;
unsafe fn new(x: u8) -> Self {
core::mem::transmute(x)
@ -84,7 +84,7 @@ pub enum FileSpecifier {
#[cfg(test)]
pub(super) struct TestFile;
#[cfg(test)]
impl sdss::v1::SimpleFileSpecV1 for TestFile {
impl sdss::sdss_r1::SimpleFileSpecV1 for TestFile {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::FlatmapData;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::TestTransactionLog;
@ -93,7 +93,7 @@ impl sdss::v1::SimpleFileSpecV1 for TestFile {
/// The file specification for the GNS transaction log (impl v1)
pub(super) struct GNSTransactionLogV1;
impl sdss::v1::SimpleFileSpecV1 for GNSTransactionLogV1 {
impl sdss::sdss_r1::SimpleFileSpecV1 for GNSTransactionLogV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::Journal;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::GNSTxnLog;
@ -102,7 +102,7 @@ impl sdss::v1::SimpleFileSpecV1 for GNSTransactionLogV1 {
/// The file specification for a journal batch
pub(super) struct DataBatchJournalV1;
impl sdss::v1::SimpleFileSpecV1 for DataBatchJournalV1 {
impl sdss::sdss_r1::SimpleFileSpecV1 for DataBatchJournalV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::DataBatch;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::TableDataBatch;
@ -111,7 +111,7 @@ impl sdss::v1::SimpleFileSpecV1 for DataBatchJournalV1 {
/// The file specification for the system db
pub(super) struct SysDBV1;
impl sdss::v1::SimpleFileSpecV1 for SysDBV1 {
impl sdss::sdss_r1::SimpleFileSpecV1 for SysDBV1 {
type HeaderSpec = HeaderImplV1;
const FILE_CLASS: FileScope = FileScope::FlatmapData;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::SysDB;

@ -0,0 +1,27 @@
/*
* Created on Sun Jan 21 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod raw;

@ -0,0 +1,772 @@
/*
* Created on Sun Jan 21 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
#![allow(dead_code)]
#[cfg(test)]
mod tests;
use {
crate::engine::{
error::StorageError,
mem::unsafe_apis::memcpy,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{FSInterface, FileInterface},
sdss::sdss_r1::{
rw::{SdssFile, TrackedReader, TrackedWriter},
FileSpecV1,
},
},
RuntimeResult,
},
std::ops::Range,
};
/*
loader
*/
/// Create a new journal
///
/// Creates the journal file via [`SdssFile::create`] and seeds the writer
/// with a cursor placed right after the file header (`Spec::SIZE`), a fresh
/// checksum and zeroed transaction state. `last_offset == 0` is what marks
/// the journal as brand new (see [`JournalInitializer::is_new`]).
pub fn create_journal<J: RawJournalAdapter, Fs: FSInterface>(
    log_path: &str,
) -> RuntimeResult<RawJournalWriter<J, Fs>>
where
    J::Spec: FileSpecV1<EncodeArgs = ()>,
{
    let log = SdssFile::create::<Fs>(log_path)?;
    RawJournalWriter::new(
        JournalInitializer::new(<J::Spec as FileSpecV1>::SIZE as u64, SCrc64::new(), 0, 0),
        log,
    )
}
/// Open an existing journal
///
/// Fully replays the journal against `gs` (via [`RawJournalReader::scroll`])
/// and, on success, returns a writer positioned to append new events.
pub fn open_journal<J: RawJournalAdapter, Fs: FSInterface>(
    log_path: &str,
    gs: &J::GlobalState,
) -> RuntimeResult<RawJournalWriter<J, Fs>>
where
    J::Spec: FileSpecV1<DecodeArgs = ()>,
{
    let log = SdssFile::<_, J::Spec>::open::<Fs>(log_path)?;
    let (initializer, file) = RawJournalReader::<J, Fs>::scroll(log, gs)?;
    RawJournalWriter::new(initializer, file)
}
#[derive(Debug)]
/// Initialization state for a [`RawJournalWriter`]: either fresh (new
/// journal) or recovered from an existing log by [`RawJournalReader::scroll`]
pub struct JournalInitializer {
    // file offset at which the writer should resume
    cursor: u64,
    // running checksum state at `cursor`
    checksum: SCrc64,
    // transaction id counter (returned verbatim by the `txn_id` accessor)
    last_txn_id: u64,
    // offset of the last committed event; 0 => brand new journal
    last_offset: u64,
}
impl JournalInitializer {
    /// Build initializer state from a cursor, checksum state and transaction
    /// bookkeeping values
    pub fn new(cursor: u64, checksum: SCrc64, txn_id: u64, last_offset: u64) -> Self {
        Self {
            last_txn_id: txn_id,
            last_offset,
            cursor,
            checksum,
        }
    }
    /// File offset at which writing should resume
    pub fn cursor(&self) -> u64 {
        self.cursor
    }
    /// A clone of the running checksum state
    pub fn checksum(&self) -> SCrc64 {
        self.checksum.clone()
    }
    /// The next transaction id
    pub fn txn_id(&self) -> u64 {
        self.last_txn_id
    }
    /// The previous transaction id (floored at zero)
    pub fn last_txn_id(&self) -> u64 {
        self.txn_id().checked_sub(1).unwrap_or(0)
    }
    /// A journal is considered new when no event has been committed yet
    pub fn is_new(&self) -> bool {
        self.last_offset == 0
    }
    /// Offset of the last committed event (zero for a new journal)
    pub fn last_offset(&self) -> u64 {
        self.last_offset
    }
}
/*
tracing
*/
/// Drain the thread-local trace buffer, returning every journal trace event
/// recorded so far on this thread (test builds only)
#[cfg(test)]
pub fn obtain_trace() -> Vec<JournalTraceEvent> {
    local_mut!(TRACE, |t| core::mem::take(t))
}
#[derive(Debug, PartialEq)]
/// A trace event recorded by either the journal writer or the journal reader
/// (collected into the thread-local `TRACE` vector in test builds)
pub enum JournalTraceEvent {
    Writer(JournalWriterTraceEvent),
    Reader(JournalReaderTraceEvent),
}
// enable `.into()` conversions from either specific trace event kind
direct_from! {
    JournalTraceEvent => {
        JournalWriterTraceEvent as Writer,
        JournalReaderTraceEvent as Reader,
    }
}
#[derive(Debug, PartialEq)]
/// Trace points hit while reading (replaying) a journal
pub enum JournalReaderTraceEvent {
    Initialized,
    Completed,
    ClosedAndReachedEof,
    ReopenSuccess,
    // event
    AttemptingEvent(u64),
    DetectedServerEvent,
    ServerEventMetadataParsed,
    ServerEventParsed,
    ServerEventAppliedSuccess,
    // drv events
    DriverEventExpectingClose,
    DriverEventCompletedBlockRead,
    DriverEventExpectedCloseGotClose,
    DriverEventExpectingReopenBlock,
    DriverEventExpectingReopenGotReopen,
    // errors
    ErrTxnIdMismatch { expected: u64, current: u64 },
    DriverEventInvalidMetadata,
    ErrInvalidReopenMetadata,
    ErrExpectedCloseGotReopen,
}
#[derive(Debug, PartialEq)]
/// Trace points hit while writing to a journal
pub(super) enum JournalWriterTraceEvent {
    Initialized,
    ReinitializeAttempt,
    ReinitializeComplete,
    // server event
    CommitAttemptForEvent(u64),
    CommitServerEventWroteMetadata,
    CommitServerEventAdapterCompleted,
    CommitCommitServerEventSyncCompleted,
    // driver event
    DriverEventAttemptCommit {
        event: DriverEventKind,
        event_id: u64,
        prev_id: u64,
    },
    DriverEventPresyncCompleted,
    DriverEventCompleted,
    DriverClosed,
}
local! {
    // thread-local store of trace events; only compiled into test builds
    #[cfg(test)]
    static TRACE: Vec<JournalTraceEvent> = Vec::new();
}
/// Record a trace event into the thread-local trace buffer
/// (expands to nothing outside test builds)
macro_rules! jtrace {
    ($expr:expr) => {
        #[cfg(test)]
        {
            local_mut!(TRACE, |traces| traces.push($expr.into()))
        }
    };
}
/// Record a [`JournalWriterTraceEvent`] variant (with optional payload tokens)
macro_rules! jtrace_writer {
    ($var:ident) => { jtrace!(JournalWriterTraceEvent::$var) };
    ($var:ident $($tt:tt)*) => { jtrace!(JournalWriterTraceEvent::$var$($tt)*) };
}
/// Record a [`JournalReaderTraceEvent`] variant (with optional payload tokens)
macro_rules! jtrace_reader {
    ($var:ident) => { jtrace!(JournalReaderTraceEvent::$var) };
    ($var:ident $($tt:tt)*) => { jtrace!(JournalReaderTraceEvent::$var$($tt)*) };
}
/*
impls
*/
/// An adapter defining the low-level structure of a log file
///
/// Implementors describe how events are encoded into and decoded from the
/// raw journal, and how decoded events are applied to the global state.
pub trait RawJournalAdapter {
    /// initial capacity (in bytes) of the scratch buffer allocated when
    /// committing a buffered event
    const EVENT_SIZE_BUFFER: usize = 128;
    /// Set to true if the journal writer should automatically flush the buffer and fsync after writing an event
    const AUTO_SYNC_ON_EVENT_COMMIT: bool = true;
    /// set the commit preference (selects which of [`Self::commit_buffered`]
    /// or [`Self::commit_direct`] the writer calls)
    const COMMIT_PREFERENCE: CommitPreference;
    /// the journal's file spec
    type Spec: FileSpecV1;
    /// the global state that is used by this journal
    type GlobalState;
    /// a journal event
    type Event;
    /// the decoded event
    type DecodedEvent<'a>;
    /// a type representing the event kind
    type EventMeta: Copy;
    /// initialize this adapter
    fn initialize(j_: &JournalInitializer) -> Self;
    /// parse event metadata; returns `None` if the raw metadata is invalid
    fn parse_event_meta(meta: u64) -> Option<Self::EventMeta>;
    /// get event metadata as an [`u64`]
    ///
    /// the MSB must be left unset: the writer ORs in `SERVER_EV_MASK` to mark
    /// server events
    fn get_event_md(&self, event: &Self::Event) -> u64;
    /// commit event (direct preference); only called when
    /// [`Self::COMMIT_PREFERENCE`] is [`CommitPreference::Direct`]
    fn commit_direct<Fs: FSInterface>(
        &mut self,
        _: &mut TrackedWriter<Fs::File, Self::Spec>,
        _: Self::Event,
    ) -> RuntimeResult<()> {
        unimplemented!()
    }
    /// commit event (buffered); only called when
    /// [`Self::COMMIT_PREFERENCE`] is [`CommitPreference::Buffered`]
    fn commit_buffered(&mut self, _: &mut Vec<u8>, _: Self::Event) {
        unimplemented!()
    }
    /// parse the event from the tracked reader, given its parsed metadata
    fn parse_event<'a, Fs: FSInterface>(
        file: &mut TrackedReader<
            <<Fs as FSInterface>::File as FileInterface>::BufReader,
            Self::Spec,
        >,
        meta: Self::EventMeta,
    ) -> RuntimeResult<Self::DecodedEvent<'a>>;
    /// apply the decoded event to the global state
    fn apply_event<'a>(
        gs: &Self::GlobalState,
        meta: Self::EventMeta,
        event: Self::DecodedEvent<'a>,
    ) -> RuntimeResult<()>;
}
#[derive(Debug, PartialEq)]
/// How an adapter prefers to commit events
/// (see [`RawJournalAdapter::COMMIT_PREFERENCE`])
pub enum CommitPreference {
    // encode into an in-memory buffer, then write it through in one shot
    Buffered,
    // write directly through the tracked writer
    Direct,
}
#[derive(Debug, PartialEq)]
/**
    A driver event
    ---
    Structured as:

    +------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+
    | 16B: Event ID | 8B: Meta | 8B: Checksum | 8B: Payload size | 8B: prev checksum | 8B: prev offset | 8B: prev txn id |
    +------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+
*/
struct DriverEvent {
    // transaction id of this driver event
    txn_id: u128,
    // reopen or close
    event: DriverEventKind,
    // checksum of the payload (the payload-length word plus the three payload words)
    checksum: u64,
    // number of 8B payload words; currently always 3
    payload_len: u64,
    // checksum state at the previous committed event
    last_checksum: u64,
    // offset of the previous committed event
    last_offset: u64,
    // id of the previous committed transaction
    last_txn_id: u64,
}
impl DriverEvent {
    /// total on-disk size of a driver event block (spans all seven fields; 64B)
    const FULL_EVENT_SIZE: usize = Self::OFFSET_6_LAST_TXN_ID.end - Self::OFFSET_0_TXN_ID.start;
    /// currently fixed to 24B: last checksum + last offset + last txn id
    /// (stored as a count of three 8B words)
    const PAYLOAD_LEN: usize = 3;
    // byte ranges of each field within the encoded 64B block
    const OFFSET_0_TXN_ID: Range<usize> = 0..sizeof!(u128);
    const OFFSET_1_EVENT_KIND: Range<usize> =
        Self::OFFSET_0_TXN_ID.end..Self::OFFSET_0_TXN_ID.end + sizeof!(u64);
    const OFFSET_2_CHECKSUM: Range<usize> =
        Self::OFFSET_1_EVENT_KIND.end..Self::OFFSET_1_EVENT_KIND.end + sizeof!(u64);
    const OFFSET_3_PAYLOAD_LEN: Range<usize> =
        Self::OFFSET_2_CHECKSUM.end..Self::OFFSET_2_CHECKSUM.end + sizeof!(u64);
    const OFFSET_4_LAST_CHECKSUM: Range<usize> =
        Self::OFFSET_3_PAYLOAD_LEN.end..Self::OFFSET_3_PAYLOAD_LEN.end + sizeof!(u64);
    const OFFSET_5_LAST_OFFSET: Range<usize> =
        Self::OFFSET_4_LAST_CHECKSUM.end..Self::OFFSET_4_LAST_CHECKSUM.end + sizeof!(u64);
    const OFFSET_6_LAST_TXN_ID: Range<usize> =
        Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64);
    /// Create a new driver event (checksum auto-computed)
    ///
    /// The checksum covers the same bytes that [`Self::encode`] checksums:
    /// the payload-length word followed by the three payload words, all LE.
    /// NB: `PAYLOAD_LEN` is a `usize`, so `to_le_bytes` yields 8B on 64-bit
    /// targets, matching the 8B payload-size field — TODO confirm for 32-bit
    fn new(
        txn_id: u128,
        driver_event: DriverEventKind,
        last_checksum: u64,
        last_offset: u64,
        last_txn_id: u64,
    ) -> Self {
        let mut checksum = SCrc64::new();
        checksum.update(&Self::PAYLOAD_LEN.to_le_bytes());
        checksum.update(&last_checksum.to_le_bytes());
        checksum.update(&last_offset.to_le_bytes());
        checksum.update(&last_txn_id.to_le_bytes());
        Self::with_checksum(
            txn_id,
            driver_event,
            checksum.finish(),
            last_checksum,
            last_offset,
            last_txn_id,
        )
    }
    /// Create a new driver event with the given checksum (no validation of
    /// the supplied checksum is performed here)
    fn with_checksum(
        txn_id: u128,
        driver_event: DriverEventKind,
        checksum: u64,
        last_checksum: u64,
        last_offset: u64,
        last_txn_id: u64,
    ) -> Self {
        Self {
            txn_id,
            event: driver_event,
            checksum,
            payload_len: Self::PAYLOAD_LEN as u64,
            last_checksum,
            last_offset,
            last_txn_id,
        }
    }
    /// Encode the current driver event
    ///
    /// NB: delegates to [`Self::encode`], which recomputes the checksum; the
    /// stored `self.checksum` field is not written out
    fn encode_self(&self) -> [u8; 64] {
        Self::encode(
            self.txn_id,
            self.event,
            self.last_checksum,
            self.last_offset,
            self.last_txn_id,
        )
    }
    /// Encode a new driver event
    ///
    /// Notes:
    /// - The payload length is hardcoded to 3
    /// - The checksum is automatically computed over the payload-length word
    ///   and the three payload words
    fn encode(
        txn_id: u128,
        driver_event: DriverEventKind,
        last_checksum: u64,
        last_offset: u64,
        last_txn_id: u64,
    ) -> [u8; 64] {
        // compile-time guard: the field layout must fill exactly 64B
        const _: () = assert!(DriverEvent::OFFSET_6_LAST_TXN_ID.end == 64);
        let mut block = [0; 64];
        block[Self::OFFSET_0_TXN_ID].copy_from_slice(&txn_id.to_le_bytes());
        // the event kind is widened to a u64 word
        block[Self::OFFSET_1_EVENT_KIND]
            .copy_from_slice(&(driver_event.value_u8() as u64).to_le_bytes());
        // the below is a part of the payload
        let mut checksum = SCrc64::new();
        block[Self::OFFSET_3_PAYLOAD_LEN].copy_from_slice(&Self::PAYLOAD_LEN.to_le_bytes());
        block[Self::OFFSET_4_LAST_CHECKSUM].copy_from_slice(&last_checksum.to_le_bytes());
        block[Self::OFFSET_5_LAST_OFFSET].copy_from_slice(&last_offset.to_le_bytes());
        block[Self::OFFSET_6_LAST_TXN_ID].copy_from_slice(&last_txn_id.to_le_bytes());
        checksum.update(&block[Self::OFFSET_3_PAYLOAD_LEN.start..Self::OFFSET_6_LAST_TXN_ID.end]);
        // now update the checksum
        block[Self::OFFSET_2_CHECKSUM].copy_from_slice(&checksum.finish().to_le_bytes());
        block
    }
    /// Decode a 64B driver event block, returning `None` if the block fails
    /// validation (unknown event kind, checksum mismatch, or unexpected
    /// payload length)
    fn decode(block: [u8; 64]) -> Option<Self> {
        var!(
            let txn_id, driver_event, checksum, payload_len, last_checksum, last_offset, last_txn_id
        );
        unsafe {
            /*
                UNSAFE(@ohsayan): we've ensured that the block size is exactly 64 and we use the offsets
                correctly
            */
            // copy a little-endian integer out of the given field range
            macro_rules! cpblk {
                ($target:path) => {
                    cpblk!($target as u64)
                };
                ($target:path as $ty:ty) => {
                    <$ty>::from_le_bytes(memcpy(&block[$target]))
                };
            }
            txn_id = cpblk!(Self::OFFSET_0_TXN_ID as u128);
            let driver_event_ = cpblk!(Self::OFFSET_1_EVENT_KIND);
            checksum = cpblk!(Self::OFFSET_2_CHECKSUM);
            payload_len = cpblk!(Self::OFFSET_3_PAYLOAD_LEN);
            last_checksum = cpblk!(Self::OFFSET_4_LAST_CHECKSUM);
            last_offset = cpblk!(Self::OFFSET_5_LAST_OFFSET);
            last_txn_id = cpblk!(Self::OFFSET_6_LAST_TXN_ID);
            // now validate checksum (recomputed over the same range encode uses)
            let mut checksum_ = SCrc64::new();
            checksum_
                .update(&block[Self::OFFSET_3_PAYLOAD_LEN.start..Self::OFFSET_6_LAST_TXN_ID.end]);
            let target_checksum = checksum_.finish();
            // reject bad discriminants, checksum mismatches and wrong payload
            // sizes before transmuting the discriminant
            let invalid_ev_dscr = driver_event_ > DriverEventKind::MAX as u64;
            let invalid_ck = checksum != target_checksum;
            let invalid_pl_size = payload_len != 3;
            if invalid_ev_dscr | invalid_ck | invalid_pl_size {
                return None;
            }
            // SAFETY: discriminant was validated against DriverEventKind::MAX above
            driver_event = core::mem::transmute(driver_event_ as u8);
            Some(Self::with_checksum(
                txn_id,
                driver_event,
                checksum,
                last_checksum,
                last_offset,
                last_txn_id,
            ))
        }
    }
}
#[derive(Debug, PartialEq, Clone, Copy, sky_macros::EnumMethods)]
#[repr(u8)]
/// Kind of a driver event; stored as the event-kind word of a [`DriverEvent`]
/// block (validated against `MAX` in [`DriverEvent::decode`] before transmute)
pub(super) enum DriverEventKind {
    Reopened = 0,
    Closed = 1,
}
/*
    +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    Journal writer implementation
    ---
    Quick notes:
    - This is a low level writer and only handles driver events. Higher level impls must account for
    +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
*/
/// A low-level journal writer
pub struct RawJournalWriter<J: RawJournalAdapter, Fs: FSInterface> {
    // the adapter implementation
    j: J,
    // tracked (checksummed) writer over the journal file
    log_file: TrackedWriter<<Fs as FSInterface>::File, <J as RawJournalAdapter>::Spec>,
    // next transaction id to allocate
    txn_id: u64,
    // id of the last successfully committed transaction
    known_txn_id: u64,
    known_txn_offset: u64, // if offset is 0, txn id is unset
}
/// MSB of the event-metadata word; set by the writer to mark a server event
/// (as opposed to a driver event)
const SERVER_EV_MASK: u64 = 1 << (u64::BITS - 1);
impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
    /// Initialize a new [`RawJournalWriter`] using a [`JournalInitializer`]
    ///
    /// For a pre-existing journal (`!j_.is_new()`), a driver reopen event is
    /// committed immediately so the log records this session.
    pub fn new(j_: JournalInitializer, file: SdssFile<Fs::File, J::Spec>) -> RuntimeResult<Self> {
        let mut me = Self {
            log_file: TrackedWriter::with_cursor_and_checksum(file, j_.cursor(), j_.checksum()),
            known_txn_id: j_.last_txn_id(),
            known_txn_offset: j_.last_offset(),
            txn_id: j_.txn_id(),
            j: J::initialize(&j_),
        };
        if j_.is_new() {
            jtrace_writer!(Initialized);
        } else {
            // not a new instance, so we must update the journal with a re-open event
            jtrace_writer!(ReinitializeAttempt);
            Self::reopen_driver(&mut me)?;
            jtrace_writer!(ReinitializeComplete);
        }
        Ok(me)
    }
    /// Commit a new event to the journal
    ///
    /// This will auto-flush the buffer and sync metadata as soon as the adapter's
    /// commit implementation ([`RawJournalAdapter::commit_direct`] or
    /// [`RawJournalAdapter::commit_buffered`]) returns, unless otherwise configured
    pub fn commit_event(&mut self, event: J::Event) -> RuntimeResult<()> {
        self.txn_context(|me, txn_id| {
            let ev_md = me.j.get_event_md(&event);
            jtrace_writer!(CommitAttemptForEvent(txn_id as u64));
            // the MSB is reserved for marking server events, so the adapter must not use it.
            // FIX: the original assertion compared `ev_md & SERVER_EV_MASK` against 1, but the
            // masked value is always either 0 or 1 << 63 — it could never equal 1, so the
            // assertion was vacuous. Compare against 0 so a set MSB is actually caught.
            debug_assert!(ev_md & SERVER_EV_MASK == 0, "MSB must be unset");
            let ev_md = ev_md | SERVER_EV_MASK;
            // commit event
            let Self { j, log_file, .. } = me;
            match J::COMMIT_PREFERENCE {
                CommitPreference::Buffered => {
                    // explicitly buffer and then directly write to the file (without buffering)
                    let mut buf = Vec::with_capacity(J::EVENT_SIZE_BUFFER);
                    buf.extend(&txn_id.to_le_bytes());
                    buf.extend(&ev_md.to_le_bytes());
                    jtrace_writer!(CommitServerEventWroteMetadata);
                    j.commit_buffered(&mut buf, event);
                    log_file.tracked_write_through_buffer(&buf)?;
                }
                CommitPreference::Direct => {
                    // use the underlying buffer
                    // these writes won't actually reach disk
                    log_file.tracked_write(&txn_id.to_le_bytes())?;
                    log_file.tracked_write(&ev_md.to_le_bytes())?;
                    jtrace_writer!(CommitServerEventWroteMetadata);
                    // now hand over control to adapter impl
                    J::commit_direct::<Fs>(j, log_file, event)?;
                }
            }
            jtrace_writer!(CommitServerEventAdapterCompleted);
            if J::AUTO_SYNC_ON_EVENT_COMMIT {
                // should fsync after event
                if let CommitPreference::Direct = J::COMMIT_PREFERENCE {
                    // direct writes went through the in-memory buffer; flush it and sync
                    me.log_file.flush_sync()?;
                } else {
                    // buffered writes bypassed the buffer; only a metadata sync is needed
                    me.log_file.fsync()?;
                }
                jtrace_writer!(CommitCommitServerEventSyncCompleted);
            }
            Ok(())
        })
    }
}
impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
    /// Run `f` inside a fresh transaction context: allocate the next txn id
    /// and, only if `f` succeeds, record it (together with the post-write
    /// cursor) as the last known committed transaction
    fn txn_context<T>(
        &mut self,
        f: impl FnOnce(&mut Self, u128) -> RuntimeResult<T>,
    ) -> RuntimeResult<T> {
        let id = self.txn_id;
        self.txn_id += 1;
        let ret = f(self, id as u128);
        if ret.is_ok() {
            self.known_txn_id = id;
            self.known_txn_offset = self.log_file.cursor();
        }
        ret
    }
    /// Commit a new driver event
    ///
    /// Encodes and writes a [`DriverEvent`] block carrying the previous
    /// checksum/offset/txn id. If the adapter does not auto-sync on event
    /// commit, any buffered data is flushed first so the block lands after
    /// all prior event bytes.
    fn _commit_driver_event(me: &mut Self, kind: DriverEventKind) -> RuntimeResult<()> {
        jtrace_writer!(DriverEventAttemptCommit {
            event: kind,
            event_id: me.txn_id,
            prev_id: me.known_txn_id
        });
        me.txn_context(|me, txn_id| {
            let block = DriverEvent::encode(
                txn_id,
                kind,
                me.log_file.current_checksum(),
                me.known_txn_offset,
                me.known_txn_id,
            );
            if !J::AUTO_SYNC_ON_EVENT_COMMIT {
                // the log might still not be fully flushed, so flush it now; NB: flush does not affect checksum state;
                // this is guaranteed by the impl of the tracked writer
                match J::COMMIT_PREFERENCE {
                    CommitPreference::Buffered => {
                        // in this case, we know that every write is already synced
                        debug_assert!(!me.log_file.is_dirty());
                    }
                    CommitPreference::Direct => {
                        jtrace_writer!(DriverEventPresyncCompleted);
                        me.log_file.flush_sync()?;
                    }
                }
            }
            // write the driver event block directly, bypassing the buffer
            me.log_file.tracked_write_through_buffer(&block)?;
            jtrace_writer!(DriverEventCompleted);
            Ok(())
        })
    }
    /// Close driver (commits a [`DriverEventKind::Closed`] event)
    pub fn close_driver(me: &mut Self) -> RuntimeResult<()> {
        Self::_commit_driver_event(me, DriverEventKind::Closed)?;
        jtrace_writer!(DriverClosed);
        Ok(())
    }
    /// Reopen driver (commits a [`DriverEventKind::Reopened`] event)
    pub fn reopen_driver(me: &mut Self) -> RuntimeResult<()> {
        Self::_commit_driver_event(me, DriverEventKind::Reopened)?;
        Ok(())
    }
}
/// A low-level journal reader that replays events and validates the chain of
/// driver events against the state observed while reading
pub struct RawJournalReader<J: RawJournalAdapter, Fs: FSInterface> {
    // tracked (checksummed) buffered reader over the journal file
    tr: TrackedReader<
        <<Fs as FSInterface>::File as FileInterface>::BufReader,
        <J as RawJournalAdapter>::Spec,
    >,
    // next expected transaction id
    txn_id: u64,
    // id of the last successfully applied transaction
    last_txn_id: u64,
    // reader offset right after the last applied transaction
    last_txn_offset: u64,
    // checksum state right after the last applied transaction
    last_txn_checksum: u64,
}
impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
    /// Scroll through the entire journal, applying every server event to `gs`,
    /// and return a [`JournalInitializer`] describing where writing may resume
    /// together with the underlying file
    pub fn scroll(
        file: SdssFile<<Fs as FSInterface>::File, <J as RawJournalAdapter>::Spec>,
        gs: &J::GlobalState,
    ) -> RuntimeResult<(
        JournalInitializer,
        SdssFile<<Fs as FSInterface>::File, J::Spec>,
    )> {
        // start reading right after the SDSS file header
        let reader = TrackedReader::with_cursor(
            file,
            <<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64,
        )?;
        jtrace_reader!(Initialized);
        let mut me = Self::new(reader, 0, 0, 0, 0);
        loop {
            // `_next_event` returns true once a valid close event at EOF is found
            if me._next_event(gs)? {
                jtrace_reader!(Completed);
                let initializer = JournalInitializer::new(
                    me.tr.cursor(),
                    me.tr.checksum(),
                    me.txn_id,
                    // NB: the last txn offset is important because it indicates that the log is new
                    me.last_txn_offset,
                );
                let file = me.tr.into_inner::<Fs::File>()?;
                return Ok((initializer, file));
            }
        }
    }
    /// Construct a reader from a positioned tracked reader and initial
    /// transaction bookkeeping values
    fn new(
        reader: TrackedReader<
            <<Fs as FSInterface>::File as FileInterface>::BufReader,
            <J as RawJournalAdapter>::Spec,
        >,
        txn_id: u64,
        last_txn_id: u64,
        last_txn_offset: u64,
        last_txn_checksum: u64,
    ) -> Self {
        Self {
            tr: reader,
            txn_id,
            last_txn_id,
            last_txn_offset,
            last_txn_checksum,
        }
    }
    /// Mark the current transaction as successfully applied: snapshot the
    /// checksum state and cursor after it, then advance the expected txn id
    fn _update_known_txn(&mut self) {
        self.last_txn_id = self.txn_id;
        self.last_txn_checksum = self.tr.current_checksum();
        self.last_txn_offset = self.tr.cursor();
        self.txn_id += 1;
    }
}
impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
    /// Read and process the next event from the journal.
    ///
    /// Returns:
    /// - `Ok(true)` when a valid close event is read AND the reader is at EOF
    ///   (the journal was cleanly closed; replay is complete)
    /// - `Ok(false)` after successfully applying a server event, or after
    ///   validating a close event followed by a matching reopen event
    /// - `Err(_)` on any metadata mismatch, decode failure, or ordering
    ///   violation (e.g. a reopen where a close was expected)
    fn _next_event(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
        // every event starts with a 128-bit txn id followed by a 64-bit
        // metadata word
        let txn_id = u128::from_le_bytes(self.tr.read_block()?);
        let meta = u64::from_le_bytes(self.tr.read_block()?);
        // txn ids must be strictly sequential; a mismatch means either the
        // block is corrupted or we've lost sync with the log
        if txn_id != self.txn_id as u128 {
            jtrace_reader!(ErrTxnIdMismatch {
                expected: self.txn_id,
                current: txn_id as u64
            });
            return Err(StorageError::RawJournalEventCorruptedMetadata.into());
        }
        jtrace_reader!(AttemptingEvent(txn_id as u64));
        {
            // check for a server event
            // is this a server event?
            if meta & SERVER_EV_MASK != 0 {
                jtrace_reader!(DetectedServerEvent);
                // strip the server-event marker bit to recover the adapter's
                // own metadata value
                let meta = meta & !SERVER_EV_MASK;
                match J::parse_event_meta(meta) {
                    Some(meta) => {
                        jtrace_reader!(ServerEventMetadataParsed);
                        // now parse the actual event
                        let Self { tr: reader, .. } = self;
                        let event = J::parse_event::<Fs>(reader, meta)?;
                        jtrace_reader!(ServerEventParsed);
                        // we do not consider a parsed event a success signal; so we must actually apply it
                        match J::apply_event(gs, meta, event) {
                            Ok(()) => {
                                jtrace_reader!(ServerEventAppliedSuccess);
                                // applied successfully; advance the known txn
                                // state and report "not done yet"
                                self._update_known_txn();
                                return Ok(false);
                            }
                            Err(e) => return Err(e),
                        }
                    }
                    None => return Err(StorageError::RawJournalEventCorruptedMetadata.into()),
                }
            }
        }
        jtrace_reader!(DriverEventExpectingClose);
        {
            // attempt to parse a driver close event
            let mut block = [0u8; DriverEvent::FULL_EVENT_SIZE];
            // re-assemble the full driver-event block: we already consumed the
            // txn id and metadata words above, so write them back into place
            block[DriverEvent::OFFSET_0_TXN_ID].copy_from_slice(&txn_id.to_le_bytes());
            block[DriverEvent::OFFSET_1_EVENT_KIND].copy_from_slice(&meta.to_le_bytes());
            // now get remaining block
            self.tr
                .tracked_read(&mut block[DriverEvent::OFFSET_2_CHECKSUM.start..])?;
            jtrace_reader!(DriverEventCompletedBlockRead);
            // check the driver event
            let drv_close_event = match DriverEvent::decode(block) {
                Some(
                    ev @ DriverEvent {
                        event: DriverEventKind::Closed,
                        ..
                    },
                ) => ev,
                Some(DriverEvent {
                    event: DriverEventKind::Reopened,
                    ..
                }) => {
                    // a reopen without a preceding close is an ordering
                    // violation, not mere corruption
                    jtrace_reader!(ErrExpectedCloseGotReopen);
                    return Err(StorageError::RawJournalInvalidEvent.into());
                }
                None => return Err(StorageError::RawJournalEventCorrupted.into()),
            };
            jtrace_reader!(DriverEventExpectedCloseGotClose);
            // a driver closed event; we've checked integrity, but we must check the field values
            let valid_meta = okay! {
                self.last_txn_checksum == drv_close_event.last_checksum,
                self.last_txn_id == drv_close_event.last_txn_id,
                self.last_txn_offset == drv_close_event.last_offset,
            };
            if !valid_meta {
                jtrace_reader!(DriverEventInvalidMetadata);
                // either the block is corrupted or the data we read is corrupted; either way,
                // we're going to refuse to read this
                return Err(StorageError::RawJournalCorrupted.into());
            }
            // update
            self._update_known_txn();
            // full metadata validated; this is a valid close event but is it actually a close
            if self.tr.is_eof() {
                jtrace_reader!(ClosedAndReachedEof);
                // yes, we're done
                return Ok(true);
            }
        }
        // close was valid but not at EOF, so the journal must have been
        // reopened: the very next block has to be a reopen event
        jtrace_reader!(AttemptingEvent(self.txn_id as u64));
        jtrace_reader!(DriverEventExpectingReopenBlock);
        // now we must look for a reopen event
        let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?;
        let reopen_event = match DriverEvent::decode(event_block) {
            Some(ev) if ev.event == DriverEventKind::Reopened => ev,
            None | Some(_) => return Err(StorageError::RawJournalEventCorrupted.into()),
        };
        jtrace_reader!(DriverEventExpectingReopenGotReopen);
        // the reopen event must agree with everything we know, including the
        // txn id it was committed under
        let valid_meta = okay! {
            self.last_txn_checksum == reopen_event.last_checksum,
            self.last_txn_id == reopen_event.last_txn_id,
            self.last_txn_offset == reopen_event.last_offset,
            self.txn_id as u128 == reopen_event.txn_id,
        };
        if valid_meta {
            // valid meta, update all
            self._update_known_txn();
            jtrace_reader!(ReopenSuccess);
            Ok(false)
        } else {
            jtrace_reader!(ErrInvalidReopenMetadata);
            Err(StorageError::RawJournalCorrupted.into())
        }
    }
}

@ -0,0 +1,432 @@
/*
* Created on Tue Jan 30 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{
create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind,
JournalInitializer, RawJournalAdapter, RawJournalWriter,
},
crate::engine::{
error::StorageError,
fractal::error::ErrorContext,
storage::{
common::{
interface::{
fs_test::VirtualFS,
fs_traits::{FSInterface, FileInterface},
},
sdss::sdss_r1::rw::TrackedReader,
},
v2::{
journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent},
spec::SystemDatabaseV1,
},
},
RuntimeResult,
},
std::cell::RefCell,
};
#[test]
fn encode_decode_meta() {
    // a driver event must survive a round trip through its binary encoding
    let original = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0);
    let roundtripped = DriverEvent::decode(original.encode_self()).unwrap();
    assert_eq!(original, roundtripped);
}
/*
impls for journal tests
*/
/// A trivial in-memory "database" used to exercise the journal framework:
/// its entire state is an ordered list of strings.
#[derive(Debug, Clone, PartialEq)]
struct SimpleDB {
    // interior mutability so `apply_event` can mutate through a shared ref
    data: RefCell<Vec<String>>,
}
impl SimpleDB {
    /// Create an empty database.
    fn new() -> Self {
        Self {
            data: RefCell::default(),
        }
    }
    /// Remove all keys. The event is committed to the journal first, so the
    /// in-memory state only changes once the event is durably recorded.
    fn clear(
        &mut self,
        log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>,
    ) -> RuntimeResult<()> {
        log.commit_event(DbEvent::Clear)?;
        self.data.get_mut().clear();
        Ok(())
    }
    /// Remove the most recently pushed key.
    ///
    /// FIX: commit the event BEFORE mutating local state, consistent with
    /// `push` and `clear` (write-ahead ordering). Previously the pop happened
    /// first, so a failed commit would leave the in-memory state diverged
    /// from the journal.
    fn pop(&mut self, log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>) -> RuntimeResult<()> {
        log.commit_event(DbEvent::Pop)?;
        // panics if empty: the tests only pop after a push, and an empty pop
        // would be a test bug
        self.data.get_mut().pop().unwrap();
        Ok(())
    }
    /// Append a new key, journaling the event first.
    fn push(
        &mut self,
        log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>,
        new: String,
    ) -> RuntimeResult<()> {
        log.commit_event(DbEvent::NewKey(new.clone()))?;
        self.data.get_mut().push(new);
        Ok(())
    }
}
/// Journal adapter implementation for [`SimpleDB`] (test-only, stateless).
struct SimpleDBJournal;
/// Events that can be committed to (and replayed from) the test journal.
enum DbEvent {
    // append a key
    NewKey(String),
    // remove the last key
    Pop,
    // remove all keys
    Clear,
}
/// Decoded event-kind metadata: the payload-free discriminant that is stored
/// in the event's metadata word on disk.
#[derive(Debug, PartialEq, Clone, Copy)]
enum EventMeta {
    NewKey,
    Pop,
    Clear,
}
impl RawJournalAdapter for SimpleDBJournal {
    const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered;
    type Spec = SystemDatabaseV1;
    type GlobalState = SimpleDB;
    type Event = DbEvent;
    type DecodedEvent<'a> = DbEvent;
    type EventMeta = EventMeta;
    /// The adapter itself carries no state.
    fn initialize(_: &JournalInitializer) -> Self {
        Self
    }
    /// Decode the on-disk metadata word into an event kind; any unknown
    /// value is rejected with `None`.
    fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
        match meta {
            0 => Some(EventMeta::NewKey),
            1 => Some(EventMeta::Pop),
            2 => Some(EventMeta::Clear),
            _ => None,
        }
    }
    /// Metadata word persisted for each event kind (inverse of
    /// [`Self::parse_event_meta`]).
    fn get_event_md(&self, event: &Self::Event) -> u64 {
        match event {
            DbEvent::NewKey(_) => 0,
            DbEvent::Pop => 1,
            DbEvent::Clear => 2,
        }
    }
    /// Serialize the event payload into the commit buffer. Only `NewKey`
    /// carries a payload: the key length as a LE u64, then the raw bytes.
    fn commit_buffered(&mut self, buf: &mut Vec<u8>, event: Self::Event) {
        match event {
            DbEvent::NewKey(key) => {
                buf.extend((key.len() as u64).to_le_bytes());
                buf.extend(key.as_bytes());
            }
            DbEvent::Pop | DbEvent::Clear => {}
        }
    }
    /// Deserialize an event payload from the journal, mirroring
    /// `commit_buffered`. A non-UTF-8 key is treated as a corrupted event.
    fn parse_event<'a, Fs: FSInterface>(
        file: &mut TrackedReader<
            <<Fs as FSInterface>::File as FileInterface>::BufReader,
            Self::Spec,
        >,
        meta: Self::EventMeta,
    ) -> RuntimeResult<Self::DecodedEvent<'a>> {
        match meta {
            EventMeta::NewKey => {
                let key_size = u64::from_le_bytes(file.read_block()?);
                let mut keybuf = vec![0u8; key_size as usize];
                file.tracked_read(&mut keybuf)?;
                String::from_utf8(keybuf)
                    .map(DbEvent::NewKey)
                    .map_err(|_| StorageError::RawJournalEventCorrupted.into())
            }
            EventMeta::Clear => Ok(DbEvent::Clear),
            EventMeta::Pop => Ok(DbEvent::Pop),
        }
    }
    /// Apply a decoded event to the global state. Popping an empty database
    /// means the log is inconsistent with the state, i.e. corrupted.
    fn apply_event<'a>(
        gs: &Self::GlobalState,
        _: Self::EventMeta,
        event: Self::DecodedEvent<'a>,
    ) -> RuntimeResult<()> {
        let mut data = gs.data.borrow_mut();
        match event {
            DbEvent::NewKey(k) => data.push(k),
            DbEvent::Clear => data.clear(),
            DbEvent::Pop => {
                if data.pop().is_none() {
                    return Err(StorageError::RawJournalCorrupted.into());
                }
            }
        }
        Ok(())
    }
}
/*
journal tests
*/
/// Verify the driver-event lifecycle across three boots with no server
/// events: each boot must replay the exact expected trace (close, then
/// close+reopen pairs) and every close/reopen commits with sequential
/// event ids.
#[test]
fn journal_open_close() {
    {
        // new boot
        let mut j = create_journal::<SimpleDBJournal, VirtualFS>("myjournal").unwrap();
        // a fresh journal produces only the writer-init trace
        assert_eq!(
            super::obtain_trace(),
            intovec![JournalWriterTraceEvent::Initialized]
        );
        RawJournalWriter::close_driver(&mut j).unwrap();
        // closing commits a Closed driver event with id 0
        assert_eq!(
            super::obtain_trace(),
            intovec![
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Closed,
                    event_id: 0,
                    prev_id: 0
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::DriverClosed
            ]
        );
    }
    {
        // second boot
        let mut j =
            open_journal::<SimpleDBJournal, VirtualFS>("myjournal", &SimpleDB::new()).unwrap();
        assert_eq!(
            super::obtain_trace(),
            intovec![
                // init reader and read close event
                JournalReaderTraceEvent::Initialized,
                JournalReaderTraceEvent::AttemptingEvent(0),
                JournalReaderTraceEvent::DriverEventExpectingClose,
                JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                JournalReaderTraceEvent::ClosedAndReachedEof,
                JournalReaderTraceEvent::Completed,
                // open writer and write reopen event
                JournalWriterTraceEvent::ReinitializeAttempt,
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Reopened,
                    event_id: 1,
                    prev_id: 0
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::ReinitializeComplete
            ]
        );
        RawJournalWriter::close_driver(&mut j).unwrap();
        // second close: event id 2, following the reopen at id 1
        assert_eq!(
            super::obtain_trace(),
            intovec![
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Closed,
                    event_id: 2,
                    prev_id: 1
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::DriverClosed
            ]
        );
    }
    {
        // third boot
        let mut j =
            open_journal::<SimpleDBJournal, VirtualFS>("myjournal", &SimpleDB::new()).unwrap();
        // the reader must now replay the full close -> reopen -> close chain
        assert_eq!(
            super::obtain_trace(),
            intovec![
                // init reader and read reopen event
                JournalReaderTraceEvent::Initialized,
                JournalReaderTraceEvent::AttemptingEvent(0),
                JournalReaderTraceEvent::DriverEventExpectingClose,
                JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                JournalReaderTraceEvent::AttemptingEvent(1),
                JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
                JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen,
                JournalReaderTraceEvent::ReopenSuccess,
                // now read close event
                JournalReaderTraceEvent::AttemptingEvent(2),
                JournalReaderTraceEvent::DriverEventExpectingClose,
                JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                JournalReaderTraceEvent::ClosedAndReachedEof,
                JournalReaderTraceEvent::Completed,
                // open writer and write reopen event
                JournalWriterTraceEvent::ReinitializeAttempt,
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Reopened,
                    event_id: 3,
                    prev_id: 2,
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::ReinitializeComplete
            ]
        );
        RawJournalWriter::close_driver(&mut j).unwrap();
        // final close: event id 4
        assert_eq!(
            super::obtain_trace(),
            intovec![
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Closed,
                    event_id: 4,
                    prev_id: 3
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::DriverClosed
            ]
        );
    }
}
/// Verify replay of a single server event across three boots: the event is
/// committed once on first boot, then each subsequent boot must rebuild the
/// same database state ("hello world") by re-applying it, interleaved with
/// the driver close/reopen chain.
#[test]
fn journal_with_server_single_event() {
    {
        let mut db = SimpleDB::new();
        // new boot
        let mut j = create_journal::<SimpleDBJournal, VirtualFS>("myjournal").unwrap();
        db.push(&mut j, "hello world".into()).unwrap();
        RawJournalWriter::close_driver(&mut j).unwrap();
        // one server event (id 0) followed by the close (id 1)
        assert_eq!(
            super::obtain_trace(),
            intovec![
                JournalWriterTraceEvent::Initialized,
                JournalWriterTraceEvent::CommitAttemptForEvent(0),
                JournalWriterTraceEvent::CommitServerEventWroteMetadata,
                JournalWriterTraceEvent::CommitServerEventAdapterCompleted,
                JournalWriterTraceEvent::CommitCommitServerEventSyncCompleted,
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Closed,
                    event_id: 1,
                    prev_id: 0
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::DriverClosed
            ]
        );
    }
    {
        let db = SimpleDB::new();
        // second boot
        // NOTE: the dmsg closure only runs on error, attaching the trace so
        // far as diagnostic context for a failed open
        let mut j = open_journal::<SimpleDBJournal, VirtualFS>("myjournal", &db)
            .set_dmsg_fn(|| format!("{:?}", super::obtain_trace()))
            .unwrap();
        // replay must have restored the single key
        assert_eq!(db.data.borrow().len(), 1);
        assert_eq!(db.data.borrow()[0], "hello world");
        assert_eq!(
            super::obtain_trace(),
            intovec![
                // init reader and read server event
                JournalReaderTraceEvent::Initialized,
                JournalReaderTraceEvent::AttemptingEvent(0),
                JournalReaderTraceEvent::DetectedServerEvent,
                JournalReaderTraceEvent::ServerEventMetadataParsed,
                JournalReaderTraceEvent::ServerEventParsed,
                JournalReaderTraceEvent::ServerEventAppliedSuccess,
                // now read close event
                JournalReaderTraceEvent::AttemptingEvent(1),
                JournalReaderTraceEvent::DriverEventExpectingClose,
                JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                JournalReaderTraceEvent::ClosedAndReachedEof,
                JournalReaderTraceEvent::Completed,
                // now init writer
                JournalWriterTraceEvent::ReinitializeAttempt,
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Reopened,
                    event_id: 2,
                    prev_id: 1
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::ReinitializeComplete,
            ]
        );
        RawJournalWriter::close_driver(&mut j).unwrap();
        assert_eq!(
            super::obtain_trace(),
            intovec![
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Closed,
                    event_id: 3,
                    prev_id: 2,
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::DriverClosed
            ]
        );
    }
    {
        // third boot
        let db = SimpleDB::new();
        let mut j = open_journal::<SimpleDBJournal, VirtualFS>("myjournal", &db).unwrap();
        // state is rebuilt again from the same single server event
        assert_eq!(db.data.borrow().len(), 1);
        assert_eq!(db.data.borrow()[0], "hello world");
        assert_eq!(
            super::obtain_trace(),
            intovec![
                // init reader and read server event
                JournalReaderTraceEvent::Initialized,
                JournalReaderTraceEvent::AttemptingEvent(0),
                JournalReaderTraceEvent::DetectedServerEvent,
                JournalReaderTraceEvent::ServerEventMetadataParsed,
                JournalReaderTraceEvent::ServerEventParsed,
                JournalReaderTraceEvent::ServerEventAppliedSuccess,
                // now read close event
                JournalReaderTraceEvent::AttemptingEvent(1),
                JournalReaderTraceEvent::DriverEventExpectingClose,
                JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                // now read reopen event
                JournalReaderTraceEvent::AttemptingEvent(2),
                JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
                JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen,
                JournalReaderTraceEvent::ReopenSuccess,
                // now read close event
                JournalReaderTraceEvent::AttemptingEvent(3),
                JournalReaderTraceEvent::DriverEventExpectingClose,
                JournalReaderTraceEvent::DriverEventCompletedBlockRead,
                JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
                JournalReaderTraceEvent::ClosedAndReachedEof,
                JournalReaderTraceEvent::Completed,
                // now open writer and reinitialize
                JournalWriterTraceEvent::ReinitializeAttempt,
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Reopened,
                    event_id: 4,
                    prev_id: 3,
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::ReinitializeComplete,
            ]
        );
        RawJournalWriter::close_driver(&mut j).unwrap();
        assert_eq!(
            super::obtain_trace(),
            intovec![
                JournalWriterTraceEvent::DriverEventAttemptCommit {
                    event: DriverEventKind::Closed,
                    event_id: 5,
                    prev_id: 4,
                },
                JournalWriterTraceEvent::DriverEventCompleted,
                JournalWriterTraceEvent::DriverClosed
            ]
        );
    }
}

@ -24,4 +24,5 @@
*
*/
pub mod journal;
pub mod spec;

@ -26,7 +26,7 @@
use {
crate::engine::storage::common::{
sdss::{self, v1::HeaderV1},
sdss::{self, sdss_r1::HeaderV1},
versions::{self, DriverVersion, FileSpecifierVersion, ServerVersion},
},
std::mem::transmute,
@ -50,7 +50,7 @@ pub enum FileSpecifier {
ModelData = 1,
}
impl sdss::v1::HeaderV1Enumeration for FileClass {
impl sdss::sdss_r1::HeaderV1Enumeration for FileClass {
const MAX: u8 = FileClass::MAX;
unsafe fn new(x: u8) -> Self {
transmute(x)
@ -60,7 +60,7 @@ impl sdss::v1::HeaderV1Enumeration for FileClass {
}
}
impl sdss::v1::HeaderV1Enumeration for FileSpecifier {
impl sdss::sdss_r1::HeaderV1Enumeration for FileSpecifier {
const MAX: u8 = FileSpecifier::MAX;
unsafe fn new(x: u8) -> Self {
transmute(x)
@ -71,7 +71,7 @@ impl sdss::v1::HeaderV1Enumeration for FileSpecifier {
}
pub struct HeaderImplV2;
impl sdss::v1::HeaderV1Spec for HeaderImplV2 {
impl sdss::sdss_r1::HeaderV1Spec for HeaderImplV2 {
type FileClass = FileClass;
type FileSpecifier = FileSpecifier;
const CURRENT_SERVER_VERSION: ServerVersion = versions::v2::V2_SERVER_VERSION;
@ -79,7 +79,7 @@ impl sdss::v1::HeaderV1Spec for HeaderImplV2 {
}
pub struct SystemDatabaseV1;
impl sdss::v1::SimpleFileSpecV1 for SystemDatabaseV1 {
impl sdss::sdss_r1::SimpleFileSpecV1 for SystemDatabaseV1 {
type HeaderSpec = HeaderImplV2;
const FILE_CLASS: FileClass = FileClass::EventLog;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::GlobalNS;
@ -87,7 +87,7 @@ impl sdss::v1::SimpleFileSpecV1 for SystemDatabaseV1 {
}
pub struct ModelDataBatchAofV1;
impl sdss::v1::SimpleFileSpecV1 for ModelDataBatchAofV1 {
impl sdss::sdss_r1::SimpleFileSpecV1 for ModelDataBatchAofV1 {
type HeaderSpec = HeaderImplV2;
const FILE_CLASS: FileClass = FileClass::Batch;
const FILE_SPECIFIER: FileSpecifier = FileSpecifier::ModelData;

@ -24,7 +24,8 @@
*
*/
#![deny(unused_crate_dependencies, unused_imports, unused_must_use)]
#![forbid(unused_crate_dependencies)]
#![deny(unused_imports, unused_must_use)]
#![cfg_attr(feature = "nightly", feature(test))]
//! # Skytable

@ -47,6 +47,12 @@ pub use {flock::FileLock, free_memory::free_memory_in_bytes};
/// A wrapper around [`std`]'s I/O [Error](std::io::Error) type for simplicity with equality
pub struct SysIOError(std::io::Error);
impl SysIOError {
    /// Consume the wrapper and return the underlying [`std::io::Error`].
    pub fn into_inner(self) -> std::io::Error {
        self.0
    }
}
impl From<std::io::Error> for SysIOError {
fn from(e: std::io::Error) -> Self {
Self(e)

Loading…
Cancel
Save