Merge pull request #336 from skytable/storage/repair-mode

storage: Add data recovery system
next
Sayan 6 months ago committed by GitHub
commit 6c3fffea79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -4,6 +4,10 @@ All changes in this project will be noted in this file.
## Version 0.8.1 ## Version 0.8.1
### Additions
- Added support for manual repair with the `skyd repair` command
### Fixes ### Fixes
- Fixed migration from v1 SE (released with v0.8.0-beta) to v2 SE (released in v0.8.0) - Fixed migration from v1 SE (released with v0.8.0-beta) to v2 SE (released in v0.8.0)

84
Cargo.lock generated

@ -30,9 +30,9 @@ dependencies = [
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.1.2" version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [ dependencies = [
"memchr", "memchr",
] ]
@ -102,13 +102,13 @@ dependencies = [
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.77" version = "0.1.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -119,9 +119,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.69" version = "0.3.71"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d"
dependencies = [ dependencies = [
"addr2line", "addr2line",
"cc", "cc",
@ -165,9 +165,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.4.2" version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
@ -202,9 +202,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.5.0" version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]] [[package]]
name = "bzip2" name = "bzip2"
@ -275,9 +275,9 @@ dependencies = [
[[package]] [[package]]
name = "clipboard-win" name = "clipboard-win"
version = "5.2.0" version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12f9a0700e0127ba15d1d52dd742097f821cd9c65939303a44d970465040a297" checksum = "d517d4b86184dbb111d3556a10f1c8a04da7428d2987bf1081602bf11c3aa9ee"
dependencies = [ dependencies = [
"error-code", "error-code",
] ]
@ -373,7 +373,7 @@ version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"crossterm_winapi", "crossterm_winapi",
"libc", "libc",
"mio", "mio",
@ -650,9 +650,9 @@ dependencies = [
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.2.5" version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown", "hashbrown",
@ -812,7 +812,7 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"cfg-if", "cfg-if",
"cfg_aliases", "cfg_aliases",
"libc", "libc",
@ -864,7 +864,7 @@ version = "0.10.64"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"cfg-if", "cfg-if",
"foreign-types", "foreign-types",
"libc", "libc",
@ -881,7 +881,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -1085,9 +1085,9 @@ dependencies = [
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.10.3" version = "1.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -1120,11 +1120,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.38.31" version = "0.38.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
@ -1137,7 +1137,7 @@ version = "14.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"cfg-if", "cfg-if",
"clipboard-win", "clipboard-win",
"fd-lock", "fd-lock",
@ -1223,14 +1223,14 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
name = "serde_yaml" name = "serde_yaml"
version = "0.9.32" version = "0.9.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"itoa", "itoa",
@ -1389,9 +1389,9 @@ dependencies = [
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.13.1" version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]] [[package]]
name = "socket2" name = "socket2"
@ -1422,9 +1422,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.52" version = "2.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1489,7 +1489,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -1540,9 +1540,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]] [[package]]
name = "unsafe-libyaml" name = "unsafe-libyaml"
version = "0.2.10" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]] [[package]]
name = "utf8parse" name = "utf8parse"
@ -1552,9 +1552,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"rand", "rand",
@ -1563,13 +1563,13 @@ dependencies = [
[[package]] [[package]]
name = "uuid-macro-internal" name = "uuid-macro-internal"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -1611,7 +1611,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1633,7 +1633,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]

@ -16,7 +16,7 @@ libsky = { path = "../libsky" }
sky_macros = { path = "../sky-macros" } sky_macros = { path = "../sky-macros" }
rcrypt = "0.4.0" rcrypt = "0.4.0"
# external deps # external deps
bytes = "1.5.0" bytes = "1.6.0"
env_logger = "0.11.3" env_logger = "0.11.3"
log = "0.4.21" log = "0.4.21"
openssl = { version = "0.10.64", features = ["vendored"] } openssl = { version = "0.10.64", features = ["vendored"] }
@ -25,9 +25,9 @@ parking_lot = "0.12.1"
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.36.0", features = ["full"] } tokio = { version = "1.36.0", features = ["full"] }
tokio-openssl = "0.6.4" tokio-openssl = "0.6.4"
uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
crc = "3.0.1" crc = "3.0.1"
serde_yaml = "0.9.32" serde_yaml = "0.9.33"
chrono = "0.4.35" chrono = "0.4.35"
[target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies] [target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies]

@ -13,6 +13,9 @@ Usage: skyd [OPTION]...
skyd is the Skytable database server daemon and can be used to serve database requests. skyd is the Skytable database server daemon and can be used to serve database requests.
Commands:
repair Check and repair any detected database storage errors
Flags: Flags:
-h, --help Display this help menu and exit. -h, --help Display this help menu and exit.
-v, --version Display the version number and exit. -v, --version Display the version number and exit.
@ -36,5 +39,7 @@ Notes:
- If no `--mode` is provided, we default to `dev` - If no `--mode` is provided, we default to `dev`
- You must provide `--auth-root-password` to set the default root password - You must provide `--auth-root-password` to set the default root password
- To use TLS, you must provide both `--tlscert` and `--tlskey` - To use TLS, you must provide both `--tlscert` and `--tlskey`
- When you run `repair`, your previous data is backed up in the `backups/` folder.
Restore if needed.
For further assistance, refer to the official documentation here: https://docs.skytable.org For further assistance, refer to the official documentation here: https://docs.skytable.org

@ -652,6 +652,7 @@ pub enum CLIConfigParseReturn<T> {
Version, Version,
/// We yielded a config /// We yielded a config
YieldedConfig(T), YieldedConfig(T),
Repair,
} }
impl<T> CLIConfigParseReturn<T> { impl<T> CLIConfigParseReturn<T> {
@ -670,10 +671,21 @@ impl<T> CLIConfigParseReturn<T> {
pub fn parse_cli_args<'a, T: 'a + AsRef<str>>( pub fn parse_cli_args<'a, T: 'a + AsRef<str>>(
src: impl Iterator<Item = T>, src: impl Iterator<Item = T>,
) -> RuntimeResult<CLIConfigParseReturn<ParsedRawArgs>> { ) -> RuntimeResult<CLIConfigParseReturn<ParsedRawArgs>> {
let mut args_iter = src.into_iter().skip(1); let mut args_iter = src.into_iter().skip(1).peekable();
let mut cli_args: ParsedRawArgs = HashMap::new(); let mut cli_args: ParsedRawArgs = HashMap::new();
while let Some(arg) = args_iter.next() { while let Some(arg) = args_iter.next() {
let arg = arg.as_ref(); let arg = arg.as_ref();
if arg == "repair" {
if args_iter.peek().is_none() {
return Ok(CLIConfigParseReturn::Repair);
} else {
return Err(ConfigError::with_src(
ConfigSource::Cli,
ConfigErrorKind::ErrorString("to use `repair`, just run `skyd repair`".into()),
)
.into());
}
}
if arg == "--help" || arg == "-h" { if arg == "--help" || arg == "-h" {
return Ok(CLIConfigParseReturn::Help); return Ok(CLIConfigParseReturn::Help);
} }
@ -978,6 +990,7 @@ pub enum ConfigReturn {
HelpMessage(String), HelpMessage(String),
/// A configuration that we have fully validated was provided /// A configuration that we have fully validated was provided
Config(Configuration), Config(Configuration),
Repair,
} }
impl ConfigReturn { impl ConfigReturn {
@ -1105,6 +1118,7 @@ pub fn check_configuration() -> RuntimeResult<ConfigReturn> {
libsky::VERSION libsky::VERSION
))); )));
} }
CLIConfigParseReturn::Repair => return Ok(ConfigReturn::Repair),
CLIConfigParseReturn::YieldedConfig(cfg) => Some(cfg), CLIConfigParseReturn::YieldedConfig(cfg) => Some(cfg),
}; };
match cli_args { match cli_args {

@ -71,9 +71,13 @@ impl PrimaryIndex {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct IndexLatchHandleShared<'t>(parking_lot::RwLockReadGuard<'t, ()>); pub struct IndexLatchHandleShared<'t> {
_lck: parking_lot::RwLockReadGuard<'t, ()>,
}
#[derive(Debug)] #[derive(Debug)]
pub struct IndexLatchHandleExclusive<'t>(parking_lot::RwLockWriteGuard<'t, ()>); pub struct IndexLatchHandleExclusive<'t> {
_lck: parking_lot::RwLockWriteGuard<'t, ()>,
}
#[derive(Debug)] #[derive(Debug)]
struct IndexLatch { struct IndexLatch {
@ -87,9 +91,13 @@ impl IndexLatch {
} }
} }
fn gl_handle_shared(&self) -> IndexLatchHandleShared { fn gl_handle_shared(&self) -> IndexLatchHandleShared {
IndexLatchHandleShared(self.glck.read()) IndexLatchHandleShared {
_lck: self.glck.read(),
}
} }
fn gl_handle_exclusive(&self) -> IndexLatchHandleExclusive { fn gl_handle_exclusive(&self) -> IndexLatchHandleExclusive {
IndexLatchHandleExclusive(self.glck.write()) IndexLatchHandleExclusive {
_lck: self.glck.write(),
}
} }
} }

