diff --git a/Cargo.lock b/Cargo.lock index 63947817..ca9a799b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] @@ -102,13 +102,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -119,9 +119,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backtrace" -version = "0.3.69" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", "cc", @@ -165,9 +165,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" [[package]] name = "block-buffer" @@ -202,9 +202,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "bzip2" @@ -275,9 +275,9 @@ dependencies = [ [[package]] name = "clipboard-win" -version = "5.2.0" +version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12f9a0700e0127ba15d1d52dd742097f821cd9c65939303a44d970465040a297" +checksum = "d517d4b86184dbb111d3556a10f1c8a04da7428d2987bf1081602bf11c3aa9ee" dependencies = [ "error-code", ] @@ -373,7 +373,7 @@ version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "crossterm_winapi", "libc", "mio", @@ -650,9 +650,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.5" +version = "2.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown", @@ -812,7 +812,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "cfg_aliases", "libc", @@ -864,7 +864,7 @@ version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "foreign-types", "libc", @@ -881,7 +881,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1085,9 +1085,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -1120,11 +1120,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys", @@ -1137,7 +1137,7 @@ version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "clipboard-win", "fd-lock", @@ -1223,14 +1223,14 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] name = "serde_yaml" -version = "0.9.32" +version = "0.9.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" +checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9" dependencies = [ "indexmap", "itoa", @@ -1389,9 +1389,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" @@ -1422,9 +1422,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.52" +version = "2.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", @@ -1489,7 +1489,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1540,9 +1540,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "unsafe-libyaml" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" [[package]] name = "utf8parse" @@ -1552,9 +1552,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "getrandom", "rand", @@ -1563,13 +1563,13 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" +checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1611,7 +1611,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-shared", ] @@ -1633,7 +1633,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index dd757b51..98e71b48 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ libsky = { path = "../libsky" } sky_macros = { path = "../sky-macros" } rcrypt = "0.4.0" # external deps -bytes = "1.5.0" +bytes = "1.6.0" env_logger = "0.11.3" log = "0.4.21" openssl = { version = "0.10.64", features = ["vendored"] } @@ -25,9 +25,9 @@ parking_lot = "0.12.1" serde = { version = "1.0.197", features = ["derive"] } tokio = { version = "1.36.0", features = ["full"] } tokio-openssl = "0.6.4" -uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] } +uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] } crc = "3.0.1" -serde_yaml = "0.9.32" +serde_yaml = "0.9.33" chrono = "0.4.35" [target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies] diff --git a/server/src/engine/core/index/mod.rs b/server/src/engine/core/index/mod.rs index a6df689a..e3964ca4 100644 --- a/server/src/engine/core/index/mod.rs +++ b/server/src/engine/core/index/mod.rs @@ -71,9 +71,13 @@ impl PrimaryIndex { } #[derive(Debug)] -pub struct IndexLatchHandleShared<'t>(parking_lot::RwLockReadGuard<'t, ()>); +pub struct IndexLatchHandleShared<'t> { + _lck: parking_lot::RwLockReadGuard<'t, ()>, +} #[derive(Debug)] -pub struct IndexLatchHandleExclusive<'t>(parking_lot::RwLockWriteGuard<'t, ()>); +pub struct IndexLatchHandleExclusive<'t> { + _lck: parking_lot::RwLockWriteGuard<'t, ()>, +} #[derive(Debug)] struct IndexLatch { @@ -87,9 +91,13 @@ impl IndexLatch { } } fn gl_handle_shared(&self) -> IndexLatchHandleShared { - IndexLatchHandleShared(self.glck.read()) + IndexLatchHandleShared { + _lck: self.glck.read(), + } } fn gl_handle_exclusive(&self) -> IndexLatchHandleExclusive { - IndexLatchHandleExclusive(self.glck.write()) + IndexLatchHandleExclusive { + _lck: self.glck.write(), + } } } diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index 6997f14d..8513fe19 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -185,19 +185,24 @@ enumerate_err! { } enumerate_err! { - #[derive(Debug, PartialEq)] + #[derive(Debug, PartialEq, Clone, Copy)] /// SDSS based storage engine errors pub enum StorageError { - // header + /* + ---- + SDSS Errors + ---- + These errors are common across all versions + */ /// version mismatch - HeaderDecodeVersionMismatch = "header-version-mismatch", + FileDecodeHeaderVersionMismatch = "header-version-mismatch", /// The entire header is corrupted - HeaderDecodeCorruptedHeader = "header-corrupted", - // journal - /// An entry in the journal is corrupted - JournalLogEntryCorrupted = "journal-entry-corrupted", - /// The structure of the journal is corrupted - JournalCorrupted = "journal-corrupted", + FileDecodeHeaderCorrupted = "header-corrupted", + /* + ---- + Common encoding errors + ---- + */ // internal file structures /// While attempting to decode a structure in an internal segment of a file, the storage engine ran into a possibly irrecoverable error InternalDecodeStructureCorrupted = "structure-decode-corrupted", @@ -205,21 +210,43 @@ enumerate_err! { InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload", /// the data for an internal structure was decoded but is logically invalid InternalDecodeStructureIllegalData = "structure-decode-illegal-data", + /* + ---- + V1 Journal Errors + ---- + */ + /// An entry in the journal is corrupted + V1JournalDecodeLogEntryCorrupted = "journal-entry-corrupted", + /// The structure of the journal is corrupted + V1JournalDecodeCorrupted = "journal-corrupted", /// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable - DataBatchRestoreCorruptedBatch = "batch-corrupted-batch", + V1DataBatchDecodeCorruptedBatch = "batch-corrupted-batch", /// when attempting to restore a data batch from disk, the driver encountered a corrupted entry - DataBatchRestoreCorruptedEntry = "batch-corrupted-entry", - /// we failed to close the data batch - DataBatchCloseError = "batch-persist-close-failed", + V1DataBatchDecodeCorruptedEntry = "batch-corrupted-entry", /// the data batch file is corrupted - DataBatchRestoreCorruptedBatchFile = "batch-corrupted-file", + V1DataBatchDecodeCorruptedBatchFile = "batch-corrupted-file", /// the system database is corrupted - SysDBCorrupted = "sysdb-corrupted", - // raw journal errors - RawJournalEventCorruptedMetadata = "journal-event-metadata-corrupted", - RawJournalEventCorrupted = "journal-invalid-event", - RawJournalCorrupted = "journal-corrupted", - RawJournalInvalidEvent = "journal-invalid-event-order", + V1SysDBDecodeCorrupted = "sysdb-corrupted", + /// we failed to close the data batch + V1DataBatchRuntimeCloseError = "batch-persist-close-failed", + /* + ---- + V2 Journal Errors + ---- + */ + /// Journal event metadata corrupted + RawJournalDecodeEventCorruptedMetadata = "journal-event-metadata-corrupted", + /// The event body is corrupted + RawJournalDecodeEventCorruptedPayload = "journal-event-payload-corrupted", + /// batch contents was unexpected (for example, we expected n events but got m events) + RawJournalDecodeBatchContentsMismatch = "journal-batch-unexpected-termination", + /// batch contents was validated and executed but the final integrity check failed + RawJournalDecodeBatchIntegrityFailure = "journal-batch-integrity-check-failed", + /// unexpected order of events + RawJournalDecodeInvalidEvent = "journal-invalid-event-order", + /// corrupted event within a batch + RawJournalDecodeCorruptionInBatchMetadata = "journal-batch-corrupted-event-metadata", + /// runtime error: the lightweight heartbeat failed RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed", } } diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index f153ab75..d1b10b82 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -37,6 +37,7 @@ use { std::{ fmt, mem::MaybeUninit, + ptr::addr_of_mut, sync::atomic::{AtomicUsize, Ordering}, }, tokio::sync::mpsc::unbounded_channel, @@ -262,16 +263,16 @@ impl Global { .get_rt_stat() .per_mdl_delta_max_size() } - unsafe fn __gref_raw() -> &'static mut MaybeUninit { + unsafe fn __gref_raw() -> *mut MaybeUninit { static mut G: MaybeUninit = MaybeUninit::uninit(); - &mut G + addr_of_mut!(G) } unsafe fn __gref(&self) -> &'static GlobalState { - Self::__gref_raw().assume_init_ref() + (&*Self::__gref_raw()).assume_init_ref() } pub unsafe fn unload_all(self) { // TODO(@ohsayan): handle errors - let GlobalState { gns, .. } = Self::__gref_raw().assume_init_read(); + let GlobalState { gns, .. } = Self::__gref_raw().read().assume_init(); let mut gns_driver = gns.gns_driver().txn_driver.lock(); GNSDriver::close_driver(&mut gns_driver).unwrap(); for mdl in gns diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index f2572e0b..219cf0de 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -77,6 +77,7 @@ impl TestGlobal { model_name.entity(), model_data.get_uuid(), ), + Default::default(), )?; model.driver().initialize_model_driver(driver); } @@ -97,7 +98,7 @@ impl TestGlobal { Err(e) => match e.kind() { ErrorKind::IoError(e_) => match e_.kind() { std::io::ErrorKind::AlreadyExists => { - GNSDriver::open_gns_with_name(log_name, &data) + GNSDriver::open_gns_with_name(log_name, &data, Default::default()) } _ => Err(e), }, diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs index 2143a384..7cebd236 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs @@ -47,7 +47,7 @@ use { util::{compiler::TaggedEnum, os}, IoResult, }, - std::{mem::ManuallyDrop, ops::Range}, + std::ops::Range, }; pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _; @@ -338,14 +338,10 @@ impl HeaderV1 { }) } else { let version_okay = okay_header_version & okay_server_version & okay_driver_version; - let md = ManuallyDrop::new([ - StorageError::HeaderDecodeCorruptedHeader, - StorageError::HeaderDecodeVersionMismatch, - ]); - Err(unsafe { - // UNSAFE(@ohsayan): while not needed, md for drop safety + correct index - md.as_ptr().add(!version_okay as usize).read().into() - }) + Err([ + StorageError::FileDecodeHeaderCorrupted, + StorageError::FileDecodeHeaderVersionMismatch, + ][!version_okay as usize]) } } } @@ -444,7 +440,7 @@ pub trait SimpleFileSpecV1 { if v == Self::FILE_SPECFIER_VERSION { Ok(()) } else { - Err(StorageError::HeaderDecodeVersionMismatch.into()) + Err(StorageError::FileDecodeHeaderVersionMismatch.into()) } } } @@ -466,7 +462,7 @@ impl FileSpecV1 for Sfs { if okay { Ok(md) } else { - Err(StorageError::HeaderDecodeVersionMismatch.into()) + Err(StorageError::FileDecodeHeaderVersionMismatch.into()) } } fn write_metadata(f: &mut impl FileWrite, _: Self::EncodeArgs) -> IoResult { diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index 11e4410f..8c8a66fa 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -234,13 +234,13 @@ impl TrackedReader { Err(e) => return Err(e), } } else { - Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()) + Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner()) } } /// Tracked read of a given block size. Shorthand for [`Self::tracked_read`] pub fn read_block(&mut self) -> IoResult<[u8; N]> { if !self.has_left(N as _) { - return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()); + return Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner()); } let mut buf = [0; N]; self.tracked_read(&mut buf)?; diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs index bf501f42..42470a8f 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs @@ -74,7 +74,7 @@ where if scanner.eof() { Ok(()) } else { - Err(StorageError::JournalLogEntryCorrupted.into()) + Err(StorageError::V1JournalDecodeLogEntryCorrupted.into()) } } fn decode_and_update_global_state( diff --git a/server/src/engine/storage/common_encoding/r1/obj.rs b/server/src/engine/storage/common_encoding/r1/obj.rs index 2d16c615..768009b5 100644 --- a/server/src/engine/storage/common_encoding/r1/obj.rs +++ b/server/src/engine/storage/common_encoding/r1/obj.rs @@ -43,6 +43,7 @@ use { }, util::{compiler::TaggedEnum, EndianQW}, }, + std::marker::PhantomData, }; /* @@ -318,10 +319,10 @@ impl FieldMD { } } -pub struct FieldRef<'a>(&'a Field); +pub struct FieldRef<'a>(PhantomData<&'a Field>); impl<'a> From<&'a Field> for FieldRef<'a> { - fn from(f: &'a Field) -> Self { - Self(f) + fn from(_: &'a Field) -> Self { + Self(PhantomData) } } impl<'a> PersistObject for FieldRef<'a> { diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs b/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs index 0811406c..0d5eb11c 100644 --- a/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs @@ -53,7 +53,7 @@ impl DataBatchPersistDriver { if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() { return Ok(()); } else { - return Err(StorageError::DataBatchCloseError.into()); + return Err(StorageError::V1DataBatchRuntimeCloseError.into()); } } } diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs index 91da0550..e6ed84d0 100644 --- a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs @@ -181,7 +181,7 @@ impl DataBatchRestoreDriver { } } // nope, this is a corrupted file - Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into()) } fn handle_reopen_is_actual_close(&mut self) -> RuntimeResult { if self.f.is_eof() { @@ -194,7 +194,7 @@ impl DataBatchRestoreDriver { Ok(false) } else { // that's just a nice bug - Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into()) } } } @@ -301,7 +301,7 @@ impl DataBatchRestoreDriver { // we must read the batch termination signature let b = self.f.read_byte()?; if b != MARKER_END_OF_BATCH { - return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()); } } // read actual commit @@ -314,7 +314,7 @@ impl DataBatchRestoreDriver { if actual_checksum == u64::from_le_bytes(hardcoded_checksum) { Ok(actual_commit) } else { - Err(StorageError::DataBatchRestoreCorruptedBatch.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()) } } fn read_batch(&mut self) -> RuntimeResult { @@ -334,7 +334,7 @@ impl DataBatchRestoreDriver { } _ => { // this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry - return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()); } } // decode batch start block @@ -378,7 +378,7 @@ impl DataBatchRestoreDriver { this_col_cnt -= 1; } if this_col_cnt != 0 { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); } if change_type == 1 { this_batch.push(DecodedBatchEvent::new( @@ -396,7 +396,7 @@ impl DataBatchRestoreDriver { processed_in_this_batch += 1; } _ => { - return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()); } } } @@ -413,7 +413,7 @@ impl DataBatchRestoreDriver { if let [MARKER_RECOVERY_EVENT] = buf { return Ok(()); } - Err(StorageError::DataBatchRestoreCorruptedBatch.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()) } fn read_start_batch_block(&mut self) -> RuntimeResult { let pk_tag = self.f.read_byte()?; @@ -463,7 +463,7 @@ impl BatchStartBlock { impl DataBatchRestoreDriver { fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult { let Some(pk_type) = TagUnique::try_from_raw(pk_type) else { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); }; Ok(match pk_type { TagUnique::SignedInt | TagUnique::UnsignedInt => { @@ -479,7 +479,7 @@ impl DataBatchRestoreDriver { self.f.tracked_read(&mut data)?; if pk_type == TagUnique::Str { if core::str::from_utf8(&data).is_err() { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); } } unsafe { @@ -496,7 +496,7 @@ impl DataBatchRestoreDriver { } fn decode_cell(&mut self) -> RuntimeResult { let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); }; unsafe { cell::decode_element::(&mut self.f, dscr) } .map_err(|e| e.0) @@ -516,7 +516,7 @@ impl From for ErrorHack { } impl From<()> for ErrorHack { fn from(_: ()) -> Self { - Self(StorageError::DataBatchRestoreCorruptedEntry.into()) + Self(StorageError::V1DataBatchDecodeCorruptedEntry.into()) } } diff --git a/server/src/engine/storage/v1/raw/journal/raw.rs b/server/src/engine/storage/v1/raw/journal/raw.rs index 1c0d42d7..eed6e360 100644 --- a/server/src/engine/storage/v1/raw/journal/raw.rs +++ b/server/src/engine/storage/v1/raw/journal/raw.rs @@ -215,7 +215,7 @@ impl JournalReader { } match entry_metadata .event_source_marker() - .ok_or(StorageError::JournalLogEntryCorrupted)? + .ok_or(StorageError::V1JournalDecodeLogEntryCorrupted)? { EventSourceMarker::ServerStandard => {} EventSourceMarker::DriverClosed => { @@ -230,7 +230,7 @@ impl JournalReader { EventSourceMarker::DriverReopened | EventSourceMarker::RecoveryReverseLastJournal => { // these two are only taken in close and error paths (respectively) so we shouldn't see them here; this is bad // two special directives in the middle of nowhere? incredible - return Err(StorageError::JournalCorrupted.into()); + return Err(StorageError::V1JournalDecodeCorrupted.into()); } } // read payload @@ -263,10 +263,10 @@ impl JournalReader { Ok(()) } else { // FIXME(@ohsayan): tolerate loss in this directive too - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } else { - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } #[cold] // FIXME(@ohsayan): how bad can prod systems be? (clue: pretty bad, so look for possible changes) @@ -279,7 +279,7 @@ impl JournalReader { self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length? let mut entry_buf = [0u8; JournalEntryMetadata::SIZE]; if self.log_file.read_buffer(&mut entry_buf).is_err() { - return Err(StorageError::JournalCorrupted.into()); + return Err(StorageError::V1JournalDecodeCorrupted.into()); } let entry = JournalEntryMetadata::decode(entry_buf); let okay = (entry.event_id == self.evid as u128) @@ -290,7 +290,7 @@ impl JournalReader { if okay { return Ok(()); } else { - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } /// Read and apply all events in the given log file to the global state, returning the (open file, last event ID) @@ -305,7 +305,7 @@ impl JournalReader { if slf.closed { Ok((slf.log_file.downgrade_reader(), slf.evid)) } else { - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } } diff --git a/server/src/engine/storage/v1/raw/sysdb.rs b/server/src/engine/storage/v1/raw/sysdb.rs index 3fa16deb..c3c4e27a 100644 --- a/server/src/engine/storage/v1/raw/sysdb.rs +++ b/server/src/engine/storage/v1/raw/sysdb.rs @@ -42,7 +42,7 @@ fn rkey( ) -> RuntimeResult { match d.remove(key).map(transform) { Some(Some(k)) => Ok(k), - _ => Err(StorageError::SysDBCorrupted.into()), + _ => Err(StorageError::V1SysDBDecodeCorrupted.into()), } } @@ -95,14 +95,14 @@ impl RestoredSystemDatabase { let mut userdata = userdata .into_data() .and_then(Datacell::into_list) - .ok_or(StorageError::SysDBCorrupted)?; + .ok_or(StorageError::V1SysDBDecodeCorrupted)?; if userdata.len() != 1 { - return Err(StorageError::SysDBCorrupted.into()); + return Err(StorageError::V1SysDBDecodeCorrupted.into()); } let user_password = userdata .remove(0) .into_bin() - .ok_or(StorageError::SysDBCorrupted)?; + .ok_or(StorageError::V1SysDBDecodeCorrupted)?; loaded_users.insert(username, user_password.into_boxed_slice()); } // load sys data @@ -117,7 +117,7 @@ impl RestoredSystemDatabase { & sys_store.is_empty() & loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT)) { - return Err(StorageError::SysDBCorrupted.into()); + return Err(StorageError::V1SysDBDecodeCorrupted.into()); } Ok(Self::new(loaded_users, sc, sv)) } diff --git a/server/src/engine/storage/v2/impls/gns_log.rs b/server/src/engine/storage/v2/impls/gns_log.rs index 6842cdd0..878e828f 100644 --- a/server/src/engine/storage/v2/impls/gns_log.rs +++ b/server/src/engine/storage/v2/impls/gns_log.rs @@ -34,7 +34,7 @@ use { core::GNSData, storage::{ common_encoding::r1::impls::gns::GNSEvent, - v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent}, + v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent, JournalSettings}, }, txn::gns::{ model::{ @@ -61,11 +61,15 @@ pub struct GNSEventLog; impl GNSDriver { const FILE_PATH: &'static str = "gns.db-tlog"; - pub fn open_gns_with_name(name: &str, gs: &GNSData) -> RuntimeResult { - journal::open_journal(name, gs) + pub fn open_gns_with_name( + name: &str, + gs: &GNSData, + settings: JournalSettings, + ) -> RuntimeResult { + journal::open_journal(name, gs, settings) } - pub fn open_gns(gs: &GNSData) -> RuntimeResult { - Self::open_gns_with_name(Self::FILE_PATH, gs) + pub fn open_gns(gs: &GNSData, settings: JournalSettings) -> RuntimeResult { + Self::open_gns_with_name(Self::FILE_PATH, gs, settings) } pub fn create_gns_with_name(name: &str) -> RuntimeResult { journal::create_journal(name) diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index 6870eda3..59af1faa 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -46,7 +46,7 @@ use { v2::raw::{ journal::{ self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent, - RawJournalAdapter, + JournalSettings, RawJournalAdapter, }, spec::ModelDataBatchAofV1, }, @@ -66,8 +66,12 @@ use { pub type ModelDriver = BatchDriver; impl ModelDriver { - pub fn open_model_driver(mdl: &ModelData, model_data_file_path: &str) -> RuntimeResult { - journal::open_journal(model_data_file_path, mdl) + pub fn open_model_driver( + mdl: &ModelData, + model_data_file_path: &str, + settings: JournalSettings, + ) -> RuntimeResult { + journal::open_journal(model_data_file_path, mdl, settings) } /// Create a new event log pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult { @@ -449,7 +453,7 @@ impl BatchAdapterSpec for ModelDataAdapter { BatchType::Standard => {} } let pk_tag = TagUnique::try_from_raw(f.read_block().map(|[b]| b)?) - .ok_or(StorageError::RawJournalCorrupted)?; + .ok_or(StorageError::InternalDecodeStructureIllegalData)?; let schema_version = u64::from_le_bytes(f.read_block()?); let column_count = u64::from_le_bytes(f.read_block()?); Ok(BatchMetadata { @@ -656,7 +660,7 @@ mod restore_impls { f.read(&mut data)?; if pk_type == TagUnique::Str { if core::str::from_utf8(&data).is_err() { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::InternalDecodeStructureCorruptedPayload.into()); } } unsafe { @@ -679,7 +683,7 @@ mod restore_impls { let mut this_col_cnt = batch_info.column_count; while this_col_cnt != 0 { let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::InternalDecodeStructureIllegalData.into()); }; let cell = unsafe { cell::decode_element::(f, dscr) }.map_err(|e| e.0)?; row.push(cell); @@ -705,7 +709,7 @@ mod restore_impls { } impl From<()> for ErrorHack { fn from(_: ()) -> Self { - Self(StorageError::DataBatchRestoreCorruptedEntry.into()) + Self(StorageError::InternalDecodeStructureCorrupted.into()) } } impl<'a> DataSource for TrackedReaderContext<'a, ModelDataBatchAofV1> { diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 8d3bae33..020ea18b 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -25,7 +25,10 @@ */ use { - self::impls::mdl_journal::{BatchStats, FullModel}, + self::{ + impls::mdl_journal::{BatchStats, FullModel}, + raw::journal::JournalSettings, + }, super::{common::interface::fs::FileSystem, v1, SELoaded}, crate::engine::{ config::Configuration, @@ -120,15 +123,18 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult { pub fn restore(cfg: &Configuration) -> RuntimeResult { let gns = GNSData::empty(); context::set_dmsg("loading gns"); - let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?; + let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns, JournalSettings::default())?; for (id, model) in gns.idx_models().write().iter_mut() { let model_data = model.data(); let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid(); let model_data_file_path = paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid()); context::set_dmsg(format!("loading model driver in {model_data_file_path}")); - let model_driver = - impls::mdl_journal::ModelDriver::open_model_driver(model_data, &model_data_file_path)?; + let model_driver = impls::mdl_journal::ModelDriver::open_model_driver( + model_data, + &model_data_file_path, + JournalSettings::default(), + )?; model.driver().initialize_model_driver(model_driver); unsafe { // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index 230849d3..c363f918 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -47,7 +47,8 @@ mod raw; #[cfg(test)] mod tests; pub use raw::{ - create_journal, open_journal, RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, + create_journal, open_journal, JournalSettings, RawJournalAdapter, + RawJournalAdapterEvent as JournalAdapterEvent, }; /* @@ -136,7 +137,7 @@ impl RawJournalAdapter for EventLogAdapter { this_checksum.update(&plen.to_le_bytes()); this_checksum.update(&pl); if this_checksum.finish() != expected_checksum { - return Err(StorageError::RawJournalCorrupted.into()); + return Err(StorageError::RawJournalDecodeCorruptionInBatchMetadata.into()); } ::DECODE_DISPATCH [<::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize]( @@ -165,11 +166,15 @@ pub struct BatchAdapter(PhantomData); #[cfg(test)] impl BatchAdapter { /// Open a new batch journal - pub fn open(name: &str, gs: &BA::GlobalState) -> RuntimeResult> + pub fn open( + name: &str, + gs: &BA::GlobalState, + settings: JournalSettings, + ) -> RuntimeResult> where BA::Spec: FileSpecV1, { - raw::open_journal::>(name, gs) + raw::open_journal::>(name, gs, settings) } /// Create a new batch journal pub fn create(name: &str) -> RuntimeResult> @@ -278,7 +283,7 @@ impl RawJournalAdapter for BatchAdapter { let event_type = <::EventType as TaggedEnum>::try_from_raw( f.read_block().map(|[b]| b)?, ) - .ok_or(StorageError::RawJournalCorrupted)?; + .ok_or(StorageError::InternalDecodeStructureIllegalData)?; // is this an early exit marker? if so, exit if ::is_early_exit(&event_type) { break; @@ -299,7 +304,7 @@ impl RawJournalAdapter for BatchAdapter { // finish applying batch BA::finish(batch_state, batch_md, gs)?; } else { - return Err(StorageError::RawJournalCorrupted.into()); + return Err(StorageError::RawJournalDecodeBatchContentsMismatch.into()); } } // and finally, verify checksum @@ -308,7 +313,7 @@ impl RawJournalAdapter for BatchAdapter { if real_checksum == stored_checksum { Ok(()) } else { - Err(StorageError::RawJournalCorrupted.into()) + Err(StorageError::RawJournalDecodeBatchIntegrityFailure.into()) } } } diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index 6a53516d..9736b026 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -30,7 +30,7 @@ mod tests; use { crate::{ engine::{ - error::StorageError, + error::{ErrorKind, StorageError}, mem::unsafe_apis::memcpy, storage::common::{ checksum::SCrc64, @@ -44,7 +44,7 @@ use { util::compiler::TaggedEnum, }, core::fmt, - std::ops::Range, + std::{io::ErrorKind as IoErrorKind, ops::Range}, }; /* @@ -67,12 +67,13 @@ where pub fn open_journal( log_path: &str, gs: &J::GlobalState, + settings: JournalSettings, ) -> RuntimeResult> where J::Spec: FileSpecV1, { let log = SdssFile::::open(log_path)?; - let (initializer, file) = RawJournalReader::::scroll(log, gs)?; + let (initializer, file) = RawJournalReader::::scroll(log, gs, settings)?; RawJournalWriter::new(initializer, file) } @@ -640,6 +641,22 @@ pub struct RawJournalReader { last_txn_offset: u64, last_txn_checksum: u64, stats: JournalStats, + _settings: JournalSettings, +} + +#[derive(Debug)] +pub struct JournalSettings {} + +impl Default for JournalSettings { + fn default() -> Self { + Self::new() + } +} + +impl JournalSettings { + pub fn new() -> Self { + Self {} + } } #[derive(Debug)] @@ -658,28 +675,33 @@ impl JournalStats { } impl RawJournalReader { - pub fn scroll( + fn scroll( file: SdssFile<::Spec>, gs: &J::GlobalState, + settings: JournalSettings, ) -> RuntimeResult<(JournalInitializer, SdssFile)> { let reader = TrackedReader::with_cursor( file, <::Spec as FileSpecV1>::SIZE as u64, )?; jtrace_reader!(Initialized); - let mut me = Self::new(reader, 0, 0, 0, 0); + let mut me = Self::new(reader, 0, 0, 0, 0, settings); loop { - if me._apply_next_event_and_stop(gs)? { - jtrace_reader!(Completed); - let initializer = JournalInitializer::new( - me.tr.cursor(), - me.tr.checksum(), - me.txn_id, - // NB: the last txn offset is important because it indicates that the log is new - me.last_txn_offset, - ); - let file = me.tr.into_inner(); - return Ok((initializer, file)); + match me._apply_next_event_and_stop(gs) { + Ok(true) => { + jtrace_reader!(Completed); + let initializer = JournalInitializer::new( + me.tr.cursor(), + me.tr.checksum(), + me.txn_id, + // NB: the last txn offset is important because it indicates that the log is new + me.last_txn_offset, + ); + let file = me.tr.into_inner(); + return Ok((initializer, file)); + } + Ok(false) => {} + Err(e) => return Err(e), } } } @@ -689,6 +711,7 @@ impl RawJournalReader { last_txn_id: u64, last_txn_offset: u64, last_txn_checksum: u64, + settings: JournalSettings, ) -> Self { Self { tr: reader, @@ -697,6 +720,7 @@ impl RawJournalReader { last_txn_offset, last_txn_checksum, stats: JournalStats::new(), + _settings: settings, } } fn __refresh_known_txn(me: &mut Self) { @@ -707,6 +731,47 @@ impl RawJournalReader { } } +#[derive(Debug, PartialEq)] +pub enum JournalRepairMode { + Simple, +} + +impl RawJournalReader { + pub fn repair( + file: SdssFile<::Spec>, + gs: &J::GlobalState, + settings: JournalSettings, + repair_mode: JournalRepairMode, + ) -> RuntimeResult<()> { + match repair_mode { + JournalRepairMode::Simple => {} + } + match Self::scroll(file, gs, settings) { + Ok(_) => { + // no error detected + return Ok(()); + } + Err(e) => { + // now it's our task to determine exactly what happened + match e.kind() { + ErrorKind::IoError(io) => match io.kind() { + IoErrorKind::UnexpectedEof => { + // this is the only kind of error that we can actually repair since it indicates that a part of the + // file is "missing" + } + _ => return Err(e), + }, + ErrorKind::Storage(_) => todo!(), + ErrorKind::Txn(_) => todo!(), + ErrorKind::Other(_) => todo!(), + ErrorKind::Config(_) => unreachable!(), + } + } + } + todo!() + } +} + impl RawJournalReader { fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult { let txn_id = u128::from_le_bytes(self.tr.read_block()?); @@ -716,7 +781,7 @@ impl RawJournalReader { expected: self.txn_id, current: txn_id as u64 }); - return Err(StorageError::RawJournalEventCorruptedMetadata.into()); + return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()); } jtrace_reader!(AttemptingEvent(txn_id as u64)); // check for a server event @@ -740,7 +805,7 @@ impl RawJournalReader { Err(e) => return Err(e), } } - None => return Err(StorageError::RawJournalEventCorruptedMetadata.into()), + None => return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()), } } return self.handle_close(txn_id, meta); @@ -772,9 +837,9 @@ impl RawJournalReader { .. }) => { jtrace_reader!(ErrExpectedCloseGotReopen); - return Err(StorageError::RawJournalInvalidEvent.into()); + return Err(StorageError::RawJournalDecodeInvalidEvent.into()); } - None => return Err(StorageError::RawJournalEventCorrupted.into()), + None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()), }; jtrace_reader!(DriverEventExpectedCloseGotClose); // a driver closed event; we've checked integrity, but we must check the field values @@ -787,7 +852,7 @@ impl RawJournalReader { jtrace_reader!(DriverEventInvalidMetadata); // either the block is corrupted or the data we read is corrupted; either way, // we're going to refuse to read this - return Err(StorageError::RawJournalCorrupted.into()); + return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()); } self.stats.driver_events += 1; // update @@ -807,7 +872,8 @@ impl RawJournalReader { let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?; let reopen_event = match DriverEvent::decode(event_block) { Some(ev) if ev.event == DriverEventKind::Reopened => ev, - None | Some(_) => return Err(StorageError::RawJournalEventCorrupted.into()), + Some(_) => return Err(StorageError::RawJournalDecodeInvalidEvent.into()), + None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()), }; jtrace_reader!(DriverEventExpectingReopenGotReopen); let valid_meta = okay! { @@ -824,7 +890,7 @@ impl RawJournalReader { Ok(false) } else { jtrace_reader!(ErrInvalidReopenMetadata); - Err(StorageError::RawJournalCorrupted.into()) + Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()) } } } diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests.rs index 94d46221..341494aa 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests.rs @@ -35,7 +35,7 @@ use { storage::{ common::sdss::sdss_r1::rw::TrackedReader, v2::raw::{ - journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent}, + journal::raw::{JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent}, spec::SystemDatabaseV1, }, }, @@ -177,7 +177,7 @@ impl RawJournalAdapter for SimpleDBJournal { file.tracked_read(&mut keybuf)?; match String::from_utf8(keybuf) { Ok(k) => gs.data.borrow_mut().push(k), - Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()), + Err(_) => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()), } } EventMeta::Clear => gs.data.borrow_mut().clear(), @@ -219,7 +219,12 @@ fn journal_open_close() { } { // second boot - let mut j = open_journal::(JOURNAL_NAME, &SimpleDB::new()).unwrap(); + let mut j = open_journal::( + JOURNAL_NAME, + &SimpleDB::new(), + JournalSettings::default(), + ) + .unwrap(); assert_eq!( super::obtain_trace(), intovec![ @@ -258,7 +263,12 @@ fn journal_open_close() { } { // third boot - let mut j = open_journal::(JOURNAL_NAME, &SimpleDB::new()).unwrap(); + let mut j = open_journal::( + JOURNAL_NAME, + &SimpleDB::new(), + JournalSettings::default(), + ) + .unwrap(); assert_eq!( super::obtain_trace(), intovec![ @@ -336,7 +346,7 @@ fn journal_with_server_single_event() { { let db = SimpleDB::new(); // second boot - let mut j = open_journal::(JOURNAL_NAME, &db) + let mut j = open_journal::(JOURNAL_NAME, &db, JournalSettings::default()) .set_dmsg_fn(|| format!("{:?}", super::obtain_trace())) .unwrap(); assert_eq!(db.data().len(), 1); @@ -385,7 +395,8 @@ fn journal_with_server_single_event() { { // third boot let db = SimpleDB::new(); - let mut j = open_journal::(JOURNAL_NAME, &db).unwrap(); + let mut j = + open_journal::(JOURNAL_NAME, &db, JournalSettings::default()).unwrap(); assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( @@ -453,7 +464,8 @@ fn multi_boot() { } { let mut db = SimpleDB::new(); - let mut j = open_journal::("multiboot", &db).unwrap(); + let mut j = + open_journal::("multiboot", &db, JournalSettings::default()).unwrap(); assert_eq!(db.data().as_ref(), vec!["key_a".to_string()]); db.clear(&mut j).unwrap(); db.push(&mut j, "myfinkey").unwrap(); @@ -461,7 +473,8 @@ fn multi_boot() { } { let db = SimpleDB::new(); - let mut j = open_journal::("multiboot", &db).unwrap(); + let mut j = + open_journal::("multiboot", &db, JournalSettings::default()).unwrap(); assert_eq!(db.data().as_ref(), vec!["myfinkey".to_string()]); RawJournalWriter::close_driver(&mut j).unwrap(); } diff --git a/server/src/engine/storage/v2/raw/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs index bc2e8e78..52c58859 100644 --- a/server/src/engine/storage/v2/raw/journal/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/tests.rs @@ -30,8 +30,9 @@ use { super::{ - raw::RawJournalAdapterEvent, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, - EventLogAdapter, EventLogDriver, EventLogSpec, + raw::{JournalSettings, RawJournalAdapterEvent}, + BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, EventLogAdapter, EventLogDriver, + EventLogSpec, }, crate::{ engine::{ @@ -112,13 +113,13 @@ impl EventLogSpec for TestDBAdapter { const DECODE_DISPATCH: Self::DecodeDispatch = [ |db, payload| { if payload.len() < sizeof!(u64) { - Err(StorageError::RawJournalCorrupted.into()) + Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()) } else { let length = u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) }); let payload = &payload[sizeof!(u64)..]; if payload.len() as u64 != length { - Err(StorageError::RawJournalCorrupted.into()) + Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()) } else { let string = String::from_utf8(payload.to_owned()).unwrap(); db._mut().push(string); @@ -172,7 +173,7 @@ fn open_log() -> ( super::raw::RawJournalWriter>, ) { let db = TestDB::default(); - let log = open_journal("jrnl", &db).unwrap(); + let log = open_journal("jrnl", &db, JournalSettings::default()).unwrap(); (db, log) } @@ -381,7 +382,7 @@ fn batch_simple() { } { let db = BatchDB::new(); - let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap(); + let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap(); db.push(&mut batch_drv, "key3").unwrap(); db.push(&mut batch_drv, "key4").unwrap(); assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]); @@ -389,7 +390,7 @@ fn batch_simple() { } { let db = BatchDB::new(); - let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap(); + let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap(); db.push(&mut batch_drv, "key5").unwrap(); db.push(&mut batch_drv, "key6").unwrap(); assert_eq!( @@ -400,7 +401,9 @@ fn batch_simple() { } { let db = BatchDB::new(); - let mut batch_drv = BatchAdapter::::open("mybatch", &db).unwrap(); + let mut batch_drv = + BatchAdapter::::open("mybatch", &db, JournalSettings::default()) + .unwrap(); assert_eq!( db._ref().data, ["key1", "key2", "key3", "key4", "key5", "key6"] diff --git a/sky-macros/src/dbtest.rs b/sky-macros/src/dbtest.rs index 52c882a7..68cfef90 100644 --- a/sky-macros/src/dbtest.rs +++ b/sky-macros/src/dbtest.rs @@ -24,11 +24,10 @@ * */ -use quote::quote; - use { crate::util::{self, AttributeKind}, proc_macro::TokenStream, + quote::quote, std::collections::HashMap, syn::{parse_macro_input, AttributeArgs, ItemFn}, }; diff --git a/sky-macros/src/util.rs b/sky-macros/src/util.rs index 540dcd7a..46b49118 100644 --- a/sky-macros/src/util.rs +++ b/sky-macros/src/util.rs @@ -24,8 +24,10 @@ * */ -use proc_macro2::Ident; -use syn::{Lit, Meta, MetaNameValue, NestedMeta, Path}; +use { + proc_macro2::Ident, + syn::{Lit, Meta, MetaNameValue, NestedMeta, Path}, +}; pub enum AttributeKind { Lit(Lit),