Enable generic events to be passed to the journal

next
Sayan Nandan 7 months ago
parent 794d4e3806
commit 196fd746e6
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

@ -26,100 +26,81 @@
#![allow(dead_code)]
mod raw;
#[cfg(test)]
mod tests;
use {
self::raw::{CommitPreference, JournalInitializer, RawJournalAdapter},
crate::engine::{
error::StorageError,
fractal,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{FSInterface, FileInterface},
sdss::sdss_r1::{
rw::{TrackedReader, TrackedWriter},
FileSpecV1,
self::raw::{CommitPreference, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter},
crate::{
engine::{
error::StorageError,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{FSInterface, FileInterface},
sdss::sdss_r1::{
rw::{TrackedReader, TrackedWriter},
FileSpecV1,
},
},
RuntimeResult,
},
RuntimeResult,
util::compiler::TaggedEnum,
},
std::marker::PhantomData,
std::{marker::PhantomData, mem, ops::Index},
};
/*
Event log adapter
*/
mod raw;
#[cfg(test)]
mod tests;
/// A journal based on an [`EventLog`]
pub type EventLogJournal<E, Fs> = raw::RawJournalWriter<EventLog<E>, Fs>;
pub type EventLogDriver<EL, Fs> = RawJournalWriter<EventLog<EL>, Fs>;
pub struct EventLog<EL: EventLogAdapter>(PhantomData<EL>);
impl<EL: EventLogAdapter> EventLog<EL> {
pub fn close<Fs: FSInterface>(me: &mut EventLogDriver<EL, Fs>) -> RuntimeResult<()> {
RawJournalWriter::close_driver(me)
}
}
/// An [`EventLog`] is a standard, append-only, sequential journal with per-event and per-cycle integrity protection
pub struct EventLog<E: EventLogAdapter>(PhantomData<E>);
type DispatchFn<G> = fn(&G, Vec<u8>) -> RuntimeResult<()>;
/// An adapter that provides the specification for an event log
pub trait EventLogAdapter {
/// the SDSS file spec
type SdssSpec: FileSpecV1;
/// global state
type Spec: FileSpecV1;
type GlobalState;
/// the event type
type Event<'a>;
/// the decoded event type
type DecodedEvent;
/// event metadata
type EventMeta: Copy;
/// the error type
type Error: Into<fractal::error::Error>;
/// the maximum value for the event discriminant
const EV_MAX: u8;
/// get metadata from the raw value
unsafe fn meta_from_raw(m: u64) -> Self::EventMeta;
/// get metadata from the event
fn event_md<'a>(event: &Self::Event<'a>) -> u64;
/// encode an event
fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]>;
/// decode an event
fn decode(block: Vec<u8>, kind: Self::EventMeta) -> Result<Self::DecodedEvent, Self::Error>;
/// apply the event
fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error>;
type EventMeta: TaggedEnum<Dscr = u8>;
type DecodeDispatch: Index<usize, Output = DispatchFn<Self::GlobalState>>;
const DECODE_DISPATCH: Self::DecodeDispatch;
const ENSURE: () = assert!(
(mem::size_of::<Self::DecodeDispatch>() / mem::size_of::<DispatchFn<Self::GlobalState>>())
== Self::EventMeta::VARIANT_COUNT as usize
);
}
impl<E: EventLogAdapter> RawJournalAdapter for EventLog<E> {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct;
type Spec = <E as EventLogAdapter>::SdssSpec;
type GlobalState = <E as EventLogAdapter>::GlobalState;
impl<EL: EventLogAdapter> RawJournalAdapter for EventLog<EL> {
const COMMIT_PREFERENCE: CommitPreference = {
let _ = EL::ENSURE;
CommitPreference::Direct
};
type Spec = <EL as EventLogAdapter>::Spec;
type GlobalState = <EL as EventLogAdapter>::GlobalState;
type Context<'a> = () where Self: 'a;
type Event<'a> = <E as EventLogAdapter>::Event<'a>;
type DecodedEvent = <E as EventLogAdapter>::DecodedEvent;
type EventMeta = <E as EventLogAdapter>::EventMeta;
fn initialize(_: &JournalInitializer) -> Self {
type EventMeta = <EL as EventLogAdapter>::EventMeta;
fn initialize(_: &raw::JournalInitializer) -> Self {
Self(PhantomData)
}
fn enter_context<'a, Fs: FSInterface>(
_: &'a mut raw::RawJournalWriter<Self, Fs>,
_: &'a mut RawJournalWriter<Self, Fs>,
) -> Self::Context<'a> {
()
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
if meta > <E as EventLogAdapter>::EV_MAX as u64 {
return None;
}
unsafe {
// UNSAFE(@ohsayan): checked max
Some(<E as EventLogAdapter>::meta_from_raw(meta))
}
}
fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64 {
<E as EventLogAdapter>::event_md(event)
<<EL as EventLogAdapter>::EventMeta as TaggedEnum>::try_from_raw(meta as u8)
}
fn commit_direct<'a, Fs: FSInterface>(
fn commit_direct<'a, Fs: FSInterface, E>(
&mut self,
w: &mut TrackedWriter<Fs::File, Self::Spec>,
event: Self::Event<'a>,
) -> RuntimeResult<()> {
let pl = <E as EventLogAdapter>::encode(event);
ev: E,
) -> RuntimeResult<()>
where
E: RawJournalAdapterEvent<Self>,
{
let mut pl = vec![];
ev.write_buffered(&mut pl);
let plen = (pl.len() as u64).to_le_bytes();
let mut checksum = SCrc64::new();
checksum.update(&plen);
@ -132,16 +113,14 @@ impl<E: EventLogAdapter> RawJournalAdapter for EventLog<E> {
w.tracked_write(&plen)?;
w.tracked_write(&pl)
}
fn parse_event<'a, Fs: FSInterface>(
fn decode_apply<'a, Fs: FSInterface>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
m: Self::EventMeta,
) -> RuntimeResult<Self::DecodedEvent> {
/*
verify checksum
*/
) -> RuntimeResult<()> {
let expected_checksum = u64::from_le_bytes(file.read_block()?);
let plen = u64::from_le_bytes(file.read_block()?);
let mut pl = vec![0; plen as usize];
@ -152,13 +131,9 @@ impl<E: EventLogAdapter> RawJournalAdapter for EventLog<E> {
if this_checksum.finish() != expected_checksum {
return Err(StorageError::RawJournalCorrupted.into());
}
<E as EventLogAdapter>::decode(pl, m).map_err(Into::into)
}
fn apply_event<'a>(
gs: &Self::GlobalState,
_: Self::EventMeta,
event: Self::DecodedEvent,
) -> RuntimeResult<()> {
<E as EventLogAdapter>::apply_event(gs, event).map_err(Into::into)
<EL as EventLogAdapter>::DECODE_DISPATCH
[<<EL as EventLogAdapter>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize](
gs, pl
)
}
}

