diff --git a/server/src/engine/config.rs b/server/src/engine/config.rs index 9c1e8ee5..33ba576d 100644 --- a/server/src/engine/config.rs +++ b/server/src/engine/config.rs @@ -652,6 +652,7 @@ pub enum CLIConfigParseReturn { Version, /// We yielded a config YieldedConfig(T), + Repair, } impl CLIConfigParseReturn { @@ -670,10 +671,21 @@ impl CLIConfigParseReturn { pub fn parse_cli_args<'a, T: 'a + AsRef>( src: impl Iterator, ) -> RuntimeResult> { - let mut args_iter = src.into_iter().skip(1); + let mut args_iter = src.into_iter().skip(1).peekable(); let mut cli_args: ParsedRawArgs = HashMap::new(); while let Some(arg) = args_iter.next() { let arg = arg.as_ref(); + if arg == "repair" { + if args_iter.peek().is_none() { + return Ok(CLIConfigParseReturn::Repair); + } else { + return Err(ConfigError::with_src( + ConfigSource::Cli, + ConfigErrorKind::ErrorString("to use `repair`, just run `skyd repair`".into()), + ) + .into()); + } + } if arg == "--help" || arg == "-h" { return Ok(CLIConfigParseReturn::Help); } @@ -978,6 +990,7 @@ pub enum ConfigReturn { HelpMessage(String), /// A configuration that we have fully validated was provided Config(Configuration), + Repair, } impl ConfigReturn { @@ -1105,6 +1118,7 @@ pub fn check_configuration() -> RuntimeResult { libsky::VERSION ))); } + CLIConfigParseReturn::Repair => return Ok(ConfigReturn::Repair), CLIConfigParseReturn::YieldedConfig(cfg) => Some(cfg), }; match cli_args { diff --git a/server/src/engine/macros.rs b/server/src/engine/macros.rs index ee3090bc..2968364b 100644 --- a/server/src/engine/macros.rs +++ b/server/src/engine/macros.rs @@ -460,3 +460,7 @@ macro_rules! e { r($e) }}; } + +macro_rules! l { + (let $($name:ident),* = $($expr:expr),*) => { let ($($name),*) = ($($expr),*); } +} diff --git a/server/src/engine/mod.rs b/server/src/engine/mod.rs index 101ee060..ab16eb14 100644 --- a/server/src/engine/mod.rs +++ b/server/src/engine/mod.rs @@ -42,7 +42,7 @@ mod txn; #[cfg(test)] mod tests; // re-export -pub use error::RuntimeResult; +pub use {error::RuntimeResult, fractal::Global}; use crate::engine::storage::SELoaded; @@ -198,3 +198,7 @@ pub fn finish(g: fractal::Global) { g.unload_all(); } } + +pub fn repair() -> RuntimeResult<()> { + storage::repair() +} diff --git a/server/src/engine/storage/common/interface/fs.rs b/server/src/engine/storage/common/interface/fs.rs index 58551822..4a9a9e12 100644 --- a/server/src/engine/storage/common/interface/fs.rs +++ b/server/src/engine/storage/common/interface/fs.rs @@ -30,6 +30,8 @@ file system */ +use crate::util; + #[cfg(test)] use super::vfs::{VFileDescriptor, VirtualFS}; use { @@ -56,6 +58,28 @@ impl FileSystem { } impl FileSystem { + #[inline(always)] + pub fn copy_directory(from: &str, to: &str) -> IoResult<()> { + #[cfg(test)] + { + match Self::context() { + FSContext::Local => {} + FSContext::Virtual => return VirtualFS::instance().write().fs_copy(from, to), + } + } + util::os::recursive_copy(from, to) + } + #[inline(always)] + pub fn copy(from: &str, to: &str) -> IoResult<()> { + #[cfg(test)] + { + match Self::context() { + FSContext::Local => {} + FSContext::Virtual => return VirtualFS::instance().write().fs_copy(from, to), + } + } + std_fs::copy(from, to).map(|_| ()) + } #[inline(always)] pub fn read(path: &str) -> IoResult> { #[cfg(test)] diff --git a/server/src/engine/storage/common/interface/vfs.rs b/server/src/engine/storage/common/interface/vfs.rs index 428933a4..51f5bb4b 100644 --- a/server/src/engine/storage/common/interface/vfs.rs +++ b/server/src/engine/storage/common/interface/vfs.rs @@ -62,6 +62,19 @@ enum VNode { File(RwLock), } +impl VNode { + fn clone_into_new_node(&self) -> Self { + match self { + Self::Dir(d) => Self::Dir( + d.iter() + .map(|(id, data)| (id.clone(), data.clone_into_new_node())) + .collect(), + ), + Self::File(f) => Self::File(RwLock::new(f.read().clone_to_new_file())), + } + } +} + #[derive(Debug)] pub(super) struct VFile { read: bool, @@ -103,6 +116,14 @@ impl Drop for VFileDescriptor { */ impl VFile { + pub fn clone_to_new_file(&self) -> Self { + Self { + read: false, + write: false, + data: self.data.clone(), + pos: 0, + } + } pub fn truncate(&mut self, to: u64) -> IoResult<()> { if !self.write { return Err(Error::new(ErrorKind::PermissionDenied, "Write permission denied").into()); @@ -187,6 +208,30 @@ impl VirtualFS { pub fn get_data(&self, path: &str) -> IoResult> { self.with_file(path, |f| Ok(f.data.clone())) } + pub fn fs_copy(&mut self, from: &str, to: &str) -> IoResult<()> { + let node = self.with_item(from, |node| Ok(node.clone_into_new_node()))?; + // process components + let (target, components) = util::split_target_and_components(to); + let mut current = &mut self.root; + for component in components { + match current.get_mut(component) { + Some(VNode::Dir(dir)) => { + current = dir; + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + match current.entry(target.into()) { + Entry::Occupied(mut item) => { + item.insert(node); + } + Entry::Vacant(ve) => { + ve.insert(node); + } + } + Ok(()) + } pub fn fs_fcreate_rw(&mut self, fpath: &str) -> IoResult { let (target_file, components) = util::split_target_and_components(fpath); let target_dir = util::find_target_dir_mut(components, &mut self.root)?; @@ -354,16 +399,13 @@ impl VirtualFS { fpath: &str, f: impl FnOnce(&VFile) -> IoResult, ) -> IoResult { - let (target_file, components) = util::split_target_and_components(fpath); - let target_dir = util::find_target_dir(components, &self.root)?; - match target_dir.get(target_file) { - Some(VNode::File(file)) => { + self.with_item(fpath, |node| match node { + VNode::File(file) => { let f_ = file.read(); f(&f_) } - Some(VNode::Dir(_)) => return err::item_is_not_file(), - None => return Err(Error::from(ErrorKind::NotFound).into()), - } + VNode::Dir(_) => err::item_is_not_file(), + }) } fn with_item_mut( &mut self, @@ -387,6 +429,24 @@ impl VirtualFS { Entry::Vacant(_) => return err::could_not_find_item(), } } + fn with_item(&self, fpath: &str, f: impl FnOnce(&VNode) -> IoResult) -> IoResult { + // process components + let (target, components) = util::split_target_and_components(fpath); + let mut current = &self.root; + for component in components { + match current.get(component) { + Some(VNode::Dir(dir)) => { + current = dir; + } + Some(VNode::File(_)) => return err::file_in_dir_path(), + None => return err::dir_missing_in_path(), + } + } + match current.get(target.into()) { + Some(item) => return f(item), + None => return err::could_not_find_item(), + } + } fn dir_delete(&mut self, fpath: &str, allow_if_non_empty: bool) -> IoResult<()> { self.with_item_mut(fpath, |node| match node.get() { VNode::Dir(d) => { diff --git a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs index 8c8a66fa..cdf900c2 100644 --- a/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs +++ b/server/src/engine/storage/common/sdss/impls/sdss_r1/rw.rs @@ -140,6 +140,9 @@ impl SdssFile { self.file.fwrite_all(data)?; self.file.fsync_all() } + pub fn truncate(&mut self, new_size: u64) -> IoResult<()> { + self.file.f_truncate(new_size) + } } /* @@ -259,6 +262,9 @@ impl TrackedReader { pub fn cursor(&self) -> u64 { self.cursor } + pub fn cached_size(&self) -> u64 { + self.len + } } impl TrackedReader { diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs index 2bec1cfa..15844578 100644 --- a/server/src/engine/storage/mod.rs +++ b/server/src/engine/storage/mod.rs @@ -57,6 +57,10 @@ pub struct SELoaded { pub gns: GlobalNS, } +pub fn repair() -> RuntimeResult<()> { + v2::repair() +} + pub fn load(cfg: &Configuration) -> RuntimeResult { // first determine if this is a new install, an existing install or if it uses the old driver if Path::new(v1::SYSDB_PATH).is_file() { diff --git a/server/src/engine/storage/v2/mod.rs b/server/src/engine/storage/v2/mod.rs index 020ea18b..33e11a57 100644 --- a/server/src/engine/storage/v2/mod.rs +++ b/server/src/engine/storage/v2/mod.rs @@ -27,26 +27,32 @@ use { self::{ impls::mdl_journal::{BatchStats, FullModel}, - raw::journal::JournalSettings, + raw::journal::{JournalSettings, RepairResult}, }, super::{common::interface::fs::FileSystem, v1, SELoaded}, - crate::engine::{ - config::Configuration, - core::{ - system_db::{SystemDatabase, VerifyUser}, - GNSData, GlobalNS, - }, - fractal::{context, FractalGNSDriver}, - storage::common::paths_v1, - txn::{ - gns::{ - model::CreateModelTxn, - space::CreateSpaceTxn, - sysctl::{AlterUserTxn, CreateUserTxn}, + crate::{ + engine::{ + config::Configuration, + core::{ + system_db::{SystemDatabase, VerifyUser}, + EntityIDRef, GNSData, GlobalNS, + }, + fractal::{context, FractalGNSDriver}, + storage::{ + common::paths_v1, + v2::raw::journal::{self, JournalRepairMode}, }, - SpaceIDRef, + txn::{ + gns::{ + model::CreateModelTxn, + space::CreateSpaceTxn, + sysctl::{AlterUserTxn, CreateUserTxn}, + }, + SpaceIDRef, + }, + RuntimeResult, }, - RuntimeResult, + util, }, impls::mdl_journal::ModelDriver, }; @@ -159,3 +165,69 @@ pub fn restore(cfg: &Configuration) -> RuntimeResult { gns: GlobalNS::new(gns, FractalGNSDriver::new(gns_driver)), }) } + +pub fn repair() -> RuntimeResult<()> { + // back up all files + let backup_dir = format!("backups/{}-before-repair-backup", util::time_now_string()); + context::set_dmsg("creating backup directory"); + FileSystem::create_dir_all(&backup_dir)?; + context::set_dmsg("backing up GNS"); + FileSystem::copy(GNS_PATH, &format!("{backup_dir}/{GNS_PATH}"))?; // backup GNS + context::set_dmsg("backing up data directory"); + FileSystem::copy_directory(DATA_DIR, &format!("{backup_dir}/{DATA_DIR}"))?; // backup data + info!("All data backed up in {backup_dir}"); + // check and attempt repair: GNS + let gns = GNSData::empty(); + context::set_dmsg("repair GNS"); + print_repair_info( + journal::repair_journal::>( + GNS_PATH, + &gns, + JournalSettings::default(), + JournalRepairMode::Simple, + )?, + "GNS", + ); + // check and attempt repair: models + let models = gns.idx_models().read(); + for (space_id, space) in gns.idx().read().iter() { + for model_id in space.models().iter() { + let model = models.get(&EntityIDRef::new(&space_id, &model_id)).unwrap(); + let model_data_file_path = paths_v1::model_path( + &space_id, + space.get_uuid(), + &model_id, + model.data().get_uuid(), + ); + context::set_dmsg(format!("repairing {model_data_file_path}")); + print_repair_info( + journal::repair_journal::< + raw::journal::BatchAdapter, + >( + &model_data_file_path, + model.data(), + JournalSettings::default(), + JournalRepairMode::Simple, + )?, + &model_data_file_path, + ) + } + } + Ok(()) +} + +fn print_repair_info(result: RepairResult, id: &str) { + match result { + RepairResult::NoErrors => info!("repair: no errors detected in {id}"), + RepairResult::LostBytes(lost) => { + warn!("repair: LOST DATA. repaired {id} but lost {lost} trailing bytes") + } + RepairResult::UnspecifiedLoss(definitely_lost) => { + if definitely_lost == 0 { + warn!("repair: LOST DATA. repaired {id} but lost an unspecified amount of data") + } else { + warn!("repair: LOST DATA. repaired {id} but lost atleast {definitely_lost} trailing bytes") + } + } + } +} diff --git a/server/src/engine/storage/v2/raw/journal/mod.rs b/server/src/engine/storage/v2/raw/journal/mod.rs index c363f918..0da3f3de 100644 --- a/server/src/engine/storage/v2/raw/journal/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/mod.rs @@ -47,8 +47,8 @@ mod raw; #[cfg(test)] mod tests; pub use raw::{ - create_journal, open_journal, JournalSettings, RawJournalAdapter, - RawJournalAdapterEvent as JournalAdapterEvent, + create_journal, open_journal, repair_journal, JournalRepairMode, JournalSettings, + RawJournalAdapter, RawJournalAdapterEvent as JournalAdapterEvent, RepairResult, }; /* diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs index 03a1dadd..fc000de3 100644 --- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs +++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs @@ -31,6 +31,7 @@ use { crate::{ engine::{ error::{ErrorKind, StorageError, TransactionError}, + fractal::error::Error, mem::unsafe_apis::memcpy, storage::common::{ checksum::SCrc64, @@ -77,6 +78,36 @@ where RawJournalWriter::new(initializer, file) } +#[derive(Debug)] +/// The result of a journal repair operation +pub enum RepairResult { + /// No errors were detected + NoErrors, + /// Lost n bytes + LostBytes(u64), + /// Definitely lost n bytes, but might have lost more + UnspecifiedLoss(u64), +} + +/** + Attempts to repair the given journal, **in-place** and returns the number of bytes that were definitely lost and could not + be repaired. + + **WARNING**: Backup before calling this +*/ +pub fn repair_journal( + log_path: &str, + gs: &J::GlobalState, + settings: JournalSettings, + repair_mode: JournalRepairMode, +) -> RuntimeResult +where + J::Spec: FileSpecV1, +{ + let log = SdssFile::::open(log_path)?; + RawJournalReader::::repair(log, gs, settings, repair_mode).map(|(lost, ..)| lost) +} + #[derive(Debug)] pub struct JournalInitializer { cursor: u64, @@ -323,7 +354,6 @@ impl DriverEvent { const OFFSET_6_LAST_TXN_ID: Range = Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64); /// Create a new driver event (checksum auto-computed) - #[cfg(test)] fn new( txn_id: u128, driver_event: DriverEventKind, @@ -365,7 +395,6 @@ impl DriverEvent { } } /// Encode the current driver event - #[cfg(test)] fn encode_self(&self) -> [u8; 64] { Self::encode( self.txn_id, @@ -642,6 +671,21 @@ pub struct RawJournalReader { last_txn_checksum: u64, stats: JournalStats, _settings: JournalSettings, + state: JournalState, +} + +#[derive(Debug, PartialEq)] +enum JournalState { + AwaitingEvent, + AwaitingServerEvent, + AwaitingClose, + AwaitingReopen, +} + +impl Default for JournalState { + fn default() -> Self { + Self::AwaitingEvent + } } #[derive(Debug)] @@ -686,23 +730,26 @@ impl RawJournalReader { )?; jtrace_reader!(Initialized); let mut me = Self::new(reader, 0, 0, 0, 0, settings); + me._scroll(gs).map(|jinit| (jinit, me.tr.into_inner())) + } + fn _scroll(&mut self, gs: &J::GlobalState) -> RuntimeResult { loop { - match me._apply_next_event_and_stop(gs) { + match self._apply_next_event_and_stop(gs) { Ok(true) => { jtrace_reader!(Completed); let initializer = JournalInitializer::new( - me.tr.cursor(), - me.tr.checksum(), - me.txn_id, + self.tr.cursor(), + self.tr.checksum(), + self.txn_id, // NB: the last txn offset is important because it indicates that the log is new - me.last_txn_offset, + self.last_txn_offset, ); - let file = me.tr.into_inner(); - return Ok((initializer, file)); + return Ok(initializer); } Ok(false) => {} Err(e) => return Err(e), } + self.state = JournalState::AwaitingEvent; } } fn new( @@ -721,6 +768,7 @@ impl RawJournalReader { last_txn_checksum, stats: JournalStats::new(), _settings: settings, + state: JournalState::AwaitingEvent, } } fn __refresh_known_txn(me: &mut Self) { @@ -737,74 +785,135 @@ pub enum JournalRepairMode { } impl RawJournalReader { - pub fn repair( + fn repair( file: SdssFile<::Spec>, gs: &J::GlobalState, settings: JournalSettings, repair_mode: JournalRepairMode, - ) -> RuntimeResult<()> { + ) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile)> { + let reader = TrackedReader::with_cursor( + file, + <::Spec as FileSpecV1>::SIZE as u64, + )?; + jtrace_reader!(Initialized); + let mut me = Self::new(reader, 0, 0, 0, 0, settings); + match me._scroll(gs) { + Ok(init) => return Ok((RepairResult::NoErrors, init, me.tr.into_inner())), + Err(e) => me.start_repair(e, repair_mode), + } + } + fn start_repair( + self, + e: Error, + repair_mode: JournalRepairMode, + ) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile)> { + let lost = self.tr.cached_size() - self.tr.cursor(); + let mut repair_result = RepairResult::LostBytes(lost); match repair_mode { JournalRepairMode::Simple => {} } - match Self::scroll(file, gs, settings) { - Ok(_) => { - // no error detected - return Ok(()); - } - Err(e) => { - // now it's our task to determine exactly what happened - match e.kind() { - ErrorKind::IoError(io) => match io.kind() { - IoErrorKind::UnexpectedEof => { - /* - this is the only kind of error that we can actually repair since it indicates that a part of the - file is "missing." we can't deal with things like permission errors. that's supposed to be handled - by the admin by looking through the error logs - */ - } - _ => return Err(e), - }, - ErrorKind::Storage(e) => match e { - // unreachable errors (no execution path here) - StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start - | StorageError::RawJournalRuntimeDirty - | StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier - | StorageError::FileDecodeHeaderCorrupted // should be caught earlier - | StorageError::V1JournalDecodeLogEntryCorrupted // v1 errors can't be raised here - | StorageError::V1JournalDecodeCorrupted - | StorageError::V1DataBatchDecodeCorruptedBatch - | StorageError::V1DataBatchDecodeCorruptedEntry - | StorageError::V1DataBatchDecodeCorruptedBatchFile - | StorageError::V1SysDBDecodeCorrupted - | StorageError::V1DataBatchRuntimeCloseError => unreachable!(), - // possible errors - StorageError::InternalDecodeStructureCorrupted - | StorageError::InternalDecodeStructureCorruptedPayload - | StorageError::InternalDecodeStructureIllegalData - | StorageError::RawJournalDecodeEventCorruptedMetadata - | StorageError::RawJournalDecodeEventCorruptedPayload - | StorageError::RawJournalDecodeBatchContentsMismatch - | StorageError::RawJournalDecodeBatchIntegrityFailure - | StorageError::RawJournalDecodeInvalidEvent - | StorageError::RawJournalDecodeCorruptionInBatchMetadata => {} - }, - ErrorKind::Txn(txerr) => match txerr { - // unreachable errors - TransactionError::V1DecodeCorruptedPayloadMoreBytes // no v1 errors - | TransactionError::V1DecodedUnexpectedEof - | TransactionError::V1DecodeUnknownTxnOp => unreachable!(), - // possible errors - TransactionError::OnRestoreDataConflictAlreadyExists | - TransactionError::OnRestoreDataMissing | - TransactionError::OnRestoreDataConflictMismatch => {}, - }, - // these errors do not have an execution pathway - ErrorKind::Other(_) => unreachable!(), - ErrorKind::Config(_) => unreachable!(), + // now it's our task to determine exactly what happened + match e.kind() { + ErrorKind::IoError(io) => match io.kind() { + IoErrorKind::UnexpectedEof => { + /* + this is the only kind of error that we can actually repair since it indicates that a part of the + file is "missing." we can't deal with things like permission errors. that's supposed to be handled + by the admin by looking through the error logs + */ + } + _ => return Err(e), + }, + ErrorKind::Storage(e) => match e { + // unreachable errors (no execution path here) + StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start + | StorageError::RawJournalRuntimeDirty + | StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier + | StorageError::FileDecodeHeaderCorrupted // should be caught earlier + | StorageError::V1JournalDecodeLogEntryCorrupted // v1 errors can't be raised here + | StorageError::V1JournalDecodeCorrupted + | StorageError::V1DataBatchDecodeCorruptedBatch + | StorageError::V1DataBatchDecodeCorruptedEntry + | StorageError::V1DataBatchDecodeCorruptedBatchFile + | StorageError::V1SysDBDecodeCorrupted + | StorageError::V1DataBatchRuntimeCloseError => unreachable!(), + // possible errors + StorageError::InternalDecodeStructureCorrupted + | StorageError::InternalDecodeStructureCorruptedPayload + | StorageError::InternalDecodeStructureIllegalData + | StorageError::RawJournalDecodeEventCorruptedMetadata + | StorageError::RawJournalDecodeEventCorruptedPayload + | StorageError::RawJournalDecodeBatchContentsMismatch + | StorageError::RawJournalDecodeBatchIntegrityFailure + | StorageError::RawJournalDecodeInvalidEvent + | StorageError::RawJournalDecodeCorruptionInBatchMetadata => {} + }, + ErrorKind::Txn(txerr) => match txerr { + // unreachable errors + TransactionError::V1DecodeCorruptedPayloadMoreBytes // no v1 errors + | TransactionError::V1DecodedUnexpectedEof + | TransactionError::V1DecodeUnknownTxnOp => unreachable!(), + // possible errors + TransactionError::OnRestoreDataConflictAlreadyExists | + TransactionError::OnRestoreDataMissing | + TransactionError::OnRestoreDataConflictMismatch => {}, + }, + // these errors do not have an execution pathway + ErrorKind::Other(_) => unreachable!(), + ErrorKind::Config(_) => unreachable!(), + } + /* + revert log. record previous signatures. + */ + l!(let known_event_id, known_event_offset, known_event_checksum = self.last_txn_id, self.last_txn_offset, self.last_txn_checksum); + let mut last_logged_checksum = self.tr.checksum(); + let was_eof = self.tr.is_eof(); + let mut base_log = self.tr.into_inner(); + base_log.truncate(known_event_offset)?; + /* + see what needs to be done next + */ + match self.state { + JournalState::AwaitingEvent + | JournalState::AwaitingServerEvent + | JournalState::AwaitingClose => { + /* + no matter what the last event was (and definitely not a close since if we are expecting a close the log was not already closed), + the log is in a dirty state that can only be resolved by closing it + */ + let drv_close = DriverEvent::new( + (known_event_id + 1) as u128, + DriverEventKind::Closed, + known_event_checksum, + known_event_offset, + known_event_id, + ); + if matches!(self.state, JournalState::AwaitingClose) & was_eof { + // we reached eof and we were expecting a close. definitely lost an unspecified number of bytes + repair_result = RepairResult::UnspecifiedLoss(lost); } + let drv_close_event = drv_close.encode_self(); + last_logged_checksum.update(&drv_close_event); + base_log.fsynced_write(&drv_close_event)?; + } + JournalState::AwaitingReopen => { + // extra bytes indicating low to severe corruption; last event is a close, so with the revert the log is now clean } } - todo!() + let jinit_cursor = known_event_offset + DriverEvent::FULL_EVENT_SIZE as u64; + let jinit_last_txn_offset = jinit_cursor; // same as cursor + let jinit_event_id = known_event_id + 2; // since we already used +1 + let jinit_checksum = last_logged_checksum; + Ok(( + repair_result, + JournalInitializer::new( + jinit_cursor, + jinit_checksum, + jinit_event_id, + jinit_last_txn_offset, + ), + base_log, + )) } } @@ -823,6 +932,7 @@ impl RawJournalReader { // check for a server event // is this a server event? if meta & SERVER_EV_MASK != 0 { + self.state = JournalState::AwaitingServerEvent; jtrace_reader!(DetectedServerEvent); let meta = meta & !SERVER_EV_MASK; match J::parse_event_meta(meta) { @@ -844,6 +954,7 @@ impl RawJournalReader { None => return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()), } } + self.state = JournalState::AwaitingClose; return self.handle_close(txn_id, meta); } fn handle_close( @@ -899,6 +1010,7 @@ impl RawJournalReader { // yes, we're done return Ok(true); } + self.state = JournalState::AwaitingReopen; return self.handle_reopen(); } fn handle_reopen(&mut self) -> RuntimeResult { diff --git a/server/src/main.rs b/server/src/main.rs index 252f5f6f..808e5a72 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -72,27 +72,52 @@ fn main() { ConfigReturn::HelpMessage(msg) => { exit!(eprintln!("{msg}"), 0x00) } + ConfigReturn::Repair => return self::repair(), }, Err(e) => exit_fatal!(error!("{e}")), }; self::entrypoint(config) } +fn init() -> engine::RuntimeResult<(util::os::FileLock, tokio::runtime::Runtime)> { + let f_rt_start = || { + engine::set_context_init("locking PID file"); + let pid_file = util::os::FileLock::new(SKY_PID_FILE)?; + engine::set_context_init("initializing runtime"); + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("server") + .enable_all() + .build()?; + Ok((pid_file, runtime)) + }; + f_rt_start() +} + +fn exit( + global: Option, + pid_file: Option, + result: engine::RuntimeResult<()>, +) { + if let Some(g) = global { + info!("cleaning up data"); + engine::finish(g); + } + if let Some(_) = pid_file { + if let Err(e) = std::fs::remove_file(SKY_PID_FILE) { + error!("failed to remove PID file: {e}"); + } + } + match result { + Ok(()) => println!("goodbye"), + Err(e) => exit_fatal!(error!("{e}")), + } +} + fn entrypoint(config: engine::config::Configuration) { println!("{TEXT}\nSkytable v{VERSION} | {URL}\n"); let run = || { - let f_rt_start = || { - engine::set_context_init("locking PID file"); - let pid_file = util::os::FileLock::new(SKY_PID_FILE)?; - engine::set_context_init("initializing runtime"); - let runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("server") - .enable_all() - .build()?; - Ok((pid_file, runtime)) - }; - let (pid_file, runtime) = match f_rt_start() { - Ok((pf, rt)) => (pf, rt), + let (pid_file, runtime) = match init() { + Ok(pr) => pr, Err(e) => return (None, None, Err(e)), }; let f_glob_init = runtime.block_on(async move { @@ -113,17 +138,22 @@ fn entrypoint(config: engine::config::Configuration) { (Some(pid_file), Some(g), result_start) }; let (pid_file, global, result) = run(); - if let Some(g) = global { - info!("cleaning up data"); - engine::finish(g); - } - if let Some(_) = pid_file { - if let Err(e) = std::fs::remove_file(SKY_PID_FILE) { - error!("failed to remove PID file: {e}"); - } - } - match result { - Ok(()) => println!("goodbye"), - Err(e) => exit_fatal!(error!("{e}")), - } + self::exit(global, pid_file, result); +} + +fn repair() { + let (pid_file, rt) = match init() { + Ok(init) => init, + Err(e) => exit_fatal!(error!("failed to start repair task: {e}")), + }; + let result = rt.block_on(async move { + engine::set_context_init("binding system signals"); + let signal = util::os::TerminationSignal::init()?; + let result = tokio::task::spawn_blocking(|| engine::repair()) + .await + .unwrap(); + drop(signal); + result + }); + self::exit(None, Some(pid_file), result) } diff --git a/server/src/util/mod.rs b/server/src/util/mod.rs index e15ea2b9..5c9ef9fa 100644 --- a/server/src/util/mod.rs +++ b/server/src/util/mod.rs @@ -425,10 +425,11 @@ macro_rules! impl_endian { impl_endian!(u8, i8, u16, i16, u32, i32, u64, i64, usize, isize); +pub fn time_now_string() -> String { + chrono::Local::now().format("%Y%m%d_%H%M%S").to_string() +} + pub fn time_now_with_postfix(post_fix: &str) -> String { - let now = chrono::Local::now(); - // Format the current date and time as YYYYMMDD_HHMMSS - let formatted_date_time = now.format("%Y%m%d_%H%M%S").to_string(); // Concatenate the formatted date and time with the postfix - format!("{}-{}", formatted_date_time, post_fix) + format!("{}-{}", time_now_string(), post_fix) }