@ -169,11 +169,11 @@ enumerate_err! {
/// Errors that occur when restoring transactional data /// Errors that occur when restoring transactional data
pub enum TransactionError { pub enum TransactionError {
/// corrupted txn payload. has more bytes than expected /// corrupted txn payload. has more bytes than expected
DecodeCorruptedPayloadMoreBytes = "txn-payload-unexpected-content", V1DecodeCorruptedPayloadMoreBytes = "txn-payload-unexpected-content",
/// transaction payload is corrupted. has lesser bytes than expected /// transaction payload is corrupted. has lesser bytes than expected
DecodedUnexpectedEof = "txn-payload-unexpected-eof", V1DecodedUnexpectedEof = "txn-payload-unexpected-eof",
/// unknown transaction operation. usually indicates a corrupted payload /// unknown transaction operation. usually indicates a corrupted payload
DecodeUnknownTxnOp = "txn-payload-unknown-payload", V1DecodeUnknownTxnOp = "txn-payload-unknown-payload",
/// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was /// While restoring a certain item, a non-resolvable conflict was encountered in the global state, because the item was
/// already present (when it was expected to not be present) /// already present (when it was expected to not be present)
OnRestoreDataConflictAlreadyExists = "txn-payload-conflict-already-exists", OnRestoreDataConflictAlreadyExists = "txn-payload-conflict-already-exists",
@ -185,19 +185,24 @@ enumerate_err! {
} }
enumerate_err! { enumerate_err! {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq, Clone, Copy)]
/// SDSS based storage engine errors /// SDSS based storage engine errors
pub enum StorageError { pub enum StorageError {
// header /*
----
SDSS Errors
----
These errors are common across all versions
*/
/// version mismatch /// version mismatch
HeaderDecodeVersionMismatch = "header-version-mismatch", FileDecodeHeaderVersionMismatch = "header-version-mismatch",
/// The entire header is corrupted /// The entire header is corrupted
HeaderDecodeCorruptedHeader = "header-corrupted", FileDecodeHeaderCorrupted = "header-corrupted",
// journal /*
/// An entry in the journal is corrupted ----
JournalLogEntryCorrupted = "journal-entry-corrupted", Common encoding errors
/// The structure of the journal is corrupted ----
JournalCorrupted = "journal-corrupted", */
// internal file structures // internal file structures
/// While attempting to decode a structure in an internal segment of a file, the storage engine ran into a possibly irrecoverable error /// While attempting to decode a structure in an internal segment of a file, the storage engine ran into a possibly irrecoverable error
InternalDecodeStructureCorrupted = "structure-decode-corrupted", InternalDecodeStructureCorrupted = "structure-decode-corrupted",
@ -205,21 +210,48 @@ enumerate_err! {
InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload", InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload",
/// the data for an internal structure was decoded but is logically invalid /// the data for an internal structure was decoded but is logically invalid
InternalDecodeStructureIllegalData = "structure-decode-illegal-data", InternalDecodeStructureIllegalData = "structure-decode-illegal-data",
/*
----
V1 Journal Errors
----
*/
/// An entry in the journal is corrupted
V1JournalDecodeLogEntryCorrupted = "journal-entry-corrupted",
/// The structure of the journal is corrupted
V1JournalDecodeCorrupted = "journal-corrupted",
/// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable /// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable
DataBatchRestoreCorruptedBatch = "batch-corrupted-batch", V1DataBatchDecodeCorruptedBatch = "batch-corrupted-batch",
/// when attempting to restore a data batch from disk, the driver encountered a corrupted entry /// when attempting to restore a data batch from disk, the driver encountered a corrupted entry
DataBatchRestoreCorruptedEntry = "batch-corrupted-entry", V1DataBatchDecodeCorruptedEntry = "batch-corrupted-entry",
/// we failed to close the data batch
DataBatchCloseError = "batch-persist-close-failed",
/// the data batch file is corrupted /// the data batch file is corrupted
DataBatchRestoreCorruptedBatchFile = "batch-corrupted-file", V1DataBatchDecodeCorruptedBatchFile = "batch-corrupted-file",
/// the system database is corrupted /// the system database is corrupted
SysDBCorrupted = "sysdb-corrupted", V1SysDBDecodeCorrupted = "sysdb-corrupted",
// raw journal errors /// we failed to close the data batch
RawJournalEventCorruptedMetadata = "journal-event-metadata-corrupted", V1DataBatchRuntimeCloseError = "batch-persist-close-failed",
RawJournalEventCorrupted = "journal-invalid-event", /*
RawJournalCorrupted = "journal-corrupted", ----
RawJournalInvalidEvent = "journal-invalid-event-order", V2 Journal Errors
RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed", ----
*/
/// Journal event metadata corrupted
RawJournalDecodeEventCorruptedMetadata = "journal-event-metadata-corrupted",
/// The event body is corrupted
RawJournalDecodeEventCorruptedPayload = "journal-event-payload-corrupted",
/// batch contents was unexpected (for example, we expected n events but got m events)
RawJournalDecodeBatchContentsMismatch = "journal-batch-unexpected-termination",
/// batch contents was validated and executed but the final integrity check failed
RawJournalDecodeBatchIntegrityFailure = "journal-batch-integrity-check-failed",
/// unexpected order of events
RawJournalDecodeInvalidEvent = "journal-invalid-event-order",
/// corrupted event within a batch
RawJournalDecodeCorruptionInBatchMetadata = "journal-batch-corrupted-event-metadata",
/*
----
runtime errors
----
*/
RawJournalRuntimeHeartbeatFail = "journal-lwt-heartbeat-failed",
RawJournalRuntimeDirty = "journal-in-dirty-state",
} }
} }

@ -33,7 +33,7 @@ use {
EntityIDRef, EntityIDRef,
}, },
data::uuid::Uuid, data::uuid::Uuid,
error::ErrorKind, error::StorageError,
fractal::GlobalInstanceLike, fractal::GlobalInstanceLike,
storage::{ storage::{
safe_interfaces::{paths_v1, StdModelBatch}, safe_interfaces::{paths_v1, StdModelBatch},
@ -528,9 +528,7 @@ impl FractalMgr {
if mdl_driver_.status().is_iffy() { if mdl_driver_.status().is_iffy() {
// don't mess this up any further // don't mess this up any further
return Err(( return Err((
super::error::Error::from(ErrorKind::Other( super::error::Error::from(StorageError::RawJournalRuntimeDirty),
"model driver is in dirty state".into(),
)),
BatchStats::into_inner(BatchStats::new()), BatchStats::into_inner(BatchStats::new()),
)); ));
} }

@ -37,6 +37,7 @@ use {
std::{ std::{
fmt, fmt,
mem::MaybeUninit, mem::MaybeUninit,
ptr::addr_of_mut,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
}, },
tokio::sync::mpsc::unbounded_channel, tokio::sync::mpsc::unbounded_channel,
@ -262,16 +263,16 @@ impl Global {
.get_rt_stat() .get_rt_stat()
.per_mdl_delta_max_size() .per_mdl_delta_max_size()
} }
unsafe fn __gref_raw() -> &'static mut MaybeUninit<GlobalState> { unsafe fn __gref_raw() -> *mut MaybeUninit<GlobalState> {
static mut G: MaybeUninit<GlobalState> = MaybeUninit::uninit(); static mut G: MaybeUninit<GlobalState> = MaybeUninit::uninit();
&mut G addr_of_mut!(G)
} }
unsafe fn __gref(&self) -> &'static GlobalState { unsafe fn __gref(&self) -> &'static GlobalState {
Self::__gref_raw().assume_init_ref() (&*Self::__gref_raw()).assume_init_ref()
} }
pub unsafe fn unload_all(self) { pub unsafe fn unload_all(self) {
// TODO(@ohsayan): handle errors // TODO(@ohsayan): handle errors
let GlobalState { gns, .. } = Self::__gref_raw().assume_init_read(); let GlobalState { gns, .. } = Self::__gref_raw().read().assume_init();
let mut gns_driver = gns.gns_driver().txn_driver.lock(); let mut gns_driver = gns.gns_driver().txn_driver.lock();
GNSDriver::close_driver(&mut gns_driver).unwrap(); GNSDriver::close_driver(&mut gns_driver).unwrap();
for mdl in gns for mdl in gns

@ -77,6 +77,7 @@ impl TestGlobal {
model_name.entity(), model_name.entity(),
model_data.get_uuid(), model_data.get_uuid(),
), ),
Default::default(),
)?; )?;
model.driver().initialize_model_driver(driver); model.driver().initialize_model_driver(driver);
} }
@ -97,7 +98,7 @@ impl TestGlobal {
Err(e) => match e.kind() { Err(e) => match e.kind() {
ErrorKind::IoError(e_) => match e_.kind() { ErrorKind::IoError(e_) => match e_.kind() {
std::io::ErrorKind::AlreadyExists => { std::io::ErrorKind::AlreadyExists => {
GNSDriver::open_gns_with_name(log_name, &data) GNSDriver::open_gns_with_name(log_name, &data, Default::default())
} }
_ => Err(e), _ => Err(e),
}, },

@ -460,3 +460,7 @@ macro_rules! e {
r($e) r($e)
}}; }};
} }
macro_rules! l {
(let $($name:ident),* = $($expr:expr),*) => { let ($($name),*) = ($($expr),*); }
}

@ -42,7 +42,7 @@ mod txn;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
// re-export // re-export
pub use error::RuntimeResult; pub use {error::RuntimeResult, fractal::Global};
use crate::engine::storage::SELoaded; use crate::engine::storage::SELoaded;
@ -198,3 +198,7 @@ pub fn finish(g: fractal::Global) {
g.unload_all(); g.unload_all();
} }
} }
pub fn repair() -> RuntimeResult<()> {
storage::repair()
}

