Use buffered IO and fix r/w through cache [skip ci]

Also ensure that any pending tasks are completed before exit
next
Sayan Nandan 11 months ago
parent b7fd815e9e
commit 09bd217998
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -27,6 +27,7 @@
use {
super::util,
crate::engine::{
error::RuntimeResult,
storage::v1::{data_batch::DataBatchPersistDriver, RawFSInterface},
txn::gns::GNSTransactionDriverAnyFS,
},
@ -38,7 +39,7 @@ use {
pub(super) struct FractalGNSDriver<Fs: RawFSInterface> {
#[allow(unused)]
status: util::Status,
txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
pub(super) txn_driver: Mutex<GNSTransactionDriverAnyFS<Fs>>,
}
impl<Fs: RawFSInterface> FractalGNSDriver<Fs> {
@ -72,6 +73,9 @@ impl<Fs: RawFSInterface> FractalModelDriver<Fs> {
pub fn batch_driver(&self) -> &Mutex<DataBatchPersistDriver<Fs>> {
&self.batch_driver
}
/// Consume the model driver and close the underlying data-batch persist
/// driver (which finalizes the on-disk batch journal).
pub fn close(self) -> RuntimeResult<()> {
self.batch_driver.into_inner().close()
}
}
/// Model hooks

@ -24,14 +24,13 @@
*
*/
use crate::engine::storage::v1::LocalFS;
use {
super::ModelUniqueID,
crate::{
engine::{
core::model::{delta::DataDelta, Model},
data::uuid::Uuid,
storage::v1::LocalFS,
},
util::os,
},
@ -228,7 +227,7 @@ impl FractalMgr {
// services
impl FractalMgr {
const GENERAL_EXECUTOR_WINDOW: u64 = 5;
const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60;
/// The high priority executor service runs in the background to take care of high priority tasks and take any
/// appropriate action. It will exclusively own the high priority queue since it is the only broker that is
/// allowed to perform HP tasks
@ -239,57 +238,77 @@ impl FractalMgr {
mut sigterm: broadcast::Receiver<()>,
) {
loop {
let Task { threshold, task } = tokio::select! {
let task = tokio::select! {
task = receiver.recv() => {
match task {
Some(t) => t,
None => {
info!("exiting fhp executor service because all tasks closed");
info!("fhp: exiting executor service because all tasks closed");
break;
}
}
}
_ = sigterm.recv() => {
info!("exited fhp executor service");
info!("fhp: finishing pending tasks");
while let Ok(task) = receiver.try_recv() {
let global = global.clone();
tokio::task::spawn_blocking(move || self.hp_executor(global, task)).await.unwrap()
}
info!("fhp: exited executor service");
break;
}
};
// TODO(@ohsayan): check threshold and update hooks
match task {
CriticalTask::WriteBatch(model_id, observed_size) => {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
let Some(mdl_driver) = mdl_drivers.get(&model_id) else {
// because we maximize throughput, the model driver may have been already removed but this task
// was way behind in the queue
continue;
};
let res = global._namespace().with_model(
(model_id.space(), model_id.model()),
|model| {
let global = global.clone();
tokio::task::spawn_blocking(move || self.hp_executor(global, task))
.await
.unwrap()
}
}
fn hp_executor(
&'static self,
global: super::Global,
Task { threshold, task }: Task<CriticalTask>,
) {
// TODO(@ohsayan): check threshold and update hooks
match task {
CriticalTask::WriteBatch(model_id, observed_size) => {
info!("fhp: {model_id} has reached cache capacity. writing to disk");
let mdl_drivers = global.get_state().get_mdl_drivers().read();
let Some(mdl_driver) = mdl_drivers.get(&model_id) else {
// because we maximize throughput, the model driver may have been already removed but this task
// was way behind in the queue
return;
};
let res =
global
._namespace()
.with_model((model_id.space(), model_id.model()), |model| {
if model.get_uuid() != model_id.uuid() {
// once again, throughput maximization will lead to, in extremely rare cases, this
// branch returning. but it is okay
return Ok(());
}
Self::try_write_model_data_batch(model, observed_size, mdl_driver)
},
);
match res {
Ok(()) => {}
Err(_) => {
log::error!(
"Error writing data batch for model {}. Retrying...",
model_id.uuid()
);
// enqueue again for retrying
self.hp_dispatcher
.send(Task::with_threshold(
CriticalTask::WriteBatch(model_id, observed_size),
threshold - 1,
))
.unwrap();
});
match res {
Ok(()) => {
if observed_size != 0 {
info!("fhp: completed maintenance task for {model_id}, synced={observed_size}")
}
}
Err(_) => {
log::error!(
"fhp: error writing data batch for model {}. Retrying...",
model_id.uuid()
);
// enqueue again for retrying
self.hp_dispatcher
.send(Task::with_threshold(
CriticalTask::WriteBatch(model_id, observed_size),
threshold - 1,
))
.unwrap();
}
}
}
}
@ -307,37 +326,21 @@ impl FractalMgr {
loop {
tokio::select! {
_ = sigterm.recv() => {
info!("exited flp executor service");
info!("flp: finishing any pending maintenance tasks");
let global = global.clone();
tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap();
info!("flp: exited executor service");
break;
},
_ = tokio::time::sleep(std::time::Duration::from_secs(Self::GENERAL_EXECUTOR_WINDOW)) => {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
for (model_id, driver) in mdl_drivers.iter() {
let mut observed_len = 0;
let res = global._namespace().with_model((model_id.space(), model_id.model()), |model| {
if model.get_uuid() != model_id.uuid() {
// once again, throughput maximization will lead to, in extremely rare cases, this
// branch returning. but it is okay
return Ok(());
}
// mark that we're taking these deltas
observed_len = model.delta_state().__fractal_take_full_from_data_delta(super::FractalToken::new());
Self::try_write_model_data_batch(model, observed_len, driver)
});
match res {
Ok(()) => {}
Err(_) => {
// this failure is *not* good, so we want to promote this to a critical task
self.hp_dispatcher.send(Task::new(CriticalTask::WriteBatch(model_id.clone(), observed_len))).unwrap()
}
}
}
let global = global.clone();
tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap()
}
task = lpq.recv() => {
let Task { threshold, task } = match task {
Some(t) => t,
None => {
info!("exiting flp executor service because all tasks closed");
info!("flp: exiting executor service because all tasks closed");
break;
}
};
@ -362,6 +365,45 @@ impl FractalMgr {
}
}
}
/// Run one round of low-priority model maintenance: for every registered
/// model driver, take all pending data deltas and try to persist them as a
/// batch. A failed write is escalated to the high-priority dispatcher so it
/// is retried by the HP executor instead of being silently dropped.
fn general_executor_model_maintenance(&'static self, global: super::Global) {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
for (model_id, driver) in mdl_drivers.iter() {
// number of deltas observed/taken for this model during this round
let mut observed_len = 0;
let res =
global
._namespace()
.with_model((model_id.space(), model_id.model()), |model| {
if model.get_uuid() != model_id.uuid() {
// once again, throughput maximization will lead to, in extremely rare cases, this
// branch returning. but it is okay
return Ok(());
}
// mark that we're taking these deltas
observed_len = model
.delta_state()
.__fractal_take_full_from_data_delta(super::FractalToken::new());
Self::try_write_model_data_batch(model, observed_len, driver)
});
match res {
Ok(()) => {
// only log when something was actually synced to keep the log quiet
if observed_len != 0 {
info!(
"flp: completed maintenance task for {model_id}, synced={observed_len}"
)
}
}
Err(_) => {
// this failure is *not* good, so we want to promote this to a critical task
self.hp_dispatcher
.send(Task::new(CriticalTask::WriteBatch(
model_id.clone(),
observed_len,
)))
.unwrap()
}
}
}
}
}
// util

@ -36,7 +36,7 @@ use {
},
crate::engine::error::RuntimeResult,
parking_lot::{Mutex, RwLock},
std::{collections::HashMap, mem::MaybeUninit},
std::{collections::HashMap, fmt, mem::MaybeUninit},
tokio::sync::mpsc::unbounded_channel,
};
@ -240,15 +240,17 @@ impl Global {
}
pub unsafe fn unload_all(self) {
// TODO(@ohsayan): handle errors
self.namespace_txn_driver()
.lock()
.__journal_mut()
.__append_journal_close_and_close()
.unwrap();
for (_, driver) in self.get_state().mdl_driver.write().iter_mut() {
driver.batch_driver().lock().close().unwrap();
let GlobalState {
gns_driver,
mdl_driver,
..
} = Self::__gref_raw().assume_init_read();
let gns_driver = gns_driver.txn_driver.into_inner().into_inner();
let mdl_drivers = mdl_driver.into_inner();
gns_driver.close().unwrap();
for (_, driver) in mdl_drivers {
driver.close().unwrap();
}
core::ptr::drop_in_place(Self::__gref_raw().as_mut_ptr());
}
}
@ -302,6 +304,12 @@ pub struct ModelUniqueID {
uuid: Uuid,
}
// Human-readable form used in log messages, e.g. `model-<name>@<space>`.
impl fmt::Display for ModelUniqueID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "model-{}@{}", self.model(), self.space())
}
}
impl ModelUniqueID {
/// Create a new unique model ID
pub fn new(space: &str, model: &str, uuid: Uuid) -> Self {

@ -146,9 +146,6 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
impl<Fs: RawFSInterface> Drop for TestGlobal<Fs> {
fn drop(&mut self) {
let mut txn_driver = self.txn_driver.lock();
txn_driver
.__journal_mut()
.__append_journal_close_and_close()
.unwrap();
txn_driver.__journal_mut().__close_mut().unwrap();
}
}

@ -108,6 +108,12 @@ impl CHTRuntimeLog {
}
}
impl Drop for CHTRuntimeLog {
fn drop(&mut self) {
// NOTE(review): `let _ = self.data;` evaluates the place expression but
// does not move or drop the field — presumably here to mark the field as
// used (silencing a dead-code lint) rather than to release anything.
// TODO confirm intent; an explicit drop would need `drop(...)` on an
// owned value.
let _ = self.data;
}
}
pub struct Node<C: Config> {
branch: [Atomic<Self>; <DefConfig as Config>::BRANCH_MX],
}

@ -197,11 +197,10 @@ pub async fn start(
tokio::select! {
_ = endpoint_handles.listen() => {}
_ = termsig => {
info!("received terminate signal");
info!("received terminate signal. waiting for inflight tasks to complete ...");
}
}
drop(signal);
info!("waiting for inflight tasks to complete ...");
endpoint_handles.finish().await;
info!("waiting for fractal engine to exit ...");
let (hp_handle, lp_handle) = tokio::join!(fractal_handle.hp_handle, fractal_handle.lp_handle);

@ -56,7 +56,7 @@ pub fn reinit<Fs: RawFSInterface>(
// restore
let mut restore_driver = DataBatchRestoreDriver::new(f)?;
restore_driver.read_data_batch_into_model(model)?;
DataBatchPersistDriver::new(restore_driver.into_file(), false)
DataBatchPersistDriver::new(restore_driver.into_file()?, false)
}
/// Create a new batch journal

@ -63,16 +63,12 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
file.fsynced_write(&[MARKER_BATCH_REOPEN])?;
}
Ok(Self {
f: SDSSFileTrackedWriter::new(file),
f: SDSSFileTrackedWriter::new(file)?,
})
}
pub fn close(&mut self) -> RuntimeResult<()> {
if self
.f
.inner_file()
.fsynced_write(&[MARKER_BATCH_CLOSED])
.is_ok()
{
pub fn close(self) -> RuntimeResult<()> {
let mut slf = self.f.into_inner_file()?;
if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() {
return Ok(());
} else {
return Err(StorageError::DataBatchCloseError.into());
@ -123,11 +119,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
self.encode_row_data(model, &irm, &row_data)?;
}
}
// fsync now; we're good to go
self.f.fsync_all()?;
i += 1;
}
return self.append_batch_summary(observed_len, inconsistent_reads);
return self.append_batch_summary_and_sync(observed_len, inconsistent_reads);
};
match exec() {
Ok(()) => Ok(()),
@ -155,26 +149,28 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
col_cnt: usize,
) -> RuntimeResult<()> {
self.f
.unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
.write_unfsynced(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?;
let observed_len_bytes = observed_len.u64_bytes_le();
self.f.unfsynced_write(&observed_len_bytes)?;
self.f.write_unfsynced(&observed_len_bytes)?;
self.f
.unfsynced_write(&schema_version.value_u64().to_le_bytes())?;
self.f.unfsynced_write(&col_cnt.u64_bytes_le())?;
.write_unfsynced(&schema_version.value_u64().to_le_bytes())?;
self.f.write_unfsynced(&col_cnt.u64_bytes_le())?;
Ok(())
}
/// Append a summary of this batch
fn append_batch_summary(
/// Append a summary of this batch and most importantly, **sync everything to disk**
fn append_batch_summary_and_sync(
&mut self,
observed_len: usize,
inconsistent_reads: usize,
) -> RuntimeResult<()> {
// [0xFD][actual_commit][checksum]
self.f.unfsynced_write(&[MARKER_END_OF_BATCH])?;
self.f.write_unfsynced(&[MARKER_END_OF_BATCH])?;
let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le();
self.f.unfsynced_write(&actual_commit)?;
self.f.write_unfsynced(&actual_commit)?;
let cs = self.f.reset_and_finish_checksum().to_le_bytes();
self.f.inner_file().fsynced_write(&cs)?;
self.f.untracked_write(&cs)?;
// IMPORTANT: now that all data has been written, we need to actually ensure that the writes pass through the cache
self.f.sync_writes()?;
Ok(())
}
/// Attempt to fix the batch journal
@ -184,8 +180,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
attempt to append 0xFF to the part of the file where a corruption likely occurred, marking
it recoverable
*/
let f = self.f.inner_file();
if f.fsynced_write(&[MARKER_RECOVERY_EVENT]).is_ok() {
if self.f.untracked_write(&[MARKER_RECOVERY_EVENT]).is_ok() {
return Ok(());
}
Err(StorageError::DataBatchRecoveryFailStageOne.into())
@ -203,7 +198,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pk.read_uint()
}
.to_le_bytes();
buf.unfsynced_write(&data)?;
buf.write_unfsynced(&data)?;
}
TagUnique::Str | TagUnique::Bin => {
let slice = unsafe {
@ -211,8 +206,8 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
pk.read_bin()
};
let slice_l = slice.len().u64_bytes_le();
buf.unfsynced_write(&slice_l)?;
buf.unfsynced_write(slice)?;
buf.write_unfsynced(&slice_l)?;
buf.write_unfsynced(slice)?;
}
TagUnique::Illegal => unsafe {
// UNSAFE(@ohsayan): a pk can't be constructed with illegal
@ -225,7 +220,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
fn encode_cell(&mut self, value: &Datacell) -> RuntimeResult<()> {
let mut buf = vec![];
cell::encode(&mut buf, value);
self.f.unfsynced_write(&buf)?;
self.f.write_unfsynced(&buf)?;
Ok(())
}
/// Encode row data
@ -241,7 +236,7 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
self.encode_cell(cell)?;
}
None if field_name.as_ref() == mdl.p_key() => {}
None => self.f.unfsynced_write(&[0])?,
None => self.f.write_unfsynced(&[0])?,
}
}
Ok(())
@ -249,9 +244,9 @@ impl<Fs: RawFSInterface> DataBatchPersistDriver<Fs> {
/// Write the change type and txnid
fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> RuntimeResult<()> {
let change_type = [delta.change().value_u8()];
self.f.unfsynced_write(&change_type)?;
self.f.write_unfsynced(&change_type)?;
let txn_id = delta.data_version().value_u64().to_le_bytes();
self.f.unfsynced_write(&txn_id)?;
self.f.write_unfsynced(&txn_id)?;
Ok(())
}
}

@ -114,7 +114,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
f: SDSSFileTrackedReader::new(f)?,
})
}
pub fn into_file(self) -> SDSSFileIO<F> {
pub fn into_file(self) -> RuntimeResult<SDSSFileIO<F>> {
self.f.into_inner_file()
}
pub(in crate::engine::storage::v1) fn read_data_batch_into_model(
@ -312,11 +312,7 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
let actual_checksum = self.f.__reset_checksum();
// find hardcoded checksum
let mut hardcoded_checksum = [0; sizeof!(u64)];
self.f
.inner_file()
.read_to_buffer(&mut hardcoded_checksum)?;
// move file cursor ahead
self.f.__cursor_ahead_by(sizeof!(u64));
self.f.untracked_read(&mut hardcoded_checksum)?;
if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
Ok(actual_commit)
} else {
@ -414,7 +410,9 @@ impl<F: RawFSInterface> DataBatchRestoreDriver<F> {
)))
}
fn attempt_recover_data_batch(&mut self) -> RuntimeResult<()> {
if let Ok(MARKER_RECOVERY_EVENT) = self.f.inner_file().read_byte() {
let mut buf = [0u8; 1];
self.f.untracked_read(&mut buf)?;
if let [MARKER_RECOVERY_EVENT] = buf {
return Ok(());
}
Err(StorageError::DataBatchRestoreCorruptedBatch.into())

@ -423,7 +423,7 @@ impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
&JournalEntryMetadata::new(id, EventSourceMarker::DRIVER_REOPENED, 0, 0).encoded(),
)
}
pub fn __append_journal_close_and_close(&mut self) -> RuntimeResult<()> {
pub fn __close_mut(&mut self) -> RuntimeResult<()> {
self.closed = true;
let id = self._incr_id() as u128;
self.log_file.fsynced_write(
@ -431,9 +431,8 @@ impl<Fs: RawFSInterface, TA> JournalWriter<Fs, TA> {
)?;
Ok(())
}
#[cfg(test)]
pub fn append_journal_close_and_close(mut self) -> RuntimeResult<()> {
self.__append_journal_close_and_close()
/// Consume the journal writer and close the journal, delegating to
/// `__close_mut` (which appends the close event and fsyncs it).
pub fn close(mut self) -> RuntimeResult<()> {
self.__close_mut()
}
}

@ -70,26 +70,36 @@ impl SEInitState {
&gns,
)
}?;
if is_new {
std::fs::create_dir(DATA_DIR).inherit_set_dmsg("creating data directory")?;
}
let mut model_drivers = ModelDrivers::new();
if !is_new {
// this is an existing instance, so read in all data
for (space_name, space) in gns.spaces().read().iter() {
let space_uuid = space.get_uuid();
for (model_name, model) in space.models().read().iter() {
let path =
Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg(
format!("failed to restore model data from journal in `{path}`"),
)?;
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
FractalModelDriver::init(persist_driver),
);
let mut driver_guard = || {
if is_new {
std::fs::create_dir(DATA_DIR).inherit_set_dmsg("creating data directory")?;
}
if !is_new {
// this is an existing instance, so read in all data
for (space_name, space) in gns.spaces().read().iter() {
let space_uuid = space.get_uuid();
for (model_name, model) in space.models().read().iter() {
let path =
Self::model_path(space_name, space_uuid, model_name, model.get_uuid());
let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg(
format!("failed to restore model data from journal in `{path}`"),
)?;
let _ = model_drivers.insert(
ModelUniqueID::new(space_name, model_name, model.get_uuid()),
FractalModelDriver::init(persist_driver),
);
}
}
}
RuntimeResult::Ok(())
};
if let Err(e) = driver_guard() {
gns_txn_driver.close().unwrap();
for (_, driver) in model_drivers {
driver.close().unwrap();
}
return Err(e);
}
Ok(SEInitState::new(
GNSTransactionDriverAnyFS::new(gns_txn_driver),

@ -28,8 +28,9 @@ use {
crate::engine::{
error::RuntimeResult,
storage::v1::rw::{
FileOpen, RawFSInterface, RawFileInterface, RawFileInterfaceExt, RawFileInterfaceRead,
RawFileInterfaceWrite, RawFileInterfaceWriteExt,
FileOpen, RawFSInterface, RawFileInterface, RawFileInterfaceBufferedWriter,
RawFileInterfaceExt, RawFileInterfaceRead, RawFileInterfaceWrite,
RawFileInterfaceWriteExt,
},
sync::cell::Lazy,
},
@ -119,7 +120,7 @@ impl RawFSInterface for VirtualFS {
c.fw_write_all(&data)?;
}
FileOpen::Existing(mut e) => {
e.fw_truncate_to(0)?;
e.fwext_truncate_to(0)?;
e.fw_write_all(&data)?;
}
}
@ -386,16 +387,24 @@ fn with_file<T>(fpath: &str, mut f: impl FnMut(&VFile) -> RuntimeResult<T>) -> R
}
impl RawFileInterface for VFileDescriptor {
type Reader = Self;
type Writer = Self;
fn into_buffered_reader(self) -> RuntimeResult<Self::Reader> {
type BufReader = Self;
type BufWriter = Self;
fn into_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(self)
}
fn into_buffered_writer(self) -> RuntimeResult<Self::Writer> {
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r)
}
fn into_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(self)
}
fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult<Self> {
Ok(w)
}
}
impl RawFileInterfaceBufferedWriter for VFileDescriptor {}
impl RawFileInterfaceRead for VFileDescriptor {
fn fr_read_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
with_file_mut(&self.0, |file| {
@ -434,10 +443,10 @@ impl RawFileInterfaceWrite for VFileDescriptor {
}
impl RawFileInterfaceWriteExt for VFileDescriptor {
fn fw_fsync_all(&mut self) -> RuntimeResult<()> {
fn fwext_fsync_all(&mut self) -> RuntimeResult<()> {
with_file(&self.0, |_| Ok(()))
}
fn fw_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
with_file_mut(&self.0, |file| {
if !file.write {
return Err(
@ -532,10 +541,10 @@ impl RawFileInterfaceWrite for NullFile {
}
}
impl RawFileInterfaceWriteExt for NullFile {
fn fw_fsync_all(&mut self) -> RuntimeResult<()> {
fn fwext_fsync_all(&mut self) -> RuntimeResult<()> {
Ok(())
}
fn fw_truncate_to(&mut self, _: u64) -> RuntimeResult<()> {
fn fwext_truncate_to(&mut self, _: u64) -> RuntimeResult<()> {
Ok(())
}
}
@ -553,12 +562,20 @@ impl RawFileInterfaceExt for NullFile {
}
}
impl RawFileInterface for NullFile {
type Reader = Self;
type Writer = Self;
fn into_buffered_reader(self) -> RuntimeResult<Self::Reader> {
type BufReader = Self;
type BufWriter = Self;
fn into_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(self)
}
fn into_buffered_writer(self) -> RuntimeResult<Self::Writer> {
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r)
}
fn into_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(self)
}
fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult<Self> {
Ok(w)
}
}
impl RawFileInterfaceBufferedWriter for NullFile {}

@ -101,17 +101,28 @@ pub trait RawFSInterface {
}
/// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking)
pub trait RawFileInterface
pub trait RawFileInterface: Sized
where
Self: RawFileInterfaceRead
+ RawFileInterfaceWrite
+ RawFileInterfaceWriteExt
+ RawFileInterfaceExt,
{
type Reader: RawFileInterfaceRead + RawFileInterfaceExt;
type Writer: RawFileInterfaceWrite + RawFileInterfaceExt;
fn into_buffered_reader(self) -> RuntimeResult<Self::Reader>;
fn into_buffered_writer(self) -> RuntimeResult<Self::Writer>;
type BufReader: RawFileInterfaceBufferedReader;
type BufWriter: RawFileInterfaceBufferedWriter;
fn into_buffered_reader(self) -> RuntimeResult<Self::BufReader>;
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self>;
fn into_buffered_writer(self) -> RuntimeResult<Self::BufWriter>;
fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult<Self>;
}
// Marker trait for buffered readers: anything supporting exact reads plus the
// extended (cursor/length/seek) operations qualifies, hence the blanket impl.
pub trait RawFileInterfaceBufferedReader: RawFileInterfaceRead + RawFileInterfaceExt {}
impl<R: RawFileInterfaceRead + RawFileInterfaceExt> RawFileInterfaceBufferedReader for R {}
// Buffered writers additionally expose `sync_write_cache`, which should push
// any write-side cache through to stable storage. The default is a no-op for
// writer types that do not buffer (e.g. the in-memory test files).
pub trait RawFileInterfaceBufferedWriter: RawFileInterfaceWrite + RawFileInterfaceExt {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
Ok(())
}
}
/// A file interface that supports read operations
@ -138,8 +149,8 @@ impl<W: Write> RawFileInterfaceWrite for W {
/// A file interface that supports advanced write operations
pub trait RawFileInterfaceWriteExt {
fn fw_fsync_all(&mut self) -> RuntimeResult<()>;
fn fw_truncate_to(&mut self, to: u64) -> RuntimeResult<()>;
fn fwext_fsync_all(&mut self) -> RuntimeResult<()>;
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()>;
}
/// A file interface that supports advanced file operations
@ -206,21 +217,37 @@ impl RawFSInterface for LocalFS {
}
impl RawFileInterface for File {
type Reader = BufReader<Self>;
type Writer = BufWriter<Self>;
fn into_buffered_reader(self) -> RuntimeResult<Self::Reader> {
type BufReader = BufReader<Self>;
type BufWriter = BufWriter<Self>;
fn into_buffered_reader(self) -> RuntimeResult<Self::BufReader> {
Ok(BufReader::new(self))
}
fn into_buffered_writer(self) -> RuntimeResult<Self::Writer> {
fn downgrade_reader(r: Self::BufReader) -> RuntimeResult<Self> {
Ok(r.into_inner())
}
fn into_buffered_writer(self) -> RuntimeResult<Self::BufWriter> {
Ok(BufWriter::new(self))
}
fn downgrade_writer(mut w: Self::BufWriter) -> RuntimeResult<Self> {
w.flush()?; // TODO(@ohsayan): handle rare case where writer does panic
let (w, _) = w.into_parts();
Ok(w)
}
}
// For a real buffered file writer, syncing the write cache means flushing the
// in-memory buffer into the `File` and then fsyncing, so the data reaches disk
// rather than stopping at the userspace buffer or OS page cache.
impl RawFileInterfaceBufferedWriter for BufWriter<File> {
fn sync_write_cache(&mut self) -> RuntimeResult<()> {
self.flush()?;
self.get_mut().sync_all()?;
Ok(())
}
}
impl RawFileInterfaceWriteExt for File {
fn fw_fsync_all(&mut self) -> RuntimeResult<()> {
fn fwext_fsync_all(&mut self) -> RuntimeResult<()> {
cvt(self.sync_all())
}
fn fw_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> {
cvt(self.set_len(to))
}
}
@ -270,40 +297,44 @@ impl<F: LocalFSFile> RawFileInterfaceExt for F {
}
pub struct SDSSFileTrackedWriter<Fs: RawFSInterface> {
f: SDSSFileIO<Fs>,
f: SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufWriter>,
cs: SCrc,
}
impl<Fs: RawFSInterface> SDSSFileTrackedWriter<Fs> {
pub fn new(f: SDSSFileIO<Fs>) -> Self {
Self { f, cs: SCrc::new() }
pub fn new(f: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
Ok(Self {
f: f.into_buffered_sdss_writer()?,
cs: SCrc::new(),
})
}
pub fn unfsynced_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
pub fn write_unfsynced(&mut self, block: &[u8]) -> RuntimeResult<()> {
self.untracked_write(block)
.map(|_| self.cs.recompute_with_new_var_block(block))
}
pub fn untracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> {
match self.f.unfsynced_write(block) {
Ok(()) => {
self.cs.recompute_with_new_var_block(block);
Ok(())
}
Ok(()) => Ok(()),
e => e,
}
}
pub fn fsync_all(&mut self) -> RuntimeResult<()> {
self.f.fsync_all()
pub fn sync_writes(&mut self) -> RuntimeResult<()> {
self.f.f.sync_write_cache()
}
pub fn reset_and_finish_checksum(&mut self) -> u64 {
let mut scrc = SCrc::new();
core::mem::swap(&mut self.cs, &mut scrc);
scrc.finish()
}
pub fn inner_file(&mut self) -> &mut SDSSFileIO<Fs> {
&mut self.f
pub fn into_inner_file(self) -> RuntimeResult<SDSSFileIO<Fs>> {
self.f.downgrade_writer()
}
}
/// [`SDSSFileLenTracked`] simply maintains application level length and checksum tracking to avoid frequent syscalls because we
/// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them
pub struct SDSSFileTrackedReader<Fs: RawFSInterface> {
f: SDSSFileIO<Fs>,
f: SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufReader>,
len: u64,
pos: u64,
cs: SCrc,
@ -314,6 +345,7 @@ impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
pub fn new(mut f: SDSSFileIO<Fs>) -> RuntimeResult<Self> {
let len = f.file_length()?;
let pos = f.retrieve_cursor()?;
let f = f.into_buffered_sdss_reader()?;
Ok(Self {
f,
len,
@ -331,11 +363,23 @@ impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
self.remaining() >= v
}
pub fn read_into_buffer(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
self.untracked_read(buf)
.map(|_| self.cs.recompute_with_new_var_block(buf))
}
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
let mut buf = [0u8; 1];
self.read_into_buffer(&mut buf).map(|_| buf[0])
}
pub fn __reset_checksum(&mut self) -> u64 {
let mut crc = SCrc::new();
core::mem::swap(&mut crc, &mut self.cs);
crc.finish()
}
pub fn untracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> {
if self.remaining() >= buf.len() as u64 {
match self.f.read_to_buffer(buf) {
Ok(()) => {
self.pos += buf.len() as u64;
self.cs.recompute_with_new_var_block(buf);
Ok(())
}
Err(e) => return Err(e),
@ -344,23 +388,8 @@ impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into())
}
}
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
let mut buf = [0u8; 1];
self.read_into_buffer(&mut buf).map(|_| buf[0])
}
pub fn __reset_checksum(&mut self) -> u64 {
let mut crc = SCrc::new();
core::mem::swap(&mut crc, &mut self.cs);
crc.finish()
}
pub fn inner_file(&mut self) -> &mut SDSSFileIO<Fs> {
&mut self.f
}
pub fn into_inner_file(self) -> SDSSFileIO<Fs> {
self.f
}
pub fn __cursor_ahead_by(&mut self, sizeof: usize) {
self.pos += sizeof as u64;
pub fn into_inner_file(self) -> RuntimeResult<SDSSFileIO<Fs>> {
self.f.downgrade_reader()
}
pub fn read_block<const N: usize>(&mut self) -> RuntimeResult<[u8; N]> {
if !self.has_left(N as _) {
@ -376,8 +405,8 @@ impl<Fs: RawFSInterface> SDSSFileTrackedReader<Fs> {
}
#[derive(Debug)]
pub struct SDSSFileIO<Fs: RawFSInterface> {
f: Fs::File,
pub struct SDSSFileIO<Fs: RawFSInterface, F = <Fs as RawFSInterface>::File> {
f: F,
_fs: PhantomData<Fs>,
}
@ -408,42 +437,60 @@ impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
}
}
}
/// Convert this file into its buffered-reader form, keeping the `SDSSFileIO`
/// wrapper but swapping the file type for the FS's `BufReader`.
pub fn into_buffered_sdss_reader(
self,
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufReader>> {
self.f.into_buffered_reader().map(SDSSFileIO::_new)
}
/// Convert this file into its buffered-writer form, keeping the `SDSSFileIO`
/// wrapper but swapping the file type for the FS's `BufWriter`.
pub fn into_buffered_sdss_writer(
self,
) -> RuntimeResult<SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufWriter>> {
self.f.into_buffered_writer().map(SDSSFileIO::_new)
}
}
impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
pub fn _new(f: Fs::File) -> Self {
impl<Fs: RawFSInterface> SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufReader> {
/// Downgrade a buffered-reader SDSS file back to the raw (unbuffered) file
/// form via the FS implementation's `downgrade_reader`.
pub fn downgrade_reader(self) -> RuntimeResult<SDSSFileIO<Fs, Fs::File>> {
let me = <Fs::File as RawFileInterface>::downgrade_reader(self.f)?;
Ok(SDSSFileIO::_new(me))
}
}
impl<Fs: RawFSInterface> SDSSFileIO<Fs, <Fs::File as RawFileInterface>::BufWriter> {
/// Downgrade a buffered-writer SDSS file back to the raw file form. For the
/// local FS this flushes the buffer first (see the `File` impl), so pending
/// writes are not lost in the downgrade.
pub fn downgrade_writer(self) -> RuntimeResult<SDSSFileIO<Fs>> {
let me = <Fs::File as RawFileInterface>::downgrade_writer(self.f)?;
Ok(SDSSFileIO::_new(me))
}
}
impl<Fs: RawFSInterface, F> SDSSFileIO<Fs, F> {
pub fn _new(f: F) -> Self {
Self {
f,
_fs: PhantomData,
}
}
pub fn unfsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)
}
pub fn fsync_all(&mut self) -> RuntimeResult<()> {
self.f.fw_fsync_all()?;
Ok(())
}
pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)?;
self.f.fw_fsync_all()
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceRead> SDSSFileIO<Fs, F> {
pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> {
self.f.fr_read_exact(buffer)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceExt> SDSSFileIO<Fs, F> {
pub fn retrieve_cursor(&mut self) -> RuntimeResult<u64> {
self.f.fext_cursor()
}
pub fn file_length(&self) -> RuntimeResult<u64> {
self.f.fext_file_length()
}
pub fn seek_from_start(&mut self, by: u64) -> RuntimeResult<()> {
self.f.fext_seek_ahead_from_start_by(by)
}
pub fn retrieve_cursor(&mut self) -> RuntimeResult<u64> {
self.f.fext_cursor()
}
pub fn read_byte(&mut self) -> RuntimeResult<u8> {
let mut r = [0; 1];
self.read_to_buffer(&mut r).map(|_| r[0])
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceRead + RawFileInterfaceExt> SDSSFileIO<Fs, F> {
pub fn load_remaining_into_buffer(&mut self) -> RuntimeResult<Vec<u8>> {
let len = self.file_length()? - self.retrieve_cursor()?;
let mut buf = vec![0; len as usize];
@ -451,3 +498,20 @@ impl<Fs: RawFSInterface> SDSSFileIO<Fs> {
Ok(buf)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceWrite> SDSSFileIO<Fs, F> {
/// Write `data` without forcing a sync; durability is deferred to a later
/// explicit fsync by the caller.
pub fn unfsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)
}
}
impl<Fs: RawFSInterface, F: RawFileInterfaceWrite + RawFileInterfaceWriteExt> SDSSFileIO<Fs, F> {
/// Force a sync of the underlying file.
pub fn fsync_all(&mut self) -> RuntimeResult<()> {
self.f.fwext_fsync_all()?;
Ok(())
}
/// Write `data` and immediately sync, so the write is durable on return.
pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> {
self.f.fw_write_all(data)?;
self.f.fwext_fsync_all()
}
}

@ -67,7 +67,7 @@ fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver<VirtualFS
FileOpen::Existing((f, _header)) => {
let mut dbr = DataBatchRestoreDriver::new(f).unwrap();
dbr.read_data_batch_into_model(mdl).unwrap();
DataBatchPersistDriver::new(dbr.into_file(), false)
DataBatchPersistDriver::new(dbr.into_file().unwrap(), false)
}
}
.unwrap()
@ -144,7 +144,7 @@ fn empty_multi_open_reopen() {
),
);
for _ in 0..100 {
let mut writer = open_batch_data("empty_multi_open_reopen.db-btlog", &mdl);
let writer = open_batch_data("empty_multi_open_reopen.db-btlog", &mdl);
writer.close().unwrap();
}
}

@ -142,7 +142,7 @@ fn first_boot_second_readonly() {
let mut log = open_log("testtxn.log", &db1)?;
db1.txn_set(0, 20, &mut log)?;
db1.txn_set(9, 21, &mut log)?;
log.append_journal_close_and_close()
log.close()
};
x().unwrap();
// backup original data
@ -151,7 +151,7 @@ fn first_boot_second_readonly() {
let empty_db2 = Database::new();
open_log("testtxn.log", &empty_db2)
.unwrap()
.append_journal_close_and_close()
.close()
.unwrap();
assert_eq!(original_data, empty_db2.copy_data());
}
@ -164,7 +164,7 @@ fn oneboot_mod_twoboot_mod_thirdboot_read() {
for i in 0..10 {
db1.txn_set(i, 1, &mut log)?;
}
log.append_journal_close_and_close()
log.close()
};
x().unwrap();
let bkp_db1 = db1.copy_data();
@ -178,7 +178,7 @@ fn oneboot_mod_twoboot_mod_thirdboot_read() {
let current_val = db2.data.borrow()[i];
db2.txn_set(i, current_val + i as u8, &mut log)?;
}
log.append_journal_close_and_close()
log.close()
};
x().unwrap();
let bkp_db2 = db2.copy_data();
@ -186,7 +186,7 @@ fn oneboot_mod_twoboot_mod_thirdboot_read() {
// third boot
let db3 = Database::new();
let log = open_log("duatxn.db-tlog", &db3).unwrap();
log.append_journal_close_and_close().unwrap();
log.close().unwrap();
assert_eq!(bkp_db2, db3.copy_data());
assert_eq!(
db3.copy_data(),

@ -65,6 +65,9 @@ impl<Fs: RawFSInterface> GNSTransactionDriverAnyFS<Fs> {
pub fn new(journal: JournalWriter<Fs, GNSAdapter>) -> Self {
Self { journal }
}
/// Consume the transaction driver and return the underlying journal writer
/// (used at shutdown to close the journal by value).
pub fn into_inner(self) -> JournalWriter<Fs, GNSAdapter> {
self.journal
}
pub fn __journal_mut(&mut self) -> &mut JournalWriter<Fs, GNSAdapter> {
&mut self.journal
}

@ -83,6 +83,7 @@ fn main() {
match runtime.block_on(async move { engine::start(config, global).await }) {
Ok(()) => {
engine::finish(g);
info!("finished all pending tasks. Goodbye!");
}
Err(e) => {
error!("{e}");

Loading…
Cancel
Save