diff --git a/CHANGELOG.md b/CHANGELOG.md index 708eedfb..f02ab680 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All changes in this project will be noted in this file. ## Version 0.8.1 +### Additions + +- Added support for manual repair with the `skyd repair` command + ### Fixes - Fixed migration from v1 SE (released with v0.8.0-beta) to v2 SE (released in v0.8.0) diff --git a/Cargo.lock b/Cargo.lock index 63947817..ca9a799b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,9 +30,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] @@ -102,13 +102,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -119,9 +119,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "backtrace" -version = "0.3.69" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", "cc", @@ -165,9 +165,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" [[package]] name = "block-buffer" @@ -202,9 +202,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "bzip2" @@ -275,9 +275,9 @@ dependencies = [ [[package]] name = "clipboard-win" -version = "5.2.0" +version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12f9a0700e0127ba15d1d52dd742097f821cd9c65939303a44d970465040a297" +checksum = "d517d4b86184dbb111d3556a10f1c8a04da7428d2987bf1081602bf11c3aa9ee" dependencies = [ "error-code", ] @@ -373,7 +373,7 @@ version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "crossterm_winapi", "libc", "mio", @@ -650,9 +650,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.5" +version = "2.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown", @@ -812,7 
+812,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "cfg_aliases", "libc", @@ -864,7 +864,7 @@ version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "foreign-types", "libc", @@ -881,7 +881,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1085,9 +1085,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -1120,11 +1120,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys", @@ -1137,7 +1137,7 @@ version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "cfg-if", "clipboard-win", "fd-lock", @@ -1223,14 +1223,14 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] name = "serde_yaml" -version = "0.9.32" +version = "0.9.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" +checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9" dependencies = [ "indexmap", "itoa", @@ -1389,9 +1389,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" @@ -1422,9 +1422,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.52" +version = "2.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", @@ -1489,7 +1489,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1540,9 +1540,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "unsafe-libyaml" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" [[package]] name = "utf8parse" @@ -1552,9 +1552,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" dependencies = [ "getrandom", "rand", @@ -1563,13 +1563,13 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" +checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1611,7 +1611,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-shared", ] @@ -1633,7 +1633,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/server/Cargo.toml b/server/Cargo.toml index dd757b51..98e71b48 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ libsky = { path = "../libsky" } sky_macros = { path = "../sky-macros" } rcrypt = "0.4.0" # external deps -bytes = "1.5.0" +bytes = "1.6.0" env_logger = "0.11.3" log = "0.4.21" openssl = { version = "0.10.64", features = ["vendored"] } @@ -25,9 +25,9 @@ parking_lot = "0.12.1" serde = { version = "1.0.197", features = ["derive"] } tokio = { version = "1.36.0", features = ["full"] } tokio-openssl = "0.6.4" -uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] } +uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] } crc = "3.0.1" -serde_yaml = "0.9.32" +serde_yaml = "0.9.33" chrono = "0.4.35" [target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies] diff --git a/server/help_text/help b/server/help_text/help index e1a54e75..94b3c79f 100644 --- a/server/help_text/help +++ b/server/help_text/help @@ -13,6 +13,9 @@ Usage: skyd [OPTION]... skyd is the Skytable database server daemon and can be used to serve database requests. +Commands: + repair Check and repair any detected database storage errors + Flags: -h, --help Display this help menu and exit. -v, --version Display the version number and exit. @@ -36,5 +39,7 @@ Notes: - If no `--mode` is provided, we default to `dev` - You must provide `--auth-root-password` to set the default root password - To use TLS, you must provide both `--tlscert` and `--tlskey` + - When you run `repair`, your previous data is backed up in the `backups/` folder. + Restore if needed. 
For further assistance, refer to the official documentation here: https://docs.skytable.org \ No newline at end of file diff --git a/server/src/engine/config.rs b/server/src/engine/config.rs index 9c1e8ee5..33ba576d 100644 --- a/server/src/engine/config.rs +++ b/server/src/engine/config.rs @@ -652,6 +652,7 @@ pub enum CLIConfigParseReturn { Version, /// We yielded a config YieldedConfig(T), + Repair, } impl CLIConfigParseReturn { @@ -670,10 +671,21 @@ impl CLIConfigParseReturn { pub fn parse_cli_args<'a, T: 'a + AsRef>( src: impl Iterator, ) -> RuntimeResult> { - let mut args_iter = src.into_iter().skip(1); + let mut args_iter = src.into_iter().skip(1).peekable(); let mut cli_args: ParsedRawArgs = HashMap::new(); while let Some(arg) = args_iter.next() { let arg = arg.as_ref(); + if arg == "repair" { + if args_iter.peek().is_none() { + return Ok(CLIConfigParseReturn::Repair); + } else { + return Err(ConfigError::with_src( + ConfigSource::Cli, + ConfigErrorKind::ErrorString("to use `repair`, just run `skyd repair`".into()), + ) + .into()); + } + } if arg == "--help" || arg == "-h" { return Ok(CLIConfigParseReturn::Help); } @@ -978,6 +990,7 @@ pub enum ConfigReturn { HelpMessage(String), /// A configuration that we have fully validated was provided Config(Configuration), + Repair, } impl ConfigReturn { @@ -1105,6 +1118,7 @@ pub fn check_configuration() -> RuntimeResult { libsky::VERSION ))); } + CLIConfigParseReturn::Repair => return Ok(ConfigReturn::Repair), CLIConfigParseReturn::YieldedConfig(cfg) => Some(cfg), }; match cli_args { diff --git a/server/src/engine/core/index/mod.rs b/server/src/engine/core/index/mod.rs index a6df689a..e3964ca4 100644 --- a/server/src/engine/core/index/mod.rs +++ b/server/src/engine/core/index/mod.rs @@ -71,9 +71,13 @@ impl PrimaryIndex { } #[derive(Debug)] -pub struct IndexLatchHandleShared<'t>(parking_lot::RwLockReadGuard<'t, ()>); +pub struct IndexLatchHandleShared<'t> { + _lck: parking_lot::RwLockReadGuard<'t, ()>, +} #[derive(Debug)] -pub struct IndexLatchHandleExclusive<'t>(parking_lot::RwLockWriteGuard<'t, ()>); +pub struct IndexLatchHandleExclusive<'t> { + _lck: parking_lot::RwLockWriteGuard<'t, ()>, +} #[derive(Debug)] struct IndexLatch { @@ -87,9 +91,13 @@ impl IndexLatch { } } fn gl_handle_shared(&self) -> IndexLatchHandleShared { - IndexLatchHandleShared(self.glck.read()) + IndexLatchHandleShared { + _lck: self.glck.read(), + } } fn gl_handle_exclusive(&self) -> IndexLatchHandleExclusive { - IndexLatchHandleExclusive(self.glck.write()) + IndexLatchHandleExclusive { + _lck: self.glck.write(), + } } } diff --git a/server/src/engine/error.rs b/server/src/engine/error.rs index 6997f14d..30e4bdfb 100644 --- a/server/src/engine/error.rs +++ b/server/src/engine/error.rs @@ -169,11 +169,11 @@ enumerate_err! { /// Errors that occur when restoring transactional data pub enum TransactionError { /// corrupted txn payload. has more bytes than expected - DecodeCorruptedPayloadMoreBytes = "txn-payload-unexpected-content", + V1DecodeCorruptedPayloadMoreBytes = "txn-payload-unexpected-content", /// transaction payload is corrupted. has lesser bytes than expected - DecodedUnexpectedEof = "txn-payload-unexpected-eof", + V1DecodedUnexpectedEof = "txn-payload-unexpected-eof", /// unknown transaction operation. 
usually indicates a corrupted payload - DecodeUnknownTxnOp = "txn-payload-unknown-payload", + V1DecodeUnknownTxnOp = "txn-payload-unknown-payload", /// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was /// already present (when it was expected to not be present) OnRestoreDataConflictAlreadyExists = "txn-payload-conflict-already-exists", @@ -185,19 +185,24 @@ } enumerate_err! { - #[derive(Debug, PartialEq)] + #[derive(Debug, PartialEq, Clone, Copy)] /// SDSS based storage engine errors pub enum StorageError { - // header + /* + ---- + SDSS Errors + ---- + These errors are common across all versions + */ /// version mismatch - HeaderDecodeVersionMismatch = "header-version-mismatch", + FileDecodeHeaderVersionMismatch = "header-version-mismatch", /// The entire header is corrupted - HeaderDecodeCorruptedHeader = "header-corrupted", - // journal - /// An entry in the journal is corrupted - JournalLogEntryCorrupted = "journal-entry-corrupted", - /// The structure of the journal is corrupted - JournalCorrupted = "journal-corrupted", + FileDecodeHeaderCorrupted = "header-corrupted", + /* + ---- + Common encoding errors + ---- + */ // internal file structures /// While attempting to decode a structure in an internal segment of a file, the storage engine ran into a possibly irrecoverable error InternalDecodeStructureCorrupted = "structure-decode-corrupted", @@ -205,21 +210,48 @@ InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload", /// the data for an internal structure was decoded but is logically invalid InternalDecodeStructureIllegalData = "structure-decode-illegal-data", + /* + ---- + V1 Journal Errors + ---- + */ + /// An entry in the journal is corrupted + V1JournalDecodeLogEntryCorrupted = "journal-entry-corrupted", + /// The structure of the journal is corrupted + V1JournalDecodeCorrupted = "journal-corrupted", /// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable - DataBatchRestoreCorruptedBatch = "batch-corrupted-batch", + V1DataBatchDecodeCorruptedBatch = "batch-corrupted-batch", /// when attempting to restore a data batch from disk, the driver encountered a corrupted entry - DataBatchRestoreCorruptedEntry = "batch-corrupted-entry", - /// we failed to close the data batch - DataBatchCloseError = "batch-persist-close-failed", /// the data batch file is corrupted - DataBatchRestoreCorruptedBatchFile = "batch-corrupted-file", + V1DataBatchDecodeCorruptedEntry = "batch-corrupted-entry", /// the system database is corrupted - SysDBCorrupted = "sysdb-corrupted", - // raw journal errors - RawJournalEventCorruptedMetadata = "journal-event-metadata-corrupted", - RawJournalEventCorrupted = "journal-invalid-event", - RawJournalCorrupted = "journal-corrupted", - RawJournalInvalidEvent = "journal-invalid-event-order", - RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed", + V1DataBatchDecodeCorruptedBatchFile = "batch-corrupted-file", + V1SysDBDecodeCorrupted = "sysdb-corrupted", + /// we failed to close the data batch + V1DataBatchRuntimeCloseError = "batch-persist-close-failed", + /* + ---- + V2 Journal Errors + ---- + */ + /// Journal event metadata corrupted + RawJournalDecodeEventCorruptedMetadata = "journal-event-metadata-corrupted", + /// The event body is corrupted + RawJournalDecodeEventCorruptedPayload = "journal-event-payload-corrupted", + /// batch contents were unexpected (for example, we expected n events but got m events) + RawJournalDecodeBatchContentsMismatch = "journal-batch-unexpected-termination", + /// batch contents were validated and executed but the final integrity check failed + RawJournalDecodeBatchIntegrityFailure = "journal-batch-integrity-check-failed", + /// unexpected order of events + RawJournalDecodeInvalidEvent = "journal-invalid-event-order", + /// corrupted event within a batch + RawJournalDecodeCorruptionInBatchMetadata = "journal-batch-corrupted-event-metadata", + /* + ---- + runtime errors + ---- + */ + RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed", + RawJournalRuntimeDirty = "journal-in-dirty-state", } } diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 71fb33ce..09924415 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -33,7 +33,7 @@ use { EntityIDRef, }, data::uuid::Uuid, - error::ErrorKind, + error::StorageError, fractal::GlobalInstanceLike, storage::{ safe_interfaces::{paths_v1, StdModelBatch}, @@ -528,9 +528,7 @@ impl FractalMgr { if mdl_driver_.status().is_iffy() { // don't mess this up any further return Err(( - super::error::Error::from(ErrorKind::Other( - "model driver is in dirty state".into(), - )), + super::error::Error::from(StorageError::RawJournalRuntimeDirty), BatchStats::into_inner(BatchStats::new()), )); } diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index f153ab75..d1b10b82 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -37,6 +37,7 @@ use { std::{ fmt, mem::MaybeUninit, + ptr::addr_of_mut, sync::atomic::{AtomicUsize, Ordering}, }, tokio::sync::mpsc::unbounded_channel, @@ -262,16 +263,16 @@ impl Global { .get_rt_stat() .per_mdl_delta_max_size() } - unsafe fn __gref_raw() -> &'static mut MaybeUninit { + unsafe fn __gref_raw() -> *mut MaybeUninit { static mut G: MaybeUninit = MaybeUninit::uninit(); - &mut G + addr_of_mut!(G) } unsafe fn __gref(&self) -> &'static GlobalState { - Self::__gref_raw().assume_init_ref() + (&*Self::__gref_raw()).assume_init_ref() } pub unsafe fn unload_all(self) { // TODO(@ohsayan): handle errors - let GlobalState { gns, .. } = Self::__gref_raw().assume_init_read(); + let GlobalState { gns, .. } = Self::__gref_raw().read().assume_init(); let mut gns_driver = gns.gns_driver().txn_driver.lock(); GNSDriver::close_driver(&mut gns_driver).unwrap(); for mdl in gns diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index f2572e0b..219cf0de 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -77,6 +77,7 @@ impl TestGlobal { model_name.entity(), model_data.get_uuid(), ), + Default::default(), )?; model.driver().initialize_model_driver(driver); } @@ -97,7 +98,7 @@ impl TestGlobal { Err(e) => match e.kind() { ErrorKind::IoError(e_) => match e_.kind() { std::io::ErrorKind::AlreadyExists => { - GNSDriver::open_gns_with_name(log_name, &data) + GNSDriver::open_gns_with_name(log_name, &data, Default::default()) } _ => Err(e), }, diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index ee3090bc..2968364b 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -460,3 +460,7 @@ macro_rules! e { r($e) }}; } + +macro_rules!
l { + (let $($name:ident),* = $($expr:expr),*) => { let ($($name),*) = ($($expr),*); } +} diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 101ee060..ab16eb14 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -42,7 +42,7 @@ mod txn; #[cfg(test)] mod tests; // re-export -pub use error::RuntimeResult; +pub use {error::RuntimeResult, fractal::Global}; use crate::engine::storage::SELoaded; @@ -198,3 +198,7 @@ pub fn finish(g: fractal::Global) { g.unload_all(); } } + +pub fn repair() -> RuntimeResult<()> { + storage::repair() +} diff --git a/server/src/engine/storage/common/interface/fs.rs b/server/src/engine/storage/common/interface/fs.rs index 58551822..4a9a9e12 100644 --- a/server/src/engine/storage/common/interface/fs.rs +++ b/server/src/engine/storage/common/interface/fs.rs @@ -30,6 +30,8 @@ file system */ +use crate::util; + #[cfg(test)] use super::vfs::{VFileDescriptor, VirtualFS}; use { @@ -56,6 +58,28 @@ impl FileSystem { } impl FileSystem { + #[inline(always)] + pub fn copy_directory(from: &str, to: &str) -> IoResult<()> { + #[cfg(test)] + { + match Self::context() { + FSContext::Local => {} + FSContext::Virtual => return VirtualFS::instance().write().fs_copy(from, to), + } + } + util::os::recursive_copy(from, to) + } + #[inline(always)] + pub fn copy(from: &str, to: &str) -> IoResult<()> { + #[cfg(test)] + { + match Self::context() { + FSContext::Local => {} + FSContext::Virtual => return VirtualFS::instance().write().fs_copy(from, to), + } + } + std_fs::copy(from, to).map(|_| ()) + } #[inline(always)] pub fn read(path: &str) -> IoResult> { #[cfg(test)] diff --git a/server/src/engine/storage/common/interface/vfs.rs b/server/src/engine/storage/common/interface/vfs.rs index 428933a4..51f5bb4b 100644 --- a/server/src/engine/storage/common/interface/vfs.rs +++ b/server/src/engine/storage/common/interface/vfs.rs @@ -62,6 +62,19 @@ enum VNode { File(RwLock), } +impl VNode { + fn clone_into_new_node(&self) -> Self { + match self { + Self::Dir(d) => Self::Dir( + d.iter() + .map(|(id, data)| (id.clone(), data.clone_into_new_node())) + .collect(), + ), + Self::File(f) => Self::File(RwLock::new(f.read().clone_to_new_file())), + } + } +} + #[derive(Debug)] pub(super) struct VFile { read: bool, @@ -103,6 +116,14 @@ impl Drop for VFileDescriptor { */ impl VFile { + pub fn clone_to_new_file(&self) -> Self { + Self { + read: false, + write: false, + data: self.data.clone(), + pos: 0, + } + } pub fn truncate(&mut self, to: u64) -> IoResult<()> { if !self.write { return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into()); @@ -187,6 +208,30 @@ impl VirtualFS { pub fn get_data(&self, path: &str) -> IoResult> { self.with_file(path, |f| Ok(f.data.clone())) } + pub fn fs_copy(&mut self, from: &str, to: &str) -> IoResult<()> { + let node = self.with_item(from, |node| Ok(node.clone_into_new_node()))?; + // process components + let (target, components) = util::split_target_and_components(to); + let mut current = &mut self.root; + for component in components { + match current.get_mut(component) { + Some(VNode::Dir(dir)) => { + current = dir; + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + match current.entry(target.into()) { + Entry::Occupied(mut item) => { + item.insert(node); + } + Entry::Vacant(ve) => { + ve.insert(node); + } + } + Ok(()) + } pub fn fs_fcreate_rw(&mut self, fpath: &str) -> IoResult { let (target_file, components) = 
util::split_target_and_components(fpath); let target_dir = util::find_target_dir_mut(components, &mut self.root)?; @@ -354,16 +399,13 @@ impl VirtualFS { fpath: &str, f: impl FnOnce(&VFile) -> IoResult, ) -> IoResult { - let (target_file, components) = util::split_target_and_components(fpath); - let target_dir = util::find_target_dir(components, &self.root)?; - match target_dir.get(target_file) { - Some(VNode::File(file)) => { + self.with_item(fpath, |node| match node { + VNode::File(file) => { let f_ = file.read(); f(&f_) } - Some(VNode::Dir(_)) => return err::item_is_not_file(), - None => return Err(Error::from(ErrorKind::NotFound).into()), - } + VNode::Dir(_) => err::item_is_not_file(), + }) } fn with_item_mut( &mut self, @@ -387,6 +429,24 @@ impl VirtualFS { Entry::Vacant(_) => return err::could_not_find_item(), } } + fn with_item(&self, fpath: &str, f: impl FnOnce(&VNode) -> IoResult) -> IoResult { + // process components + let (target, components) = util::split_target_and_components(fpath); + let mut current = &self.root; + for component in components { + match current.get(component) { + Some(VNode::Dir(dir)) => { + current = dir; + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + match current.get(target.into()) { + Some(item) => return f(item), + None => return err::could_not_find_item(), + } + } fn dir_delete(&mut self, fpath: &str, allow_if_non_empty: bool) -> IoResult<()> { self.with_item_mut(fpath, |node| match node.get() { VNode::Dir(d) => { diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs index 2143a384..7cebd236 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/mod.rs @@ -47,7 +47,7 @@ use { util::{compiler::TaggedEnum, os}, IoResult, }, - std::{mem::ManuallyDrop, ops::Range}, + std::ops::Range, }; pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _; @@ -338,14 +338,10 @@ impl HeaderV1 { }) } else { let version_okay = okay_header_version & okay_server_version & okay_driver_version; - let md = ManuallyDrop::new([ - StorageError::HeaderDecodeCorruptedHeader, - StorageError::HeaderDecodeVersionMismatch, - ]); - Err(unsafe { - // UNSAFE(@ohsayan): while not needed, md for drop safety + correct index - md.as_ptr().add(!version_okay as usize).read().into() - }) + Err([ + StorageError::FileDecodeHeaderCorrupted, + StorageError::FileDecodeHeaderVersionMismatch, + ][!version_okay as usize]) } } } @@ -444,7 +440,7 @@ pub trait SimpleFileSpecV1 { if v == Self::FILE_SPECFIER_VERSION { Ok(()) } else { - Err(StorageError::HeaderDecodeVersionMismatch.into()) + Err(StorageError::FileDecodeHeaderVersionMismatch.into()) } } } @@ -466,7 +462,7 @@ impl FileSpecV1 for Sfs { if okay { Ok(md) } else { - Err(StorageError::HeaderDecodeVersionMismatch.into()) + Err(StorageError::FileDecodeHeaderVersionMismatch.into()) } } fn write_metadata(f: &mut impl FileWrite, _: Self::EncodeArgs) -> IoResult { diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index 11e4410f..cdf900c2 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -140,6 +140,9 @@ impl SdssFile { self.file.fwrite_all(data)?; self.file.fsync_all() } + pub fn truncate(&mut self, new_size: u64) -> IoResult<()> { + 
self.file.f_truncate(new_size) + } } /* @@ -234,13 +237,13 @@ impl TrackedReader { Err(e) => return Err(e), } } else { - Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()) + Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner()) } } /// Tracked read of a given block size. Shorthand for [`Self::tracked_read`] pub fn read_block(&mut self) -> IoResult<[u8; N]> { if !self.has_left(N as _) { - return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()); + return Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner()); } let mut buf = [0; N]; self.tracked_read(&mut buf)?; @@ -259,6 +262,9 @@ impl TrackedReader { pub fn cursor(&self) -> u64 { self.cursor } + pub fn cached_size(&self) -> u64 { + self.len + } } impl TrackedReader { diff --git a/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs b/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs index bf501f42..42470a8f 100644 --- a/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs +++ b/server/src/engine/storage/common_encoding/r1/impls/gns/mod.rs @@ -74,7 +74,7 @@ where if scanner.eof() { Ok(()) } else { - Err(StorageError::JournalLogEntryCorrupted.into()) + Err(StorageError::V1JournalDecodeLogEntryCorrupted.into()) } } fn decode_and_update_global_state( diff --git a/server/src/engine/storage/common_encoding/r1/obj.rs b/server/src/engine/storage/common_encoding/r1/obj.rs index 2d16c615..768009b5 100644 --- a/server/src/engine/storage/common_encoding/r1/obj.rs +++ b/server/src/engine/storage/common_encoding/r1/obj.rs @@ -43,6 +43,7 @@ use { }, util::{compiler::TaggedEnum, EndianQW}, }, + std::marker::PhantomData, }; /* @@ -318,10 +319,10 @@ impl FieldMD { } } -pub struct FieldRef<'a>(&'a Field); +pub struct FieldRef<'a>(PhantomData<&'a Field>); impl<'a> From<&'a Field> for FieldRef<'a> { - fn from(f: &'a Field) -> Self { - Self(f) + fn from(_: &'a Field) -> Self { + Self(PhantomData) } } impl<'a> PersistObject for FieldRef<'a> { diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs index 2bec1cfa..15844578 100644 --- a/server/src/engine/storage/mod.rs +++ b/server/src/engine/storage/mod.rs @@ -57,6 +57,10 @@ pub struct SELoaded { pub gns: GlobalNS, } +pub fn repair() -> RuntimeResult<()> { + v2::repair() +} + pub fn load(cfg: &Configuration) -> RuntimeResult { // first determine if this is a new install, an existing install or if it uses the old driver if Path::new(v1::SYSDB_PATH).is_file() { diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs b/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs index 0811406c..0d5eb11c 100644 --- a/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/persist.rs @@ -53,7 +53,7 @@ impl DataBatchPersistDriver { if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() { return Ok(()); } else { - return Err(StorageError::DataBatchCloseError.into()); + return Err(StorageError::V1DataBatchRuntimeCloseError.into()); } } } diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs index 91da0550..e6ed84d0 100644 --- a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs +++ b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs @@ -181,7 +181,7 @@ impl DataBatchRestoreDriver { } } // nope, this is a corrupted file - Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into()) } fn 
handle_reopen_is_actual_close(&mut self) -> RuntimeResult { if self.f.is_eof() { @@ -194,7 +194,7 @@ impl DataBatchRestoreDriver { Ok(false) } else { // that's just a nice bug - Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into()) } } } @@ -301,7 +301,7 @@ impl DataBatchRestoreDriver { // we must read the batch termination signature let b = self.f.read_byte()?; if b != MARKER_END_OF_BATCH { - return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()); } } // read actual commit @@ -314,7 +314,7 @@ impl DataBatchRestoreDriver { if actual_checksum == u64::from_le_bytes(hardcoded_checksum) { Ok(actual_commit) } else { - Err(StorageError::DataBatchRestoreCorruptedBatch.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()) } } fn read_batch(&mut self) -> RuntimeResult { @@ -334,7 +334,7 @@ impl DataBatchRestoreDriver { } _ => { // this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry - return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()); } } // decode batch start block @@ -378,7 +378,7 @@ impl DataBatchRestoreDriver { this_col_cnt -= 1; } if this_col_cnt != 0 { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); } if change_type == 1 { this_batch.push(DecodedBatchEvent::new( @@ -396,7 +396,7 @@ impl DataBatchRestoreDriver { processed_in_this_batch += 1; } _ => { - return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()); } } } @@ -413,7 +413,7 @@ impl DataBatchRestoreDriver { if let [MARKER_RECOVERY_EVENT] = buf { return Ok(()); } - Err(StorageError::DataBatchRestoreCorruptedBatch.into()) + Err(StorageError::V1DataBatchDecodeCorruptedBatch.into()) } fn read_start_batch_block(&mut self) -> RuntimeResult { let pk_tag = self.f.read_byte()?; @@ -463,7 +463,7 @@ impl BatchStartBlock { impl DataBatchRestoreDriver { fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult { let Some(pk_type) = TagUnique::try_from_raw(pk_type) else { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); }; Ok(match pk_type { TagUnique::SignedInt | TagUnique::UnsignedInt => { @@ -479,7 +479,7 @@ impl DataBatchRestoreDriver { self.f.tracked_read(&mut data)?; if pk_type == TagUnique::Str { if core::str::from_utf8(&data).is_err() { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); } } unsafe { @@ -496,7 +496,7 @@ impl DataBatchRestoreDriver { } fn decode_cell(&mut self) -> RuntimeResult { let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) 
else { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into()); }; unsafe { cell::decode_element::(&mut self.f, dscr) } .map_err(|e| e.0) @@ -516,7 +516,7 @@ impl From for ErrorHack { } impl From<()> for ErrorHack { fn from(_: ()) -> Self { - Self(StorageError::DataBatchRestoreCorruptedEntry.into()) + Self(StorageError::V1DataBatchDecodeCorruptedEntry.into()) } } diff --git a/server/src/engine/storage/v1/raw/journal/mod.rs b/server/src/engine/storage/v1/raw/journal/mod.rs index 23f58368..ef14c5ec 100644 --- a/server/src/engine/storage/v1/raw/journal/mod.rs +++ b/server/src/engine/storage/v1/raw/journal/mod.rs @@ -55,7 +55,7 @@ impl JournalAdapter for GNSAdapter { fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> { macro_rules! dispatch { ($($item:ty),* $(,)?) => { - [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())] + [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::V1DecodeUnknownTxnOp.into())] }; } static DISPATCH: [fn(&mut BufferedScanner, &GNSData) -> RuntimeResult<()>; 9] = dispatch!( @@ -69,7 +69,7 @@ impl JournalAdapter for GNSAdapter { gns::model::DropModelTxn ); if payload.len() < 2 { - return Err(TransactionError::DecodedUnexpectedEof.into()); + return Err(TransactionError::V1DecodedUnexpectedEof.into()); } let mut scanner = BufferedScanner::new(&payload); let opc = unsafe { @@ -78,7 +78,7 @@ impl JournalAdapter for GNSAdapter { }; match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) { Ok(()) if scanner.eof() => return Ok(()), - Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()), + Ok(_) => Err(TransactionError::V1DecodeCorruptedPayloadMoreBytes.into()), Err(e) => Err(e), } } diff --git a/server/src/engine/storage/v1/raw/journal/raw.rs b/server/src/engine/storage/v1/raw/journal/raw.rs index 1c0d42d7..eed6e360 100644 --- a/server/src/engine/storage/v1/raw/journal/raw.rs +++ b/server/src/engine/storage/v1/raw/journal/raw.rs @@ -215,7 +215,7 @@ impl JournalReader { } match entry_metadata .event_source_marker() - .ok_or(StorageError::JournalLogEntryCorrupted)? + .ok_or(StorageError::V1JournalDecodeLogEntryCorrupted)? { EventSourceMarker::ServerStandard => {} EventSourceMarker::DriverClosed => { @@ -230,7 +230,7 @@ impl JournalReader { EventSourceMarker::DriverReopened | EventSourceMarker::RecoveryReverseLastJournal => { // these two are only taken in close and error paths (respectively) so we shouldn't see them here; this is bad // two special directives in the middle of nowhere? incredible - return Err(StorageError::JournalCorrupted.into()); + return Err(StorageError::V1JournalDecodeCorrupted.into()); } } // read payload @@ -263,10 +263,10 @@ impl JournalReader { Ok(()) } else { // FIXME(@ohsayan): tolerate loss in this directive too - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } else { - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } #[cold] // FIXME(@ohsayan): how bad can prod systems be? (clue: pretty bad, so look for possible changes) @@ -279,7 +279,7 @@ impl JournalReader { self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length? 
let mut entry_buf = [0u8; JournalEntryMetadata::SIZE]; if self.log_file.read_buffer(&mut entry_buf).is_err() { - return Err(StorageError::JournalCorrupted.into()); + return Err(StorageError::V1JournalDecodeCorrupted.into()); } let entry = JournalEntryMetadata::decode(entry_buf); let okay = (entry.event_id == self.evid as u128) @@ -290,7 +290,7 @@ impl JournalReader { if okay { return Ok(()); } else { - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } /// Read and apply all events in the given log file to the global state, returning the (open file, last event ID) @@ -305,7 +305,7 @@ impl JournalReader { if slf.closed { Ok((slf.log_file.downgrade_reader(), slf.evid)) } else { - Err(StorageError::JournalCorrupted.into()) + Err(StorageError::V1JournalDecodeCorrupted.into()) } } } diff --git a/server/src/engine/storage/v1/raw/sysdb.rs b/server/src/engine/storage/v1/raw/sysdb.rs index 3fa16deb..c3c4e27a 100644 --- a/server/src/engine/storage/v1/raw/sysdb.rs +++ b/server/src/engine/storage/v1/raw/sysdb.rs @@ -42,7 +42,7 @@ fn rkey( ) -> RuntimeResult { match d.remove(key).map(transform) { Some(Some(k)) => Ok(k), - _ => Err(StorageError::SysDBCorrupted.into()), + _ => Err(StorageError::V1SysDBDecodeCorrupted.into()), } } @@ -95,14 +95,14 @@ impl RestoredSystemDatabase { let mut userdata = userdata .into_data() .and_then(Datacell::into_list) - .ok_or(StorageError::SysDBCorrupted)?; + .ok_or(StorageError::V1SysDBDecodeCorrupted)?; if userdata.len() != 1 { - return Err(StorageError::SysDBCorrupted.into()); + return Err(StorageError::V1SysDBDecodeCorrupted.into()); } let user_password = userdata .remove(0) .into_bin() - .ok_or(StorageError::SysDBCorrupted)?; + .ok_or(StorageError::V1SysDBDecodeCorrupted)?; loaded_users.insert(username, user_password.into_boxed_slice()); } // load sys data @@ -117,7 +117,7 @@ impl RestoredSystemDatabase { & sys_store.is_empty() & loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT)) { - return Err(StorageError::SysDBCorrupted.into()); + return Err(StorageError::V1SysDBDecodeCorrupted.into()); } Ok(Self::new(loaded_users, sc, sv)) } diff --git a/server/src/engine/storage/v2/impls/gns_log.rs b/server/src/engine/storage/v2/impls/gns_log.rs index 6842cdd0..878e828f 100644 --- a/server/src/engine/storage/v2/impls/gns_log.rs +++ b/server/src/engine/storage/v2/impls/gns_log.rs @@ -34,7 +34,7 @@ use { core::GNSData, storage::{ common_encoding::r1::impls::gns::GNSEvent, - v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent}, + v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent, JournalSettings}, }, txn::gns::{ model::{ @@ -61,11 +61,15 @@ pub struct GNSEventLog; impl GNSDriver { const FILE_PATH: &'static str = "gns.db-tlog"; - pub fn open_gns_with_name(name: &str, gs: &GNSData) -> RuntimeResult { - journal::open_journal(name, gs) + pub fn open_gns_with_name( + name: &str, + gs: &GNSData, + settings: JournalSettings, + ) -> RuntimeResult { + journal::open_journal(name, gs, settings) } - pub fn open_gns(gs: &GNSData) -> RuntimeResult { - Self::open_gns_with_name(Self::FILE_PATH, gs) + pub fn open_gns(gs: &GNSData, settings: JournalSettings) -> RuntimeResult { + Self::open_gns_with_name(Self::FILE_PATH, gs, settings) } pub fn create_gns_with_name(name: &str) -> RuntimeResult { journal::create_journal(name) diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs index 6870eda3..59af1faa 100644 --- a/server/src/engine/storage/v2/impls/mdl_journal.rs +++ 
b/server/src/engine/storage/v2/impls/mdl_journal.rs @@ -46,7 +46,7 @@ use { v2::raw::{ journal::{ self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent, - RawJournalAdapter, + JournalSettings, RawJournalAdapter, }, spec::ModelDataBatchAofV1, }, @@ -66,8 +66,12 @@ use { pub type ModelDriver = BatchDriver; impl ModelDriver { - pub fn open_model_driver(mdl: &ModelData, model_data_file_path: &str) -> RuntimeResult { - journal::open_journal(model_data_file_path, mdl) + pub fn open_model_driver( + mdl: &ModelData, + model_data_file_path: &str, + settings: JournalSettings, + ) -> RuntimeResult { + journal::open_journal(model_data_file_path, mdl, settings) } /// Create a new event log pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult { @@ -449,7 +453,7 @@ impl BatchAdapterSpec for ModelDataAdapter { BatchType::Standard => {} } let pk_tag = TagUnique::try_from_raw(f.read_block().map(|[b]| b)?) - .ok_or(StorageError::RawJournalCorrupted)?; + .ok_or(StorageError::InternalDecodeStructureIllegalData)?; let schema_version = u64::from_le_bytes(f.read_block()?); let column_count = u64::from_le_bytes(f.read_block()?); Ok(BatchMetadata { @@ -656,7 +660,7 @@ mod restore_impls { f.read(&mut data)?; if pk_type == TagUnique::Str { if core::str::from_utf8(&data).is_err() { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::InternalDecodeStructureCorruptedPayload.into()); } } unsafe { @@ -679,7 +683,7 @@ mod restore_impls { let mut this_col_cnt = batch_info.column_count; while this_col_cnt != 0 { let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else { - return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); + return Err(StorageError::InternalDecodeStructureIllegalData.into()); }; let cell = unsafe { cell::decode_element::(f, dscr) }.map_err(|e| e.0)?; row.push(cell); @@ -705,7 +709,7 @@ mod restore_impls { } impl From<()> for ErrorHack { fn from(_: ()) -> Self { - Self(StorageError::DataBatchRestoreCorruptedEntry.into()) + Self(StorageError::InternalDecodeStructureCorrupted.into()) } } impl<'a> DataSource for TrackedReaderContext<'a, ModelDataBatchAofV1> { diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 8d3bae33..6635cc8f 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -25,25 +25,34 @@ */ use { - self::impls::mdl_journal::{BatchStats, FullModel}, + self::{ + impls::mdl_journal::{BatchStats, FullModel}, + raw::journal::{JournalSettings, RepairResult}, + }, super::{common::interface::fs::FileSystem, v1, SELoaded}, - crate::engine::{ - config::Configuration, - core::{ - system_db::{SystemDatabase, VerifyUser}, - GNSData, GlobalNS, - }, - fractal::{context, FractalGNSDriver}, - storage::common::paths_v1, - txn::{ - gns::{ - model::CreateModelTxn, - space::CreateSpaceTxn, - sysctl::{AlterUserTxn, CreateUserTxn}, + crate::{ + engine::{ + config::Configuration, + core::{ + system_db::{SystemDatabase, VerifyUser}, + EntityIDRef, GNSData, GlobalNS, + }, + fractal::{context, FractalGNSDriver}, + storage::{ + common::paths_v1, + v2::raw::journal::{self, JournalRepairMode}, + }, + txn::{ + gns::{ + model::CreateModelTxn, + space::CreateSpaceTxn, + sysctl::{AlterUserTxn, CreateUserTxn}, + }, + SpaceIDRef, }, - SpaceIDRef, + RuntimeResult, }, - RuntimeResult, + util, }, impls::mdl_journal::ModelDriver, }; @@ -120,15 +129,18 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult { pub fn restore(cfg: 
&Configuration) -> RuntimeResult { let gns = GNSData::empty(); context::set_dmsg("loading gns"); - let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?; + let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns, JournalSettings::default())?; for (id, model) in gns.idx_models().write().iter_mut() { let model_data = model.data(); let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid(); let model_data_file_path = paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid()); context::set_dmsg(format!("loading model driver in {model_data_file_path}")); - let model_driver = - impls::mdl_journal::ModelDriver::open_model_driver(model_data, &model_data_file_path)?; + let model_driver = impls::mdl_journal::ModelDriver::open_model_driver( + model_data, + &model_data_file_path, + JournalSettings::default(), + )?; model.driver().initialize_model_driver(model_driver); unsafe { // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum @@ -153,3 +165,69 @@ pub fn restore(cfg: &Configuration) -> RuntimeResult { gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)), }) } + +pub fn repair() -> RuntimeResult<()> { + // back up all files + let backup_dir = format!( + "backups/{}-before-recovery-process", + util::time_now_string() + ); + context::set_dmsg("creating backup directory"); + FileSystem::create_dir_all(&backup_dir)?; + context::set_dmsg("backing up GNS"); + FileSystem::copy(GNS_PATH, &format!("{backup_dir}/{GNS_PATH}"))?; // backup GNS + context::set_dmsg("backing up data directory"); + FileSystem::copy_directory(DATA_DIR, &format!("{backup_dir}/{DATA_DIR}"))?; // backup data + info!("All data backed up in {backup_dir}"); + // check and attempt repair: GNS + let gns = GNSData::empty(); + context::set_dmsg("repair GNS"); + print_repair_info( + journal::repair_journal::>( + GNS_PATH, + &gns, + JournalSettings::default(), + JournalRepairMode::Simple, + )?, + "GNS", + ); + // check and attempt repair: models + let models = gns.idx_models().read(); + for (space_id, space) in gns.idx().read().iter() { + for model_id in space.models().iter() { + let model = models.get(&EntityIDRef::new(&space_id, &model_id)).unwrap(); + let model_data_file_path = paths_v1::model_path( + &space_id, + space.get_uuid(), + &model_id, + model.data().get_uuid(), + ); + context::set_dmsg(format!("repairing {model_data_file_path}")); + print_repair_info( + journal::repair_journal::< + raw::journal::BatchAdapter, + >( + &model_data_file_path, + model.data(), + JournalSettings::default(), + JournalRepairMode::Simple, + )?, + &model_data_file_path, + ) + } + } + Ok(()) +} + +fn print_repair_info(result: RepairResult, id: &str) { + match result { + RepairResult::NoErrors => info!("repair: no errors detected in {id}"), + RepairResult::UnspecifiedLoss(definitely_lost) => { + if definitely_lost == 0 { + warn!("repair: LOST DATA. repaired {id} but lost an unspecified amount of data") + } else { + warn!("repair: LOST DATA. 
repaired {id} but lost at least {definitely_lost} trailing bytes") } } } } diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index 230849d3..0da3f3de 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -47,7 +47,8 @@ mod raw; #[cfg(test)] mod tests; pub use raw::{ - create_journal, open_journal, RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, + create_journal, open_journal, repair_journal, JournalRepairMode, JournalSettings, + RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, RepairResult, }; /* @@ -136,7 +137,7 @@ impl RawJournalAdapter for EventLogAdapter { this_checksum.update(&plen.to_le_bytes()); this_checksum.update(&pl); if this_checksum.finish() != expected_checksum { - return Err(StorageError::RawJournalCorrupted.into()); + return Err(StorageError::RawJournalDecodeCorruptionInBatchMetadata.into()); } ::DECODE_DISPATCH [<::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize]( @@ -165,11 +166,15 @@ pub struct BatchAdapter(PhantomData); #[cfg(test)] impl BatchAdapter { /// Open a new batch journal - pub fn open(name: &str, gs: &BA::GlobalState) -> RuntimeResult> + pub fn open( + name: &str, + gs: &BA::GlobalState, + settings: JournalSettings, + ) -> RuntimeResult> where BA::Spec: FileSpecV1, { - raw::open_journal::>(name, gs) + raw::open_journal::>(name, gs, settings) } /// Create a new batch journal pub fn create(name: &str) -> RuntimeResult> @@ -278,7 +283,7 @@ impl RawJournalAdapter for BatchAdapter { let event_type = <::EventType as TaggedEnum>::try_from_raw( f.read_block().map(|[b]| b)?, ) - .ok_or(StorageError::RawJournalCorrupted)?; + .ok_or(StorageError::InternalDecodeStructureIllegalData)?; // is this an early exit marker? if so, exit if ::is_early_exit(&event_type) { break; } @@ -299,7 +304,7 @@ // finish applying batch BA::finish(batch_state, batch_md, gs)?; } else { - return Err(StorageError::RawJournalCorrupted.into()); + return Err(StorageError::RawJournalDecodeBatchContentsMismatch.into()); } } // and finally, verify checksum @@ -308,7 +313,7 @@ if real_checksum == stored_checksum { Ok(()) } else { - Err(StorageError::RawJournalCorrupted.into()) + Err(StorageError::RawJournalDecodeBatchIntegrityFailure.into()) } } } diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index 6a53516d..b93eae48 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -30,7 +30,8 @@ mod tests; use { crate::{ engine::{ - error::StorageError, + error::{ErrorKind, StorageError, TransactionError}, + fractal::error::Error, mem::unsafe_apis::memcpy, storage::common::{ checksum::SCrc64, @@ -44,7 +45,7 @@ use { util::compiler::TaggedEnum, }, core::fmt, - std::ops::Range, + std::{io::ErrorKind as IoErrorKind, ops::Range}, }; /* @@ -67,15 +68,44 @@ where pub fn open_journal( log_path: &str, gs: &J::GlobalState, + settings: JournalSettings, ) -> RuntimeResult> where J::Spec: FileSpecV1, { let log = SdssFile::::open(log_path)?; - let (initializer, file) = RawJournalReader::::scroll(log, gs)?; + let (initializer, file) = RawJournalReader::::scroll(log, gs, settings)?; RawJournalWriter::new(initializer, file) } +#[derive(Debug, PartialEq)] +/// The result of a journal repair operation +pub enum RepairResult { + /// No errors were detected + NoErrors, + /// Definitely lost n bytes, but might have lost more + UnspecifiedLoss(u64), } + +/** + Attempts to repair the given journal, **in-place**, and returns the number of bytes that were definitely lost and could not + be repaired. + + **WARNING**: Back up before calling this +*/ +pub fn repair_journal( + log_path: &str, + gs: &J::GlobalState, + settings: JournalSettings, + repair_mode: JournalRepairMode, +) -> RuntimeResult +where + J::Spec: FileSpecV1, +{ + let log = SdssFile::::open(log_path)?; + RawJournalReader::::repair(log, gs, settings, repair_mode).map(|(lost, ..)| lost) +} + #[derive(Debug)] pub struct JournalInitializer { cursor: u64, @@ -118,10 +148,20 @@ impl JournalInitializer { */ #[cfg(test)] -pub fn obtain_trace() -> Vec { +pub fn debug_get_trace() -> Vec { local_mut!(TRACE, |t| core::mem::take(t)) } +#[cfg(test)] +pub fn debug_get_offsets() -> std::collections::BTreeMap { + local_mut!(OFFSETS, |offsets| core::mem::take(offsets)) +} + +#[cfg(test)] +pub fn debug_set_offset_tracking(track: bool) { + local_mut!(TRACE_OFFSETS, |track_| *track_ = track) +} + #[derive(Debug, PartialEq)] #[cfg(test)] pub enum JournalTraceEvent { @@ -145,6 +185,7 @@ pub enum JournalReaderTraceEvent { ClosedAndReachedEof, ReopenSuccess, // event + LookingForEvent, AttemptingEvent(u64), DetectedServerEvent, ServerEventMetadataParsed, @@ -183,9 +224,26 @@ pub(super) enum JournalWriterTraceEvent { DriverClosed, } +#[cfg(test)] local! { - #[cfg(test)] static TRACE: Vec = Vec::new(); + static OFFSETS: std::collections::BTreeMap = Default::default(); + static TRACE_OFFSETS: bool = false; +} + +macro_rules!
jtrace_event_offset { + ($id:expr, $offset:expr) => { + #[cfg(test)] + { + local_ref!(TRACE_OFFSETS, |should_trace| { + if *should_trace { + local_mut!(OFFSETS, |offsets| assert!(offsets + .insert($id, $offset) + .is_none())) + } + }) + } + }; } macro_rules! jtrace { @@ -322,7 +380,6 @@ impl DriverEvent { const OFFSET_6_LAST_TXN_ID: Range = Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64); /// Create a new driver event (checksum auto-computed) - #[cfg(test)] fn new( txn_id: u128, driver_event: DriverEventKind, @@ -364,7 +421,6 @@ impl DriverEvent { } } /// Encode the current driver event - #[cfg(test)] fn encode_self(&self) -> [u8; 64] { Self::encode( self.txn_id, @@ -576,7 +632,7 @@ impl RawJournalWriter { Ok(()) } else { // so, the on-disk file probably has some partial state. this is bad. throw an error - Err(StorageError::RawJournalRuntimeCriticalLwtHBFail.into()) + Err(StorageError::RawJournalRuntimeHeartbeatFail.into()) } } } @@ -590,6 +646,7 @@ impl RawJournalWriter { self.txn_id += 1; let ret = f(self, id as u128); if ret.is_ok() { + jtrace_event_offset!(id, self.log_file.cursor()); self.known_txn_id = id; self.known_txn_offset = self.log_file.cursor(); } @@ -640,6 +697,37 @@ pub struct RawJournalReader { last_txn_offset: u64, last_txn_checksum: u64, stats: JournalStats, + _settings: JournalSettings, + state: JournalState, +} + +#[derive(Debug, PartialEq)] +enum JournalState { + AwaitingEvent, + AwaitingServerEvent, + AwaitingClose, + AwaitingReopen, +} + +impl Default for JournalState { + fn default() -> Self { + Self::AwaitingEvent + } +} + +#[derive(Debug)] +pub struct JournalSettings {} + +impl Default for JournalSettings { + fn default() -> Self { + Self::new() + } +} + +impl JournalSettings { + pub fn new() -> Self { + Self {} + } } #[derive(Debug)] @@ -658,28 +746,36 @@ impl JournalStats { } impl RawJournalReader { - pub fn scroll( + fn scroll( file: SdssFile<::Spec>, gs: &J::GlobalState, + settings: JournalSettings, ) -> RuntimeResult<(JournalInitializer, SdssFile)> { let reader = TrackedReader::with_cursor( file, <::Spec as FileSpecV1>::SIZE as u64, )?; jtrace_reader!(Initialized); - let mut me = Self::new(reader, 0, 0, 0, 0); + let mut me = Self::new(reader, 0, 0, 0, 0, settings); + me._scroll(gs).map(|jinit| (jinit, me.tr.into_inner())) + } + fn _scroll(&mut self, gs: &J::GlobalState) -> RuntimeResult { loop { - if me._apply_next_event_and_stop(gs)? 
{ - jtrace_reader!(Completed); - let initializer = JournalInitializer::new( - me.tr.cursor(), - me.tr.checksum(), - me.txn_id, - // NB: the last txn offset is important because it indicates that the log is new - me.last_txn_offset, - ); - let file = me.tr.into_inner(); - return Ok((initializer, file)); + jtrace_reader!(LookingForEvent); + match self._apply_next_event_and_stop(gs) { + Ok(true) => { + jtrace_reader!(Completed); + let initializer = JournalInitializer::new( + self.tr.cursor(), + self.tr.checksum(), + self.txn_id, + // NB: the last txn offset is important because it indicates that the log is new + self.last_txn_offset, + ); + return Ok(initializer); + } + Ok(false) => self.state = JournalState::AwaitingEvent, + Err(e) => return Err(e), } } } @@ -689,6 +785,7 @@ impl RawJournalReader { last_txn_id: u64, last_txn_offset: u64, last_txn_checksum: u64, + settings: JournalSettings, ) -> Self { Self { tr: reader, @@ -697,6 +794,8 @@ impl RawJournalReader { last_txn_offset, last_txn_checksum, stats: JournalStats::new(), + _settings: settings, + state: JournalState::AwaitingEvent, } } fn __refresh_known_txn(me: &mut Self) { @@ -707,6 +806,156 @@ impl RawJournalReader { } } +#[derive(Debug, PartialEq)] +pub enum JournalRepairMode { + Simple, +} + +impl RawJournalReader { + fn repair( + file: SdssFile<::Spec>, + gs: &J::GlobalState, + settings: JournalSettings, + repair_mode: JournalRepairMode, + ) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile)> { + let reader = TrackedReader::with_cursor( + file, + <::Spec as FileSpecV1>::SIZE as u64, + )?; + jtrace_reader!(Initialized); + let mut me = Self::new(reader, 0, 0, 0, 0, settings); + match me._scroll(gs) { + Ok(init) => return Ok((RepairResult::NoErrors, init, me.tr.into_inner())), + Err(e) => me.start_repair(e, repair_mode), + } + } + fn start_repair( + self, + e: Error, + repair_mode: JournalRepairMode, + ) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile)> { + let lost = if self.last_txn_offset == 0 { + // we haven't scanned any events and already hit an error + // so essentially, we lost the entire log + self.tr.cached_size() - ::SIZE as u64 + } else { + self.tr.cached_size() - self.last_txn_offset + }; + let repair_result = RepairResult::UnspecifiedLoss(lost); + match repair_mode { + JournalRepairMode::Simple => {} + } + // now it's our task to determine exactly what happened + match e.kind() { + ErrorKind::IoError(io) => match io.kind() { + IoErrorKind::UnexpectedEof => { + /* + this is the only kind of error that we can actually repair since it indicates that a part of the + file is "missing." we can't deal with things like permission errors. 
that's supposed to be handled + by the admin by looking through the error logs + */ + } + _ => return Err(e), + }, + ErrorKind::Storage(e) => match e { + // unreachable errors (no execution path here) + StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start + | StorageError::RawJournalRuntimeDirty + | StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier + | StorageError::FileDecodeHeaderCorrupted // should be caught earlier + | StorageError::V1JournalDecodeLogEntryCorrupted // v1 errors can't be raised here + | StorageError::V1JournalDecodeCorrupted + | StorageError::V1DataBatchDecodeCorruptedBatch + | StorageError::V1DataBatchDecodeCorruptedEntry + | StorageError::V1DataBatchDecodeCorruptedBatchFile + | StorageError::V1SysDBDecodeCorrupted + | StorageError::V1DataBatchRuntimeCloseError => unreachable!(), + // possible errors + StorageError::InternalDecodeStructureCorrupted + | StorageError::InternalDecodeStructureCorruptedPayload + | StorageError::InternalDecodeStructureIllegalData + | StorageError::RawJournalDecodeEventCorruptedMetadata + | StorageError::RawJournalDecodeEventCorruptedPayload + | StorageError::RawJournalDecodeBatchContentsMismatch + | StorageError::RawJournalDecodeBatchIntegrityFailure + | StorageError::RawJournalDecodeInvalidEvent + | StorageError::RawJournalDecodeCorruptionInBatchMetadata => {} + }, + ErrorKind::Txn(txerr) => match txerr { + // unreachable errors + TransactionError::V1DecodeCorruptedPayloadMoreBytes // no v1 errors + | TransactionError::V1DecodedUnexpectedEof + | TransactionError::V1DecodeUnknownTxnOp => unreachable!(), + // possible errors + TransactionError::OnRestoreDataConflictAlreadyExists | + TransactionError::OnRestoreDataMissing | + TransactionError::OnRestoreDataConflictMismatch => {}, + }, + // these errors do not have an execution pathway + ErrorKind::Other(_) => unreachable!(), + ErrorKind::Config(_) => unreachable!(), + } + /* + revert log. record previous signatures. 
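+        to make the resume arithmetic below concrete: we truncate back to the end of the last
+        event whose signature we still trust, then append a synthetic close so that the writer
+        can start up normally. a minimal sketch of where that leaves the writer (the helper
+        name `sketch_repair_resume` is hypothetical, not part of this patch):
+
+        fn sketch_repair_resume(known_event_offset: u64, known_event_id: u64) -> (u64, u64) {
+            // a driver event always occupies 64 bytes on disk (encode_self returns [u8; 64])
+            const FULL_EVENT_SIZE: u64 = 64;
+            // the synthetic close is appended at `known_event_offset`, so the writer
+            // resumes immediately past it ...
+            let resume_cursor = known_event_offset + FULL_EVENT_SIZE;
+            // ... and since the synthetic close consumed `known_event_id + 1`, the next
+            // event committed by the writer gets `known_event_id + 2`
+            let next_event_id = known_event_id + 2;
+            (resume_cursor, next_event_id)
+        }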
+ */ + l!(let known_event_id, known_event_offset, known_event_checksum = self.last_txn_id, self.last_txn_offset, self.last_txn_checksum); + let mut last_logged_checksum = self.tr.checksum(); + let mut base_log = self.tr.into_inner(); + if known_event_offset == 0 { + // no event, so just trim up to the header + base_log.truncate(::SIZE as _)?; + } else { + base_log.truncate(known_event_offset)?; + } + /* + see what needs to be done next + */ + match self.state { + JournalState::AwaitingEvent + | JournalState::AwaitingServerEvent + | JournalState::AwaitingClose => { + /* + no matter what the last event was (and definitely not a close since if we are expecting a close the log was not already closed), + the log is in a dirty state that can only be resolved by closing it + */ + let drv_close = DriverEvent::new( + if known_event_offset == 0 { + // no event occurred + 0 + } else { + // something happened prior to this, so we'll use an incremented ID for this event + known_event_id + 1 + } as u128, + DriverEventKind::Closed, + known_event_checksum, + known_event_offset, + known_event_id, + ); + let drv_close_event = drv_close.encode_self(); + last_logged_checksum.update(&drv_close_event); + base_log.fsynced_write(&drv_close_event)?; + } + JournalState::AwaitingReopen => { + // extra bytes indicating low to severe corruption; last event is a close, so with the revert the log is now clean + } + } + let jinit_cursor = known_event_offset + DriverEvent::FULL_EVENT_SIZE as u64; + let jinit_last_txn_offset = jinit_cursor; // same as cursor + let jinit_event_id = known_event_id + 2; // since we already used +1 + let jinit_checksum = last_logged_checksum; + Ok(( + repair_result, + JournalInitializer::new( + jinit_cursor, + jinit_checksum, + jinit_event_id, + jinit_last_txn_offset, + ), + base_log, + )) + } +} + impl RawJournalReader { fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult { let txn_id = u128::from_le_bytes(self.tr.read_block()?); @@ -716,12 +965,13 @@ impl RawJournalReader { expected: self.txn_id, current: txn_id as u64 }); - return Err(StorageError::RawJournalEventCorruptedMetadata.into()); + return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()); } jtrace_reader!(AttemptingEvent(txn_id as u64)); // check for a server event // is this a server event? if meta & SERVER_EV_MASK != 0 { + self.state = JournalState::AwaitingServerEvent; jtrace_reader!(DetectedServerEvent); let meta = meta & !SERVER_EV_MASK; match J::parse_event_meta(meta) { @@ -740,9 +990,10 @@ impl RawJournalReader { Err(e) => return Err(e), } } - None => return Err(StorageError::RawJournalEventCorruptedMetadata.into()), + None => return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()), } } + self.state = JournalState::AwaitingClose; return self.handle_close(txn_id, meta); } fn handle_close( @@ -772,9 +1023,9 @@ impl RawJournalReader { .. 
}) => { jtrace_reader!(ErrExpectedCloseGotReopen); - return Err(StorageError::RawJournalInvalidEvent.into()); + return Err(StorageError::RawJournalDecodeInvalidEvent.into()); } - None => return Err(StorageError::RawJournalEventCorrupted.into()), + None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()), }; jtrace_reader!(DriverEventExpectedCloseGotClose); // a driver closed event; we've checked integrity, but we must check the field values @@ -787,27 +1038,29 @@ impl RawJournalReader { jtrace_reader!(DriverEventInvalidMetadata); // either the block is corrupted or the data we read is corrupted; either way, // we're going to refuse to read this - return Err(StorageError::RawJournalCorrupted.into()); + return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()); } self.stats.driver_events += 1; // update Self::__refresh_known_txn(self); - // full metadata validated; this is a valid close event but is it actually a close + // full metadata validated; this is a valid close event, but is it actually a close? if self.tr.is_eof() { jtrace_reader!(ClosedAndReachedEof); // yes, we're done return Ok(true); } + self.state = JournalState::AwaitingReopen; + jtrace_reader!(DriverEventExpectingReopenBlock); return self.handle_reopen(); } fn handle_reopen(&mut self) -> RuntimeResult { jtrace_reader!(AttemptingEvent(self.txn_id as u64)); - jtrace_reader!(DriverEventExpectingReopenBlock); // now we must look for a reopen event let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?; let reopen_event = match DriverEvent::decode(event_block) { Some(ev) if ev.event == DriverEventKind::Reopened => ev, - None | Some(_) => return Err(StorageError::RawJournalEventCorrupted.into()), + Some(_) => return Err(StorageError::RawJournalDecodeInvalidEvent.into()), + None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()), }; jtrace_reader!(DriverEventExpectingReopenGotReopen); let valid_meta = okay! 
{ @@ -824,7 +1077,7 @@ impl RawJournalReader { Ok(false) } else { jtrace_reader!(ErrInvalidReopenMetadata); - Err(StorageError::RawJournalCorrupted.into()) + Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()) } } } diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs similarity index 68% rename from server/src/engine/storage/v2/raw/journal/raw/tests.rs rename to server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs index 94d46221..bb892d22 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/tests.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/journal_ops.rs @@ -1,5 +1,5 @@ /* - * Created on Tue Jan 30 2024 + * Created on Tue Mar 26 2024 * * This file is a part of Skytable * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source @@ -26,173 +26,15 @@ use { super::{ - create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind, - JournalInitializer, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter, - }, - crate::engine::{ - error::StorageError, - fractal::error::ErrorContext, - storage::{ - common::sdss::sdss_r1::rw::TrackedReader, - v2::raw::{ - journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent}, - spec::SystemDatabaseV1, - }, + super::{ + create_journal, debug_get_trace, open_journal, DriverEventKind, + JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent, RawJournalWriter, }, - RuntimeResult, + SimpleDB, SimpleDBJournal, }, - std::cell::RefCell, + crate::engine::fractal::error::ErrorContext, }; -#[test] -fn encode_decode_meta() { - let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0); - let encoded1 = dv1.encode_self(); - let decoded1 = DriverEvent::decode(encoded1).unwrap(); - assert_eq!(dv1, decoded1); -} - -/* - impls for journal tests -*/ - -#[derive(Debug, Clone, PartialEq)] -pub struct SimpleDB { - data: RefCell>, -} -impl SimpleDB { - fn new() -> Self { - Self { - data: RefCell::default(), - } - } - fn data(&self) -> std::cell::Ref<'_, Vec> { - self.data.borrow() - } - fn clear(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { - log.commit_event(DbEventClear)?; - self.data.get_mut().clear(); - Ok(()) - } - fn pop(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { - self.data.get_mut().pop().unwrap(); - log.commit_event(DbEventPop)?; - Ok(()) - } - fn push( - &mut self, - log: &mut RawJournalWriter, - new: impl ToString, - ) -> RuntimeResult<()> { - let new = new.to_string(); - log.commit_event(DbEventPush(&new))?; - self.data.get_mut().push(new); - Ok(()) - } -} - -/* - event impls -*/ - -pub struct SimpleDBJournal; -struct DbEventPush<'a>(&'a str); -struct DbEventPop; -struct DbEventClear; -trait SimpleDBEvent: Sized { - const OPC: u8; - fn write_buffered(self, _: &mut Vec); -} -macro_rules! impl_db_event { - ($($ty:ty as $code:expr $(=> $expr:expr)?),*) => { - $(impl SimpleDBEvent for $ty { - const OPC: u8 = $code; - fn write_buffered(self, buf: &mut Vec) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec, f: impl Fn($ty, &mut Vec)) { f(s, b) } $(_do_it(self, buf, $expr))? 
} - })* - } -} - -impl_db_event!( - DbEventPush<'_> as 0 => |me, buf| { - buf.extend(&(me.0.len() as u64).to_le_bytes()); - buf.extend(me.0.as_bytes()); - }, - DbEventPop as 1, - DbEventClear as 2 -); - -impl RawJournalAdapterEvent for T { - fn md(&self) -> u64 { - T::OPC as _ - } - fn write_buffered(self, buf: &mut Vec, _: ()) { - T::write_buffered(self, buf) - } -} - -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum EventMeta { - NewKey, - Pop, - Clear, -} -impl RawJournalAdapter for SimpleDBJournal { - const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered; - type Spec = SystemDatabaseV1; - type GlobalState = SimpleDB; - type EventMeta = EventMeta; - type CommitContext = (); - type Context<'a> = () where Self: 'a; - fn initialize(_: &JournalInitializer) -> Self { - Self - } - fn enter_context<'a>(_: &'a mut RawJournalWriter) -> Self::Context<'a> { - () - } - fn parse_event_meta(meta: u64) -> Option { - Some(match meta { - 0 => EventMeta::NewKey, - 1 => EventMeta::Pop, - 2 => EventMeta::Clear, - _ => return None, - }) - } - fn commit_buffered<'a, E: RawJournalAdapterEvent>( - &mut self, - buf: &mut Vec, - event: E, - ctx: (), - ) { - event.write_buffered(buf, ctx) - } - fn decode_apply<'a>( - gs: &Self::GlobalState, - meta: Self::EventMeta, - file: &mut TrackedReader, - ) -> RuntimeResult<()> { - match meta { - EventMeta::NewKey => { - let key_size = u64::from_le_bytes(file.read_block()?); - let mut keybuf = vec![0u8; key_size as usize]; - file.tracked_read(&mut keybuf)?; - match String::from_utf8(keybuf) { - Ok(k) => gs.data.borrow_mut().push(k), - Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()), - } - } - EventMeta::Clear => gs.data.borrow_mut().clear(), - EventMeta::Pop => { - let _ = gs.data.borrow_mut().pop().unwrap(); - } - } - Ok(()) - } -} - -/* - journal tests -*/ - #[test] fn journal_open_close() { const JOURNAL_NAME: &str = "journal_open_close"; @@ -200,12 +42,12 @@ fn journal_open_close() { // new boot let mut j = create_journal::(JOURNAL_NAME).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![JournalWriterTraceEvent::Initialized] ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -219,12 +61,18 @@ fn journal_open_close() { } { // second boot - let mut j = open_journal::(JOURNAL_NAME, &SimpleDB::new()).unwrap(); + let mut j = open_journal::( + JOURNAL_NAME, + &SimpleDB::new(), + JournalSettings::default(), + ) + .unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ // init reader and read close event JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventCompletedBlockRead, @@ -244,7 +92,7 @@ fn journal_open_close() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -258,21 +106,28 @@ fn journal_open_close() { } { // third boot - let mut j = open_journal::(JOURNAL_NAME, &SimpleDB::new()).unwrap(); + let mut j = open_journal::( + JOURNAL_NAME, + &SimpleDB::new(), + JournalSettings::default(), + ) + .unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ // init reader and read reopen event 
JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, - JournalReaderTraceEvent::AttemptingEvent(1), JournalReaderTraceEvent::DriverEventExpectingReopenBlock, + JournalReaderTraceEvent::AttemptingEvent(1), JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen, JournalReaderTraceEvent::ReopenSuccess, // now read close event + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(2), JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventCompletedBlockRead, @@ -292,7 +147,7 @@ fn journal_open_close() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -316,7 +171,7 @@ fn journal_with_server_single_event() { db.push(&mut j, "hello world").unwrap(); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::Initialized, JournalWriterTraceEvent::CommitAttemptForEvent(0), @@ -336,21 +191,23 @@ fn journal_with_server_single_event() { { let db = SimpleDB::new(); // second boot - let mut j = open_journal::(JOURNAL_NAME, &db) - .set_dmsg_fn(|| format!("{:?}", super::obtain_trace())) + let mut j = open_journal::(JOURNAL_NAME, &db, JournalSettings::default()) + .set_dmsg_fn(|| format!("{:?}", debug_get_trace())) .unwrap(); assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ // init reader and read server event JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::ServerEventMetadataParsed, JournalReaderTraceEvent::ServerEventAppliedSuccess, // now read close event + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(1), JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventCompletedBlockRead, @@ -370,7 +227,7 @@ fn journal_with_server_single_event() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -385,29 +242,33 @@ fn journal_with_server_single_event() { { // third boot let db = SimpleDB::new(); - let mut j = open_journal::(JOURNAL_NAME, &db).unwrap(); + let mut j = + open_journal::(JOURNAL_NAME, &db, JournalSettings::default()).unwrap(); assert_eq!(db.data().len(), 1); assert_eq!(db.data()[0], "hello world"); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ // init reader and read server event JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::ServerEventMetadataParsed, JournalReaderTraceEvent::ServerEventAppliedSuccess, // now read close event + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(1), JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventCompletedBlockRead, 
JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, // now read reopen event - JournalReaderTraceEvent::AttemptingEvent(2), JournalReaderTraceEvent::DriverEventExpectingReopenBlock, + JournalReaderTraceEvent::AttemptingEvent(2), JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen, JournalReaderTraceEvent::ReopenSuccess, // now read close event + JournalReaderTraceEvent::LookingForEvent, JournalReaderTraceEvent::AttemptingEvent(3), JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventCompletedBlockRead, @@ -427,7 +288,7 @@ fn journal_with_server_single_event() { ); RawJournalWriter::close_driver(&mut j).unwrap(); assert_eq!( - super::obtain_trace(), + debug_get_trace(), intovec![ JournalWriterTraceEvent::DriverEventAttemptCommit { event: DriverEventKind::Closed, @@ -453,7 +314,8 @@ fn multi_boot() { } { let mut db = SimpleDB::new(); - let mut j = open_journal::("multiboot", &db).unwrap(); + let mut j = + open_journal::("multiboot", &db, JournalSettings::default()).unwrap(); assert_eq!(db.data().as_ref(), vec!["key_a".to_string()]); db.clear(&mut j).unwrap(); db.push(&mut j, "myfinkey").unwrap(); @@ -461,7 +323,8 @@ fn multi_boot() { } { let db = SimpleDB::new(); - let mut j = open_journal::("multiboot", &db).unwrap(); + let mut j = + open_journal::("multiboot", &db, JournalSettings::default()).unwrap(); assert_eq!(db.data().as_ref(), vec!["myfinkey".to_string()]); RawJournalWriter::close_driver(&mut j).unwrap(); } diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs new file mode 100644 index 00000000..7afdcf2f --- /dev/null +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/mod.rs @@ -0,0 +1,218 @@ +/* + * Created on Tue Jan 30 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +mod journal_ops; +mod recovery; + +use { + super::{ + CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, RawJournalAdapter, + RawJournalAdapterEvent, RawJournalWriter, + }, + crate::engine::{ + error::StorageError, + storage::{ + common::{checksum::SCrc64, sdss::sdss_r1::rw::TrackedReader}, + v2::raw::spec::SystemDatabaseV1, + }, + RuntimeResult, + }, + std::cell::RefCell, +}; + +const SANE_MEM_LIMIT_BYTES: usize = 2048; + +/* + impls for journal tests +*/ + +#[derive(Debug, Clone, PartialEq)] +pub struct SimpleDB { + data: RefCell>, +} +impl SimpleDB { + fn new() -> Self { + Self { + data: RefCell::default(), + } + } + fn data(&self) -> std::cell::Ref<'_, Vec> { + self.data.borrow() + } + fn clear(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { + log.commit_event(DbEventClear)?; + self.data.get_mut().clear(); + Ok(()) + } + fn pop(&mut self, log: &mut RawJournalWriter) -> RuntimeResult<()> { + self.data.get_mut().pop().unwrap(); + log.commit_event(DbEventPop)?; + Ok(()) + } + fn push( + &mut self, + log: &mut RawJournalWriter, + new: impl ToString, + ) -> RuntimeResult<()> { + let new = new.to_string(); + log.commit_event(DbEventPush(&new))?; + self.data.get_mut().push(new); + Ok(()) + } +} + +/* + event impls +*/ + +#[derive(Debug)] +pub struct SimpleDBJournal; +struct DbEventPush<'a>(&'a str); +struct DbEventPop; +struct DbEventClear; +trait SimpleDBEvent: Sized { + const OPC: u8; + fn write_buffered(self, _: &mut Vec); +} +macro_rules! impl_db_event { + ($($ty:ty as $code:expr $(=> $expr:expr)?),*) => { + $(impl SimpleDBEvent for $ty { + const OPC: u8 = $code; + fn write_buffered(self, buf: &mut Vec) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec, f: impl Fn($ty, &mut Vec)) { f(s, b) } $(_do_it(self, buf, $expr))? 
} + })* + } +} + +impl_db_event!( + DbEventPush<'_> as 0 => |me, buf| { + let length_bytes = (me.0.len() as u64).to_le_bytes(); + let me_bytes = me.0.as_bytes(); + let mut checksum = SCrc64::new(); + checksum.update(&length_bytes); + checksum.update(&me_bytes); + buf.extend(&(checksum.finish().to_le_bytes())); // checksum + buf.extend(&length_bytes); // length + buf.extend(me.0.as_bytes()); // payload + }, + DbEventPop as 1, + DbEventClear as 2 +); + +impl RawJournalAdapterEvent for T { + fn md(&self) -> u64 { + T::OPC as _ + } + fn write_buffered(self, buf: &mut Vec, _: ()) { + T::write_buffered(self, buf) + } +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum EventMeta { + NewKey, + Pop, + Clear, +} +impl RawJournalAdapter for SimpleDBJournal { + const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered; + type Spec = SystemDatabaseV1; + type GlobalState = SimpleDB; + type EventMeta = EventMeta; + type CommitContext = (); + type Context<'a> = () where Self: 'a; + fn initialize(_: &JournalInitializer) -> Self { + Self + } + fn enter_context<'a>(_: &'a mut RawJournalWriter) -> Self::Context<'a> { + () + } + fn parse_event_meta(meta: u64) -> Option { + Some(match meta { + 0 => EventMeta::NewKey, + 1 => EventMeta::Pop, + 2 => EventMeta::Clear, + _ => return None, + }) + } + fn commit_buffered<'a, E: RawJournalAdapterEvent>( + &mut self, + buf: &mut Vec, + event: E, + ctx: (), + ) { + event.write_buffered(buf, ctx) + } + fn decode_apply<'a>( + gs: &Self::GlobalState, + meta: Self::EventMeta, + file: &mut TrackedReader, + ) -> RuntimeResult<()> { + match meta { + EventMeta::NewKey => { + let checksum = u64::from_le_bytes(file.read_block()?); + let length_u64 = u64::from_le_bytes(file.read_block()?); + let length = length_u64 as usize; + let mut payload = Vec::::new(); + if length > SANE_MEM_LIMIT_BYTES + || payload.try_reserve_exact(length as usize).is_err() + { + return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()); + } + unsafe { + payload.as_mut_ptr().write_bytes(0, length); + payload.set_len(length); + } + file.tracked_read(&mut payload)?; + let mut this_checksum = SCrc64::new(); + this_checksum.update(&length_u64.to_le_bytes()); + this_checksum.update(&payload); + match String::from_utf8(payload) { + Ok(k) if this_checksum.finish() == checksum => gs.data.borrow_mut().push(k), + Err(_) | Ok(_) => { + return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()) + } + } + } + EventMeta::Clear => gs.data.borrow_mut().clear(), + EventMeta::Pop => { + let _ = gs.data.borrow_mut().pop().unwrap(); + } + } + Ok(()) + } +} + +/* + basic tests +*/ + +#[test] +fn encode_decode_meta() { + let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0); + let encoded1 = dv1.encode_self(); + let decoded1 = DriverEvent::decode(encoded1).unwrap(); + assert_eq!(dv1, decoded1); +} diff --git a/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs new file mode 100644 index 00000000..17a18bb4 --- /dev/null +++ b/server/src/engine/storage/v2/raw/journal/raw/tests/recovery.rs @@ -0,0 +1,1497 @@ +/* + * Created on Tue Mar 26 2024 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. 
+ * + * Copyright (c) 2024, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + super::{SimpleDB, SimpleDBJournal}, + crate::{ + engine::{ + error::ErrorKind, + storage::{ + common::{ + interface::fs::{File, FileExt, FileSystem, FileWrite, FileWriteExt}, + sdss::sdss_r1::FileSpecV1, + }, + v2::raw::journal::{ + create_journal, open_journal, + raw::{ + debug_get_offsets, debug_get_trace, debug_set_offset_tracking, DriverEvent, + DriverEventKind, JournalReaderTraceEvent, JournalWriterTraceEvent, + RawJournalWriter, + }, + repair_journal, JournalRepairMode, JournalSettings, RawJournalAdapter, + RepairResult, + }, + }, + RuntimeResult, + }, + IoResult, + }, + std::{collections::BTreeMap, io::ErrorKind as IoErrorKind, ops::Range}, +}; + +/// the number of events that we want to usually emulate for +const TRIALS: usize = 100; +/// See impl of [`apply_event_mix`]. We remove every 10th element +const POST_TRIALS_SIZE: usize = TRIALS - (TRIALS / 10); +/// a test key for single events +const KEY: &str = concat!("1234567890-0987654321"); +const SIMPLEDB_JOURNAL_HEADER_SIZE: usize = + <::Spec as FileSpecV1>::SIZE; + +/// The initializer for a corruption test case +struct Initializer { + journal_id: &'static str, + initializer_fn: fn(&str) -> RuntimeResult, + last_event_size: usize, +} + +#[derive(Debug)] +/// Information about the modified journal generated by an [`Initializer`] +struct ModifiedJournalInfo { + init: InitializerInfo, + storage: ModifiedJournalStorageInfo, + initializer_id: usize, +} + +impl ModifiedJournalInfo { + fn new( + init: InitializerInfo, + storage: ModifiedJournalStorageInfo, + initializer_id: usize, + ) -> Self { + Self { + init, + storage, + initializer_id, + } + } +} + +#[derive(Debug, Clone, Copy)] +/// Information about the initial state of a "good journal". 
Generated from [`Initializer`] +struct InitializerInfo { + corrupted_event_id: u64, + last_executed_event_id: u64, +} + +impl InitializerInfo { + /// The initializer only creates one event + fn new_last_event(last_event_id: u64) -> Self { + Self::new(last_event_id, last_event_id) + } + fn new(corrupted_event_id: u64, last_executed_event_id: u64) -> Self { + Self { + corrupted_event_id, + last_executed_event_id, + } + } + /// Returns true if the initializer created multiple events (and not a single event) + fn not_last_event(&self) -> bool { + self.corrupted_event_id != self.last_executed_event_id + } +} + +impl Initializer { + fn new( + name: &'static str, + f: fn(&str) -> RuntimeResult, + last_event_size: usize, + ) -> Self { + Self { + journal_id: name, + initializer_fn: f, + last_event_size, + } + } + fn new_driver_type(name: &'static str, f: fn(&str) -> RuntimeResult) -> Self { + Self::new(name, f, DriverEvent::FULL_EVENT_SIZE) + } +} + +fn make_corrupted_file_name(journal_id: &str, trim_size: usize) -> String { + format!("{journal_id}-trimmed-{trim_size}.db") +} + +#[derive(Debug)] +/// Information about the layout of the modified journal +struct ModifiedJournalStorageInfo { + original_file_size: usize, + modified_file_size: usize, + corruption_range: Range, +} + +impl ModifiedJournalStorageInfo { + fn new( + original_file_size: usize, + modified_file_size: usize, + corruption_range: Range, + ) -> Self { + Self { + original_file_size, + modified_file_size, + corruption_range, + } + } +} + +/** + Emulate a sequentially varying corruption. + - The initializer creates a modified journal and provides information about it + - We go over each initializer and then enumerate a bunch of corruption test cases. + - Generally, we take the size of the event, n (it isn't necessary that it's a static size but + should at least be computable/traced somehow) and then shave off 1 byte, followed by up to n bytes +*/ +fn emulate_sequentially_varying_single_corruption( + initializers: impl IntoIterator, + modified_journal_generator_fn: impl Fn( + &str, + &str, + &InitializerInfo, + usize, + &BTreeMap, + ) -> IoResult, + post_corruption_handler: impl Fn( + &str, + &ModifiedJournalInfo, + usize, + SimpleDB, + RuntimeResult>, + ), + post_repair_handler: impl Fn( + &str, + &ModifiedJournalInfo, + usize, + RuntimeResult, + SimpleDB, + RuntimeResult>, + ), +) { + for ( + initializer_id, + Initializer { + journal_id, + initializer_fn, + last_event_size, + }, + ) in initializers.into_iter().enumerate() + { + // initialize journal, get size and clear traces + let initializer_info = match initializer_fn(journal_id) { + Ok(nid) => nid, + Err(e) => panic!( + "failed to initialize {journal_id} due to {e}. 
trace: {:?}, file_data={:?}", + debug_get_trace(), + FileSystem::read(journal_id), + ), + }; + let _ = debug_get_trace(); + let original_offsets = debug_get_offsets(); + // now trim and repeat + for trim_size in 1..=last_event_size { + // create a copy of the "good" journal and corrupt it + let corrupted_journal_path = make_corrupted_file_name(journal_id, trim_size); + let open_journal_fn = |db: &SimpleDB| { + open_journal::( + &corrupted_journal_path, + db, + JournalSettings::default(), + ) + }; + // modify journal + let storage_info = modified_journal_generator_fn( + journal_id, + &corrupted_journal_path, + &initializer_info, + trim_size, + &original_offsets, + ) + .unwrap(); + assert_ne!(storage_info.corruption_range.len(), 0); + assert_ne!( + storage_info.modified_file_size, + storage_info.original_file_size + ); + let modified_journal_info = + ModifiedJournalInfo::new(initializer_info, storage_info, initializer_id); + // now let the caller handle any post corruption work + { + let sdb = SimpleDB::new(); + let open_journal_result = open_journal_fn(&sdb); + post_corruption_handler( + journal_id, + &modified_journal_info, + trim_size, + sdb, + open_journal_result, + ); + } + // repair and let the caller handle post repair work + let repair_result; + { + let sdb = SimpleDB::new(); + repair_result = repair_journal::( + &corrupted_journal_path, + &sdb, + JournalSettings::default(), + JournalRepairMode::Simple, + ); + } + { + let sdb = SimpleDB::new(); + let repaired_journal_reopen_result = open_journal_fn(&sdb); + // let caller handle any post repair work + post_repair_handler( + journal_id, + &modified_journal_info, + trim_size, + repair_result, + sdb, + repaired_journal_reopen_result, + ); + } + // we're done, delete the corrupted journal + FileSystem::remove_file(&corrupted_journal_path).unwrap(); + } + // delete the good journal, we're done with this one as well + FileSystem::remove_file(journal_id).unwrap(); + } +} + +/// In this emulation, we sequentially corrupt the last event across multiple trials +fn emulate_final_event_corruption( + initializers: impl IntoIterator, + post_corruption_handler: impl Fn( + &str, + &ModifiedJournalInfo, + usize, + SimpleDB, + RuntimeResult>, + ), + post_repair_handler: impl Fn( + &str, + &ModifiedJournalInfo, + usize, + RuntimeResult, + SimpleDB, + RuntimeResult>, + ), +) { + emulate_sequentially_varying_single_corruption( + initializers, + |original_journal, modified_journal, _, trim_amount, _offsets| { + FileSystem::copy(original_journal, modified_journal)?; + let mut f = File::open(modified_journal)?; + let real_flen = f.f_len()? 
as usize; + f.f_truncate((real_flen - trim_amount) as _)?; + Ok(ModifiedJournalStorageInfo::new( + real_flen, + trim_amount, + trim_amount..real_flen, + )) + }, + post_corruption_handler, + post_repair_handler, + ) +} + +/// In this emulation, we sequentially corrupt an intermediary event across multiple trials +fn emulate_midway_corruption( + initializers: impl IntoIterator, + post_corruption_handler: impl Fn( + &str, + &ModifiedJournalInfo, + usize, + SimpleDB, + RuntimeResult>, + ), + post_repair_handler: impl Fn( + &str, + &ModifiedJournalInfo, + usize, + RuntimeResult, + SimpleDB, + RuntimeResult>, + ), +) { + emulate_sequentially_varying_single_corruption( + initializers, + |original_journal_path, + corrupted_journal_path, + initializer_info, + trim_size, + original_offsets| { + let orig_journal_data = FileSystem::read(original_journal_path)?; + let orig_journal_size = orig_journal_data.len(); + let mut f = File::create(corrupted_journal_path)?; + let end_offset = *original_offsets + .get(&initializer_info.corrupted_event_id) + .unwrap() as usize; + // apply + let segment_before_corruption = &orig_journal_data[..end_offset - trim_size]; + let segment_after_corruption = &orig_journal_data[end_offset..]; + let new_size = segment_before_corruption.len() + segment_after_corruption.len(); + assert!( + new_size < orig_journal_size, + "real len is {orig_journal_size} while new len is {new_size}", + ); + assert_eq!( + segment_before_corruption.len() + segment_after_corruption.len() + trim_size, + orig_journal_size + ); + f.fwrite_all(segment_before_corruption)?; + f.fwrite_all(segment_after_corruption)?; + let corruption_range = end_offset - trim_size..end_offset; + assert_eq!(corruption_range.len(), trim_size); + Ok(ModifiedJournalStorageInfo::new( + orig_journal_size, + new_size, + corruption_range, + )) + }, + post_corruption_handler, + post_repair_handler, + ) +} + +/// Format a key as a string (padded to six bytes) +fn keyfmt(num: usize) -> String { + format!("key-{num:06}") +} + +/// Apply an event mix +/// - Add [`TRIALS`] count of elements +/// - Remove every 10th element +fn apply_event_mix(jrnl: &mut RawJournalWriter) -> RuntimeResult { + let mut op_count = 0; + let mut sdb = SimpleDB::new(); + for num in 1..=TRIALS { + op_count += 1; + sdb.push(jrnl, keyfmt(num))?; + if num % 10 == 0 { + op_count += 1; + sdb.pop(jrnl)?; + } + } + assert_eq!(sdb.data().len(), POST_TRIALS_SIZE); + Ok(op_count) +} + +#[test] +fn corruption_before_close() { + let initializers = [ + Initializer::new_driver_type( + /* + in this case we: create, close (0), corrupt close (0) + */ + "close_event_corruption_empty.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new_last_event(0)) + }, + ), + Initializer::new_driver_type( + /* + in this case we: create, apply events ([0,99]), close (100). corrupt close (100). expect no data loss. + */ + "close_event_corruption.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + let operation_count = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new_last_event(operation_count)) + }, + ), + Initializer::new_driver_type( + /* + in this case we: create, close (0), reopen(1), close(2). 
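+                    (mechanically, the final-event emulation defined above boils down to tail
+                    truncation of a copy of the good journal; a minimal sketch, where
+                    `sketch_trim` is a hypothetical name and not part of this patch:
+
+                    fn sketch_trim(journal: &[u8], trim_amount: usize) -> Vec<u8> {
+                        // shave `trim_amount` bytes off the tail; the harness runs this for
+                        // every trim_amount in 1..=last_event_size
+                        journal[..journal.len() - trim_amount].to_vec()
+                    }
+
+                    so each trial hands the reader a journal that is exactly `trim_amount`
+                    bytes short of being well formed)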
corrupt last close (2) + */ + "close_event_corruption_open_close_open_close.db", + |jrnl_id| { + // open and close + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reinit and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + RawJournalWriter::close_driver(&mut jrnl)?; + Ok(InitializerInfo::new_last_event(2)) + }, + ), + ]; + emulate_final_event_corruption( + initializers, + |journal_id, modified_journal_info, trim_size, db, open_result| { + // open the journal and validate failure + let open_err = open_result.unwrap_err(); + let trace = debug_get_trace(); + if trim_size > (DriverEvent::FULL_EVENT_SIZE - (sizeof!(u128) + sizeof!(u64))) { + // the amount of trim from the end of the file causes us to lose valuable metadata + if modified_journal_info.init.last_executed_event_id == 0 { + // empty log + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent + ], + "failed at trim_size {trim_size} for journal {journal_id}" + ) + } else { + if modified_journal_info.initializer_id == 1 { + // in the second case, we apply the event mix so we need to check this + assert_eq!( + db.data().len(), + POST_TRIALS_SIZE, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } else { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + assert_eq!( + *trace.last().unwrap(), + JournalReaderTraceEvent::LookingForEvent.into(), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + } else { + // the amount of trim still allows us to read some metadata + if modified_journal_info.init.last_executed_event_id == 0 { + // empty log + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent( + modified_journal_info.init.corrupted_event_id + ), + JournalReaderTraceEvent::DriverEventExpectingClose, + ], + "failed at trim_size {trim_size} for journal {journal_id}" + ) + } else { + if modified_journal_info.initializer_id == 1 { + // in the second case, we apply the event mix so we need to check this + assert_eq!( + db.data().len(), + POST_TRIALS_SIZE, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } else { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + assert_eq!( + &trace[trace.len() - 3..], + &into_array![ + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent( + modified_journal_info.init.corrupted_event_id + ), + JournalReaderTraceEvent::DriverEventExpectingClose + ], + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + } + assert_eq!( + open_err.kind(), + &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + }, + |journal_id, modified_journal_info, trim_size, repair_result, db, 
reopen_result| { + assert_eq!( + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), + RepairResult::UnspecifiedLoss((DriverEvent::FULL_EVENT_SIZE - trim_size) as _), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + if modified_journal_info.init.last_executed_event_id == 0 + || modified_journal_info.initializer_id == 2 + { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } else { + // in the second case, we apply the event mix so we need to check this + assert_eq!( + db.data().len(), + POST_TRIALS_SIZE, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + let _ = reopen_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); + // clear trace + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + ) +} + +#[test] +fn corruption_after_reopen() { + let initializers = [ + Initializer::new_driver_type( + /* + in this case we: create, close (0), reopen(1). corrupt reopen (1) + */ + "corruption_after_reopen.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen, but don't close + open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + Ok(InitializerInfo::new_last_event(1)) + }, + ), + Initializer::new_driver_type( + /* + in this case we: create, apply events([0,99]), close (100), reopen(101). corrupt reopen (101). expect no data loss. + */ + "corruption_after_ropen_multi_before_close.db", + |jrnl_id| { + let mut jrnl = create_journal::(jrnl_id)?; + let operation_count = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen, but don't close + open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + Ok(InitializerInfo::new_last_event(operation_count + 1)) // + 1 since we have the reopen event which is the next event that'll vanish + }, + ), + ]; + emulate_final_event_corruption( + initializers, + |journal_id, modified_journal_info, trim_size, db, open_result| { + let trace = debug_get_trace(); + if trim_size == DriverEvent::FULL_EVENT_SIZE { + /* + IMPORTANT IFFY SITUATION: undetectable error. if an entire "correct" part of the log vanishes, it's not going to be detected. + while possible in theory, it's going to have to be one heck of a coincidence for it to happen in practice. the only way to work + around this is to use a secondary checksum. I'm not a fan of that approach either (and I don't even consider it to be a good mitigation) + because it can potentially violate consistency by conflicting with the source of truth. for example: if we have a database crash, should we trust + the checksum file or the log? guarding that further requires an enormous amount of effort, will still have holes, and ironically will + potentially introduce more bugs due to increased complexity. Get a good filesystem and disk controller (that attaches checksums to sectors)! 
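+                        to make the undetectable case concrete, the reader's accept condition
+                        boils down to the following (a minimal sketch with hypothetical names,
+                        mirroring the EOF check in handle_close):
+
+                        fn sketch_accept(checksum_valid_close: bool, at_eof: bool) -> bool {
+                            // a checksum-valid close event followed immediately by EOF is
+                            // indistinguishable from a clean shutdown, so a log whose trailing
+                            // reopen vanished in its entirety still passes
+                            checksum_valid_close && at_eof
+                        }
+
+                        which is why the post-repair handler below expects RepairResult::NoErrors
+                        when trim_size == DriverEvent::FULL_EVENT_SIZE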
+ -- @ohsayan + */ + let mut jrnl = open_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); + if modified_journal_info.init.last_executed_event_id == 1 { + // empty log, only the reopen + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DriverEventExpectingClose, + JournalReaderTraceEvent::DriverEventCompletedBlockRead, + JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, + JournalReaderTraceEvent::ClosedAndReachedEof, + JournalReaderTraceEvent::Completed, + JournalWriterTraceEvent::ReinitializeAttempt, + JournalWriterTraceEvent::DriverEventAttemptCommit { + event: DriverEventKind::Reopened, + event_id: modified_journal_info.init.corrupted_event_id, + prev_id: 0 + }, + JournalWriterTraceEvent::DriverEventCompleted, + JournalWriterTraceEvent::ReinitializeComplete, + ], + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } else { + // we will have upto the last event since only the reopen is gone + assert_eq!( + db.data().len(), + POST_TRIALS_SIZE, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + &trace[trace.len() - 12..], + intovec![ + JournalReaderTraceEvent::ServerEventAppliedSuccess, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent( + modified_journal_info.init.corrupted_event_id - 1 + ), // close event + JournalReaderTraceEvent::DriverEventExpectingClose, + JournalReaderTraceEvent::DriverEventCompletedBlockRead, + JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, + JournalReaderTraceEvent::ClosedAndReachedEof, + JournalReaderTraceEvent::Completed, + JournalWriterTraceEvent::ReinitializeAttempt, + JournalWriterTraceEvent::DriverEventAttemptCommit { + event: DriverEventKind::Reopened, + event_id: modified_journal_info.init.corrupted_event_id, + prev_id: modified_journal_info.init.corrupted_event_id - 1 // close event + }, + JournalWriterTraceEvent::DriverEventCompleted, + JournalWriterTraceEvent::ReinitializeComplete + ], + "failed at trim_size {trim_size} for journal {journal_id}" + ) + } + // now close this so that this works with the post repair handler + RawJournalWriter::close_driver(&mut jrnl).unwrap(); + let _ = debug_get_offsets(); + let _ = debug_get_trace(); + } else { + assert_eq!( + open_result.unwrap_err().kind(), + &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + if modified_journal_info.init.last_executed_event_id == 1 { + // empty log, only the reopen + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DriverEventExpectingClose, + JournalReaderTraceEvent::DriverEventCompletedBlockRead, + JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, + JournalReaderTraceEvent::DriverEventExpectingReopenBlock, + JournalReaderTraceEvent::AttemptingEvent( + modified_journal_info.init.corrupted_event_id + ) + ], + "failed at trim_size {trim_size} for 
journal {journal_id}" + ); + } else { + // we will have upto the last event since only the reopen is gone + assert_eq!(db.data().len(), POST_TRIALS_SIZE); + assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1)); + assert_eq!( + &trace[trace.len() - 5..], + intovec![ + JournalReaderTraceEvent::DriverEventExpectingClose, + JournalReaderTraceEvent::DriverEventCompletedBlockRead, + JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, + JournalReaderTraceEvent::DriverEventExpectingReopenBlock, + JournalReaderTraceEvent::AttemptingEvent( + modified_journal_info.init.corrupted_event_id + ) + ], + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + } + }, + |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { + assert!(reopen_result.is_ok()); + if trim_size == DriverEvent::FULL_EVENT_SIZE { + // see earlier comment + assert_eq!( + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), + RepairResult::NoErrors, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } else { + assert_eq!( + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), + RepairResult::UnspecifiedLoss( + (DriverEvent::FULL_EVENT_SIZE - trim_size) as u64 + ), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + if modified_journal_info.init.last_executed_event_id == 1 { + assert_eq!( + db.data().len(), + 0, + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } else { + assert_eq!(db.data().len(), POST_TRIALS_SIZE); + assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1)); + } + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + ) +} + +#[test] +fn corruption_at_runtime() { + // first get the offsets to compute the size of the event + let offset = { + debug_set_offset_tracking(true); + let mut sdb = SimpleDB::new(); + let mut jrnl = create_journal("corruption_at_runtime_test_log.db").unwrap(); + sdb.push(&mut jrnl, KEY).unwrap(); + let (_, offset) = debug_get_offsets().pop_last().unwrap(); + let ret = offset as usize - SIMPLEDB_JOURNAL_HEADER_SIZE; + debug_set_offset_tracking(false); + let _ = debug_get_trace(); + ret + }; + let initializers = [ + Initializer::new( + /* + for this one we: + - PRC1: we create and apply one event (0) + + exepct data loss (0). 
+ */ + "corruption_at_runtime_open_commit_corrupt", + |jrnl_id| { + let mut sdb = SimpleDB::new(); + let mut jrnl = create_journal(jrnl_id)?; + sdb.push(&mut jrnl, KEY)?; + // don't close + Ok(InitializerInfo::new_last_event(0)) + }, + offset, + ), + Initializer::new( + /* + for this one we: + - PRC1: we create and apply events ([0,99]) + expect data loss (99) + */ + "corruption_at_runtime_open_multi_commit_then_corrupt", + |jrnl_id| { + let mut op_count = 0; + let mut sdb = SimpleDB::new(); + let mut jrnl = create_journal(jrnl_id)?; + for _ in 1..=TRIALS { + sdb.push(&mut jrnl, KEY)?; + op_count += 1; + } + // don't close + Ok(InitializerInfo::new_last_event(op_count)) + }, + offset, + ), + ]; + emulate_final_event_corruption( + initializers, + |journal_id, modified_journal_info, trim_size, db, open_result| { + let trace = debug_get_trace(); + let err = open_result.unwrap_err(); + assert_eq!( + err.kind(), + &ErrorKind::IoError(IoErrorKind::UnexpectedEof.into()), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + if trim_size > offset - (sizeof!(u128) + sizeof!(u64)) { + if modified_journal_info.init.last_executed_event_id == 0 { + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ) + } else { + // we lost the last server event, so we'll have one key less + assert_eq!( + db.data().len(), + TRIALS - 1, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + db.data()[TRIALS - 2], + KEY, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + &trace[trace.len() - 4..], + intovec![ + JournalReaderTraceEvent::DetectedServerEvent, + JournalReaderTraceEvent::ServerEventMetadataParsed, + JournalReaderTraceEvent::ServerEventAppliedSuccess, + JournalReaderTraceEvent::LookingForEvent, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ) + } + } else { + if modified_journal_info.init.last_executed_event_id == 0 { + // empty log + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + trace, + intovec![ + JournalReaderTraceEvent::Initialized, + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent(0), + JournalReaderTraceEvent::DetectedServerEvent, + JournalReaderTraceEvent::ServerEventMetadataParsed, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } else { + // we lost the last server event, so we'll have one key less + assert_eq!( + db.data().len(), + TRIALS - 1, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + db.data()[TRIALS - 2], + KEY, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + &trace[trace.len() - 4..], + intovec![ + JournalReaderTraceEvent::LookingForEvent, + JournalReaderTraceEvent::AttemptingEvent( + modified_journal_info.init.corrupted_event_id - 1 + ), + JournalReaderTraceEvent::DetectedServerEvent, + JournalReaderTraceEvent::ServerEventMetadataParsed, + ], + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } + } + }, + |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { + assert!(reopen_result.is_ok()); + assert_eq!( + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + 
)), + RepairResult::UnspecifiedLoss((offset - trim_size) as u64), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + if modified_journal_info.init.last_executed_event_id == 0 { + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } else { + assert_eq!( + db.data().len(), + TRIALS - 1, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + assert_eq!( + db.data()[TRIALS - 2], + KEY, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } + let _ = debug_get_trace(); + }, + ) +} + +/* + midway corruption tests + --- + while in the prior tests we tested cases where the last event was corrupted, we now test cases where some middle + portion of the journal gets corrupted. the trouble is that we'll have to enumerate all cases for generated traces... + which is absolutely not feasible. Instead, we just ensure that the pre and post states are valid. +*/ + +#[test] +fn midway_corruption_close() { + let initializers = [ + Initializer::new_driver_type("midway_corruption_close_direct", |jrnl_id| { + /* + in this test corruption case we: + - PR cycle 1: create and close (0) + - PR cycle 2: open (1) and close (2) + we emulate a sequential corruption case for (0) + */ + // create and close + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + // reopen and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; + RawJournalWriter::close_driver(&mut jrnl)?; + drop(jrnl); + Ok(InitializerInfo::new(0, 2)) // close (to corrupt), reopen, close + }), + Initializer::new_driver_type( + /* + in this test case we: + - PR cycle 1: create and close (0) + - PR cycle 2: reopen (1), apply events([2,101]), close (102) + - PR cycle 3: reopen (103), close (104) + we emulate a sequential corruption case for (102). expect all events to persist (<= 101) + */ + "midway_corruption_close_events_before_second_close", + |jrnl_id| { + { + // create and close + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; // (0) + } + let op_cnt; + { + // reopen, apply mix and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; // (1) + op_cnt = apply_event_mix(&mut jrnl)?; + RawJournalWriter::close_driver(&mut jrnl)?; // <-- (op_cnt + 2) corrupt this one + } + { + // reopen and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; // (op_cnt + 3) + RawJournalWriter::close_driver(&mut jrnl)?; // (op_cnt + 4) + } + Ok(InitializerInfo::new(op_cnt + 2, op_cnt + 4)) + }, + ), + Initializer::new_driver_type( + /* + in this test case: + - PR cycle 1: create and close (0) + - PR cycle 2: reopen (1) and close (2) + - PR cycle 3: reopen(3), apply events([4,103]), close(104) + we emulate a sequential corruption of (2) which results in a catastrophic corruption. 
expect major + data loss (==TRIALS) + */ + "midway_corruption_close_events_before_third_close", + |jrnl_id| { + { + // create and close + let mut jrnl = create_journal::(jrnl_id)?; + RawJournalWriter::close_driver(&mut jrnl)?; // (0) + } + { + // reopen and close + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; // (1) + RawJournalWriter::close_driver(&mut jrnl)?; // <-- (2) corrupt this one + } + let op_cnt; + { + let mut jrnl = open_journal::( + jrnl_id, + &SimpleDB::new(), + JournalSettings::default(), + )?; // (3) + op_cnt = apply_event_mix(&mut jrnl)?; // (3 + op_count) + RawJournalWriter::close_driver(&mut jrnl)?; // (4 + op_count) + } + Ok(InitializerInfo::new(2, op_cnt + 4)) // corrupt the second close event + }, + ), + ]; + debug_set_offset_tracking(true); + emulate_midway_corruption( + initializers, + |journal_id, modified_journal_info, trim_size, db, open_result| { + assert!( + open_result.is_err(), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + match modified_journal_info.initializer_id { + 0 | 2 => { + // in the first and third case, (0) no data is present (2) all data is lost + // all data will be lost, so the DB will be empty + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } + 1 => { + // in this case, all elements will be preserved + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + _ => panic!(), + } + let _ = debug_get_offsets(); + let _ = debug_get_trace(); + }, + |journal_id, modified_journal_info, trim_size, repair_result, db, reopen_result| { + let _ = reopen_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )); + match modified_journal_info.initializer_id { + 0 | 2 => { + // all data will be lost, so the DB will be empty + assert_eq!( + db.data().len(), + 0, + "failed for journal {journal_id} with trim_size {trim_size}" + ); + if modified_journal_info.init.corrupted_event_id == 0 + && modified_journal_info.init.not_last_event() + { + // the first event was corrupted + assert_eq!( + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), + RepairResult::UnspecifiedLoss( + ((DriverEvent::FULL_EVENT_SIZE * 3) - trim_size) as u64 + ), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } else { + // this is a serious midway corruption with major data loss + let full_log_size = File::open(journal_id).unwrap().f_len().unwrap(); + assert_eq!( + repair_result.expect(&format!( + "failed at trim_size {trim_size} for journal {journal_id}" + )), + RepairResult::UnspecifiedLoss( + full_log_size + - SIMPLEDB_JOURNAL_HEADER_SIZE // account for header + as u64 + - (DriverEvent::FULL_EVENT_SIZE * 2) as u64 // account for close (0), reopen(1) + - trim_size as u64 // account for trim + ), + "failed for journal {journal_id} with trim_size {trim_size}" + ); + } + } + 1 => { + // in this case, all elements will be preserved + assert_eq!( + *db.data().last().unwrap(), + keyfmt(TRIALS - 1), + "failed at trim_size {trim_size} for journal {journal_id}" + ); + } + _ => panic!(), + } + let _ = debug_get_trace(); + let _ = debug_get_offsets(); + }, + ); + debug_set_offset_tracking(false); +} + +#[test] +fn midway_corruption_reopen() { + let initializers = [ + Initializer::new( + "midway_corruption_reopen_close_reopen_close", + |jrnl_id| { + /* + for this test case we create and close 
+
+#[test]
+fn midway_corruption_reopen() {
+    let initializers = [
+        Initializer::new(
+            "midway_corruption_reopen_close_reopen_close",
+            |jrnl_id| {
+                /*
+                    for this test case we create and close (0) the journal, and in the next power cycle we
+                    reopen (1) and close (2) the journal. we emulate a midway corruption where the
+                    reopen (1) gets corrupted.
+                */
+                {
+                    let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
+                    RawJournalWriter::close_driver(&mut jrnl)?; // (0)
+                }
+                {
+                    let mut jrnl = open_journal::<SimpleDBJournal>(
+                        jrnl_id,
+                        &SimpleDB::new(),
+                        JournalSettings::default(),
+                    )?; // (1) <-- corrupt
+                    RawJournalWriter::close_driver(&mut jrnl)?; // (2)
+                }
+                Ok(InitializerInfo::new(1, 2))
+            },
+            DriverEvent::FULL_EVENT_SIZE,
+        ),
+        Initializer::new(
+            /*
+                create, apply ([0,99]), close (100). reopen (101), close (102). corrupt (101). expect no
+                data loss.
+            */
+            "midway_corruption_reopen_apply_close_reopen_close",
+            |jrnl_id| {
+                let op_count;
+                {
+                    let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
+                    op_count = apply_event_mix(&mut jrnl)?;
+                    RawJournalWriter::close_driver(&mut jrnl)?;
+                }
+                {
+                    let mut jrnl = open_journal::<SimpleDBJournal>(
+                        jrnl_id,
+                        &SimpleDB::new(),
+                        JournalSettings::default(),
+                    )?;
+                    RawJournalWriter::close_driver(&mut jrnl)?;
+                }
+                Ok(InitializerInfo::new((op_count + 1) as u64, 102))
+            },
+            DriverEvent::FULL_EVENT_SIZE,
+        ),
+        Initializer::new(
+            /*
+                create, close (0). reopen (1), apply ([2,101]), close (102). corrupt (1). expect full data
+                loss.
+            */
+            "midway_corruption_reopen_apply_post_corrupted_reopen",
+            |jrnl_id| {
+                {
+                    let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
+                    RawJournalWriter::close_driver(&mut jrnl)?;
+                }
+                {
+                    let mut jrnl = open_journal::<SimpleDBJournal>(
+                        jrnl_id,
+                        &SimpleDB::new(),
+                        JournalSettings::default(),
+                    )?; // <-- corrupt this one
+                    let _ = apply_event_mix(&mut jrnl)?; // apply mix
+                    RawJournalWriter::close_driver(&mut jrnl)?;
+                }
+                Ok(InitializerInfo::new(1, 102))
+            },
+            DriverEvent::FULL_EVENT_SIZE,
+        ),
+    ];
+    debug_set_offset_tracking(true); // we need to track offsets
+    emulate_midway_corruption(
+        initializers,
+        |journal_id, modified_journal_info, trim_size, db, open_result| {
+            let _ = open_result.unwrap_err();
+            match modified_journal_info.initializer_id {
+                0 | 2 => {
+                    assert_eq!(
+                        db.data().len(),
+                        0,
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    );
+                }
+                1 => {
+                    assert_eq!(
+                        db.data().len(),
+                        POST_TRIALS_SIZE,
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    );
+                    assert_eq!(
+                        *db.data().last().unwrap(),
+                        keyfmt(TRIALS - 1),
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    )
+                }
+                _ => panic!(),
+            }
+            let _ = debug_get_trace();
+            let _ = debug_get_offsets();
+        },
+        |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| {
+            let _ = open_result.expect(&format!(
+                "failed at trim_size {trim_size} for journal {journal_id}"
+            ));
+            let repair_result = repair_result.expect(&format!(
+                "failed at trim_size {trim_size} for journal {journal_id}"
+            ));
+            assert_eq!(
+                repair_result,
+                RepairResult::UnspecifiedLoss(
+                    ((modified_journal_info.storage.modified_file_size
+                        - modified_journal_info.storage.corruption_range.start)
+                        + (DriverEvent::FULL_EVENT_SIZE - trim_size)) as u64
+                ),
+                "failed at trim_size {trim_size} for journal {journal_id}"
+            );
+            match modified_journal_info.initializer_id {
+                0 | 2 => {
+                    assert_eq!(
+                        db.data().len(),
+                        0,
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    );
+                }
+                1 => {
+                    assert_eq!(
+                        db.data().len(),
+                        POST_TRIALS_SIZE,
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    );
+                    assert_eq!(
+                        *db.data().last().unwrap(),
+                        keyfmt(TRIALS - 1),
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    )
+                }
+                _ => panic!(),
+            }
+            let _ = debug_get_trace();
+            let _ = debug_get_offsets();
+        },
+    );
+    debug_set_offset_tracking(false);
+}
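+
+/*
+    the runtime tests below corrupt a server (data) event rather than a driver event. since server events do
+    not have a fixed size, each variant first pushes one representative event into a throwaway journal and
+    derives the event's on-disk size from the tracked offset of event 0 minus the journal header size.
+*/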
+
+#[test]
+fn midway_corruption_at_runtime() {
+    debug_set_offset_tracking(true);
+    // compute offset size
+    let event_size_fixed_size_key = {
+        let mut jrnl =
+            create_journal::<SimpleDBJournal>("midway_corruption_at_runtime_fixed_key").unwrap();
+        SimpleDB::new().push(&mut jrnl, KEY).unwrap();
+        let (_, offsets) = (debug_get_trace(), debug_get_offsets());
+        *offsets.get(&0).unwrap() as usize - SIMPLEDB_JOURNAL_HEADER_SIZE
+    };
+    // compute offset size
+    let event_size_dynamic_key = {
+        let mut jrnl =
+            create_journal::<SimpleDBJournal>("midway_corruption_at_runtime_dynamic_key").unwrap();
+        SimpleDB::new().push(&mut jrnl, keyfmt(0)).unwrap();
+        let (_, offsets) = (debug_get_trace(), debug_get_offsets());
+        *offsets.get(&0).unwrap() as usize - SIMPLEDB_JOURNAL_HEADER_SIZE
+    };
+    let initializers = [
+        Initializer::new(
+            /*
+                open, apply (0), close (1). corrupt (0). expect complete loss of the push event (0).
+            */
+            "midway_corruption_at_runtime_open_server_event_close",
+            |jrnl_id| {
+                let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
+                SimpleDB::new().push(&mut jrnl, KEY)?;
+                RawJournalWriter::close_driver(&mut jrnl)?;
+                Ok(InitializerInfo::new(0, 1))
+            },
+            event_size_fixed_size_key,
+        ),
+        Initializer::new(
+            /*
+                open, apply ([0,99]), close (100). corrupt (99). expect complete loss of the last event (99).
+            */
+            "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_last",
+            |jrnl_id| {
+                let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
+                let mut sdb = SimpleDB::new();
+                for num in 1..=TRIALS {
+                    sdb.push(&mut jrnl, keyfmt(num))?;
+                }
+                RawJournalWriter::close_driver(&mut jrnl)?;
+                Ok(InitializerInfo::new(TRIALS as u64 - 1, TRIALS as u64))
+            },
+            event_size_dynamic_key,
+        ),
+        Initializer::new(
+            /*
+                open, apply ([0,99]), close (100). corrupt (0). expect complete loss of all events.
+            */
+            "midway_corruption_at_runtime_open_multiserver_event_then_close_corrupt_first",
+            |jrnl_id| {
+                let mut jrnl = create_journal::<SimpleDBJournal>(jrnl_id)?;
+                let mut sdb = SimpleDB::new();
+                for num in 1..=TRIALS {
+                    sdb.push(&mut jrnl, keyfmt(num))?;
+                }
+                RawJournalWriter::close_driver(&mut jrnl)?;
+                Ok(InitializerInfo::new(0, TRIALS as u64))
+            },
+            event_size_dynamic_key,
+        ),
+    ];
+    emulate_midway_corruption(
+        initializers,
+        |journal_id, modified_journal_info, trim_size, db, open_result| {
+            let _ = open_result.unwrap_err();
+            let (_, _) = (debug_get_trace(), debug_get_offsets());
+            match modified_journal_info.initializer_id {
+                0 | 2 => {
+                    assert_eq!(
+                        db.data().len(),
+                        0,
+                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
+                        db.data()
+                    );
+                }
+                1 => {
+                    // expect to have all keys up to TRIALS - 1
+                    assert_eq!(
+                        db.data().len(),
+                        TRIALS - 1,
+                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
+                        db.data()
+                    );
+                    // last key is TRIALS - 1
+                    assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
+                }
+                _ => panic!(),
+            }
+        },
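+        // post-repair: the expected loss spans from the start of the corruption range to the end of the
+        // trimmed file, plus the untrimmed remainder of the corrupted event (event_size - trim_size)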
+        |journal_id, modified_journal_info, trim_size, repair_result, db, open_result| {
+            let repair_result = repair_result.expect(&format!(
+                "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}",
+                FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)),
+                FileSystem::read(journal_id),
+            ));
+            let _ = open_result.expect(&format!(
+                "failed at trim_size {trim_size} for journal {journal_id}. file data={:?}. original_data={:?}",
+                FileSystem::read(&make_corrupted_file_name(journal_id, trim_size)),
+                FileSystem::read(journal_id),
+            ));
+            match modified_journal_info.initializer_id {
+                0 | 2 => {
+                    assert_eq!(
+                        db.data().len(),
+                        0,
+                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
+                        db.data()
+                    );
+                }
+                1 => {
+                    // expect to have all keys up to TRIALS - 1
+                    assert_eq!(
+                        db.data().len(),
+                        TRIALS - 1,
+                        "failed at trim_size {trim_size} for journal {journal_id}. data={:?}",
+                        db.data()
+                    );
+                    // last key is TRIALS - 1
+                    assert_eq!(*db.data().last().unwrap(), keyfmt(TRIALS - 1));
+                }
+                _ => panic!(),
+            }
+            match modified_journal_info.initializer_id {
+                0 => {
+                    assert_eq!(
+                        repair_result,
+                        RepairResult::UnspecifiedLoss(
+                            ((modified_journal_info.storage.modified_file_size
+                                - modified_journal_info.storage.corruption_range.start)
+                                + (event_size_fixed_size_key - trim_size))
+                                as u64
+                        ),
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    )
+                }
+                1 | 2 => {
+                    assert_eq!(
+                        repair_result,
+                        RepairResult::UnspecifiedLoss(
+                            ((modified_journal_info.storage.modified_file_size
+                                - modified_journal_info.storage.corruption_range.start)
+                                + (event_size_dynamic_key - trim_size))
+                                as u64
+                        ),
+                        "failed at trim_size {trim_size} for journal {journal_id}"
+                    );
+                }
+                _ => panic!(),
+            }
+            let (_, _) = (debug_get_trace(), debug_get_offsets());
+        },
+    )
+}
diff --git a/server/src/engine/storage/v2/raw/journal/tests.rs b/server/src/engine/storage/v2/raw/journal/tests.rs
index bc2e8e78..52c58859 100644
--- a/server/src/engine/storage/v2/raw/journal/tests.rs
+++ b/server/src/engine/storage/v2/raw/journal/tests.rs
@@ -30,8 +30,9 @@
 use {
     super::{
-        raw::RawJournalAdapterEvent, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn,
-        EventLogAdapter, EventLogDriver, EventLogSpec,
+        raw::{JournalSettings, RawJournalAdapterEvent},
+        BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, EventLogAdapter, EventLogDriver,
+        EventLogSpec,
     },
     crate::{
         engine::{
@@ -112,13 +113,13 @@ impl EventLogSpec for TestDBAdapter {
     const DECODE_DISPATCH: Self::DecodeDispatch = [
         |db, payload| {
             if payload.len() < sizeof!(u64) {
-                Err(StorageError::RawJournalCorrupted.into())
+                Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into())
             } else {
                 let length =
                     u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) });
                 let payload = &payload[sizeof!(u64)..];
                 if payload.len() as u64 != length {
-                    Err(StorageError::RawJournalCorrupted.into())
+                    Err(StorageError::RawJournalDecodeEventCorruptedPayload.into())
                 } else {
                     let string = String::from_utf8(payload.to_owned()).unwrap();
                     db._mut().push(string);
@@ -172,7 +173,7 @@ fn open_log() -> (
     super::raw::RawJournalWriter<EventLogAdapter<TestDBAdapter>>,
 ) {
     let db = TestDB::default();
-    let log = open_journal("jrnl", &db).unwrap();
+    let log = open_journal("jrnl", &db, JournalSettings::default()).unwrap();
     (db, log)
 }
@@ -381,7 +382,7 @@ fn batch_simple() {
     }
     {
         let db = BatchDB::new();
-        let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap();
+        let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap();
         db.push(&mut batch_drv, "key3").unwrap();
         db.push(&mut batch_drv, "key4").unwrap();
         assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]);
     }
@@ -389,7 +390,7 @@ fn batch_simple() {
     {
         let db = BatchDB::new();
-        let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap();
+        let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap();
         db.push(&mut batch_drv, "key5").unwrap();
         db.push(&mut batch_drv, "key6").unwrap();
         assert_eq!(
@@ -400,7 +401,9 @@ fn batch_simple() {
     }
     {
         let db = BatchDB::new();
-        let mut batch_drv = BatchAdapter::<BatchDBAdapter>::open("mybatch", &db).unwrap();
+        let mut batch_drv =
+            BatchAdapter::<BatchDBAdapter>::open("mybatch", &db, JournalSettings::default())
+                .unwrap();
         assert_eq!(
             db._ref().data,
             ["key1", "key2", "key3", "key4", "key5", "key6"]
diff --git a/server/src/main.rs b/server/src/main.rs
index 252f5f6f..808e5a72 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -72,27 +72,52 @@ fn main() {
             ConfigReturn::HelpMessage(msg) => {
                 exit!(eprintln!("{msg}"), 0x00)
             }
+            ConfigReturn::Repair => return self::repair(),
         },
         Err(e) => exit_fatal!(error!("{e}")),
     };
     self::entrypoint(config)
 }
 
+fn init() -> engine::RuntimeResult<(util::os::FileLock, tokio::runtime::Runtime)> {
+    let f_rt_start = || {
+        engine::set_context_init("locking PID file");
+        let pid_file = util::os::FileLock::new(SKY_PID_FILE)?;
+        engine::set_context_init("initializing runtime");
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .thread_name("server")
+            .enable_all()
+            .build()?;
+        Ok((pid_file, runtime))
+    };
+    f_rt_start()
+}
+
+fn exit(
+    global: Option<engine::Global>,
+    pid_file: Option<util::os::FileLock>,
+    result: engine::RuntimeResult<()>,
+) {
+    if let Some(g) = global {
+        info!("cleaning up data");
+        engine::finish(g);
+    }
+    if pid_file.is_some() {
+        if let Err(e) = std::fs::remove_file(SKY_PID_FILE) {
+            error!("failed to remove PID file: {e}");
+        }
+    }
+    match result {
+        Ok(()) => println!("goodbye"),
+        Err(e) => exit_fatal!(error!("{e}")),
+    }
+}
+
 fn entrypoint(config: engine::config::Configuration) {
     println!("{TEXT}\nSkytable v{VERSION} | {URL}\n");
     let run = || {
-        let f_rt_start = || {
-            engine::set_context_init("locking PID file");
-            let pid_file = util::os::FileLock::new(SKY_PID_FILE)?;
-            engine::set_context_init("initializing runtime");
-            let runtime = tokio::runtime::Builder::new_multi_thread()
-                .thread_name("server")
-                .enable_all()
-                .build()?;
-            Ok((pid_file, runtime))
-        };
-        let (pid_file, runtime) = match f_rt_start() {
-            Ok((pf, rt)) => (pf, rt),
+        let (pid_file, runtime) = match init() {
+            Ok(pr) => pr,
             Err(e) => return (None, None, Err(e)),
         };
         let f_glob_init = runtime.block_on(async move {
@@ -113,17 +138,22 @@ fn entrypoint(config: engine::config::Configuration) {
         (Some(pid_file), Some(g), result_start)
     };
     let (pid_file, global, result) = run();
-    if let Some(g) = global {
-        info!("cleaning up data");
-        engine::finish(g);
-    }
-    if let Some(_) = pid_file {
-        if let Err(e) = std::fs::remove_file(SKY_PID_FILE) {
-            error!("failed to remove PID file: {e}");
-        }
-    }
-    match result {
-        Ok(()) => println!("goodbye"),
-        Err(e) => exit_fatal!(error!("{e}")),
-    }
+    self::exit(global, pid_file, result);
 }
+
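+// manual repair entry point (reached via ConfigReturn::Repair): lock the PID file and build the runtime
+// through init(), run the blocking engine::repair() task while the termination signal guard is held, and
+// then clean up through self::exit(). no Global is ever initialized on this path, hence the `None` below.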
+fn repair() {
+    let (pid_file, rt) = match init() {
+        Ok(init) => init,
+        Err(e) => exit_fatal!(error!("failed to start repair task: {e}")),
+    };
+    let result = rt.block_on(async move {
+        engine::set_context_init("binding system signals");
+        let signal = util::os::TerminationSignal::init()?;
+        let result = tokio::task::spawn_blocking(|| engine::repair())
+            .await
+            .unwrap();
+        drop(signal);
+        result
+    });
+    self::exit(None, Some(pid_file), result)
+}
diff --git a/server/src/util/mod.rs b/server/src/util/mod.rs
index e15ea2b9..5c9ef9fa 100644
--- a/server/src/util/mod.rs
+++ b/server/src/util/mod.rs
@@ -425,10 +425,11 @@ macro_rules! impl_endian {
 impl_endian!(u8, i8, u16, i16, u32, i32, u64, i64, usize, isize);
 
+pub fn time_now_string() -> String {
+    chrono::Local::now().format("%Y%m%d_%H%M%S").to_string()
+}
+
 pub fn time_now_with_postfix(post_fix: &str) -> String {
-    let now = chrono::Local::now();
-    // Format the current date and time as YYYYMMDD_HHMMSS
-    let formatted_date_time = now.format("%Y%m%d_%H%M%S").to_string();
     // Concatenate the formatted date and time with the postfix
-    format!("{}-{}", formatted_date_time, post_fix)
+    format!("{}-{}", time_now_string(), post_fix)
 }
diff --git a/sky-macros/src/dbtest.rs b/sky-macros/src/dbtest.rs
index 52c882a7..68cfef90 100644
--- a/sky-macros/src/dbtest.rs
+++ b/sky-macros/src/dbtest.rs
@@ -24,11 +24,10 @@
  *
 */
 
-use quote::quote;
-
 use {
     crate::util::{self, AttributeKind},
     proc_macro::TokenStream,
+    quote::quote,
     std::collections::HashMap,
     syn::{parse_macro_input, AttributeArgs, ItemFn},
 };
diff --git a/sky-macros/src/util.rs b/sky-macros/src/util.rs
index 540dcd7a..46b49118 100644
--- a/sky-macros/src/util.rs
+++ b/sky-macros/src/util.rs
@@ -24,8 +24,10 @@
  *
 */
 
-use proc_macro2::Ident;
-use syn::{Lit, Meta, MetaNameValue, NestedMeta, Path};
+use {
+    proc_macro2::Ident,
+    syn::{Lit, Meta, MetaNameValue, NestedMeta, Path},
+};
 
 pub enum AttributeKind {
     Lit(Lit),