Remove the delete entry once validated

next
Sayan Nandan 1 year ago
parent dc4afdc257
commit c1f8e2d1bd
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -58,6 +58,7 @@ pub trait RawFileIOInterface: Sized {
fn fsync_all(&mut self) -> SDSSResult<()>;
fn fseek_ahead(&mut self, by: u64) -> SDSSResult<()>;
fn flen(&self) -> SDSSResult<u64>;
fn flen_set(&mut self, to: u64) -> SDSSResult<()>;
}
impl RawFileIOInterface for File {
@ -93,6 +94,10 @@ impl RawFileIOInterface for File {
self.seek(SeekFrom::Start(by))?;
Ok(())
}
fn flen_set(&mut self, to: u64) -> SDSSResult<()> {
self.set_len(to)?;
Ok(())
}
}
#[derive(Debug)]
@ -141,9 +146,9 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
let mut f = Self::_new(f);
f.seek_ahead(0)?;
f.seek_from_start(0)?;
f.fsynced_write(new_header.encoded().array().as_ref())?;
f.seek_ahead(SDSSHeaderRaw::header_size() as _)?;
f.seek_from_start(SDSSHeaderRaw::header_size() as _)?;
Ok(FileOpen::Existing(f, header))
}
}
@ -171,7 +176,10 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
pub fn file_length(&self) -> SDSSResult<u64> {
self.f.flen()
}
pub fn seek_ahead(&mut self, by: u64) -> SDSSResult<()> {
pub fn seek_from_start(&mut self, by: u64) -> SDSSResult<()> {
self.f.fseek_ahead(by)
}
pub fn trim_file_to(&mut self, to: u64) -> SDSSResult<()> {
self.f.flen_set(to)
}
}

@ -149,6 +149,12 @@ impl RawFileIOInterface for VirtualFileInterface {
Ok(())
})
}
fn flen_set(&mut self, to: u64) -> SDSSResult<()> {
vfs(&self.0, |f| {
f.data.drain(f.data.len() - to as usize..);
Ok(())
})
}
}
type VirtualFS = VirtualFileInterface;
@ -301,20 +307,27 @@ mod tx {
}
}
fn open_log(
log_name: &str,
db: &Database,
) -> SDSSResult<TransactionLogWriter<FileInterface, DatabaseTxnAdapter>> {
txn::open_log::<DatabaseTxnAdapter, FileInterface>(
log_name,
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&db,
)
}
#[test]
fn two_set() {
fn first_boot_second_readonly() {
// create log
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = txn::open_log(
"testtxn.log",
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&db1,
)?;
let mut log = open_log("testtxn.log", &db1)?;
db1.txn_set(0, 20, &mut log)?;
db1.txn_set(9, 21, &mut log)?;
log.close_log()
@ -324,20 +337,54 @@ mod tx {
let original_data = db1.copy_data();
// restore log
let empty_db2 = Database::new();
{
let log = txn::open_log::<DatabaseTxnAdapter, FileInterface>(
"testtxn.log",
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&empty_db2,
)
open_log("testtxn.log", &empty_db2)
.unwrap()
.close_log()
.unwrap();
log.close_log().unwrap();
}
assert_eq!(original_data, empty_db2.copy_data());
std::fs::remove_file("testtxn.log").unwrap();
}
#[test]
fn oneboot_mod_twoboot_mod_thirdboot_read() {
// first boot: set all to 1
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("duatxn.db-tlog", &db1)?;
for i in 0..10 {
db1.txn_set(i, 1, &mut log)?;
}
log.close_log()
};
x().unwrap();
let bkp_db1 = db1.copy_data();
drop(db1);
// second boot
let db2 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("duatxn.db-tlog", &db2)?;
assert_eq!(bkp_db1, db2.copy_data());
for i in 0..10 {
let current_val = db2.data.borrow()[i];
db2.txn_set(i, current_val + i as u8, &mut log)?;
}
log.close_log()
};
x().unwrap();
let bkp_db2 = db2.copy_data();
drop(db2);
// third boot
let db3 = Database::new();
let log = open_log("duatxn.db-tlog", &db3).unwrap();
log.close_log().unwrap();
assert_eq!(bkp_db2, db3.copy_data());
assert_eq!(
db3.copy_data(),
(1..=10)
.into_iter()
.map(u8::from)
.collect::<Box<[u8]>>()
.as_ref()
);
std::fs::remove_file("duatxn.db-tlog").unwrap();
}
}

@ -201,7 +201,7 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
// read metadata
let mut raw_txn_log_row_md = [0u8; TxnLogEntryMetadata::SIZE];
self.log_file.read_to_buffer(&mut raw_txn_log_row_md)?;
self._record_bytes_read(36);
self._record_bytes_read(TxnLogEntryMetadata::SIZE);
let event_metadata = TxnLogEntryMetadata::decode(raw_txn_log_row_md);
/*
verify metadata and read bytes into buffer, verify sum
@ -223,6 +223,16 @@ impl<TA: TransactionLogAdapter, LF: RawFileIOInterface> TransactionLogReader<TA,
EventSourceMarker::DriverClosed if event_is_zero => {
// expect last entry
if self.end_of_file() {
// remove the delete entry
self.log_file.seek_from_start(
self.log_size - TxnLogEntryMetadata::SIZE as u64
+ SDSSHeaderRaw::header_size() as u64,
)?;
self.log_file.trim_file_to(
self.log_size - TxnLogEntryMetadata::SIZE as u64
+ SDSSHeaderRaw::header_size() as u64,
)?;
self.log_file.fsync_all()?;
self.closed = true;
// good
return Ok(());
@ -284,16 +294,18 @@ pub struct TransactionLogWriter<LF, TA> {
/// the id of the **next** transaction
id: u64,
_m: PhantomData<TA>,
closed: bool,
}
impl<LF: RawFileIOInterface, TA: TransactionLogAdapter> TransactionLogWriter<LF, TA> {
pub fn new(mut log_file: SDSSFileIO<LF>, last_txn_id: u64) -> SDSSResult<Self> {
let l = log_file.file_length()?;
log_file.seek_ahead(l)?;
let log_size = log_file.file_length()?;
log_file.seek_from_start(log_size)?;
Ok(Self {
log_file,
id: last_txn_id,
_m: PhantomData,
closed: false,
})
}
pub fn append_event(&mut self, event: TA::TransactionEvent) -> SDSSResult<()> {
@ -311,10 +323,12 @@ impl<LF: RawFileIOInterface, TA: TransactionLogAdapter> TransactionLogWriter<LF,
Ok(())
}
pub fn close_log(mut self) -> SDSSResult<()> {
self.closed = true;
let id = self._incr_id() as u128;
self.log_file.fsynced_write(
&TxnLogEntryMetadata::new(id, EventSourceMarker::DRIVER_CLOSED, 0, 0).encoded(),
)
)?;
Ok(())
}
}
@ -325,3 +339,9 @@ impl<LF, TA> TransactionLogWriter<LF, TA> {
current
}
}
impl<LF, TA> Drop for TransactionLogWriter<LF, TA> {
fn drop(&mut self) {
assert!(self.closed, "log not closed");
}
}

Loading…
Cancel
Save