@ -144,7 +144,6 @@ pub enum JournalReaderTraceEvent {
AttemptingEvent(u64),
DetectedServerEvent,
ServerEventMetadataParsed,
ServerEventParsed,
ServerEventAppliedSuccess,
// drv events
DriverEventExpectingClose,
@ -208,6 +207,19 @@ macro_rules! jtrace_reader {
impls
*/
pub trait RawJournalAdapterEvent<CA: RawJournalAdapter>: Sized {
fn md(&self) -> u64;
fn write_direct<Fs: FSInterface>(
self,
_: &mut TrackedWriter<Fs::File, <CA as RawJournalAdapter>::Spec>,
) -> RuntimeResult<()> {
unimplemented!()
}
fn write_buffered(self, _: &mut Vec<u8>) {
unimplemented!()
}
}
/// An adapter defining the low-level structure of a log file
pub trait RawJournalAdapter: Sized {
/// event size buffer
@ -224,12 +236,8 @@ pub trait RawJournalAdapter: Sized {
type Context<'a>
where
Self: 'a;
/// a journal event
type Event<'a>;
/// the decoded event
type DecodedEvent;
/// a type representing the event kind
type EventMeta: Copy;
type EventMeta;
/// initialize this adapter
fn initialize(j_: &JournalInitializer) -> Self;
/// get a write context
@ -238,33 +246,32 @@ pub trait RawJournalAdapter: Sized {
) -> Self::Context<'a>;
/// parse event metadata
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta>;
/// get event metadata as an [`u64`]
fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64;
/// commit event (direct preference)
fn commit_direct<'a, Fs: FSInterface>(
fn commit_direct<'a, Fs: FSInterface, E>(
&mut self,
_: &mut TrackedWriter<Fs::File, Self::Spec>,
_: Self::Event<'a>,
) -> RuntimeResult<()> {
_: E,
) -> RuntimeResult<()>
where
E: RawJournalAdapterEvent<Self>,
{
unimplemented!()
}
/// commit event (buffered)
fn commit_buffered<'a>(&mut self, _: &mut Vec<u8>, _: Self::Event<'a>) {
fn commit_buffered<'a, E>(&mut self, _: &mut Vec<u8>, _: E)
where
E: RawJournalAdapterEvent<Self>,
{
unimplemented!()
}
/// parse the event
fn parse_event<'a, Fs: FSInterface>(
/// decode and apply the event
fn decode_apply<'a, Fs: FSInterface>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
meta: Self::EventMeta,
) -> RuntimeResult<Self::DecodedEvent>;
/// apply the event
fn apply_event<'a>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
event: Self::DecodedEvent,
) -> RuntimeResult<()>;
}
@ -275,15 +282,14 @@ pub enum CommitPreference {
}
#[derive(Debug, PartialEq)]
/**
A driver event
---
Structured as:
+------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+
| 16B: Event ID | 8B: Meta | 8B: Checksum | 8B: Payload size | 8B: prev checksum | 8B: prev offset | 8B: prev txn id |
+------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+
/*
A driver event
---
Structured as:
+------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+
| 16B: Event ID | 8B: Meta | 8B: Checksum | 8B: Payload size | 8B: prev checksum | 8B: prev offset | 8B: prev txn id |
+------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+
*/
struct DriverEvent {
txn_id: u128,
event: DriverEventKind,
@ -493,10 +499,13 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
/// Commit a new event to the journal
///
/// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns,
/// unless otherwise configured
pub fn commit_event<'a>(&mut self, event: J::Event<'a>) -> RuntimeResult<()> {
/// unless otherwise configured.
pub fn commit_event<'a, E: RawJournalAdapterEvent<J>>(
&mut self,
event: E,
) -> RuntimeResult<()> {
self.txn_context(|me, txn_id| {
let ev_md = me.j.get_event_md(&event);
let ev_md = event.md();
jtrace_writer!(CommitAttemptForEvent(txn_id as u64));
// MSB must be unused; set msb
debug_assert!(ev_md & SERVER_EV_MASK != 1, "MSB must be unset");
@ -520,7 +529,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalWriter<J, Fs> {
log_file.tracked_write(&ev_md.to_le_bytes())?;
jtrace_writer!(CommitServerEventWroteMetadata);
// now hand over control to adapter impl
J::commit_direct::<Fs>(j, log_file, event)?;
J::commit_direct::<Fs, _>(j, log_file, event)?;
}
}
jtrace_writer!(CommitServerEventAdapterCompleted);
@ -674,10 +683,8 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
jtrace_reader!(ServerEventMetadataParsed);
// now parse the actual event
let Self { tr: reader, .. } = self;
let event = J::parse_event::<Fs>(reader, meta)?;
jtrace_reader!(ServerEventParsed);
// we do not consider a parsed event a success signal; so we must actually apply it
match J::apply_event(gs, meta, event) {
match J::decode_apply::<Fs>(gs, meta, reader) {
Ok(()) => {
jtrace_reader!(ServerEventAppliedSuccess);
Self::__refresh_known_txn(self);

@ -27,7 +27,7 @@
use {
super::{
create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind,
JournalInitializer, RawJournalAdapter, RawJournalWriter,
JournalInitializer, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter,
},
crate::engine::{
error::StorageError,
@ -63,7 +63,7 @@ fn encode_decode_meta() {
*/
#[derive(Debug, Clone, PartialEq)]
struct SimpleDB {
pub struct SimpleDB {
data: RefCell<Vec<String>>,
}
impl SimpleDB {
@ -79,13 +79,13 @@ impl SimpleDB {
&mut self,
log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>,
) -> RuntimeResult<()> {
log.commit_event(DbEventEncoded::Clear)?;
log.commit_event(DbEventClear)?;
self.data.get_mut().clear();
Ok(())
}
fn pop(&mut self, log: &mut RawJournalWriter<SimpleDBJournal, VirtualFS>) -> RuntimeResult<()> {
self.data.get_mut().pop().unwrap();
log.commit_event(DbEventEncoded::Pop)?;
log.commit_event(DbEventPop)?;
Ok(())
}
fn push(
@ -94,24 +94,58 @@ impl SimpleDB {
new: impl ToString,
) -> RuntimeResult<()> {
let new = new.to_string();
log.commit_event(DbEventEncoded::NewKey(&new))?;
log.commit_event(DbEventPush(&new))?;
self.data.get_mut().push(new);
Ok(())
}
}
struct SimpleDBJournal;
enum DbEventEncoded<'a> {
NewKey(&'a str),
Pop,
Clear,
/*
event impls
*/
pub struct SimpleDBJournal;
struct DbEventPush<'a>(&'a str);
struct DbEventPop;
struct DbEventClear;
trait SimpleDBEvent: Sized {
const OPC: u8;
fn write_buffered(self, _: &mut Vec<u8>);
}
macro_rules! impl_db_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),*) => {
$(impl SimpleDBEvent for $ty {
const OPC: u8 = $code;
fn write_buffered(self, buf: &mut Vec<u8>) { let _ = buf; fn do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(do_it(self, buf, $expr))? }
})*
}
}
impl_db_event!(
DbEventPush<'_> as 0 => |me, buf| {
buf.extend(&(me.0.len() as u64).to_le_bytes());
buf.extend(me.0.as_bytes());
},
DbEventPop as 1,
DbEventClear as 2
);
impl<T: SimpleDBEvent> RawJournalAdapterEvent<SimpleDBJournal> for T {
fn md(&self) -> u64 {
T::OPC as _
}
fn write_buffered(self, buf: &mut Vec<u8>) {
T::write_buffered(self, buf)
}
}
enum DbEventRestored {
pub enum DbEventRestored {
NewKey(String),
Pop,
Clear,
}
#[derive(Debug, PartialEq, Clone, Copy)]
enum EventMeta {
pub enum EventMeta {
NewKey,
Pop,
Clear,
@ -120,8 +154,6 @@ impl RawJournalAdapter for SimpleDBJournal {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered;
type Spec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type Event<'a> = DbEventEncoded<'a>;
type DecodedEvent = DbEventRestored;
type EventMeta = EventMeta;
type Context<'a> = () where Self: 'a;
fn initialize(_: &JournalInitializer) -> Self {
@ -140,52 +172,34 @@ impl RawJournalAdapter for SimpleDBJournal {
_ => return None,
})
}
fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64 {
match event {
DbEventEncoded::NewKey(_) => 0,
DbEventEncoded::Pop => 1,
DbEventEncoded::Clear => 2,
}
}
fn commit_buffered<'a>(&mut self, buf: &mut Vec<u8>, event: Self::Event<'a>) {
if let DbEventEncoded::NewKey(key) = event {
buf.extend((key.len() as u64).to_le_bytes());
buf.extend(key.as_bytes());
}
fn commit_buffered<'a, E: RawJournalAdapterEvent<Self>>(
&mut self,
buf: &mut Vec<u8>,
event: E,
) {
event.write_buffered(buf)
}
fn parse_event<'a, Fs: FSInterface>(
fn decode_apply<'a, Fs: FSInterface>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
meta: Self::EventMeta,
) -> RuntimeResult<Self::DecodedEvent> {
Ok(match meta {
) -> RuntimeResult<()> {
match meta {
EventMeta::NewKey => {
let key_size = u64::from_le_bytes(file.read_block()?);
let mut keybuf = vec![0u8; key_size as usize];
file.tracked_read(&mut keybuf)?;
match String::from_utf8(keybuf) {
Ok(k) => DbEventRestored::NewKey(k),
Ok(k) => gs.data.borrow_mut().push(k),
Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()),
}
}
EventMeta::Clear => DbEventRestored::Clear,
EventMeta::Pop => DbEventRestored::Pop,
})
}
fn apply_event<'a>(
gs: &Self::GlobalState,
_: Self::EventMeta,
event: Self::DecodedEvent,
) -> RuntimeResult<()> {
match event {
DbEventRestored::NewKey(k) => gs.data.borrow_mut().push(k),
DbEventRestored::Clear => gs.data.borrow_mut().clear(),
DbEventRestored::Pop => {
if gs.data.borrow_mut().pop().is_none() {
return Err(StorageError::RawJournalCorrupted.into());
}
EventMeta::Clear => gs.data.borrow_mut().clear(),
EventMeta::Pop => {
let _ = gs.data.borrow_mut().pop().unwrap();
}
}
Ok(())
@ -354,7 +368,6 @@ fn journal_with_server_single_event() {
JournalReaderTraceEvent::AttemptingEvent(0),
JournalReaderTraceEvent::DetectedServerEvent,
JournalReaderTraceEvent::ServerEventMetadataParsed,
JournalReaderTraceEvent::ServerEventParsed,
JournalReaderTraceEvent::ServerEventAppliedSuccess,
// now read close event
JournalReaderTraceEvent::AttemptingEvent(1),
@ -402,7 +415,6 @@ fn journal_with_server_single_event() {
JournalReaderTraceEvent::AttemptingEvent(0),
JournalReaderTraceEvent::DetectedServerEvent,
JournalReaderTraceEvent::ServerEventMetadataParsed,
JournalReaderTraceEvent::ServerEventParsed,
JournalReaderTraceEvent::ServerEventAppliedSuccess,
// now read close event
JournalReaderTraceEvent::AttemptingEvent(1),

@ -29,146 +29,188 @@
*/
use {
super::{raw::create_journal, EventLog, EventLogAdapter, EventLogJournal},
crate::engine::{
error::StorageError,
fractal,
mem::unsafe_apis,
storage::{
common::interface::fs_test::VirtualFS,
v2::raw::{journal::raw::open_journal, spec::SystemDatabaseV1},
super::{raw::RawJournalAdapterEvent, DispatchFn, EventLog, EventLogAdapter, EventLogDriver},
crate::{
engine::{
error::StorageError,
mem::unsafe_apis,
storage::{
common::interface::fs_test::VirtualFS,
v2::raw::{
journal::raw::{create_journal, open_journal},
spec::SystemDatabaseV1,
},
},
RuntimeResult,
},
RuntimeResult,
util::compiler::TaggedEnum,
},
sky_macros::TaggedEnum,
std::cell::{Ref, RefCell, RefMut},
};
// event definitions
#[derive(TaggedEnum, Clone, Copy, Debug)]
#[repr(u8)]
pub enum TestEvent {
Push = 0,
Pop = 1,
Clear = 2,
}
pub trait IsTestEvent {
const EVCODE: TestEvent;
fn encode(self, _: &mut Vec<u8>);
}
macro_rules! impl_test_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),* $(,)?) => {
$(impl IsTestEvent for $ty {
const EVCODE: TestEvent = $code;
fn encode(self, buf: &mut Vec<u8>) { let _ = buf; fn do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(do_it(self, buf, $expr))? }
})*
}
}
pub struct EventPush<'a>(&'a str);
pub struct EventPop;
pub struct EventClear;
impl_test_event!(
EventPush<'_> as TestEvent::Push => |me, buf| {
buf.extend(&(me.0.len() as u64).to_le_bytes());
buf.extend(me.0.as_bytes())
},
EventPop as TestEvent::Pop,
EventClear as TestEvent::Clear,
);
impl<TE: IsTestEvent> RawJournalAdapterEvent<EventLog<TestDBAdapter>> for TE {
fn md(&self) -> u64 {
Self::EVCODE.dscr_u64()
}
fn write_buffered(self, buf: &mut Vec<u8>) {
TE::encode(self, buf)
}
}
// adapter
pub struct TestDBAdapter;
impl EventLogAdapter for TestDBAdapter {
type Spec = SystemDatabaseV1;
type GlobalState = TestDB;
type EventMeta = TestEvent;
type DecodeDispatch = [DispatchFn<TestDB>; 3];
const DECODE_DISPATCH: Self::DecodeDispatch = [
|db, payload| {
if payload.len() < sizeof!(u64) {
Err(StorageError::RawJournalCorrupted.into())
} else {
let length =
u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) });
let payload = &payload[sizeof!(u64)..];
if payload.len() as u64 != length {
Err(StorageError::RawJournalCorrupted.into())
} else {
let string = String::from_utf8(payload.to_owned()).unwrap();
db._mut().push(string);
Ok(())
}
}
},
|db, _| {
let _ = db._mut().pop();
Ok(())
},
|db, _| {
db._mut().clear();
Ok(())
},
];
}
#[derive(Default)]
pub struct SimpleDB {
values: RefCell<Vec<String>>,
pub struct TestDB {
data: RefCell<Vec<String>>,
}
impl SimpleDB {
fn as_mut(&self) -> RefMut<Vec<String>> {
self.values.borrow_mut()
impl TestDB {
fn _mut(&self) -> RefMut<Vec<String>> {
self.data.borrow_mut()
}
fn as_ref(&self) -> Ref<Vec<String>> {
self.values.borrow()
fn _ref(&self) -> Ref<Vec<String>> {
self.data.borrow()
}
fn push(
&self,
log: &mut EventLogJournal<SimpleDBAdapter, VirtualFS>,
log: &mut EventLogDriver<TestDBAdapter, VirtualFS>,
key: &str,
) -> RuntimeResult<()> {
log.commit_event(DbEvent::Push(key))?;
self.as_mut().push(key.into());
log.commit_event(EventPush(key))?;
self._mut().push(key.into());
Ok(())
}
fn pop(&self, log: &mut EventLogJournal<SimpleDBAdapter, VirtualFS>) -> RuntimeResult<()> {
log.commit_event(DbEvent::Pop)?;
self.as_mut().pop().unwrap();
fn pop(&self, log: &mut EventLogDriver<TestDBAdapter, VirtualFS>) -> RuntimeResult<()> {
assert!(!self._ref().is_empty());
log.commit_event(EventPop)?;
self._mut().pop().unwrap();
Ok(())
}
fn clear(&self, log: &mut EventLogJournal<SimpleDBAdapter, VirtualFS>) -> RuntimeResult<()> {
log.commit_event(DbEvent::Clear)?;
self.as_mut().clear();
fn clear(&self, log: &mut EventLogDriver<TestDBAdapter, VirtualFS>) -> RuntimeResult<()> {
log.commit_event(EventClear)?;
self._mut().clear();
Ok(())
}
}
enum DbEvent<'a> {
Push(&'a str),
Pop,
Clear,
fn open_log() -> (
TestDB,
super::raw::RawJournalWriter<EventLog<TestDBAdapter>, VirtualFS>,
) {
let db = TestDB::default();
let log = open_journal("jrnl", &db).unwrap();
(db, log)
}
enum DbEventDecoded {
Push(String),
Pop,
Clear,
}
struct SimpleDBAdapter;
impl EventLogAdapter for SimpleDBAdapter {
type SdssSpec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type Event<'a> = DbEvent<'a>;
type DecodedEvent = DbEventDecoded;
type EventMeta = u64;
type Error = fractal::error::Error;
const EV_MAX: u8 = 2;
unsafe fn meta_from_raw(m: u64) -> Self::EventMeta {
m
}
fn event_md<'a>(event: &Self::Event<'a>) -> u64 {
match event {
DbEvent::Push(_) => 0,
DbEvent::Pop => 1,
DbEvent::Clear => 2,
}
}
fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]> {
if let DbEvent::Push(k) = event {
let mut buf = Vec::new();
buf.extend(&(k.len() as u64).to_le_bytes());
buf.extend(k.as_bytes());
buf.into_boxed_slice()
} else {
Default::default()
#[test]
fn test_this_data() {
array!(
const DATA1: [&str] = ["acai berry", "billberry", "cranberry"];
const DATA2: [&str] = ["acai berry", "billberry", "cranberry", "bradbury"];
const DATA3: [&str] = ["acai berry", "billberry", "cranberry"];
const DATA4: [&str] = ["acai berry", "billberry", "cranberry", "dewberry"];
);
{
let db = TestDB::default();
let mut log = create_journal("jrnl").unwrap();
for key in DATA1 {
db.push(&mut log, key).unwrap();
}
EventLog::close(&mut log).unwrap();
}
fn decode(block: Vec<u8>, m: u64) -> Result<Self::DecodedEvent, Self::Error> {
Ok(match m {
0 => {
if block.len() < sizeof!(u64) {
return Err(StorageError::RawJournalCorrupted.into());
}
let len =
u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&block[..sizeof!(u64)]) });
let block = &block[sizeof!(u64)..];
if block.len() as u64 != len {
return Err(StorageError::RawJournalCorrupted.into());
}
DbEventDecoded::Push(String::from_utf8_lossy(block).into())
}
1 => DbEventDecoded::Pop,
2 => DbEventDecoded::Clear,
_ => panic!(),
})
}
fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error> {
match ev {
DbEventDecoded::Push(new) => g.as_mut().push(new),
DbEventDecoded::Pop => {
let _ = g.as_mut().pop();
}
DbEventDecoded::Clear => g.as_mut().clear(),
}
Ok(())
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA1);
db.push(&mut log, DATA2[3]).unwrap();
EventLog::close(&mut log).unwrap();
}
}
#[test]
fn event_log_basic_events() {
array!(const VALUES: [&str] = ["key1", "key2", "key3", "fancykey", "done"]);
{
let mut log = create_journal::<EventLog<SimpleDBAdapter>, _>("jrnl1").unwrap();
let db = SimpleDB::default();
for value in VALUES {
db.push(&mut log, value).unwrap();
}
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA2);
db.pop(&mut log).unwrap();
EventLogJournal::close_driver(&mut log).unwrap();
EventLog::close(&mut log).unwrap();
}
{
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA3);
db.push(&mut log, DATA4[3]).unwrap();
EventLog::close(&mut log).unwrap();
}
{
let db = SimpleDB::default();
let mut log = open_journal::<EventLog<SimpleDBAdapter>, VirtualFS>("jrnl1", &db).unwrap();
EventLogJournal::close_driver(&mut log).unwrap();
assert_eq!(
db.as_ref().as_slice().last().unwrap(),
VALUES[VALUES.len() - 2]
);
assert_eq!(db.as_ref().as_slice(), &VALUES[..VALUES.len() - 1]);
let (db, mut log) = open_log();
assert_eq!(db._ref().as_slice(), DATA4);
EventLog::close(&mut log).unwrap();
}
}

@ -70,3 +70,26 @@ pub const fn hot<T>(v: T) -> T {
pub fn cold_rerr<T, E>(e: E) -> Result<T, E> {
Err(e)
}
/*
pure enumerations
*/
pub trait TaggedEnum: Sized {
type Dscr: PartialOrd;
const MAX_DSCR: Self::Dscr;
const VARIANT_COUNT: usize;
fn dscr(&self) -> Self::Dscr;
fn dscr_u64(&self) -> u64;
unsafe fn from_raw(d: Self::Dscr) -> Self;
fn try_from_raw(d: Self::Dscr) -> Option<Self> {
if d > Self::MAX_DSCR {
None
} else {
Some(unsafe {
// UNSAFE(@ohsayan): just verified the dscr
<Self as TaggedEnum>::from_raw(d)
})
}
}
}

@ -102,9 +102,83 @@ fn wrapper(item: DeriveInput) -> TokenStream2 {
}
}
#[proc_macro_derive(TaggedEnum)]
pub fn derive_tagged_enum(input: TokenStream) -> TokenStream {
let ast = parse_macro_input!(input as DeriveInput);
let (enum_name, _, value_expressions, variant_len, repr_type_ident) = process_enum_tags(&ast);
quote! {
impl crate::util::compiler::TaggedEnum for #enum_name {
type Dscr = #repr_type_ident;
const MAX_DSCR: #repr_type_ident = {
let values = #value_expressions;
let mut i = 1;
let mut max = values[0];
while i < values.len() {
if values[i] > max {
max = values[i];
}
i = i + 1;
}
max
};
const VARIANT_COUNT: usize = #variant_len;
fn dscr(&self) -> #repr_type_ident {
unsafe {
core::mem::transmute(*self)
}
}
fn dscr_u64(&self) -> u64 {
self.dscr() as u64
}
unsafe fn from_raw(d: #repr_type_ident) -> Self {
core::mem::transmute(d)
}
}
}
.into()
}
#[proc_macro_derive(EnumMethods)]
pub fn derive_value_methods(input: TokenStream) -> TokenStream {
let ast = parse_macro_input!(input as DeriveInput);
let (enum_name, repr_type, value_expressions, variant_len, repr_type_ident) = process_enum_tags(&ast);
let repr_type_ident_func = syn::Ident::new(
&format!("value_{repr_type}"),
proc_macro2::Span::call_site(),
);
let gen = quote! {
impl #enum_name {
pub const MAX: #repr_type_ident = Self::max_value();
pub const VARIANTS: usize = #variant_len;
pub const fn #repr_type_ident_func(&self) -> #repr_type_ident { unsafe { core::mem::transmute(*self) } }
pub const fn value_word(&self) -> usize { self.#repr_type_ident_func() as usize }
pub const fn value_qword(&self) -> u64 { self.#repr_type_ident_func() as u64 }
pub const fn max_value() -> #repr_type_ident {
let values = #value_expressions;
let mut i = 1;
let mut max = values[0];
while i < values.len() {
if values[i] > max {
max = values[i];
}
i = i + 1;
}
max
}
}
};
gen.into()
}
fn process_enum_tags(
ast: &DeriveInput,
) -> (
&proc_macro2::Ident,
String,
TokenStream2,
usize,
proc_macro2::Ident,
) {
let enum_name = &ast.ident;
let mut repr_type = None;
// Get repr attribute
@ -136,39 +210,17 @@ pub fn derive_value_methods(input: TokenStream) -> TokenStream {
} else {
panic!("This derive macro only works on enums");
}
assert!(!dscr_expressions.is_empty(), "must be a non-empty enumeration");
let value_expressions = quote! {
[#(#dscr_expressions),*]
};
let variant_len = dscr_expressions.len();
let repr_type_ident = syn::Ident::new(&repr_type, proc_macro2::Span::call_site());
let repr_type_ident_func = syn::Ident::new(
&format!("value_{repr_type}"),
proc_macro2::Span::call_site(),
);
let gen = quote! {
impl #enum_name {
pub const MAX: #repr_type_ident = Self::max_value();
pub const VARIANTS: usize = #variant_len;
pub const fn #repr_type_ident_func(&self) -> #repr_type_ident { unsafe { core::mem::transmute(*self) } }
pub const fn value_word(&self) -> usize { self.#repr_type_ident_func() as usize }
pub const fn value_qword(&self) -> u64 { self.#repr_type_ident_func() as u64 }
pub const fn max_value() -> #repr_type_ident {
let values = #value_expressions;
let mut i = 1;
let mut max = values[0];
while i < values.len() {
if values[i] > max {
max = values[i];
}
i = i + 1;
}
max
}
}
};
gen.into()
(
enum_name,
repr_type,
value_expressions,
variant_len,
repr_type_ident,
)
}

Loading…
Cancel
Save