Add jlog impls

next
Sayan Nandan 7 months ago
parent 6d6f88fb40
commit 85c40b60b2
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -440,3 +440,11 @@ macro_rules! closure {
f()
}}
}
#[cfg(test)]
macro_rules! array {
($($(#[$attr:meta])* $vis:vis const $name:ident: [$ty:ty] = [$($expr:expr),* $(,)?]);* $(;)?) => {
$(#[allow(non_snake_case)] mod $name {pub(super) const LEN: usize = { let mut i = 0;$(let _ = $expr; i += 1;)*i += 0; i};}
$(#[$attr])* $vis const $name: [$ty; $name::LEN] = [$($expr),*];)*
}
}

@ -24,4 +24,116 @@
*
*/
#![allow(dead_code)]
mod raw;
#[cfg(test)]
mod tests;
use {
self::raw::{CommitPreference, JournalInitializer, RawJournalAdapter},
crate::engine::{
error::StorageError,
fractal,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{FSInterface, FileInterface},
sdss::sdss_r1::{
rw::{TrackedReader, TrackedWriter},
FileSpecV1,
},
},
RuntimeResult,
},
std::marker::PhantomData,
};
pub type EventLogJournal<E, Fs> = raw::RawJournalWriter<EventLog<E>, Fs>;
pub struct EventLog<E: EventLogAdapter>(PhantomData<E>);
pub trait EventLogAdapter {
type SdssSpec: FileSpecV1;
type GlobalState;
type Event<'a>;
type DecodedEvent;
type EventMeta: Copy;
type Error: Into<fractal::error::Error>;
const EV_MAX: u8;
unsafe fn meta_from_raw(m: u64) -> Self::EventMeta;
fn event_md<'a>(event: &Self::Event<'a>) -> u64;
fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]>;
fn decode(block: Vec<u8>, kind: Self::EventMeta) -> Result<Self::DecodedEvent, Self::Error>;
fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error>;
}
impl<E: EventLogAdapter> RawJournalAdapter for EventLog<E> {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct;
type Spec = <E as EventLogAdapter>::SdssSpec;
type GlobalState = <E as EventLogAdapter>::GlobalState;
type Event<'a> = <E as EventLogAdapter>::Event<'a>;
type DecodedEvent = <E as EventLogAdapter>::DecodedEvent;
type EventMeta = <E as EventLogAdapter>::EventMeta;
fn initialize(_: &JournalInitializer) -> Self {
Self(PhantomData)
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
if meta > <E as EventLogAdapter>::EV_MAX as u64 {
return None;
}
unsafe {
// UNSAFE(@ohsayan): checked max
Some(<E as EventLogAdapter>::meta_from_raw(meta))
}
}
fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64 {
<E as EventLogAdapter>::event_md(event)
}
fn commit_direct<'a, Fs: FSInterface>(
&mut self,
w: &mut TrackedWriter<Fs::File, Self::Spec>,
event: Self::Event<'a>,
) -> RuntimeResult<()> {
let pl = <E as EventLogAdapter>::encode(event);
let plen = (pl.len() as u64).to_le_bytes();
let mut checksum = SCrc64::new();
checksum.update(&plen);
checksum.update(&pl);
let checksum = checksum.finish().to_le_bytes();
/*
[CK][PLEN][PL]
*/
w.tracked_write(&checksum)?;
w.tracked_write(&plen)?;
w.tracked_write(&pl)
}
fn parse_event<'a, Fs: FSInterface>(
file: &mut TrackedReader<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
m: Self::EventMeta,
) -> RuntimeResult<Self::DecodedEvent> {
/*
verify checksum
*/
let expected_checksum = u64::from_le_bytes(file.read_block()?);
let plen = u64::from_le_bytes(file.read_block()?);
let mut pl = vec![0; plen as usize];
file.tracked_read(&mut pl)?;
let mut this_checksum = SCrc64::new();
this_checksum.update(&plen.to_le_bytes());
this_checksum.update(&pl);
if this_checksum.finish() != expected_checksum {
return Err(StorageError::RawJournalCorrupted.into());
}
<E as EventLogAdapter>::decode(pl, m).map_err(Into::into)
}
fn apply_event<'a>(
gs: &Self::GlobalState,
_: Self::EventMeta,
event: Self::DecodedEvent,
) -> RuntimeResult<()> {
<E as EventLogAdapter>::apply_event(gs, event).map_err(Into::into)
}
}