@ -30,6 +30,8 @@
file system file system
*/ */
use crate::util;
#[cfg(test)] #[cfg(test)]
use super::vfs::{VFileDescriptor, VirtualFS}; use super::vfs::{VFileDescriptor, VirtualFS};
use { use {
@ -56,6 +58,28 @@ impl FileSystem {
} }
impl FileSystem { impl FileSystem {
#[inline(always)]
pub fn copy_directory(from: &str, to: &str) -> IoResult<()> {
#[cfg(test)]
{
match Self::context() {
FSContext::Local => {}
FSContext::Virtual => return VirtualFS::instance().write().fs_copy(from, to),
}
}
util::os::recursive_copy(from, to)
}
#[inline(always)]
pub fn copy(from: &str, to: &str) -> IoResult<()> {
#[cfg(test)]
{
match Self::context() {
FSContext::Local => {}
FSContext::Virtual => return VirtualFS::instance().write().fs_copy(from, to),
}
}
std_fs::copy(from, to).map(|_| ())
}
#[inline(always)] #[inline(always)]
pub fn read(path: &str) -> IoResult<Vec<u8>> { pub fn read(path: &str) -> IoResult<Vec<u8>> {
#[cfg(test)] #[cfg(test)]

@ -62,6 +62,19 @@ enum VNode {
File(RwLock<VFile>), File(RwLock<VFile>),
} }
impl VNode {
fn clone_into_new_node(&self) -> Self {
match self {
Self::Dir(d) => Self::Dir(
d.iter()
.map(|(id, data)| (id.clone(), data.clone_into_new_node()))
.collect(),
),
Self::File(f) => Self::File(RwLock::new(f.read().clone_to_new_file())),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub(super) struct VFile { pub(super) struct VFile {
read: bool, read: bool,
@ -103,6 +116,14 @@ impl Drop for VFileDescriptor {
*/ */
impl VFile { impl VFile {
pub fn clone_to_new_file(&self) -> Self {
Self {
read: false,
write: false,
data: self.data.clone(),
pos: 0,
}
}
pub fn truncate(&mut self, to: u64) -> IoResult<()> { pub fn truncate(&mut self, to: u64) -> IoResult<()> {
if !self.write { if !self.write {
return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into()); return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into());
@ -187,6 +208,30 @@ impl VirtualFS {
pub fn get_data(&self, path: &str) -> IoResult<Vec<u8>> { pub fn get_data(&self, path: &str) -> IoResult<Vec<u8>> {
self.with_file(path, |f| Ok(f.data.clone())) self.with_file(path, |f| Ok(f.data.clone()))
} }
pub fn fs_copy(&mut self, from: &str, to: &str) -> IoResult<()> {
let node = self.with_item(from, |node| Ok(node.clone_into_new_node()))?;
// process components
let (target, components) = util::split_target_and_components(to);
let mut current = &mut self.root;
for component in components {
match current.get_mut(component) {
Some(VNode::Dir(dir)) => {
current = dir;
}
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => return err::dir_missing_in_path(),
}
}
match current.entry(target.into()) {
Entry::Occupied(mut item) => {
item.insert(node);
}
Entry::Vacant(ve) => {
ve.insert(node);
}
}
Ok(())
}
pub fn fs_fcreate_rw(&mut self, fpath: &str) -> IoResult<VFileDescriptor> { pub fn fs_fcreate_rw(&mut self, fpath: &str) -> IoResult<VFileDescriptor> {
let (target_file, components) = util::split_target_and_components(fpath); let (target_file, components) = util::split_target_and_components(fpath);
let target_dir = util::find_target_dir_mut(components, &mut self.root)?; let target_dir = util::find_target_dir_mut(components, &mut self.root)?;
@ -354,16 +399,13 @@ impl VirtualFS {
fpath: &str, fpath: &str,
f: impl FnOnce(&VFile) -> IoResult<T>, f: impl FnOnce(&VFile) -> IoResult<T>,
) -> IoResult<T> { ) -> IoResult<T> {
let (target_file, components) = util::split_target_and_components(fpath); self.with_item(fpath, |node| match node {
let target_dir = util::find_target_dir(components, &self.root)?; VNode::File(file) => {
match target_dir.get(target_file) {
Some(VNode::File(file)) => {
let f_ = file.read(); let f_ = file.read();
f(&f_) f(&f_)
} }
Some(VNode::Dir(_)) => return err::item_is_not_file(), VNode::Dir(_) => err::item_is_not_file(),
None => return Err(Error::from(ErrorKind::NotFound).into()), })
}
} }
fn with_item_mut<T>( fn with_item_mut<T>(
&mut self, &mut self,
@ -387,6 +429,24 @@ impl VirtualFS {
Entry::Vacant(_) => return err::could_not_find_item(), Entry::Vacant(_) => return err::could_not_find_item(),
} }
} }
fn with_item<T>(&self, fpath: &str, f: impl FnOnce(&VNode) -> IoResult<T>) -> IoResult<T> {
// process components
let (target, components) = util::split_target_and_components(fpath);
let mut current = &self.root;
for component in components {
match current.get(component) {
Some(VNode::Dir(dir)) => {
current = dir;
}
Some(VNode::File(_)) => return err::file_in_dir_path(),
None => return err::dir_missing_in_path(),
}
}
match current.get(target.into()) {
Some(item) => return f(item),
None => return err::could_not_find_item(),
}
}
fn dir_delete(&mut self, fpath: &str, allow_if_non_empty: bool) -> IoResult<()> { fn dir_delete(&mut self, fpath: &str, allow_if_non_empty: bool) -> IoResult<()> {
self.with_item_mut(fpath, |node| match node.get() { self.with_item_mut(fpath, |node| match node.get() {
VNode::Dir(d) => { VNode::Dir(d) => {

@ -47,7 +47,7 @@ use {
util::{compiler::TaggedEnum, os}, util::{compiler::TaggedEnum, os},
IoResult, IoResult,
}, },
std::{mem::ManuallyDrop, ops::Range}, std::ops::Range,
}; };
pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _; pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _;
@ -338,14 +338,10 @@ impl<H: HeaderV1Spec> HeaderV1<H> {
}) })
} else { } else {
let version_okay = okay_header_version & okay_server_version & okay_driver_version; let version_okay = okay_header_version & okay_server_version & okay_driver_version;
let md = ManuallyDrop::new([ Err([
StorageError::HeaderDecodeCorruptedHeader, StorageError::FileDecodeHeaderCorrupted,
StorageError::HeaderDecodeVersionMismatch, StorageError::FileDecodeHeaderVersionMismatch,
]); ][!version_okay as usize])
Err(unsafe {
// UNSAFE(@ohsayan): while not needed, md for drop safety + correct index
md.as_ptr().add(!version_okay as usize).read().into()
})
} }
} }
} }
@ -444,7 +440,7 @@ pub trait SimpleFileSpecV1 {
if v == Self::FILE_SPECFIER_VERSION { if v == Self::FILE_SPECFIER_VERSION {
Ok(()) Ok(())
} else { } else {
Err(StorageError::HeaderDecodeVersionMismatch.into()) Err(StorageError::FileDecodeHeaderVersionMismatch.into())
} }
} }
} }
@ -466,7 +462,7 @@ impl<Sfs: SimpleFileSpecV1> FileSpecV1 for Sfs {
if okay { if okay {
Ok(md) Ok(md)
} else { } else {
Err(StorageError::HeaderDecodeVersionMismatch.into()) Err(StorageError::FileDecodeHeaderVersionMismatch.into())
} }
} }
fn write_metadata(f: &mut impl FileWrite, _: Self::EncodeArgs) -> IoResult<Self::Metadata> { fn write_metadata(f: &mut impl FileWrite, _: Self::EncodeArgs) -> IoResult<Self::Metadata> {

@ -140,6 +140,9 @@ impl<S: FileSpecV1, F: FileWrite + FileWriteExt> SdssFile<S, F> {
self.file.fwrite_all(data)?; self.file.fwrite_all(data)?;
self.file.fsync_all() self.file.fsync_all()
} }
pub fn truncate(&mut self, new_size: u64) -> IoResult<()> {
self.file.f_truncate(new_size)
}
} }
/* /*
@ -234,13 +237,13 @@ impl<S: FileSpecV1> TrackedReader<S> {
Err(e) => return Err(e), Err(e) => return Err(e),
} }
} else { } else {
Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()) Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner())
} }
} }
/// Tracked read of a given block size. Shorthand for [`Self::tracked_read`] /// Tracked read of a given block size. Shorthand for [`Self::tracked_read`]
pub fn read_block<const N: usize>(&mut self) -> IoResult<[u8; N]> { pub fn read_block<const N: usize>(&mut self) -> IoResult<[u8; N]> {
if !self.has_left(N as _) { if !self.has_left(N as _) {
return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()); return Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner());
} }
let mut buf = [0; N]; let mut buf = [0; N];
self.tracked_read(&mut buf)?; self.tracked_read(&mut buf)?;
@ -259,6 +262,9 @@ impl<S: FileSpecV1> TrackedReader<S> {
pub fn cursor(&self) -> u64 { pub fn cursor(&self) -> u64 {
self.cursor self.cursor
} }
pub fn cached_size(&self) -> u64 {
self.len
}
} }
impl<S: FileSpecV1> TrackedReader<S> { impl<S: FileSpecV1> TrackedReader<S> {

@ -74,7 +74,7 @@ where
if scanner.eof() { if scanner.eof() {
Ok(()) Ok(())
} else { } else {
Err(StorageError::JournalLogEntryCorrupted.into()) Err(StorageError::V1JournalDecodeLogEntryCorrupted.into())
} }
} }
fn decode_and_update_global_state( fn decode_and_update_global_state(

@ -43,6 +43,7 @@ use {
}, },
util::{compiler::TaggedEnum, EndianQW}, util::{compiler::TaggedEnum, EndianQW},
}, },
std::marker::PhantomData,
}; };
/* /*
@ -318,10 +319,10 @@ impl FieldMD {
} }
} }
pub struct FieldRef<'a>(&'a Field); pub struct FieldRef<'a>(PhantomData<&'a Field>);
impl<'a> From<&'a Field> for FieldRef<'a> { impl<'a> From<&'a Field> for FieldRef<'a> {
fn from(f: &'a Field) -> Self { fn from(_: &'a Field) -> Self {
Self(f) Self(PhantomData)
} }
} }
impl<'a> PersistObject for FieldRef<'a> { impl<'a> PersistObject for FieldRef<'a> {

@ -57,6 +57,10 @@ pub struct SELoaded {
pub gns: GlobalNS, pub gns: GlobalNS,
} }
pub fn repair() -> RuntimeResult<()> {
v2::repair()
}
pub fn load(cfg: &Configuration) -> RuntimeResult<SELoaded> { pub fn load(cfg: &Configuration) -> RuntimeResult<SELoaded> {
// first determine if this is a new install, an existing install or if it uses the old driver // first determine if this is a new install, an existing install or if it uses the old driver
if Path::new(v1::SYSDB_PATH).is_file() { if Path::new(v1::SYSDB_PATH).is_file() {

@ -53,7 +53,7 @@ impl DataBatchPersistDriver {
if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() { if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() {
return Ok(()); return Ok(());
} else { } else {
return Err(StorageError::DataBatchCloseError.into()); return Err(StorageError::V1DataBatchRuntimeCloseError.into());
} }
} }
} }

@ -181,7 +181,7 @@ impl DataBatchRestoreDriver {
} }
} }
// nope, this is a corrupted file // nope, this is a corrupted file
Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into())
} }
fn handle_reopen_is_actual_close(&mut self) -> RuntimeResult<bool> { fn handle_reopen_is_actual_close(&mut self) -> RuntimeResult<bool> {
if self.f.is_eof() { if self.f.is_eof() {
@ -194,7 +194,7 @@ impl DataBatchRestoreDriver {
Ok(false) Ok(false)
} else { } else {
// that's just a nice bug // that's just a nice bug
Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into())
} }
} }
} }
@ -301,7 +301,7 @@ impl DataBatchRestoreDriver {
// we must read the batch termination signature // we must read the batch termination signature
let b = self.f.read_byte()?; let b = self.f.read_byte()?;
if b != MARKER_END_OF_BATCH { if b != MARKER_END_OF_BATCH {
return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into());
} }
} }
// read actual commit // read actual commit
@ -314,7 +314,7 @@ impl DataBatchRestoreDriver {
if actual_checksum == u64::from_le_bytes(hardcoded_checksum) { if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
Ok(actual_commit) Ok(actual_commit)
} else { } else {
Err(StorageError::DataBatchRestoreCorruptedBatch.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatch.into())
} }
} }
fn read_batch(&mut self) -> RuntimeResult<Batch> { fn read_batch(&mut self) -> RuntimeResult<Batch> {
@ -334,7 +334,7 @@ impl DataBatchRestoreDriver {
} }
_ => { _ => {
// this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry // this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry
return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into());
} }
} }
// decode batch start block // decode batch start block
@ -378,7 +378,7 @@ impl DataBatchRestoreDriver {
this_col_cnt -= 1; this_col_cnt -= 1;
} }
if this_col_cnt != 0 { if this_col_cnt != 0 {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
} }
if change_type == 1 { if change_type == 1 {
this_batch.push(DecodedBatchEvent::new( this_batch.push(DecodedBatchEvent::new(
@ -396,7 +396,7 @@ impl DataBatchRestoreDriver {
processed_in_this_batch += 1; processed_in_this_batch += 1;
} }
_ => { _ => {
return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into());
} }
} }
} }
@ -413,7 +413,7 @@ impl DataBatchRestoreDriver {
if let [MARKER_RECOVERY_EVENT] = buf { if let [MARKER_RECOVERY_EVENT] = buf {
return Ok(()); return Ok(());
} }
Err(StorageError::DataBatchRestoreCorruptedBatch.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatch.into())
} }
fn read_start_batch_block(&mut self) -> RuntimeResult<BatchStartBlock> { fn read_start_batch_block(&mut self) -> RuntimeResult<BatchStartBlock> {
let pk_tag = self.f.read_byte()?; let pk_tag = self.f.read_byte()?;
@ -463,7 +463,7 @@ impl BatchStartBlock {
impl DataBatchRestoreDriver { impl DataBatchRestoreDriver {
fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult<PrimaryIndexKey> { fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult<PrimaryIndexKey> {
let Some(pk_type) = TagUnique::try_from_raw(pk_type) else { let Some(pk_type) = TagUnique::try_from_raw(pk_type) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
}; };
Ok(match pk_type { Ok(match pk_type {
TagUnique::SignedInt | TagUnique::UnsignedInt => { TagUnique::SignedInt | TagUnique::UnsignedInt => {
@ -479,7 +479,7 @@ impl DataBatchRestoreDriver {
self.f.tracked_read(&mut data)?; self.f.tracked_read(&mut data)?;
if pk_type == TagUnique::Str { if pk_type == TagUnique::Str {
if core::str::from_utf8(&data).is_err() { if core::str::from_utf8(&data).is_err() {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
} }
} }
unsafe { unsafe {
@ -496,7 +496,7 @@ impl DataBatchRestoreDriver {
} }
fn decode_cell(&mut self) -> RuntimeResult<Datacell> { fn decode_cell(&mut self) -> RuntimeResult<Datacell> {
let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else { let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
}; };
unsafe { cell::decode_element::<Datacell, TrackedReader>(&mut self.f, dscr) } unsafe { cell::decode_element::<Datacell, TrackedReader>(&mut self.f, dscr) }
.map_err(|e| e.0) .map_err(|e| e.0)
@ -516,7 +516,7 @@ impl From<std::io::Error> for ErrorHack {
} }
impl From<()> for ErrorHack { impl From<()> for ErrorHack {
fn from(_: ()) -> Self { fn from(_: ()) -> Self {
Self(StorageError::DataBatchRestoreCorruptedEntry.into()) Self(StorageError::V1DataBatchDecodeCorruptedEntry.into())
} }
} }

@ -55,7 +55,7 @@ impl JournalAdapter for GNSAdapter {
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> { fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> RuntimeResult<()> {
macro_rules! dispatch { macro_rules! dispatch {
($($item:ty),* $(,)?) => { ($($item:ty),* $(,)?) => {
[$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::DecodeUnknownTxnOp.into())] [$(<$item as GNSEvent>::decode_and_update_global_state),*, |_, _| Err(TransactionError::V1DecodeUnknownTxnOp.into())]
}; };
} }
static DISPATCH: [fn(&mut BufferedScanner, &GNSData) -> RuntimeResult<()>; 9] = dispatch!( static DISPATCH: [fn(&mut BufferedScanner, &GNSData) -> RuntimeResult<()>; 9] = dispatch!(
@ -69,7 +69,7 @@ impl JournalAdapter for GNSAdapter {
gns::model::DropModelTxn gns::model::DropModelTxn
); );
if payload.len() < 2 { if payload.len() < 2 {
return Err(TransactionError::DecodedUnexpectedEof.into()); return Err(TransactionError::V1DecodedUnexpectedEof.into());
} }
let mut scanner = BufferedScanner::new(&payload); let mut scanner = BufferedScanner::new(&payload);
let opc = unsafe { let opc = unsafe {
@ -78,7 +78,7 @@ impl JournalAdapter for GNSAdapter {
}; };
match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) { match DISPATCH[(opc as usize).min(DISPATCH.len())](&mut scanner, gs) {
Ok(()) if scanner.eof() => return Ok(()), Ok(()) if scanner.eof() => return Ok(()),
Ok(_) => Err(TransactionError::DecodeCorruptedPayloadMoreBytes.into()), Ok(_) => Err(TransactionError::V1DecodeCorruptedPayloadMoreBytes.into()),
Err(e) => Err(e), Err(e) => Err(e),
} }
} }

@ -215,7 +215,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
} }
match entry_metadata match entry_metadata
.event_source_marker() .event_source_marker()
.ok_or(StorageError::JournalLogEntryCorrupted)? .ok_or(StorageError::V1JournalDecodeLogEntryCorrupted)?
{ {
EventSourceMarker::ServerStandard => {} EventSourceMarker::ServerStandard => {}
EventSourceMarker::DriverClosed => { EventSourceMarker::DriverClosed => {
@ -230,7 +230,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
EventSourceMarker::DriverReopened | EventSourceMarker::RecoveryReverseLastJournal => { EventSourceMarker::DriverReopened | EventSourceMarker::RecoveryReverseLastJournal => {
// these two are only taken in close and error paths (respectively) so we shouldn't see them here; this is bad // these two are only taken in close and error paths (respectively) so we shouldn't see them here; this is bad
// two special directives in the middle of nowhere? incredible // two special directives in the middle of nowhere? incredible
return Err(StorageError::JournalCorrupted.into()); return Err(StorageError::V1JournalDecodeCorrupted.into());
} }
} }
// read payload // read payload
@ -263,10 +263,10 @@ impl<TA: JournalAdapter> JournalReader<TA> {
Ok(()) Ok(())
} else { } else {
// FIXME(@ohsayan): tolerate loss in this directive too // FIXME(@ohsayan): tolerate loss in this directive too
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} else { } else {
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} }
#[cold] // FIXME(@ohsayan): how bad can prod systems be? (clue: pretty bad, so look for possible changes) #[cold] // FIXME(@ohsayan): how bad can prod systems be? (clue: pretty bad, so look for possible changes)
@ -279,7 +279,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length? self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length?
let mut entry_buf = [0u8; JournalEntryMetadata::SIZE]; let mut entry_buf = [0u8; JournalEntryMetadata::SIZE];
if self.log_file.read_buffer(&mut entry_buf).is_err() { if self.log_file.read_buffer(&mut entry_buf).is_err() {
return Err(StorageError::JournalCorrupted.into()); return Err(StorageError::V1JournalDecodeCorrupted.into());
} }
let entry = JournalEntryMetadata::decode(entry_buf); let entry = JournalEntryMetadata::decode(entry_buf);
let okay = (entry.event_id == self.evid as u128) let okay = (entry.event_id == self.evid as u128)
@ -290,7 +290,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
if okay { if okay {
return Ok(()); return Ok(());
} else { } else {
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} }
/// Read and apply all events in the given log file to the global state, returning the (open file, last event ID) /// Read and apply all events in the given log file to the global state, returning the (open file, last event ID)
@ -305,7 +305,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
if slf.closed { if slf.closed {
Ok((slf.log_file.downgrade_reader(), slf.evid)) Ok((slf.log_file.downgrade_reader(), slf.evid))
} else { } else {
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} }
} }

@ -42,7 +42,7 @@ fn rkey<T>(
) -> RuntimeResult<T> { ) -> RuntimeResult<T> {
match d.remove(key).map(transform) { match d.remove(key).map(transform) {
Some(Some(k)) => Ok(k), Some(Some(k)) => Ok(k),
_ => Err(StorageError::SysDBCorrupted.into()), _ => Err(StorageError::V1SysDBDecodeCorrupted.into()),
} }
} }
@ -95,14 +95,14 @@ impl RestoredSystemDatabase {
let mut userdata = userdata let mut userdata = userdata
.into_data() .into_data()
.and_then(Datacell::into_list) .and_then(Datacell::into_list)
.ok_or(StorageError::SysDBCorrupted)?; .ok_or(StorageError::V1SysDBDecodeCorrupted)?;
if userdata.len() != 1 { if userdata.len() != 1 {
return Err(StorageError::SysDBCorrupted.into()); return Err(StorageError::V1SysDBDecodeCorrupted.into());
} }
let user_password = userdata let user_password = userdata
.remove(0) .remove(0)
.into_bin() .into_bin()
.ok_or(StorageError::SysDBCorrupted)?; .ok_or(StorageError::V1SysDBDecodeCorrupted)?;
loaded_users.insert(username, user_password.into_boxed_slice()); loaded_users.insert(username, user_password.into_boxed_slice());
} }
// load sys data // load sys data
@ -117,7 +117,7 @@ impl RestoredSystemDatabase {
& sys_store.is_empty() & sys_store.is_empty()
& loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT)) & loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT))
{ {
return Err(StorageError::SysDBCorrupted.into()); return Err(StorageError::V1SysDBDecodeCorrupted.into());
} }
Ok(Self::new(loaded_users, sc, sv)) Ok(Self::new(loaded_users, sc, sv))
} }

@ -34,7 +34,7 @@ use {
core::GNSData, core::GNSData,
storage::{ storage::{
common_encoding::r1::impls::gns::GNSEvent, common_encoding::r1::impls::gns::GNSEvent,
v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent}, v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent, JournalSettings},
}, },
txn::gns::{ txn::gns::{
model::{ model::{
@ -61,11 +61,15 @@ pub struct GNSEventLog;
impl GNSDriver { impl GNSDriver {
const FILE_PATH: &'static str = "gns.db-tlog"; const FILE_PATH: &'static str = "gns.db-tlog";
pub fn open_gns_with_name(name: &str, gs: &GNSData) -> RuntimeResult<Self> { pub fn open_gns_with_name(
journal::open_journal(name, gs) name: &str,
gs: &GNSData,
settings: JournalSettings,
) -> RuntimeResult<Self> {
journal::open_journal(name, gs, settings)
} }
pub fn open_gns(gs: &GNSData) -> RuntimeResult<Self> { pub fn open_gns(gs: &GNSData, settings: JournalSettings) -> RuntimeResult<Self> {
Self::open_gns_with_name(Self::FILE_PATH, gs) Self::open_gns_with_name(Self::FILE_PATH, gs, settings)
} }
pub fn create_gns_with_name(name: &str) -> RuntimeResult<Self> { pub fn create_gns_with_name(name: &str) -> RuntimeResult<Self> {
journal::create_journal(name) journal::create_journal(name)

@ -46,7 +46,7 @@ use {
v2::raw::{ v2::raw::{
journal::{ journal::{
self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent, self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent,
RawJournalAdapter, JournalSettings, RawJournalAdapter,
}, },
spec::ModelDataBatchAofV1, spec::ModelDataBatchAofV1,
}, },
@ -66,8 +66,12 @@ use {
pub type ModelDriver = BatchDriver<ModelDataAdapter>; pub type ModelDriver = BatchDriver<ModelDataAdapter>;
impl ModelDriver { impl ModelDriver {
pub fn open_model_driver(mdl: &ModelData, model_data_file_path: &str) -> RuntimeResult<Self> { pub fn open_model_driver(
journal::open_journal(model_data_file_path, mdl) mdl: &ModelData,
model_data_file_path: &str,
settings: JournalSettings,
) -> RuntimeResult<Self> {
journal::open_journal(model_data_file_path, mdl, settings)
} }
/// Create a new event log /// Create a new event log
pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult<Self> { pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult<Self> {
@ -449,7 +453,7 @@ impl BatchAdapterSpec for ModelDataAdapter {
BatchType::Standard => {} BatchType::Standard => {}
} }
let pk_tag = TagUnique::try_from_raw(f.read_block().map(|[b]| b)?) let pk_tag = TagUnique::try_from_raw(f.read_block().map(|[b]| b)?)
.ok_or(StorageError::RawJournalCorrupted)?; .ok_or(StorageError::InternalDecodeStructureIllegalData)?;
let schema_version = u64::from_le_bytes(f.read_block()?); let schema_version = u64::from_le_bytes(f.read_block()?);
let column_count = u64::from_le_bytes(f.read_block()?); let column_count = u64::from_le_bytes(f.read_block()?);
Ok(BatchMetadata { Ok(BatchMetadata {
@ -656,7 +660,7 @@ mod restore_impls {
f.read(&mut data)?; f.read(&mut data)?;
if pk_type == TagUnique::Str { if pk_type == TagUnique::Str {
if core::str::from_utf8(&data).is_err() { if core::str::from_utf8(&data).is_err() {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::InternalDecodeStructureCorruptedPayload.into());
} }
} }
unsafe { unsafe {
@ -679,7 +683,7 @@ mod restore_impls {
let mut this_col_cnt = batch_info.column_count; let mut this_col_cnt = batch_info.column_count;
while this_col_cnt != 0 { while this_col_cnt != 0 {
let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else { let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::InternalDecodeStructureIllegalData.into());
}; };
let cell = unsafe { cell::decode_element::<Datacell, _>(f, dscr) }.map_err(|e| e.0)?; let cell = unsafe { cell::decode_element::<Datacell, _>(f, dscr) }.map_err(|e| e.0)?;
row.push(cell); row.push(cell);
@ -705,7 +709,7 @@ mod restore_impls {
} }
impl From<()> for ErrorHack { impl From<()> for ErrorHack {
fn from(_: ()) -> Self { fn from(_: ()) -> Self {
Self(StorageError::DataBatchRestoreCorruptedEntry.into()) Self(StorageError::InternalDecodeStructureCorrupted.into())
} }
} }
impl<'a> DataSource for TrackedReaderContext<'a, ModelDataBatchAofV1> { impl<'a> DataSource for TrackedReaderContext<'a, ModelDataBatchAofV1> {

@ -25,25 +25,34 @@
*/ */
use { use {
self::impls::mdl_journal::{BatchStats, FullModel}, self::{
impls::mdl_journal::{BatchStats, FullModel},
raw::journal::{JournalSettings, RepairResult},
},
super::{common::interface::fs::FileSystem, v1, SELoaded}, super::{common::interface::fs::FileSystem, v1, SELoaded},
crate::engine::{ crate::{
config::Configuration, engine::{
core::{ config::Configuration,
system_db::{SystemDatabase, VerifyUser}, core::{
GNSData, GlobalNS, system_db::{SystemDatabase, VerifyUser},
}, EntityIDRef, GNSData, GlobalNS,
fractal::{context, FractalGNSDriver}, },
storage::common::paths_v1, fractal::{context, FractalGNSDriver},
txn::{ storage::{
gns::{ common::paths_v1,
model::CreateModelTxn, v2::raw::journal::{self, JournalRepairMode},
space::CreateSpaceTxn, },
sysctl::{AlterUserTxn, CreateUserTxn}, txn::{
gns::{
model::CreateModelTxn,
space::CreateSpaceTxn,
sysctl::{AlterUserTxn, CreateUserTxn},
},
SpaceIDRef,
}, },
SpaceIDRef, RuntimeResult,
}, },
RuntimeResult, util,
}, },
impls::mdl_journal::ModelDriver, impls::mdl_journal::ModelDriver,
}; };
@ -120,15 +129,18 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> { pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> {
let gns = GNSData::empty(); let gns = GNSData::empty();
context::set_dmsg("loading gns"); context::set_dmsg("loading gns");
let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?; let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns, JournalSettings::default())?;
for (id, model) in gns.idx_models().write().iter_mut() { for (id, model) in gns.idx_models().write().iter_mut() {
let model_data = model.data(); let model_data = model.data();
let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid(); let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid();
let model_data_file_path = let model_data_file_path =
paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid()); paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid());
context::set_dmsg(format!("loading model driver in {model_data_file_path}")); context::set_dmsg(format!("loading model driver in {model_data_file_path}"));
let model_driver = let model_driver = impls::mdl_journal::ModelDriver::open_model_driver(
impls::mdl_journal::ModelDriver::open_model_driver(model_data, &model_data_file_path)?; model_data,
&model_data_file_path,
JournalSettings::default(),
)?;
model.driver().initialize_model_driver(model_driver); model.driver().initialize_model_driver(model_driver);
unsafe { unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum
@ -153,3 +165,69 @@ pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> {
gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)), gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)),
}) })
} }
pub fn repair() -> RuntimeResult<()> {
// back up all files
let backup_dir = format!(
"backups/{}-before-recovery-process",
util::time_now_string()
);
context::set_dmsg("creating backup directory");
FileSystem::create_dir_all(&backup_dir)?;
context::set_dmsg("backing up GNS");
FileSystem::copy(GNS_PATH, &format!("{backup_dir}/{GNS_PATH}"))?; // backup GNS
context::set_dmsg("backing up data directory");
FileSystem::copy_directory(DATA_DIR, &format!("{backup_dir}/{DATA_DIR}"))?; // backup data
info!("All data backed up in {backup_dir}");
// check and attempt repair: GNS
let gns = GNSData::empty();
context::set_dmsg("repair GNS");
print_repair_info(
journal::repair_journal::<raw::journal::EventLogAdapter<impls::gns_log::GNSEventLog>>(
GNS_PATH,
&gns,
JournalSettings::default(),
JournalRepairMode::Simple,
)?,
"GNS",
);
// check and attempt repair: models
let models = gns.idx_models().read();
for (space_id, space) in gns.idx().read().iter() {
for model_id in space.models().iter() {
let model = models.get(&EntityIDRef::new(&space_id, &model_id)).unwrap();
let model_data_file_path = paths_v1::model_path(
&space_id,
space.get_uuid(),
&model_id,
model.data().get_uuid(),
);
context::set_dmsg(format!("repairing {model_data_file_path}"));
print_repair_info(
journal::repair_journal::<
raw::journal::BatchAdapter<impls::mdl_journal::ModelDataAdapter>,
>(
&model_data_file_path,
model.data(),
JournalSettings::default(),
JournalRepairMode::Simple,
)?,
&model_data_file_path,
)
}
}
Ok(())
}
fn print_repair_info(result: RepairResult, id: &str) {
match result {
RepairResult::NoErrors => info!("repair: no errors detected in {id}"),
RepairResult::UnspecifiedLoss(definitely_lost) => {
if definitely_lost == 0 {
warn!("repair: LOST DATA. repaired {id} but lost an unspecified amount of data")
} else {
warn!("repair: LOST DATA. repaired {id} but lost atleast {definitely_lost} trailing bytes")
}
}
}
}

