storage: Improve error traceability

next
Sayan Nandan 6 months ago
parent 093688e102
commit 37fc01d81f
No known key found for this signature in database
GPG Key ID: 0EBD769024B24F0A

84
Cargo.lock generated

@ -30,9 +30,9 @@ dependencies = [
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "1.1.2" version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916"
dependencies = [ dependencies = [
"memchr", "memchr",
] ]
@ -102,13 +102,13 @@ dependencies = [
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.77" version = "0.1.78"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -119,9 +119,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.69" version = "0.3.71"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d"
dependencies = [ dependencies = [
"addr2line", "addr2line",
"cc", "cc",
@ -165,9 +165,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "2.4.2" version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
@ -202,9 +202,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.5.0" version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]] [[package]]
name = "bzip2" name = "bzip2"
@ -275,9 +275,9 @@ dependencies = [
[[package]] [[package]]
name = "clipboard-win" name = "clipboard-win"
version = "5.2.0" version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12f9a0700e0127ba15d1d52dd742097f821cd9c65939303a44d970465040a297" checksum = "d517d4b86184dbb111d3556a10f1c8a04da7428d2987bf1081602bf11c3aa9ee"
dependencies = [ dependencies = [
"error-code", "error-code",
] ]
@ -373,7 +373,7 @@ version = "0.27.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"crossterm_winapi", "crossterm_winapi",
"libc", "libc",
"mio", "mio",
@ -650,9 +650,9 @@ dependencies = [
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "2.2.5" version = "2.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26"
dependencies = [ dependencies = [
"equivalent", "equivalent",
"hashbrown", "hashbrown",
@ -812,7 +812,7 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"cfg-if", "cfg-if",
"cfg_aliases", "cfg_aliases",
"libc", "libc",
@ -864,7 +864,7 @@ version = "0.10.64"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"cfg-if", "cfg-if",
"foreign-types", "foreign-types",
"libc", "libc",
@ -881,7 +881,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -1085,9 +1085,9 @@ dependencies = [
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.10.3" version = "1.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c"
dependencies = [ dependencies = [
"aho-corasick", "aho-corasick",
"memchr", "memchr",
@ -1120,11 +1120,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]] [[package]]
name = "rustix" name = "rustix"
version = "0.38.31" version = "0.38.32"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"errno", "errno",
"libc", "libc",
"linux-raw-sys", "linux-raw-sys",
@ -1137,7 +1137,7 @@ version = "14.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.5.0",
"cfg-if", "cfg-if",
"clipboard-win", "clipboard-win",
"fd-lock", "fd-lock",
@ -1223,14 +1223,14 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
name = "serde_yaml" name = "serde_yaml"
version = "0.9.32" version = "0.9.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"itoa", "itoa",
@ -1389,9 +1389,9 @@ dependencies = [
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.13.1" version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]] [[package]]
name = "socket2" name = "socket2"
@ -1422,9 +1422,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.52" version = "2.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1489,7 +1489,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -1540,9 +1540,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]] [[package]]
name = "unsafe-libyaml" name = "unsafe-libyaml"
version = "0.2.10" version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]] [[package]]
name = "utf8parse" name = "utf8parse"
@ -1552,9 +1552,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"rand", "rand",
@ -1563,13 +1563,13 @@ dependencies = [
[[package]] [[package]]
name = "uuid-macro-internal" name = "uuid-macro-internal"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abb14ae1a50dad63eaa768a458ef43d298cd1bd44951677bd10b732a9ba2a2d" checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
] ]
[[package]] [[package]]
@ -1611,7 +1611,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1633,7 +1633,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.52", "syn 2.0.53",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]

@ -16,7 +16,7 @@ libsky = { path = "../libsky" }
sky_macros = { path = "../sky-macros" } sky_macros = { path = "../sky-macros" }
rcrypt = "0.4.0" rcrypt = "0.4.0"
# external deps # external deps
bytes = "1.5.0" bytes = "1.6.0"
env_logger = "0.11.3" env_logger = "0.11.3"
log = "0.4.21" log = "0.4.21"
openssl = { version = "0.10.64", features = ["vendored"] } openssl = { version = "0.10.64", features = ["vendored"] }
@ -25,9 +25,9 @@ parking_lot = "0.12.1"
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.36.0", features = ["full"] } tokio = { version = "1.36.0", features = ["full"] }
tokio-openssl = "0.6.4" tokio-openssl = "0.6.4"
uuid = { version = "1.7.0", features = ["v4", "fast-rng", "macro-diagnostics"] } uuid = { version = "1.8.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
crc = "3.0.1" crc = "3.0.1"
serde_yaml = "0.9.32" serde_yaml = "0.9.33"
chrono = "0.4.35" chrono = "0.4.35"
[target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies] [target.'cfg(all(not(target_env = "msvc"), not(miri)))'.dependencies]

@ -71,9 +71,13 @@ impl PrimaryIndex {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct IndexLatchHandleShared<'t>(parking_lot::RwLockReadGuard<'t, ()>); pub struct IndexLatchHandleShared<'t> {
_lck: parking_lot::RwLockReadGuard<'t, ()>,
}
#[derive(Debug)] #[derive(Debug)]
pub struct IndexLatchHandleExclusive<'t>(parking_lot::RwLockWriteGuard<'t, ()>); pub struct IndexLatchHandleExclusive<'t> {
_lck: parking_lot::RwLockWriteGuard<'t, ()>,
}
#[derive(Debug)] #[derive(Debug)]
struct IndexLatch { struct IndexLatch {
@ -87,9 +91,13 @@ impl IndexLatch {
} }
} }
fn gl_handle_shared(&self) -> IndexLatchHandleShared { fn gl_handle_shared(&self) -> IndexLatchHandleShared {
IndexLatchHandleShared(self.glck.read()) IndexLatchHandleShared {
_lck: self.glck.read(),
}
} }
fn gl_handle_exclusive(&self) -> IndexLatchHandleExclusive { fn gl_handle_exclusive(&self) -> IndexLatchHandleExclusive {
IndexLatchHandleExclusive(self.glck.write()) IndexLatchHandleExclusive {
_lck: self.glck.write(),
}
} }
} }

