|
|
|
@ -31,6 +31,7 @@ use {
|
|
|
|
|
crate::{
|
|
|
|
|
engine::{
|
|
|
|
|
error::{ErrorKind, StorageError, TransactionError},
|
|
|
|
|
fractal::error::Error,
|
|
|
|
|
mem::unsafe_apis::memcpy,
|
|
|
|
|
storage::common::{
|
|
|
|
|
checksum::SCrc64,
|
|
|
|
@ -77,6 +78,36 @@ where
|
|
|
|
|
RawJournalWriter::new(initializer, file)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
/// The result of a journal repair operation
|
|
|
|
|
pub enum RepairResult {
|
|
|
|
|
/// No errors were detected
|
|
|
|
|
NoErrors,
|
|
|
|
|
/// Lost n bytes
|
|
|
|
|
LostBytes(u64),
|
|
|
|
|
/// Definitely lost n bytes, but might have lost more
|
|
|
|
|
UnspecifiedLoss(u64),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
Attempts to repair the given journal, **in-place** and returns the number of bytes that were definitely lost and could not
|
|
|
|
|
be repaired.
|
|
|
|
|
|
|
|
|
|
**WARNING**: Backup before calling this
|
|
|
|
|
*/
|
|
|
|
|
pub fn repair_journal<J: RawJournalAdapter>(
|
|
|
|
|
log_path: &str,
|
|
|
|
|
gs: &J::GlobalState,
|
|
|
|
|
settings: JournalSettings,
|
|
|
|
|
repair_mode: JournalRepairMode,
|
|
|
|
|
) -> RuntimeResult<RepairResult>
|
|
|
|
|
where
|
|
|
|
|
J::Spec: FileSpecV1<DecodeArgs = ()>,
|
|
|
|
|
{
|
|
|
|
|
let log = SdssFile::<J::Spec>::open(log_path)?;
|
|
|
|
|
RawJournalReader::<J>::repair(log, gs, settings, repair_mode).map(|(lost, ..)| lost)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct JournalInitializer {
|
|
|
|
|
cursor: u64,
|
|
|
|
@ -323,7 +354,6 @@ impl DriverEvent {
|
|
|
|
|
const OFFSET_6_LAST_TXN_ID: Range<usize> =
|
|
|
|
|
Self::OFFSET_5_LAST_OFFSET.end..Self::OFFSET_5_LAST_OFFSET.end + sizeof!(u64);
|
|
|
|
|
/// Create a new driver event (checksum auto-computed)
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
fn new(
|
|
|
|
|
txn_id: u128,
|
|
|
|
|
driver_event: DriverEventKind,
|
|
|
|
@ -365,7 +395,6 @@ impl DriverEvent {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
/// Encode the current driver event
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
fn encode_self(&self) -> [u8; 64] {
|
|
|
|
|
Self::encode(
|
|
|
|
|
self.txn_id,
|
|
|
|
@ -642,6 +671,21 @@ pub struct RawJournalReader<J: RawJournalAdapter> {
|
|
|
|
|
last_txn_checksum: u64,
|
|
|
|
|
stats: JournalStats,
|
|
|
|
|
_settings: JournalSettings,
|
|
|
|
|
state: JournalState,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, PartialEq)]
|
|
|
|
|
enum JournalState {
|
|
|
|
|
AwaitingEvent,
|
|
|
|
|
AwaitingServerEvent,
|
|
|
|
|
AwaitingClose,
|
|
|
|
|
AwaitingReopen,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Default for JournalState {
|
|
|
|
|
fn default() -> Self {
|
|
|
|
|
Self::AwaitingEvent
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
@ -686,23 +730,26 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
|
|
|
|
|
)?;
|
|
|
|
|
jtrace_reader!(Initialized);
|
|
|
|
|
let mut me = Self::new(reader, 0, 0, 0, 0, settings);
|
|
|
|
|
me._scroll(gs).map(|jinit| (jinit, me.tr.into_inner()))
|
|
|
|
|
}
|
|
|
|
|
fn _scroll(&mut self, gs: &J::GlobalState) -> RuntimeResult<JournalInitializer> {
|
|
|
|
|
loop {
|
|
|
|
|
match me._apply_next_event_and_stop(gs) {
|
|
|
|
|
match self._apply_next_event_and_stop(gs) {
|
|
|
|
|
Ok(true) => {
|
|
|
|
|
jtrace_reader!(Completed);
|
|
|
|
|
let initializer = JournalInitializer::new(
|
|
|
|
|
me.tr.cursor(),
|
|
|
|
|
me.tr.checksum(),
|
|
|
|
|
me.txn_id,
|
|
|
|
|
self.tr.cursor(),
|
|
|
|
|
self.tr.checksum(),
|
|
|
|
|
self.txn_id,
|
|
|
|
|
// NB: the last txn offset is important because it indicates that the log is new
|
|
|
|
|
me.last_txn_offset,
|
|
|
|
|
self.last_txn_offset,
|
|
|
|
|
);
|
|
|
|
|
let file = me.tr.into_inner();
|
|
|
|
|
return Ok((initializer, file));
|
|
|
|
|
return Ok(initializer);
|
|
|
|
|
}
|
|
|
|
|
Ok(false) => {}
|
|
|
|
|
Err(e) => return Err(e),
|
|
|
|
|
}
|
|
|
|
|
self.state = JournalState::AwaitingEvent;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn new(
|
|
|
|
@ -721,6 +768,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
|
|
|
|
|
last_txn_checksum,
|
|
|
|
|
stats: JournalStats::new(),
|
|
|
|
|
_settings: settings,
|
|
|
|
|
state: JournalState::AwaitingEvent,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn __refresh_known_txn(me: &mut Self) {
|
|
|
|
@ -737,74 +785,135 @@ pub enum JournalRepairMode {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<J: RawJournalAdapter> RawJournalReader<J> {
|
|
|
|
|
pub fn repair(
|
|
|
|
|
fn repair(
|
|
|
|
|
file: SdssFile<<J as RawJournalAdapter>::Spec>,
|
|
|
|
|
gs: &J::GlobalState,
|
|
|
|
|
settings: JournalSettings,
|
|
|
|
|
repair_mode: JournalRepairMode,
|
|
|
|
|
) -> RuntimeResult<()> {
|
|
|
|
|
) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile<J::Spec>)> {
|
|
|
|
|
let reader = TrackedReader::with_cursor(
|
|
|
|
|
file,
|
|
|
|
|
<<J as RawJournalAdapter>::Spec as FileSpecV1>::SIZE as u64,
|
|
|
|
|
)?;
|
|
|
|
|
jtrace_reader!(Initialized);
|
|
|
|
|
let mut me = Self::new(reader, 0, 0, 0, 0, settings);
|
|
|
|
|
match me._scroll(gs) {
|
|
|
|
|
Ok(init) => return Ok((RepairResult::NoErrors, init, me.tr.into_inner())),
|
|
|
|
|
Err(e) => me.start_repair(e, repair_mode),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
fn start_repair(
|
|
|
|
|
self,
|
|
|
|
|
e: Error,
|
|
|
|
|
repair_mode: JournalRepairMode,
|
|
|
|
|
) -> RuntimeResult<(RepairResult, JournalInitializer, SdssFile<J::Spec>)> {
|
|
|
|
|
let lost = self.tr.cached_size() - self.tr.cursor();
|
|
|
|
|
let mut repair_result = RepairResult::LostBytes(lost);
|
|
|
|
|
match repair_mode {
|
|
|
|
|
JournalRepairMode::Simple => {}
|
|
|
|
|
}
|
|
|
|
|
match Self::scroll(file, gs, settings) {
|
|
|
|
|
Ok(_) => {
|
|
|
|
|
// no error detected
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
// now it's our task to determine exactly what happened
|
|
|
|
|
match e.kind() {
|
|
|
|
|
ErrorKind::IoError(io) => match io.kind() {
|
|
|
|
|
IoErrorKind::UnexpectedEof => {
|
|
|
|
|
/*
|
|
|
|
|
this is the only kind of error that we can actually repair since it indicates that a part of the
|
|
|
|
|
file is "missing." we can't deal with things like permission errors. that's supposed to be handled
|
|
|
|
|
by the admin by looking through the error logs
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
_ => return Err(e),
|
|
|
|
|
},
|
|
|
|
|
ErrorKind::Storage(e) => match e {
|
|
|
|
|
// unreachable errors (no execution path here)
|
|
|
|
|
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
|
|
|
|
|
| StorageError::RawJournalRuntimeDirty
|
|
|
|
|
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
|
|
|
|
|
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier
|
|
|
|
|
| StorageError::V1JournalDecodeLogEntryCorrupted // v1 errors can't be raised here
|
|
|
|
|
| StorageError::V1JournalDecodeCorrupted
|
|
|
|
|
| StorageError::V1DataBatchDecodeCorruptedBatch
|
|
|
|
|
| StorageError::V1DataBatchDecodeCorruptedEntry
|
|
|
|
|
| StorageError::V1DataBatchDecodeCorruptedBatchFile
|
|
|
|
|
| StorageError::V1SysDBDecodeCorrupted
|
|
|
|
|
| StorageError::V1DataBatchRuntimeCloseError => unreachable!(),
|
|
|
|
|
// possible errors
|
|
|
|
|
StorageError::InternalDecodeStructureCorrupted
|
|
|
|
|
| StorageError::InternalDecodeStructureCorruptedPayload
|
|
|
|
|
| StorageError::InternalDecodeStructureIllegalData
|
|
|
|
|
| StorageError::RawJournalDecodeEventCorruptedMetadata
|
|
|
|
|
| StorageError::RawJournalDecodeEventCorruptedPayload
|
|
|
|
|
| StorageError::RawJournalDecodeBatchContentsMismatch
|
|
|
|
|
| StorageError::RawJournalDecodeBatchIntegrityFailure
|
|
|
|
|
| StorageError::RawJournalDecodeInvalidEvent
|
|
|
|
|
| StorageError::RawJournalDecodeCorruptionInBatchMetadata => {}
|
|
|
|
|
},
|
|
|
|
|
ErrorKind::Txn(txerr) => match txerr {
|
|
|
|
|
// unreachable errors
|
|
|
|
|
TransactionError::V1DecodeCorruptedPayloadMoreBytes // no v1 errors
|
|
|
|
|
| TransactionError::V1DecodedUnexpectedEof
|
|
|
|
|
| TransactionError::V1DecodeUnknownTxnOp => unreachable!(),
|
|
|
|
|
// possible errors
|
|
|
|
|
TransactionError::OnRestoreDataConflictAlreadyExists |
|
|
|
|
|
TransactionError::OnRestoreDataMissing |
|
|
|
|
|
TransactionError::OnRestoreDataConflictMismatch => {},
|
|
|
|
|
},
|
|
|
|
|
// these errors do not have an execution pathway
|
|
|
|
|
ErrorKind::Other(_) => unreachable!(),
|
|
|
|
|
ErrorKind::Config(_) => unreachable!(),
|
|
|
|
|
// now it's our task to determine exactly what happened
|
|
|
|
|
match e.kind() {
|
|
|
|
|
ErrorKind::IoError(io) => match io.kind() {
|
|
|
|
|
IoErrorKind::UnexpectedEof => {
|
|
|
|
|
/*
|
|
|
|
|
this is the only kind of error that we can actually repair since it indicates that a part of the
|
|
|
|
|
file is "missing." we can't deal with things like permission errors. that's supposed to be handled
|
|
|
|
|
by the admin by looking through the error logs
|
|
|
|
|
*/
|
|
|
|
|
}
|
|
|
|
|
_ => return Err(e),
|
|
|
|
|
},
|
|
|
|
|
ErrorKind::Storage(e) => match e {
|
|
|
|
|
// unreachable errors (no execution path here)
|
|
|
|
|
StorageError::RawJournalRuntimeHeartbeatFail // can't reach runtime error before driver start
|
|
|
|
|
| StorageError::RawJournalRuntimeDirty
|
|
|
|
|
| StorageError::FileDecodeHeaderVersionMismatch // should be caught earlier
|
|
|
|
|
| StorageError::FileDecodeHeaderCorrupted // should be caught earlier
|
|
|
|
|
| StorageError::V1JournalDecodeLogEntryCorrupted // v1 errors can't be raised here
|
|
|
|
|
| StorageError::V1JournalDecodeCorrupted
|
|
|
|
|
| StorageError::V1DataBatchDecodeCorruptedBatch
|
|
|
|
|
| StorageError::V1DataBatchDecodeCorruptedEntry
|
|
|
|
|
| StorageError::V1DataBatchDecodeCorruptedBatchFile
|
|
|
|
|
| StorageError::V1SysDBDecodeCorrupted
|
|
|
|
|
| StorageError::V1DataBatchRuntimeCloseError => unreachable!(),
|
|
|
|
|
// possible errors
|
|
|
|
|
StorageError::InternalDecodeStructureCorrupted
|
|
|
|
|
| StorageError::InternalDecodeStructureCorruptedPayload
|
|
|
|
|
| StorageError::InternalDecodeStructureIllegalData
|
|
|
|
|
| StorageError::RawJournalDecodeEventCorruptedMetadata
|
|
|
|
|
| StorageError::RawJournalDecodeEventCorruptedPayload
|
|
|
|
|
| StorageError::RawJournalDecodeBatchContentsMismatch
|
|
|
|
|
| StorageError::RawJournalDecodeBatchIntegrityFailure
|
|
|
|
|
| StorageError::RawJournalDecodeInvalidEvent
|
|
|
|
|
| StorageError::RawJournalDecodeCorruptionInBatchMetadata => {}
|
|
|
|
|
},
|
|
|
|
|
ErrorKind::Txn(txerr) => match txerr {
|
|
|
|
|
// unreachable errors
|
|
|
|
|
TransactionError::V1DecodeCorruptedPayloadMoreBytes // no v1 errors
|
|
|
|
|
| TransactionError::V1DecodedUnexpectedEof
|
|
|
|
|
| TransactionError::V1DecodeUnknownTxnOp => unreachable!(),
|
|
|
|
|
// possible errors
|
|
|
|
|
TransactionError::OnRestoreDataConflictAlreadyExists |
|
|
|
|
|
TransactionError::OnRestoreDataMissing |
|
|
|
|
|
TransactionError::OnRestoreDataConflictMismatch => {},
|
|
|
|
|
},
|
|
|
|
|
// these errors do not have an execution pathway
|
|
|
|
|
ErrorKind::Other(_) => unreachable!(),
|
|
|
|
|
ErrorKind::Config(_) => unreachable!(),
|
|
|
|
|
}
|
|
|
|
|
/*
|
|
|
|
|
revert log. record previous signatures.
|
|
|
|
|
*/
|
|
|
|
|
l!(let known_event_id, known_event_offset, known_event_checksum = self.last_txn_id, self.last_txn_offset, self.last_txn_checksum);
|
|
|
|
|
let mut last_logged_checksum = self.tr.checksum();
|
|
|
|
|
let was_eof = self.tr.is_eof();
|
|
|
|
|
let mut base_log = self.tr.into_inner();
|
|
|
|
|
base_log.truncate(known_event_offset)?;
|
|
|
|
|
/*
|
|
|
|
|
see what needs to be done next
|
|
|
|
|
*/
|
|
|
|
|
match self.state {
|
|
|
|
|
JournalState::AwaitingEvent
|
|
|
|
|
| JournalState::AwaitingServerEvent
|
|
|
|
|
| JournalState::AwaitingClose => {
|
|
|
|
|
/*
|
|
|
|
|
no matter what the last event was (and definitely not a close since if we are expecting a close the log was not already closed),
|
|
|
|
|
the log is in a dirty state that can only be resolved by closing it
|
|
|
|
|
*/
|
|
|
|
|
let drv_close = DriverEvent::new(
|
|
|
|
|
(known_event_id + 1) as u128,
|
|
|
|
|
DriverEventKind::Closed,
|
|
|
|
|
known_event_checksum,
|
|
|
|
|
known_event_offset,
|
|
|
|
|
known_event_id,
|
|
|
|
|
);
|
|
|
|
|
if matches!(self.state, JournalState::AwaitingClose) & was_eof {
|
|
|
|
|
// we reached eof and we were expecting a close. definitely lost an unspecified number of bytes
|
|
|
|
|
repair_result = RepairResult::UnspecifiedLoss(lost);
|
|
|
|
|
}
|
|
|
|
|
let drv_close_event = drv_close.encode_self();
|
|
|
|
|
last_logged_checksum.update(&drv_close_event);
|
|
|
|
|
base_log.fsynced_write(&drv_close_event)?;
|
|
|
|
|
}
|
|
|
|
|
JournalState::AwaitingReopen => {
|
|
|
|
|
// extra bytes indicating low to severe corruption; last event is a close, so with the revert the log is now clean
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
todo!()
|
|
|
|
|
let jinit_cursor = known_event_offset + DriverEvent::FULL_EVENT_SIZE as u64;
|
|
|
|
|
let jinit_last_txn_offset = jinit_cursor; // same as cursor
|
|
|
|
|
let jinit_event_id = known_event_id + 2; // since we already used +1
|
|
|
|
|
let jinit_checksum = last_logged_checksum;
|
|
|
|
|
Ok((
|
|
|
|
|
repair_result,
|
|
|
|
|
JournalInitializer::new(
|
|
|
|
|
jinit_cursor,
|
|
|
|
|
jinit_checksum,
|
|
|
|
|
jinit_event_id,
|
|
|
|
|
jinit_last_txn_offset,
|
|
|
|
|
),
|
|
|
|
|
base_log,
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -823,6 +932,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
|
|
|
|
|
// check for a server event
|
|
|
|
|
// is this a server event?
|
|
|
|
|
if meta & SERVER_EV_MASK != 0 {
|
|
|
|
|
self.state = JournalState::AwaitingServerEvent;
|
|
|
|
|
jtrace_reader!(DetectedServerEvent);
|
|
|
|
|
let meta = meta & !SERVER_EV_MASK;
|
|
|
|
|
match J::parse_event_meta(meta) {
|
|
|
|
@ -844,6 +954,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
|
|
|
|
|
None => return Err(StorageError::RawJournalDecodeEventCorruptedMetadata.into()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
self.state = JournalState::AwaitingClose;
|
|
|
|
|
return self.handle_close(txn_id, meta);
|
|
|
|
|
}
|
|
|
|
|
fn handle_close(
|
|
|
|
@ -899,6 +1010,7 @@ impl<J: RawJournalAdapter> RawJournalReader<J> {
|
|
|
|
|
// yes, we're done
|
|
|
|
|
return Ok(true);
|
|
|
|
|
}
|
|
|
|
|
self.state = JournalState::AwaitingReopen;
|
|
|
|
|
return self.handle_reopen();
|
|
|
|
|
}
|
|
|
|
|
fn handle_reopen(&mut self) -> RuntimeResult<bool> {
|
|
|
|
|