@ -47,7 +47,8 @@ mod raw;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub use raw::{ pub use raw::{
create_journal, open_journal, RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, create_journal, open_journal, repair_journal, JournalRepairMode, JournalSettings,
RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, RepairResult,
}; };
/* /*
@ -136,7 +137,7 @@ impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
this_checksum.update(&plen.to_le_bytes()); this_checksum.update(&plen.to_le_bytes());
this_checksum.update(&pl); this_checksum.update(&pl);
if this_checksum.finish() != expected_checksum { if this_checksum.finish() != expected_checksum {
return Err(StorageError::RawJournalCorrupted.into()); return Err(StorageError::RawJournalDecodeCorruptionInBatchMetadata.into());
} }
<EL as EventLogSpec>::DECODE_DISPATCH <EL as EventLogSpec>::DECODE_DISPATCH
[<<EL as EventLogSpec>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize]( [<<EL as EventLogSpec>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize](
@ -165,11 +166,15 @@ pub struct BatchAdapter<BA: BatchAdapterSpec>(PhantomData<BA>);
#[cfg(test)] #[cfg(test)]
impl<BA: BatchAdapterSpec> BatchAdapter<BA> { impl<BA: BatchAdapterSpec> BatchAdapter<BA> {
/// Open a new batch journal /// Open a new batch journal
pub fn open(name: &str, gs: &BA::GlobalState) -> RuntimeResult<BatchDriver<BA>> pub fn open(
name: &str,
gs: &BA::GlobalState,
settings: JournalSettings,
) -> RuntimeResult<BatchDriver<BA>>
where where
BA::Spec: FileSpecV1<DecodeArgs = ()>, BA::Spec: FileSpecV1<DecodeArgs = ()>,
{ {
raw::open_journal::<BatchAdapter<BA>>(name, gs) raw::open_journal::<BatchAdapter<BA>>(name, gs, settings)
} }
/// Create a new batch journal /// Create a new batch journal
pub fn create(name: &str) -> RuntimeResult<BatchDriver<BA>> pub fn create(name: &str) -> RuntimeResult<BatchDriver<BA>>
@ -278,7 +283,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
let event_type = <<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw( let event_type = <<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw(
f.read_block().map(|[b]| b)?, f.read_block().map(|[b]| b)?,
) )
.ok_or(StorageError::RawJournalCorrupted)?; .ok_or(StorageError::InternalDecodeStructureIllegalData)?;
// is this an early exit marker? if so, exit // is this an early exit marker? if so, exit
if <BA as BatchAdapterSpec>::is_early_exit(&event_type) { if <BA as BatchAdapterSpec>::is_early_exit(&event_type) {
break; break;
@ -299,7 +304,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
// finish applying batch // finish applying batch
BA::finish(batch_state, batch_md, gs)?; BA::finish(batch_state, batch_md, gs)?;
} else { } else {
return Err(StorageError::RawJournalCorrupted.into()); return Err(StorageError::RawJournalDecodeBatchContentsMismatch.into());
} }
} }
// and finally, verify checksum // and finally, verify checksum
@ -308,7 +313,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
if real_checksum == stored_checksum { if real_checksum == stored_checksum {
Ok(()) Ok(())
} else { } else {
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeBatchIntegrityFailure.into())
} }
} }
} }

@ -30,7 +30,8 @@ mod tests;
use { use {
crate::{ crate::{
engine::{ engine::{
error::StorageError, error::{ErrorKind, StorageError, TransactionError},
fractal::error::Error,
mem::unsafe_apis::memcpy, mem::unsafe_apis::memcpy,
storage::common::{ storage::common::{
checksum::SCrc64, checksum::SCrc64,
@ -44,7 +45,7 @@ use {
util::compiler::TaggedEnum, util::compiler::TaggedEnum,
}, },
core::fmt, core::fmt,
std::ops::Range, std::{io::ErrorKind as IoErrorKind, ops::Range},
}; };
/* /*
@ -67,15 +68,44 @@ where
pub fn open_journal<J: RawJournalAdapter>( pub fn open_journal<J: RawJournalAdapter>(
log_path: &str, log_path: &str,
gs: &J::GlobalState, gs: &J::GlobalState,
settings: JournalSettings,
) -> RuntimeResult<RawJournalWriter<J>> ) -> RuntimeResult<RawJournalWriter<J>>
where where
J::Spec: FileSpecV1<DecodeArgs = ()>, J::Spec: FileSpecV1<DecodeArgs = ()>,
{ {
let log = SdssFile::<J::Spec>::open(log_path)?; let log = SdssFile::<J::Spec>::open(log_path)?;
let (initializer, file) = RawJournalReader::<J>::scroll(log, gs)?; let (initializer, file) = RawJournalReader::<J>::scroll(log, gs, settings)?;
RawJournalWriter::new(initializer, file) RawJournalWriter::new(initializer, file)
} }
#[derive(Debug, PartialEq)]
/// The result of a journal repair operation
pub enum RepairResult {
/// No errors were detected
NoErrors,
/// Definitely lost n bytes, but might have lost more
UnspecifiedLoss(u64),
}
/**
Attempts to repair the given journal, **in-place** and returns the number of bytes that were definitely lost and could not
be repaired.
**WARNING**: Backup before calling this
*/
pub fn repair_journal<J: RawJournalAdapter>(
log_path: &str,
gs: &J::GlobalState,
settings: JournalSettings,
repair_mode: JournalRepairMode,
) -> RuntimeResult<RepairResult>
where
J::Spec: FileSpecV1<DecodeArgs = ()>,
{
let log = SdssFile::<J::Spec>::open(log_path)?;
RawJournalReader::<J>::repair(log, gs, settings, repair_mode).map(|(lost, ..)| lost)
}
#[derive(Debug)] #[derive(Debug)]
pub struct JournalInitializer { pub struct JournalInitializer {
cursor: u64, cursor: u64,
@ -118,10 +148,20 @@ impl JournalInitializer {
*/ */
#[cfg(test)] #[cfg(test)]
pub fn obtain_trace() -> Vec<JournalTraceEvent> { pub fn debug_get_trace() -> Vec<JournalTraceEvent> {
local_mut!(TRACE, |t| core::mem::take(t)) local_mut!(TRACE, |t| core::mem::take(t))
} }
#[cfg(test)]
pub fn debug_get_offsets() -> std::collections::BTreeMap<u64, u64> {
local_mut!(OFFSETS, |offsets| core::mem::take(offsets))
}
#[cfg(test)]
pub fn debug_set_offset_tracking(track: bool) {
local_mut!(TRACE_OFFSETS, |track_| *track_ = track)
}
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
#[cfg(test)] #[cfg(test)]
pub enum JournalTraceEvent { pub enum JournalTraceEvent {
@ -145,6 +185,7 @@ pub enum JournalReaderTraceEvent {
ClosedAndReachedEof, ClosedAndReachedEof,
ReopenSuccess, ReopenSuccess,
// event // event
LookingForEvent,
AttemptingEvent(u64), AttemptingEvent(u64),
DetectedServerEvent, DetectedServerEvent,
ServerEventMetadataParsed, ServerEventMetadataParsed,
@ -183,9 +224,26 @@ pub(super) enum JournalWriterTraceEvent {
DriverClosed, DriverClosed,
} }
#[cfg(test)]
local! { local! {
#[cfg(test)]
static TRACE: Vec<JournalTraceEvent> = Vec::new(); static TRACE: Vec<JournalTraceEvent> = Vec::new();
static OFFSETS: std::collections::BTreeMap<u64, u64> = Default::default();
static TRACE_OFFSETS: bool = false;
}
macro_rules! jtrace_event_offset {
($id:expr, $offset:expr) => {
#[cfg(test)]
{
local_ref!(TRACE_OFFSETS, |should_trace| {
if *should_trace {
local_mut!(OFFSETS, |offsets| assert!(offsets
.insert($id, $offset)
.is_none()))
}
})
}
};
} }
macro_rules! jtrace { macro_rules! jtrace {
@ -322,7 +380,6 @@ impl DriverEvent {
const OFFSET_6_LAST_TXN_ID: Range<usize> = const OFFSET_6_LAST_TXN_ID: Range<usize> =
Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64); Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64);
/// Create a new driver event (checksum auto-computed) /// Create a new driver event (checksum auto-computed)
#[cfg(test)]
fn new( fn new(
txn_id: u128, txn_id: u128,
driver_event: DriverEventKind, driver_event: DriverEventKind,
@ -364,7 +421,6 @@ impl DriverEvent {
} }
} }
/// Encode the current driver event /// Encode the current driver event
#[cfg(test)]
fn encode_self(&self) -> [u8; 64] { fn encode_self(&self) -> [u8; 64] {
Self::encode( Self::encode(
self.txn_id, self.txn_id,
@ -576,7 +632,7 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
Ok(()) Ok(())
} else { } else {
// so, the on-disk file probably has some partial state. this is bad. throw an error // so, the on-disk file probably has some partial state. this is bad. throw an error
Err(StorageError::RawJournalRuntimeCriticalLwtHBFail.into()) Err(StorageError::RawJournalRuntimeHeartbeatFail.into())
} }
} }
} }
@ -590,6 +646,7 @@ impl<J: RawJournalAdapter> RawJournalWriter<J> {
self.txn_id += 1; self.txn_id += 1;
let ret = f(self, id as u128); let ret = f(self, id as u128);
if ret.is_ok() { if ret.is_ok() {
jtrace_event_offset!(id, self.log_file.cursor());
self.known_txn_id = id; self.known_txn_id = id;
self.known_txn_offset = self.log_file.cursor(); self.known_txn_offset = self.log_file.cursor();
} }
@ -640,6 +697,37 @@ pub struct RawJournalReader<J: RawJournalAdapter> {
last_txn_offset: u64, last_txn_offset: u64,
last_txn_checksum: u64, last_txn_checksum: u64,
stats: JournalStats, stats: JournalStats,
_settings: JournalSettings,
state: JournalState,
}
#[derive(Debug, PartialEq)]
enum JournalState {
AwaitingEvent,
AwaitingServerEvent,
AwaitingClose,
AwaitingReopen,
}
impl Default for JournalState {
fn default() -> Self {
Self::AwaitingEvent
}
}
#[derive(Debug)]
pub struct JournalSettings {}
impl Default for JournalSettings {
fn default() -> Self {
Self::new()
}
}
impl JournalSettings {
pub fn new() -> Self {
Self {}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -658,28 +746,36 @@ impl JournalStats {
} }
impl<J: RawJournalAdapter> RawJournalReader<J> { impl<J: RawJournalAdapter> RawJournalReader<J> {
pub fn scroll( fn scroll(
file: SdssFile<<J as RawJournalAdapter>::Spec>, file: SdssFile<<J as RawJournalAdapter>::Spec>,
gs: &J::GlobalState, gs: &J::GlobalState,
settings: JournalSettings,
) -> RuntimeResult<(JournalInitializer, SdssFile<J::Spec>)> { ) -> RuntimeResult<(JournalInitializer, SdssFile<J::Spec>)> {
let reader = TrackedReader::with_cursor( let reader = TrackedReader::with_cursor(
file, file,
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64, <<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64,
)?; )?;
jtrace_reader!(Initialized); jtrace_reader!(Initialized);
let mut me = Self::new(reader, 0, 0, 0, 0); let mut me = Self::new(reader, 0, 0, 0, 0, settings);
me._scroll(gs).map(|jinit| (jinit, me.tr.into_inner()))
}
fn _scroll(&mut self, gs: &J::GlobalState) -> RuntimeResult<JournalInitializer> {
loop { loop {
if me._apply_next_event_and_stop(gs)? { jtrace_reader!(LookingForEvent);
jtrace_reader!(Completed); match self._apply_next_event_and_stop(gs) {
let initializer = JournalInitializer::new( Ok(true) => {
me.tr.cursor(), jtrace_reader!(Completed);
me.tr.checksum(), let initializer = JournalInitializer::new(
me.txn_id, self.tr.cursor(),
// NB: the last txn offset is important because it indicates that the log is new self.tr.checksum(),
me.last_txn_offset, self.txn_id,
); // NB: the last txn offset is important because it indicates that the log is new
let file = me.tr.into_inner(); self.last_txn_offset,
return Ok((initializer, file)); );
return Ok(initializer);
}
Ok(false) => self.state = JournalState::AwaitingEvent,
Err(e) => return Err(e),
} }
} }
} }
@ -689,6 +785,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
last_txn_id: u64, last_txn_id: u64,
last_txn_offset: u64, last_txn_offset: u64,
last_txn_checksum: u64, last_txn_checksum: u64,
settings: JournalSettings,
) -> Self { ) -> Self {
Self { Self {
tr: reader, tr: reader,
@ -697,6 +794,8 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
last_txn_offset, last_txn_offset,
last_txn_checksum, last_txn_checksum,
stats: JournalStats::new(), stats: JournalStats::new(),
_settings: settings,
state: JournalState::AwaitingEvent,
} }
} }
fn __refresh_known_txn(me: &mut Self) { fn __refresh_known_txn(me: &mut Self) {
@ -707,6 +806,156 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
} }
} }
#[derive(Debug, PartialEq)]
pub enum JournalRepairMode {
Simple,
}
impl<J: RawJournalAdapter> RawJournalReader<J> {
fn repair(
file: SdssFile<<J as RawJournalAdapter>::Spec>,
gs: &J::GlobalState,
settings: JournalSettings,
repair_mode: JournalRepairMode,
) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile<J::Spec>)> {
let reader = TrackedReader::with_cursor(
file,
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64,
)?;
jtrace_reader!(Initialized);
let mut me = Self::new(reader, 0, 0, 0, 0, settings);
match me._scroll(gs) {
Ok(init) => return Ok((RepairResult::NoErrors, init, me.tr.into_inner())),
Err(e) => me.start_repair(e, repair_mode),
}
}
fn start_repair(
self,
e: Error,
repair_mode: JournalRepairMode,
) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile<J::Spec>)> {
let lost = if self.last_txn_offset == 0 {
// we haven't scanned any events and already hit an error
// so essentially, we lost the entire log
self.tr.cached_size() - <J::Spec as FileSpecV1>::SIZE as u64
} else {
self.tr.cached_size() - self.last_txn_offset
};
let repair_result = RepairResult::UnspecifiedLoss(lost);
match repair_mode {
JournalRepairMode::Simple => {}
}
// now it's our task to determine exactly what happened
match e.kind() {
ErrorKind::IoError(io) => match io.kind() {
IoErrorKind::UnexpectedEof => {
/*
this is the only kind of error that we can actually repair since it indicates that a part of the
file is "missing." we can't deal with things like permission errors. that's supposed to be handled
by the admin by looking through the error logs
*/
}
_ => return Err(e),
},
ErrorKind::Storage(e) => match e {
// unreachable errors (no execution path here)
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
| StorageError::RawJournalRuntimeDirty
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier
| StorageError::V1JournalDecodeLogEntryCorrupted // v1 errors can't be raised here
| StorageError::V1JournalDecodeCorrupted
| StorageError::V1DataBatchDecodeCorruptedBatch
| StorageError::V1DataBatchDecodeCorruptedEntry
| StorageError::V1DataBatchDecodeCorruptedBatchFile
| StorageError::V1SysDBDecodeCorrupted
| StorageError::V1DataBatchRuntimeCloseError => unreachable!(),
// possible errors
StorageError::InternalDecodeStructureCorrupted
| StorageError::InternalDecodeStructureCorruptedPayload
| StorageError::InternalDecodeStructureIllegalData
| StorageError::RawJournalDecodeEventCorruptedMetadata
| StorageError::RawJournalDecodeEventCorruptedPayload
| StorageError::RawJournalDecodeBatchContentsMismatch
| StorageError::RawJournalDecodeBatchIntegrityFailure
| StorageError::RawJournalDecodeInvalidEvent
| StorageError::RawJournalDecodeCorruptionInBatchMetadata => {}
},
ErrorKind::Txn(txerr) => match txerr {
// unreachable errors
TransactionError::V1DecodeCorruptedPayloadMoreBytes // no v1 errors
| TransactionError::V1DecodedUnexpectedEof
| TransactionError::V1DecodeUnknownTxnOp => unreachable!(),
// possible errors
TransactionError::OnRestoreDataConflictAlreadyExists |
TransactionError::OnRestoreDataMissing |
TransactionError::OnRestoreDataConflictMismatch => {},
},
// these errors do not have an execution pathway
ErrorKind::Other(_) => unreachable!(),
ErrorKind::Config(_) => unreachable!(),
}
/*
revert log. record previous signatures.
*/
l!(let known_event_id, known_event_offset, known_event_checksum = self.last_txn_id, self.last_txn_offset, self.last_txn_checksum);
let mut last_logged_checksum = self.tr.checksum();
let mut base_log = self.tr.into_inner();
if known_event_offset == 0 {
// no event, so just trim upto header
base_log.truncate(<J::Spec as FileSpecV1>::SIZE as _)?;
} else {
base_log.truncate(known_event_offset)?;
}
/*
see what needs to be done next
*/
match self.state {
JournalState::AwaitingEvent
| JournalState::AwaitingServerEvent
| JournalState::AwaitingClose => {
/*
no matter what the last event was (and definitely not a close since if we are expecting a close the log was not already closed),
the log is in a dirty state that can only be resolved by closing it
*/
let drv_close = DriverEvent::new(
if known_event_offset == 0 {
// no event occurred
0
} else {
// something happened prior to this, so we'll use an incremented ID for this event
known_event_id + 1
} as u128,
DriverEventKind::Closed,
known_event_checksum,
known_event_offset,
known_event_id,
);
let drv_close_event = drv_close.encode_self();
last_logged_checksum.update(&drv_close_event);
base_log.fsynced_write(&drv_close_event)?;
}
JournalState::AwaitingReopen => {
// extra bytes indicating low to severe corruption; last event is a close, so with the revert the log is now clean
}
}
let jinit_cursor = known_event_offset + DriverEvent::FULL_EVENT_SIZE as u64;
let jinit_last_txn_offset = jinit_cursor; // same as cursor
let jinit_event_id = known_event_id + 2; // since we already used +1
let jinit_checksum = last_logged_checksum;
Ok((
repair_result,
JournalInitializer::new(
jinit_cursor,
jinit_checksum,
jinit_event_id,
jinit_last_txn_offset,
),
base_log,
))
}
}
impl<J: RawJournalAdapter> RawJournalReader<J> { impl<J: RawJournalAdapter> RawJournalReader<J> {
fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> { fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
let txn_id = u128::from_le_bytes(self.tr.read_block()?); let txn_id = u128::from_le_bytes(self.tr.read_block()?);
@ -716,12 +965,13 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
expected: self.txn_id, expected: self.txn_id,
current: txn_id as u64 current: txn_id as u64
}); });
return Err(StorageError::RawJournalEventCorruptedMetadata.into()); return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into());
} }
jtrace_reader!(AttemptingEvent(txn_id as u64)); jtrace_reader!(AttemptingEvent(txn_id as u64));
// check for a server event // check for a server event
// is this a server event? // is this a server event?
if meta & SERVER_EV_MASK != 0 { if meta & SERVER_EV_MASK != 0 {
self.state = JournalState::AwaitingServerEvent;
jtrace_reader!(DetectedServerEvent); jtrace_reader!(DetectedServerEvent);
let meta = meta & !SERVER_EV_MASK; let meta = meta & !SERVER_EV_MASK;
match J::parse_event_meta(meta) { match J::parse_event_meta(meta) {
@ -740,9 +990,10 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
Err(e) => return Err(e), Err(e) => return Err(e),
} }
} }
None => return Err(StorageError::RawJournalEventCorruptedMetadata.into()), None => return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()),
} }
} }
self.state = JournalState::AwaitingClose;
return self.handle_close(txn_id, meta); return self.handle_close(txn_id, meta);
} }
fn handle_close( fn handle_close(
@ -772,9 +1023,9 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
.. ..
}) => { }) => {
jtrace_reader!(ErrExpectedCloseGotReopen); jtrace_reader!(ErrExpectedCloseGotReopen);
return Err(StorageError::RawJournalInvalidEvent.into()); return Err(StorageError::RawJournalDecodeInvalidEvent.into());
} }
None => return Err(StorageError::RawJournalEventCorrupted.into()), None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()),
}; };
jtrace_reader!(DriverEventExpectedCloseGotClose); jtrace_reader!(DriverEventExpectedCloseGotClose);
// a driver closed event; we've checked integrity, but we must check the field values // a driver closed event; we've checked integrity, but we must check the field values
@ -787,27 +1038,29 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
jtrace_reader!(DriverEventInvalidMetadata); jtrace_reader!(DriverEventInvalidMetadata);
// either the block is corrupted or the data we read is corrupted; either way, // either the block is corrupted or the data we read is corrupted; either way,
// we're going to refuse to read this // we're going to refuse to read this
return Err(StorageError::RawJournalCorrupted.into()); return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into());
} }
self.stats.driver_events += 1; self.stats.driver_events += 1;
// update // update
Self::__refresh_known_txn(self); Self::__refresh_known_txn(self);
// full metadata validated; this is a valid close event but is it actually a close // full metadata validated; this is a valid close event, but is it actually a close?
if self.tr.is_eof() { if self.tr.is_eof() {
jtrace_reader!(ClosedAndReachedEof); jtrace_reader!(ClosedAndReachedEof);
// yes, we're done // yes, we're done
return Ok(true); return Ok(true);
} }
self.state = JournalState::AwaitingReopen;
jtrace_reader!(DriverEventExpectingReopenBlock);
return self.handle_reopen(); return self.handle_reopen();
} }
fn handle_reopen(&mut self) -> RuntimeResult<bool> { fn handle_reopen(&mut self) -> RuntimeResult<bool> {
jtrace_reader!(AttemptingEvent(self.txn_id as u64)); jtrace_reader!(AttemptingEvent(self.txn_id as u64));
jtrace_reader!(DriverEventExpectingReopenBlock);
// now we must look for a reopen event // now we must look for a reopen event
let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?; let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?;
let reopen_event = match DriverEvent::decode(event_block) { let reopen_event = match DriverEvent::decode(event_block) {
Some(ev) if ev.event == DriverEventKind::Reopened => ev, Some(ev) if ev.event == DriverEventKind::Reopened => ev,
None | Some(_) => return Err(StorageError::RawJournalEventCorrupted.into()), Some(_) => return Err(StorageError::RawJournalDecodeInvalidEvent.into()),
None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()),
}; };
jtrace_reader!(DriverEventExpectingReopenGotReopen); jtrace_reader!(DriverEventExpectingReopenGotReopen);
let valid_meta = okay! { let valid_meta = okay! {
@ -824,7 +1077,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
Ok(false) Ok(false)
} else { } else {
jtrace_reader!(ErrInvalidReopenMetadata); jtrace_reader!(ErrInvalidReopenMetadata);
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into())
} }
} }
} }

