From dc4afdc257fc9ba2a65e0321b1a48dc2ce1fc8bf Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 31 Jul 2023 07:52:36 +0000 Subject: [PATCH] Add mock txn impl and fix txn impls --- server/src/config/tests.rs | 1 - .../src/engine/storage/v1/header_impl/mod.rs | 4 + server/src/engine/storage/v1/rw.rs | 9 +- server/src/engine/storage/v1/tests.rs | 181 +++++++++++++++++- server/src/engine/storage/v1/txn.rs | 45 +++-- server/src/util/mod.rs | 8 +- server/src/util/test_utils.rs | 14 ++ 7 files changed, 226 insertions(+), 36 deletions(-) diff --git a/server/src/config/tests.rs b/server/src/config/tests.rs index 2e425c87..ca06e29e 100644 --- a/server/src/config/tests.rs +++ b/server/src/config/tests.rs @@ -626,7 +626,6 @@ mod try_from_config_source_impls { has_mutated: bool, ) { let mut mutated = false; - dbg!(new.is_present(), is_present); assert_eq!(new.is_present(), is_present); assert_eq!(new.mutate_failed(&mut default, &mut mutated), mutate_failed); assert_eq!(mutated, has_mutated); diff --git a/server/src/engine/storage/v1/header_impl/mod.rs b/server/src/engine/storage/v1/header_impl/mod.rs index 75e6bb9e..4e2fa051 100644 --- a/server/src/engine/storage/v1/header_impl/mod.rs +++ b/server/src/engine/storage/v1/header_impl/mod.rs @@ -95,12 +95,16 @@ impl FileScope { #[repr(u8)] pub enum FileSpecifier { GNSTxnLog = 0, + #[cfg(test)] + TestTransactionLog = 1, } impl FileSpecifier { pub const fn try_new(v: u32) -> Option { Some(match v { 0 => Self::GNSTxnLog, + #[cfg(test)] + 1 => Self::TestTransactionLog, _ => return None, }) } diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs index 2c15b3d8..c59cf9a8 100644 --- a/server/src/engine/storage/v1/rw.rs +++ b/server/src/engine/storage/v1/rw.rs @@ -68,10 +68,10 @@ impl RawFileIOInterface for File { .write(true) .open(file_path)?; let md = f.metadata()?; - if md.created()? == md.modified()? { - return Ok(RawFileOpen::Created(f)); + if md.len() == 0 { + Ok(RawFileOpen::Created(f)) } else { - return Ok(RawFileOpen::Existing(f)); + Ok(RawFileOpen::Existing(f)) } } fn fread_exact(&mut self, buf: &mut [u8]) -> SDSSResult<()> { @@ -101,6 +101,7 @@ pub struct SDSSFileIO { } impl SDSSFileIO { + /// **IMPORTANT: File position: end-of-header-section** pub fn open_or_create_perm_rw( file_path: &str, file_scope: FileScope, @@ -140,7 +141,9 @@ impl SDSSFileIO { let mut new_header = header.clone(); new_header.dr_rs_mut().bump_modify_count(); let mut f = Self::_new(f); + f.seek_ahead(0)?; f.fsynced_write(new_header.encoded().array().as_ref())?; + f.seek_ahead(SDSSHeaderRaw::header_size() as _)?; Ok(FileOpen::Existing(f, header)) } } diff --git a/server/src/engine/storage/v1/tests.rs b/server/src/engine/storage/v1/tests.rs index ce0e15e4..97b65c6c 100644 --- a/server/src/engine/storage/v1/tests.rs +++ b/server/src/engine/storage/v1/tests.rs @@ -85,14 +85,34 @@ impl VirtualFile { fn data_mut(&mut self) -> &mut [u8] { &mut self.data[self.pos as usize..] } + fn close(&mut self) { + self.pos = 0; + self.read = false; + self.write = false; + } } struct VirtualFileInterface(Box); +impl Drop for VirtualFileInterface { + fn drop(&mut self) { + vfs(&self.0, |f| { + f.close(); + Ok(()) + }) + .unwrap(); + } +} + impl RawFileIOInterface for VirtualFileInterface { fn fopen_or_create_rw(file_path: &str) -> SDSSResult> { match VFS.write().entry(file_path.to_owned()) { - Entry::Occupied(_) => Ok(RawFileOpen::Existing(Self(file_path.into()))), + Entry::Occupied(mut oe) => { + let file_md = oe.get_mut(); + file_md.read = true; + file_md.write = true; + Ok(RawFileOpen::Existing(Self(file_path.into()))) + } Entry::Vacant(ve) => { ve.insert(VirtualFile::rw(vec![])); Ok(RawFileOpen::Created(Self(file_path.into()))) @@ -131,18 +151,18 @@ impl RawFileIOInterface for VirtualFileInterface { } } +type VirtualFS = VirtualFileInterface; +type RealFS = std::fs::File; + mod rw { - use { - super::VirtualFileInterface, - crate::engine::storage::v1::{ - header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}, - rw::{FileOpen, SDSSFileIO}, - }, + use crate::engine::storage::v1::{ + header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}, + rw::{FileOpen, SDSSFileIO}, }; #[test] fn create_delete() { - let f = SDSSFileIO::::open_or_create_perm_rw( + let f = SDSSFileIO::::open_or_create_perm_rw( "hello_world.db-tlog", FileScope::TransactionLogCompacted, FileSpecifier::GNSTxnLog, @@ -156,7 +176,7 @@ mod rw { FileOpen::Existing(_, _) => panic!(), FileOpen::Created(_) => {} }; - let open = SDSSFileIO::::open_or_create_perm_rw( + let open = SDSSFileIO::::open_or_create_perm_rw( "hello_world.db-tlog", FileScope::TransactionLogCompacted, FileSpecifier::GNSTxnLog, @@ -178,3 +198,146 @@ mod rw { assert_eq!(h.gr_hr().startup_counter(), 0); } } + +mod tx { + use crate::engine::storage::v1::header_impl::{ + FileSpecifier, FileSpecifierVersion, HostRunMode, + }; + + type FileInterface = super::RealFS; + + use { + crate::{ + engine::storage::v1::{ + txn::{self, TransactionLogAdapter, TransactionLogWriter}, + SDSSError, SDSSResult, + }, + util, + }, + std::cell::RefCell, + }; + pub struct Database { + data: RefCell<[u8; 10]>, + } + impl Database { + fn copy_data(&self) -> [u8; 10] { + *self.data.borrow() + } + fn new() -> Self { + Self { + data: RefCell::new([0; 10]), + } + } + fn reset(&self) { + *self.data.borrow_mut() = [0; 10]; + } + fn txn_reset( + &self, + txn_writer: &mut TransactionLogWriter, + ) -> SDSSResult<()> { + self.reset(); + txn_writer.append_event(TxEvent::Reset) + } + fn set(&self, pos: usize, val: u8) { + self.data.borrow_mut()[pos] = val; + } + fn txn_set( + &self, + pos: usize, + val: u8, + txn_writer: &mut TransactionLogWriter, + ) -> SDSSResult<()> { + self.set(pos, val); + txn_writer.append_event(TxEvent::Set(pos, val)) + } + } + pub enum TxEvent { + Reset, + Set(usize, u8), + } + #[derive(Debug)] + pub struct DatabaseTxnAdapter; + impl TransactionLogAdapter for DatabaseTxnAdapter { + type TransactionEvent = TxEvent; + + type GlobalState = Database; + + fn encode(event: Self::TransactionEvent) -> Box<[u8]> { + /* + [1B: opcode][8B:Index][1B: New value] + */ + let opcode = match event { + TxEvent::Reset => 0u8, + TxEvent::Set(_, _) => 1u8, + }; + let index = match event { + TxEvent::Reset => 0u64, + TxEvent::Set(index, _) => index as u64, + }; + let new_value = match event { + TxEvent::Reset => 0, + TxEvent::Set(_, val) => val, + }; + let mut ret = Vec::with_capacity(10); + ret.push(opcode); + ret.extend(index.to_le_bytes()); + ret.push(new_value); + ret.into_boxed_slice() + } + + fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()> { + if payload.len() != 10 { + return Err(SDSSError::CorruptedFile("testtxn.log")); + } + let opcode = payload[0]; + let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9])); + let new_value = payload[9]; + match opcode { + 0 if index == 0 && new_value == 0 => gs.reset(), + 1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value), + _ => return Err(SDSSError::TransactionLogEntryCorrupted), + } + Ok(()) + } + } + + #[test] + fn two_set() { + // create log + let db1 = Database::new(); + let x = || -> SDSSResult<()> { + let mut log = txn::open_log( + "testtxn.log", + FileSpecifier::TestTransactionLog, + FileSpecifierVersion::__new(0), + 0, + HostRunMode::Prod, + 1, + &db1, + )?; + db1.txn_set(0, 20, &mut log)?; + db1.txn_set(9, 21, &mut log)?; + log.close_log() + }; + x().unwrap(); + // backup original data + let original_data = db1.copy_data(); + // restore log + let empty_db2 = Database::new(); + { + let log = txn::open_log::( + "testtxn.log", + FileSpecifier::TestTransactionLog, + FileSpecifierVersion::__new(0), + 0, + HostRunMode::Prod, + 1, + &empty_db2, + ) + .unwrap(); + log.close_log().unwrap(); + } + assert_eq!(original_data, empty_db2.copy_data()); + std::fs::remove_file("testtxn.log").unwrap(); + } +} diff --git a/server/src/engine/storage/v1/txn.rs b/server/src/engine/storage/v1/txn.rs index d9410551..4fce4b97 100644 --- a/server/src/engine/storage/v1/txn.rs +++ b/server/src/engine/storage/v1/txn.rs @@ -38,7 +38,7 @@ use { super::{ - header_impl::{FileSpecifierVersion, HostRunMode}, + header_impl::{FileSpecifierVersion, HostRunMode, SDSSHeaderRaw}, rw::{FileOpen, RawFileIOInterface, SDSSFileIO}, SDSSError, SDSSResult, }, @@ -51,7 +51,10 @@ use { const CRC: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); -pub fn open_log( +pub fn open_log< + TA: TransactionLogAdapter + core::fmt::Debug, + LF: RawFileIOInterface + core::fmt::Debug, +>( log_file_name: &str, log_kind: FileSpecifier, log_kind_version: FileSpecifierVersion, @@ -70,11 +73,11 @@ pub fn open_log( host_startup_counter, )?; let file = match f { - FileOpen::Created(f) => return TransactionLogWriter::new(f, 0, 0), + FileOpen::Created(f) => return TransactionLogWriter::new(f, 0), FileOpen::Existing(file, _) => file, }; - let (file, size, last_txn) = TransactionLogReader::::scroll(file, gs)?; - TransactionLogWriter::new(file, size, last_txn) + let (file, last_txn) = TransactionLogReader::::scroll(file, gs)?; + TransactionLogWriter::new(file, last_txn) } /// The transaction adapter @@ -89,6 +92,7 @@ pub trait TransactionLogAdapter { fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> SDSSResult<()>; } +#[derive(Debug)] pub struct TxnLogEntryMetadata { event_id: u128, event_source_md: u64, @@ -182,7 +186,7 @@ pub struct TransactionLogReader { impl TransactionLogReader { pub fn new(log_file: SDSSFileIO) -> SDSSResult { - let log_size = log_file.file_length()?; + let log_size = log_file.file_length()? - SDSSHeaderRaw::header_size() as u64; Ok(Self { log_file, log_size, @@ -194,10 +198,10 @@ impl TransactionLogReader SDSSResult<()> { - self._incr_evid(); // read metadata let mut raw_txn_log_row_md = [0u8; TxnLogEntryMetadata::SIZE]; self.log_file.read_to_buffer(&mut raw_txn_log_row_md)?; + self._record_bytes_read(36); let event_metadata = TxnLogEntryMetadata::decode(raw_txn_log_row_md); /* verify metadata and read bytes into buffer, verify sum @@ -226,7 +230,9 @@ impl TransactionLogReader return Err(SDSSError::TransactionLogEntryCorrupted), + EventSourceMarker::DriverClosed => { + return Err(SDSSError::TransactionLogEntryCorrupted); + } } // read bytes let mut payload_data_block = vec![0u8; event_metadata.event_payload_len as usize]; @@ -235,6 +241,7 @@ impl TransactionLogReader TransactionLogReader, - gs: &TA::GlobalState, - ) -> SDSSResult<(SDSSFileIO, u64, u64)> { + /// Read and apply all events in the given log file to the global state, returning the (open file, last event ID) + pub fn scroll(file: SDSSFileIO, gs: &TA::GlobalState) -> SDSSResult<(SDSSFileIO, u64)> { let mut slf = Self::new(file)?; while !slf.end_of_file() { slf.rapply_next_event(gs)?; } - Ok((slf.log_file, slf.log_size, slf.evid)) + if slf.closed { + Ok((slf.log_file, slf.evid)) + } else { + Err(SDSSError::TransactionLogCorrupted) + } } } @@ -280,8 +287,9 @@ pub struct TransactionLogWriter { } impl TransactionLogWriter { - pub fn new(mut log_file: SDSSFileIO, last_size: u64, last_txn_id: u64) -> SDSSResult { - log_file.seek_ahead(last_size)?; + pub fn new(mut log_file: SDSSFileIO, last_txn_id: u64) -> SDSSResult { + let l = log_file.file_length()?; + log_file.seek_ahead(l)?; Ok(Self { log_file, id: last_txn_id, @@ -303,10 +311,9 @@ impl TransactionLogWriter SDSSResult<()> { - let crc = CRC.checksum(EventSourceMarker::DRIVER_CLOSED.to_le_bytes().as_ref()); let id = self._incr_id() as u128; self.log_file.fsynced_write( - &TxnLogEntryMetadata::new(id, EventSourceMarker::DRIVER_CLOSED, crc, 0).encoded(), + &TxnLogEntryMetadata::new(id, EventSourceMarker::DRIVER_CLOSED, 0, 0).encoded(), ) } } diff --git a/server/src/util/mod.rs b/server/src/util/mod.rs index 98783b33..8d971c4c 100644 --- a/server/src/util/mod.rs +++ b/server/src/util/mod.rs @@ -378,17 +378,17 @@ pub const fn copy_str_to_array(str: &str) -> [u8; N] { } /// Copy the elements of a into b, beginning the copy at `pos` pub const fn copy_a_into_b( - a: [u8; M], - mut b: [u8; N], + from: [u8; M], + mut to: [u8; N], mut pos: usize, ) -> [u8; N] { assert!(M <= N); assert!(pos < N); let mut i = 0; while i < M { - b[pos] = a[pos]; + to[pos] = from[i]; i += 1; pos += 1; } - b + to } diff --git a/server/src/util/test_utils.rs b/server/src/util/test_utils.rs index f35f1034..01d80524 100644 --- a/server/src/util/test_utils.rs +++ b/server/src/util/test_utils.rs @@ -24,6 +24,8 @@ * */ +use std::io::Read; + use { rand::{ distributions::{uniform::SampleUniform, Alphanumeric}, @@ -36,6 +38,18 @@ use { }, }; +pub fn wait_for_key(msg: &str) { + use std::io::{self, Write}; + print!("{msg}"); + let x = || -> std::io::Result<()> { + io::stdout().flush()?; + let mut key = [0u8; 1]; + io::stdin().read_exact(&mut key)?; + Ok(()) + }; + x().unwrap(); +} + // TODO(@ohsayan): Use my own PRNG algo here. Maybe my quadratic one? /// Generates a random boolean based on Bernoulli distributions