Add new batch impl

next
Sayan Nandan 7 months ago
parent 196fd746e6
commit a188ccb60d

@@ -46,6 +46,12 @@ pub struct SCrc64 {
digest: Digest<'static, u64>,
}
impl Default for SCrc64 {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for SCrc64 {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SCrc64")

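The `Default` impl above just delegates to `SCrc64::new()`, and the rest of this commit leans on the checksum's create/update/finish cycle (`new()`, `update(&[u8])`, `finish() -> u64`). Below is a minimal standalone sketch of that cycle, using a toy byte fold in place of the real CRC-64 digest; it is illustrative only, not SCrc64's actual implementation.

// Toy stand-in with the same shape as SCrc64 (assumption: not the real CRC-64 math).
#[derive(Default)]
struct ToyChecksum(u64);

impl ToyChecksum {
    fn new() -> Self {
        Self::default()
    }
    fn update(&mut self, buf: &[u8]) {
        for &b in buf {
            self.0 = self.0.rotate_left(7) ^ b as u64;
        }
    }
    fn finish(self) -> u64 {
        self.0
    }
}

fn main() {
    let mut cs = ToyChecksum::new();
    cs.update(b"hello");
    cs.update(b" world");
    // the same bytes always produce the same digest, however they are chunked
    let mut all = ToyChecksum::new();
    all.update(b"hello world");
    assert_eq!(cs.finish(), all.finish());
}
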
@@ -26,20 +26,23 @@
#![allow(dead_code)]
use crate::{
engine::{
mem::fixed_vec::FixedVec,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt,
FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
use {
crate::{
engine::{
mem::fixed_vec::FixedVec,
storage::common::{
checksum::SCrc64,
interface::fs_traits::{
FSInterface, FileInterface, FileInterfaceBufWrite, FileInterfaceExt,
FileInterfaceRead, FileInterfaceWrite, FileInterfaceWriteExt, FileOpen,
},
sdss::sdss_r1::FileSpecV1,
},
sdss::sdss_r1::FileSpecV1,
RuntimeResult,
},
RuntimeResult,
util::os::SysIOError,
},
util::os::SysIOError,
std::mem,
};
/*
@@ -206,6 +209,30 @@ pub struct TrackedReader<F, S: FileSpecV1> {
cs: SCrc64,
}
pub struct TrackedReaderContext<'a, F, S: FileSpecV1> {
tr: &'a mut TrackedReader<F, S>,
p_checksum: SCrc64,
}
impl<'a, F: FileInterfaceRead, S: FileSpecV1> TrackedReaderContext<'a, F, S> {
pub fn read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.tr
.tracked_read(buf)
.map(|_| self.p_checksum.update(buf))
}
pub fn read_block<const N: usize>(&mut self) -> RuntimeResult<[u8; N]> {
let mut block = [0; N];
self.tr.tracked_read(&mut block).map(|_| {
self.p_checksum.update(&block);
block
})
}
pub fn finish(self) -> (u64, &'a mut TrackedReader<F, S>) {
let Self { tr, p_checksum } = self;
(p_checksum.finish(), tr)
}
}
impl<F: FileInterface, S: FileSpecV1> TrackedReader<F, S> {
/// Create a new [`TrackedReader`]. This needs to retrieve file position and length
pub fn new(mut f: SdssFile<F, S>) -> RuntimeResult<TrackedReader<F::BufReader, S>> {
@@ -227,6 +254,12 @@ impl<F: FileInterface, S: FileSpecV1> TrackedReader<F, S> {
}
impl<F: FileInterfaceRead, S: FileSpecV1> TrackedReader<F, S> {
pub fn context(&mut self) -> TrackedReaderContext<F, S> {
TrackedReaderContext {
tr: self,
p_checksum: SCrc64::new(),
}
}
/// Attempt to fill the buffer. This read is tracked.
pub fn tracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.untracked_read(buf).map(|_| self.cs.update(buf))
@@ -319,6 +352,7 @@ pub struct TrackedWriter<
f_md: S::Metadata,
t_cursor: u64,
t_checksum: SCrc64,
t_partial_checksum: SCrc64,
buf: FixedVec<u8, SIZE>,
}
@@ -336,6 +370,7 @@ impl<
f_md,
t_cursor,
t_checksum,
t_partial_checksum: SCrc64::new(),
buf: FixedVec::allocate(),
}
}
@@ -397,6 +432,11 @@ impl<
const CHECKSUM_WRITTEN_IF_BLOCK_ERROR: bool,
> TrackedWriter<F, S, SIZE, PANIC_IF_UNFLUSHED, CHECKSUM_WRITTEN_IF_BLOCK_ERROR>
{
/// Same as [`Self::tracked_write_through_buffer`], but the partial state is updated
pub fn dtrack_write_through_buffer(&mut self, buf: &[u8]) -> RuntimeResult<()> {
self.tracked_write_through_buffer(buf)
.map(|_| self.t_partial_checksum.update(buf))
}
/// Don't write to the buffer, instead directly write to the file
///
/// NB:
@@ -418,6 +458,15 @@ impl<
}
}
}
/// Same as [`Self::tracked_write`], but the partial state is updated
pub fn dtrack_write(&mut self, buf: &[u8]) -> RuntimeResult<()> {
self.tracked_write(buf)
.map(|_| self.t_partial_checksum.update(buf))
}
/// Reset the partial state
pub fn reset_partial(&mut self) -> u64 {
mem::take(&mut self.t_partial_checksum).finish()
}
/// Do a tracked write
///
/// On error, if block error checksumming is set then whatever part of the block was written

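Summing up the `rw.rs` changes: `TrackedWriter` now carries a second `t_partial_checksum` that `dtrack_write`/`dtrack_write_through_buffer` update alongside the whole-file checksum, and `reset_partial()` takes that digest so it can be appended right after the payload it covers. On the read side, `TrackedReaderContext` recomputes a checksum over exactly the bytes read through it and hands it back via `finish()`. The following is a self-contained sketch of the resulting framing, with a `Vec<u8>` standing in for the file and a toy digest in place of SCrc64 (assumptions for illustration, not the real types).

// Toy digest standing in for SCrc64 (assumption for illustration only).
fn toy_sum(bytes: &[u8]) -> u64 {
    bytes.iter().fold(0u64, |acc, &b| acc.rotate_left(7) ^ b as u64)
}

// Writer side: dtrack_write-style payload writes, then reset_partial + append.
fn write_framed(out: &mut Vec<u8>, payload: &[u8]) {
    out.extend_from_slice(payload); // payload bytes feed the partial checksum
    out.extend_from_slice(&toy_sum(payload).to_le_bytes()); // trailing digest
}

// Reader side: TrackedReaderContext-style recompute-and-compare.
fn read_framed(frame: &[u8]) -> Result<&[u8], &'static str> {
    if frame.len() < 8 {
        return Err("frame too short");
    }
    let (payload, stored) = frame.split_at(frame.len() - 8);
    let mut tail = [0u8; 8];
    tail.copy_from_slice(stored);
    if toy_sum(payload) == u64::from_le_bytes(tail) {
        Ok(payload)
    } else {
        Err("checksum mismatch") // maps to RawJournalCorrupted in the real code
    }
}

fn main() {
    let mut file = Vec::new();
    write_framed(&mut file, b"batch payload");
    assert_eq!(read_framed(&file).unwrap(), &b"batch payload"[..]);
}
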
@@ -35,7 +35,7 @@ use {
checksum::SCrc64,
interface::fs_traits::{FSInterface, FileInterface},
sdss::sdss_r1::{
rw::{TrackedReader, TrackedWriter},
rw::{TrackedReader, TrackedReaderContext, TrackedWriter},
FileSpecV1,
},
},
@@ -137,3 +137,74 @@ impl<EL: EventLogAdapter> RawJournalAdapter for EventLog<EL> {
)
}
}
pub type BatchJournalDriver<BA, Fs> = RawJournalWriter<BatchJournal<BA>, Fs>;
pub struct BatchJournal<BA: BatchAdapter>(PhantomData<BA>);
impl<BA: BatchAdapter> BatchJournal<BA> {
pub fn close<Fs: FSInterface>(me: &mut BatchJournalDriver<BA, Fs>) -> RuntimeResult<()> {
RawJournalWriter::close_driver(me)
}
}
pub trait BatchAdapter {
type Spec: FileSpecV1;
type GlobalState;
type BatchMeta: TaggedEnum<Dscr = u8>;
fn decode_batch<Fs: FSInterface>(
gs: &Self::GlobalState,
f: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
meta: Self::BatchMeta,
) -> RuntimeResult<()>;
}
impl<BA: BatchAdapter> RawJournalAdapter for BatchJournal<BA> {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Direct;
type Spec = <BA as BatchAdapter>::Spec;
type GlobalState = <BA as BatchAdapter>::GlobalState;
type Context<'a> = () where BA: 'a;
type EventMeta = <BA as BatchAdapter>::BatchMeta;
fn initialize(_: &raw::JournalInitializer) -> Self {
Self(PhantomData)
}
fn enter_context<'a, Fs: FSInterface>(
_: &'a mut RawJournalWriter<Self, Fs>,
) -> Self::Context<'a> {
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
<<BA as BatchAdapter>::BatchMeta as TaggedEnum>::try_from_raw(meta as u8)
}
fn commit_direct<'a, Fs: FSInterface, E>(
&mut self,
w: &mut TrackedWriter<Fs::File, Self::Spec>,
ev: E,
) -> RuntimeResult<()>
where
E: RawJournalAdapterEvent<Self>,
{
ev.write_direct::<Fs>(w)?;
let checksum = w.reset_partial();
w.tracked_write(&checksum.to_le_bytes())
}
fn decode_apply<'a, Fs: FSInterface>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
) -> RuntimeResult<()> {
let mut reader_ctx = file.context();
<BA as BatchAdapter>::decode_batch::<Fs>(gs, &mut reader_ctx, meta)?;
let (real_checksum, file) = reader_ctx.finish();
let stored_checksum = u64::from_le_bytes(file.read_block()?);
if real_checksum == stored_checksum {
Ok(())
} else {
Err(StorageError::RawJournalCorrupted.into())
}
}
}

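In `commit_direct`, the event writes its payload through the `dtrack_*` methods, then the writer's partial digest is taken with `reset_partial()` and appended as a little-endian `u64`; `decode_apply` mirrors this by decoding through a `TrackedReaderContext` and comparing the recomputed digest with the stored trailing `u64`, failing with `RawJournalCorrupted` on mismatch. Going by the `FlushBatch`/`decode_batch` pair in the test below, one batch event's body looks roughly like `[count u64 LE] [len u64 LE, key bytes]* [checksum u64 LE]` (the raw journal adds its own event headers around this). Here is a hedged standalone sketch of that layout, again with a toy digest instead of SCrc64.

// Toy digest (assumption; the real code uses SCrc64).
fn toy_sum(bytes: &[u8]) -> u64 {
    bytes.iter().fold(0u64, |acc, &b| acc.rotate_left(7) ^ b as u64)
}

// FlushBatch::write_direct-style encoding: count, then (len, bytes) pairs,
// then the checksum of everything written so far (commit_direct's trailer).
fn encode_batch(keys: &[&str]) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&(keys.len() as u64).to_le_bytes());
    for key in keys {
        buf.extend_from_slice(&(key.len() as u64).to_le_bytes());
        buf.extend_from_slice(key.as_bytes());
    }
    let checksum = toy_sum(&buf);
    buf.extend_from_slice(&checksum.to_le_bytes());
    buf
}

// Helper: pop a little-endian u64 off the front of the slice.
fn next_u64(rest: &mut &[u8]) -> Option<u64> {
    if rest.len() < 8 {
        return None;
    }
    let (head, tail) = rest.split_at(8);
    let mut b = [0u8; 8];
    b.copy_from_slice(head);
    *rest = tail;
    Some(u64::from_le_bytes(b))
}

// decode_apply/decode_batch-style decoding: verify the trailer, then walk the pairs.
fn decode_batch(data: &[u8]) -> Option<Vec<String>> {
    if data.len() < 8 {
        return None;
    }
    let (payload, stored) = data.split_at(data.len() - 8);
    let mut tail = [0u8; 8];
    tail.copy_from_slice(stored);
    if toy_sum(payload) != u64::from_le_bytes(tail) {
        return None; // RawJournalCorrupted in the real adapter
    }
    let mut rest = payload;
    let count = next_u64(&mut rest)?;
    let mut keys = Vec::new();
    for _ in 0..count {
        let len = next_u64(&mut rest)? as usize;
        if rest.len() < len {
            return None;
        }
        let (key, remaining) = rest.split_at(len);
        keys.push(String::from_utf8(key.to_vec()).ok()?);
        rest = remaining;
    }
    Some(keys)
}

fn main() {
    let frame = encode_batch(&["a", "b"]);
    assert_eq!(decode_batch(&frame).unwrap(), ["a", "b"]);
}
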
@@ -29,16 +29,26 @@
*/
use {
super::{raw::RawJournalAdapterEvent, DispatchFn, EventLog, EventLogAdapter, EventLogDriver},
super::{
raw::{RawJournalAdapter, RawJournalAdapterEvent},
BatchAdapter, BatchJournal, BatchJournalDriver, DispatchFn, EventLog, EventLogAdapter,
EventLogDriver,
},
crate::{
engine::{
error::StorageError,
mem::unsafe_apis,
storage::{
common::interface::fs_test::VirtualFS,
common::{
interface::{
fs_test::VirtualFS,
fs_traits::{FSInterface, FileInterface},
},
sdss::sdss_r1::rw::{TrackedReaderContext, TrackedWriter},
},
v2::raw::{
journal::raw::{create_journal, open_journal},
spec::SystemDatabaseV1,
spec::{ModelDataBatchAofV1, SystemDatabaseV1},
},
},
RuntimeResult,
@@ -214,3 +224,149 @@ fn test_this_data() {
EventLog::close(&mut log).unwrap();
}
}
/*
batch test
*/
struct BatchDB {
data: RefCell<BatchDBInner>,
}
struct BatchDBInner {
data: Vec<String>,
changed: usize,
last_idx: usize,
}
impl BatchDB {
const THRESHOLD: usize = 1;
fn new() -> Self {
Self {
data: RefCell::new(BatchDBInner {
data: vec![],
changed: 0,
last_idx: 0,
}),
}
}
fn _mut(&self) -> RefMut<BatchDBInner> {
self.data.borrow_mut()
}
fn _ref(&self) -> Ref<BatchDBInner> {
self.data.borrow()
}
fn push(
&self,
log: &mut BatchJournalDriver<BatchDBAdapter, VirtualFS>,
key: &str,
) -> RuntimeResult<()> {
let mut me = self._mut();
me.data.push(key.into());
if me.changed == Self::THRESHOLD {
me.changed += 1;
log.commit_event(FlushBatch::new(&me, me.last_idx, me.changed))?;
me.changed = 0;
me.last_idx = me.data.len();
Ok(())
} else {
me.changed += 1;
Ok(())
}
}
}
struct BatchDBAdapter;
#[derive(Debug, Clone, Copy, TaggedEnum, PartialEq)]
#[repr(u8)]
enum BatchEvent {
NewBatch = 0,
}
impl BatchAdapter for BatchDBAdapter {
type Spec = ModelDataBatchAofV1;
type GlobalState = BatchDB;
type BatchMeta = BatchEvent;
fn decode_batch<Fs: FSInterface>(
gs: &Self::GlobalState,
f: &mut TrackedReaderContext<
<<Fs as FSInterface>::File as FileInterface>::BufReader,
Self::Spec,
>,
meta: Self::BatchMeta,
) -> RuntimeResult<()> {
let mut gs = gs._mut();
assert_eq!(meta, BatchEvent::NewBatch);
let mut batch_size = u64::from_le_bytes(f.read_block()?);
while batch_size != 0 {
let keylen = u64::from_le_bytes(f.read_block()?);
let mut key = vec![0; keylen as usize];
f.read(&mut key)?;
gs.data.push(String::from_utf8(key).unwrap());
gs.last_idx += 1;
batch_size -= 1;
}
Ok(())
}
}
struct FlushBatch<'a> {
data: &'a BatchDBInner,
start: usize,
cnt: usize,
}
impl<'a> FlushBatch<'a> {
fn new(data: &'a BatchDBInner, start: usize, cnt: usize) -> Self {
Self { data, start, cnt }
}
}
impl<'a> RawJournalAdapterEvent<BatchJournal<BatchDBAdapter>> for FlushBatch<'a> {
fn md(&self) -> u64 {
BatchEvent::NewBatch.dscr_u64()
}
fn write_direct<Fs: FSInterface>(
self,
w: &mut TrackedWriter<Fs::File, <BatchJournal<BatchDBAdapter> as RawJournalAdapter>::Spec>,
) -> RuntimeResult<()> {
// length
w.dtrack_write(&(self.cnt as u64).to_le_bytes())?;
// now write all the new keys
for key in &self.data.data[self.start..self.start + self.cnt] {
w.dtrack_write(&(key.len() as u64).to_le_bytes())?;
w.dtrack_write(key.as_bytes())?;
}
Ok(())
}
}
#[test]
fn batch_simple() {
{
let mut log = create_journal::<_, VirtualFS>("batch_jrnl").unwrap();
let db = BatchDB::new();
db.push(&mut log, "a").unwrap();
db.push(&mut log, "b").unwrap();
BatchJournal::close(&mut log).unwrap();
}
{
let db = BatchDB::new();
let mut log = open_journal::<_, VirtualFS>("batch_jrnl", &db).unwrap();
db.push(&mut log, "c").unwrap();
db.push(&mut log, "d").unwrap();
BatchJournal::close(&mut log).unwrap();
}
{
let db = BatchDB::new();
let mut log = open_journal::<_, VirtualFS>("batch_jrnl", &db).unwrap();
db.push(&mut log, "e").unwrap();
db.push(&mut log, "f").unwrap();
BatchJournal::close(&mut log).unwrap();
}
{
let db = BatchDB::new();
let mut log =
open_journal::<BatchJournal<BatchDBAdapter>, VirtualFS>("batch_jrnl", &db).unwrap();
assert_eq!(db._ref().data, ["a", "b", "c", "d", "e", "f"]);
BatchJournal::close(&mut log).unwrap();
}
}

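With `THRESHOLD` set to 1, the counter in `BatchDB::push` is bumped to 2 and reset only when a batch is committed, so every second push flushes one batch covering the two most recent keys; that is why each of the three write sessions above lands exactly one batch, and reopening the journal replays ["a", "b"], ["c", "d"], ["e", "f"] in order before the final assertion. A tiny standalone trace of that flush policy (illustrative only, not the real driver):

fn main() {
    let (mut changed, mut last_idx, mut data): (usize, usize, Vec<&str>) = (0, 0, vec![]);
    for key in ["a", "b", "c", "d", "e", "f"] {
        data.push(key);
        if changed == 1 {
            // this is where BatchDB::push calls log.commit_event(FlushBatch::new(..))
            println!("flush {:?}", &data[last_idx..]);
            changed = 0;
            last_idx = data.len();
        } else {
            changed += 1;
        }
    }
    // prints: flush ["a", "b"], flush ["c", "d"], flush ["e", "f"]
}
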
@@ -141,7 +141,8 @@ pub fn derive_tagged_enum(input: TokenStream) -> TokenStream {
#[proc_macro_derive(EnumMethods)]
pub fn derive_value_methods(input: TokenStream) -> TokenStream {
let ast = parse_macro_input!(input as DeriveInput);
let (enum_name, repr_type, value_expressions, variant_len, repr_type_ident) = process_enum_tags(&ast);
let (enum_name, repr_type, value_expressions, variant_len, repr_type_ident) =
process_enum_tags(&ast);
let repr_type_ident_func = syn::Ident::new(
&format!("value_{repr_type}"),
proc_macro2::Span::call_site(),
@@ -210,7 +211,10 @@ fn process_enum_tags(
} else {
panic!("This derive macro only works on enums");
}
assert!(!dscr_expressions.is_empty(), "must be a non-empty enumeration");
assert!(
!dscr_expressions.is_empty(),
"must be a non-empty enumeration"
);
let value_expressions = quote! {
[#(#dscr_expressions),*]
};
