From 20c937451f3b2993b75a3b14b596f0de5ee4438f Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sun, 3 Sep 2023 18:20:38 +0000 Subject: [PATCH] Implement batched persistence system --- server/src/engine/core/dml/del.rs | 2 +- server/src/engine/core/dml/ins.rs | 8 +- server/src/engine/core/dml/upd.rs | 8 +- server/src/engine/core/index/key.rs | 45 ++ server/src/engine/core/index/row.rs | 13 +- server/src/engine/core/mod.rs | 4 +- server/src/engine/core/model/delta.rs | 44 +- server/src/engine/core/model/mod.rs | 2 +- server/src/engine/data/tag.rs | 26 + server/src/engine/storage/checksum.rs | 58 +++ server/src/engine/storage/mod.rs | 3 + .../src/engine/storage/v1/batch_jrnl/mod.rs | 45 ++ .../engine/storage/v1/batch_jrnl/persist.rs | 290 ++++++++++++ .../engine/storage/v1/batch_jrnl/restore.rs | 445 ++++++++++++++++++ .../src/engine/storage/v1/header_impl/mod.rs | 10 +- server/src/engine/storage/v1/inf/map.rs | 32 +- server/src/engine/storage/v1/inf/mod.rs | 12 +- server/src/engine/storage/v1/inf/obj.rs | 4 +- server/src/engine/storage/v1/journal.rs | 52 +- server/src/engine/storage/v1/mod.rs | 11 + server/src/engine/storage/v1/rw.rs | 137 +++++- server/src/engine/storage/v1/test_util.rs | 9 +- server/src/engine/storage/v1/tests.rs | 237 +--------- server/src/engine/storage/v1/tests/batch.rs | 210 +++++++++ server/src/engine/storage/v1/tests/rw.rs | 68 +++ server/src/engine/storage/v1/tests/tx.rs | 210 +++++++++ server/src/engine/txn/data.rs | 33 +- server/src/engine/txn/mod.rs | 1 + server/src/util/os.rs | 6 + 29 files changed, 1684 insertions(+), 341 deletions(-) create mode 100644 server/src/engine/storage/checksum.rs create mode 100644 server/src/engine/storage/v1/batch_jrnl/mod.rs create mode 100644 server/src/engine/storage/v1/batch_jrnl/persist.rs create mode 100644 server/src/engine/storage/v1/batch_jrnl/restore.rs create mode 100644 server/src/engine/storage/v1/tests/batch.rs create mode 100644 server/src/engine/storage/v1/tests/rw.rs create mode 100644 
server/src/engine/storage/v1/tests/tx.rs

