From 196fd746e6f56f65af16f8555d1ac05894308366 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 15 Feb 2024 18:52:26 +0530 Subject: [PATCH] Enable generic events to be passed to the journal --- .../src/engine/storage/v2/raw/journal/mod.rs | 145 ++++------ .../engine/storage/v2/raw/journal/raw/mod.rs | 79 +++--- .../storage/v2/raw/journal/raw/tests.rs | 110 ++++---- .../engine/storage/v2/raw/journal/tests.rs | 262 ++++++++++-------- server/src/util/compiler.rs | 23 ++ sky-macros/src/lib.rs | 112 ++++++-- 6 files changed, 421 insertions(+), 310 deletions(-) diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index a5b87407..ce4d357b 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -26,100 +26,81 @@ #![allow(dead_code)] -mod raw; -#[cfg(test)] -mod tests; - use { - self::raw::{CommitPreference, JournalInitializer, RawJournalAdapter}, - crate::engine::{ - error::StorageError, - fractal, - storage::common::{ - checksum::SCrc64, - interface::fs_traits::{FSInterface, FileInterface}, - sdss::sdss_r1::{ - rw::{TrackedReader, TrackedWriter}, - FileSpecV1, + self::raw::{CommitPreference, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter}, + crate::{ + engine::{ + error::StorageError, + storage::common::{ + checksum::SCrc64, + interface::fs_traits::{FSInterface, FileInterface}, + sdss::sdss_r1::{ + rw::{TrackedReader, TrackedWriter}, + FileSpecV1, + }, }, + RuntimeResult, }, - RuntimeResult, + util::compiler::TaggedEnum, }, - std::marker::PhantomData, + std::{marker::PhantomData, mem, ops::Index}, }; -/* - Event log adapter -*/ +mod raw; +#[cfg(test)] +mod tests; -/// A journal based on an [`EventLog`] -pub type EventLogJournal = raw::RawJournalWriter, Fs>; +pub type EventLogDriver = RawJournalWriter, Fs>; +pub struct EventLog(PhantomData); +impl EventLog { + pub fn close(me: &mut EventLogDriver) -> RuntimeResult<()> { + RawJournalWriter::close_driver(me) + } +} -/// An [`EventLog`] is a standard, append-only, sequential journal with per-event and per-cycle integrity protection -pub struct EventLog(PhantomData); +type DispatchFn = fn(&G, Vec) -> RuntimeResult<()>; -/// An adapter that provides the specification for an event log pub trait EventLogAdapter { - /// the SDSS file spec - type SdssSpec: FileSpecV1; - /// global state + type Spec: FileSpecV1; type GlobalState; - /// the event type - type Event<'a>; - /// the decoded event type - type DecodedEvent; - /// event metadata - type EventMeta: Copy; - /// the error type - type Error: Into; - /// the maximum value for the event discriminant - const EV_MAX: u8; - /// get metadata from the raw value - unsafe fn meta_from_raw(m: u64) -> Self::EventMeta; - /// get metadata from the event - fn event_md<'a>(event: &Self::Event<'a>) -> u64; - /// encode an event - fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]>; - /// decode an event - fn decode(block: Vec, kind: Self::EventMeta) -> Result; - /// apply the event - fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error>; + type EventMeta: TaggedEnum; + type DecodeDispatch: Index>; + const DECODE_DISPATCH: Self::DecodeDispatch; + const ENSURE: () = assert!( + (mem::size_of::() / mem::size_of::>()) + == Self::EventMeta::VARIANT_COUNT as usize + ); } -impl RawJournalAdapter for EventLog { - const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct; - type Spec = ::SdssSpec; - type GlobalState = ::GlobalState; +impl RawJournalAdapter for EventLog { + const COMMIT_PREFERENCE: CommitPreference = { + let _ = EL::ENSURE; + CommitPreference::Direct + }; + type Spec = ::Spec; + type GlobalState = ::GlobalState; type Context<'a> = () where Self: 'a; - type Event<'a> = ::Event<'a>; - type DecodedEvent = ::DecodedEvent; - type EventMeta = ::EventMeta; - fn initialize(_: &JournalInitializer) -> Self { + type EventMeta = ::EventMeta; + fn initialize(_: &raw::JournalInitializer) -> Self { Self(PhantomData) } fn enter_context<'a, Fs: FSInterface>( - _: &'a mut raw::RawJournalWriter, + _: &'a mut RawJournalWriter, ) -> Self::Context<'a> { - () } fn parse_event_meta(meta: u64) -> Option { - if meta > ::EV_MAX as u64 { - return None; - } - unsafe { - // UNSAFE(@ohsayan): checked max - Some(::meta_from_raw(meta)) - } - } - fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64 { - ::event_md(event) + <::EventMeta as TaggedEnum>::try_from_raw(meta as u8) } - fn commit_direct<'a, Fs: FSInterface>( + fn commit_direct<'a, Fs: FSInterface, E>( &mut self, w: &mut TrackedWriter, - event: Self::Event<'a>, - ) -> RuntimeResult<()> { - let pl = ::encode(event); + ev: E, + ) -> RuntimeResult<()> + where + E: RawJournalAdapterEvent, + { + let mut pl = vec![]; + ev.write_buffered(&mut pl); let plen = (pl.len() as u64).to_le_bytes(); let mut checksum = SCrc64::new(); checksum.update(&plen); @@ -132,16 +113,14 @@ impl RawJournalAdapter for EventLog { w.tracked_write(&plen)?; w.tracked_write(&pl) } - fn parse_event<'a, Fs: FSInterface>( + fn decode_apply<'a, Fs: FSInterface>( + gs: &Self::GlobalState, + meta: Self::EventMeta, file: &mut TrackedReader< <::File as FileInterface>::BufReader, Self::Spec, >, - m: Self::EventMeta, - ) -> RuntimeResult { - /* - verify checksum - */ + ) -> RuntimeResult<()> { let expected_checksum = u64::from_le_bytes(file.read_block()?); let plen = u64::from_le_bytes(file.read_block()?); let mut pl = vec![0; plen as usize]; @@ -152,13 +131,9 @@ impl RawJournalAdapter for EventLog { if this_checksum.finish() != expected_checksum { return Err(StorageError::RawJournalCorrupted.into()); } - ::decode(pl, m).map_err(Into::into) - } - fn apply_event<'a>( - gs: &Self::GlobalState, - _: Self::EventMeta, - event: Self::DecodedEvent, - ) -> RuntimeResult<()> { - ::apply_event(gs, event).map_err(Into::into) + ::DECODE_DISPATCH + [<::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize]( + gs, pl + ) } } diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index 98d0f940..ccece463 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -144,7 +144,6 @@ pub enum JournalReaderTraceEvent { AttemptingEvent(u64), DetectedServerEvent, ServerEventMetadataParsed, - ServerEventParsed, ServerEventAppliedSuccess, // drv events DriverEventExpectingClose, @@ -208,6 +207,19 @@ macro_rules! jtrace_reader { impls */ +pub trait RawJournalAdapterEvent: Sized { + fn md(&self) -> u64; + fn write_direct( + self, + _: &mut TrackedWriter::Spec>, + ) -> RuntimeResult<()> { + unimplemented!() + } + fn write_buffered(self, _: &mut Vec) { + unimplemented!() + } +} + /// An adapter defining the low-level structure of a log file pub trait RawJournalAdapter: Sized { /// event size buffer @@ -224,12 +236,8 @@ pub trait RawJournalAdapter: Sized { type Context<'a> where Self: 'a; - /// a journal event - type Event<'a>; - /// the decoded event - type DecodedEvent; /// a type representing the event kind - type EventMeta: Copy; + type EventMeta; /// initialize this adapter fn initialize(j_: &JournalInitializer) -> Self; /// get a write context @@ -238,33 +246,32 @@ pub trait RawJournalAdapter: Sized { ) -> Self::Context<'a>; /// parse event metadata fn parse_event_meta(meta: u64) -> Option; - /// get event metadata as an [`u64`] - fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64; /// commit event (direct preference) - fn commit_direct<'a, Fs: FSInterface>( + fn commit_direct<'a, Fs: FSInterface, E>( &mut self, _: &mut TrackedWriter, - _: Self::Event<'a>, - ) -> RuntimeResult<()> { + _: E, + ) -> RuntimeResult<()> + where + E: RawJournalAdapterEvent, + { unimplemented!() } /// commit event (buffered) - fn commit_buffered<'a>(&mut self, _: &mut Vec, _: Self::Event<'a>) { + fn commit_buffered<'a, E>(&mut self, _: &mut Vec, _: E) + where + E: RawJournalAdapterEvent, + { unimplemented!() } - /// parse the event - fn parse_event<'a, Fs: FSInterface>( + /// decode and apply the event + fn decode_apply<'a, Fs: FSInterface>( + gs: &Self::GlobalState, + meta: Self::EventMeta, file: &mut TrackedReader< <::File as FileInterface>::BufReader, Self::Spec, >, - meta: Self::EventMeta, - ) -> RuntimeResult; - /// apply the event - fn apply_event<'a>( - gs: &Self::GlobalState, - meta: Self::EventMeta, - event: Self::DecodedEvent, ) -> RuntimeResult<()>; } @@ -275,15 +282,14 @@ pub enum CommitPreference { } #[derive(Debug, PartialEq)] -/** -A driver event ---- -Structured as: -+------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+ -| 16B: Event ID | 8B: Meta | 8B: Checksum | 8B: Payload size | 8B: prev checksum | 8B: prev offset | 8B: prev txn id | -+------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+ +/* + A driver event + --- + Structured as: + +------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+ + | 16B: Event ID | 8B: Meta | 8B: Checksum | 8B: Payload size | 8B: prev checksum | 8B: prev offset | 8B: prev txn id | + +------------------+----------+--------------+------------------+-------------------+-----------------+-----------------+ */ - struct DriverEvent { txn_id: u128, event: DriverEventKind, @@ -493,10 +499,13 @@ impl RawJournalWriter { /// Commit a new event to the journal /// /// This will auto-flush the buffer and sync metadata as soon as the [`RawJournalAdapter::commit`] method returns, - /// unless otherwise configured - pub fn commit_event<'a>(&mut self, event: J::Event<'a>) -> RuntimeResult<()> { + /// unless otherwise configured. + pub fn commit_event<'a, E: RawJournalAdapterEvent>( + &mut self, + event: E, + ) -> RuntimeResult<()> { self.txn_context(|me, txn_id| { - let ev_md = me.j.get_event_md(&event); + let ev_md = event.md(); jtrace_writer!(CommitAttemptForEvent(txn_id as u64)); // MSB must be unused; set msb debug_assert!(ev_md & SERVER_EV_MASK != 1, "MSB must be unset"); @@ -520,7 +529,7 @@ impl RawJournalWriter { log_file.tracked_write(&ev_md.to_le_bytes())?; jtrace_writer!(CommitServerEventWroteMetadata); // now hand over control to adapter impl - J::commit_direct::(j, log_file, event)?; + J::commit_direct::(j, log_file, event)?; } } jtrace_writer!(CommitServerEventAdapterCompleted); @@ -674,10 +683,8 @@ impl RawJournalReader { jtrace_reader!(ServerEventMetadataParsed); // now parse the actual event let Self { tr: reader, .. } = self; - let event = J::parse_event::(reader, meta)?; - jtrace_reader!(ServerEventParsed); // we do not consider a parsed event a success signal; so we must actually apply it - match J::apply_event(gs, meta, event) { + match J::decode_apply::(gs, meta, reader) { Ok(()) => { jtrace_reader!(ServerEventAppliedSuccess); Self::__refresh_known_txn(self); diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests.rs index 5de4a230..554907c1 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests.rs @@ -27,7 +27,7 @@ use { super::{ create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind, - JournalInitializer, RawJournalAdapter, RawJournalWriter, + JournalInitializer, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter, }, crate::engine::{ error::StorageError, @@ -63,7 +63,7 @@ fn encode_decode_meta() { */ #[derive(Debug, Clone, PartialEq)] -struct SimpleDB { +pub struct SimpleDB { data: RefCell>, } impl SimpleDB { @@ -79,13 +79,13 @@ impl SimpleDB { &mut self, log: &mut RawJournalWriter, ) -> RuntimeResult<()> { - log.commit_event(DbEventEncoded::Clear)?; + log.commit_event(DbEventClear)?; self.data.get_mut().clear(); Ok(()) } fn pop(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { self.data.get_mut().pop().unwrap(); - log.commit_event(DbEventEncoded::Pop)?; + log.commit_event(DbEventPop)?; Ok(()) } fn push( @@ -94,24 +94,58 @@ impl SimpleDB { new: impl ToString, ) -> RuntimeResult<()> { let new = new.to_string(); - log.commit_event(DbEventEncoded::NewKey(&new))?; + log.commit_event(DbEventPush(&new))?; self.data.get_mut().push(new); Ok(()) } } -struct SimpleDBJournal; -enum DbEventEncoded<'a> { - NewKey(&'a str), - Pop, - Clear, + +/* + event impls +*/ + +pub struct SimpleDBJournal; +struct DbEventPush<'a>(&'a str); +struct DbEventPop; +struct DbEventClear; +trait SimpleDBEvent: Sized { + const OPC: u8; + fn write_buffered(self, _: &mut Vec); +} +macro_rules! impl_db_event { + ($($ty:ty as $code:expr $(=> $expr:expr)?),*) => { + $(impl SimpleDBEvent for $ty { + const OPC: u8 = $code; + fn write_buffered(self, buf: &mut Vec) { let _ = buf; fn do_it(s: $ty, b: &mut Vec, f: impl Fn($ty, &mut Vec)) { f(s, b) } $(do_it(self, buf, $expr))? } + })* + } +} + +impl_db_event!( + DbEventPush<'_> as 0 => |me, buf| { + buf.extend(&(me.0.len() as u64).to_le_bytes()); + buf.extend(me.0.as_bytes()); + }, + DbEventPop as 1, + DbEventClear as 2 +); + +impl RawJournalAdapterEvent for T { + fn md(&self) -> u64 { + T::OPC as _ + } + fn write_buffered(self, buf: &mut Vec) { + T::write_buffered(self, buf) + } } -enum DbEventRestored { + +pub enum DbEventRestored { NewKey(String), Pop, Clear, } #[derive(Debug, PartialEq, Clone, Copy)] -enum EventMeta { +pub enum EventMeta { NewKey, Pop, Clear, @@ -120,8 +154,6 @@ impl RawJournalAdapter for SimpleDBJournal { const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered; type Spec = SystemDatabaseV1; type GlobalState = SimpleDB; - type Event<'a> = DbEventEncoded<'a>; - type DecodedEvent = DbEventRestored; type EventMeta = EventMeta; type Context<'a> = () where Self: 'a; fn initialize(_: &JournalInitializer) -> Self { @@ -140,52 +172,34 @@ impl RawJournalAdapter for SimpleDBJournal { _ => return None, }) } - fn get_event_md<'a>(&self, event: &Self::Event<'a>) -> u64 { - match event { - DbEventEncoded::NewKey(_) => 0, - DbEventEncoded::Pop => 1, - DbEventEncoded::Clear => 2, - } - } - fn commit_buffered<'a>(&mut self, buf: &mut Vec, event: Self::Event<'a>) { - if let DbEventEncoded::NewKey(key) = event { - buf.extend((key.len() as u64).to_le_bytes()); - buf.extend(key.as_bytes()); - } + fn commit_buffered<'a, E: RawJournalAdapterEvent>( + &mut self, + buf: &mut Vec, + event: E, + ) { + event.write_buffered(buf) } - fn parse_event<'a, Fs: FSInterface>( + fn decode_apply<'a, Fs: FSInterface>( + gs: &Self::GlobalState, + meta: Self::EventMeta, file: &mut TrackedReader< <::File as FileInterface>::BufReader, Self::Spec, >, - meta: Self::EventMeta, - ) -> RuntimeResult { - Ok(match meta { + ) -> RuntimeResult<()> { + match meta { EventMeta::NewKey => { let key_size = u64::from_le_bytes(file.read_block()?); let mut keybuf = vec![0u8; key_size as usize]; file.tracked_read(&mut keybuf)?; match String::from_utf8(keybuf) { - Ok(k) => DbEventRestored::NewKey(k), + Ok(k) => gs.data.borrow_mut().push(k), Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()), } } - EventMeta::Clear => DbEventRestored::Clear, - EventMeta::Pop => DbEventRestored::Pop, - }) - } - fn apply_event<'a>( - gs: &Self::GlobalState, - _: Self::EventMeta, - event: Self::DecodedEvent, - ) -> RuntimeResult<()> { - match event { - DbEventRestored::NewKey(k) => gs.data.borrow_mut().push(k), - DbEventRestored::Clear => gs.data.borrow_mut().clear(), - DbEventRestored::Pop => { - if gs.data.borrow_mut().pop().is_none() { - return Err(StorageError::RawJournalCorrupted.into()); - } + EventMeta::Clear => gs.data.borrow_mut().clear(), + EventMeta::Pop => { + let _ = gs.data.borrow_mut().pop().unwrap(); } } Ok(()) @@ -354,7 +368,6 @@ fn journal_with_server_single_event() { JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::ServerEventMetadataParsed, - JournalReaderTraceEvent::ServerEventParsed, JournalReaderTraceEvent::ServerEventAppliedSuccess, // now read close event JournalReaderTraceEvent::AttemptingEvent(1), @@ -402,7 +415,6 @@ fn journal_with_server_single_event() { JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::ServerEventMetadataParsed, - JournalReaderTraceEvent::ServerEventParsed, JournalReaderTraceEvent::ServerEventAppliedSuccess, // now read close event JournalReaderTraceEvent::AttemptingEvent(1), diff --git a/server/src/engine/storage/v2/raw/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs index c2e4893e..019fcfa4 100644 --- a/server/src/engine/storage/v2/raw/journal/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/tests.rs @@ -29,146 +29,188 @@ */ use { - super::{raw::create_journal, EventLog, EventLogAdapter, EventLogJournal}, - crate::engine::{ - error::StorageError, - fractal, - mem::unsafe_apis, - storage::{ - common::interface::fs_test::VirtualFS, - v2::raw::{journal::raw::open_journal, spec::SystemDatabaseV1}, + super::{raw::RawJournalAdapterEvent, DispatchFn, EventLog, EventLogAdapter, EventLogDriver}, + crate::{ + engine::{ + error::StorageError, + mem::unsafe_apis, + storage::{ + common::interface::fs_test::VirtualFS, + v2::raw::{ + journal::raw::{create_journal, open_journal}, + spec::SystemDatabaseV1, + }, + }, + RuntimeResult, }, - RuntimeResult, + util::compiler::TaggedEnum, }, + sky_macros::TaggedEnum, std::cell::{Ref, RefCell, RefMut}, }; +// event definitions + +#[derive(TaggedEnum, Clone, Copy, Debug)] +#[repr(u8)] +pub enum TestEvent { + Push = 0, + Pop = 1, + Clear = 2, +} + +pub trait IsTestEvent { + const EVCODE: TestEvent; + fn encode(self, _: &mut Vec); +} + +macro_rules! impl_test_event { + ($($ty:ty as $code:expr $(=> $expr:expr)?),* $(,)?) => { + $(impl IsTestEvent for $ty { + const EVCODE: TestEvent = $code; + fn encode(self, buf: &mut Vec) { let _ = buf; fn do_it(s: $ty, b: &mut Vec, f: impl Fn($ty, &mut Vec)) { f(s, b) } $(do_it(self, buf, $expr))? } + })* + } +} + +pub struct EventPush<'a>(&'a str); +pub struct EventPop; +pub struct EventClear; + +impl_test_event!( + EventPush<'_> as TestEvent::Push => |me, buf| { + buf.extend(&(me.0.len() as u64).to_le_bytes()); + buf.extend(me.0.as_bytes()) + }, + EventPop as TestEvent::Pop, + EventClear as TestEvent::Clear, +); + +impl RawJournalAdapterEvent> for TE { + fn md(&self) -> u64 { + Self::EVCODE.dscr_u64() + } + fn write_buffered(self, buf: &mut Vec) { + TE::encode(self, buf) + } +} + +// adapter + +pub struct TestDBAdapter; +impl EventLogAdapter for TestDBAdapter { + type Spec = SystemDatabaseV1; + type GlobalState = TestDB; + type EventMeta = TestEvent; + type DecodeDispatch = [DispatchFn; 3]; + const DECODE_DISPATCH: Self::DecodeDispatch = [ + |db, payload| { + if payload.len() < sizeof!(u64) { + Err(StorageError::RawJournalCorrupted.into()) + } else { + let length = + u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) }); + let payload = &payload[sizeof!(u64)..]; + if payload.len() as u64 != length { + Err(StorageError::RawJournalCorrupted.into()) + } else { + let string = String::from_utf8(payload.to_owned()).unwrap(); + db._mut().push(string); + Ok(()) + } + } + }, + |db, _| { + let _ = db._mut().pop(); + Ok(()) + }, + |db, _| { + db._mut().clear(); + Ok(()) + }, + ]; +} + #[derive(Default)] -pub struct SimpleDB { - values: RefCell>, +pub struct TestDB { + data: RefCell>, } -impl SimpleDB { - fn as_mut(&self) -> RefMut> { - self.values.borrow_mut() +impl TestDB { + fn _mut(&self) -> RefMut> { + self.data.borrow_mut() } - fn as_ref(&self) -> Ref> { - self.values.borrow() + fn _ref(&self) -> Ref> { + self.data.borrow() } fn push( &self, - log: &mut EventLogJournal, + log: &mut EventLogDriver, key: &str, ) -> RuntimeResult<()> { - log.commit_event(DbEvent::Push(key))?; - self.as_mut().push(key.into()); + log.commit_event(EventPush(key))?; + self._mut().push(key.into()); Ok(()) } - fn pop(&self, log: &mut EventLogJournal) -> RuntimeResult<()> { - log.commit_event(DbEvent::Pop)?; - self.as_mut().pop().unwrap(); + fn pop(&self, log: &mut EventLogDriver) -> RuntimeResult<()> { + assert!(!self._ref().is_empty()); + log.commit_event(EventPop)?; + self._mut().pop().unwrap(); Ok(()) } - fn clear(&self, log: &mut EventLogJournal) -> RuntimeResult<()> { - log.commit_event(DbEvent::Clear)?; - self.as_mut().clear(); + fn clear(&self, log: &mut EventLogDriver) -> RuntimeResult<()> { + log.commit_event(EventClear)?; + self._mut().clear(); Ok(()) } } -enum DbEvent<'a> { - Push(&'a str), - Pop, - Clear, +fn open_log() -> ( + TestDB, + super::raw::RawJournalWriter, VirtualFS>, +) { + let db = TestDB::default(); + let log = open_journal("jrnl", &db).unwrap(); + (db, log) } -enum DbEventDecoded { - Push(String), - Pop, - Clear, -} - -struct SimpleDBAdapter; - -impl EventLogAdapter for SimpleDBAdapter { - type SdssSpec = SystemDatabaseV1; - type GlobalState = SimpleDB; - type Event<'a> = DbEvent<'a>; - type DecodedEvent = DbEventDecoded; - type EventMeta = u64; - type Error = fractal::error::Error; - const EV_MAX: u8 = 2; - unsafe fn meta_from_raw(m: u64) -> Self::EventMeta { - m - } - fn event_md<'a>(event: &Self::Event<'a>) -> u64 { - match event { - DbEvent::Push(_) => 0, - DbEvent::Pop => 1, - DbEvent::Clear => 2, - } - } - fn encode<'a>(event: Self::Event<'a>) -> Box<[u8]> { - if let DbEvent::Push(k) = event { - let mut buf = Vec::new(); - buf.extend(&(k.len() as u64).to_le_bytes()); - buf.extend(k.as_bytes()); - buf.into_boxed_slice() - } else { - Default::default() +#[test] +fn test_this_data() { + array!( + const DATA1: [&str] = ["acai berry", "billberry", "cranberry"]; + const DATA2: [&str] = ["acai berry", "billberry", "cranberry", "bradbury"]; + const DATA3: [&str] = ["acai berry", "billberry", "cranberry"]; + const DATA4: [&str] = ["acai berry", "billberry", "cranberry", "dewberry"]; + ); + { + let db = TestDB::default(); + let mut log = create_journal("jrnl").unwrap(); + for key in DATA1 { + db.push(&mut log, key).unwrap(); } + EventLog::close(&mut log).unwrap(); } - fn decode(block: Vec, m: u64) -> Result { - Ok(match m { - 0 => { - if block.len() < sizeof!(u64) { - return Err(StorageError::RawJournalCorrupted.into()); - } - let len = - u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&block[..sizeof!(u64)]) }); - let block = &block[sizeof!(u64)..]; - if block.len() as u64 != len { - return Err(StorageError::RawJournalCorrupted.into()); - } - DbEventDecoded::Push(String::from_utf8_lossy(block).into()) - } - 1 => DbEventDecoded::Pop, - 2 => DbEventDecoded::Clear, - _ => panic!(), - }) - } - fn apply_event(g: &Self::GlobalState, ev: Self::DecodedEvent) -> Result<(), Self::Error> { - match ev { - DbEventDecoded::Push(new) => g.as_mut().push(new), - DbEventDecoded::Pop => { - let _ = g.as_mut().pop(); - } - DbEventDecoded::Clear => g.as_mut().clear(), - } - Ok(()) + { + let (db, mut log) = open_log(); + assert_eq!(db._ref().as_slice(), DATA1); + db.push(&mut log, DATA2[3]).unwrap(); + EventLog::close(&mut log).unwrap(); } -} - -#[test] -fn event_log_basic_events() { - array!(const VALUES: [&str] = ["key1", "key2", "key3", "fancykey", "done"]); { - let mut log = create_journal::, _>("jrnl1").unwrap(); - let db = SimpleDB::default(); - for value in VALUES { - db.push(&mut log, value).unwrap(); - } + let (db, mut log) = open_log(); + assert_eq!(db._ref().as_slice(), DATA2); db.pop(&mut log).unwrap(); - EventLogJournal::close_driver(&mut log).unwrap(); + EventLog::close(&mut log).unwrap(); + } + { + let (db, mut log) = open_log(); + assert_eq!(db._ref().as_slice(), DATA3); + db.push(&mut log, DATA4[3]).unwrap(); + EventLog::close(&mut log).unwrap(); } { - let db = SimpleDB::default(); - let mut log = open_journal::, VirtualFS>("jrnl1", &db).unwrap(); - EventLogJournal::close_driver(&mut log).unwrap(); - assert_eq!( - db.as_ref().as_slice().last().unwrap(), - VALUES[VALUES.len() - 2] - ); - assert_eq!(db.as_ref().as_slice(), &VALUES[..VALUES.len() - 1]); + let (db, mut log) = open_log(); + assert_eq!(db._ref().as_slice(), DATA4); + EventLog::close(&mut log).unwrap(); } } diff --git a/server/src/util/compiler.rs b/server/src/util/compiler.rs index 92567d44..65a87276 100644 --- a/server/src/util/compiler.rs +++ b/server/src/util/compiler.rs @@ -70,3 +70,26 @@ pub const fn hot(v: T) -> T { pub fn cold_rerr(e: E) -> Result { Err(e) } + +/* + pure enumerations +*/ + +pub trait TaggedEnum: Sized { + type Dscr: PartialOrd; + const MAX_DSCR: Self::Dscr; + const VARIANT_COUNT: usize; + fn dscr(&self) -> Self::Dscr; + fn dscr_u64(&self) -> u64; + unsafe fn from_raw(d: Self::Dscr) -> Self; + fn try_from_raw(d: Self::Dscr) -> Option { + if d > Self::MAX_DSCR { + None + } else { + Some(unsafe { + // UNSAFE(@ohsayan): just verified the dscr + ::from_raw(d) + }) + } + } +} diff --git a/sky-macros/src/lib.rs b/sky-macros/src/lib.rs index 3d517caf..3e24468c 100644 --- a/sky-macros/src/lib.rs +++ b/sky-macros/src/lib.rs @@ -102,9 +102,83 @@ fn wrapper(item: DeriveInput) -> TokenStream2 { } } +#[proc_macro_derive(TaggedEnum)] +pub fn derive_tagged_enum(input: TokenStream) -> TokenStream { + let ast = parse_macro_input!(input as DeriveInput); + let (enum_name, _, value_expressions, variant_len, repr_type_ident) = process_enum_tags(&ast); + quote! { + impl crate::util::compiler::TaggedEnum for #enum_name { + type Dscr = #repr_type_ident; + const MAX_DSCR: #repr_type_ident = { + let values = #value_expressions; + let mut i = 1; + let mut max = values[0]; + while i < values.len() { + if values[i] > max { + max = values[i]; + } + i = i + 1; + } + max + }; + const VARIANT_COUNT: usize = #variant_len; + fn dscr(&self) -> #repr_type_ident { + unsafe { + core::mem::transmute(*self) + } + } + fn dscr_u64(&self) -> u64 { + self.dscr() as u64 + } + unsafe fn from_raw(d: #repr_type_ident) -> Self { + core::mem::transmute(d) + } + } + } + .into() +} + #[proc_macro_derive(EnumMethods)] pub fn derive_value_methods(input: TokenStream) -> TokenStream { let ast = parse_macro_input!(input as DeriveInput); + let (enum_name, repr_type, value_expressions, variant_len, repr_type_ident) = process_enum_tags(&ast); + let repr_type_ident_func = syn::Ident::new( + &format!("value_{repr_type}"), + proc_macro2::Span::call_site(), + ); + let gen = quote! { + impl #enum_name { + pub const MAX: #repr_type_ident = Self::max_value(); + pub const VARIANTS: usize = #variant_len; + pub const fn #repr_type_ident_func(&self) -> #repr_type_ident { unsafe { core::mem::transmute(*self) } } + pub const fn value_word(&self) -> usize { self.#repr_type_ident_func() as usize } + pub const fn value_qword(&self) -> u64 { self.#repr_type_ident_func() as u64 } + pub const fn max_value() -> #repr_type_ident { + let values = #value_expressions; + let mut i = 1; + let mut max = values[0]; + while i < values.len() { + if values[i] > max { + max = values[i]; + } + i = i + 1; + } + max + } + } + }; + gen.into() +} + +fn process_enum_tags( + ast: &DeriveInput, +) -> ( + &proc_macro2::Ident, + String, + TokenStream2, + usize, + proc_macro2::Ident, +) { let enum_name = &ast.ident; let mut repr_type = None; // Get repr attribute @@ -136,39 +210,17 @@ pub fn derive_value_methods(input: TokenStream) -> TokenStream { } else { panic!("This derive macro only works on enums"); } - + assert!(!dscr_expressions.is_empty(), "must be a non-empty enumeration"); let value_expressions = quote! { [#(#dscr_expressions),*] }; - let variant_len = dscr_expressions.len(); - let repr_type_ident = syn::Ident::new(&repr_type, proc_macro2::Span::call_site()); - let repr_type_ident_func = syn::Ident::new( - &format!("value_{repr_type}"), - proc_macro2::Span::call_site(), - ); - - let gen = quote! { - impl #enum_name { - pub const MAX: #repr_type_ident = Self::max_value(); - pub const VARIANTS: usize = #variant_len; - pub const fn #repr_type_ident_func(&self) -> #repr_type_ident { unsafe { core::mem::transmute(*self) } } - pub const fn value_word(&self) -> usize { self.#repr_type_ident_func() as usize } - pub const fn value_qword(&self) -> u64 { self.#repr_type_ident_func() as u64 } - pub const fn max_value() -> #repr_type_ident { - let values = #value_expressions; - let mut i = 1; - let mut max = values[0]; - while i < values.len() { - if values[i] > max { - max = values[i]; - } - i = i + 1; - } - max - } - } - }; - gen.into() + ( + enum_name, + repr_type, + value_expressions, + variant_len, + repr_type_ident, + ) }