Implement transaction reader

next
Sayan Nandan 1 year ago
parent 9091db6bd3
commit 55f53456f8
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

16
Cargo.lock generated

@ -303,6 +303,21 @@ dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -1394,6 +1409,7 @@ dependencies = [
"cc",
"chrono",
"clap",
"crc",
"crossbeam-epoch",
"env_logger",
"hashbrown",

@ -28,6 +28,7 @@ tokio-openssl = "0.6.3"
toml = "0.5.9"
base64 = "0.13.0"
uuid = { version = "1.2.2", features = ["v4", "fast-rng", "macro-diagnostics"] }
crc = "3.0.1"
[target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies]
# external deps

@ -57,6 +57,8 @@ pub enum SDSSError {
CorruptedFile(&'static str),
StartupError(&'static str),
CorruptedHeader,
TransactionLogEntryCorrupted,
TransactionLogCorrupted,
}
impl SDSSError {

@ -56,6 +56,7 @@ pub trait RawFileIOInterface: Sized {
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()>;
fn fwrite_all(&mut self, bytes: &[u8]) -> SDSSResult<()>;
fn fsync_all(&mut self) -> SDSSResult<()>;
fn flen(&self) -> SDSSResult<u64>;
}
impl RawFileIOInterface for File {
@ -84,6 +85,9 @@ impl RawFileIOInterface for File {
self.sync_all()?;
Ok(())
}
fn flen(&self) -> SDSSResult<u64> {
Ok(self.metadata()?.len())
}
}
#[derive(Debug)]
@ -146,4 +150,7 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> SDSSResult<()> {
self.f.fread_exact(buffer)
}
pub fn file_length(&self) -> SDSSResult<u64> {
self.f.flen()
}
}

@ -100,6 +100,9 @@ impl RawFileIOInterface for VirtualFileInterface {
fn fsync_all(&mut self) -> super::SDSSResult<()> {
Ok(())
}
fn flen(&self) -> SDSSResult<u64> {
vfs(&self.0, |f| Ok(f.data.len() as _))
}
}
mod rw {

@ -25,14 +25,211 @@
*/
/*
+----------------+------------------------------+-----------------+------------------+--------------------+
| EVENT ID (16B) | EVENT SOURCE + METADATA (8B) | EVENT MD5 (16B) | PAYLOAD LEN (8B) | EVENT PAYLOAD (?B) |
+----------------+------------------------------+-----------------+------------------+--------------------+
+----------------+------------------------------+------------------+------------------+--------------------+
| EVENT ID (16B) | EVENT SOURCE + METADATA (8B) | EVENT CRC32 (4B) | PAYLOAD LEN (8B) | EVENT PAYLOAD (?B) |
+----------------+------------------------------+------------------+------------------+--------------------+
Event ID:
- The atomically incrementing event ID (for future scale we have 16B; it's like the ZFS situation haha)
- Event source (1B) + 7B padding (for future metadata)
- Event MD5; yeah, it's "not as strong as" SHA256 but I've chosen to have it here (since it's sometimes faster and further on,
we already sum the entire log)
- Event CRC32
- Payload len: the size of the pyload
- Payload: the payload
*/
use {
super::{
rw::{RawFileIOInterface, SDSSFileIO},
SDSSError, SDSSResult,
},
crate::util::{compiler, copy_a_into_b, copy_slice_to_array as memcpy},
std::marker::PhantomData,
};
/// The transaction adapter
pub trait TransactionLogAdapter {
/// The transaction event
type TransactionEvent;
/// The global state, which we want to modify on decoding the event
type GlobalState;
/// Encode a transaction event into a blob
fn encode(event: Self::TransactionEvent) -> Box<[u8]>;
/// Decode a transaction event and apply it to the global state
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()>;
}
pub struct TxnLogEntryMetadata {
event_id: u128,
event_source_md: u64,
event_crc: u32,
event_payload_len: u64,
}
impl TxnLogEntryMetadata {
const SIZE: usize = sizeof!(u128) + sizeof!(u64) + sizeof!(u32) + sizeof!(u64);
const P0: usize = 0;
const P1: usize = sizeof!(u128);
const P2: usize = Self::P1 + sizeof!(u64);
const P3: usize = Self::P2 + sizeof!(u32);
pub const fn new(
event_id: u128,
event_source_md: u64,
event_crc: u32,
event_payload_len: u64,
) -> Self {
Self {
event_id,
event_source_md,
event_crc,
event_payload_len,
}
}
/// Encodes the log entry metadata
pub const fn encoded(&self) -> [u8; TxnLogEntryMetadata::SIZE] {
let mut encoded = [0u8; TxnLogEntryMetadata::SIZE];
encoded = copy_a_into_b(self.event_id.to_le_bytes(), encoded, Self::P0);
encoded = copy_a_into_b(self.event_source_md.to_le_bytes(), encoded, Self::P1);
encoded = copy_a_into_b(self.event_crc.to_le_bytes(), encoded, Self::P2);
encoded = copy_a_into_b(self.event_payload_len.to_le_bytes(), encoded, Self::P3);
encoded
}
/// Decodes the log entry metadata (essentially a simply type transmutation)
pub fn decode(data: [u8; TxnLogEntryMetadata::SIZE]) -> Self {
Self::new(
u128::from_le_bytes(memcpy(&data[..Self::P1])),
u64::from_le_bytes(memcpy(&data[Self::P1..Self::P2])),
u32::from_le_bytes(memcpy(&data[Self::P2..Self::P3])),
u64::from_le_bytes(memcpy(&data[Self::P3..])),
)
}
}
/*
Event source:
* * * * _ * * * *
b1 (s+d): event source (unset -> driver, set -> server)
b* -> unused. MUST be unset
b8 (d):
- unset: closed log
*/
pub enum EventSourceMarker {
ServerStandard,
DriverClosed,
}
impl EventSourceMarker {
const SERVER_STD: u64 = 1 << 63;
const DRIVER_CLOSED: u64 = 0;
}
impl TxnLogEntryMetadata {
pub const fn is_server_event(&self) -> bool {
self.event_source_md == EventSourceMarker::SERVER_STD
}
pub const fn is_driver_event(&self) -> bool {
self.event_source_md <= 1
}
pub const fn event_source_marker(&self) -> Option<EventSourceMarker> {
Some(match self.event_source_md {
EventSourceMarker::DRIVER_CLOSED => EventSourceMarker::DriverClosed,
EventSourceMarker::SERVER_STD => EventSourceMarker::ServerStandard,
_ => return None,
})
}
}
#[derive(Debug)]
pub struct TransactionLogReader<TA, LF> {
log_file: SDSSFileIO<LF>,
evid: u64,
closed: bool,
remaining_bytes: u64,
_m: PhantomData<TA>,
}
impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA, LF> {
pub fn new(log_file: SDSSFileIO<LF>) -> SDSSResult<Self> {
let log_size = log_file.file_length()?;
Ok(Self {
log_file,
evid: 0,
closed: false,
remaining_bytes: log_size,
_m: PhantomData,
})
}
/// Read the next event and apply it to the global state
pub fn rapply_next_event(&mut self, gs: &TA::GlobalState) -> SDSSResult<()> {
self._incr_evid();
// read metadata
let mut raw_txn_log_row_md = [0u8; TxnLogEntryMetadata::SIZE];
self.log_file.read_to_buffer(&mut raw_txn_log_row_md)?;
let event_metadata = TxnLogEntryMetadata::decode(raw_txn_log_row_md);
/*
verify metadata and read bytes into buffer, verify sum
*/
// verify md
let event_src_marker = event_metadata.event_source_marker();
let okay = (self.evid == (event_metadata.event_id as _))
& event_src_marker.is_some()
& (event_metadata.event_payload_len < (isize::MAX as u64))
& self.has_remaining_bytes(event_metadata.event_payload_len);
if compiler::unlikely(!okay) {
return Err(SDSSError::TransactionLogEntryCorrupted);
}
let event_is_zero =
(event_metadata.event_crc == 0) & (event_metadata.event_payload_len == 0);
let event_src_marker = event_src_marker.unwrap();
match event_src_marker {
EventSourceMarker::ServerStandard => {}
EventSourceMarker::DriverClosed if event_is_zero => {
// expect last entry
if self.end_of_file() {
self.closed = true;
// good
return Ok(());
} else {
return Err(SDSSError::TransactionLogCorrupted);
}
}
_ => return Err(SDSSError::TransactionLogEntryCorrupted),
}
// read bytes
let mut payload_data_block = vec![0u8; event_metadata.event_payload_len as usize];
self.log_file.read_to_buffer(&mut payload_data_block)?;
self._record_bytes_read(event_metadata.event_payload_len as _);
// verify sum
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
let actual_sum = CRC.checksum(&payload_data_block);
if compiler::likely(actual_sum == event_metadata.event_crc) {
// great, the sums match
TA::decode_and_update_state(&payload_data_block, gs)?;
Ok(())
} else {
Err(SDSSError::TransactionLogEntryCorrupted)
}
}
/// Read and apply all events in the given log file to the global state, returning the open file
pub fn scroll(file: SDSSFileIO<LF>, gs: &TA::GlobalState) -> SDSSResult<SDSSFileIO<LF>> {
let mut slf = Self::new(file)?;
while !slf.end_of_file() {
slf.rapply_next_event(gs)?;
}
Ok(slf.log_file)
}
}
impl<TA, LF> TransactionLogReader<TA, LF> {
fn _incr_evid(&mut self) {
self.evid += 1;
}
fn _record_bytes_read(&mut self, cnt: usize) {
self.remaining_bytes -= cnt as u64;
}
fn has_remaining_bytes(&self, size: u64) -> bool {
self.remaining_bytes >= size
}
fn end_of_file(&self) -> bool {
self.remaining_bytes == 0
}
}

@ -376,3 +376,19 @@ pub const fn copy_slice_to_array<const N: usize>(bytes: &[u8]) -> [u8; N] {
pub const fn copy_str_to_array<const N: usize>(str: &str) -> [u8; N] {
copy_slice_to_array(str.as_bytes())
}
/// Copy the elements of a into b, beginning the copy at `pos`
pub const fn copy_a_into_b<const M: usize, const N: usize>(
a: [u8; M],
mut b: [u8; N],
mut pos: usize,
) -> [u8; N] {
assert!(M <= N);
assert!(pos < N);
let mut i = 0;
while i < M {
b[pos] = a[pos];
i += 1;
pos += 1;
}
b
}

Loading…
Cancel
Save