@ -185,19 +185,24 @@ enumerate_err! {
} }
enumerate_err! { enumerate_err! {
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq, Clone, Copy)]
/// SDSS based storage engine errors /// SDSS based storage engine errors
pub enum StorageError { pub enum StorageError {
// header /*
----
SDSS Errors
----
These errors are common across all versions
*/
/// version mismatch /// version mismatch
HeaderDecodeVersionMismatch = "header-version-mismatch", FileDecodeHeaderVersionMismatch = "header-version-mismatch",
/// The entire header is corrupted /// The entire header is corrupted
HeaderDecodeCorruptedHeader = "header-corrupted", FileDecodeHeaderCorrupted = "header-corrupted",
// journal /*
/// An entry in the journal is corrupted ----
JournalLogEntryCorrupted = "journal-entry-corrupted", Common encoding errors
/// The structure of the journal is corrupted ----
JournalCorrupted = "journal-corrupted", */
// internal file structures // internal file structures
/// While attempting to decode a structure in an internal segment of a file, the storage engine ran into a possibly irrecoverable error /// While attempting to decode a structure in an internal segment of a file, the storage engine ran into a possibly irrecoverable error
InternalDecodeStructureCorrupted = "structure-decode-corrupted", InternalDecodeStructureCorrupted = "structure-decode-corrupted",
@ -205,21 +210,43 @@ enumerate_err! {
InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload", InternalDecodeStructureCorruptedPayload = "structure-decode-corrupted-payload",
/// the data for an internal structure was decoded but is logically invalid /// the data for an internal structure was decoded but is logically invalid
InternalDecodeStructureIllegalData = "structure-decode-illegal-data", InternalDecodeStructureIllegalData = "structure-decode-illegal-data",
/*
----
V1 Journal Errors
----
*/
/// An entry in the journal is corrupted
V1JournalDecodeLogEntryCorrupted = "journal-entry-corrupted",
/// The structure of the journal is corrupted
V1JournalDecodeCorrupted = "journal-corrupted",
/// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable /// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable
DataBatchRestoreCorruptedBatch = "batch-corrupted-batch", V1DataBatchDecodeCorruptedBatch = "batch-corrupted-batch",
/// when attempting to restore a data batch from disk, the driver encountered a corrupted entry /// when attempting to restore a data batch from disk, the driver encountered a corrupted entry
DataBatchRestoreCorruptedEntry = "batch-corrupted-entry", V1DataBatchDecodeCorruptedEntry = "batch-corrupted-entry",
/// we failed to close the data batch
DataBatchCloseError = "batch-persist-close-failed",
/// the data batch file is corrupted /// the data batch file is corrupted
DataBatchRestoreCorruptedBatchFile = "batch-corrupted-file", V1DataBatchDecodeCorruptedBatchFile = "batch-corrupted-file",
/// the system database is corrupted /// the system database is corrupted
SysDBCorrupted = "sysdb-corrupted", V1SysDBDecodeCorrupted = "sysdb-corrupted",
// raw journal errors /// we failed to close the data batch
RawJournalEventCorruptedMetadata = "journal-event-metadata-corrupted", V1DataBatchRuntimeCloseError = "batch-persist-close-failed",
RawJournalEventCorrupted = "journal-invalid-event", /*
RawJournalCorrupted = "journal-corrupted", ----
RawJournalInvalidEvent = "journal-invalid-event-order", V2 Journal Errors
----
*/
/// Journal event metadata corrupted
RawJournalDecodeEventCorruptedMetadata = "journal-event-metadata-corrupted",
/// The event body is corrupted
RawJournalDecodeEventCorruptedPayload = "journal-event-payload-corrupted",
/// batch contents was unexpected (for example, we expected n events but got m events)
RawJournalDecodeBatchContentsMismatch = "journal-batch-unexpected-termination",
/// batch contents was validated and executed but the final integrity check failed
RawJournalDecodeBatchIntegrityFailure = "journal-batch-integrity-check-failed",
/// unexpected order of events
RawJournalDecodeInvalidEvent = "journal-invalid-event-order",
/// corrupted event within a batch
RawJournalDecodeCorruptionInBatchMetadata = "journal-batch-corrupted-event-metadata",
/// runtime error: the lightweight heartbeat failed
RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed", RawJournalRuntimeCriticalLwtHBFail = "journal-lwt-heartbeat-failed",
} }
} }