@ -1,5 +1,5 @@
/* /*
* Created on Tue Jan 30 2024 * Created on Tue Mar 26 2024
* *
* This file is a part of Skytable * This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -26,173 +26,15 @@
use { use {
super::{ super::{
create_journal, open_journal, CommitPreference, DriverEvent, DriverEventKind, super::{
JournalInitializer, RawJournalAdapter, RawJournalAdapterEvent, RawJournalWriter, create_journal, debug_get_trace, open_journal, DriverEventKind,
}, JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent, RawJournalWriter,
crate::engine::{
error::StorageError,
fractal::error::ErrorContext,
storage::{
common::sdss::sdss_r1::rw::TrackedReader,
v2::raw::{
journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent},
spec::SystemDatabaseV1,
},
}, },
RuntimeResult, SimpleDB, SimpleDBJournal,
}, },
std::cell::RefCell, crate::engine::fractal::error::ErrorContext,
}; };
#[test]
fn encode_decode_meta() {
let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0);
let encoded1 = dv1.encode_self();
let decoded1 = DriverEvent::decode(encoded1).unwrap();
assert_eq!(dv1, decoded1);
}
/*
impls for journal tests
*/
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleDB {
data: RefCell<Vec<String>>,
}
impl SimpleDB {
fn new() -> Self {
Self {
data: RefCell::default(),
}
}
fn data(&self) -> std::cell::Ref<'_, Vec<String>> {
self.data.borrow()
}
fn clear(&mut self, log: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()> {
log.commit_event(DbEventClear)?;
self.data.get_mut().clear();
Ok(())
}
fn pop(&mut self, log: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()> {
self.data.get_mut().pop().unwrap();
log.commit_event(DbEventPop)?;
Ok(())
}
fn push(
&mut self,
log: &mut RawJournalWriter<SimpleDBJournal>,
new: impl ToString,
) -> RuntimeResult<()> {
let new = new.to_string();
log.commit_event(DbEventPush(&new))?;
self.data.get_mut().push(new);
Ok(())
}
}
/*
event impls
*/
pub struct SimpleDBJournal;
struct DbEventPush<'a>(&'a str);
struct DbEventPop;
struct DbEventClear;
trait SimpleDBEvent: Sized {
const OPC: u8;
fn write_buffered(self, _: &mut Vec<u8>);
}
macro_rules! impl_db_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),*) => {
$(impl SimpleDBEvent for $ty {
const OPC: u8 = $code;
fn write_buffered(self, buf: &mut Vec<u8>) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(_do_it(self, buf, $expr))? }
})*
}
}
impl_db_event!(
DbEventPush<'_> as 0 => |me, buf| {
buf.extend(&(me.0.len() as u64).to_le_bytes());
buf.extend(me.0.as_bytes());
},
DbEventPop as 1,
DbEventClear as 2
);
impl<T: SimpleDBEvent> RawJournalAdapterEvent<SimpleDBJournal> for T {
fn md(&self) -> u64 {
T::OPC as _
}
fn write_buffered(self, buf: &mut Vec<u8>, _: ()) {
T::write_buffered(self, buf)
}
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum EventMeta {
NewKey,
Pop,
Clear,
}
impl RawJournalAdapter for SimpleDBJournal {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered;
type Spec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type EventMeta = EventMeta;
type CommitContext = ();
type Context<'a> = () where Self: 'a;
fn initialize(_: &JournalInitializer) -> Self {
Self
}
fn enter_context<'a>(_: &'a mut RawJournalWriter<Self>) -> Self::Context<'a> {
()
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
Some(match meta {
0 => EventMeta::NewKey,
1 => EventMeta::Pop,
2 => EventMeta::Clear,
_ => return None,
})
}
fn commit_buffered<'a, E: RawJournalAdapterEvent<Self>>(
&mut self,
buf: &mut Vec<u8>,
event: E,
ctx: (),
) {
event.write_buffered(buf, ctx)
}
fn decode_apply<'a>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<Self::Spec>,
) -> RuntimeResult<()> {
match meta {
EventMeta::NewKey => {
let key_size = u64::from_le_bytes(file.read_block()?);
let mut keybuf = vec![0u8; key_size as usize];
file.tracked_read(&mut keybuf)?;
match String::from_utf8(keybuf) {
Ok(k) => gs.data.borrow_mut().push(k),
Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()),
}
}
EventMeta::Clear => gs.data.borrow_mut().clear(),
EventMeta::Pop => {
let _ = gs.data.borrow_mut().pop().unwrap();
}
}
Ok(())
}
}
/*
journal tests
*/
#[test] #[test]
fn journal_open_close() { fn journal_open_close() {
const JOURNAL_NAME: &str = "journal_open_close"; const JOURNAL_NAME: &str = "journal_open_close";
@ -200,12 +42,12 @@ fn journal_open_close() {
// new boot // new boot
let mut j = create_journal::<SimpleDBJournal>(JOURNAL_NAME).unwrap(); let mut j = create_journal::<SimpleDBJournal>(JOURNAL_NAME).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![JournalWriterTraceEvent::Initialized] intovec![JournalWriterTraceEvent::Initialized]
); );
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed, event: DriverEventKind::Closed,
@ -219,12 +61,18 @@ fn journal_open_close() {
} }
{ {
// second boot // second boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &SimpleDB::new()).unwrap(); let mut j = open_journal::<SimpleDBJournal>(
JOURNAL_NAME,
&SimpleDB::new(),
JournalSettings::default(),
)
.unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
// init reader and read close event // init reader and read close event
JournalReaderTraceEvent::Initialized, JournalReaderTraceEvent::Initialized,
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::AttemptingEvent(0),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
@ -244,7 +92,7 @@ fn journal_open_close() {
); );
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed, event: DriverEventKind::Closed,
@ -258,21 +106,28 @@ fn journal_open_close() {
} }
{ {
// third boot // third boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &SimpleDB::new()).unwrap(); let mut j = open_journal::<SimpleDBJournal>(
JOURNAL_NAME,
&SimpleDB::new(),
JournalSettings::default(),
)
.unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
// init reader and read reopen event // init reader and read reopen event
JournalReaderTraceEvent::Initialized, JournalReaderTraceEvent::Initialized,
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::AttemptingEvent(0),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
JournalReaderTraceEvent::AttemptingEvent(1),
JournalReaderTraceEvent::DriverEventExpectingReopenBlock, JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
JournalReaderTraceEvent::AttemptingEvent(1),
JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen, JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen,
JournalReaderTraceEvent::ReopenSuccess, JournalReaderTraceEvent::ReopenSuccess,
// now read close event // now read close event
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(2), JournalReaderTraceEvent::AttemptingEvent(2),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
@ -292,7 +147,7 @@ fn journal_open_close() {
); );
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed, event: DriverEventKind::Closed,
@ -316,7 +171,7 @@ fn journal_with_server_single_event() {
db.push(&mut j, "hello world").unwrap(); db.push(&mut j, "hello world").unwrap();
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
JournalWriterTraceEvent::Initialized, JournalWriterTraceEvent::Initialized,
JournalWriterTraceEvent::CommitAttemptForEvent(0), JournalWriterTraceEvent::CommitAttemptForEvent(0),
@ -336,21 +191,23 @@ fn journal_with_server_single_event() {
{ {
let db = SimpleDB::new(); let db = SimpleDB::new();
// second boot // second boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db) let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db, JournalSettings::default())
.set_dmsg_fn(|| format!("{:?}", super::obtain_trace())) .set_dmsg_fn(|| format!("{:?}", debug_get_trace()))
.unwrap(); .unwrap();
assert_eq!(db.data().len(), 1); assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "hello world"); assert_eq!(db.data()[0], "hello world");
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
// init reader and read server event // init reader and read server event
JournalReaderTraceEvent::Initialized, JournalReaderTraceEvent::Initialized,
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::AttemptingEvent(0),
JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::DetectedServerEvent,
JournalReaderTraceEvent::ServerEventMetadataParsed, JournalReaderTraceEvent::ServerEventMetadataParsed,
JournalReaderTraceEvent::ServerEventAppliedSuccess, JournalReaderTraceEvent::ServerEventAppliedSuccess,
// now read close event // now read close event
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(1), JournalReaderTraceEvent::AttemptingEvent(1),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
@ -370,7 +227,7 @@ fn journal_with_server_single_event() {
); );
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed, event: DriverEventKind::Closed,
@ -385,29 +242,33 @@ fn journal_with_server_single_event() {
{ {
// third boot // third boot
let db = SimpleDB::new(); let db = SimpleDB::new();
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db).unwrap(); let mut j =
open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db, JournalSettings::default()).unwrap();
assert_eq!(db.data().len(), 1); assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "hello world"); assert_eq!(db.data()[0], "hello world");
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
// init reader and read server event // init reader and read server event
JournalReaderTraceEvent::Initialized, JournalReaderTraceEvent::Initialized,
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(0), JournalReaderTraceEvent::AttemptingEvent(0),
JournalReaderTraceEvent::DetectedServerEvent, JournalReaderTraceEvent::DetectedServerEvent,
JournalReaderTraceEvent::ServerEventMetadataParsed, JournalReaderTraceEvent::ServerEventMetadataParsed,
JournalReaderTraceEvent::ServerEventAppliedSuccess, JournalReaderTraceEvent::ServerEventAppliedSuccess,
// now read close event // now read close event
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(1), JournalReaderTraceEvent::AttemptingEvent(1),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
JournalReaderTraceEvent::DriverEventExpectedCloseGotClose, JournalReaderTraceEvent::DriverEventExpectedCloseGotClose,
// now read reopen event // now read reopen event
JournalReaderTraceEvent::AttemptingEvent(2),
JournalReaderTraceEvent::DriverEventExpectingReopenBlock, JournalReaderTraceEvent::DriverEventExpectingReopenBlock,
JournalReaderTraceEvent::AttemptingEvent(2),
JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen, JournalReaderTraceEvent::DriverEventExpectingReopenGotReopen,
JournalReaderTraceEvent::ReopenSuccess, JournalReaderTraceEvent::ReopenSuccess,
// now read close event // now read close event
JournalReaderTraceEvent::LookingForEvent,
JournalReaderTraceEvent::AttemptingEvent(3), JournalReaderTraceEvent::AttemptingEvent(3),
JournalReaderTraceEvent::DriverEventExpectingClose, JournalReaderTraceEvent::DriverEventExpectingClose,
JournalReaderTraceEvent::DriverEventCompletedBlockRead, JournalReaderTraceEvent::DriverEventCompletedBlockRead,
@ -427,7 +288,7 @@ fn journal_with_server_single_event() {
); );
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), debug_get_trace(),
intovec![ intovec![
JournalWriterTraceEvent::DriverEventAttemptCommit { JournalWriterTraceEvent::DriverEventAttemptCommit {
event: DriverEventKind::Closed, event: DriverEventKind::Closed,
@ -453,7 +314,8 @@ fn multi_boot() {
} }
{ {
let mut db = SimpleDB::new(); let mut db = SimpleDB::new();
let mut j = open_journal::<SimpleDBJournal>("multiboot", &db).unwrap(); let mut j =
open_journal::<SimpleDBJournal>("multiboot", &db, JournalSettings::default()).unwrap();
assert_eq!(db.data().as_ref(), vec!["key_a".to_string()]); assert_eq!(db.data().as_ref(), vec!["key_a".to_string()]);
db.clear(&mut j).unwrap(); db.clear(&mut j).unwrap();
db.push(&mut j, "myfinkey").unwrap(); db.push(&mut j, "myfinkey").unwrap();
@ -461,7 +323,8 @@ fn multi_boot() {
} }
{ {
let db = SimpleDB::new(); let db = SimpleDB::new();
let mut j = open_journal::<SimpleDBJournal>("multiboot", &db).unwrap(); let mut j =
open_journal::<SimpleDBJournal>("multiboot", &db, JournalSettings::default()).unwrap();
assert_eq!(db.data().as_ref(), vec!["myfinkey".to_string()]); assert_eq!(db.data().as_ref(), vec!["myfinkey".to_string()]);
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
} }

@ -0,0 +1,218 @@
/*
* Created on Tue Jan 30 2024
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2024, Sayan Nandan <nandansayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod journal_ops;
mod recovery;
use {
super::{
CommitPreference, DriverEvent, DriverEventKind, JournalInitializer, RawJournalAdapter,
RawJournalAdapterEvent, RawJournalWriter,
},
crate::engine::{
error::StorageError,
storage::{
common::{checksum::SCrc64, sdss::sdss_r1::rw::TrackedReader},
v2::raw::spec::SystemDatabaseV1,
},
RuntimeResult,
},
std::cell::RefCell,
};
const SANE_MEM_LIMIT_BYTES: usize = 2048;
/*
impls for journal tests
*/
#[derive(Debug, Clone, PartialEq)]
pub struct SimpleDB {
data: RefCell<Vec<String>>,
}
impl SimpleDB {
fn new() -> Self {
Self {
data: RefCell::default(),
}
}
fn data(&self) -> std::cell::Ref<'_, Vec<String>> {
self.data.borrow()
}
fn clear(&mut self, log: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()> {
log.commit_event(DbEventClear)?;
self.data.get_mut().clear();
Ok(())
}
fn pop(&mut self, log: &mut RawJournalWriter<SimpleDBJournal>) -> RuntimeResult<()> {
self.data.get_mut().pop().unwrap();
log.commit_event(DbEventPop)?;
Ok(())
}
fn push(
&mut self,
log: &mut RawJournalWriter<SimpleDBJournal>,
new: impl ToString,
) -> RuntimeResult<()> {
let new = new.to_string();
log.commit_event(DbEventPush(&new))?;
self.data.get_mut().push(new);
Ok(())
}
}
/*
event impls
*/
#[derive(Debug)]
pub struct SimpleDBJournal;
struct DbEventPush<'a>(&'a str);
struct DbEventPop;
struct DbEventClear;
trait SimpleDBEvent: Sized {
const OPC: u8;
fn write_buffered(self, _: &mut Vec<u8>);
}
macro_rules! impl_db_event {
($($ty:ty as $code:expr $(=> $expr:expr)?),*) => {
$(impl SimpleDBEvent for $ty {
const OPC: u8 = $code;
fn write_buffered(self, buf: &mut Vec<u8>) { let _ = buf; fn _do_it(s: $ty, b: &mut Vec<u8>, f: impl Fn($ty, &mut Vec<u8>)) { f(s, b) } $(_do_it(self, buf, $expr))? }
})*
}
}
impl_db_event!(
DbEventPush<'_> as 0 => |me, buf| {
let length_bytes = (me.0.len() as u64).to_le_bytes();
let me_bytes = me.0.as_bytes();
let mut checksum = SCrc64::new();
checksum.update(&length_bytes);
checksum.update(&me_bytes);
buf.extend(&(checksum.finish().to_le_bytes())); // checksum
buf.extend(&length_bytes); // length
buf.extend(me.0.as_bytes()); // payload
},
DbEventPop as 1,
DbEventClear as 2
);
impl<T: SimpleDBEvent> RawJournalAdapterEvent<SimpleDBJournal> for T {
fn md(&self) -> u64 {
T::OPC as _
}
fn write_buffered(self, buf: &mut Vec<u8>, _: ()) {
T::write_buffered(self, buf)
}
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum EventMeta {
NewKey,
Pop,
Clear,
}
impl RawJournalAdapter for SimpleDBJournal {
const COMMIT_PREFERENCE: CommitPreference = CommitPreference::Buffered;
type Spec = SystemDatabaseV1;
type GlobalState = SimpleDB;
type EventMeta = EventMeta;
type CommitContext = ();
type Context<'a> = () where Self: 'a;
fn initialize(_: &JournalInitializer) -> Self {
Self
}
fn enter_context<'a>(_: &'a mut RawJournalWriter<Self>) -> Self::Context<'a> {
()
}
fn parse_event_meta(meta: u64) -> Option<Self::EventMeta> {
Some(match meta {
0 => EventMeta::NewKey,
1 => EventMeta::Pop,
2 => EventMeta::Clear,
_ => return None,
})
}
fn commit_buffered<'a, E: RawJournalAdapterEvent<Self>>(
&mut self,
buf: &mut Vec<u8>,
event: E,
ctx: (),
) {
event.write_buffered(buf, ctx)
}
fn decode_apply<'a>(
gs: &Self::GlobalState,
meta: Self::EventMeta,
file: &mut TrackedReader<Self::Spec>,
) -> RuntimeResult<()> {
match meta {
EventMeta::NewKey => {
let checksum = u64::from_le_bytes(file.read_block()?);
let length_u64 = u64::from_le_bytes(file.read_block()?);
let length = length_u64 as usize;
let mut payload = Vec::<u8>::new();
if length > SANE_MEM_LIMIT_BYTES
|| payload.try_reserve_exact(length as usize).is_err()
{
return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into());
}
unsafe {
payload.as_mut_ptr().write_bytes(0, length);
payload.set_len(length);
}
file.tracked_read(&mut payload)?;
let mut this_checksum = SCrc64::new();
this_checksum.update(&length_u64.to_le_bytes());
this_checksum.update(&payload);
match String::from_utf8(payload) {
Ok(k) if this_checksum.finish() == checksum => gs.data.borrow_mut().push(k),
Err(_) | Ok(_) => {
return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into())
}
}
}
EventMeta::Clear => gs.data.borrow_mut().clear(),
EventMeta::Pop => {
let _ = gs.data.borrow_mut().pop().unwrap();
}
}
Ok(())
}
}
/*
basic tests
*/
#[test]
fn encode_decode_meta() {
let dv1 = DriverEvent::new(u128::MAX - 1, DriverEventKind::Reopened, 0, 0, 0);
let encoded1 = dv1.encode_self();
let decoded1 = DriverEvent::decode(encoded1).unwrap();
assert_eq!(dv1, decoded1);
}