diff --git a/server/src/engine/core/dml/del.rs b/server/src/engine/core/dml/del.rs
index 4deab3a5..88d0ba5f 100644
--- a/server/src/engine/core/dml/del.rs
+++ b/server/src/engine/core/dml/del.rs
@@ -45,7 +45,7 @@ pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()>
             .mt_delete_return_entry(&model.resolve_where(delete.clauses_mut())?, &g)
         {
             Some(row) => {
-                delta_state.append_new_data_delta(
+                delta_state.append_new_data_delta_with(
                     DataDeltaKind::Delete,
                     row.clone(),
                     schema_version,
diff --git a/server/src/engine/core/dml/ins.rs b/server/src/engine/core/dml/ins.rs
index 75d37bd1..f93fa556 100644
--- a/server/src/engine/core/dml/ins.rs
+++ b/server/src/engine/core/dml/ins.rs
@@ -43,15 +43,15 @@ pub fn insert(gns: &GlobalNS, insert: InsertStatement) -> DatabaseResult<()> {
         let g = cpin();
         let ds = mdl.delta_state();
         // create new version
-        let cv = ds.create_new_data_delta_version();
-        let row = Row::new(pk, data, ds.schema_current_version(), cv);
+        let new_version = ds.create_new_data_delta_version();
+        let row = Row::new(pk, data, ds.schema_current_version(), new_version);
         if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) {
             // append delta for new version
-            ds.append_new_data_delta(
+            ds.append_new_data_delta_with(
                 DataDeltaKind::Insert,
                 row,
                 ds.schema_current_version(),
-                cv,
+                new_version,
                 &g,
             );
             Ok(())
diff --git a/server/src/engine/core/dml/upd.rs b/server/src/engine/core/dml/upd.rs
index 3077d4b3..45b62532 100644
--- a/server/src/engine/core/dml/upd.rs
+++ b/server/src/engine/core/dml/upd.rs
@@ -248,7 +248,7 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()>
         let mut row_data_wl = row.d_data().write();
         // create new version
         let ds = mdl.delta_state();
-        let cv = ds.create_new_data_delta_version();
+        let new_version = ds.create_new_data_delta_version();
         // process changes
         let mut rollback_now = false;
         let mut rollback_data = Vec::with_capacity(update.expressions().len());
@@ -341,13 +341,13 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()>
             });
         } else {
             // update revised tag
-            row_data_wl.set_txn_revised(cv);
+            row_data_wl.set_txn_revised(new_version);
             // publish delta
-            ds.append_new_data_delta(
+            ds.append_new_data_delta_with(
                 DataDeltaKind::Update,
                 row.clone(),
                 ds.schema_current_version(),
-                cv,
+                new_version,
                 &g,
             );
         }
diff --git a/server/src/engine/core/index/key.rs b/server/src/engine/core/index/key.rs
index 347a2c6a..470168c9 100644
--- a/server/src/engine/core/index/key.rs
+++ b/server/src/engine/core/index/key.rs
@@ -24,6 +24,7 @@
  *
 */
 
+use crate::engine::mem::ZERO_BLOCK;
 #[cfg(test)]
 use crate::{engine::data::spec::Dataspec1D, util::test_utils};
 use {
@@ -50,6 +51,35 @@ pub struct PrimaryIndexKey {
     data: SpecialPaddedWord,
 }
 
+impl Clone for PrimaryIndexKey {
+    /// Deep-clone the key. `Str`/`Bin` keys carry a (length, pointer) pair (see
+    /// `new_from_dual`), so the pointed-to bytes are duplicated; integer keys carry their
+    /// value in the quadword (see `new_from_qw`), so the padded word is copied as-is.
+    fn clone(&self) -> Self {
+        match self.tag {
+            TagUnique::Bin | TagUnique::Str => {
+                let (qw, nw) = self.data.dwordqn_load_qw_nw();
+                unsafe {
+                    // SAFETY: +tagck; for Str/Bin `nw` is the data pointer and `qw` its length
+                    let slice = slice::from_raw_parts(nw as *const u8, qw as _);
+                    // the new key takes over this allocation, so it must not be dropped here
+                    let mut data = ManuallyDrop::new(slice.to_owned().into_boxed_slice());
+                    Self {
+                        tag: self.tag,
+                        data: SpecialPaddedWord::new(qw, data.as_mut_ptr() as usize),
+                    }
+                }
+            }
+            TagUnique::SignedInt | TagUnique::UnsignedInt => Self {
+                tag: self.tag,
+                // SAFETY: +tagck; an integer key has no allocation of its own to duplicate
+                data: unsafe { core::mem::transmute_copy(&self.data) },
+            },
+            _ => unreachable!(),
+        }
+    }
+}
+
 impl PrimaryIndexKey {
     pub fn tag(&self) -> TagUnique {
         self.tag
@@ -127,6 +157,31 @@ impl PrimaryIndexKey {
             },
         }
     }
+    /// Create a new quadword based primary key
+    pub unsafe fn new_from_qw(tag: TagUnique, qw: u64) -> Self {
+        debug_assert!(tag == TagUnique::SignedInt || tag == TagUnique::UnsignedInt);
+        Self {
+            tag,
+            data: unsafe {
+                // UNSAFE(@ohsayan): manually choosing block
+                SpecialPaddedWord::new(qw, ZERO_BLOCK.as_ptr() as usize)
+            },
+        }
+    }
+    /// Create a new key from a raw `(qw, ptr)` pair (for `Str`/`Bin` tags)
+    ///
+    /// # Safety
+    /// `qw` and `ptr` are stored as-is; the caller must pass a matching length and pointer
+    pub unsafe fn new_from_dual(tag: TagUnique, qw: u64, ptr: usize) -> Self {
+        debug_assert!(tag == TagUnique::Str || tag == TagUnique::Bin);
+        Self {
+            tag,
+            data: unsafe {
+                // UNSAFE(@ohsayan): manually choosing qw and nw
+                SpecialPaddedWord::new(qw, ptr)
+            },
+        }
+    }
     pub unsafe fn raw_clone(&self) -> Self {
         Self {
             tag: self.tag,
diff --git a/server/src/engine/core/index/row.rs b/server/src/engine/core/index/row.rs
index 5ad2d636..f47861a8 100644
--- a/server/src/engine/core/index/row.rs
+++ b/server/src/engine/core/index/row.rs
@@ -139,11 +139,16 @@ impl Row {
 }
 
 impl Row {
-    pub fn resolve_schema_deltas_and_freeze<'g>(
+    /// Only apply deltas if a certain condition is met
+    pub fn resolve_schema_deltas_and_freeze_if<'g>(
         &'g self,
         delta_state: &DeltaState,
+        iff: impl Fn(&RowData) -> bool,
     ) -> RwLockReadGuard<'g, RowData> {
         let rwl_ug = self.d_data().upgradable_read();
+        if !iff(&rwl_ug) {
+            return RwLockUpgradableReadGuard::downgrade(rwl_ug);
+        }
         let current_version = delta_state.schema_current_version();
         if compiler::likely(current_version <= rwl_ug.txn_revised_schema_version) {
             return RwLockUpgradableReadGuard::downgrade(rwl_ug);
@@ -167,6 +172,13 @@ impl Row {
         wl.txn_revised_schema_version = max_delta;
         return RwLockWriteGuard::downgrade(wl);
     }
+    /// Same as `resolve_schema_deltas_and_freeze_if` with a predicate that always passes
+    pub fn resolve_schema_deltas_and_freeze<'g>(
+        &'g self,
+        delta_state: &DeltaState,
+    ) -> RwLockReadGuard<'g, RowData> {
+        self.resolve_schema_deltas_and_freeze_if(delta_state, |_| true)
+    }
 }
 
 impl Clone for Row {
diff --git a/server/src/engine/core/mod.rs b/server/src/engine/core/mod.rs
index 34a0427f..340df67b 100644
--- a/server/src/engine/core/mod.rs
+++ b/server/src/engine/core/mod.rs
@@ -24,7 +24,7 @@
  *
 */
 
-mod dml;
+pub(in crate::engine) mod dml;
 pub(in crate::engine) mod index;
 pub(in crate::engine) mod model;
 pub(in crate::engine) mod query_meta;
@@ -32,7 +32,7 @@ pub mod space;
 mod util;
 // test
 #[cfg(test)]
-mod tests;
+pub(super) mod tests;
 // imports
 use {
     self::{model::Model, util::EntityLocator},
diff --git a/server/src/engine/core/model/delta.rs
b/server/src/engine/core/model/delta.rs
index 4265a4c9..d3c6720b 100644
--- a/server/src/engine/core/model/delta.rs
+++ b/server/src/engine/core/model/delta.rs
@@ -30,7 +30,7 @@ use {
     parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
     std::{
         collections::btree_map::{BTreeMap, Range},
-        sync::atomic::{AtomicU64, Ordering},
+        sync::atomic::{AtomicU64, AtomicUsize, Ordering},
     },
 };
 
@@ -153,6 +153,7 @@ pub struct DeltaState {
     // data
     data_current_version: AtomicU64,
     data_deltas: Queue<DataDelta>,
+    data_deltas_size: AtomicUsize,
 }
 
 impl DeltaState {
@@ -163,13 +164,14 @@ impl DeltaState {
             schema_deltas: RwLock::new(BTreeMap::new()),
             data_current_version: AtomicU64::new(0),
             data_deltas: Queue::new(),
+            data_deltas_size: AtomicUsize::new(0),
         }
     }
 }
 
 // data
 impl DeltaState {
-    pub fn append_new_data_delta(
+    pub fn append_new_data_delta_with(
         &self,
         kind: DataDeltaKind,
         row: Row,
@@ -177,18 +179,30 @@ impl DeltaState {
         data_version: DeltaVersion,
         g: &Guard,
     ) {
-        self.data_deltas
-            .blocking_enqueue(DataDelta::new(schema_version, data_version, row, kind), g);
+        self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g);
+    }
+    /// Enqueue a data delta and bump the enqueued-delta counter
+    pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) {
+        self.data_deltas.blocking_enqueue(delta, g);
+        self.data_deltas_size.fetch_add(1, Ordering::Release);
     }
     pub fn create_new_data_delta_version(&self) -> DeltaVersion {
         DeltaVersion(self.__data_delta_step())
     }
+    /// Current value of the enqueued-delta counter
+    // NOTE(review): nothing visible in this patch decrements the counter on dequeue -- confirm
+    pub fn get_data_delta_size(&self) -> usize {
+        self.data_deltas_size.load(Ordering::Acquire)
+    }
 }
 
 impl DeltaState {
-    pub fn __data_delta_step(&self) -> u64 {
+    fn __data_delta_step(&self) -> u64 {
         self.data_current_version.fetch_add(1, Ordering::AcqRel)
     }
+    pub fn __data_delta_queue(&self) -> &Queue<DataDelta> {
+        &self.data_deltas
+    }
 }
 
 // schema
@@ -314,7 +328,7 @@ impl<'a> SchemaDeltaIndexRGuard<'a> {
     data delta
 */
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DataDelta {
     schema_version: DeltaVersion,
     data_version: DeltaVersion,
@@ -336,11 +350,25 @@ impl DataDelta {
             change,
         }
     }
+    pub fn schema_version(&self) -> DeltaVersion {
+        self.schema_version
+    }
+    pub fn data_version(&self) -> DeltaVersion {
+        self.data_version
+    }
+    pub fn row(&self) -> &Row {
+        &self.row
+    }
+    pub fn change(&self) -> DataDeltaKind {
+        self.change
+    }
 }
 
-#[derive(Debug)]
+// NOTE: the batch journal writes these discriminants to disk and matches on 0/1/2 when restoring
+#[derive(Debug, Clone, Copy, sky_macros::EnumMethods, PartialEq)]
+#[repr(u8)]
 pub enum DataDeltaKind {
-    Insert,
-    Update,
-    Delete,
+    Delete = 0,
+    Insert = 1,
+    Update = 2,
 }
diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs
index 82e4ddf3..13f7137e 100644
--- a/server/src/engine/core/model/mod.rs
+++ b/server/src/engine/core/model/mod.rs
@@ -52,7 +52,7 @@ use {
     std::cell::UnsafeCell,
 };
 
-pub(in crate::engine::core) use self::delta::{SchemaDeltaKind, DeltaState, DeltaVersion};
+pub(in crate::engine::core) use self::delta::{DeltaState, DeltaVersion, SchemaDeltaKind};
 pub(in crate::engine) type Fields = IndexSTSeqCns<Box<str>, Field>;
 
 #[derive(Debug)]
diff --git a/server/src/engine/data/tag.rs b/server/src/engine/data/tag.rs
index 53c4067e..00bee7c8 100644
--- a/server/src/engine/data/tag.rs
+++ b/server/src/engine/data/tag.rs
@@ -41,6 +41,30 @@ impl TagClass {
     pub const fn max() -> usize {
         Self::List.d() as _
     }
+    /// Checked conversion from a raw discriminant; `None` if `v` is above `List`
+    pub const fn try_from_raw(v: u8) -> Option<Self> {
+        if v > Self::List.d() {
+            return None;
+        }
+        Some(unsafe { Self::from_raw(v) })
+    }
+    /// # Safety
+    /// `v` is transmuted as-is, so it must be a valid `TagClass` discriminant
+    pub const unsafe fn from_raw(v: u8) -> Self {
+        core::mem::transmute(v)
+    }
+    /// Map this class to a unique tag through a table indexed by the discriminant
+    pub const fn tag_unique(&self) -> TagUnique {
+        [
+            TagUnique::Illegal,
+            TagUnique::UnsignedInt,
+            TagUnique::SignedInt,
+            TagUnique::Illegal,
+            TagUnique::Bin,
+            TagUnique::Str,
+            TagUnique::Illegal,
+        ][self.d() as usize]
+    }
 }
 
 #[repr(u8)]
@@ -124,6 +148,14 @@ impl TagUnique {
     pub const fn is_unique(&self) -> bool {
         self.d() != Self::Illegal.d()
     }
+    /// Checked conversion from a raw discriminant; rejects anything above 3
+    // NOTE(review): assumes the legal discriminants are exactly 0..=3 -- confirm against the enum
+    pub const fn try_from_raw(raw: u8) -> Option<Self> {
+        if raw > 3 {
+            return None;
+        }
+        Some(unsafe { core::mem::transmute(raw) })
+    }
 }
 
 macro_rules! d {
diff --git a/server/src/engine/storage/checksum.rs b/server/src/engine/storage/checksum.rs
new file mode 100644
index 00000000..aa32c3d4
--- /dev/null
+++ b/server/src/engine/storage/checksum.rs
@@ -0,0 +1,61 @@
+/*
+ * Created on Sun Sep 03 2023
+ *
+ * This file is a part of Skytable
+ * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
+ * NoSQL database written by Sayan Nandan ("the Author") with the
+ * vision to provide flexibility in data modelling without compromising
+ * on performance, queryability or scalability.
+ *
+ * Copyright (c) 2023, Sayan Nandan
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+*/
+
+use crc::{Crc, Digest, CRC_64_XZ};
+
+/*
+    NOTE(@ohsayan): we're currently using crc's impl. but the reason I decided to make a wrapper is because I have a
+    different impl in mind
+*/
+
+const CRC64: Crc<u64> = Crc::<u64>::new(&CRC_64_XZ);
+
+/// Streaming CRC-64/XZ checksum (a thin wrapper over `crc::Digest`)
+pub struct SCrc {
+    digest: Digest<'static, u64>,
+}
+
+impl SCrc {
+    pub const fn new() -> Self {
+        Self {
+            digest: CRC64.digest(),
+        }
+    }
+    pub fn recompute_with_new_byte(&mut self, b: u8) {
+        self.digest.update(&[b])
+    }
+    /// Feed a fixed-size block into the running checksum
+    pub fn recompute_with_new_block<const N: usize>(&mut self, b: [u8; N]) {
+        self.digest.update(&b);
+    }
+    pub fn recompute_with_new_var_block(&mut self, b: &[u8]) {
+        self.digest.update(b)
+    }
+    /// Consume the state and return the final checksum
+    pub fn finish(self) -> u64 {
+        self.digest.finalize()
+    }
+}
diff --git a/server/src/engine/storage/mod.rs b/server/src/engine/storage/mod.rs
index c656c894..6f0d2a15 100644
--- a/server/src/engine/storage/mod.rs
+++ b/server/src/engine/storage/mod.rs
@@ -26,7 +26,10 @@
 
 //! Implementations of the Skytable Disk Storage Subsystem (SDSS)
 
+mod checksum;
 mod header;
 mod versions;
 // impls
 pub mod v1;
+
+pub use checksum::SCrc;
diff --git a/server/src/engine/storage/v1/batch_jrnl/mod.rs b/server/src/engine/storage/v1/batch_jrnl/mod.rs
new file mode 100644
index 00000000..68b32d4b
--- /dev/null
+++ b/server/src/engine/storage/v1/batch_jrnl/mod.rs
@@ -0,0 +1,45 @@
+/*
+ * Created on Sun Sep 03 2023
+ *
+ * This file is a part of Skytable
+ * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
+ * NoSQL database written by Sayan Nandan ("the Author") with the
+ * vision to provide flexibility in data modelling without compromising
+ * on performance, queryability or scalability.
+ *
+ * Copyright (c) 2023, Sayan Nandan
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+*/
+
+mod persist;
+mod restore;
+
+/// the data batch file was reopened
+const MARKER_BATCH_REOPEN: u8 = 0xFB;
+/// the data batch file was closed
+const MARKER_BATCH_CLOSED: u8 = 0xFC;
+/// end of batch marker
+const MARKER_END_OF_BATCH: u8 = 0xFD;
+/// "real" batch event marker
+const MARKER_ACTUAL_BATCH_EVENT: u8 = 0xFE;
+/// recovery batch event marker
+const MARKER_RECOVERY_EVENT: u8 = 0xFF;
+/// recovery threshold
+const RECOVERY_THRESHOLD: usize = 10;
+
+#[cfg(test)]
+pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch};
+pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
diff --git a/server/src/engine/storage/v1/batch_jrnl/persist.rs b/server/src/engine/storage/v1/batch_jrnl/persist.rs
new file mode 100644
index 00000000..1dcf824e
--- /dev/null
+++ b/server/src/engine/storage/v1/batch_jrnl/persist.rs
@@ -0,0 +1,290 @@
+/*
+ * Created on Tue Sep 05 2023
+ *
+ * This file is a part of Skytable
+ * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
+ * NoSQL database written by Sayan Nandan ("the Author") with the
+ * vision to provide flexibility in data modelling without compromising
+ * on performance, queryability or scalability.
+ *
+ * Copyright (c) 2023, Sayan Nandan
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+*/
+
+use {
+    super::{
+        MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH,
+        MARKER_RECOVERY_EVENT,
+    },
+    crate::{
+        engine::{
+            core::{
+                index::{PrimaryIndexKey, RowData},
+                model::{
+                    delta::{DataDelta, DataDeltaKind, DeltaVersion, IRModel},
+                    Model,
+                },
+            },
+            data::{
+                cell::Datacell,
+                tag::{DataTag, TagClass, TagUnique},
+            },
+            idx::STIndexSeq,
+            storage::v1::{
+                inf::PersistTypeDscr,
+                rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedWriter},
+                SDSSError, SDSSResult,
+            },
+        },
+        util::EndianQW,
+    },
+    crossbeam_epoch::pin,
+};
+
+/// Writes batches of data deltas to a batch journal file through a checksum-tracking writer
+pub struct DataBatchPersistDriver<F> {
+    f: SDSSFileTrackedWriter<F>,
+}
+
+impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
+    /// Wrap `file`; when the file is not new, a reopen marker is written (and fsynced) first
+    pub fn new(mut file: SDSSFileIO<F>, is_new: bool) -> SDSSResult<Self> {
+        if !is_new {
+            file.fsynced_write(&[MARKER_BATCH_REOPEN])?;
+        }
+        Ok(Self {
+            f: SDSSFileTrackedWriter::new(file),
+        })
+    }
+    /// Write the close marker; any write failure is reported as `DataBatchCloseError`
+    pub fn close(mut self) -> SDSSResult<()> {
+        self.f
+            .inner_file()
+            .fsynced_write(&[MARKER_BATCH_CLOSED])
+            .map_err(|_| SDSSError::DataBatchCloseError)
+    }
+    /// Dequeue `observed_len` deltas from the model's delta queue and persist them as one batch.
+    ///
+    /// On failure the dequeued deltas are re-published to the queue and a recovery marker is
+    /// appended to the file.
+    ///
+    /// # Panics
+    /// Panics if the queue yields fewer than `observed_len` deltas (the dequeue is unwrapped).
+    pub fn write_new_batch(&mut self, model: &Model, observed_len: usize) -> SDSSResult<()> {
+        // pin model
+        let irm = model.intent_read_model();
+        let schema_version = model.delta_state().schema_current_version();
+        let data_q = model.delta_state().__data_delta_queue();
+        let g = pin();
+        // init restore list (at most one entry per dequeued delta)
+        let mut restore_list = Vec::with_capacity(observed_len);
+        // prepare computations
+        let mut i = 0;
+        let mut inconsistent_reads = 0;
+        let mut exec = || -> SDSSResult<()> {
+            // write batch start
+            self.write_batch_start(observed_len, schema_version)?;
+            while i < observed_len {
+                let delta = data_q.blocking_try_dequeue(&g).unwrap();
+                restore_list.push(delta.clone()); // TODO(@ohsayan): avoid this
+                match delta.change() {
+                    DataDeltaKind::Delete => {
+                        self.write_batch_item_common_row_data(&delta)?;
+                        self.encode_pk_only(delta.row().d_key())?;
+                    }
+                    DataDeltaKind::Insert | DataDeltaKind::Update => {
+                        // resolve deltas (this is yet another opportunity for us to reclaim memory from deleted items)
+                        let row_data = delta
+                            .row()
+                            .resolve_schema_deltas_and_freeze_if(&model.delta_state(), |row| {
+                                row.get_txn_revised() <= delta.data_version()
+                            });
+                        if row_data.get_txn_revised() > delta.data_version() {
+                            // we made an inconsistent (stale) read; someone updated the state after our snapshot
+                            inconsistent_reads += 1;
+                            i += 1;
+                            continue;
+                        }
+                        self.write_batch_item_common_row_data(&delta)?;
+                        // encode data
+                        self.encode_pk_only(delta.row().d_key())?;
+                        self.encode_row_data(model, &irm, &row_data)?;
+                    }
+                }
+                // fsync now; we're good to go
+                self.f.fsync_all()?;
+                i += 1;
+            }
+            return self.append_batch_summary(observed_len, inconsistent_reads);
+        };
+        match exec() {
+            Ok(()) => Ok(()),
+            Err(_) => {
+                // republish changes since we failed to commit
+                restore_list
+                    .into_iter()
+                    .for_each(|delta| model.delta_state().append_new_data_delta(delta, &g));
+                // now attempt to fix the file
+                return self.attempt_fix_data_batchfile();
+            }
+        }
+    }
+    /// Write the batch start block:
+    /// - Batch start magic
+    /// - Expected commit
+    /// - Schema version
+    fn write_batch_start(
+        &mut self,
+        observed_len: usize,
+        schema_version: DeltaVersion,
+    ) -> Result<(), SDSSError> {
+        self.f.unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT])?;
+        let observed_len_bytes = observed_len.u64_bytes_le();
+        self.f.unfsynced_write(&observed_len_bytes)?;
+        self.f
+            .unfsynced_write(&schema_version.value_u64().to_le_bytes())?;
+        Ok(())
+    }
+    /// Append a summary of this batch
+    fn append_batch_summary(
+        &mut self,
+        observed_len: usize,
+        inconsistent_reads: usize,
+    ) -> Result<(), SDSSError> {
+        // [0xFD][actual_commit][checksum]
+        self.f.unfsynced_write(&[MARKER_END_OF_BATCH])?;
+        let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le();
+        self.f.unfsynced_write(&actual_commit)?;
+        // the checksum itself goes through the inner file, i.e. past the tracked writer
+        let cs = self.f.reset_and_finish_checksum().to_le_bytes();
+        self.f.inner_file().fsynced_write(&cs)?;
+        Ok(())
+    }
+    /// Attempt to fix the batch journal
+    // TODO(@ohsayan): declare an "international system disaster" when this happens
+    fn attempt_fix_data_batchfile(&mut self) -> SDSSResult<()> {
+        /*
+            attempt to append 0xFF to the part of the file where a corruption likely occurred, marking
+            it recoverable
+        */
+        let f = self.f.inner_file();
+        if f.fsynced_write(&[MARKER_RECOVERY_EVENT]).is_ok() {
+            return Ok(());
+        }
+        Err(SDSSError::DataBatchRecoveryFailStageOne)
+    }
+}
+
+impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
+    /// encode the primary key only. this means NO TAG is encoded.
+    fn encode_pk_only(&mut self, pk: &PrimaryIndexKey) -> SDSSResult<()> {
+        let buf = &mut self.f;
+        match pk.tag() {
+            TagUnique::UnsignedInt | TagUnique::SignedInt => {
+                let data = unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    pk.read_uint()
+                }
+                .to_le_bytes();
+                buf.unfsynced_write(&data)?;
+            }
+            TagUnique::Str | TagUnique::Bin => {
+                let slice = unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    pk.read_bin()
+                };
+                let slice_l = slice.len().u64_bytes_le();
+                buf.unfsynced_write(&slice_l)?;
+                buf.unfsynced_write(slice)?;
+            }
+            TagUnique::Illegal => unsafe {
+                // UNSAFE(@ohsayan): a pk can't be constructed with illegal
+                impossible!()
+            },
+        }
+        Ok(())
+    }
+    /// Encode a single cell as `[type descriptor][payload]`
+    fn encode_cell(&mut self, value: &Datacell) -> SDSSResult<()> {
+        if value.is_null() {
+            // the restore side reads no payload after a `Null` descriptor but does read one
+            // after `Bool`, so a null cell must be written as `Null` and nothing else
+            self.f
+                .unfsynced_write(&[PersistTypeDscr::Null.value_u8()])?;
+            return Ok(());
+        }
+        let ref mut buf = self.f;
+        buf.unfsynced_write(&[
+            PersistTypeDscr::translate_from_class(value.tag().tag_class()).value_u8(),
+        ])?;
+        match value.tag().tag_class() {
+            TagClass::Bool => {
+                let bool = unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    value.read_bool()
+                } as u8;
+                buf.unfsynced_write(&[bool])?;
+            }
+            TagClass::SignedInt | TagClass::UnsignedInt | TagClass::Float => {
+                let chunk = unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    value.read_uint()
+                }
+                .to_le_bytes();
+                buf.unfsynced_write(&chunk)?;
+            }
+            TagClass::Str | TagClass::Bin => {
+                let slice = unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    value.read_bin()
+                };
+                let slice_l = slice.len().u64_bytes_le();
+                buf.unfsynced_write(&slice_l)?;
+                buf.unfsynced_write(slice)?;
+            }
+            TagClass::List => {
+                let list = unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    value.read_list()
+                }
+                .read();
+                let list_l = list.len().u64_bytes_le();
+                buf.unfsynced_write(&list_l)?;
+                for item in list.iter() {
+                    self.encode_cell(item)?;
+                }
+            }
+        }
+        Ok(())
+    }
+    /// Encode row data as `[column count: u64 LE][cell]*`
+    fn encode_row_data(
+        &mut self,
+        mdl: &Model,
+        irm: &IRModel,
+        row_data: &RowData,
+    ) -> SDSSResult<()> {
+        // nasty hack; we need to avoid the pk
+        // the count is always written as 8 bytes: the restore side reads a fixed-width u64
+        self.f
+            .unfsynced_write(&row_data.fields().len().u64_bytes_le())?;
+        for field_name in irm.fields().stseq_ord_key() {
+            match row_data.fields().get(field_name) {
+                Some(cell) => {
+                    self.encode_cell(cell)?;
+                }
+                None if field_name.as_ref() == mdl.p_key() => {}
+                None => self.f.unfsynced_write(&[0])?,
+            }
+        }
+        Ok(())
+    }
+    /// Write the per-event prefix: `[change kind][pk tag][txn id: u64 LE]`
+    fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> Result<(), SDSSError> {
+        let p1_dc_pk_ty = [delta.change().value_u8(), delta.row().d_key().tag().d()];
+        self.f.unfsynced_write(&p1_dc_pk_ty)?;
+        let txn_id = delta.data_version().value_u64().to_le_bytes();
+        self.f.unfsynced_write(&txn_id)?;
+        Ok(())
+    }
+}
diff --git a/server/src/engine/storage/v1/batch_jrnl/restore.rs b/server/src/engine/storage/v1/batch_jrnl/restore.rs
new file mode 100644
index 00000000..7e54fe6d
--- /dev/null
+++ b/server/src/engine/storage/v1/batch_jrnl/restore.rs
@@ -0,0 +1,445 @@
+/*
+ * Created on Tue Sep 05 2023
+ *
+ * This file is a part of Skytable
+ * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
+ * NoSQL database written by Sayan Nandan ("the Author") with the
+ * vision to provide flexibility in data modelling without compromising
+ * on performance, queryability or scalability.
+ *
+ * Copyright (c) 2023, Sayan Nandan
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+*/
+
+use super::{MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN};
+
+use {
+    super::{
+        MARKER_ACTUAL_BATCH_EVENT, MARKER_END_OF_BATCH, MARKER_RECOVERY_EVENT, RECOVERY_THRESHOLD,
+    },
+    crate::{
+        engine::{
+            core::{index::PrimaryIndexKey, model::Model},
+            data::{
+                cell::Datacell,
+                tag::{CUTag, TagClass, TagUnique},
+            },
+            storage::v1::{
+                inf::PersistTypeDscr,
+                rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
+                SDSSError, SDSSResult,
+            },
+        },
+        util::copy_slice_to_array,
+    },
+    std::mem::ManuallyDrop,
+};
+
+/// One decoded journal event: transaction id, primary key and what happened to the row
+#[derive(Debug, PartialEq)]
+pub(in crate::engine::storage::v1) struct DecodedBatchEvent {
+    txn_id: u64,
+    pk: PrimaryIndexKey,
+    kind: DecodedBatchEventKind,
+}
+
+impl DecodedBatchEvent {
+    pub(in crate::engine::storage::v1) const fn new(
+        txn_id: u64,
+        pk: PrimaryIndexKey,
+        kind: DecodedBatchEventKind,
+    ) -> Self {
+        Self { txn_id, pk, kind }
+    }
+}
+
+#[derive(Debug, PartialEq)]
+pub(in crate::engine::storage::v1) enum DecodedBatchEventKind {
+    Delete,
+    Insert(Vec<Datacell>),
+    Update(Vec<Datacell>),
+}
+
+/// A fully decoded batch together with the schema version recorded in its header
+#[derive(Debug, PartialEq)]
+pub(in crate::engine::storage::v1) struct NormalBatch {
+    events: Vec<DecodedBatchEvent>,
+    schema_version: u64,
+}
+
+impl NormalBatch {
+    pub(in crate::engine::storage::v1) fn new(
+        events: Vec<DecodedBatchEvent>,
+        schema_version: u64,
+    ) -> Self {
+        Self {
+            events,
+            schema_version,
+        }
+    }
+}
+
+/// Outcome of decoding one batch-level record
+enum Batch {
+    RecoveredFromerror,
+    Normal(NormalBatch),
+    FinishedEarly(NormalBatch),
+    BatchClosed,
+}
+
+/// Reads batches back from a batch journal file through a checksum-tracking reader
+pub struct DataBatchRestoreDriver<F> {
+    f: SDSSFileTrackedReader<F>,
+}
+
+impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
+    pub fn new(f: SDSSFileIO<F>) -> SDSSResult<Self> {
+        Ok(Self {
+            f: SDSSFileTrackedReader::new(f)?,
+        })
+    }
+    pub fn into_file(self) -> SDSSFileIO<F> {
+        self.f.into_inner_file()
+    }
+    /// Read every batch and apply it to `model`.
+    ///
+    /// NOTE: `apply_batch` is still `todo!()`, so this panics on the first valid batch.
+    pub(in crate::engine::storage::v1) fn read_data_batch_into_model(
+        &mut self,
+        model: &Model,
+    ) -> SDSSResult<()> {
+        self.read_all_batches_and_for_each(|batch| {
+            // apply the batch
+            Self::apply_batch(model, batch)
+        })
+    }
+    /// Read every batch and return them in file order
+    pub fn read_all_batches(&mut self) -> SDSSResult<Vec<NormalBatch>> {
+        let mut all_batches = vec![];
+        self.read_all_batches_and_for_each(|batch| {
+            all_batches.push(batch);
+            Ok(())
+        })?;
+        Ok(all_batches)
+    }
+}
+
+impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
+    /// Decode batches until EOF or a final close marker, handing each verified batch to `f`.
+    ///
+    /// Returns `Ok` only when the file ends right after a close marker; anything else is
+    /// reported as a corrupted batch file.
+    fn read_all_batches_and_for_each(
+        &mut self,
+        mut f: impl FnMut(NormalBatch) -> SDSSResult<()>,
+    ) -> SDSSResult<()> {
+        // begin
+        let mut closed = false;
+        while !self.f.is_eof() && !closed {
+            // try to decode this batch
+            let Ok(batch) = self.read_batch() else {
+                self.attempt_recover_data_batch()?;
+                continue;
+            };
+            // see what happened when decoding it
+            let finished_early = matches!(batch, Batch::FinishedEarly { .. });
+            let batch = match batch {
+                Batch::RecoveredFromerror => {
+                    // there was an error, but it was safely "handled" because of a recovery byte mark
+                    continue;
+                }
+                Batch::FinishedEarly(batch) | Batch::Normal(batch) => batch,
+                Batch::BatchClosed => {
+                    // the batch was closed; this means that we probably are done with this round; but was it re-opened?
+                    closed = self.handle_reopen_is_actual_close()?;
+                    continue;
+                }
+            };
+            // now we need to read the batch summary
+            let Ok(actual_commit) = self.read_batch_summary(finished_early) else {
+                self.attempt_recover_data_batch()?;
+                continue;
+            };
+            // check if we have the expected batch size
+            if batch.events.len() as u64 != actual_commit {
+                // corrupted
+                self.attempt_recover_data_batch()?;
+                continue;
+            }
+            // apply the batch
+            f(batch)?;
+        }
+        if closed && self.f.is_eof() {
+            // that was the last batch
+            return Ok(());
+        }
+        // nope, this is a corrupted file
+        Err(SDSSError::DataBatchRestoreCorruptedBatchFile)
+    }
+    /// After a close marker: `true` if the file really ends here, `false` if a reopen marker follows
+    fn handle_reopen_is_actual_close(&mut self) -> SDSSResult<bool> {
+        if self.f.is_eof() {
+            // yup, it was closed
+            Ok(true)
+        } else {
+            // maybe not
+            if self.f.read_byte()? == MARKER_BATCH_REOPEN {
+                // driver was closed, but reopened
+                Ok(false)
+            } else {
+                // that's just a nice bug
+                Err(SDSSError::DataBatchRestoreCorruptedBatchFile)
+            }
+        }
+    }
+}
+
+impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
+    fn apply_batch(_: &Model, _: NormalBatch) -> SDSSResult<()> {
+        todo!()
+    }
+}
+
+impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
+    /// Read `[0xFD]?[actual_commit][checksum]` and return the commit count if the checksum matches.
+    ///
+    /// When the batch finished early, the end marker was already consumed by `read_batch`.
+    fn read_batch_summary(&mut self, finished_early: bool) -> SDSSResult<u64> {
+        if !finished_early {
+            // we must read the batch termination signature
+            let b = self.f.read_byte()?;
+            if b != MARKER_END_OF_BATCH {
+                return Err(SDSSError::DataBatchRestoreCorruptedBatch);
+            }
+        }
+        // read actual commit
+        let mut actual_commit = [0; sizeof!(u64)];
+        self.f.read_into_buffer(&mut actual_commit)?;
+        // find actual checksum
+        let actual_checksum = self.f.__reset_checksum();
+        // find hardcoded checksum
+        let mut hardcoded_checksum = [0; sizeof!(u64)];
+        self.f
+            .inner_file()
+            .read_to_buffer(&mut hardcoded_checksum)?;
+        // move file cursor ahead
+        self.f.__cursor_ahead_by(sizeof!(u64));
+        if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
+            Ok(u64::from_le_bytes(actual_commit))
+        } else {
+            Err(SDSSError::DataBatchRestoreCorruptedBatch)
+        }
+    }
+    /// Decode one batch-level record (a real batch, a recovery marker or a close marker)
+    fn read_batch(&mut self) -> SDSSResult<Batch> {
+        let mut this_batch = vec![];
+        // check batch type
+        let batch_type = self.f.read_byte()?;
+        match batch_type {
+            MARKER_ACTUAL_BATCH_EVENT => {}
+            MARKER_RECOVERY_EVENT => {
+                // while attempting to write this batch, some sort of an error occurred but we got a nice recovery byte
+                // so proceed that way
+                return Ok(Batch::RecoveredFromerror);
+            }
+            MARKER_BATCH_CLOSED => {
+                // this isn't a batch; it has been closed
+                return Ok(Batch::BatchClosed);
+            }
+            _ => {
+                // this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry
+                return Err(SDSSError::DataBatchRestoreCorruptedBatch);
+            }
+        }
+        // we're expecting a "good batch"
+        let mut batch_size_schema_version = [0; sizeof!(u64, 2)];
+        self.f.read_into_buffer(&mut batch_size_schema_version)?;
+        // we have the batch length
+        let batch_size = u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[..8]));
+        let schema_version =
+            u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[8..]));
+        let mut processed_in_this_batch = 0;
+        while (processed_in_this_batch != batch_size) && !self.f.is_eof() {
+            // decode common row data
+            let change_type = self.f.read_byte()?;
+            // now decode event
+            match change_type {
+                MARKER_END_OF_BATCH => {
+                    // the file tells us that we've reached the end of this batch; hmmm
+                    return Ok(Batch::FinishedEarly(NormalBatch::new(
+                        this_batch,
+                        schema_version,
+                    )));
+                }
+                normal_event => {
+                    let (pk_type, txnid) = self.read_normal_event_metadata()?;
+                    match normal_event {
+                        0 => {
+                            // delete
+                            let pk = self.decode_primary_key(pk_type)?;
+                            this_batch.push(DecodedBatchEvent::new(
+                                txnid,
+                                pk,
+                                DecodedBatchEventKind::Delete,
+                            ));
+                            processed_in_this_batch += 1;
+                        }
+                        1 | 2 => {
+                            // insert or update
+                            // get pk
+                            let pk = self.decode_primary_key(pk_type)?;
+                            // get column count
+                            let mut column_count = [0; sizeof!(u64)];
+                            self.f.read_into_buffer(&mut column_count)?;
+                            let mut column_count = u64::from_le_bytes(column_count);
+                            // prepare row
+                            let mut row = vec![];
+                            while column_count != 0 && !self.f.is_eof() {
+                                row.push(self.decode_cell()?);
+                                column_count -= 1;
+                            }
+                            if column_count != 0 {
+                                return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+                            }
+                            if change_type == 1 {
+                                this_batch.push(DecodedBatchEvent::new(
+                                    txnid,
+                                    pk,
+                                    DecodedBatchEventKind::Insert(row),
+                                ));
+                            } else {
+                                this_batch.push(DecodedBatchEvent::new(
+                                    txnid,
+                                    pk,
+                                    DecodedBatchEventKind::Update(row),
+                                ));
+                            }
+                            processed_in_this_batch += 1;
+                        }
+                        _ => {
+                            return Err(SDSSError::DataBatchRestoreCorruptedBatch);
+                        }
+                    }
+                }
+            }
+        }
+        Ok(Batch::Normal(NormalBatch::new(this_batch, schema_version)))
+    }
+    /// Read the per-event prefix that follows the change byte: `[pk tag][txn id: u64 LE]`
+    fn read_normal_event_metadata(&mut self) -> Result<(u8, u64), SDSSError> {
+        let pk_type = self.f.read_byte()?;
+        let mut txnid = [0; sizeof!(u64)];
+        self.f.read_into_buffer(&mut txnid)?;
+        let txnid = u64::from_le_bytes(txnid);
+        Ok((pk_type, txnid))
+    }
+    /// Scan at most `RECOVERY_THRESHOLD` bytes ahead for a recovery marker
+    fn attempt_recover_data_batch(&mut self) -> SDSSResult<()> {
+        let mut max_threshold = RECOVERY_THRESHOLD;
+        while max_threshold != 0 && self.f.has_left(1) {
+            if let Ok(byte) = self.f.inner_file().read_byte() {
+                // this read went past the tracked reader, so move its cursor along by hand
+                // (same as what `read_batch_summary` does after its raw read)
+                self.f.__cursor_ahead_by(1);
+                if byte == MARKER_RECOVERY_EVENT {
+                    return Ok(());
+                }
+            }
+            max_threshold -= 1;
+        }
+        Err(SDSSError::DataBatchRestoreCorruptedBatch)
+    }
+}
+
+impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
+    /// Decode a primary key whose tag was read from the event prefix
+    // NOTE(review): the length prefix comes straight from the file and sizes an allocation;
+    // consider bounding it by the bytes left in the file
+    fn decode_primary_key(&mut self, pk_type: u8) -> SDSSResult<PrimaryIndexKey> {
+        let Some(pk_type) = TagUnique::try_from_raw(pk_type) else {
+            return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+        };
+        Ok(match pk_type {
+            TagUnique::SignedInt | TagUnique::UnsignedInt => {
+                let mut chunk = [0; sizeof!(u64)];
+                self.f.read_into_buffer(&mut chunk)?;
+                unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    PrimaryIndexKey::new_from_qw(pk_type, u64::from_le_bytes(chunk))
+                }
+            }
+            TagUnique::Str | TagUnique::Bin => {
+                let mut len = [0; sizeof!(u64)];
+                self.f.read_into_buffer(&mut len)?;
+                let mut data = vec![0; u64::from_le_bytes(len) as usize];
+                self.f.read_into_buffer(&mut data)?;
+                if pk_type == TagUnique::Str {
+                    if core::str::from_utf8(&data).is_err() {
+                        return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+                    }
+                }
+                unsafe {
+                    // UNSAFE(@ohsayan): +tagck +verityck
+                    let mut md = ManuallyDrop::new(data);
+                    PrimaryIndexKey::new_from_dual(
+                        pk_type,
+                        u64::from_le_bytes(len),
+                        md.as_mut_ptr() as usize,
+                    )
+                }
+            }
+            _ => unsafe {
+                // UNSAFE(@ohsayan): TagUnique::try_from_raw rejects an construction with Invalid as the dscr
+                impossible!()
+            },
+        })
+    }
+    /// Decode a single cell: `[type descriptor][payload]` (recursive for lists)
+    fn decode_cell(&mut self) -> SDSSResult<Datacell> {
+        let cell_type_sig = self.f.read_byte()?;
+        let Some(cell_type) = PersistTypeDscr::try_from_raw(cell_type_sig) else {
+            return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+        };
+        Ok(match cell_type {
+            PersistTypeDscr::Null => Datacell::null(),
+            PersistTypeDscr::Bool => {
+                let bool = self.f.read_byte()?;
+                if bool > 1 {
+                    return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+                }
+                Datacell::new_bool(bool == 1)
+            }
+            PersistTypeDscr::UnsignedInt | PersistTypeDscr::SignedInt | PersistTypeDscr::Float => {
+                let mut block = [0; sizeof!(u64)];
+                self.f.read_into_buffer(&mut block)?;
+                unsafe {
+                    // UNSAFE(@ohsayan): choosing the correct type and tag
+                    let tc = TagClass::from_raw(cell_type.value_u8() - 1);
+                    Datacell::new_qw(u64::from_le_bytes(block), CUTag::new(tc, tc.tag_unique()))
+                }
+            }
+            PersistTypeDscr::Str | PersistTypeDscr::Bin => {
+                let mut len_block = [0; sizeof!(u64)];
+                self.f.read_into_buffer(&mut len_block)?;
+                let len = u64::from_le_bytes(len_block) as usize;
+                let mut data = vec![0; len];
+                self.f.read_into_buffer(&mut data)?;
+                unsafe {
+                    // UNSAFE(@ohsayan): +tagck
+                    if cell_type == PersistTypeDscr::Str {
+                        if core::str::from_utf8(&data).is_err() {
+                            return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+                        }
+                        Datacell::new_str(String::from_utf8_unchecked(data).into_boxed_str())
+                    } else {
+                        Datacell::new_bin(data.into_boxed_slice())
+                    }
+                }
+            }
+            PersistTypeDscr::List => {
+                let mut len_block = [0; sizeof!(u64)];
+                self.f.read_into_buffer(&mut len_block)?;
+                let len = u64::from_le_bytes(len_block);
+                let mut list = Vec::new();
+                while !self.f.is_eof() && list.len() as u64 != len {
+                    list.push(self.decode_cell()?);
+                }
+                if len != list.len() as u64 {
+                    return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+                }
+                Datacell::new_list(list)
+            }
+            PersistTypeDscr::Dict => {
+                // we don't support dicts just yet
+                return Err(SDSSError::DataBatchRestoreCorruptedEntry);
+            }
+        })
+    }
+}
diff --git a/server/src/engine/storage/v1/header_impl/mod.rs
b/server/src/engine/storage/v1/header_impl/mod.rs index abec2a20..b8e1ae1f 100644 --- a/server/src/engine/storage/v1/header_impl/mod.rs +++ b/server/src/engine/storage/v1/header_impl/mod.rs @@ -72,14 +72,14 @@ mod dr; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)] pub enum FileScope { Journal = 0, - TransactionLogCompacted = 1, + DataBatch = 1, } impl FileScope { pub const fn try_new(id: u64) -> Option { Some(match id { 0 => Self::Journal, - 1 => Self::TransactionLogCompacted, + 1 => Self::DataBatch, _ => return None, }) } @@ -95,16 +95,18 @@ impl FileScope { #[repr(u8)] pub enum FileSpecifier { GNSTxnLog = 0, + TableDataBatch = 1, #[cfg(test)] - TestTransactionLog = 1, + TestTransactionLog = 0xFF, } impl FileSpecifier { pub const fn try_new(v: u32) -> Option { Some(match v { 0 => Self::GNSTxnLog, + 1 => Self::TableDataBatch, #[cfg(test)] - 1 => Self::TestTransactionLog, + 0xFF => Self::TestTransactionLog, _ => return None, }) } diff --git a/server/src/engine/storage/v1/inf/map.rs b/server/src/engine/storage/v1/inf/map.rs index 7cd872a4..660a4569 100644 --- a/server/src/engine/storage/v1/inf/map.rs +++ b/server/src/engine/storage/v1/inf/map.rs @@ -27,7 +27,7 @@ use { super::{ obj::{self, FieldMD}, - PersistDictEntryDscr, PersistMapSpec, PersistObject, VecU8, + PersistTypeDscr, PersistMapSpec, PersistObject, VecU8, }, crate::{ engine::{ @@ -178,7 +178,7 @@ impl PersistMapSpec for GenericDictSpec { fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool { static EXPECT_ATLEAST: [u8; 4] = [0, 1, 8, 8]; // PAD to align let lbound_rem = md.klen + EXPECT_ATLEAST[cmp::min(md.dscr, 3) as usize] as usize; - scanner.has_left(lbound_rem) & (md.dscr <= PersistDictEntryDscr::Dict.value_u8()) + scanner.has_left(lbound_rem) & (md.dscr <= PersistTypeDscr::Dict.value_u8()) } fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, _: &Self::Value) { buf.extend(key.len().u64_bytes_le()); @@ -189,7 +189,7 @@ impl PersistMapSpec 
for GenericDictSpec { fn enc_entry(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) { match val { DictEntryGeneric::Map(map) => { - buf.push(PersistDictEntryDscr::Dict.value_u8()); + buf.push(PersistTypeDscr::Dict.value_u8()); buf.extend(key.as_bytes()); as PersistObject>::default_full_enc(buf, map); } @@ -208,17 +208,17 @@ impl PersistMapSpec for GenericDictSpec { unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option { unsafe fn decode_element( scanner: &mut BufferedScanner, - dscr: PersistDictEntryDscr, + dscr: PersistTypeDscr, dg_top_element: bool, ) -> Option { let r = match dscr { - PersistDictEntryDscr::Null => DictEntryGeneric::Data(Datacell::null()), - PersistDictEntryDscr::Bool => { + PersistTypeDscr::Null => DictEntryGeneric::Data(Datacell::null()), + PersistTypeDscr::Bool => { DictEntryGeneric::Data(Datacell::new_bool(scanner.next_byte() == 1)) } - PersistDictEntryDscr::UnsignedInt - | PersistDictEntryDscr::SignedInt - | PersistDictEntryDscr::Float => DictEntryGeneric::Data(Datacell::new_qw( + PersistTypeDscr::UnsignedInt + | PersistTypeDscr::SignedInt + | PersistTypeDscr::Float => DictEntryGeneric::Data(Datacell::new_qw( scanner.next_u64_le(), CUTag::new( dscr.into_class(), @@ -230,13 +230,13 @@ impl PersistMapSpec for GenericDictSpec { ][(dscr.value_u8() - 2) as usize], ), )), - PersistDictEntryDscr::Str | PersistDictEntryDscr::Bin => { + PersistTypeDscr::Str | PersistTypeDscr::Bin => { let slc_len = scanner.next_u64_le() as usize; if !scanner.has_left(slc_len) { return None; } let slc = scanner.next_chunk_variable(slc_len); - DictEntryGeneric::Data(if dscr == PersistDictEntryDscr::Str { + DictEntryGeneric::Data(if dscr == PersistTypeDscr::Str { if core::str::from_utf8(slc).is_err() { return None; } @@ -247,18 +247,18 @@ impl PersistMapSpec for GenericDictSpec { Datacell::new_bin(slc.to_owned().into_boxed_slice()) }) } - PersistDictEntryDscr::List => { + PersistTypeDscr::List => { let list_len = scanner.next_u64_le() as 
usize; let mut v = Vec::with_capacity(list_len); while (!scanner.eof()) & (v.len() < list_len) { let dscr = scanner.next_byte(); - if dscr > PersistDictEntryDscr::Dict.value_u8() { + if dscr > PersistTypeDscr::Dict.value_u8() { return None; } v.push( match decode_element( scanner, - PersistDictEntryDscr::from_raw(dscr), + PersistTypeDscr::from_raw(dscr), false, ) { Some(DictEntryGeneric::Data(l)) => l, @@ -273,7 +273,7 @@ impl PersistMapSpec for GenericDictSpec { return None; } } - PersistDictEntryDscr::Dict => { + PersistTypeDscr::Dict => { if dg_top_element { DictEntryGeneric::Map( as PersistObject>::default_full_dec( @@ -288,7 +288,7 @@ impl PersistMapSpec for GenericDictSpec { }; Some(r) } - decode_element(scanner, PersistDictEntryDscr::from_raw(md.dscr), true) + decode_element(scanner, PersistTypeDscr::from_raw(md.dscr), true) } // not implemented fn enc_key(_: &mut VecU8, _: &Self::Key) { diff --git a/server/src/engine/storage/v1/inf/mod.rs b/server/src/engine/storage/v1/inf/mod.rs index 43486d23..3a0769ea 100644 --- a/server/src/engine/storage/v1/inf/mod.rs +++ b/server/src/engine/storage/v1/inf/mod.rs @@ -51,7 +51,7 @@ type VecU8 = Vec; #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, sky_macros::EnumMethods)] #[repr(u8)] /// Disambiguation for data -pub enum PersistDictEntryDscr { +pub enum PersistTypeDscr { Null = 0, Bool = 1, UnsignedInt = 2, @@ -63,11 +63,19 @@ pub enum PersistDictEntryDscr { Dict = 8, } -impl PersistDictEntryDscr { +impl PersistTypeDscr { + pub(super) const MAX: Self = Self::Dict; /// translates the tag class definition into the dscr definition pub const fn translate_from_class(class: TagClass) -> Self { unsafe { Self::from_raw(class.d() + 1) } } + pub const fn try_from_raw(v: u8) -> Option { + if v > Self::MAX.value_u8() { + None + } else { + unsafe { Some(Self::from_raw(v)) } + } + } pub const unsafe fn from_raw(v: u8) -> Self { core::mem::transmute(v) } diff --git a/server/src/engine/storage/v1/inf/obj.rs 
b/server/src/engine/storage/v1/inf/obj.rs index f1c11152..18cdd5bf 100644 --- a/server/src/engine/storage/v1/inf/obj.rs +++ b/server/src/engine/storage/v1/inf/obj.rs @@ -27,7 +27,7 @@ use crate::engine::{core::model::delta::IRModel, data::DictGeneric}; use { - super::{PersistDictEntryDscr, PersistObject, VecU8}, + super::{PersistTypeDscr, PersistObject, VecU8}, crate::{ engine::{ core::{ @@ -71,7 +71,7 @@ pub fn encode_element(buf: &mut VecU8, dc: &Datacell) { pub fn encode_datacell_tag(buf: &mut VecU8, dc: &Datacell) { buf.push( - PersistDictEntryDscr::translate_from_class(dc.tag().tag_class()).value_u8() + PersistTypeDscr::translate_from_class(dc.tag().tag_class()).value_u8() * (!dc.is_null() as u8), ) } diff --git a/server/src/engine/storage/v1/journal.rs b/server/src/engine/storage/v1/journal.rs index eba132c3..84ea217c 100644 --- a/server/src/engine/storage/v1/journal.rs +++ b/server/src/engine/storage/v1/journal.rs @@ -68,16 +68,18 @@ pub fn null_journal( host_startup_counter: u64, _: &TA::GlobalState, ) -> JournalWriter { - let FileOpen::Created(journal) = SDSSFileIO::::open_or_create_perm_rw( - log_file_name, - FileScope::Journal, - log_kind, - log_kind_version, - host_setting_version, - host_run_mode, - host_startup_counter, - ) - .unwrap() else { + let FileOpen::Created(journal) = + SDSSFileIO::::open_or_create_perm_rw::( + log_file_name, + FileScope::Journal, + log_kind, + log_kind_version, + host_setting_version, + host_run_mode, + host_startup_counter, + ) + .unwrap() + else { panic!() }; JournalWriter::new(journal, 0, true).unwrap() @@ -92,15 +94,25 @@ pub fn open_journal( host_startup_counter: u64, gs: &TA::GlobalState, ) -> SDSSResult> { - let f = SDSSFileIO::::open_or_create_perm_rw( - log_file_name, - FileScope::Journal, - log_kind, - log_kind_version, - host_setting_version, - host_run_mode, - host_startup_counter, - )?; + macro_rules! 
open_file { + ($modify:literal) => { + SDSSFileIO::::open_or_create_perm_rw::<$modify>( + log_file_name, + FileScope::Journal, + log_kind, + log_kind_version, + host_setting_version, + host_run_mode, + host_startup_counter, + ) + }; + } + // HACK(@ohsayan): until generic const exprs are stabilized, we're in a state of hell + let f = if TA::DENY_NONAPPEND { + open_file!(false) + } else { + open_file!(true) + }?; let file = match f { FileOpen::Created(f) => return JournalWriter::new(f, 0, true), FileOpen::Existing(file, _) => file, @@ -111,6 +123,8 @@ pub fn open_journal( /// The journal adapter pub trait JournalAdapter { + /// deny any SDSS file level operations that require non-append mode writes (for example, updating the SDSS header's modify count) + const DENY_NONAPPEND: bool = true; /// enable/disable automated recovery algorithms const RECOVERY_PLUGIN: bool; /// The journal event diff --git a/server/src/engine/storage/v1/mod.rs b/server/src/engine/storage/v1/mod.rs index b4113c78..0c111ef0 100644 --- a/server/src/engine/storage/v1/mod.rs +++ b/server/src/engine/storage/v1/mod.rs @@ -27,6 +27,7 @@ // raw mod header_impl; // impls +mod batch_jrnl; mod journal; mod rw; // hl @@ -110,6 +111,16 @@ pub enum SDSSError { InternalDecodeStructureCorruptedPayload, /// the data for an internal structure was decoded but is logically invalid InternalDecodeStructureIllegalData, + /// when attempting to flush a data batch, the batch journal crashed and a recovery event was triggered. 
But even then,
+    /// the data batch journal could not be fixed
+    DataBatchRecoveryFailStageOne,
+    /// when attempting to restore a data batch from disk, the batch journal crashed and had a corruption, but it is irrecoverable
+    DataBatchRestoreCorruptedBatch,
+    /// when attempting to restore a data batch from disk, the driver encountered a corrupted entry
+    DataBatchRestoreCorruptedEntry,
+    /// we failed to close the data batch
+    DataBatchCloseError,
+    DataBatchRestoreCorruptedBatchFile,
 } impl SDSSError {
diff --git a/server/src/engine/storage/v1/rw.rs b/server/src/engine/storage/v1/rw.rs
index 969f8cbc..a21b2b5e 100644
--- a/server/src/engine/storage/v1/rw.rs
+++ b/server/src/engine/storage/v1/rw.rs
@@ -31,7 +31,10 @@ use {
 }, SDSSResult, },
-    crate::engine::storage::v1::SDSSError,
+    crate::{
+        engine::storage::{v1::SDSSError, SCrc},
+        util::os::SysIOError,
+    },
 std::{ fs::File, io::{Read, Seek, SeekFrom, Write},
@@ -46,6 +49,21 @@ pub enum FileOpen {
 Existing(F, SDSSHeader), }
+impl<F> FileOpen<F> {
+    /// Returns the file and its decoded header if the file already existed, `None` if it was just created
+    pub fn into_existing(self) -> Option<(F, SDSSHeader)> {
+        match self {
+            Self::Existing(f, h) => Some((f, h)),
+            Self::Created(_) => None,
+        }
+    }
+    /// Returns the file if it was just created, `None` if it already existed
+    pub fn into_created(self) -> Option<F> {
+        match self {
+            Self::Created(f) => Some(f),
+            Self::Existing(_, _) => None,
+        }
+    }
+}
+
 #[derive(Debug)] pub enum RawFileOpen { Created(F),
@@ -138,6 +156,103 @@ impl RawFileIOInterface for File {
 } }
+// NOTE(review): the generic parameters of the two tracked wrappers below (and of `SDSSFileIO` in their fields
+// and signatures) appear to have been stripped when this patch was extracted; they are left as found -- confirm
+// against the original commit
+/// A writer that maintains a running checksum of every block that was successfully written through it
+pub struct SDSSFileTrackedWriter {
+    f: SDSSFileIO,
+    cs: SCrc,
+}
+
+impl SDSSFileTrackedWriter {
+    pub fn new(f: SDSSFileIO) -> Self {
+        Self { f, cs: SCrc::new() }
+    }
+    /// Write the block without fsyncing. The checksum is only updated if the write succeeded
+    pub fn unfsynced_write(&mut self, block: &[u8]) -> SDSSResult<()> {
+        self.f.unfsynced_write(block)?;
+        self.cs.recompute_with_new_var_block(block);
+        Ok(())
+    }
+    pub fn fsync_all(&mut self) -> SDSSResult<()> {
+        self.f.fsync_all()
+    }
+    /// Returns the checksum of everything written since the last reset and starts a fresh checksum
+    pub fn reset_and_finish_checksum(&mut self) -> u64 {
+        core::mem::replace(&mut self.cs, SCrc::new()).finish()
+    }
+    /// Access the underlying file. Anything written through it is **not** part of the checksum
+    pub fn inner_file(&mut self) -> &mut SDSSFileIO {
+        &mut self.f
+    }
+}
+
+/// [`SDSSFileTrackedReader`] simply maintains application level length and checksum tracking to avoid frequent syscalls because we
+/// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them
+pub struct SDSSFileTrackedReader {
+    f: SDSSFileIO,
+    len: u64,
+    pos: u64,
+    cs: SCrc,
+}
+
+impl SDSSFileTrackedReader {
+    /// Important: this will only look at the data post the current cursor!
+    pub fn new(mut f: SDSSFileIO) -> SDSSResult<Self> {
+        let len = f.file_length()?;
+        let pos = f.retrieve_cursor()?;
+        Ok(Self {
+            f,
+            len,
+            pos,
+            cs: SCrc::new(),
+        })
+    }
+    /// Number of bytes between the tracked position and the length recorded when the reader was created
+    pub fn remaining(&self) -> u64 {
+        // saturate: `__cursor_ahead_by` is not bounds checked, so the position must not be able to wrap this
+        self.len.saturating_sub(self.pos)
+    }
+    pub fn is_eof(&self) -> bool {
+        self.remaining() == 0
+    }
+    pub fn has_left(&self, v: u64) -> bool {
+        self.remaining() >= v
+    }
+    /// Fill the buffer from the file, updating the tracked position and the checksum.
+    ///
+    /// If fewer than `buf.len()` bytes remain, nothing is read and an `InvalidInput` I/O error is returned.
+    pub fn read_into_buffer(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
+        if !self.has_left(buf.len() as u64) {
+            return Err(SDSSError::IoError(SysIOError::from(
+                std::io::ErrorKind::InvalidInput,
+            )));
+        }
+        self.f.read_to_buffer(buf)?;
+        self.pos += buf.len() as u64;
+        self.cs.recompute_with_new_var_block(buf);
+        Ok(())
+    }
+    pub fn read_byte(&mut self) -> SDSSResult<u8> {
+        let mut buf = [0u8; 1];
+        self.read_into_buffer(&mut buf).map(|_| buf[0])
+    }
+    /// Returns the checksum of everything read since the last reset and starts a fresh checksum
+    pub fn __reset_checksum(&mut self) -> u64 {
+        core::mem::replace(&mut self.cs, SCrc::new()).finish()
+    }
+    /// Access the underlying file. Reads done through it update neither the tracked position nor the
+    /// checksum; callers are expected to follow up with `__cursor_ahead_by`
+    pub fn inner_file(&mut self) -> &mut SDSSFileIO {
+        &mut self.f
+    }
+    pub fn into_inner_file(self) -> SDSSFileIO {
+        self.f
+    }
+    /// Move the tracked position ahead without reading (for bytes consumed through `inner_file`)
+    pub fn __cursor_ahead_by(&mut self, sizeof: usize) {
+        self.pos += sizeof as u64;
+    }
+}
+
 #[derive(Debug)] pub struct SDSSFileIO { f: F,
@@ -145,7 +260,7 @@ pub struct SDSSFileIO {
 impl SDSSFileIO { /// **IMPORTANT: File position: end-of-header-section**
-    pub fn open_or_create_perm_rw(
+    pub fn open_or_create_perm_rw(
file_path: &str, file_scope: FileScope, file_specifier: FileSpecifier, @@ -180,13 +295,15 @@ impl SDSSFileIO { .ok_or(SDSSError::HeaderDecodeCorruptedHeader)?; // now validate the header header.verify(file_scope, file_specifier, file_specifier_version)?; - // since we updated this file, let us update the header - let mut new_header = header.clone(); - new_header.dr_rs_mut().bump_modify_count(); let mut f = Self::_new(f); - f.seek_from_start(0)?; - f.fsynced_write(new_header.encoded().array().as_ref())?; - f.seek_from_start(SDSSHeaderRaw::header_size() as _)?; + if REWRITE_MODIFY_COUNTER { + // since we updated this file, let us update the header + let mut new_header = header.clone(); + new_header.dr_rs_mut().bump_modify_count(); + f.seek_from_start(0)?; + f.fsynced_write(new_header.encoded().array().as_ref())?; + f.seek_from_start(SDSSHeaderRaw::header_size() as _)?; + } Ok(FileOpen::Existing(f, header)) } } @@ -223,6 +340,10 @@ impl SDSSFileIO { pub fn retrieve_cursor(&mut self) -> SDSSResult { self.f.fcursor() } + pub fn read_byte(&mut self) -> SDSSResult { + let mut r = [0; 1]; + self.read_to_buffer(&mut r).map(|_| r[0]) + } } pub struct BufferedScanner<'a> { diff --git a/server/src/engine/storage/v1/test_util.rs b/server/src/engine/storage/v1/test_util.rs index 54452d20..263d00c9 100644 --- a/server/src/engine/storage/v1/test_util.rs +++ b/server/src/engine/storage/v1/test_util.rs @@ -69,6 +69,11 @@ impl VFile { #[derive(Debug)] pub struct VirtualFS(Box); +impl VirtualFS { + pub fn get_file_data(f: &str) -> Option> { + VFS.read().get(f).map(|f| f.data.clone()) + } +} impl RawFileIOInterface for VirtualFS { fn fopen_or_create_rw(file_path: &str) -> super::SDSSResult> { @@ -181,7 +186,7 @@ impl RawFileIOInterface for VirtualFS { #[test] fn sdss_file() { - let f = SDSSFileIO::::open_or_create_perm_rw( + let f = SDSSFileIO::::open_or_create_perm_rw::( "this_is_a_test_file.db", FileScope::Journal, FileSpecifier::TestTransactionLog, @@ -199,7 +204,7 @@ fn sdss_file() 
{ f.fsynced_write(b"hello, world\n").unwrap(); f.fsynced_write(b"hello, again\n").unwrap(); - let f = SDSSFileIO::::open_or_create_perm_rw( + let f = SDSSFileIO::::open_or_create_perm_rw::( "this_is_a_test_file.db", FileScope::Journal, FileSpecifier::TestTransactionLog, diff --git a/server/src/engine/storage/v1/tests.rs b/server/src/engine/storage/v1/tests.rs index dbe92fc7..8439377c 100644 --- a/server/src/engine/storage/v1/tests.rs +++ b/server/src/engine/storage/v1/tests.rs @@ -26,237 +26,6 @@ type VirtualFS = super::test_util::VirtualFS; -mod rw { - use crate::engine::storage::v1::{ - header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}, - rw::{FileOpen, SDSSFileIO}, - }; - - #[test] - fn create_delete() { - let f = SDSSFileIO::::open_or_create_perm_rw( - "hello_world.db-tlog", - FileScope::TransactionLogCompacted, - FileSpecifier::GNSTxnLog, - FileSpecifierVersion::__new(0), - 0, - HostRunMode::Prod, - 0, - ) - .unwrap(); - match f { - FileOpen::Existing(_, _) => panic!(), - FileOpen::Created(_) => {} - }; - let open = SDSSFileIO::::open_or_create_perm_rw( - "hello_world.db-tlog", - FileScope::TransactionLogCompacted, - FileSpecifier::GNSTxnLog, - FileSpecifierVersion::__new(0), - 0, - HostRunMode::Prod, - 0, - ) - .unwrap(); - let h = match open { - FileOpen::Existing(_, header) => header, - _ => panic!(), - }; - assert_eq!(h.gr_mdr().file_scope(), FileScope::TransactionLogCompacted); - assert_eq!(h.gr_mdr().file_spec(), FileSpecifier::GNSTxnLog); - assert_eq!(h.gr_mdr().file_spec_id(), FileSpecifierVersion::__new(0)); - assert_eq!(h.gr_hr().run_mode(), HostRunMode::Prod); - assert_eq!(h.gr_hr().setting_version(), 0); - assert_eq!(h.gr_hr().startup_counter(), 0); - } -} - -mod tx { - use crate::engine::storage::v1::header_impl::{ - FileSpecifier, FileSpecifierVersion, HostRunMode, - }; - - use { - crate::{ - engine::storage::v1::{ - journal::{self, JournalAdapter, JournalWriter}, - SDSSError, SDSSResult, - }, - util, - }, - 
std::cell::RefCell, - }; - pub struct Database { - data: RefCell<[u8; 10]>, - } - impl Database { - fn copy_data(&self) -> [u8; 10] { - *self.data.borrow() - } - fn new() -> Self { - Self { - data: RefCell::new([0; 10]), - } - } - fn reset(&self) { - *self.data.borrow_mut() = [0; 10]; - } - fn txn_reset( - &self, - txn_writer: &mut JournalWriter, - ) -> SDSSResult<()> { - self.reset(); - txn_writer.append_event(TxEvent::Reset) - } - fn set(&self, pos: usize, val: u8) { - self.data.borrow_mut()[pos] = val; - } - fn txn_set( - &self, - pos: usize, - val: u8, - txn_writer: &mut JournalWriter, - ) -> SDSSResult<()> { - self.set(pos, val); - txn_writer.append_event(TxEvent::Set(pos, val)) - } - } - pub enum TxEvent { - Reset, - Set(usize, u8), - } - #[derive(Debug)] - pub enum TxError { - SDSS(SDSSError), - } - direct_from! { - TxError => { - SDSSError as SDSS - } - } - #[derive(Debug)] - pub struct DatabaseTxnAdapter; - impl JournalAdapter for DatabaseTxnAdapter { - const RECOVERY_PLUGIN: bool = false; - type Error = TxError; - type JournalEvent = TxEvent; - type GlobalState = Database; - - fn encode(event: Self::JournalEvent) -> Box<[u8]> { - /* - [1B: opcode][8B:Index][1B: New value] - */ - let opcode = match event { - TxEvent::Reset => 0u8, - TxEvent::Set(_, _) => 1u8, - }; - let index = match event { - TxEvent::Reset => 0u64, - TxEvent::Set(index, _) => index as u64, - }; - let new_value = match event { - TxEvent::Reset => 0, - TxEvent::Set(_, val) => val, - }; - let mut ret = Vec::with_capacity(10); - ret.push(opcode); - ret.extend(index.to_le_bytes()); - ret.push(new_value); - ret.into_boxed_slice() - } - - fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), TxError> { - if payload.len() != 10 { - return Err(SDSSError::CorruptedFile("testtxn.log").into()); - } - let opcode = payload[0]; - let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9])); - let new_value = payload[9]; - match opcode { - 0 if index == 0 && 
new_value == 0 => gs.reset(), - 1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value), - _ => return Err(SDSSError::JournalLogEntryCorrupted.into()), - } - Ok(()) - } - } - - fn open_log( - log_name: &str, - db: &Database, - ) -> SDSSResult> { - journal::open_journal::( - log_name, - FileSpecifier::TestTransactionLog, - FileSpecifierVersion::__new(0), - 0, - HostRunMode::Prod, - 1, - &db, - ) - } - - #[test] - fn first_boot_second_readonly() { - // create log - let db1 = Database::new(); - let x = || -> SDSSResult<()> { - let mut log = open_log("testtxn.log", &db1)?; - db1.txn_set(0, 20, &mut log)?; - db1.txn_set(9, 21, &mut log)?; - log.append_journal_close_and_close() - }; - x().unwrap(); - // backup original data - let original_data = db1.copy_data(); - // restore log - let empty_db2 = Database::new(); - open_log("testtxn.log", &empty_db2) - .unwrap() - .append_journal_close_and_close() - .unwrap(); - assert_eq!(original_data, empty_db2.copy_data()); - } - #[test] - fn oneboot_mod_twoboot_mod_thirdboot_read() { - // first boot: set all to 1 - let db1 = Database::new(); - let x = || -> SDSSResult<()> { - let mut log = open_log("duatxn.db-tlog", &db1)?; - for i in 0..10 { - db1.txn_set(i, 1, &mut log)?; - } - log.append_journal_close_and_close() - }; - x().unwrap(); - let bkp_db1 = db1.copy_data(); - drop(db1); - // second boot - let db2 = Database::new(); - let x = || -> SDSSResult<()> { - let mut log = open_log("duatxn.db-tlog", &db2)?; - assert_eq!(bkp_db1, db2.copy_data()); - for i in 0..10 { - let current_val = db2.data.borrow()[i]; - db2.txn_set(i, current_val + i as u8, &mut log)?; - } - log.append_journal_close_and_close() - }; - x().unwrap(); - let bkp_db2 = db2.copy_data(); - drop(db2); - // third boot - let db3 = Database::new(); - let log = open_log("duatxn.db-tlog", &db3).unwrap(); - log.append_journal_close_and_close().unwrap(); - assert_eq!(bkp_db2, db3.copy_data()); - assert_eq!( - db3.copy_data(), - (1..=10) - 
.into_iter() - .map(u8::from) - .collect::>() - .as_ref() - ); - } -} +mod batch; +mod rw; +mod tx; diff --git a/server/src/engine/storage/v1/tests/batch.rs b/server/src/engine/storage/v1/tests/batch.rs new file mode 100644 index 00000000..7db00240 --- /dev/null +++ b/server/src/engine/storage/v1/tests/batch.rs @@ -0,0 +1,210 @@ +/* + * Created on Wed Sep 06 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use { + crate::engine::{ + core::{ + index::{DcFieldIndex, PrimaryIndexKey, Row}, + model::{ + delta::{DataDelta, DataDeltaKind, DeltaVersion}, + Field, Layer, Model, + }, + }, + data::{cell::Datacell, tag::TagSelector, uuid::Uuid}, + storage::v1::{ + batch_jrnl::{ + DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent, + DecodedBatchEventKind, NormalBatch, + }, + header_meta::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode}, + rw::{FileOpen, SDSSFileIO}, + test_util::VirtualFS, + }, + }, + crossbeam_epoch::pin, +}; + +fn pkey(v: impl Into) -> PrimaryIndexKey { + PrimaryIndexKey::try_from_dc(v.into()).unwrap() +} + +fn open_file(fpath: &str) -> FileOpen> { + SDSSFileIO::open_or_create_perm_rw::( + fpath, + FileScope::DataBatch, + FileSpecifier::TableDataBatch, + FileSpecifierVersion::__new(0), + 0, + HostRunMode::Dev, + 1, + ) + .unwrap() +} + +fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver { + match open_file(fpath) { + FileOpen::Created(f) => DataBatchPersistDriver::new(f, true), + FileOpen::Existing(f, _) => { + let mut dbr = DataBatchRestoreDriver::new(f).unwrap(); + dbr.read_data_batch_into_model(mdl).unwrap(); + DataBatchPersistDriver::new(dbr.into_file(), false) + } + } + .unwrap() +} + +fn new_delta( + schema: u64, + txnid: u64, + pk: Datacell, + data: DcFieldIndex, + change: DataDeltaKind, +) -> DataDelta { + new_delta_with_row( + schema, + txnid, + Row::new( + pkey(pk), + data, + DeltaVersion::test_new(schema), + DeltaVersion::test_new(txnid), + ), + change, + ) +} + +fn new_delta_with_row(schema: u64, txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta { + DataDelta::new( + DeltaVersion::test_new(schema), + DeltaVersion::test_new(txnid), + row, + change, + ) +} + +#[test] +fn deltas_only_insert() { + // prepare model definition + let uuid = Uuid::new(); + let mdl = Model::new_restore( + uuid, + "catname".into(), + TagSelector::Str.into_full(), + into_dict!( + "catname" => 
Field::new([Layer::str()].into(), false), + "is_good" => Field::new([Layer::bool()].into(), false), + "magical" => Field::new([Layer::bool()].into(), false), + ), + ); + let row = Row::new( + pkey("Schrödinger's cat"), + into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)), + DeltaVersion::test_new(0), + DeltaVersion::test_new(2), + ); + { + // update the row + let mut wl = row.d_data().write(); + wl.set_txn_revised(DeltaVersion::test_new(3)); + *wl.fields_mut().get_mut("magical").unwrap() = Datacell::new_bool(true); + } + // prepare deltas + let deltas = [ + // insert catname: Schrödinger's cat, is_good: true + new_delta_with_row(0, 0, row.clone(), DataDeltaKind::Insert), + // insert catname: good cat, is_good: true, magical: false + new_delta( + 0, + 1, + Datacell::new_str("good cat".into()), + into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)), + DataDeltaKind::Insert, + ), + // insert catname: bad cat, is_good: false, magical: false + new_delta( + 0, + 2, + Datacell::new_str("bad cat".into()), + into_dict!("is_good" => Datacell::new_bool(false), "magical" => Datacell::new_bool(false)), + DataDeltaKind::Insert, + ), + // update catname: Schrödinger's cat, is_good: true, magical: true + new_delta_with_row(0, 3, row.clone(), DataDeltaKind::Update), + ]; + // delta queue + let g = pin(); + for delta in deltas.clone() { + mdl.delta_state().append_new_data_delta(delta, &g); + } + let file = open_file("deltas_only_insert.db-btlog") + .into_created() + .unwrap(); + { + let mut persist_driver = DataBatchPersistDriver::new(file, true).unwrap(); + persist_driver.write_new_batch(&mdl, deltas.len()).unwrap(); + persist_driver.close().unwrap(); + } + let mut restore_driver = DataBatchRestoreDriver::new( + open_file("deltas_only_insert.db-btlog") + .into_existing() + .unwrap() + .0, + ) + .unwrap(); + let batch = restore_driver.read_all_batches().unwrap(); + assert_eq!( + batch, + 
vec![NormalBatch::new( + vec![ + DecodedBatchEvent::new( + 1, + pkey("good cat"), + DecodedBatchEventKind::Insert(vec![ + Datacell::new_bool(true), + Datacell::new_bool(false) + ]) + ), + DecodedBatchEvent::new( + 2, + pkey("bad cat"), + DecodedBatchEventKind::Insert(vec![ + Datacell::new_bool(false), + Datacell::new_bool(false) + ]) + ), + DecodedBatchEvent::new( + 3, + pkey("Schrödinger's cat"), + DecodedBatchEventKind::Update(vec![ + Datacell::new_bool(true), + Datacell::new_bool(true) + ]) + ) + ], + 0 + )] + ) +} diff --git a/server/src/engine/storage/v1/tests/rw.rs b/server/src/engine/storage/v1/tests/rw.rs new file mode 100644 index 00000000..23e296f3 --- /dev/null +++ b/server/src/engine/storage/v1/tests/rw.rs @@ -0,0 +1,68 @@ +/* + * Created on Tue Sep 05 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ +
use crate::engine::storage::v1::{
    header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
    rw::{FileOpen, SDSSFileIO},
};

/// Opens `hello_world.db-tlog` twice through the same call.
///
/// The first call has to report [`FileOpen::Created`]; the second has to report
/// [`FileOpen::Existing`], and the header it hands back is checked field by field
/// below. Despite the name, nothing is deleted by this test.
// NOTE(review): the generic arguments that belong between `SDSSFileIO::` / `::` and
// between `open_or_create_perm_rw::` / `(` look like they were lost when this text
// was extracted. They are reproduced exactly as found; restore them from the
// original commit before building.
#[test]
fn create_delete() {
    // Both "boots" go through exactly the same call, so it lives in one place.
    let open_or_create = || {
        SDSSFileIO::::open_or_create_perm_rw::(
            "hello_world.db-tlog",
            FileScope::Journal,
            FileSpecifier::GNSTxnLog,
            FileSpecifierVersion::__new(0),
            0,
            HostRunMode::Prod,
            0,
        )
    };
    // First boot: an already existing file is a failure.
    // The binding is kept (not a temporary) so the value lives until the test ends.
    let first_boot = open_or_create().unwrap();
    if let FileOpen::Existing(_, _) = first_boot {
        panic!()
    }
    // Second boot: anything other than an existing file is a failure.
    let second_boot = open_or_create().unwrap();
    let header = match second_boot {
        FileOpen::Existing(_, header) => header,
        _ => panic!(),
    };
    let metadata = header.gr_mdr();
    assert_eq!(metadata.file_scope(), FileScope::Journal);
    assert_eq!(metadata.file_spec(), FileSpecifier::GNSTxnLog);
    assert_eq!(metadata.file_spec_id(), FileSpecifierVersion::__new(0));
    let host_record = header.gr_hr();
    assert_eq!(host_record.run_mode(), HostRunMode::Prod);
    assert_eq!(host_record.setting_version(), 0);
    assert_eq!(host_record.startup_counter(), 0);
}
diff --git a/server/src/engine/storage/v1/tests/tx.rs b/server/src/engine/storage/v1/tests/tx.rs new file mode 100644 index 00000000..01c24403 --- /dev/null +++ b/server/src/engine/storage/v1/tests/tx.rs @@ -0,0 +1,210 @@ +/* + * Created on Tue Sep 05 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability.
+ * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ +
use {
    crate::{
        engine::storage::v1::{
            header_impl::{FileSpecifier, FileSpecifierVersion, HostRunMode},
            journal::{self, JournalAdapter, JournalWriter},
            SDSSError, SDSSResult,
        },
        util,
    },
    std::cell::RefCell,
};

// NOTE(review): several generic argument lists in this file look like they were
// stripped when the text was extracted: both `JournalWriter` parameters, the return
// type of `open_log`, and the turbofish on `journal::open_journal`. They are left
// exactly as found and marked at each site; restore them from the original commit.

/// Number of byte slots held by the toy [`Database`].
const DB_SIZE: usize = 10;
/// Encoded size of one [`TxEvent`]: 1B opcode + 8B index + 1B new value.
const EVENT_PAYLOAD_LEN: usize = 10;

/// Minimal in-memory state used to exercise the journal: [`DB_SIZE`] bytes behind a
/// `RefCell`, so the state can be changed through a shared reference.
pub struct Database {
    data: RefCell<[u8; DB_SIZE]>,
}

impl Database {
    /// Returns a copy of the current state.
    fn copy_data(&self) -> [u8; DB_SIZE] {
        *self.data.borrow()
    }
    /// Creates a database with every byte set to zero.
    fn new() -> Self {
        Self {
            data: RefCell::new([0; DB_SIZE]),
        }
    }
    /// Sets every byte back to zero (in memory only).
    fn reset(&self) {
        *self.data.borrow_mut() = [0; DB_SIZE];
    }
    /// Resets the in-memory state, then appends [`TxEvent::Reset`] to `txn_writer`.
    ///
    /// The in-memory change happens first and is not undone if the append fails.
    // NOTE(review): generic arguments of `JournalWriter` missing (see top of file).
    fn txn_reset(
        &self,
        txn_writer: &mut JournalWriter,
    ) -> SDSSResult<()> {
        self.reset();
        txn_writer.append_event(TxEvent::Reset)
    }
    /// Writes `val` into slot `pos` (in memory only).
    ///
    /// Panics if `pos` is not a valid index into the state array.
    fn set(&self, pos: usize, val: u8) {
        self.data.borrow_mut()[pos] = val;
    }
    /// Writes `val` into slot `pos`, then appends [`TxEvent::Set`] to `txn_writer`.
    ///
    /// The in-memory change happens first and is not undone if the append fails.
    // NOTE(review): generic arguments of `JournalWriter` missing (see top of file).
    fn txn_set(
        &self,
        pos: usize,
        val: u8,
        txn_writer: &mut JournalWriter,
    ) -> SDSSResult<()> {
        self.set(pos, val);
        txn_writer.append_event(TxEvent::Set(pos, val))
    }
}

/// The two state changes that get journaled.
pub enum TxEvent {
    /// Zero the whole state.
    Reset,
    /// Store the given value at the given position: `Set(position, value)`.
    Set(usize, u8),
}

/// Error type of [`DatabaseTxnAdapter`]; it only wraps storage-layer errors.
#[derive(Debug)]
pub enum TxError {
    SDSS(SDSSError),
}
direct_from! {
    TxError => {
        SDSSError as SDSS
    }
}

/// Journal adapter that maps [`TxEvent`]s to and from a fixed
/// [`EVENT_PAYLOAD_LEN`]-byte payload and replays them onto a [`Database`].
#[derive(Debug)]
pub struct DatabaseTxnAdapter;
impl JournalAdapter for DatabaseTxnAdapter {
    const RECOVERY_PLUGIN: bool = false;
    type Error = TxError;
    type JournalEvent = TxEvent;
    type GlobalState = Database;

    /// Encodes `event` as `[1B: opcode][8B: index, little endian][1B: new value]`.
    ///
    /// `Reset` is opcode 0 with a zero index and value; `Set` is opcode 1.
    fn encode(event: Self::JournalEvent) -> Box<[u8]> {
        // One match yields all three fields, so they cannot drift apart.
        let (opcode, index, new_value) = match event {
            TxEvent::Reset => (0u8, 0u64, 0u8),
            TxEvent::Set(index, val) => (1u8, index as u64, val),
        };
        let mut ret = Vec::with_capacity(EVENT_PAYLOAD_LEN);
        ret.push(opcode);
        ret.extend(index.to_le_bytes());
        ret.push(new_value);
        ret.into_boxed_slice()
    }

    /// Decodes one payload produced by [`Self::encode`] and applies it to `gs`.
    ///
    /// Returns `SDSSError::CorruptedFile` (converted into [`TxError`]) when the
    /// payload has the wrong length. Returns `SDSSError::JournalLogEntryCorrupted`
    /// for an unknown opcode, a reset carrying a non-zero index or value, or a set
    /// whose index is outside the state array.
    fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), TxError> {
        if payload.len() != EVENT_PAYLOAD_LEN {
            // The file name in this error is fixed, whichever log is being read.
            return Err(SDSSError::CorruptedFile("testtxn.log").into());
        }
        let opcode = payload[0];
        let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9]));
        let new_value = payload[9];
        match opcode {
            0 if index == 0 && new_value == 0 => gs.reset(),
            // `index < DB_SIZE` already guarantees the value fits in `usize`.
            1 if index < DB_SIZE as u64 => gs.set(index as usize, new_value),
            _ => return Err(SDSSError::JournalLogEntryCorrupted.into()),
        }
        Ok(())
    }
}

/// Opens the journal called `log_name` as a test transaction log, with `db` as the
/// state handed to the adapter.
// NOTE(review): the return type and the `open_journal` turbofish lost their generic
// arguments (see top of file). The numeric arguments `0` and `1` are presumably the
// setting version and startup counter — confirm against `journal::open_journal`.
fn open_log(
    log_name: &str,
    db: &Database,
) -> SDSSResult> {
    journal::open_journal::(
        log_name,
        FileSpecifier::TestTransactionLog,
        FileSpecifierVersion::__new(0),
        0,
        HostRunMode::Prod,
        1,
        db,
    )
}

/// First boot journals two writes and closes the log; opening the same log against
/// a fresh database is then expected to leave that database with the same bytes.
#[test]
fn first_boot_second_readonly() {
    // create log
    let db1 = Database::new();
    let first_boot = || -> SDSSResult<()> {
        let mut log = open_log("testtxn.log", &db1)?;
        db1.txn_set(0, 20, &mut log)?;
        db1.txn_set(9, 21, &mut log)?;
        log.append_journal_close_and_close()
    };
    first_boot().unwrap();
    // backup original data
    let original_data = db1.copy_data();
    // restore log
    let empty_db2 = Database::new();
    open_log("testtxn.log", &empty_db2)
        .unwrap()
        .append_journal_close_and_close()
        .unwrap();
    assert_eq!(original_data, empty_db2.copy_data());
}

/// Three boots over one log: the first sets every slot to 1, the second adds the
/// slot index to every slot, and the third only opens and closes the log. After the
/// third boot the state is expected to be `1, 2, …, 10`.
#[test]
fn oneboot_mod_twoboot_mod_thirdboot_read() {
    // first boot: set all to 1
    let db1 = Database::new();
    let first_boot = || -> SDSSResult<()> {
        let mut log = open_log("duatxn.db-tlog", &db1)?;
        for i in 0..DB_SIZE {
            db1.txn_set(i, 1, &mut log)?;
        }
        log.append_journal_close_and_close()
    };
    first_boot().unwrap();
    let bkp_db1 = db1.copy_data();
    drop(db1);
    // second boot
    let db2 = Database::new();
    let second_boot = || -> SDSSResult<()> {
        let mut log = open_log("duatxn.db-tlog", &db2)?;
        // opening the log must already have brought db2 to the first boot's state
        assert_eq!(bkp_db1, db2.copy_data());
        for i in 0..DB_SIZE {
            let current_val = db2.data.borrow()[i];
            db2.txn_set(i, current_val + i as u8, &mut log)?;
        }
        log.append_journal_close_and_close()
    };
    second_boot().unwrap();
    let bkp_db2 = db2.copy_data();
    drop(db2);
    // third boot
    let db3 = Database::new();
    let log = open_log("duatxn.db-tlog", &db3).unwrap();
    log.append_journal_close_and_close().unwrap();
    assert_eq!(bkp_db2, db3.copy_data());
    // every slot started at 1 and had its own index added: 1 + 0 ..= 1 + 9
    assert_eq!(
        db3.copy_data(),
        (1..=10u8).collect::<Box<[u8]>>().as_ref()
    );
}
diff --git a/server/src/engine/txn/data.rs b/server/src/engine/txn/data.rs index fcc9cd48..38c70437 100644 --- a/server/src/engine/txn/data.rs +++ b/server/src/engine/txn/data.rs @@ -25,12 +25,8 @@ */ use crate::{ - engine::{ - core::{index::PrimaryIndexKey, model::delta::DataDelta, GlobalNS}, - data::cell::Datacell, - storage::v1::inf::obj, - }, - util::{os, EndianQW}, + engine::core::{model::delta::DataDelta, GlobalNS}, + util::os, }; type Buf = Vec; @@ -72,28 +68,3 @@ pub unsafe fn get_max_delta_queue_size() -> usize { // TODO(@ohsayan): dynamically approximate this limit MAX_NODES_IN_LL_CNT } - -/* - misc.
methods -*/ - -fn encode_primary_key(buf: &mut Buf, pk: &PrimaryIndexKey) { - buf.push(pk.tag().d()); - static EXEC: [unsafe fn(&mut Buf, &PrimaryIndexKey); 2] = [ - |buf, pk| unsafe { buf.extend(pk.read_uint().to_le_bytes()) }, - |buf, pk| unsafe { - let bin = pk.read_bin(); - buf.extend(bin.len().u64_bytes_le()); - buf.extend(bin); - }, - ]; - unsafe { - // UNSAFE(@ohsayan): tag map - assert!((pk.tag().d() / 2) < 2); - EXEC[(pk.tag().d() / 2) as usize](buf, pk); - } -} - -fn encode_dc(buf: &mut Buf, dc: &Datacell) { - obj::encode_element(buf, dc) -} diff --git a/server/src/engine/txn/mod.rs b/server/src/engine/txn/mod.rs index ae4a6cb0..fc4863e7 100644 --- a/server/src/engine/txn/mod.rs +++ b/server/src/engine/txn/mod.rs @@ -47,6 +47,7 @@ pub enum TransactionError { /// On restore, a certain item that was expected to match a certain value, has a different value OnRestoreDataConflictMismatch, SDSSError(SDSSError), + OutOfMemory, } direct_from! { diff --git a/server/src/util/os.rs b/server/src/util/os.rs index bfab9801..ffb5234c 100644 --- a/server/src/util/os.rs +++ b/server/src/util/os.rs @@ -52,6 +52,12 @@ impl From for SysIOError { } } +
/// Lets a bare [`std::io::ErrorKind`] be turned into a [`SysIOError`], so call sites
/// can write `kind.into()` or use `?` on a `Result<_, std::io::ErrorKind>`.
///
/// The kind is converted with `Into` into whatever the tuple struct wraps
/// (presumably `std::io::Error` — confirm against the struct definition).
impl From<std::io::ErrorKind> for SysIOError {
    fn from(e: std::io::ErrorKind) -> Self {
        Self(e.into())
    }
}
+ #[cfg(test)] impl PartialEq for SysIOError { fn eq(&self, other: &Self) -> bool {