@ -37,6 +37,7 @@ use {
std::{ std::{
fmt, fmt,
mem::MaybeUninit, mem::MaybeUninit,
ptr::addr_of_mut,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
}, },
tokio::sync::mpsc::unbounded_channel, tokio::sync::mpsc::unbounded_channel,
@ -262,16 +263,16 @@ impl Global {
.get_rt_stat() .get_rt_stat()
.per_mdl_delta_max_size() .per_mdl_delta_max_size()
} }
unsafe fn __gref_raw() -> &'static mut MaybeUninit<GlobalState> { unsafe fn __gref_raw() -> *mut MaybeUninit<GlobalState> {
static mut G: MaybeUninit<GlobalState> = MaybeUninit::uninit(); static mut G: MaybeUninit<GlobalState> = MaybeUninit::uninit();
&mut G addr_of_mut!(G)
} }
unsafe fn __gref(&self) -> &'static GlobalState { unsafe fn __gref(&self) -> &'static GlobalState {
Self::__gref_raw().assume_init_ref() (&*Self::__gref_raw()).assume_init_ref()
} }
pub unsafe fn unload_all(self) { pub unsafe fn unload_all(self) {
// TODO(@ohsayan): handle errors // TODO(@ohsayan): handle errors
let GlobalState { gns, .. } = Self::__gref_raw().assume_init_read(); let GlobalState { gns, .. } = Self::__gref_raw().read().assume_init();
let mut gns_driver = gns.gns_driver().txn_driver.lock(); let mut gns_driver = gns.gns_driver().txn_driver.lock();
GNSDriver::close_driver(&mut gns_driver).unwrap(); GNSDriver::close_driver(&mut gns_driver).unwrap();
for mdl in gns for mdl in gns

@ -77,6 +77,7 @@ impl TestGlobal {
model_name.entity(), model_name.entity(),
model_data.get_uuid(), model_data.get_uuid(),
), ),
Default::default(),
)?; )?;
model.driver().initialize_model_driver(driver); model.driver().initialize_model_driver(driver);
} }
@ -97,7 +98,7 @@ impl TestGlobal {
Err(e) => match e.kind() { Err(e) => match e.kind() {
ErrorKind::IoError(e_) => match e_.kind() { ErrorKind::IoError(e_) => match e_.kind() {
std::io::ErrorKind::AlreadyExists => { std::io::ErrorKind::AlreadyExists => {
GNSDriver::open_gns_with_name(log_name, &data) GNSDriver::open_gns_with_name(log_name, &data, Default::default())
} }
_ => Err(e), _ => Err(e),
}, },

@ -47,7 +47,7 @@ use {
util::{compiler::TaggedEnum, os}, util::{compiler::TaggedEnum, os},
IoResult, IoResult,
}, },
std::{mem::ManuallyDrop, ops::Range}, std::ops::Range,
}; };
pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _; pub const TEST_TIME: u128 = (u64::MAX / sizeof!(u64) as u64) as _;
@ -338,14 +338,10 @@ impl<H: HeaderV1Spec> HeaderV1<H> {
}) })
} else { } else {
let version_okay = okay_header_version & okay_server_version & okay_driver_version; let version_okay = okay_header_version & okay_server_version & okay_driver_version;
let md = ManuallyDrop::new([ Err([
StorageError::HeaderDecodeCorruptedHeader, StorageError::FileDecodeHeaderCorrupted,
StorageError::HeaderDecodeVersionMismatch, StorageError::FileDecodeHeaderVersionMismatch,
]); ][!version_okay as usize])
Err(unsafe {
// UNSAFE(@ohsayan): while not needed, md for drop safety + correct index
md.as_ptr().add(!version_okay as usize).read().into()
})
} }
} }
} }
@ -444,7 +440,7 @@ pub trait SimpleFileSpecV1 {
if v == Self::FILE_SPECFIER_VERSION { if v == Self::FILE_SPECFIER_VERSION {
Ok(()) Ok(())
} else { } else {
Err(StorageError::HeaderDecodeVersionMismatch.into()) Err(StorageError::FileDecodeHeaderVersionMismatch.into())
} }
} }
} }
@ -466,7 +462,7 @@ impl<Sfs: SimpleFileSpecV1> FileSpecV1 for Sfs {
if okay { if okay {
Ok(md) Ok(md)
} else { } else {
Err(StorageError::HeaderDecodeVersionMismatch.into()) Err(StorageError::FileDecodeHeaderVersionMismatch.into())
} }
} }
fn write_metadata(f: &mut impl FileWrite, _: Self::EncodeArgs) -> IoResult<Self::Metadata> { fn write_metadata(f: &mut impl FileWrite, _: Self::EncodeArgs) -> IoResult<Self::Metadata> {

@ -234,13 +234,13 @@ impl<S: FileSpecV1> TrackedReader<S> {
Err(e) => return Err(e), Err(e) => return Err(e),
} }
} else { } else {
Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()) Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner())
} }
} }
/// Tracked read of a given block size. Shorthand for [`Self::tracked_read`] /// Tracked read of a given block size. Shorthand for [`Self::tracked_read`]
pub fn read_block<const N: usize>(&mut self) -> IoResult<[u8; N]> { pub fn read_block<const N: usize>(&mut self) -> IoResult<[u8; N]> {
if !self.has_left(N as _) { if !self.has_left(N as _) {
return Err(SysIOError::from(std::io::ErrorKind::InvalidInput).into_inner()); return Err(SysIOError::from(std::io::ErrorKind::UnexpectedEof).into_inner());
} }
let mut buf = [0; N]; let mut buf = [0; N];
self.tracked_read(&mut buf)?; self.tracked_read(&mut buf)?;

@ -74,7 +74,7 @@ where
if scanner.eof() { if scanner.eof() {
Ok(()) Ok(())
} else { } else {
Err(StorageError::JournalLogEntryCorrupted.into()) Err(StorageError::V1JournalDecodeLogEntryCorrupted.into())
} }
} }
fn decode_and_update_global_state( fn decode_and_update_global_state(

@ -43,6 +43,7 @@ use {
}, },
util::{compiler::TaggedEnum, EndianQW}, util::{compiler::TaggedEnum, EndianQW},
}, },
std::marker::PhantomData,
}; };
/* /*
@ -318,10 +319,10 @@ impl FieldMD {
} }
} }
pub struct FieldRef<'a>(&'a Field); pub struct FieldRef<'a>(PhantomData<&'a Field>);
impl<'a> From<&'a Field> for FieldRef<'a> { impl<'a> From<&'a Field> for FieldRef<'a> {
fn from(f: &'a Field) -> Self { fn from(_: &'a Field) -> Self {
Self(f) Self(PhantomData)
} }
} }
impl<'a> PersistObject for FieldRef<'a> { impl<'a> PersistObject for FieldRef<'a> {

@ -53,7 +53,7 @@ impl DataBatchPersistDriver {
if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() { if slf.fsynced_write(&[MARKER_BATCH_CLOSED]).is_ok() {
return Ok(()); return Ok(());
} else { } else {
return Err(StorageError::DataBatchCloseError.into()); return Err(StorageError::V1DataBatchRuntimeCloseError.into());
} }
} }
} }

@ -181,7 +181,7 @@ impl DataBatchRestoreDriver {
} }
} }
// nope, this is a corrupted file // nope, this is a corrupted file
Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into())
} }
fn handle_reopen_is_actual_close(&mut self) -> RuntimeResult<bool> { fn handle_reopen_is_actual_close(&mut self) -> RuntimeResult<bool> {
if self.f.is_eof() { if self.f.is_eof() {
@ -194,7 +194,7 @@ impl DataBatchRestoreDriver {
Ok(false) Ok(false)
} else { } else {
// that's just a nice bug // that's just a nice bug
Err(StorageError::DataBatchRestoreCorruptedBatchFile.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatchFile.into())
} }
} }
} }
@ -301,7 +301,7 @@ impl DataBatchRestoreDriver {
// we must read the batch termination signature // we must read the batch termination signature
let b = self.f.read_byte()?; let b = self.f.read_byte()?;
if b != MARKER_END_OF_BATCH { if b != MARKER_END_OF_BATCH {
return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into());
} }
} }
// read actual commit // read actual commit
@ -314,7 +314,7 @@ impl DataBatchRestoreDriver {
if actual_checksum == u64::from_le_bytes(hardcoded_checksum) { if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
Ok(actual_commit) Ok(actual_commit)
} else { } else {
Err(StorageError::DataBatchRestoreCorruptedBatch.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatch.into())
} }
} }
fn read_batch(&mut self) -> RuntimeResult<Batch> { fn read_batch(&mut self) -> RuntimeResult<Batch> {
@ -334,7 +334,7 @@ impl DataBatchRestoreDriver {
} }
_ => { _ => {
// this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry // this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry
return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into());
} }
} }
// decode batch start block // decode batch start block
@ -378,7 +378,7 @@ impl DataBatchRestoreDriver {
this_col_cnt -= 1; this_col_cnt -= 1;
} }
if this_col_cnt != 0 { if this_col_cnt != 0 {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
} }
if change_type == 1 { if change_type == 1 {
this_batch.push(DecodedBatchEvent::new( this_batch.push(DecodedBatchEvent::new(
@ -396,7 +396,7 @@ impl DataBatchRestoreDriver {
processed_in_this_batch += 1; processed_in_this_batch += 1;
} }
_ => { _ => {
return Err(StorageError::DataBatchRestoreCorruptedBatch.into()); return Err(StorageError::V1DataBatchDecodeCorruptedBatch.into());
} }
} }
} }
@ -413,7 +413,7 @@ impl DataBatchRestoreDriver {
if let [MARKER_RECOVERY_EVENT] = buf { if let [MARKER_RECOVERY_EVENT] = buf {
return Ok(()); return Ok(());
} }
Err(StorageError::DataBatchRestoreCorruptedBatch.into()) Err(StorageError::V1DataBatchDecodeCorruptedBatch.into())
} }
fn read_start_batch_block(&mut self) -> RuntimeResult<BatchStartBlock> { fn read_start_batch_block(&mut self) -> RuntimeResult<BatchStartBlock> {
let pk_tag = self.f.read_byte()?; let pk_tag = self.f.read_byte()?;
@ -463,7 +463,7 @@ impl BatchStartBlock {
impl DataBatchRestoreDriver { impl DataBatchRestoreDriver {
fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult<PrimaryIndexKey> { fn decode_primary_key(&mut self, pk_type: u8) -> RuntimeResult<PrimaryIndexKey> {
let Some(pk_type) = TagUnique::try_from_raw(pk_type) else { let Some(pk_type) = TagUnique::try_from_raw(pk_type) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
}; };
Ok(match pk_type { Ok(match pk_type {
TagUnique::SignedInt | TagUnique::UnsignedInt => { TagUnique::SignedInt | TagUnique::UnsignedInt => {
@ -479,7 +479,7 @@ impl DataBatchRestoreDriver {
self.f.tracked_read(&mut data)?; self.f.tracked_read(&mut data)?;
if pk_type == TagUnique::Str { if pk_type == TagUnique::Str {
if core::str::from_utf8(&data).is_err() { if core::str::from_utf8(&data).is_err() {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
} }
} }
unsafe { unsafe {
@ -496,7 +496,7 @@ impl DataBatchRestoreDriver {
} }
fn decode_cell(&mut self) -> RuntimeResult<Datacell> { fn decode_cell(&mut self) -> RuntimeResult<Datacell> {
let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else { let Some(dscr) = StorageCellTypeID::try_from_raw(self.f.read_byte()?) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::V1DataBatchDecodeCorruptedEntry.into());
}; };
unsafe { cell::decode_element::<Datacell, TrackedReader>(&mut self.f, dscr) } unsafe { cell::decode_element::<Datacell, TrackedReader>(&mut self.f, dscr) }
.map_err(|e| e.0) .map_err(|e| e.0)
@ -516,7 +516,7 @@ impl From<std::io::Error> for ErrorHack {
} }
impl From<()> for ErrorHack { impl From<()> for ErrorHack {
fn from(_: ()) -> Self { fn from(_: ()) -> Self {
Self(StorageError::DataBatchRestoreCorruptedEntry.into()) Self(StorageError::V1DataBatchDecodeCorruptedEntry.into())
} }
} }

@ -215,7 +215,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
} }
match entry_metadata match entry_metadata
.event_source_marker() .event_source_marker()
.ok_or(StorageError::JournalLogEntryCorrupted)? .ok_or(StorageError::V1JournalDecodeLogEntryCorrupted)?
{ {
EventSourceMarker::ServerStandard => {} EventSourceMarker::ServerStandard => {}
EventSourceMarker::DriverClosed => { EventSourceMarker::DriverClosed => {
@ -230,7 +230,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
EventSourceMarker::DriverReopened | EventSourceMarker::RecoveryReverseLastJournal => { EventSourceMarker::DriverReopened | EventSourceMarker::RecoveryReverseLastJournal => {
// these two are only taken in close and error paths (respectively) so we shouldn't see them here; this is bad // these two are only taken in close and error paths (respectively) so we shouldn't see them here; this is bad
// two special directives in the middle of nowhere? incredible // two special directives in the middle of nowhere? incredible
return Err(StorageError::JournalCorrupted.into()); return Err(StorageError::V1JournalDecodeCorrupted.into());
} }
} }
// read payload // read payload
@ -263,10 +263,10 @@ impl<TA: JournalAdapter> JournalReader<TA> {
Ok(()) Ok(())
} else { } else {
// FIXME(@ohsayan): tolerate loss in this directive too // FIXME(@ohsayan): tolerate loss in this directive too
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} else { } else {
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} }
#[cold] // FIXME(@ohsayan): how bad can prod systems be? (clue: pretty bad, so look for possible changes) #[cold] // FIXME(@ohsayan): how bad can prod systems be? (clue: pretty bad, so look for possible changes)
@ -279,7 +279,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length? self.__record_read_bytes(JournalEntryMetadata::SIZE); // FIXME(@ohsayan): don't assume read length?
let mut entry_buf = [0u8; JournalEntryMetadata::SIZE]; let mut entry_buf = [0u8; JournalEntryMetadata::SIZE];
if self.log_file.read_buffer(&mut entry_buf).is_err() { if self.log_file.read_buffer(&mut entry_buf).is_err() {
return Err(StorageError::JournalCorrupted.into()); return Err(StorageError::V1JournalDecodeCorrupted.into());
} }
let entry = JournalEntryMetadata::decode(entry_buf); let entry = JournalEntryMetadata::decode(entry_buf);
let okay = (entry.event_id == self.evid as u128) let okay = (entry.event_id == self.evid as u128)
@ -290,7 +290,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
if okay { if okay {
return Ok(()); return Ok(());
} else { } else {
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} }
/// Read and apply all events in the given log file to the global state, returning the (open file, last event ID) /// Read and apply all events in the given log file to the global state, returning the (open file, last event ID)
@ -305,7 +305,7 @@ impl<TA: JournalAdapter> JournalReader<TA> {
if slf.closed { if slf.closed {
Ok((slf.log_file.downgrade_reader(), slf.evid)) Ok((slf.log_file.downgrade_reader(), slf.evid))
} else { } else {
Err(StorageError::JournalCorrupted.into()) Err(StorageError::V1JournalDecodeCorrupted.into())
} }
} }
} }

@ -42,7 +42,7 @@ fn rkey<T>(
) -> RuntimeResult<T> { ) -> RuntimeResult<T> {
match d.remove(key).map(transform) { match d.remove(key).map(transform) {
Some(Some(k)) => Ok(k), Some(Some(k)) => Ok(k),
_ => Err(StorageError::SysDBCorrupted.into()), _ => Err(StorageError::V1SysDBDecodeCorrupted.into()),
} }
} }
@ -95,14 +95,14 @@ impl RestoredSystemDatabase {
let mut userdata = userdata let mut userdata = userdata
.into_data() .into_data()
.and_then(Datacell::into_list) .and_then(Datacell::into_list)
.ok_or(StorageError::SysDBCorrupted)?; .ok_or(StorageError::V1SysDBDecodeCorrupted)?;
if userdata.len() != 1 { if userdata.len() != 1 {
return Err(StorageError::SysDBCorrupted.into()); return Err(StorageError::V1SysDBDecodeCorrupted.into());
} }
let user_password = userdata let user_password = userdata
.remove(0) .remove(0)
.into_bin() .into_bin()
.ok_or(StorageError::SysDBCorrupted)?; .ok_or(StorageError::V1SysDBDecodeCorrupted)?;
loaded_users.insert(username, user_password.into_boxed_slice()); loaded_users.insert(username, user_password.into_boxed_slice());
} }
// load sys data // load sys data
@ -117,7 +117,7 @@ impl RestoredSystemDatabase {
& sys_store.is_empty() & sys_store.is_empty()
& loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT)) & loaded_users.contains_key(SystemDatabase::ROOT_ACCOUNT))
{ {
return Err(StorageError::SysDBCorrupted.into()); return Err(StorageError::V1SysDBDecodeCorrupted.into());
} }
Ok(Self::new(loaded_users, sc, sv)) Ok(Self::new(loaded_users, sc, sv))
} }

@ -34,7 +34,7 @@ use {
core::GNSData, core::GNSData,
storage::{ storage::{
common_encoding::r1::impls::gns::GNSEvent, common_encoding::r1::impls::gns::GNSEvent,
v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent}, v2::raw::journal::{self, EventLogDriver, JournalAdapterEvent, JournalSettings},
}, },
txn::gns::{ txn::gns::{
model::{ model::{
@ -61,11 +61,15 @@ pub struct GNSEventLog;
impl GNSDriver { impl GNSDriver {
const FILE_PATH: &'static str = "gns.db-tlog"; const FILE_PATH: &'static str = "gns.db-tlog";
pub fn open_gns_with_name(name: &str, gs: &GNSData) -> RuntimeResult<Self> { pub fn open_gns_with_name(
journal::open_journal(name, gs) name: &str,
gs: &GNSData,
settings: JournalSettings,
) -> RuntimeResult<Self> {
journal::open_journal(name, gs, settings)
} }
pub fn open_gns(gs: &GNSData) -> RuntimeResult<Self> { pub fn open_gns(gs: &GNSData, settings: JournalSettings) -> RuntimeResult<Self> {
Self::open_gns_with_name(Self::FILE_PATH, gs) Self::open_gns_with_name(Self::FILE_PATH, gs, settings)
} }
pub fn create_gns_with_name(name: &str) -> RuntimeResult<Self> { pub fn create_gns_with_name(name: &str) -> RuntimeResult<Self> {
journal::create_journal(name) journal::create_journal(name)

@ -46,7 +46,7 @@ use {
v2::raw::{ v2::raw::{
journal::{ journal::{
self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent, self, BatchAdapter, BatchAdapterSpec, BatchDriver, JournalAdapterEvent,
RawJournalAdapter, JournalSettings, RawJournalAdapter,
}, },
spec::ModelDataBatchAofV1, spec::ModelDataBatchAofV1,
}, },
@ -66,8 +66,12 @@ use {
pub type ModelDriver = BatchDriver<ModelDataAdapter>; pub type ModelDriver = BatchDriver<ModelDataAdapter>;
impl ModelDriver { impl ModelDriver {
pub fn open_model_driver(mdl: &ModelData, model_data_file_path: &str) -> RuntimeResult<Self> { pub fn open_model_driver(
journal::open_journal(model_data_file_path, mdl) mdl: &ModelData,
model_data_file_path: &str,
settings: JournalSettings,
) -> RuntimeResult<Self> {
journal::open_journal(model_data_file_path, mdl, settings)
} }
/// Create a new event log /// Create a new event log
pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult<Self> { pub fn create_model_driver(model_data_file_path: &str) -> RuntimeResult<Self> {
@ -449,7 +453,7 @@ impl BatchAdapterSpec for ModelDataAdapter {
BatchType::Standard => {} BatchType::Standard => {}
} }
let pk_tag = TagUnique::try_from_raw(f.read_block().map(|[b]| b)?) let pk_tag = TagUnique::try_from_raw(f.read_block().map(|[b]| b)?)
.ok_or(StorageError::RawJournalCorrupted)?; .ok_or(StorageError::InternalDecodeStructureIllegalData)?;
let schema_version = u64::from_le_bytes(f.read_block()?); let schema_version = u64::from_le_bytes(f.read_block()?);
let column_count = u64::from_le_bytes(f.read_block()?); let column_count = u64::from_le_bytes(f.read_block()?);
Ok(BatchMetadata { Ok(BatchMetadata {
@ -656,7 +660,7 @@ mod restore_impls {
f.read(&mut data)?; f.read(&mut data)?;
if pk_type == TagUnique::Str { if pk_type == TagUnique::Str {
if core::str::from_utf8(&data).is_err() { if core::str::from_utf8(&data).is_err() {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::InternalDecodeStructureCorruptedPayload.into());
} }
} }
unsafe { unsafe {
@ -679,7 +683,7 @@ mod restore_impls {
let mut this_col_cnt = batch_info.column_count; let mut this_col_cnt = batch_info.column_count;
while this_col_cnt != 0 { while this_col_cnt != 0 {
let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else { let Some(dscr) = StorageCellTypeID::try_from_raw(f.read_block().map(|[b]| b)?) else {
return Err(StorageError::DataBatchRestoreCorruptedEntry.into()); return Err(StorageError::InternalDecodeStructureIllegalData.into());
}; };
let cell = unsafe { cell::decode_element::<Datacell, _>(f, dscr) }.map_err(|e| e.0)?; let cell = unsafe { cell::decode_element::<Datacell, _>(f, dscr) }.map_err(|e| e.0)?;
row.push(cell); row.push(cell);
@ -705,7 +709,7 @@ mod restore_impls {
} }
impl From<()> for ErrorHack { impl From<()> for ErrorHack {
fn from(_: ()) -> Self { fn from(_: ()) -> Self {
Self(StorageError::DataBatchRestoreCorruptedEntry.into()) Self(StorageError::InternalDecodeStructureCorrupted.into())
} }
} }
impl<'a> DataSource for TrackedReaderContext<'a, ModelDataBatchAofV1> { impl<'a> DataSource for TrackedReaderContext<'a, ModelDataBatchAofV1> {

@ -25,7 +25,10 @@
*/ */
use { use {
self::impls::mdl_journal::{BatchStats, FullModel}, self::{
impls::mdl_journal::{BatchStats, FullModel},
raw::journal::JournalSettings,
},
super::{common::interface::fs::FileSystem, v1, SELoaded}, super::{common::interface::fs::FileSystem, v1, SELoaded},
crate::engine::{ crate::engine::{
config::Configuration, config::Configuration,
@ -120,15 +123,18 @@ pub fn initialize_new(config: &Configuration) -> RuntimeResult<SELoaded> {
pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> { pub fn restore(cfg: &Configuration) -> RuntimeResult<SELoaded> {
let gns = GNSData::empty(); let gns = GNSData::empty();
context::set_dmsg("loading gns"); context::set_dmsg("loading gns");
let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns)?; let mut gns_driver = impls::gns_log::GNSDriver::open_gns(&gns, JournalSettings::default())?;
for (id, model) in gns.idx_models().write().iter_mut() { for (id, model) in gns.idx_models().write().iter_mut() {
let model_data = model.data(); let model_data = model.data();
let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid(); let space_uuid = gns.idx().read().get(id.space()).unwrap().get_uuid();
let model_data_file_path = let model_data_file_path =
paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid()); paths_v1::model_path(id.space(), space_uuid, id.entity(), model_data.get_uuid());
context::set_dmsg(format!("loading model driver in {model_data_file_path}")); context::set_dmsg(format!("loading model driver in {model_data_file_path}"));
let model_driver = let model_driver = impls::mdl_journal::ModelDriver::open_model_driver(
impls::mdl_journal::ModelDriver::open_model_driver(model_data, &model_data_file_path)?; model_data,
&model_data_file_path,
JournalSettings::default(),
)?;
model.driver().initialize_model_driver(model_driver); model.driver().initialize_model_driver(model_driver);
unsafe { unsafe {
// UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum // UNSAFE(@ohsayan): all pieces of data are upgraded by now, so vacuum

@ -47,7 +47,8 @@ mod raw;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
pub use raw::{ pub use raw::{
create_journal, open_journal, RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, create_journal, open_journal, JournalSettings, RawJournalAdapter,
RawJournalAdapterEvent as JournalAdapterEvent,
}; };
/* /*
@ -136,7 +137,7 @@ impl<EL: EventLogSpec> RawJournalAdapter for EventLogAdapter<EL> {
this_checksum.update(&plen.to_le_bytes()); this_checksum.update(&plen.to_le_bytes());
this_checksum.update(&pl); this_checksum.update(&pl);
if this_checksum.finish() != expected_checksum { if this_checksum.finish() != expected_checksum {
return Err(StorageError::RawJournalCorrupted.into()); return Err(StorageError::RawJournalDecodeCorruptionInBatchMetadata.into());
} }
<EL as EventLogSpec>::DECODE_DISPATCH <EL as EventLogSpec>::DECODE_DISPATCH
[<<EL as EventLogSpec>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize]( [<<EL as EventLogSpec>::EventMeta as TaggedEnum>::dscr_u64(&meta) as usize](
@ -165,11 +166,15 @@ pub struct BatchAdapter<BA: BatchAdapterSpec>(PhantomData<BA>);
#[cfg(test)] #[cfg(test)]
impl<BA: BatchAdapterSpec> BatchAdapter<BA> { impl<BA: BatchAdapterSpec> BatchAdapter<BA> {
/// Open a new batch journal /// Open a new batch journal
pub fn open(name: &str, gs: &BA::GlobalState) -> RuntimeResult<BatchDriver<BA>> pub fn open(
name: &str,
gs: &BA::GlobalState,
settings: JournalSettings,
) -> RuntimeResult<BatchDriver<BA>>
where where
BA::Spec: FileSpecV1<DecodeArgs = ()>, BA::Spec: FileSpecV1<DecodeArgs = ()>,
{ {
raw::open_journal::<BatchAdapter<BA>>(name, gs) raw::open_journal::<BatchAdapter<BA>>(name, gs, settings)
} }
/// Create a new batch journal /// Create a new batch journal
pub fn create(name: &str) -> RuntimeResult<BatchDriver<BA>> pub fn create(name: &str) -> RuntimeResult<BatchDriver<BA>>
@ -278,7 +283,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
let event_type = <<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw( let event_type = <<BA as BatchAdapterSpec>::EventType as TaggedEnum>::try_from_raw(
f.read_block().map(|[b]| b)?, f.read_block().map(|[b]| b)?,
) )
.ok_or(StorageError::RawJournalCorrupted)?; .ok_or(StorageError::InternalDecodeStructureIllegalData)?;
// is this an early exit marker? if so, exit // is this an early exit marker? if so, exit
if <BA as BatchAdapterSpec>::is_early_exit(&event_type) { if <BA as BatchAdapterSpec>::is_early_exit(&event_type) {
break; break;
@ -299,7 +304,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
// finish applying batch // finish applying batch
BA::finish(batch_state, batch_md, gs)?; BA::finish(batch_state, batch_md, gs)?;
} else { } else {
return Err(StorageError::RawJournalCorrupted.into()); return Err(StorageError::RawJournalDecodeBatchContentsMismatch.into());
} }
} }
// and finally, verify checksum // and finally, verify checksum
@ -308,7 +313,7 @@ impl<BA: BatchAdapterSpec> RawJournalAdapter for BatchAdapter<BA> {
if real_checksum == stored_checksum { if real_checksum == stored_checksum {
Ok(()) Ok(())
} else { } else {
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeBatchIntegrityFailure.into())
} }
} }
} }

@ -30,7 +30,7 @@ mod tests;
use { use {
crate::{ crate::{
engine::{ engine::{
error::StorageError, error::{ErrorKind, StorageError},
mem::unsafe_apis::memcpy, mem::unsafe_apis::memcpy,
storage::common::{ storage::common::{
checksum::SCrc64, checksum::SCrc64,
@ -44,7 +44,7 @@ use {
util::compiler::TaggedEnum, util::compiler::TaggedEnum,
}, },
core::fmt, core::fmt,
std::ops::Range, std::{io::ErrorKind as IoErrorKind, ops::Range},
}; };
/* /*
@ -67,12 +67,13 @@ where
pub fn open_journal<J: RawJournalAdapter>( pub fn open_journal<J: RawJournalAdapter>(
log_path: &str, log_path: &str,
gs: &J::GlobalState, gs: &J::GlobalState,
settings: JournalSettings,
) -> RuntimeResult<RawJournalWriter<J>> ) -> RuntimeResult<RawJournalWriter<J>>
where where
J::Spec: FileSpecV1<DecodeArgs = ()>, J::Spec: FileSpecV1<DecodeArgs = ()>,
{ {
let log = SdssFile::<J::Spec>::open(log_path)?; let log = SdssFile::<J::Spec>::open(log_path)?;
let (initializer, file) = RawJournalReader::<J>::scroll(log, gs)?; let (initializer, file) = RawJournalReader::<J>::scroll(log, gs, settings)?;
RawJournalWriter::new(initializer, file) RawJournalWriter::new(initializer, file)
} }
@ -640,6 +641,22 @@ pub struct RawJournalReader<J: RawJournalAdapter> {
last_txn_offset: u64, last_txn_offset: u64,
last_txn_checksum: u64, last_txn_checksum: u64,
stats: JournalStats, stats: JournalStats,
_settings: JournalSettings,
}
#[derive(Debug)]
pub struct JournalSettings {}
impl Default for JournalSettings {
fn default() -> Self {
Self::new()
}
}
impl JournalSettings {
pub fn new() -> Self {
Self {}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -658,28 +675,33 @@ impl JournalStats {
} }
impl<J: RawJournalAdapter> RawJournalReader<J> { impl<J: RawJournalAdapter> RawJournalReader<J> {
pub fn scroll( fn scroll(
file: SdssFile<<J as RawJournalAdapter>::Spec>, file: SdssFile<<J as RawJournalAdapter>::Spec>,
gs: &J::GlobalState, gs: &J::GlobalState,
settings: JournalSettings,
) -> RuntimeResult<(JournalInitializer, SdssFile<J::Spec>)> { ) -> RuntimeResult<(JournalInitializer, SdssFile<J::Spec>)> {
let reader = TrackedReader::with_cursor( let reader = TrackedReader::with_cursor(
file, file,
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64, <<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64,
)?; )?;
jtrace_reader!(Initialized); jtrace_reader!(Initialized);
let mut me = Self::new(reader, 0, 0, 0, 0); let mut me = Self::new(reader, 0, 0, 0, 0, settings);
loop { loop {
if me._apply_next_event_and_stop(gs)? { match me._apply_next_event_and_stop(gs) {
jtrace_reader!(Completed); Ok(true) => {
let initializer = JournalInitializer::new( jtrace_reader!(Completed);
me.tr.cursor(), let initializer = JournalInitializer::new(
me.tr.checksum(), me.tr.cursor(),
me.txn_id, me.tr.checksum(),
// NB: the last txn offset is important because it indicates that the log is new me.txn_id,
me.last_txn_offset, // NB: the last txn offset is important because it indicates that the log is new
); me.last_txn_offset,
let file = me.tr.into_inner(); );
return Ok((initializer, file)); let file = me.tr.into_inner();
return Ok((initializer, file));
}
Ok(false) => {}
Err(e) => return Err(e),
} }
} }
} }
@ -689,6 +711,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
last_txn_id: u64, last_txn_id: u64,
last_txn_offset: u64, last_txn_offset: u64,
last_txn_checksum: u64, last_txn_checksum: u64,
settings: JournalSettings,
) -> Self { ) -> Self {
Self { Self {
tr: reader, tr: reader,
@ -697,6 +720,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
last_txn_offset, last_txn_offset,
last_txn_checksum, last_txn_checksum,
stats: JournalStats::new(), stats: JournalStats::new(),
_settings: settings,
} }
} }
fn __refresh_known_txn(me: &mut Self) { fn __refresh_known_txn(me: &mut Self) {
@ -707,6 +731,47 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
} }
} }
#[derive(Debug, PartialEq)]
pub enum JournalRepairMode {
Simple,
}
impl<J: RawJournalAdapter> RawJournalReader<J> {
pub fn repair(
file: SdssFile<<J as RawJournalAdapter>::Spec>,
gs: &J::GlobalState,
settings: JournalSettings,
repair_mode: JournalRepairMode,
) -> RuntimeResult<()> {
match repair_mode {
JournalRepairMode::Simple => {}
}
match Self::scroll(file, gs, settings) {
Ok(_) => {
// no error detected
return Ok(());
}
Err(e) => {
// now it's our task to determine exactly what happened
match e.kind() {
ErrorKind::IoError(io) => match io.kind() {
IoErrorKind::UnexpectedEof => {
// this is the only kind of error that we can actually repair since it indicates that a part of the
// file is "missing"
}
_ => return Err(e),
},
ErrorKind::Storage(_) => todo!(),
ErrorKind::Txn(_) => todo!(),
ErrorKind::Other(_) => todo!(),
ErrorKind::Config(_) => unreachable!(),
}
}
}
todo!()
}
}
impl<J: RawJournalAdapter> RawJournalReader<J> { impl<J: RawJournalAdapter> RawJournalReader<J> {
fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> { fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
let txn_id = u128::from_le_bytes(self.tr.read_block()?); let txn_id = u128::from_le_bytes(self.tr.read_block()?);
@ -716,7 +781,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
expected: self.txn_id, expected: self.txn_id,
current: txn_id as u64 current: txn_id as u64
}); });
return Err(StorageError::RawJournalEventCorruptedMetadata.into()); return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into());
} }
jtrace_reader!(AttemptingEvent(txn_id as u64)); jtrace_reader!(AttemptingEvent(txn_id as u64));
// check for a server event // check for a server event
@ -740,7 +805,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
Err(e) => return Err(e), Err(e) => return Err(e),
} }
} }
None => return Err(StorageError::RawJournalEventCorruptedMetadata.into()), None => return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()),
} }
} }
return self.handle_close(txn_id, meta); return self.handle_close(txn_id, meta);
@ -772,9 +837,9 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
.. ..
}) => { }) => {
jtrace_reader!(ErrExpectedCloseGotReopen); jtrace_reader!(ErrExpectedCloseGotReopen);
return Err(StorageError::RawJournalInvalidEvent.into()); return Err(StorageError::RawJournalDecodeInvalidEvent.into());
} }
None => return Err(StorageError::RawJournalEventCorrupted.into()), None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()),
}; };
jtrace_reader!(DriverEventExpectedCloseGotClose); jtrace_reader!(DriverEventExpectedCloseGotClose);
// a driver closed event; we've checked integrity, but we must check the field values // a driver closed event; we've checked integrity, but we must check the field values
@ -787,7 +852,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
jtrace_reader!(DriverEventInvalidMetadata); jtrace_reader!(DriverEventInvalidMetadata);
// either the block is corrupted or the data we read is corrupted; either way, // either the block is corrupted or the data we read is corrupted; either way,
// we're going to refuse to read this // we're going to refuse to read this
return Err(StorageError::RawJournalCorrupted.into()); return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into());
} }
self.stats.driver_events += 1; self.stats.driver_events += 1;
// update // update
@ -807,7 +872,8 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?; let event_block = self.tr.read_block::<{ DriverEvent::FULL_EVENT_SIZE }>()?;
let reopen_event = match DriverEvent::decode(event_block) { let reopen_event = match DriverEvent::decode(event_block) {
Some(ev) if ev.event == DriverEventKind::Reopened => ev, Some(ev) if ev.event == DriverEventKind::Reopened => ev,
None | Some(_) => return Err(StorageError::RawJournalEventCorrupted.into()), Some(_) => return Err(StorageError::RawJournalDecodeInvalidEvent.into()),
None => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()),
}; };
jtrace_reader!(DriverEventExpectingReopenGotReopen); jtrace_reader!(DriverEventExpectingReopenGotReopen);
let valid_meta = okay! { let valid_meta = okay! {
@ -824,7 +890,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
Ok(false) Ok(false)
} else { } else {
jtrace_reader!(ErrInvalidReopenMetadata); jtrace_reader!(ErrInvalidReopenMetadata);
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into())
} }
} }
} }

@ -35,7 +35,7 @@ use {
storage::{ storage::{
common::sdss::sdss_r1::rw::TrackedReader, common::sdss::sdss_r1::rw::TrackedReader,
v2::raw::{ v2::raw::{
journal::raw::{JournalReaderTraceEvent, JournalWriterTraceEvent}, journal::raw::{JournalReaderTraceEvent, JournalSettings, JournalWriterTraceEvent},
spec::SystemDatabaseV1, spec::SystemDatabaseV1,
}, },
}, },
@ -177,7 +177,7 @@ impl RawJournalAdapter for SimpleDBJournal {
file.tracked_read(&mut keybuf)?; file.tracked_read(&mut keybuf)?;
match String::from_utf8(keybuf) { match String::from_utf8(keybuf) {
Ok(k) => gs.data.borrow_mut().push(k), Ok(k) => gs.data.borrow_mut().push(k),
Err(_) => return Err(StorageError::RawJournalEventCorrupted.into()), Err(_) => return Err(StorageError::RawJournalDecodeEventCorruptedPayload.into()),
} }
} }
EventMeta::Clear => gs.data.borrow_mut().clear(), EventMeta::Clear => gs.data.borrow_mut().clear(),
@ -219,7 +219,12 @@ fn journal_open_close() {
} }
{ {
// second boot // second boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &SimpleDB::new()).unwrap(); let mut j = open_journal::<SimpleDBJournal>(
JOURNAL_NAME,
&SimpleDB::new(),
JournalSettings::default(),
)
.unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), super::obtain_trace(),
intovec![ intovec![
@ -258,7 +263,12 @@ fn journal_open_close() {
} }
{ {
// third boot // third boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &SimpleDB::new()).unwrap(); let mut j = open_journal::<SimpleDBJournal>(
JOURNAL_NAME,
&SimpleDB::new(),
JournalSettings::default(),
)
.unwrap();
assert_eq!( assert_eq!(
super::obtain_trace(), super::obtain_trace(),
intovec![ intovec![
@ -336,7 +346,7 @@ fn journal_with_server_single_event() {
{ {
let db = SimpleDB::new(); let db = SimpleDB::new();
// second boot // second boot
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db) let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db, JournalSettings::default())
.set_dmsg_fn(|| format!("{:?}", super::obtain_trace())) .set_dmsg_fn(|| format!("{:?}", super::obtain_trace()))
.unwrap(); .unwrap();
assert_eq!(db.data().len(), 1); assert_eq!(db.data().len(), 1);
@ -385,7 +395,8 @@ fn journal_with_server_single_event() {
{ {
// third boot // third boot
let db = SimpleDB::new(); let db = SimpleDB::new();
let mut j = open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db).unwrap(); let mut j =
open_journal::<SimpleDBJournal>(JOURNAL_NAME, &db, JournalSettings::default()).unwrap();
assert_eq!(db.data().len(), 1); assert_eq!(db.data().len(), 1);
assert_eq!(db.data()[0], "hello world"); assert_eq!(db.data()[0], "hello world");
assert_eq!( assert_eq!(
@ -453,7 +464,8 @@ fn multi_boot() {
} }
{ {
let mut db = SimpleDB::new(); let mut db = SimpleDB::new();
let mut j = open_journal::<SimpleDBJournal>("multiboot", &db).unwrap(); let mut j =
open_journal::<SimpleDBJournal>("multiboot", &db, JournalSettings::default()).unwrap();
assert_eq!(db.data().as_ref(), vec!["key_a".to_string()]); assert_eq!(db.data().as_ref(), vec!["key_a".to_string()]);
db.clear(&mut j).unwrap(); db.clear(&mut j).unwrap();
db.push(&mut j, "myfinkey").unwrap(); db.push(&mut j, "myfinkey").unwrap();
@ -461,7 +473,8 @@ fn multi_boot() {
} }
{ {
let db = SimpleDB::new(); let db = SimpleDB::new();
let mut j = open_journal::<SimpleDBJournal>("multiboot", &db).unwrap(); let mut j =
open_journal::<SimpleDBJournal>("multiboot", &db, JournalSettings::default()).unwrap();
assert_eq!(db.data().as_ref(), vec!["myfinkey".to_string()]); assert_eq!(db.data().as_ref(), vec!["myfinkey".to_string()]);
RawJournalWriter::close_driver(&mut j).unwrap(); RawJournalWriter::close_driver(&mut j).unwrap();
} }

@ -30,8 +30,9 @@
use { use {
super::{ super::{
raw::RawJournalAdapterEvent, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, raw::{JournalSettings, RawJournalAdapterEvent},
EventLogAdapter, EventLogDriver, EventLogSpec, BatchAdapter, BatchAdapterSpec, BatchDriver, DispatchFn, EventLogAdapter, EventLogDriver,
EventLogSpec,
}, },
crate::{ crate::{
engine::{ engine::{
@ -112,13 +113,13 @@ impl EventLogSpec for TestDBAdapter {
const DECODE_DISPATCH: Self::DecodeDispatch = [ const DECODE_DISPATCH: Self::DecodeDispatch = [
|db, payload| { |db, payload| {
if payload.len() < sizeof!(u64) { if payload.len() < sizeof!(u64) {
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into())
} else { } else {
let length = let length =
u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) }); u64::from_le_bytes(unsafe { unsafe_apis::memcpy(&payload[..sizeof!(u64)]) });
let payload = &payload[sizeof!(u64)..]; let payload = &payload[sizeof!(u64)..];
if payload.len() as u64 != length { if payload.len() as u64 != length {
Err(StorageError::RawJournalCorrupted.into()) Err(StorageError::RawJournalDecodeEventCorruptedPayload.into())
} else { } else {
let string = String::from_utf8(payload.to_owned()).unwrap(); let string = String::from_utf8(payload.to_owned()).unwrap();
db._mut().push(string); db._mut().push(string);
@ -172,7 +173,7 @@ fn open_log() -> (
super::raw::RawJournalWriter<EventLogAdapter<TestDBAdapter>>, super::raw::RawJournalWriter<EventLogAdapter<TestDBAdapter>>,
) { ) {
let db = TestDB::default(); let db = TestDB::default();
let log = open_journal("jrnl", &db).unwrap(); let log = open_journal("jrnl", &db, JournalSettings::default()).unwrap();
(db, log) (db, log)
} }
@ -381,7 +382,7 @@ fn batch_simple() {
} }
{ {
let db = BatchDB::new(); let db = BatchDB::new();
let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap(); let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap();
db.push(&mut batch_drv, "key3").unwrap(); db.push(&mut batch_drv, "key3").unwrap();
db.push(&mut batch_drv, "key4").unwrap(); db.push(&mut batch_drv, "key4").unwrap();
assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]); assert_eq!(db._ref().data, ["key1", "key2", "key3", "key4"]);
@ -389,7 +390,7 @@ fn batch_simple() {
} }
{ {
let db = BatchDB::new(); let db = BatchDB::new();
let mut batch_drv = BatchAdapter::open("mybatch", &db).unwrap(); let mut batch_drv = BatchAdapter::open("mybatch", &db, JournalSettings::default()).unwrap();
db.push(&mut batch_drv, "key5").unwrap(); db.push(&mut batch_drv, "key5").unwrap();
db.push(&mut batch_drv, "key6").unwrap(); db.push(&mut batch_drv, "key6").unwrap();
assert_eq!( assert_eq!(
@ -400,7 +401,9 @@ fn batch_simple() {
} }
{ {
let db = BatchDB::new(); let db = BatchDB::new();
let mut batch_drv = BatchAdapter::<BatchDBAdapter>::open("mybatch", &db).unwrap(); let mut batch_drv =
BatchAdapter::<BatchDBAdapter>::open("mybatch", &db, JournalSettings::default())
.unwrap();
assert_eq!( assert_eq!(
db._ref().data, db._ref().data,
["key1", "key2", "key3", "key4", "key5", "key6"] ["key1", "key2", "key3", "key4", "key5", "key6"]

@ -24,11 +24,10 @@
* *
*/ */
use quote::quote;
use { use {
crate::util::{self, AttributeKind}, crate::util::{self, AttributeKind},
proc_macro::TokenStream, proc_macro::TokenStream,
quote::quote,
std::collections::HashMap, std::collections::HashMap,
syn::{parse_macro_input, AttributeArgs, ItemFn}, syn::{parse_macro_input, AttributeArgs, ItemFn},
}; };

@ -24,8 +24,10 @@
* *
*/ */
use proc_macro2::Ident; use {
use syn::{Lit, Meta, MetaNameValue, NestedMeta, Path}; proc_macro2::Ident,
syn::{Lit, Meta, MetaNameValue, NestedMeta, Path},
};
pub enum AttributeKind { pub enum AttributeKind {
Lit(Lit), Lit(Lit),

Loading…
Cancel
Save