@ -30,8 +30,9 @@
use { use {
super::{ super::{
raw::RawJournalAdapterEvent, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, raw::{JournalSettings, RawJournalAdapterEvent},
EventLogAdapter, EventLogDriver, EventLogSpec, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, EventLogAdapter, EventLogDriver,
EventLogSpec,
}, },
crate::{ crate::{
engine::{ engine::{
@ -112,13 +113,13 @@ impl EventLogSpec for TestDBAdapter {
const DECODE_DISPATCH: Self::DecodeDispatch = [ const DECODE_DISPATCH: Self::DecodeDispatch = [
|db, payload| { |db, payload| {
if payload.len() < sizeof!(u64) { if payload.len() < sizeof!(u64) {
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into())
} else { } else {
let length = let length =
u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) }); u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) });
let payload = &payload[sizeof!(u64)..]; let payload = &payload[sizeof!(u64)..];
if payload.len() as u64 != length { if payload.len() as u64 != length {
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeEventCorruptedPayload.into())
} else { } else {
let string = String::from_utf8(payload.to_owned()).unwrap(); let string = String::from_utf8(payload.to_owned()).unwrap();
db._mut().push(string); db._mut().push(string);
@ -172,7 +173,7 @@ fn open_log() -> (
super::raw::RawJournalWriter<EventLogAdapter<TestDBAdapter>>, super::raw::RawJournalWriter<EventLogAdapter<TestDBAdapter>>,
) { ) {
let db = TestDB::default(); let db = TestDB::default();
let log = open_journal("jrnl", &db).unwrap(); let log = open_journal("jrnl", &db, JournalSettings::default()).unwrap();
(db, log) (db, log)
} }
@ -381,7 +382,7 @@ fn batch_simple() {
} }
{ {
let db = BatchDB::new(); let db = BatchDB::new();
let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap(); let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap();
db.push(&mut batch_drv, "key3").unwrap(); db.push(&mut batch_drv, "key3").unwrap();
db.push(&mut batch_drv, "key4").unwrap(); db.push(&mut batch_drv, "key4").unwrap();
assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]); assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]);
@ -389,7 +390,7 @@ fn batch_simple() {
} }
{ {
let db = BatchDB::new(); let db = BatchDB::new();
let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap(); let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap();
db.push(&mut batch_drv, "key5").unwrap(); db.push(&mut batch_drv, "key5").unwrap();
db.push(&mut batch_drv, "key6").unwrap(); db.push(&mut batch_drv, "key6").unwrap();
assert_eq!( assert_eq!(
@ -400,7 +401,9 @@ fn batch_simple() {
} }
{ {
let db = BatchDB::new(); let db = BatchDB::new();
let mut batch_drv = BatchAdapter::<BatchDBAdapter>::open("mybatch", &db).unwrap(); let mut batch_drv =
BatchAdapter::<BatchDBAdapter>::open("mybatch", &db, JournalSettings::default())
.unwrap();
assert_eq!( assert_eq!(
db._ref().data, db._ref().data,
["key1", "key2", "key3", "key4", "key5", "key6"] ["key1", "key2", "key3", "key4", "key5", "key6"]

@ -72,27 +72,52 @@ fn main() {
ConfigReturn::HelpMessage(msg) => { ConfigReturn::HelpMessage(msg) => {
exit!(eprintln!("{msg}"), 0x00) exit!(eprintln!("{msg}"), 0x00)
} }
ConfigReturn::Repair => return self::repair(),
}, },
Err(e) => exit_fatal!(error!("{e}")), Err(e) => exit_fatal!(error!("{e}")),
}; };
self::entrypoint(config) self::entrypoint(config)
} }
fn init() -> engine::RuntimeResult<(util::os::FileLock, tokio::runtime::Runtime)> {
let f_rt_start = || {
engine::set_context_init("locking PID file");
let pid_file = util::os::FileLock::new(SKY_PID_FILE)?;
engine::set_context_init("initializing runtime");
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("server")
.enable_all()
.build()?;
Ok((pid_file, runtime))
};
f_rt_start()
}
fn exit(
global: Option<engine::Global>,
pid_file: Option<util::os::FileLock>,
result: engine::RuntimeResult<()>,
) {
if let Some(g) = global {
info!("cleaning up data");
engine::finish(g);
}
if let Some(_) = pid_file {
if let Err(e) = std::fs::remove_file(SKY_PID_FILE) {
error!("failed to remove PID file: {e}");
}
}
match result {
Ok(()) => println!("goodbye"),
Err(e) => exit_fatal!(error!("{e}")),
}
}
fn entrypoint(config: engine::config::Configuration) { fn entrypoint(config: engine::config::Configuration) {
println!("{TEXT}\nSkytable v{VERSION} | {URL}\n"); println!("{TEXT}\nSkytable v{VERSION} | {URL}\n");
let run = || { let run = || {
let f_rt_start = || { let (pid_file, runtime) = match init() {
engine::set_context_init("locking PID file"); Ok(pr) => pr,
let pid_file = util::os::FileLock::new(SKY_PID_FILE)?;
engine::set_context_init("initializing runtime");
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("server")
.enable_all()
.build()?;
Ok((pid_file, runtime))
};
let (pid_file, runtime) = match f_rt_start() {
Ok((pf, rt)) => (pf, rt),
Err(e) => return (None, None, Err(e)), Err(e) => return (None, None, Err(e)),
}; };
let f_glob_init = runtime.block_on(async move { let f_glob_init = runtime.block_on(async move {
@ -113,17 +138,22 @@ fn entrypoint(config: engine::config::Configuration) {
(Some(pid_file), Some(g), result_start) (Some(pid_file), Some(g), result_start)
}; };
let (pid_file, global, result) = run(); let (pid_file, global, result) = run();
if let Some(g) = global { self::exit(global, pid_file, result);
info!("cleaning up data"); }
engine::finish(g);
} fn repair() {
if let Some(_) = pid_file { let (pid_file, rt) = match init() {
if let Err(e) = std::fs::remove_file(SKY_PID_FILE) { Ok(init) => init,
error!("failed to remove PID file: {e}"); Err(e) => exit_fatal!(error!("failed to start repair task: {e}")),
} };
} let result = rt.block_on(async move {
match result { engine::set_context_init("binding system signals");
Ok(()) => println!("goodbye"), let signal = util::os::TerminationSignal::init()?;
Err(e) => exit_fatal!(error!("{e}")), let result = tokio::task::spawn_blocking(|| engine::repair())
} .await
.unwrap();
drop(signal);
result
});
self::exit(None, Some(pid_file), result)
} }

@ -425,10 +425,11 @@ macro_rules! impl_endian {
impl_endian!(u8, i8, u16, i16, u32, i32, u64, i64, usize, isize); impl_endian!(u8, i8, u16, i16, u32, i32, u64, i64, usize, isize);
pub fn time_now_string() -> String {
chrono::Local::now().format("%Y%m%d_%H%M%S").to_string()
}
pub fn time_now_with_postfix(post_fix: &str) -> String { pub fn time_now_with_postfix(post_fix: &str) -> String {
let now = chrono::Local::now();
// Format the current date and time as YYYYMMDD_HHMMSS
let formatted_date_time = now.format("%Y%m%d_%H%M%S").to_string();
// Concatenate the formatted date and time with the postfix // Concatenate the formatted date and time with the postfix
format!("{}-{}", formatted_date_time, post_fix) format!("{}-{}", time_now_string(), post_fix)
} }

@ -24,11 +24,10 @@
* *
*/ */
use quote::quote;
use { use {
crate::util::{self, AttributeKind}, crate::util::{self, AttributeKind},
proc_macro::TokenStream, proc_macro::TokenStream,
quote::quote,
std::collections::HashMap, std::collections::HashMap,
syn::{parse_macro_input, AttributeArgs, ItemFn}, syn::{parse_macro_input, AttributeArgs, ItemFn},
}; };

@ -24,8 +24,10 @@
* *
*/ */
use proc_macro2::Ident; use {
use syn::{Lit, Meta, MetaNameValue, NestedMeta, Path}; proc_macro2::Ident,
syn::{Lit, Meta, MetaNameValue, NestedMeta, Path},
};
pub enum AttributeKind { pub enum AttributeKind {
Lit(Lit), Lit(Lit),

Loading…
Cancel
Save