Impl txn log writer

next
Sayan Nandan 1 year ago
parent 55f53456f8
commit 843ff05d85
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -34,7 +34,7 @@ use {
crate::engine::storage::v1::SDSSError,
std::{
fs::File,
io::{Read, Write},
io::{Read, Seek, SeekFrom, Write},
},
};
@ -56,6 +56,7 @@ pub trait RawFileIOInterface: Sized {
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>;
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()>;
fn fsync_all(&mut self) -> SDSSResult<()>;
fn fseek_ahead(&mut self, by: usize) -> SDSSResult<()>;
fn flen(&self) -> SDSSResult<u64>;
}
@ -88,6 +89,10 @@ impl RawFileIOInterface for File {
fn flen(&self) -> SDSSResult<u64> {
Ok(self.metadata()?.len())
}
fn fseek_ahead(&mut self, by: usize) -> SDSSResult<()> {
self.seek(SeekFrom::Start(by as _))?;
Ok(())
}
}
#[derive(Debug)]
@ -143,6 +148,13 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
fn _new(f: F) -> Self {
Self { f }
}
pub fn unfsynced_write(&mut self, data: &[u8]) -> SDSSResult<()> {
self.f.fwrite_all(data)
}
pub fn fsync_all(&mut self) -> SDSSResult<()> {
self.f.fsync_all()?;
Ok(())
}
pub fn fsynced_write(&mut self, data: &[u8]) -> SDSSResult<()> {
self.f.fwrite_all(data)?;
self.f.fsync_all()
@ -153,4 +165,7 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
pub fn file_length(&self) -> SDSSResult<u64> {
self.f.flen()
}
pub fn seek_ahead(&mut self, by: usize) -> SDSSResult<()> {
self.f.fseek_ahead(by)
}
}

@ -51,6 +51,7 @@ fn vfs<T>(fname: &str, mut func: impl FnMut(&mut VirtualFile) -> SDSSResult<T>)
}
struct VirtualFile {
pos: u64,
read: bool,
write: bool,
data: Vec<u8>,
@ -58,7 +59,12 @@ struct VirtualFile {
impl VirtualFile {
fn new(read: bool, write: bool, data: Vec<u8>) -> Self {
Self { read, write, data }
Self {
read,
write,
data,
pos: 0,
}
}
fn rw(data: Vec<u8>) -> Self {
Self::new(true, true, data)
@ -69,6 +75,16 @@ impl VirtualFile {
fn r(data: Vec<u8>) -> Self {
Self::new(true, false, data)
}
fn seek_forward(&mut self, by: usize) {
self.pos += by as u64;
assert!(self.pos <= self.data.len() as u64);
}
fn data(&self) -> &[u8] {
&self.data[self.pos as usize..]
}
fn data_mut(&mut self) -> &mut [u8] {
&mut self.data[self.pos as usize..]
}
}
struct VirtualFileInterface(Box<str>);
@ -86,14 +102,14 @@ impl RawFileIOInterface for VirtualFileInterface {
fn fread_exact(&mut self, buf: &mut [u8]) -> super::SDSSResult<()> {
vfs(&self.0, |f| {
assert!(f.read);
f.data.as_slice().read_exact(buf)?;
f.data().read_exact(buf)?;
Ok(())
})
}
fn fwrite_all(&mut self, bytes: &[u8]) -> super::SDSSResult<()> {
vfs(&self.0, |f| {
assert!(f.write);
f.data.write_all(bytes)?;
f.data_mut().write_all(bytes)?;
Ok(())
})
}
@ -103,6 +119,12 @@ impl RawFileIOInterface for VirtualFileInterface {
fn flen(&self) -> SDSSResult<u64> {
vfs(&self.0, |f| Ok(f.data.len() as _))
}
fn fseek_ahead(&mut self, by: usize) -> SDSSResult<()> {
vfs(&self.0, |f| {
f.seek_forward(by);
Ok(())
})
}
}
mod rw {

@ -45,6 +45,8 @@ use {
std::marker::PhantomData,
};
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
/// The transaction adapter
pub trait TransactionLogAdapter {
/// The transaction event
@ -199,7 +201,6 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
self.log_file.read_to_buffer(&mut payload_data_block)?;
self._record_bytes_read(event_metadata.event_payload_len as _);
// verify sum
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
let actual_sum = CRC.checksum(&payload_data_block);
if compiler::likely(actual_sum == event_metadata.event_crc) {
// great, the sums match
@ -233,3 +234,55 @@ impl<TA, LF> TransactionLogReader<TA, LF> {
self.remaining_bytes == 0
}
}
pub struct TransactionLogWriter<LF, TA> {
/// the txn log file
log_file: SDSSFileIO<LF>,
/// the id of the **next** transaction
id: u64,
_m: PhantomData<TA>,
}
impl<LF: RawFileIOInterface, TA: TransactionLogAdapter> TransactionLogWriter<LF, TA> {
pub fn new(
mut log_file: SDSSFileIO<LF>,
last_txn_id: u64,
last_size: usize,
) -> SDSSResult<Self> {
log_file.seek_ahead(last_size)?;
Ok(Self {
log_file,
id: last_txn_id,
_m: PhantomData,
})
}
pub fn append_event(&mut self, event: TA::TransactionEvent) -> SDSSResult<()> {
let encoded = TA::encode(event);
let md = TxnLogEntryMetadata::new(
self._incr_id() as u128,
EventSourceMarker::SERVER_STD,
CRC.checksum(&encoded),
encoded.len() as u64,
)
.encoded();
self.log_file.unfsynced_write(&md)?;
self.log_file.unfsynced_write(&encoded)?;
self.log_file.fsync_all()?;
Ok(())
}
pub fn close_log(mut self) -> SDSSResult<()> {
let crc = CRC.checksum(EventSourceMarker::DRIVER_CLOSED.to_le_bytes().as_ref());
let id = self._incr_id() as u128;
self.log_file.fsynced_write(
&TxnLogEntryMetadata::new(id, EventSourceMarker::DRIVER_CLOSED, crc, 0).encoded(),
)
}
}
impl<LF, TA> TransactionLogWriter<LF, TA> {
fn _incr_id(&mut self) -> u64 {
let current = self.id;
self.id += 1;
current
}
}

Loading…
Cancel
Save