@ -24,8 +24,6 @@
*
*/
#![allow(dead_code)]
#[cfg(test)]
mod tests;
@ -223,9 +221,9 @@ pub trait RawJournalAdapter {
/// the global state that is used by this journal
type GlobalState;
/// a journal event
type Event;
type Event<'a>;
/// the decoded event
type DecodedEvent<'a>;
type DecodedEvent;
/// a type representing the event kind
type EventMeta: Copy;
/// initialize this adapter
@ -233,17 +231,17 @@ pub trait RawJournalAdapter {
/// parse event metadata
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta>;
/// get event metadata as an [`u64`]
fn get_event_md(&self, event: &Self::Event) -> u64;
fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64;
/// commit event (direct preference)
fn commit_direct<Fs: FSInterface>(
fn commit_direct<'a, Fs: FSInterface>(
&mut self,
_: &mut TrackedWriter<Fs::File, Self::Spec>,
_: Self::Event,
_: Self::Event<'a>,
) -> RuntimeResult<()> {
unimplemented!()
}
/// commit event (buffered)
fn commit_buffered(&mut self, _: &mut Vec<u8>, _: Self::Event) {
fn commit_buffered<'a>(&mut self, _: &mut Vec<u8>, _: Self::Event<'a>) {
unimplemented!()
}
/// parse the event
@ -253,12 +251,12 @@ pub trait RawJournalAdapter {
Self::Spec,
>,
meta: Self::EventMeta,
) -> RuntimeResult<Self::DecodedEvent<'a>>;
) -> RuntimeResult<Self::DecodedEvent>;
/// apply the event
fn apply_event<'a>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
event: Self::DecodedEvent<'a>,
event: Self::DecodedEvent,
) -> RuntimeResult<()>;
}
@ -485,7 +483,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
///
/// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns,
/// unless otherwise configured
pub fn commit_event(&mut self, event: J::Event) -> RuntimeResult<()> {
pub fn commit_event<'a>(&mut self, event: J::Event<'a>) -> RuntimeResult<()> {
self.txn_context(|me, txn_id| {
let ev_md = me.j.get_event_md(&event);
jtrace_writer!(CommitAttemptForEvent(txn_id as u64));
@ -517,11 +515,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
jtrace_writer!(CommitServerEventAdapterCompleted);
if J::AUTO_SYNC_ON_EVENT_COMMIT {
// should fsync after event
if let CommitPreference::Direct = J::COMMIT_PREFERENCE {
me.log_file.flush_sync()?;
} else {
me.log_file.fsync()?;
}
log_file.flush_sync()?;
jtrace_writer!(CommitCommitServerEventSyncCompleted);
}
Ok(())
@ -561,16 +555,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
if !J::AUTO_SYNC_ON_EVENT_COMMIT {
// the log might still not be fully flushed, so flush it now; NB: flush does not affect checksum state;
// this is guaranteed by the impl of the tracked writer
match J::COMMIT_PREFERENCE {
CommitPreference::Buffered => {
// in this case, we know that every write is already synced
debug_assert!(!me.log_file.is_dirty());
}
CommitPreference::Direct => {
jtrace_writer!(DriverEventPresyncCompleted);
me.log_file.flush_sync()?;
}
}
me.log_file.flush_sync()?;
}
me.log_file.tracked_write_through_buffer(&block)?;
jtrace_writer!(DriverEventCompleted);

@ -79,13 +79,13 @@ impl SimpleDB {
&mut self,
log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>,
) -> RuntimeResult<()> {
log.commit_event(DbEvent::Clear)?;
log.commit_event(DbEventEncoded::Clear)?;
self.data.get_mut().clear();
Ok(())
}
fn pop(&mut self, log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>) -> RuntimeResult<()> {
self.data.get_mut().pop().unwrap();
log.commit_event(DbEvent::Pop)?;
log.commit_event(DbEventEncoded::Pop)?;
Ok(())
}
fn push(
@ -94,13 +94,18 @@ impl SimpleDB {
new: impl ToString,
) -> RuntimeResult<()> {
let new = new.to_string();
log.commit_event(DbEvent::NewKey(new.clone()))?;
log.commit_event(DbEventEncoded::NewKey(&new))?;
self.data.get_mut().push(new);
Ok(())
}
}
struct SimpleDBJournal;
enum DbEvent {
enum DbEventEncoded<'a> {
NewKey(&'a str),
Pop,
Clear,
}
enum DbEventRestored {
NewKey(String),
Pop,
Clear,
@ -115,8 +120,8 @@ impl RawJournalAdapter for SimpleDBJournal {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered;
type Spec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type Event = DbEvent;
type DecodedEvent<'a> = DbEvent;
type Event<'a> = DbEventEncoded<'a>;
type DecodedEvent = DbEventRestored;
type EventMeta = EventMeta;
fn initialize(_: &JournalInitializer) -> Self {
Self
@ -129,15 +134,15 @@ impl RawJournalAdapter for SimpleDBJournal {
_ => return None,
})
}
fn get_event_md(&self, event: &Self::Event) -> u64 {
fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64 {
match event {
DbEvent::NewKey(_) => 0,
DbEvent::Pop => 1,
DbEvent::Clear => 2,
DbEventEncoded::NewKey(_) => 0,
DbEventEncoded::Pop => 1,
DbEventEncoded::Clear => 2,
}
}
fn commit_buffered(&mut self, buf: &mut Vec<u8>, event: Self::Event) {
if let DbEvent::NewKey(key) = event {
fn commit_buffered<'a>(&mut self, buf: &mut Vec<u8>, event: Self::Event<'a>) {
if let DbEventEncoded::NewKey(key) = event {
buf.extend((key.len() as u64).to_le_bytes());
buf.extend(key.as_bytes());
}
@ -148,30 +153,30 @@ impl RawJournalAdapter for SimpleDBJournal {
Self::Spec,
>,
meta: Self::EventMeta,
) -> RuntimeResult<Self::DecodedEvent<'a>> {
) -> RuntimeResult<Self::DecodedEvent> {
Ok(match meta {
EventMeta::NewKey => {
let key_size = u64::from_le_bytes(file.read_block()?);
let mut keybuf = vec![0u8; key_size as usize];
file.tracked_read(&mut keybuf)?;
match String::from_utf8(keybuf) {
Ok(k) => DbEvent::NewKey(k),
Ok(k) => DbEventRestored::NewKey(k),
Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()),
}
}
EventMeta::Clear => DbEvent::Clear,
EventMeta::Pop => DbEvent::Pop,
EventMeta::Clear => DbEventRestored::Clear,
EventMeta::Pop => DbEventRestored::Pop,
})
}
fn apply_event<'a>(
gs: &Self::GlobalState,
_: Self::EventMeta,
event: Self::DecodedEvent<'a>,
event: Self::DecodedEvent,
) -> RuntimeResult<()> {
match event {
DbEvent::NewKey(k) => gs.data.borrow_mut().push(k),
DbEvent::Clear => gs.data.borrow_mut().clear(),
DbEvent::Pop => {
DbEventRestored::NewKey(k) => gs.data.borrow_mut().push(k),
DbEventRestored::Clear => gs.data.borrow_mut().clear(),
DbEventRestored::Pop => {
if gs.data.borrow_mut().pop().is_none() {
return Err(StorageError::RawJournalCorrupted.into());
}

@ -0,0 +1,174 @@
/*
* Created on Fri Feb 09 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
/*
event log test
*/
use {
super::{raw::create_journal, EventLog, EventLogAdapter, EventLogJournal},
crate::engine::{
error::StorageError,
fractal,
mem::unsafe_apis,
storage::{
common::interface::fs_test::VirtualFS,
v2::{journal::raw::open_journal, spec::SystemDatabaseV1},
},
RuntimeResult,
},
std::cell::{Ref, RefCell, RefMut},
};
#[derive(Default)]
pub struct SimpleDB {
values: RefCell<Vec<String>>,
}
impl SimpleDB {
fn as_mut(&self) -> RefMut<Vec<String>> {
self.values.borrow_mut()
}
fn as_ref(&self) -> Ref<Vec<String>> {
self.values.borrow()
}
fn push(
&self,
log: &mut EventLogJournal<SimpleDBAdapter, VirtualFS>,
key: &str,
) -> RuntimeResult<()> {
log.commit_event(DbEvent::Push(key))?;
self.as_mut().push(key.into());
Ok(())
}
fn pop(&self, log: &mut EventLogJournal<SimpleDBAdapter, VirtualFS>) -> RuntimeResult<()> {
log.commit_event(DbEvent::Pop)?;
self.as_mut().pop().unwrap();
Ok(())
}
fn clear(&self, log: &mut EventLogJournal<SimpleDBAdapter, VirtualFS>) -> RuntimeResult<()> {
log.commit_event(DbEvent::Clear)?;
self.as_mut().clear();
Ok(())
}
}
enum DbEvent<'a> {
Push(&'a str),
Pop,
Clear,
}
enum DbEventDecoded {
Push(String),
Pop,
Clear,
}
struct SimpleDBAdapter;
impl EventLogAdapter for SimpleDBAdapter {
type SdssSpec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type Event<'a> = DbEvent<'a>;
type DecodedEvent = DbEventDecoded;
type EventMeta = u64;
type Error = fractal::error::Error;
const EV_MAX: u8 = 2;
unsafe fn meta_from_raw(m: u64) -> Self::EventMeta {
m
}
fn event_md<'a>(event: &Self::Event<'a>) -> u64 {
match event {
DbEvent::Push(_) => 0,
DbEvent::Pop => 1,
DbEvent::Clear => 2,
}
}
fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]> {
if let DbEvent::Push(k) = event {
let mut buf = Vec::new();
buf.extend(&(k.len() as u64).to_le_bytes());
buf.extend(k.as_bytes());
buf.into_boxed_slice()
} else {
Default::default()
}
}
fn decode(block: Vec<u8>, m: u64) -> Result<Self::DecodedEvent, Self::Error> {
Ok(match m {
0 => {
if block.len() < sizeof!(u64) {
return Err(StorageError::RawJournalCorrupted.into());
}
let len =
u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&block[..sizeof!(u64)]) });
let block = &block[sizeof!(u64)..];
if block.len() as u64 != len {
return Err(StorageError::RawJournalCorrupted.into());
}
DbEventDecoded::Push(String::from_utf8_lossy(block).into())
}
1 => DbEventDecoded::Pop,
2 => DbEventDecoded::Clear,
_ => panic!(),
})
}
fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error> {
match ev {
DbEventDecoded::Push(new) => g.as_mut().push(new),
DbEventDecoded::Pop => {
let _ = g.as_mut().pop();
}
DbEventDecoded::Clear => g.as_mut().clear(),
}
Ok(())
}
}
#[test]
fn event_log_basic_events() {
array!(const VALUES: [&str] = ["key1", "key2", "key3", "fancykey", "done"]);
{
let mut log = create_journal::<EventLog<SimpleDBAdapter>, _>("jrnl1").unwrap();
let db = SimpleDB::default();
for value in VALUES {
db.push(&mut log, value).unwrap();
}
db.pop(&mut log).unwrap();
EventLogJournal::close_driver(&mut log).unwrap();
}
{
let db = SimpleDB::default();
let mut log = open_journal::<EventLog<SimpleDBAdapter>, VirtualFS>("jrnl1", &db).unwrap();
EventLogJournal::close_driver(&mut log).unwrap();
assert_eq!(
db.as_ref().as_slice().last().unwrap(),
VALUES[VALUES.len() - 2]
);
assert_eq!(db.as_ref().as_slice(), &VALUES[..VALUES.len() - 1]);
}
}
Loading…
Cancel
Save