diff --git a/server/src/engine/fractal/drivers.rs b/server/src/engine/fractal/drivers.rs index 2a41e2e8..6a633660 100644 --- a/server/src/engine/fractal/drivers.rs +++ b/server/src/engine/fractal/drivers.rs @@ -27,6 +27,7 @@ use { super::util, crate::engine::{ + error::RuntimeResult, storage::v1::{data_batch::DataBatchPersistDriver, RawFSInterface}, txn::gns::GNSTransactionDriverAnyFS, }, @@ -38,7 +39,7 @@ use { pub(super) struct FractalGNSDriver { #[allow(unused)] status: util::Status, - txn_driver: Mutex>, + pub(super) txn_driver: Mutex>, } impl FractalGNSDriver { @@ -72,6 +73,9 @@ impl FractalModelDriver { pub fn batch_driver(&self) -> &Mutex> { &self.batch_driver } + pub fn close(self) -> RuntimeResult<()> { + self.batch_driver.into_inner().close() + } } /// Model hooks diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 99375c2c..0307c549 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -24,14 +24,13 @@ * */ -use crate::engine::storage::v1::LocalFS; - use { super::ModelUniqueID, crate::{ engine::{ core::model::{delta::DataDelta, Model}, data::uuid::Uuid, + storage::v1::LocalFS, }, util::os, }, @@ -228,7 +227,7 @@ impl FractalMgr { // services impl FractalMgr { - const GENERAL_EXECUTOR_WINDOW: u64 = 5; + const GENERAL_EXECUTOR_WINDOW: u64 = 5 * 60; /// The high priority executor service runs in the background to take care of high priority tasks and take any /// appropriate action. It will exclusively own the high priority queue since it is the only broker that is /// allowed to perform HP tasks @@ -239,57 +238,77 @@ impl FractalMgr { mut sigterm: broadcast::Receiver<()>, ) { loop { - let Task { threshold, task } = tokio::select! { + let task = tokio::select! { task = receiver.recv() => { match task { Some(t) => t, None => { - info!("exiting fhp executor service because all tasks closed"); + info!("fhp: exiting executor service because all tasks closed"); break; } } } _ = sigterm.recv() => { - info!("exited fhp executor service"); + info!("fhp: finishing pending tasks"); + while let Ok(task) = receiver.try_recv() { + let global = global.clone(); + tokio::task::spawn_blocking(move || self.hp_executor(global, task)).await.unwrap() + } + info!("fhp: exited executor service"); break; } }; - // TODO(@ohsayan): check threshold and update hooks - match task { - CriticalTask::WriteBatch(model_id, observed_size) => { - let mdl_drivers = global.get_state().get_mdl_drivers().read(); - let Some(mdl_driver) = mdl_drivers.get(&model_id) else { - // because we maximize throughput, the model driver may have been already removed but this task - // was way behind in the queue - continue; - }; - let res = global._namespace().with_model( - (model_id.space(), model_id.model()), - |model| { + let global = global.clone(); + tokio::task::spawn_blocking(move || self.hp_executor(global, task)) + .await + .unwrap() + } + } + fn hp_executor( + &'static self, + global: super::Global, + Task { threshold, task }: Task, + ) { + // TODO(@ohsayan): check threshold and update hooks + match task { + CriticalTask::WriteBatch(model_id, observed_size) => { + info!("fhp: {model_id} has reached cache capacity. writing to disk"); + let mdl_drivers = global.get_state().get_mdl_drivers().read(); + let Some(mdl_driver) = mdl_drivers.get(&model_id) else { + // because we maximize throughput, the model driver may have been already removed but this task + // was way behind in the queue + return; + }; + let res = + global + ._namespace() + .with_model((model_id.space(), model_id.model()), |model| { if model.get_uuid() != model_id.uuid() { // once again, throughput maximization will lead to, in extremely rare cases, this // branch returning. but it is okay return Ok(()); } Self::try_write_model_data_batch(model, observed_size, mdl_driver) - }, - ); - match res { - Ok(()) => {} - Err(_) => { - log::error!( - "Error writing data batch for model {}. Retrying...", - model_id.uuid() - ); - // enqueue again for retrying - self.hp_dispatcher - .send(Task::with_threshold( - CriticalTask::WriteBatch(model_id, observed_size), - threshold - 1, - )) - .unwrap(); + }); + match res { + Ok(()) => { + if observed_size != 0 { + info!("fhp: completed maintenance task for {model_id}, synced={observed_size}") } } + Err(_) => { + log::error!( + "fhp: error writing data batch for model {}. Retrying...", + model_id.uuid() + ); + // enqueue again for retrying + self.hp_dispatcher + .send(Task::with_threshold( + CriticalTask::WriteBatch(model_id, observed_size), + threshold - 1, + )) + .unwrap(); + } } } } @@ -307,37 +326,21 @@ impl FractalMgr { loop { tokio::select! { _ = sigterm.recv() => { - info!("exited flp executor service"); + info!("flp: finishing any pending maintenance tasks"); + let global = global.clone(); + tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap(); + info!("flp: exited executor service"); break; }, _ = tokio::time::sleep(std::time::Duration::from_secs(Self::GENERAL_EXECUTOR_WINDOW)) => { - let mdl_drivers = global.get_state().get_mdl_drivers().read(); - for (model_id, driver) in mdl_drivers.iter() { - let mut observed_len = 0; - let res = global._namespace().with_model((model_id.space(), model_id.model()), |model| { - if model.get_uuid() != model_id.uuid() { - // once again, throughput maximization will lead to, in extremely rare cases, this - // branch returning. but it is okay - return Ok(()); - } - // mark that we're taking these deltas - observed_len = model.delta_state().__fractal_take_full_from_data_delta(super::FractalToken::new()); - Self::try_write_model_data_batch(model, observed_len, driver) - }); - match res { - Ok(()) => {} - Err(_) => { - // this failure is *not* good, so we want to promote this to a critical task - self.hp_dispatcher.send(Task::new(CriticalTask::WriteBatch(model_id.clone(), observed_len))).unwrap() - } - } - } + let global = global.clone(); + tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap() } task = lpq.recv() => { let Task { threshold, task } = match task { Some(t) => t, None => { - info!("exiting flp executor service because all tasks closed"); + info!("flp: exiting executor service because all tasks closed"); break; } }; @@ -362,6 +365,45 @@ impl FractalMgr { } } } + fn general_executor_model_maintenance(&'static self, global: super::Global) { + let mdl_drivers = global.get_state().get_mdl_drivers().read(); + for (model_id, driver) in mdl_drivers.iter() { + let mut observed_len = 0; + let res = + global + ._namespace() + .with_model((model_id.space(), model_id.model()), |model| { + if model.get_uuid() != model_id.uuid() { + // once again, throughput maximization will lead to, in extremely rare cases, this + // branch returning. but it is okay + return Ok(()); + } + // mark that we're taking these deltas + observed_len = model + .delta_state() + .__fractal_take_full_from_data_delta(super::FractalToken::new()); + Self::try_write_model_data_batch(model, observed_len, driver) + }); + match res { + Ok(()) => { + if observed_len != 0 { + info!( + "flp: completed maintenance task for {model_id}, synced={observed_len}" + ) + } + } + Err(_) => { + // this failure is *not* good, so we want to promote this to a critical task + self.hp_dispatcher + .send(Task::new(CriticalTask::WriteBatch( + model_id.clone(), + observed_len, + ))) + .unwrap() + } + } + } + } } // util diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index c23854a1..6cc37f0f 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -36,7 +36,7 @@ use { }, crate::engine::error::RuntimeResult, parking_lot::{Mutex, RwLock}, - std::{collections::HashMap, mem::MaybeUninit}, + std::{collections::HashMap, fmt, mem::MaybeUninit}, tokio::sync::mpsc::unbounded_channel, }; @@ -240,15 +240,17 @@ impl Global { } pub unsafe fn unload_all(self) { // TODO(@ohsayan): handle errors - self.namespace_txn_driver() - .lock() - .__journal_mut() - .__append_journal_close_and_close() - .unwrap(); - for (_, driver) in self.get_state().mdl_driver.write().iter_mut() { - driver.batch_driver().lock().close().unwrap(); + let GlobalState { + gns_driver, + mdl_driver, + .. + } = Self::__gref_raw().assume_init_read(); + let gns_driver = gns_driver.txn_driver.into_inner().into_inner(); + let mdl_drivers = mdl_driver.into_inner(); + gns_driver.close().unwrap(); + for (_, driver) in mdl_drivers { + driver.close().unwrap(); } - core::ptr::drop_in_place(Self::__gref_raw().as_mut_ptr()); } } @@ -302,6 +304,12 @@ pub struct ModelUniqueID { uuid: Uuid, } +impl fmt::Display for ModelUniqueID { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "model-{}@{}", self.model(), self.space()) + } +} + impl ModelUniqueID { /// Create a new unique model ID pub fn new(space: &str, model: &str, uuid: Uuid) -> Self { diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 152e057a..d3211ee9 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -146,9 +146,6 @@ impl GlobalInstanceLike for TestGlobal { impl Drop for TestGlobal { fn drop(&mut self) { let mut txn_driver = self.txn_driver.lock(); - txn_driver - .__journal_mut() - .__append_journal_close_and_close() - .unwrap(); + txn_driver.__journal_mut().__close_mut().unwrap(); } } diff --git a/server/src/engine/idx/mtchm/mod.rs b/server/src/engine/idx/mtchm/mod.rs index e1e8bc6b..da78271b 100644 --- a/server/src/engine/idx/mtchm/mod.rs +++ b/server/src/engine/idx/mtchm/mod.rs @@ -108,6 +108,12 @@ impl CHTRuntimeLog { } } +impl Drop for CHTRuntimeLog { + fn drop(&mut self) { + let _ = self.data; + } +} + pub struct Node { branch: [Atomic; ::BRANCH_MX], } diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 30b0130a..8e367b2e 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -197,11 +197,10 @@ pub async fn start( tokio::select! { _ = endpoint_handles.listen() => {} _ = termsig => { - info!("received terminate signal"); + info!("received terminate signal. waiting for inflight tasks to complete ..."); } } drop(signal); - info!("waiting for inflight tasks to complete ..."); endpoint_handles.finish().await; info!("waiting for fractal engine to exit ..."); let (hp_handle, lp_handle) = tokio::join!(fractal_handle.hp_handle, fractal_handle.lp_handle); diff --git a/server/src/engine/storage/v1/batch_jrnl/mod.rs b/server/src/engine/storage/v1/batch_jrnl/mod.rs index 36f147b5..2013acbf 100644 --- a/server/src/engine/storage/v1/batch_jrnl/mod.rs +++ b/server/src/engine/storage/v1/batch_jrnl/mod.rs @@ -56,7 +56,7 @@ pub fn reinit( // restore let mut restore_driver = DataBatchRestoreDriver::new(f)?; restore_driver.read_data_batch_into_model(model)?; - DataBatchPersistDriver::new(restore_driver.into_file(), false) + DataBatchPersistDriver::new(restore_driver.into_file()?, false) } /// Create a new batch journal diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/batch_jrnl/persist.rs index c29e3486..ee355690 100644 --- a/server/src/engine/storage/v1/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/batch_jrnl/persist.rs @@ -63,16 +63,12 @@ impl DataBatchPersistDriver { file.fsynced_write(&[MARKER_BATCH_REOPEN])?; } Ok(Self { - f: SDSSFileTrackedWriter::new(file), + f: SDSSFileTrackedWriter::new(file)?, }) } - pub fn close(&mut self) -> RuntimeResult<()> { - if self - .f - .inner_file() - .fsynced_write(&[MARKER_BATCH_CLOSED]) - .is_ok() - { + pub fn close(self) -> RuntimeResult<()> { + let mut slf = self.f.into_inner_file()?; + if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() { return Ok(()); } else { return Err(StorageError::DataBatchCloseError.into()); @@ -123,11 +119,9 @@ impl DataBatchPersistDriver { self.encode_row_data(model, &irm, &row_data)?; } } - // fsync now; we're good to go - self.f.fsync_all()?; i += 1; } - return self.append_batch_summary(observed_len, inconsistent_reads); + return self.append_batch_summary_and_sync(observed_len, inconsistent_reads); }; match exec() { Ok(()) => Ok(()), @@ -155,26 +149,28 @@ impl DataBatchPersistDriver { col_cnt: usize, ) -> RuntimeResult<()> { self.f - .unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?; + .write_unfsynced(&[MARKER_ACTUAL_BATCH_EVENT, pk_tag.value_u8()])?; let observed_len_bytes = observed_len.u64_bytes_le(); - self.f.unfsynced_write(&observed_len_bytes)?; + self.f.write_unfsynced(&observed_len_bytes)?; self.f - .unfsynced_write(&schema_version.value_u64().to_le_bytes())?; - self.f.unfsynced_write(&col_cnt.u64_bytes_le())?; + .write_unfsynced(&schema_version.value_u64().to_le_bytes())?; + self.f.write_unfsynced(&col_cnt.u64_bytes_le())?; Ok(()) } - /// Append a summary of this batch - fn append_batch_summary( + /// Append a summary of this batch and most importantly, **sync everything to disk** + fn append_batch_summary_and_sync( &mut self, observed_len: usize, inconsistent_reads: usize, ) -> RuntimeResult<()> { // [0xFD][actual_commit][checksum] - self.f.unfsynced_write(&[MARKER_END_OF_BATCH])?; + self.f.write_unfsynced(&[MARKER_END_OF_BATCH])?; let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le(); - self.f.unfsynced_write(&actual_commit)?; + self.f.write_unfsynced(&actual_commit)?; let cs = self.f.reset_and_finish_checksum().to_le_bytes(); - self.f.inner_file().fsynced_write(&cs)?; + self.f.untracked_write(&cs)?; + // IMPORTANT: now that all data has been written, we need to actually ensure that the writes pass through the cache + self.f.sync_writes()?; Ok(()) } /// Attempt to fix the batch journal @@ -184,8 +180,7 @@ impl DataBatchPersistDriver { attempt to append 0xFF to the part of the file where a corruption likely occurred, marking it recoverable */ - let f = self.f.inner_file(); - if f.fsynced_write(&[MARKER_RECOVERY_EVENT]).is_ok() { + if self.f.untracked_write(&[MARKER_RECOVERY_EVENT]).is_ok() { return Ok(()); } Err(StorageError::DataBatchRecoveryFailStageOne.into()) @@ -203,7 +198,7 @@ impl DataBatchPersistDriver { pk.read_uint() } .to_le_bytes(); - buf.unfsynced_write(&data)?; + buf.write_unfsynced(&data)?; } TagUnique::Str | TagUnique::Bin => { let slice = unsafe { @@ -211,8 +206,8 @@ impl DataBatchPersistDriver { pk.read_bin() }; let slice_l = slice.len().u64_bytes_le(); - buf.unfsynced_write(&slice_l)?; - buf.unfsynced_write(slice)?; + buf.write_unfsynced(&slice_l)?; + buf.write_unfsynced(slice)?; } TagUnique::Illegal => unsafe { // UNSAFE(@ohsayan): a pk can't be constructed with illegal @@ -225,7 +220,7 @@ impl DataBatchPersistDriver { fn encode_cell(&mut self, value: &Datacell) -> RuntimeResult<()> { let mut buf = vec![]; cell::encode(&mut buf, value); - self.f.unfsynced_write(&buf)?; + self.f.write_unfsynced(&buf)?; Ok(()) } /// Encode row data @@ -241,7 +236,7 @@ impl DataBatchPersistDriver { self.encode_cell(cell)?; } None if field_name.as_ref() == mdl.p_key() => {} - None => self.f.unfsynced_write(&[0])?, + None => self.f.write_unfsynced(&[0])?, } } Ok(()) @@ -249,9 +244,9 @@ impl DataBatchPersistDriver { /// Write the change type and txnid fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> RuntimeResult<()> { let change_type = [delta.change().value_u8()]; - self.f.unfsynced_write(&change_type)?; + self.f.write_unfsynced(&change_type)?; let txn_id = delta.data_version().value_u64().to_le_bytes(); - self.f.unfsynced_write(&txn_id)?; + self.f.write_unfsynced(&txn_id)?; Ok(()) } } diff --git a/server/src/engine/storage/v1/batch_jrnl/restore.rs b/server/src/engine/storage/v1/batch_jrnl/restore.rs index 1454735b..963be39f 100644 --- a/server/src/engine/storage/v1/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/batch_jrnl/restore.rs @@ -114,7 +114,7 @@ impl DataBatchRestoreDriver { f: SDSSFileTrackedReader::new(f)?, }) } - pub fn into_file(self) -> SDSSFileIO { + pub fn into_file(self) -> RuntimeResult> { self.f.into_inner_file() } pub(in crate::engine::storage::v1) fn read_data_batch_into_model( @@ -312,11 +312,7 @@ impl DataBatchRestoreDriver { let actual_checksum = self.f.__reset_checksum(); // find hardcoded checksum let mut hardcoded_checksum = [0; sizeof!(u64)]; - self.f - .inner_file() - .read_to_buffer(&mut hardcoded_checksum)?; - // move file cursor ahead - self.f.__cursor_ahead_by(sizeof!(u64)); + self.f.untracked_read(&mut hardcoded_checksum)?; if actual_checksum == u64::from_le_bytes(hardcoded_checksum) { Ok(actual_commit) } else { @@ -414,7 +410,9 @@ impl DataBatchRestoreDriver { ))) } fn attempt_recover_data_batch(&mut self) -> RuntimeResult<()> { - if let Ok(MARKER_RECOVERY_EVENT) = self.f.inner_file().read_byte() { + let mut buf = [0u8; 1]; + self.f.untracked_read(&mut buf)?; + if let [MARKER_RECOVERY_EVENT] = buf { return Ok(()); } Err(StorageError::DataBatchRestoreCorruptedBatch.into()) diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/journal.rs index 9e200bd6..0f28ecef 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/journal.rs @@ -423,7 +423,7 @@ impl JournalWriter { &JournalEntryMetadata::new(id, EventSourceMarker::DRIVER_REOPENED, 0, 0).encoded(), ) } - pub fn __append_journal_close_and_close(&mut self) -> RuntimeResult<()> { + pub fn __close_mut(&mut self) -> RuntimeResult<()> { self.closed = true; let id = self._incr_id() as u128; self.log_file.fsynced_write( @@ -431,9 +431,8 @@ impl JournalWriter { )?; Ok(()) } - #[cfg(test)] - pub fn append_journal_close_and_close(mut self) -> RuntimeResult<()> { - self.__append_journal_close_and_close() + pub fn close(mut self) -> RuntimeResult<()> { + self.__close_mut() } } diff --git a/server/src/engine/storage/v1/loader.rs b/server/src/engine/storage/v1/loader.rs index feb5ecba..b44b2000 100644 --- a/server/src/engine/storage/v1/loader.rs +++ b/server/src/engine/storage/v1/loader.rs @@ -70,26 +70,36 @@ impl SEInitState { &gns, ) }?; - if is_new { - std::fs::create_dir(DATA_DIR).inherit_set_dmsg("creating data directory")?; - } let mut model_drivers = ModelDrivers::new(); - if !is_new { - // this is an existing instance, so read in all data - for (space_name, space) in gns.spaces().read().iter() { - let space_uuid = space.get_uuid(); - for (model_name, model) in space.models().read().iter() { - let path = - Self::model_path(space_name, space_uuid, model_name, model.get_uuid()); - let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg( - format!("failed to restore model data from journal in `{path}`"), - )?; - let _ = model_drivers.insert( - ModelUniqueID::new(space_name, model_name, model.get_uuid()), - FractalModelDriver::init(persist_driver), - ); + let mut driver_guard = || { + if is_new { + std::fs::create_dir(DATA_DIR).inherit_set_dmsg("creating data directory")?; + } + if !is_new { + // this is an existing instance, so read in all data + for (space_name, space) in gns.spaces().read().iter() { + let space_uuid = space.get_uuid(); + for (model_name, model) in space.models().read().iter() { + let path = + Self::model_path(space_name, space_uuid, model_name, model.get_uuid()); + let persist_driver = batch_jrnl::reinit(&path, model).inherit_set_dmsg( + format!("failed to restore model data from journal in `{path}`"), + )?; + let _ = model_drivers.insert( + ModelUniqueID::new(space_name, model_name, model.get_uuid()), + FractalModelDriver::init(persist_driver), + ); + } } } + RuntimeResult::Ok(()) + }; + if let Err(e) = driver_guard() { + gns_txn_driver.close().unwrap(); + for (_, driver) in model_drivers { + driver.close().unwrap(); + } + return Err(e); } Ok(SEInitState::new( GNSTransactionDriverAnyFS::new(gns_txn_driver), diff --git a/server/src/engine/storage/v1/memfs.rs b/server/src/engine/storage/v1/memfs.rs index 697ef903..d7d5ade5 100644 --- a/server/src/engine/storage/v1/memfs.rs +++ b/server/src/engine/storage/v1/memfs.rs @@ -28,8 +28,9 @@ use { crate::engine::{ error::RuntimeResult, storage::v1::rw::{ - FileOpen, RawFSInterface, RawFileInterface, RawFileInterfaceExt, RawFileInterfaceRead, - RawFileInterfaceWrite, RawFileInterfaceWriteExt, + FileOpen, RawFSInterface, RawFileInterface, RawFileInterfaceBufferedWriter, + RawFileInterfaceExt, RawFileInterfaceRead, RawFileInterfaceWrite, + RawFileInterfaceWriteExt, }, sync::cell::Lazy, }, @@ -119,7 +120,7 @@ impl RawFSInterface for VirtualFS { c.fw_write_all(&data)?; } FileOpen::Existing(mut e) => { - e.fw_truncate_to(0)?; + e.fwext_truncate_to(0)?; e.fw_write_all(&data)?; } } @@ -386,16 +387,24 @@ fn with_file(fpath: &str, mut f: impl FnMut(&VFile) -> RuntimeResult) -> R } impl RawFileInterface for VFileDescriptor { - type Reader = Self; - type Writer = Self; - fn into_buffered_reader(self) -> RuntimeResult { + type BufReader = Self; + type BufWriter = Self; + fn into_buffered_reader(self) -> RuntimeResult { Ok(self) } - fn into_buffered_writer(self) -> RuntimeResult { + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { + Ok(r) + } + fn into_buffered_writer(self) -> RuntimeResult { Ok(self) } + fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult { + Ok(w) + } } +impl RawFileInterfaceBufferedWriter for VFileDescriptor {} + impl RawFileInterfaceRead for VFileDescriptor { fn fr_read_exact(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { with_file_mut(&self.0, |file| { @@ -434,10 +443,10 @@ impl RawFileInterfaceWrite for VFileDescriptor { } impl RawFileInterfaceWriteExt for VFileDescriptor { - fn fw_fsync_all(&mut self) -> RuntimeResult<()> { + fn fwext_fsync_all(&mut self) -> RuntimeResult<()> { with_file(&self.0, |_| Ok(())) } - fn fw_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { + fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { with_file_mut(&self.0, |file| { if !file.write { return Err( @@ -532,10 +541,10 @@ impl RawFileInterfaceWrite for NullFile { } } impl RawFileInterfaceWriteExt for NullFile { - fn fw_fsync_all(&mut self) -> RuntimeResult<()> { + fn fwext_fsync_all(&mut self) -> RuntimeResult<()> { Ok(()) } - fn fw_truncate_to(&mut self, _: u64) -> RuntimeResult<()> { + fn fwext_truncate_to(&mut self, _: u64) -> RuntimeResult<()> { Ok(()) } } @@ -553,12 +562,20 @@ impl RawFileInterfaceExt for NullFile { } } impl RawFileInterface for NullFile { - type Reader = Self; - type Writer = Self; - fn into_buffered_reader(self) -> RuntimeResult { + type BufReader = Self; + type BufWriter = Self; + fn into_buffered_reader(self) -> RuntimeResult { Ok(self) } - fn into_buffered_writer(self) -> RuntimeResult { + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { + Ok(r) + } + fn into_buffered_writer(self) -> RuntimeResult { Ok(self) } + fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult { + Ok(w) + } } + +impl RawFileInterfaceBufferedWriter for NullFile {} diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs index 72028842..5f29b828 100644 --- a/server/src/engine/storage/v1/rw.rs +++ b/server/src/engine/storage/v1/rw.rs @@ -101,17 +101,28 @@ pub trait RawFSInterface { } /// A file (well, probably) that can be used for RW operations along with advanced write and extended operations (such as seeking) -pub trait RawFileInterface +pub trait RawFileInterface: Sized where Self: RawFileInterfaceRead + RawFileInterfaceWrite + RawFileInterfaceWriteExt + RawFileInterfaceExt, { - type Reader: RawFileInterfaceRead + RawFileInterfaceExt; - type Writer: RawFileInterfaceWrite + RawFileInterfaceExt; - fn into_buffered_reader(self) -> RuntimeResult; - fn into_buffered_writer(self) -> RuntimeResult; + type BufReader: RawFileInterfaceBufferedReader; + type BufWriter: RawFileInterfaceBufferedWriter; + fn into_buffered_reader(self) -> RuntimeResult; + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult; + fn into_buffered_writer(self) -> RuntimeResult; + fn downgrade_writer(w: Self::BufWriter) -> RuntimeResult; +} + +pub trait RawFileInterfaceBufferedReader: RawFileInterfaceRead + RawFileInterfaceExt {} +impl RawFileInterfaceBufferedReader for R {} + +pub trait RawFileInterfaceBufferedWriter: RawFileInterfaceWrite + RawFileInterfaceExt { + fn sync_write_cache(&mut self) -> RuntimeResult<()> { + Ok(()) + } } /// A file interface that supports read operations @@ -138,8 +149,8 @@ impl RawFileInterfaceWrite for W { /// A file interface that supports advanced write operations pub trait RawFileInterfaceWriteExt { - fn fw_fsync_all(&mut self) -> RuntimeResult<()>; - fn fw_truncate_to(&mut self, to: u64) -> RuntimeResult<()>; + fn fwext_fsync_all(&mut self) -> RuntimeResult<()>; + fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()>; } /// A file interface that supports advanced file operations @@ -206,21 +217,37 @@ impl RawFSInterface for LocalFS { } impl RawFileInterface for File { - type Reader = BufReader; - type Writer = BufWriter; - fn into_buffered_reader(self) -> RuntimeResult { + type BufReader = BufReader; + type BufWriter = BufWriter; + fn into_buffered_reader(self) -> RuntimeResult { Ok(BufReader::new(self)) } - fn into_buffered_writer(self) -> RuntimeResult { + fn downgrade_reader(r: Self::BufReader) -> RuntimeResult { + Ok(r.into_inner()) + } + fn into_buffered_writer(self) -> RuntimeResult { Ok(BufWriter::new(self)) } + fn downgrade_writer(mut w: Self::BufWriter) -> RuntimeResult { + w.flush()?; // TODO(@ohsayan): handle rare case where writer does panic + let (w, _) = w.into_parts(); + Ok(w) + } +} + +impl RawFileInterfaceBufferedWriter for BufWriter { + fn sync_write_cache(&mut self) -> RuntimeResult<()> { + self.flush()?; + self.get_mut().sync_all()?; + Ok(()) + } } impl RawFileInterfaceWriteExt for File { - fn fw_fsync_all(&mut self) -> RuntimeResult<()> { + fn fwext_fsync_all(&mut self) -> RuntimeResult<()> { cvt(self.sync_all()) } - fn fw_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { + fn fwext_truncate_to(&mut self, to: u64) -> RuntimeResult<()> { cvt(self.set_len(to)) } } @@ -270,40 +297,44 @@ impl RawFileInterfaceExt for F { } pub struct SDSSFileTrackedWriter { - f: SDSSFileIO, + f: SDSSFileIO::BufWriter>, cs: SCrc, } impl SDSSFileTrackedWriter { - pub fn new(f: SDSSFileIO) -> Self { - Self { f, cs: SCrc::new() } + pub fn new(f: SDSSFileIO) -> RuntimeResult { + Ok(Self { + f: f.into_buffered_sdss_writer()?, + cs: SCrc::new(), + }) } - pub fn unfsynced_write(&mut self, block: &[u8]) -> RuntimeResult<()> { + pub fn write_unfsynced(&mut self, block: &[u8]) -> RuntimeResult<()> { + self.untracked_write(block) + .map(|_| self.cs.recompute_with_new_var_block(block)) + } + pub fn untracked_write(&mut self, block: &[u8]) -> RuntimeResult<()> { match self.f.unfsynced_write(block) { - Ok(()) => { - self.cs.recompute_with_new_var_block(block); - Ok(()) - } + Ok(()) => Ok(()), e => e, } } - pub fn fsync_all(&mut self) -> RuntimeResult<()> { - self.f.fsync_all() + pub fn sync_writes(&mut self) -> RuntimeResult<()> { + self.f.f.sync_write_cache() } pub fn reset_and_finish_checksum(&mut self) -> u64 { let mut scrc = SCrc::new(); core::mem::swap(&mut self.cs, &mut scrc); scrc.finish() } - pub fn inner_file(&mut self) -> &mut SDSSFileIO { - &mut self.f + pub fn into_inner_file(self) -> RuntimeResult> { + self.f.downgrade_writer() } } /// [`SDSSFileLenTracked`] simply maintains application level length and checksum tracking to avoid frequent syscalls because we /// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them pub struct SDSSFileTrackedReader { - f: SDSSFileIO, + f: SDSSFileIO::BufReader>, len: u64, pos: u64, cs: SCrc, @@ -314,6 +345,7 @@ impl SDSSFileTrackedReader { pub fn new(mut f: SDSSFileIO) -> RuntimeResult { let len = f.file_length()?; let pos = f.retrieve_cursor()?; + let f = f.into_buffered_sdss_reader()?; Ok(Self { f, len, @@ -331,11 +363,23 @@ impl SDSSFileTrackedReader { self.remaining() >= v } pub fn read_into_buffer(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { + self.untracked_read(buf) + .map(|_| self.cs.recompute_with_new_var_block(buf)) + } + pub fn read_byte(&mut self) -> RuntimeResult { + let mut buf = [0u8; 1]; + self.read_into_buffer(&mut buf).map(|_| buf[0]) + } + pub fn __reset_checksum(&mut self) -> u64 { + let mut crc = SCrc::new(); + core::mem::swap(&mut crc, &mut self.cs); + crc.finish() + } + pub fn untracked_read(&mut self, buf: &mut [u8]) -> RuntimeResult<()> { if self.remaining() >= buf.len() as u64 { match self.f.read_to_buffer(buf) { Ok(()) => { self.pos += buf.len() as u64; - self.cs.recompute_with_new_var_block(buf); Ok(()) } Err(e) => return Err(e), @@ -344,23 +388,8 @@ impl SDSSFileTrackedReader { Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into()) } } - pub fn read_byte(&mut self) -> RuntimeResult { - let mut buf = [0u8; 1]; - self.read_into_buffer(&mut buf).map(|_| buf[0]) - } - pub fn __reset_checksum(&mut self) -> u64 { - let mut crc = SCrc::new(); - core::mem::swap(&mut crc, &mut self.cs); - crc.finish() - } - pub fn inner_file(&mut self) -> &mut SDSSFileIO { - &mut self.f - } - pub fn into_inner_file(self) -> SDSSFileIO { - self.f - } - pub fn __cursor_ahead_by(&mut self, sizeof: usize) { - self.pos += sizeof as u64; + pub fn into_inner_file(self) -> RuntimeResult> { + self.f.downgrade_reader() } pub fn read_block(&mut self) -> RuntimeResult<[u8; N]> { if !self.has_left(N as _) { @@ -376,8 +405,8 @@ impl SDSSFileTrackedReader { } #[derive(Debug)] -pub struct SDSSFileIO { - f: Fs::File, +pub struct SDSSFileIO::File> { + f: F, _fs: PhantomData, } @@ -408,42 +437,60 @@ impl SDSSFileIO { } } } + pub fn into_buffered_sdss_reader( + self, + ) -> RuntimeResult::BufReader>> { + self.f.into_buffered_reader().map(SDSSFileIO::_new) + } + pub fn into_buffered_sdss_writer( + self, + ) -> RuntimeResult::BufWriter>> { + self.f.into_buffered_writer().map(SDSSFileIO::_new) + } } -impl SDSSFileIO { - pub fn _new(f: Fs::File) -> Self { +impl SDSSFileIO::BufReader> { + pub fn downgrade_reader(self) -> RuntimeResult> { + let me = ::downgrade_reader(self.f)?; + Ok(SDSSFileIO::_new(me)) + } +} + +impl SDSSFileIO::BufWriter> { + pub fn downgrade_writer(self) -> RuntimeResult> { + let me = ::downgrade_writer(self.f)?; + Ok(SDSSFileIO::_new(me)) + } +} + +impl SDSSFileIO { + pub fn _new(f: F) -> Self { Self { f, _fs: PhantomData, } } - pub fn unfsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> { - self.f.fw_write_all(data) - } - pub fn fsync_all(&mut self) -> RuntimeResult<()> { - self.f.fw_fsync_all()?; - Ok(()) - } - pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> { - self.f.fw_write_all(data)?; - self.f.fw_fsync_all() - } +} + +impl SDSSFileIO { pub fn read_to_buffer(&mut self, buffer: &mut [u8]) -> RuntimeResult<()> { self.f.fr_read_exact(buffer) } +} + +impl SDSSFileIO { + pub fn retrieve_cursor(&mut self) -> RuntimeResult { + self.f.fext_cursor() + } pub fn file_length(&self) -> RuntimeResult { self.f.fext_file_length() } pub fn seek_from_start(&mut self, by: u64) -> RuntimeResult<()> { self.f.fext_seek_ahead_from_start_by(by) } - pub fn retrieve_cursor(&mut self) -> RuntimeResult { - self.f.fext_cursor() - } - pub fn read_byte(&mut self) -> RuntimeResult { - let mut r = [0; 1]; - self.read_to_buffer(&mut r).map(|_| r[0]) - } +} + +impl SDSSFileIO { pub fn load_remaining_into_buffer(&mut self) -> RuntimeResult> { let len = self.file_length()? - self.retrieve_cursor()?; let mut buf = vec![0; len as usize]; @@ -451,3 +498,20 @@ impl SDSSFileIO { Ok(buf) } } + +impl SDSSFileIO { + pub fn unfsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> { + self.f.fw_write_all(data) + } +} + +impl SDSSFileIO { + pub fn fsync_all(&mut self) -> RuntimeResult<()> { + self.f.fwext_fsync_all()?; + Ok(()) + } + pub fn fsynced_write(&mut self, data: &[u8]) -> RuntimeResult<()> { + self.f.fw_write_all(data)?; + self.f.fwext_fsync_all() + } +} diff --git a/server/src/engine/storage/v1/tests/batch.rs b/server/src/engine/storage/v1/tests/batch.rs index 4dd654e8..a0e6ad1e 100644 --- a/server/src/engine/storage/v1/tests/batch.rs +++ b/server/src/engine/storage/v1/tests/batch.rs @@ -67,7 +67,7 @@ fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver { let mut dbr = DataBatchRestoreDriver::new(f).unwrap(); dbr.read_data_batch_into_model(mdl).unwrap(); - DataBatchPersistDriver::new(dbr.into_file(), false) + DataBatchPersistDriver::new(dbr.into_file().unwrap(), false) } } .unwrap() @@ -144,7 +144,7 @@ fn empty_multi_open_reopen() { ), ); for _ in 0..100 { - let mut writer = open_batch_data("empty_multi_open_reopen.db-btlog", &mdl); + let writer = open_batch_data("empty_multi_open_reopen.db-btlog", &mdl); writer.close().unwrap(); } } diff --git a/server/src/engine/storage/v1/tests/tx.rs b/server/src/engine/storage/v1/tests/tx.rs index 17dae70d..ba814378 100644 --- a/server/src/engine/storage/v1/tests/tx.rs +++ b/server/src/engine/storage/v1/tests/tx.rs @@ -142,7 +142,7 @@ fn first_boot_second_readonly() { let mut log = open_log("testtxn.log", &db1)?; db1.txn_set(0, 20, &mut log)?; db1.txn_set(9, 21, &mut log)?; - log.append_journal_close_and_close() + log.close() }; x().unwrap(); // backup original data @@ -151,7 +151,7 @@ fn first_boot_second_readonly() { let empty_db2 = Database::new(); open_log("testtxn.log", &empty_db2) .unwrap() - .append_journal_close_and_close() + .close() .unwrap(); assert_eq!(original_data, empty_db2.copy_data()); } @@ -164,7 +164,7 @@ fn oneboot_mod_twoboot_mod_thirdboot_read() { for i in 0..10 { db1.txn_set(i, 1, &mut log)?; } - log.append_journal_close_and_close() + log.close() }; x().unwrap(); let bkp_db1 = db1.copy_data(); @@ -178,7 +178,7 @@ fn oneboot_mod_twoboot_mod_thirdboot_read() { let current_val = db2.data.borrow()[i]; db2.txn_set(i, current_val + i as u8, &mut log)?; } - log.append_journal_close_and_close() + log.close() }; x().unwrap(); let bkp_db2 = db2.copy_data(); @@ -186,7 +186,7 @@ fn oneboot_mod_twoboot_mod_thirdboot_read() { // third boot let db3 = Database::new(); let log = open_log("duatxn.db-tlog", &db3).unwrap(); - log.append_journal_close_and_close().unwrap(); + log.close().unwrap(); assert_eq!(bkp_db2, db3.copy_data()); assert_eq!( db3.copy_data(), diff --git a/server/src/engine/txn/gns/mod.rs b/server/src/engine/txn/gns/mod.rs index 9befe0a1..09f33f5c 100644 --- a/server/src/engine/txn/gns/mod.rs +++ b/server/src/engine/txn/gns/mod.rs @@ -65,6 +65,9 @@ impl GNSTransactionDriverAnyFS { pub fn new(journal: JournalWriter) -> Self { Self { journal } } + pub fn into_inner(self) -> JournalWriter { + self.journal + } pub fn __journal_mut(&mut self) -> &mut JournalWriter { &mut self.journal } diff --git a/server/src/main.rs b/server/src/main.rs index c6dc512b..bed3a748 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -83,6 +83,7 @@ fn main() { match runtime.block_on(async move { engine::start(config, global).await }) { Ok(()) => { engine::finish(g); + info!("finished all pending tasks. Goodbye!"); } Err(e) => { error!("{e}");