Add mock txn impl and fix txn impls

next
Sayan Nandan 1 year ago
parent 710fd79e64
commit dc4afdc257
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -626,7 +626,6 @@ mod try_from_config_source_impls {
has_mutated: bool,
) {
let mut mutated = false;
dbg!(new.is_present(), is_present);
assert_eq!(new.is_present(), is_present);
assert_eq!(new.mutate_failed(&mut default, &mut mutated), mutate_failed);
assert_eq!(mutated, has_mutated);

@ -95,12 +95,16 @@ impl FileScope {
#[repr(u8)]
pub enum FileSpecifier {
GNSTxnLog = 0,
#[cfg(test)]
TestTransactionLog = 1,
}
impl FileSpecifier {
pub const fn try_new(v: u32) -> Option<Self> {
Some(match v {
0 => Self::GNSTxnLog,
#[cfg(test)]
1 => Self::TestTransactionLog,
_ => return None,
})
}

@ -68,10 +68,10 @@ impl RawFileIOInterface for File {
.write(true)
.open(file_path)?;
let md = f.metadata()?;
if md.created()? == md.modified()? {
return Ok(RawFileOpen::Created(f));
if md.len() == 0 {
Ok(RawFileOpen::Created(f))
} else {
return Ok(RawFileOpen::Existing(f));
Ok(RawFileOpen::Existing(f))
}
}
fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
@ -101,6 +101,7 @@ pub struct SDSSFileIO<F> {
}
impl<F: RawFileIOInterface> SDSSFileIO<F> {
/// **IMPORTANT: File position: end-of-header-section**
pub fn open_or_create_perm_rw(
file_path: &str,
file_scope: FileScope,
@ -140,7 +141,9 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
let mut f = Self::_new(f);
f.seek_ahead(0)?;
f.fsynced_write(new_header.encoded().array().as_ref())?;
f.seek_ahead(SDSSHeaderRaw::header_size() as _)?;
Ok(FileOpen::Existing(f, header))
}
}

@ -85,14 +85,34 @@ impl VirtualFile {
fn data_mut(&mut self) -> &mut [u8] {
&mut self.data[self.pos as usize..]
}
fn close(&mut self) {
self.pos = 0;
self.read = false;
self.write = false;
}
}
struct VirtualFileInterface(Box<str>);
impl Drop for VirtualFileInterface {
fn drop(&mut self) {
vfs(&self.0, |f| {
f.close();
Ok(())
})
.unwrap();
}
}
impl RawFileIOInterface for VirtualFileInterface {
fn fopen_or_create_rw(file_path: &str) -> SDSSResult<RawFileOpen<Self>> {
match VFS.write().entry(file_path.to_owned()) {
Entry::Occupied(_) => Ok(RawFileOpen::Existing(Self(file_path.into()))),
Entry::Occupied(mut oe) => {
let file_md = oe.get_mut();
file_md.read = true;
file_md.write = true;
Ok(RawFileOpen::Existing(Self(file_path.into())))
}
Entry::Vacant(ve) => {
ve.insert(VirtualFile::rw(vec![]));
Ok(RawFileOpen::Created(Self(file_path.into())))
@ -131,18 +151,18 @@ impl RawFileIOInterface for VirtualFileInterface {
}
}
type VirtualFS = VirtualFileInterface;
type RealFS = std::fs::File;
mod rw {
use {
super::VirtualFileInterface,
crate::engine::storage::v1::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
},
use crate::engine::storage::v1::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
};
#[test]
fn create_delete() {
let f = SDSSFileIO::<VirtualFileInterface>::open_or_create_perm_rw(
let f = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw(
"hello_world.db-tlog",
FileScope::TransactionLogCompacted,
FileSpecifier::GNSTxnLog,
@ -156,7 +176,7 @@ mod rw {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Created(_) => {}
};
let open = SDSSFileIO::<VirtualFileInterface>::open_or_create_perm_rw(
let open = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw(
"hello_world.db-tlog",
FileScope::TransactionLogCompacted,
FileSpecifier::GNSTxnLog,
@ -178,3 +198,146 @@ mod rw {
assert_eq!(h.gr_hr().startup_counter(), 0);
}
}
mod tx {
use crate::engine::storage::v1::header_impl::{
FileSpecifier, FileSpecifierVersion, HostRunMode,
};
type FileInterface = super::RealFS;
use {
crate::{
engine::storage::v1::{
txn::{self, TransactionLogAdapter, TransactionLogWriter},
SDSSError, SDSSResult,
},
util,
},
std::cell::RefCell,
};
pub struct Database {
data: RefCell<[u8; 10]>,
}
impl Database {
fn copy_data(&self) -> [u8; 10] {
*self.data.borrow()
}
fn new() -> Self {
Self {
data: RefCell::new([0; 10]),
}
}
fn reset(&self) {
*self.data.borrow_mut() = [0; 10];
}
fn txn_reset(
&self,
txn_writer: &mut TransactionLogWriter<FileInterface, DatabaseTxnAdapter>,
) -> SDSSResult<()> {
self.reset();
txn_writer.append_event(TxEvent::Reset)
}
fn set(&self, pos: usize, val: u8) {
self.data.borrow_mut()[pos] = val;
}
fn txn_set(
&self,
pos: usize,
val: u8,
txn_writer: &mut TransactionLogWriter<FileInterface, DatabaseTxnAdapter>,
) -> SDSSResult<()> {
self.set(pos, val);
txn_writer.append_event(TxEvent::Set(pos, val))
}
}
pub enum TxEvent {
Reset,
Set(usize, u8),
}
#[derive(Debug)]
pub struct DatabaseTxnAdapter;
impl TransactionLogAdapter for DatabaseTxnAdapter {
type TransactionEvent = TxEvent;
type GlobalState = Database;
fn encode(event: Self::TransactionEvent) -> Box<[u8]> {
/*
[1B: opcode][8B:Index][1B: New value]
*/
let opcode = match event {
TxEvent::Reset => 0u8,
TxEvent::Set(_, _) => 1u8,
};
let index = match event {
TxEvent::Reset => 0u64,
TxEvent::Set(index, _) => index as u64,
};
let new_value = match event {
TxEvent::Reset => 0,
TxEvent::Set(_, val) => val,
};
let mut ret = Vec::with_capacity(10);
ret.push(opcode);
ret.extend(index.to_le_bytes());
ret.push(new_value);
ret.into_boxed_slice()
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()> {
if payload.len() != 10 {
return Err(SDSSError::CorruptedFile("testtxn.log"));
}
let opcode = payload[0];
let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9]));
let new_value = payload[9];
match opcode {
0 if index == 0 && new_value == 0 => gs.reset(),
1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value),
_ => return Err(SDSSError::TransactionLogEntryCorrupted),
}
Ok(())
}
}
#[test]
fn two_set() {
// create log
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = txn::open_log(
"testtxn.log",
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&db1,
)?;
db1.txn_set(0, 20, &mut log)?;
db1.txn_set(9, 21, &mut log)?;
log.close_log()
};
x().unwrap();
// backup original data
let original_data = db1.copy_data();
// restore log
let empty_db2 = Database::new();
{
let log = txn::open_log::<DatabaseTxnAdapter, FileInterface>(
"testtxn.log",
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&empty_db2,
)
.unwrap();
log.close_log().unwrap();
}
assert_eq!(original_data, empty_db2.copy_data());
std::fs::remove_file("testtxn.log").unwrap();
}
}

@ -38,7 +38,7 @@
use {
super::{
header_impl::{FileSpecifierVersion, HostRunMode},
header_impl::{FileSpecifierVersion, HostRunMode, SDSSHeaderRaw},
rw::{FileOpen, RawFileIOInterface, SDSSFileIO},
SDSSError, SDSSResult,
},
@ -51,7 +51,10 @@ use {
const CRC: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
pub fn open_log<TA: TransactionLogAdapter, LF: RawFileIOInterface>(
pub fn open_log<
TA: TransactionLogAdapter + core::fmt::Debug,
LF: RawFileIOInterface + core::fmt::Debug,
>(
log_file_name: &str,
log_kind: FileSpecifier,
log_kind_version: FileSpecifierVersion,
@ -70,11 +73,11 @@ pub fn open_log<TA: TransactionLogAdapter, LF: RawFileIOInterface>(
host_startup_counter,
)?;
let file = match f {
FileOpen::Created(f) => return TransactionLogWriter::new(f, 0, 0),
FileOpen::Created(f) => return TransactionLogWriter::new(f, 0),
FileOpen::Existing(file, _) => file,
};
let (file, size, last_txn) = TransactionLogReader::<TA, LF>::scroll(file, gs)?;
TransactionLogWriter::new(file, size, last_txn)
let (file, last_txn) = TransactionLogReader::<TA, LF>::scroll(file, gs)?;
TransactionLogWriter::new(file, last_txn)
}
/// The transaction adapter
@ -89,6 +92,7 @@ pub trait TransactionLogAdapter {
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()>;
}
#[derive(Debug)]
pub struct TxnLogEntryMetadata {
event_id: u128,
event_source_md: u64,
@ -182,7 +186,7 @@ pub struct TransactionLogReader<TA, LF> {
impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA, LF> {
pub fn new(log_file: SDSSFileIO<LF>) -> SDSSResult<Self> {
let log_size = log_file.file_length()?;
let log_size = log_file.file_length()? - SDSSHeaderRaw::header_size() as u64;
Ok(Self {
log_file,
log_size,
@ -194,10 +198,10 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
}
/// Read the next event and apply it to the global state
pub fn rapply_next_event(&mut self, gs: &TA::GlobalState) -> SDSSResult<()> {
self._incr_evid();
// read metadata
let mut raw_txn_log_row_md = [0u8; TxnLogEntryMetadata::SIZE];
self.log_file.read_to_buffer(&mut raw_txn_log_row_md)?;
self._record_bytes_read(36);
let event_metadata = TxnLogEntryMetadata::decode(raw_txn_log_row_md);
/*
verify metadata and read bytes into buffer, verify sum
@ -226,7 +230,9 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
return Err(SDSSError::TransactionLogCorrupted);
}
}
_ => return Err(SDSSError::TransactionLogEntryCorrupted),
EventSourceMarker::DriverClosed => {
return Err(SDSSError::TransactionLogEntryCorrupted);
}
}
// read bytes
let mut payload_data_block = vec![0u8; event_metadata.event_payload_len as usize];
@ -235,6 +241,7 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
// verify sum
let actual_sum = CRC.checksum(&payload_data_block);
if compiler::likely(actual_sum == event_metadata.event_crc) {
self._incr_evid();
// great, the sums match
TA::decode_and_update_state(&payload_data_block, gs)?;
Ok(())
@ -242,17 +249,17 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
Err(SDSSError::TransactionLogEntryCorrupted)
}
}
/// Read and apply all events in the given log file to the global state, returning the
/// (open file, log size, last event ID)
pub fn scroll(
file: SDSSFileIO<LF>,
gs: &TA::GlobalState,
) -> SDSSResult<(SDSSFileIO<LF>, u64, u64)> {
/// Read and apply all events in the given log file to the global state, returning the (open file, last event ID)
pub fn scroll(file: SDSSFileIO<LF>, gs: &TA::GlobalState) -> SDSSResult<(SDSSFileIO<LF>, u64)> {
let mut slf = Self::new(file)?;
while !slf.end_of_file() {
slf.rapply_next_event(gs)?;
}
Ok((slf.log_file, slf.log_size, slf.evid))
if slf.closed {
Ok((slf.log_file, slf.evid))
} else {
Err(SDSSError::TransactionLogCorrupted)
}
}
}
@ -280,8 +287,9 @@ pub struct TransactionLogWriter<LF, TA> {
}
impl<LF: RawFileIOInterface, TA: TransactionLogAdapter> TransactionLogWriter<LF, TA> {
pub fn new(mut log_file: SDSSFileIO<LF>, last_size: u64, last_txn_id: u64) -> SDSSResult<Self> {
log_file.seek_ahead(last_size)?;
pub fn new(mut log_file: SDSSFileIO<LF>, last_txn_id: u64) -> SDSSResult<Self> {
let l = log_file.file_length()?;
log_file.seek_ahead(l)?;
Ok(Self {
log_file,
id: last_txn_id,
@ -303,10 +311,9 @@ impl<LF: RawFileIOInterface, TA: TransactionLogAdapter> TransactionLogWriter<LF,
Ok(())
}
pub fn close_log(mut self) -> SDSSResult<()> {
let crc = CRC.checksum(EventSourceMarker::DRIVER_CLOSED.to_le_bytes().as_ref());
let id = self._incr_id() as u128;
self.log_file.fsynced_write(
&TxnLogEntryMetadata::new(id, EventSourceMarker::DRIVER_CLOSED, crc, 0).encoded(),
&TxnLogEntryMetadata::new(id, EventSourceMarker::DRIVER_CLOSED, 0, 0).encoded(),
)
}
}

@ -378,17 +378,17 @@ pub const fn copy_str_to_array<const N: usize>(str: &str) -> [u8; N] {
}
/// Copy the elements of a into b, beginning the copy at `pos`
pub const fn copy_a_into_b<const M: usize, const N: usize>(
a: [u8; M],
mut b: [u8; N],
from: [u8; M],
mut to: [u8; N],
mut pos: usize,
) -> [u8; N] {
assert!(M <= N);
assert!(pos < N);
let mut i = 0;
while i < M {
b[pos] = a[pos];
to[pos] = from[i];
i += 1;
pos += 1;
}
b
to
}

@ -24,6 +24,8 @@
*
*/
use std::io::Read;
use {
rand::{
distributions::{uniform::SampleUniform, Alphanumeric},
@ -36,6 +38,18 @@ use {
},
};
pub fn wait_for_key(msg: &str) {
use std::io::{self, Write};
print!("{msg}");
let x = || -> std::io::Result<()> {
io::stdout().flush()?;
let mut key = [0u8; 1];
io::stdin().read_exact(&mut key)?;
Ok(())
};
x().unwrap();
}
// TODO(@ohsayan): Use my own PRNG algo here. Maybe my quadratic one?
/// Generates a random boolean based on Bernoulli distributions

Loading…
Cancel
Save