Implement batched persistence system

next
Sayan Nandan 1 year ago
parent ee9ccd5a30
commit 20c937451f

@ -45,7 +45,7 @@ pub fn delete(gns: &GlobalNS, mut delete: DeleteStatement) -> DatabaseResult<()>
.mt_delete_return_entry(&model.resolve_where(delete.clauses_mut())?, &g)
{
Some(row) => {
delta_state.append_new_data_delta(
delta_state.append_new_data_delta_with(
DataDeltaKind::Delete,
row.clone(),
schema_version,

@ -43,15 +43,15 @@ pub fn insert(gns: &GlobalNS, insert: InsertStatement) -> DatabaseResult<()> {
let g = cpin();
let ds = mdl.delta_state();
// create new version
let cv = ds.create_new_data_delta_version();
let row = Row::new(pk, data, ds.schema_current_version(), cv);
let new_version = ds.create_new_data_delta_version();
let row = Row::new(pk, data, ds.schema_current_version(), new_version);
if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) {
// append delta for new version
ds.append_new_data_delta(
ds.append_new_data_delta_with(
DataDeltaKind::Insert,
row,
ds.schema_current_version(),
cv,
new_version,
&g,
);
Ok(())

@ -248,7 +248,7 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()>
let mut row_data_wl = row.d_data().write();
// create new version
let ds = mdl.delta_state();
let cv = ds.create_new_data_delta_version();
let new_version = ds.create_new_data_delta_version();
// process changes
let mut rollback_now = false;
let mut rollback_data = Vec::with_capacity(update.expressions().len());
@ -341,13 +341,13 @@ pub fn update(gns: &GlobalNS, mut update: UpdateStatement) -> DatabaseResult<()>
});
} else {
// update revised tag
row_data_wl.set_txn_revised(cv);
row_data_wl.set_txn_revised(new_version);
// publish delta
ds.append_new_data_delta(
ds.append_new_data_delta_with(
DataDeltaKind::Update,
row.clone(),
ds.schema_current_version(),
cv,
new_version,
&g,
);
}

@ -24,6 +24,7 @@
*
*/
use crate::engine::mem::ZERO_BLOCK;
#[cfg(test)]
use crate::{engine::data::spec::Dataspec1D, util::test_utils};
use {
@ -50,6 +51,29 @@ pub struct PrimaryIndexKey {
data: SpecialPaddedWord,
}
impl Clone for PrimaryIndexKey {
fn clone(&self) -> Self {
match self.tag {
TagUnique::Bin | TagUnique::Str => {
// deep copy: Bin/Str keys own a heap buffer, so the clone needs its own allocation
let (qw, nw) = self.data.dwordqn_load_qw_nw();
unsafe {
let slice = slice::from_raw_parts(nw as *const u8, qw as _);
let mut data = ManuallyDrop::new(slice.to_owned().into_boxed_slice());
Self {
tag: self.tag,
data: SpecialPaddedWord::new(qw, data.as_mut_ptr() as usize),
}
}
}
TagUnique::SignedInt | TagUnique::UnsignedInt => Self {
// integers are plain words; a bitwise copy is enough
tag: self.tag,
data: unsafe { core::mem::transmute_copy(&self.data) },
},
_ => unreachable!(),
}
}
}
impl PrimaryIndexKey {
pub fn tag(&self) -> TagUnique {
self.tag
@ -127,6 +151,27 @@ impl PrimaryIndexKey {
},
}
}
/// Create a new quadword based primary key
pub unsafe fn new_from_qw(tag: TagUnique, qw: u64) -> Self {
debug_assert!(tag == TagUnique::SignedInt || tag == TagUnique::UnsignedInt);
Self {
tag,
data: unsafe {
// UNSAFE(@ohsayan): manually choosing block
SpecialPaddedWord::new(qw, ZERO_BLOCK.as_ptr() as usize)
},
}
}
pub unsafe fn new_from_dual(tag: TagUnique, qw: u64, ptr: usize) -> Self {
debug_assert!(tag == TagUnique::Str || tag == TagUnique::Bin);
Self {
tag,
data: unsafe {
// UNSAFE(@ohsayan): manually choosing qw and nw
SpecialPaddedWord::new(qw, ptr)
},
}
}
pub unsafe fn raw_clone(&self) -> Self {
Self {
tag: self.tag,

@ -139,11 +139,16 @@ impl Row {
}
impl Row {
pub fn resolve_schema_deltas_and_freeze<'g>(
/// Only apply deltas if a certain condition is met
pub fn resolve_schema_deltas_and_freeze_if<'g>(
&'g self,
delta_state: &DeltaState,
iff: impl Fn(&RowData) -> bool,
) -> RwLockReadGuard<'g, RowData> {
let rwl_ug = self.d_data().upgradable_read();
if !iff(&rwl_ug) {
return RwLockUpgradableReadGuard::downgrade(rwl_ug);
}
let current_version = delta_state.schema_current_version();
if compiler::likely(current_version <= rwl_ug.txn_revised_schema_version) {
return RwLockUpgradableReadGuard::downgrade(rwl_ug);
@ -167,6 +172,12 @@ impl Row {
wl.txn_revised_schema_version = max_delta;
return RwLockWriteGuard::downgrade(wl);
}
pub fn resolve_schema_deltas_and_freeze<'g>(
&'g self,
delta_state: &DeltaState,
) -> RwLockReadGuard<'g, RowData> {
self.resolve_schema_deltas_and_freeze_if(delta_state, |_| true)
}
}
impl Clone for Row {

@ -24,7 +24,7 @@
*
*/
mod dml;
pub(in crate::engine) mod dml;
pub(in crate::engine) mod index;
pub(in crate::engine) mod model;
pub(in crate::engine) mod query_meta;
@ -32,7 +32,7 @@ pub mod space;
mod util;
// test
#[cfg(test)]
mod tests;
pub(super) mod tests;
// imports
use {
self::{model::Model, util::EntityLocator},

@ -30,7 +30,7 @@ use {
parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard},
std::{
collections::btree_map::{BTreeMap, Range},
sync::atomic::{AtomicU64, Ordering},
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
},
};
@ -153,6 +153,7 @@ pub struct DeltaState {
// data
data_current_version: AtomicU64,
data_deltas: Queue<DataDelta>,
data_deltas_size: AtomicUsize,
}
impl DeltaState {
@ -163,13 +164,14 @@ impl DeltaState {
schema_deltas: RwLock::new(BTreeMap::new()),
data_current_version: AtomicU64::new(0),
data_deltas: Queue::new(),
data_deltas_size: AtomicUsize::new(0),
}
}
}
// data
impl DeltaState {
pub fn append_new_data_delta(
pub fn append_new_data_delta_with(
&self,
kind: DataDeltaKind,
row: Row,
@ -177,18 +179,27 @@ impl DeltaState {
data_version: DeltaVersion,
g: &Guard,
) {
self.data_deltas
.blocking_enqueue(DataDelta::new(schema_version, data_version, row, kind), g);
self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g);
}
pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) {
self.data_deltas.blocking_enqueue(delta, g);
self.data_deltas_size.fetch_add(1, Ordering::Release);
}
pub fn create_new_data_delta_version(&self) -> DeltaVersion {
DeltaVersion(self.__data_delta_step())
}
pub fn get_data_delta_size(&self) -> usize {
self.data_deltas_size.load(Ordering::Acquire)
}
}
impl DeltaState {
pub fn __data_delta_step(&self) -> u64 {
fn __data_delta_step(&self) -> u64 {
self.data_current_version.fetch_add(1, Ordering::AcqRel)
}
pub fn __data_delta_queue(&self) -> &Queue<DataDelta> {
&self.data_deltas
}
}
// schema
@ -314,7 +325,7 @@ impl<'a> SchemaDeltaIndexRGuard<'a> {
data delta
*/
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataDelta {
schema_version: DeltaVersion,
data_version: DeltaVersion,
@ -336,11 +347,24 @@ impl DataDelta {
change,
}
}
pub fn schema_version(&self) -> DeltaVersion {
self.schema_version
}
pub fn data_version(&self) -> DeltaVersion {
self.data_version
}
pub fn row(&self) -> &Row {
&self.row
}
pub fn change(&self) -> DataDeltaKind {
self.change
}
}
#[derive(Debug)]
#[derive(Debug, Clone, Copy, sky_macros::EnumMethods, PartialEq)]
#[repr(u8)]
pub enum DataDeltaKind {
Insert,
Update,
Delete,
Delete = 0,
Insert = 1,
Update = 2,
}
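
Aside: the new data_deltas_size counter is kept next to the queue because the epoch-based Queue has no cheap length query; the batch persist driver later samples it as observed_len. A minimal std-only analogue of this bookkeeping (Mutex<VecDeque> standing in for the lock-free queue; all names here are illustrative, not the engine's):

use std::{
collections::VecDeque,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Mutex,
},
};

struct DeltaStateSketch<T> {
data_current_version: AtomicU64,
data_deltas: Mutex<VecDeque<T>>,
data_deltas_size: AtomicUsize,
}

impl<T> DeltaStateSketch<T> {
fn create_new_data_delta_version(&self) -> u64 {
// fetch_add returns the pre-increment value, so versions start at 0
self.data_current_version.fetch_add(1, Ordering::AcqRel)
}
fn append_new_data_delta(&self, delta: T) {
self.data_deltas.lock().unwrap().push_back(delta);
self.data_deltas_size.fetch_add(1, Ordering::Release);
}
fn get_data_delta_size(&self) -> usize {
// what write_new_batch later samples as observed_len
self.data_deltas_size.load(Ordering::Acquire)
}
}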

@ -52,7 +52,7 @@ use {
std::cell::UnsafeCell,
};
pub(in crate::engine::core) use self::delta::{SchemaDeltaKind, DeltaState, DeltaVersion};
pub(in crate::engine::core) use self::delta::{DeltaState, DeltaVersion, SchemaDeltaKind};
pub(in crate::engine) type Fields = IndexSTSeqCns<Box<str>, Field>;
#[derive(Debug)]

@ -41,6 +41,26 @@ impl TagClass {
pub const fn max() -> usize {
Self::List.d() as _
}
pub const fn try_from_raw(v: u8) -> Option<Self> {
if v > Self::List.d() {
return None;
}
Some(unsafe { Self::from_raw(v) })
}
pub const unsafe fn from_raw(v: u8) -> Self {
core::mem::transmute(v)
}
pub const fn tag_unique(&self) -> TagUnique {
[
TagUnique::Illegal,
TagUnique::UnsignedInt,
TagUnique::SignedInt,
TagUnique::Illegal,
TagUnique::Bin,
TagUnique::Str,
TagUnique::Illegal,
][self.d() as usize]
}
}
#[repr(u8)]
@ -124,6 +144,12 @@ impl TagUnique {
pub const fn is_unique(&self) -> bool {
self.d() != Self::Illegal.d()
}
pub const fn try_from_raw(raw: u8) -> Option<Self> {
if raw > 3 {
return None;
}
Some(unsafe { core::mem::transmute(raw) })
}
}
macro_rules! d {

@ -0,0 +1,58 @@
/*
* Created on Sun Sep 03 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crc::{Crc, Digest, CRC_64_XZ};
/*
NOTE(@ohsayan): we're currently using crc's impl. but the reason I decided to make a wrapper is because I have a
different impl in mind
*/
const CRC64: Crc<u64> = Crc::<u64>::new(&CRC_64_XZ);
pub struct SCrc {
digest: Digest<'static, u64>,
}
impl SCrc {
pub const fn new() -> Self {
Self {
digest: CRC64.digest(),
}
}
pub fn recompute_with_new_byte(&mut self, b: u8) {
self.digest.update(&[b])
}
pub fn recompute_with_new_block<const N: usize>(&mut self, b: [u8; N]) {
self.digest.update(&b);
}
pub fn recompute_with_new_var_block(&mut self, b: &[u8]) {
self.digest.update(b)
}
pub fn finish(self) -> u64 {
self.digest.finalize()
}
}
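
A quick property check for the wrapper above, assuming the crc crate's 3.x API (the same one the const above uses): incremental updates must agree with the one-shot CRC-64/XZ value, which is what lets the tracked writer and reader further below checksum data as it streams past.

use crc::{Crc, CRC_64_XZ};

fn main() {
const CRC64: Crc<u64> = Crc::<u64>::new(&CRC_64_XZ);
let data = b"batched persistence";
let mut scrc = CRC64.digest();
for b in data {
// same path as recompute_with_new_byte: one byte at a time
scrc.update(&[*b]);
}
assert_eq!(scrc.finalize(), CRC64.checksum(data));
}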

@ -26,7 +26,10 @@
//! Implementations of the Skytable Disk Storage Subsystem (SDSS)
mod checksum;
mod header;
mod versions;
// impls
pub mod v1;
pub use checksum::SCrc;

@ -0,0 +1,45 @@
/*
* Created on Sun Sep 03 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod persist;
mod restore;
/// the data batch file was reopened
const MARKER_BATCH_REOPEN: u8 = 0xFB;
/// the data batch file was closed
const MARKER_BATCH_CLOSED: u8 = 0xFC;
/// end of batch marker
const MARKER_END_OF_BATCH: u8 = 0xFD;
/// "real" batch event marker
const MARKER_ACTUAL_BATCH_EVENT: u8 = 0xFE;
/// recovery batch event marker
const MARKER_RECOVERY_EVENT: u8 = 0xFF;
/// recovery threshold
const RECOVERY_THRESHOLD: usize = 10;
#[cfg(test)]
pub(super) use restore::{DecodedBatchEvent, DecodedBatchEventKind, NormalBatch};
pub use {persist::DataBatchPersistDriver, restore::DataBatchRestoreDriver};
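
Putting the markers together: each committed batch is framed roughly as below. This is a std-only sketch inferred from the persist driver later in this commit (not a documented spec); crc64 stands in for SCrc and the per-event payloads are left opaque.

fn frame_batch(written: &[Vec<u8>], stale_reads: u64, schema_version: u64, crc64: impl Fn(&[u8]) -> u64) -> Vec<u8> {
let expected = written.len() as u64 + stale_reads;
let mut out = vec![MARKER_ACTUAL_BATCH_EVENT]; // 0xFE: a "real" batch follows
out.extend(expected.to_le_bytes()); // expected commit (observed_len)
out.extend(schema_version.to_le_bytes());
for ev in written {
out.extend(ev); // one encoded delta per surviving event
}
out.push(MARKER_END_OF_BATCH); // 0xFD
out.extend((written.len() as u64).to_le_bytes()); // actual commit
let checksum = crc64(&out); // the tracked writer checksums everything up to here
out.extend(checksum.to_le_bytes());
out
}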

@ -0,0 +1,290 @@
/*
* Created on Tue Sep 05 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{
MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH,
MARKER_RECOVERY_EVENT,
},
crate::{
engine::{
core::{
index::{PrimaryIndexKey, RowData},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion, IRModel},
Model,
},
},
data::{
cell::Datacell,
tag::{DataTag, TagClass, TagUnique},
},
idx::STIndexSeq,
storage::v1::{
inf::PersistTypeDscr,
rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedWriter},
SDSSError, SDSSResult,
},
},
util::EndianQW,
},
crossbeam_epoch::pin,
};
pub struct DataBatchPersistDriver<F> {
f: SDSSFileTrackedWriter<F>,
}
impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
pub fn new(mut file: SDSSFileIO<F>, is_new: bool) -> SDSSResult<Self> {
if !is_new {
file.fsynced_write(&[MARKER_BATCH_REOPEN])?;
}
Ok(Self {
f: SDSSFileTrackedWriter::new(file),
})
}
pub fn close(mut self) -> SDSSResult<()> {
if self
.f
.inner_file()
.fsynced_write(&[MARKER_BATCH_CLOSED])
.is_ok()
{
return Ok(());
} else {
return Err(SDSSError::DataBatchCloseError);
}
}
pub fn write_new_batch(&mut self, model: &Model, observed_len: usize) -> SDSSResult<()> {
// pin model
let irm = model.intent_read_model();
let schema_version = model.delta_state().schema_current_version();
let data_q = model.delta_state().__data_delta_queue();
let g = pin();
// init restore list
let mut restore_list = Vec::new();
// prepare computations
let mut i = 0;
let mut inconsistent_reads = 0;
let mut exec = || -> SDSSResult<()> {
// write batch start
self.write_batch_start(observed_len, schema_version)?;
while i < observed_len {
let delta = data_q.blocking_try_dequeue(&g).unwrap();
restore_list.push(delta.clone()); // TODO(@ohsayan): avoid this
match delta.change() {
DataDeltaKind::Delete => {
self.write_batch_item_common_row_data(&delta)?;
self.encode_pk_only(delta.row().d_key())?;
}
DataDeltaKind::Insert | DataDeltaKind::Update => {
// resolve deltas (this is yet another opportunity for us to reclaim memory from deleted items)
let row_data = delta
.row()
.resolve_schema_deltas_and_freeze_if(&model.delta_state(), |row| {
row.get_txn_revised() <= delta.data_version()
});
if row_data.get_txn_revised() > delta.data_version() {
// we made an inconsistent (stale) read; someone updated the state after our snapshot
inconsistent_reads += 1;
i += 1;
continue;
}
self.write_batch_item_common_row_data(&delta)?;
// encode data
self.encode_pk_only(delta.row().d_key())?;
self.encode_row_data(model, &irm, &row_data)?;
}
}
// fsync now; we're good to go
self.f.fsync_all()?;
i += 1;
}
return self.append_batch_summary(observed_len, inconsistent_reads);
};
match exec() {
Ok(()) => Ok(()),
Err(_) => {
// republish changes since we failed to commit
restore_list
.into_iter()
.for_each(|delta| model.delta_state().append_new_data_delta(delta, &g));
// now attempt to fix the file
return self.attempt_fix_data_batchfile();
}
}
}
/// Write the batch start block:
/// - Batch start magic
/// - Expected commit
/// - Schema version
fn write_batch_start(
&mut self,
observed_len: usize,
schema_version: DeltaVersion,
) -> Result<(), SDSSError> {
self.f.unfsynced_write(&[MARKER_ACTUAL_BATCH_EVENT])?;
let observed_len_bytes = observed_len.u64_bytes_le();
self.f.unfsynced_write(&observed_len_bytes)?;
self.f
.unfsynced_write(&schema_version.value_u64().to_le_bytes())?;
Ok(())
}
/// Append a summary of this batch
fn append_batch_summary(
&mut self,
observed_len: usize,
inconsistent_reads: usize,
) -> Result<(), SDSSError> {
// [0xFD][actual_commit][checksum]
self.f.unfsynced_write(&[MARKER_END_OF_BATCH])?;
let actual_commit = (observed_len - inconsistent_reads).u64_bytes_le();
self.f.unfsynced_write(&actual_commit)?;
let cs = self.f.reset_and_finish_checksum().to_le_bytes();
self.f.inner_file().fsynced_write(&cs)?;
Ok(())
}
/// Attempt to fix the batch journal
// TODO(@ohsayan): declare an "international system disaster" when this happens
fn attempt_fix_data_batchfile(&mut self) -> SDSSResult<()> {
/*
attempt to append 0xFF to the part of the file where a corruption likely occurred, marking
it recoverable
*/
let f = self.f.inner_file();
if f.fsynced_write(&[MARKER_RECOVERY_EVENT]).is_ok() {
return Ok(());
}
Err(SDSSError::DataBatchRecoveryFailStageOne)
}
}
impl<F: RawFileIOInterface> DataBatchPersistDriver<F> {
/// encode the primary key only. this means NO TAG is encoded.
fn encode_pk_only(&mut self, pk: &PrimaryIndexKey) -> SDSSResult<()> {
let buf = &mut self.f;
match pk.tag() {
TagUnique::UnsignedInt | TagUnique::SignedInt => {
let data = unsafe {
// UNSAFE(@ohsayan): +tagck
pk.read_uint()
}
.to_le_bytes();
buf.unfsynced_write(&data)?;
}
TagUnique::Str | TagUnique::Bin => {
let slice = unsafe {
// UNSAFE(@ohsayan): +tagck
pk.read_bin()
};
let slice_l = slice.len().u64_bytes_le();
buf.unfsynced_write(&slice_l)?;
buf.unfsynced_write(slice)?;
}
TagUnique::Illegal => unsafe {
// UNSAFE(@ohsayan): a pk can't be constructed with illegal
impossible!()
},
}
Ok(())
}
/// Encode a single cell
fn encode_cell(&mut self, value: &Datacell) -> SDSSResult<()> {
let buf = &mut self.f;
buf.unfsynced_write(&[
PersistTypeDscr::translate_from_class(value.tag().tag_class()).value_u8(),
])?;
match value.tag().tag_class() {
TagClass::Bool if value.is_null() => {}
TagClass::Bool => {
let bool = unsafe {
// UNSAFE(@ohsayan): +tagck
value.read_bool()
} as u8;
buf.unfsynced_write(&[bool])?;
}
TagClass::SignedInt | TagClass::UnsignedInt | TagClass::Float => {
let chunk = unsafe {
// UNSAFE(@ohsayan): +tagck
value.read_uint()
}
.to_le_bytes();
buf.unfsynced_write(&chunk)?;
}
TagClass::Str | TagClass::Bin => {
let slice = unsafe {
// UNSAFE(@ohsayan): +tagck
value.read_bin()
};
let slice_l = slice.len().u64_bytes_le();
buf.unfsynced_write(&slice_l)?;
buf.unfsynced_write(slice)?;
}
TagClass::List => {
let list = unsafe {
// UNSAFE(@ohsayan): +tagck
value.read_list()
}
.read();
let list_l = list.len().u64_bytes_le();
buf.unfsynced_write(&list_l)?;
for item in list.iter() {
self.encode_cell(item)?;
}
}
}
Ok(())
}
/// Encode row data
fn encode_row_data(
&mut self,
mdl: &Model,
irm: &IRModel,
row_data: &RowData,
) -> SDSSResult<()> {
// nasty hack; we need to avoid the pk
self.f
.unfsynced_write(&row_data.fields().len().u64_bytes_le())?;
for field_name in irm.fields().stseq_ord_key() {
match row_data.fields().get(field_name) {
Some(cell) => {
self.encode_cell(cell)?;
}
None if field_name.as_ref() == mdl.p_key() => {}
None => self.f.unfsynced_write(&[0])?,
}
}
Ok(())
}
fn write_batch_item_common_row_data(&mut self, delta: &DataDelta) -> Result<(), SDSSError> {
let p1_dc_pk_ty = [delta.change().value_u8(), delta.row().d_key().tag().d()];
self.f.unfsynced_write(&p1_dc_pk_ty)?;
let txn_id = delta.data_version().value_u64().to_le_bytes();
self.f.unfsynced_write(&txn_id)?;
Ok(())
}
}
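
For reference, a single delete event under this scheme, as a std-only sketch mirroring write_batch_item_common_row_data plus encode_pk_only for a string key. DataDeltaKind::Delete = 0 is explicit in the delta.rs hunk above; the TagUnique::Str = 3 discriminant is an assumption here.

fn encode_delete_event(txn_id: u64, pk: &str) -> Vec<u8> {
let mut out = Vec::new();
out.push(0u8); // DataDeltaKind::Delete
out.push(3u8); // pk tag byte (TagUnique::Str, assumed discriminant)
out.extend(txn_id.to_le_bytes()); // this delta's data version
out.extend((pk.len() as u64).to_le_bytes());
out.extend(pk.as_bytes()); // no per-cell tag: the pk tag byte above disambiguates
out
}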

@ -0,0 +1,445 @@
/*
* Created on Tue Sep 05 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use super::{MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN};
use {
super::{
MARKER_ACTUAL_BATCH_EVENT, MARKER_END_OF_BATCH, MARKER_RECOVERY_EVENT, RECOVERY_THRESHOLD,
},
crate::{
engine::{
core::{index::PrimaryIndexKey, model::Model},
data::{
cell::Datacell,
tag::{CUTag, TagClass, TagUnique},
},
storage::v1::{
inf::PersistTypeDscr,
rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
SDSSError, SDSSResult,
},
},
util::copy_slice_to_array,
},
std::mem::ManuallyDrop,
};
#[derive(Debug, PartialEq)]
pub(in crate::engine::storage::v1) struct DecodedBatchEvent {
txn_id: u64,
pk: PrimaryIndexKey,
kind: DecodedBatchEventKind,
}
impl DecodedBatchEvent {
pub(in crate::engine::storage::v1) const fn new(
txn_id: u64,
pk: PrimaryIndexKey,
kind: DecodedBatchEventKind,
) -> Self {
Self { txn_id, pk, kind }
}
}
#[derive(Debug, PartialEq)]
pub(in crate::engine::storage::v1) enum DecodedBatchEventKind {
Delete,
Insert(Vec<Datacell>),
Update(Vec<Datacell>),
}
#[derive(Debug, PartialEq)]
pub(in crate::engine::storage::v1) struct NormalBatch {
events: Vec<DecodedBatchEvent>,
schema_version: u64,
}
impl NormalBatch {
pub(in crate::engine::storage::v1) fn new(
events: Vec<DecodedBatchEvent>,
schema_version: u64,
) -> Self {
Self {
events,
schema_version,
}
}
}
enum Batch {
RecoveredFromerror,
Normal(NormalBatch),
FinishedEarly(NormalBatch),
BatchClosed,
}
pub struct DataBatchRestoreDriver<F> {
f: SDSSFileTrackedReader<F>,
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
pub fn new(f: SDSSFileIO<F>) -> SDSSResult<Self> {
Ok(Self {
f: SDSSFileTrackedReader::new(f)?,
})
}
pub fn into_file(self) -> SDSSFileIO<F> {
self.f.into_inner_file()
}
pub(in crate::engine::storage::v1) fn read_data_batch_into_model(
&mut self,
model: &Model,
) -> SDSSResult<()> {
self.read_all_batches_and_for_each(|batch| {
// apply the batch
Self::apply_batch(model, batch)
})
}
pub fn read_all_batches(&mut self) -> SDSSResult<Vec<NormalBatch>> {
let mut all_batches = vec![];
self.read_all_batches_and_for_each(|batch| {
all_batches.push(batch);
Ok(())
})?;
Ok(all_batches)
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
fn read_all_batches_and_for_each(
&mut self,
mut f: impl FnMut(NormalBatch) -> SDSSResult<()>,
) -> SDSSResult<()> {
// begin
let mut closed = false;
while !self.f.is_eof() && !closed {
// try to decode this batch
let Ok(batch) = self.read_batch() else {
self.attempt_recover_data_batch()?;
continue;
};
// see what happened when decoding it
let finished_early = matches!(batch, Batch::FinishedEarly { .. });
let batch = match batch {
Batch::RecoveredFromerror => {
// there was an error, but it was safely "handled" because of a recovery byte mark
continue;
}
Batch::FinishedEarly(batch) | Batch::Normal(batch) => batch,
Batch::BatchClosed => {
// the batch was closed; this means that we probably are done with this round; but was it re-opened?
closed = self.handle_reopen_is_actual_close()?;
continue;
}
};
// now we need to read the batch summary
let Ok(actual_commit) = self.read_batch_summary(finished_early) else {
self.attempt_recover_data_batch()?;
continue;
};
// check if we have the expected batch size
if batch.events.len() as u64 != actual_commit {
// corrupted
self.attempt_recover_data_batch()?;
continue;
}
// apply the batch
f(batch)?;
}
if closed {
if self.f.is_eof() {
// that was the last batch
return Ok(());
}
}
// nope, this is a corrupted file
Err(SDSSError::DataBatchRestoreCorruptedBatchFile)
}
fn handle_reopen_is_actual_close(&mut self) -> SDSSResult<bool> {
if self.f.is_eof() {
// yup, it was closed
Ok(true)
} else {
// maybe not
if self.f.read_byte()? == MARKER_BATCH_REOPEN {
// driver was closed, but reopened
Ok(false)
} else {
// that's just a nice bug
Err(SDSSError::DataBatchRestoreCorruptedBatchFile)
}
}
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
fn apply_batch(_: &Model, _: NormalBatch) -> SDSSResult<()> {
todo!()
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
fn read_batch_summary(&mut self, finished_early: bool) -> SDSSResult<u64> {
if !finished_early {
// we must read the batch termination signature
let b = self.f.read_byte()?;
if b != MARKER_END_OF_BATCH {
return Err(SDSSError::DataBatchRestoreCorruptedBatch);
}
}
// read actual commit
let mut actual_commit = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut actual_commit)?;
// find actual checksum
let actual_checksum = self.f.__reset_checksum();
// find hardcoded checksum
let mut hardcoded_checksum = [0; sizeof!(u64)];
self.f
.inner_file()
.read_to_buffer(&mut hardcoded_checksum)?;
// move file cursor ahead
self.f.__cursor_ahead_by(sizeof!(u64));
if actual_checksum == u64::from_le_bytes(hardcoded_checksum) {
Ok(u64::from_le_bytes(actual_commit))
} else {
Err(SDSSError::DataBatchRestoreCorruptedBatch)
}
}
fn read_batch(&mut self) -> SDSSResult<Batch> {
let mut this_batch = vec![];
// check batch type
let batch_type = self.f.read_byte()?;
match batch_type {
MARKER_ACTUAL_BATCH_EVENT => {}
MARKER_RECOVERY_EVENT => {
// while attempting to write this batch, some sort of error occurred, but we got a nice recovery byte
// so proceed that way
return Ok(Batch::RecoveredFromerror);
}
MARKER_BATCH_CLOSED => {
// this isn't a batch; it has been closed
return Ok(Batch::BatchClosed);
}
_ => {
// this is the only singular byte that is expected to be intact. If this isn't intact either, I'm sorry
return Err(SDSSError::DataBatchRestoreCorruptedBatch);
}
}
// we're expecting a "good batch"
let mut batch_size_schema_version = [0; sizeof!(u64, 2)];
self.f.read_into_buffer(&mut batch_size_schema_version)?;
// we have the batch length
let batch_size = u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[..8]));
let schema_version =
u64::from_le_bytes(copy_slice_to_array(&batch_size_schema_version[8..]));
let mut processed_in_this_batch = 0;
while (processed_in_this_batch != batch_size) & !self.f.is_eof() {
// decode common row data
let change_type = self.f.read_byte()?;
// now decode event
match change_type {
MARKER_END_OF_BATCH => {
// the file tells us that we've reached the end of this batch; hmmm
return Ok(Batch::FinishedEarly(NormalBatch::new(
this_batch,
schema_version,
)));
}
normal_event => {
let (pk_type, txnid) = self.read_normal_event_metadata()?;
match normal_event {
0 => {
// delete
let pk = self.decode_primary_key(pk_type)?;
this_batch.push(DecodedBatchEvent::new(
txnid,
pk,
DecodedBatchEventKind::Delete,
));
processed_in_this_batch += 1;
}
1 | 2 => {
// insert or update
// get pk
let pk = self.decode_primary_key(pk_type)?;
// get column count
let mut column_count = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut column_count)?;
let mut column_count = u64::from_le_bytes(column_count);
// prepare row
let mut row = vec![];
while column_count != 0 && !self.f.is_eof() {
row.push(self.decode_cell()?);
column_count -= 1;
}
if column_count != 0 {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
}
if change_type == 1 {
this_batch.push(DecodedBatchEvent::new(
txnid,
pk,
DecodedBatchEventKind::Insert(row),
));
} else {
this_batch.push(DecodedBatchEvent::new(
txnid,
pk,
DecodedBatchEventKind::Update(row),
));
}
processed_in_this_batch += 1;
}
_ => {
return Err(SDSSError::DataBatchRestoreCorruptedBatch);
}
}
}
}
}
Ok(Batch::Normal(NormalBatch::new(this_batch, schema_version)))
}
fn read_normal_event_metadata(&mut self) -> Result<(u8, u64), SDSSError> {
let pk_type = self.f.read_byte()?;
let mut txnid = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut txnid)?;
let txnid = u64::from_le_bytes(txnid);
Ok((pk_type, txnid))
}
fn attempt_recover_data_batch(&mut self) -> SDSSResult<()> {
let mut max_threshold = RECOVERY_THRESHOLD;
while max_threshold != 0 && self.f.has_left(1) {
if let Ok(MARKER_RECOVERY_EVENT) = self.f.inner_file().read_byte() {
return Ok(());
}
max_threshold -= 1;
}
Err(SDSSError::DataBatchRestoreCorruptedBatch)
}
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
fn decode_primary_key(&mut self, pk_type: u8) -> SDSSResult<PrimaryIndexKey> {
let Some(pk_type) = TagUnique::try_from_raw(pk_type) else {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
};
Ok(match pk_type {
TagUnique::SignedInt | TagUnique::UnsignedInt => {
let mut chunk = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut chunk)?;
unsafe {
// UNSAFE(@ohsayan): +tagck
PrimaryIndexKey::new_from_qw(pk_type, u64::from_le_bytes(chunk))
}
}
TagUnique::Str | TagUnique::Bin => {
let mut len = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut len)?;
let mut data = vec![0; u64::from_le_bytes(len) as usize];
self.f.read_into_buffer(&mut data)?;
if pk_type == TagUnique::Str {
if core::str::from_utf8(&data).is_err() {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
}
}
unsafe {
// UNSAFE(@ohsayan): +tagck +verityck
let mut md = ManuallyDrop::new(data);
PrimaryIndexKey::new_from_dual(
pk_type,
u64::from_le_bytes(len),
md.as_mut_ptr() as usize,
)
}
}
_ => unsafe {
// UNSAFE(@ohsayan): TagUnique::try_from_raw rejects a construction with Illegal as the dscr
impossible!()
},
})
}
fn decode_cell(&mut self) -> SDSSResult<Datacell> {
let cell_type_sig = self.f.read_byte()?;
let Some(cell_type) = PersistTypeDscr::try_from_raw(cell_type_sig) else {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
};
Ok(match cell_type {
PersistTypeDscr::Null => Datacell::null(),
PersistTypeDscr::Bool => {
let bool = self.f.read_byte()?;
if bool > 1 {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
}
Datacell::new_bool(bool == 1)
}
PersistTypeDscr::UnsignedInt | PersistTypeDscr::SignedInt | PersistTypeDscr::Float => {
let mut block = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut block)?;
unsafe {
// UNSAFE(@ohsayan): choosing the correct type and tag
let tc = TagClass::from_raw(cell_type.value_u8() - 1);
Datacell::new_qw(u64::from_le_bytes(block), CUTag::new(tc, tc.tag_unique()))
}
}
PersistTypeDscr::Str | PersistTypeDscr::Bin => {
let mut len_block = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut len_block)?;
let len = u64::from_le_bytes(len_block) as usize;
let mut data = vec![0; len];
self.f.read_into_buffer(&mut data)?;
unsafe {
// UNSAFE(@ohsayan): +tagck
if cell_type == PersistTypeDscr::Str {
if core::str::from_utf8(&data).is_err() {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
}
Datacell::new_str(String::from_utf8_unchecked(data).into_boxed_str())
} else {
Datacell::new_bin(data.into_boxed_slice())
}
}
}
PersistTypeDscr::List => {
let mut len_block = [0; sizeof!(u64)];
self.f.read_into_buffer(&mut len_block)?;
let len = u64::from_le_bytes(len_block);
let mut list = Vec::new();
while !self.f.is_eof() && list.len() as u64 != len {
list.push(self.decode_cell()?);
}
if len != list.len() as u64 {
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
}
Datacell::new_list(list)
}
PersistTypeDscr::Dict => {
// we don't support dicts just yet
return Err(SDSSError::DataBatchRestoreCorruptedEntry);
}
})
}
}
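
The recovery handshake between the two drivers, reduced to its core (a sketch): when a batch fails to decode, attempt_recover_data_batch scans forward at most RECOVERY_THRESHOLD bytes for the 0xFF marker that the persist driver appends when a commit dies midway; finding it means the writer already flagged the damage and the next batch can be trusted.

fn found_recovery_marker(tail: &[u8]) -> bool {
const RECOVERY_THRESHOLD: usize = 10; // mirrors the module constant
tail.iter().take(RECOVERY_THRESHOLD).any(|&b| b == 0xFF)
}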

@ -72,14 +72,14 @@ mod dr;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, sky_macros::EnumMethods)]
pub enum FileScope {
Journal = 0,
TransactionLogCompacted = 1,
DataBatch = 1,
}
impl FileScope {
pub const fn try_new(id: u64) -> Option<Self> {
Some(match id {
0 => Self::Journal,
1 => Self::TransactionLogCompacted,
1 => Self::DataBatch,
_ => return None,
})
}
@ -95,16 +95,18 @@ impl FileScope {
#[repr(u8)]
pub enum FileSpecifier {
GNSTxnLog = 0,
TableDataBatch = 1,
#[cfg(test)]
TestTransactionLog = 1,
TestTransactionLog = 0xFF,
}
impl FileSpecifier {
pub const fn try_new(v: u32) -> Option<Self> {
Some(match v {
0 => Self::GNSTxnLog,
1 => Self::TableDataBatch,
#[cfg(test)]
1 => Self::TestTransactionLog,
0xFF => Self::TestTransactionLog,
_ => return None,
})
}

@ -27,7 +27,7 @@
use {
super::{
obj::{self, FieldMD},
PersistDictEntryDscr, PersistMapSpec, PersistObject, VecU8,
PersistTypeDscr, PersistMapSpec, PersistObject, VecU8,
},
crate::{
engine::{
@ -178,7 +178,7 @@ impl PersistMapSpec for GenericDictSpec {
fn pretest_entry_data(scanner: &BufferedScanner, md: &Self::EntryMD) -> bool {
static EXPECT_ATLEAST: [u8; 4] = [0, 1, 8, 8]; // PAD to align
let lbound_rem = md.klen + EXPECT_ATLEAST[cmp::min(md.dscr, 3) as usize] as usize;
scanner.has_left(lbound_rem) & (md.dscr <= PersistDictEntryDscr::Dict.value_u8())
scanner.has_left(lbound_rem) & (md.dscr <= PersistTypeDscr::Dict.value_u8())
}
fn entry_md_enc(buf: &mut VecU8, key: &Self::Key, _: &Self::Value) {
buf.extend(key.len().u64_bytes_le());
@ -189,7 +189,7 @@ impl PersistMapSpec for GenericDictSpec {
fn enc_entry(buf: &mut VecU8, key: &Self::Key, val: &Self::Value) {
match val {
DictEntryGeneric::Map(map) => {
buf.push(PersistDictEntryDscr::Dict.value_u8());
buf.push(PersistTypeDscr::Dict.value_u8());
buf.extend(key.as_bytes());
<PersistMapImpl<Self> as PersistObject>::default_full_enc(buf, map);
}
@ -208,17 +208,17 @@ impl PersistMapSpec for GenericDictSpec {
unsafe fn dec_val(scanner: &mut BufferedScanner, md: &Self::EntryMD) -> Option<Self::Value> {
unsafe fn decode_element(
scanner: &mut BufferedScanner,
dscr: PersistDictEntryDscr,
dscr: PersistTypeDscr,
dg_top_element: bool,
) -> Option<DictEntryGeneric> {
let r = match dscr {
PersistDictEntryDscr::Null => DictEntryGeneric::Data(Datacell::null()),
PersistDictEntryDscr::Bool => {
PersistTypeDscr::Null => DictEntryGeneric::Data(Datacell::null()),
PersistTypeDscr::Bool => {
DictEntryGeneric::Data(Datacell::new_bool(scanner.next_byte() == 1))
}
PersistDictEntryDscr::UnsignedInt
| PersistDictEntryDscr::SignedInt
| PersistDictEntryDscr::Float => DictEntryGeneric::Data(Datacell::new_qw(
PersistTypeDscr::UnsignedInt
| PersistTypeDscr::SignedInt
| PersistTypeDscr::Float => DictEntryGeneric::Data(Datacell::new_qw(
scanner.next_u64_le(),
CUTag::new(
dscr.into_class(),
@ -230,13 +230,13 @@ impl PersistMapSpec for GenericDictSpec {
][(dscr.value_u8() - 2) as usize],
),
)),
PersistDictEntryDscr::Str | PersistDictEntryDscr::Bin => {
PersistTypeDscr::Str | PersistTypeDscr::Bin => {
let slc_len = scanner.next_u64_le() as usize;
if !scanner.has_left(slc_len) {
return None;
}
let slc = scanner.next_chunk_variable(slc_len);
DictEntryGeneric::Data(if dscr == PersistDictEntryDscr::Str {
DictEntryGeneric::Data(if dscr == PersistTypeDscr::Str {
if core::str::from_utf8(slc).is_err() {
return None;
}
@ -247,18 +247,18 @@ impl PersistMapSpec for GenericDictSpec {
Datacell::new_bin(slc.to_owned().into_boxed_slice())
})
}
PersistDictEntryDscr::List => {
PersistTypeDscr::List => {
let list_len = scanner.next_u64_le() as usize;
let mut v = Vec::with_capacity(list_len);
while (!scanner.eof()) & (v.len() < list_len) {
let dscr = scanner.next_byte();
if dscr > PersistDictEntryDscr::Dict.value_u8() {
if dscr > PersistTypeDscr::Dict.value_u8() {
return None;
}
v.push(
match decode_element(
scanner,
PersistDictEntryDscr::from_raw(dscr),
PersistTypeDscr::from_raw(dscr),
false,
) {
Some(DictEntryGeneric::Data(l)) => l,
@ -273,7 +273,7 @@ impl PersistMapSpec for GenericDictSpec {
return None;
}
}
PersistDictEntryDscr::Dict => {
PersistTypeDscr::Dict => {
if dg_top_element {
DictEntryGeneric::Map(
<PersistMapImpl<GenericDictSpec> as PersistObject>::default_full_dec(
@ -288,7 +288,7 @@ impl PersistMapSpec for GenericDictSpec {
};
Some(r)
}
decode_element(scanner, PersistDictEntryDscr::from_raw(md.dscr), true)
decode_element(scanner, PersistTypeDscr::from_raw(md.dscr), true)
}
// not implemented
fn enc_key(_: &mut VecU8, _: &Self::Key) {

@ -51,7 +51,7 @@ type VecU8 = Vec<u8>;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, sky_macros::EnumMethods)]
#[repr(u8)]
/// Disambiguation for data
pub enum PersistDictEntryDscr {
pub enum PersistTypeDscr {
Null = 0,
Bool = 1,
UnsignedInt = 2,
@ -63,11 +63,19 @@ pub enum PersistDictEntryDscr {
Dict = 8,
}
impl PersistDictEntryDscr {
impl PersistTypeDscr {
pub(super) const MAX: Self = Self::Dict;
/// translates the tag class definition into the dscr definition
pub const fn translate_from_class(class: TagClass) -> Self {
unsafe { Self::from_raw(class.d() + 1) }
}
pub const fn try_from_raw(v: u8) -> Option<Self> {
if v > Self::MAX.value_u8() {
None
} else {
unsafe { Some(Self::from_raw(v)) }
}
}
pub const unsafe fn from_raw(v: u8) -> Self {
core::mem::transmute(v)
}
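
The off-by-one contract between TagClass and PersistTypeDscr, checked standalone. The discriminant orders are as implied by this commit (TagClass from the tag_unique table, PersistTypeDscr from the enum above; the elided middle variants are assumed): dscr 0 is reserved for Null, so translate_from_class is class + 1, and the restore driver undoes it with value_u8() - 1.

fn main() {
let classes = ["Bool", "UnsignedInt", "SignedInt", "Float", "Bin", "Str", "List"];
let dscrs = ["Null", "Bool", "UnsignedInt", "SignedInt", "Float", "Bin", "Str", "List", "Dict"];
for (class_d, name) in classes.iter().enumerate() {
assert_eq!(dscrs[class_d + 1], *name); // the +1 shift lines the two enums up
}
}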

@ -27,7 +27,7 @@
use crate::engine::{core::model::delta::IRModel, data::DictGeneric};
use {
super::{PersistDictEntryDscr, PersistObject, VecU8},
super::{PersistTypeDscr, PersistObject, VecU8},
crate::{
engine::{
core::{
@ -71,7 +71,7 @@ pub fn encode_element(buf: &mut VecU8, dc: &Datacell) {
pub fn encode_datacell_tag(buf: &mut VecU8, dc: &Datacell) {
buf.push(
PersistDictEntryDscr::translate_from_class(dc.tag().tag_class()).value_u8()
PersistTypeDscr::translate_from_class(dc.tag().tag_class()).value_u8()
* (!dc.is_null() as u8),
)
}

@ -68,16 +68,18 @@ pub fn null_journal<TA: JournalAdapter>(
host_startup_counter: u64,
_: &TA::GlobalState,
) -> JournalWriter<super::rw::NullZero, TA> {
let FileOpen::Created(journal) = SDSSFileIO::<super::rw::NullZero>::open_or_create_perm_rw(
log_file_name,
FileScope::Journal,
log_kind,
log_kind_version,
host_setting_version,
host_run_mode,
host_startup_counter,
)
.unwrap() else {
let FileOpen::Created(journal) =
SDSSFileIO::<super::rw::NullZero>::open_or_create_perm_rw::<false>(
log_file_name,
FileScope::Journal,
log_kind,
log_kind_version,
host_setting_version,
host_run_mode,
host_startup_counter,
)
.unwrap()
else {
panic!()
};
JournalWriter::new(journal, 0, true).unwrap()
@ -92,15 +94,25 @@ pub fn open_journal<TA: JournalAdapter, LF: RawFileIOInterface>(
host_startup_counter: u64,
gs: &TA::GlobalState,
) -> SDSSResult<JournalWriter<LF, TA>> {
let f = SDSSFileIO::<LF>::open_or_create_perm_rw(
log_file_name,
FileScope::Journal,
log_kind,
log_kind_version,
host_setting_version,
host_run_mode,
host_startup_counter,
)?;
macro_rules! open_file {
($modify:literal) => {
SDSSFileIO::<LF>::open_or_create_perm_rw::<$modify>(
log_file_name,
FileScope::Journal,
log_kind,
log_kind_version,
host_setting_version,
host_run_mode,
host_startup_counter,
)
};
}
// HACK(@ohsayan): until generic const exprs are stabilized, we're in a state of hell
let f = if TA::DENY_NONAPPEND {
open_file!(false)
} else {
open_file!(true)
}?;
let file = match f {
FileOpen::Created(f) => return JournalWriter::new(f, 0, true),
FileOpen::Existing(file, _) => file,
@ -111,6 +123,8 @@ pub fn open_journal<TA: JournalAdapter, LF: RawFileIOInterface>(
/// The journal adapter
pub trait JournalAdapter {
/// deny any SDSS file level operations that require non-append mode writes (for example, updating the SDSS header's modify count)
const DENY_NONAPPEND: bool = true;
/// enable/disable automated recovery algorithms
const RECOVERY_PLUGIN: bool;
/// The journal event
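
The HACK note above refers to dispatching a const generic from an associated const; since generic const exprs aren't stable, the branch has to be spelled out. A standalone reduction of the pattern:

trait Adapter {
const DENY_NONAPPEND: bool;
}

fn open<const REWRITE_MODIFY_COUNTER: bool>() -> &'static str {
if REWRITE_MODIFY_COUNTER { "append + header rewrite" } else { "append only" }
}

fn open_for<A: Adapter>() -> &'static str {
// A::DENY_NONAPPEND can't be written as open::<{ !A::DENY_NONAPPEND }>() on stable
if A::DENY_NONAPPEND {
open::<false>()
} else {
open::<true>()
}
}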

@ -27,6 +27,7 @@
// raw
mod header_impl;
// impls
mod batch_jrnl;
mod journal;
mod rw;
// hl
@ -110,6 +111,16 @@ pub enum SDSSError {
InternalDecodeStructureCorruptedPayload,
/// the data for an internal structure was decoded but is logically invalid
InternalDecodeStructureIllegalData,
/// when attempting to flush a data batch, the batch journal crashed and a recovery event was triggered. But even then,
/// the data batch journal could not be fixed
DataBatchRecoveryFailStageOne,
/// when attempting to restore a data batch from disk, the batch journal was found to have an irrecoverable corruption
DataBatchRestoreCorruptedBatch,
/// when attempting to restore a data batch from disk, the driver encountered a corrupted entry
DataBatchRestoreCorruptedEntry,
/// we failed to close the data batch
DataBatchCloseError,
DataBatchRestoreCorruptedBatchFile,
}
impl SDSSError {

@ -31,7 +31,10 @@ use {
},
SDSSResult,
},
crate::engine::storage::v1::SDSSError,
crate::{
engine::storage::{v1::SDSSError, SCrc},
util::os::SysIOError,
},
std::{
fs::File,
io::{Read, Seek, SeekFrom, Write},
@ -46,6 +49,21 @@ pub enum FileOpen<F> {
Existing(F, SDSSHeader),
}
impl<F> FileOpen<F> {
pub fn into_existing(self) -> Option<(F, SDSSHeader)> {
match self {
Self::Existing(f, h) => Some((f, h)),
Self::Created(_) => None,
}
}
pub fn into_created(self) -> Option<F> {
match self {
Self::Created(f) => Some(f),
Self::Existing(_, _) => None,
}
}
}
#[derive(Debug)]
pub enum RawFileOpen<F> {
Created(F),
@ -138,6 +156,103 @@ impl RawFileIOInterface for File {
}
}
pub struct SDSSFileTrackedWriter<F> {
f: SDSSFileIO<F>,
cs: SCrc,
}
impl<F: RawFileIOInterface> SDSSFileTrackedWriter<F> {
pub fn new(f: SDSSFileIO<F>) -> Self {
Self { f, cs: SCrc::new() }
}
pub fn unfsynced_write(&mut self, block: &[u8]) -> SDSSResult<()> {
match self.f.unfsynced_write(block) {
Ok(()) => {
self.cs.recompute_with_new_var_block(block);
Ok(())
}
e => e,
}
}
pub fn fsync_all(&mut self) -> SDSSResult<()> {
self.f.fsync_all()
}
pub fn reset_and_finish_checksum(&mut self) -> u64 {
let mut scrc = SCrc::new();
core::mem::swap(&mut self.cs, &mut scrc);
scrc.finish()
}
pub fn inner_file(&mut self) -> &mut SDSSFileIO<F> {
&mut self.f
}
}
/// [`SDSSFileTrackedReader`] maintains application-level length and checksum tracking to avoid frequent syscalls, because we
/// do not expect (even though it's very possible) users to randomly modify file lengths while we're reading them
pub struct SDSSFileTrackedReader<F> {
f: SDSSFileIO<F>,
len: u64,
pos: u64,
cs: SCrc,
}
impl<F: RawFileIOInterface> SDSSFileTrackedReader<F> {
/// Important: this will only look at the data post the current cursor!
pub fn new(mut f: SDSSFileIO<F>) -> SDSSResult<Self> {
let len = f.file_length()?;
let pos = f.retrieve_cursor()?;
Ok(Self {
f,
len,
pos,
cs: SCrc::new(),
})
}
pub fn remaining(&self) -> u64 {
self.len - self.pos
}
pub fn is_eof(&self) -> bool {
self.len == self.pos
}
pub fn has_left(&self, v: u64) -> bool {
self.remaining() >= v
}
pub fn read_into_buffer(&mut self, buf: &mut [u8]) -> SDSSResult<()> {
if self.remaining() >= buf.len() as u64 {
match self.f.read_to_buffer(buf) {
Ok(()) => {
self.pos += buf.len() as u64;
self.cs.recompute_with_new_var_block(buf);
Ok(())
}
Err(e) => return Err(e),
}
} else {
Err(SDSSError::IoError(SysIOError::from(
std::io::ErrorKind::InvalidInput,
)))
}
}
pub fn read_byte(&mut self) -> SDSSResult<u8> {
let mut buf = [0u8; 1];
self.read_into_buffer(&mut buf).map(|_| buf[0])
}
pub fn __reset_checksum(&mut self) -> u64 {
let mut crc = SCrc::new();
core::mem::swap(&mut crc, &mut self.cs);
crc.finish()
}
pub fn inner_file(&mut self) -> &mut SDSSFileIO<F> {
&mut self.f
}
pub fn into_inner_file(self) -> SDSSFileIO<F> {
self.f
}
pub fn __cursor_ahead_by(&mut self, sizeof: usize) {
self.pos += sizeof as u64;
}
}
#[derive(Debug)]
pub struct SDSSFileIO<F> {
f: F,
@ -145,7 +260,7 @@ pub struct SDSSFileIO<F> {
impl<F: RawFileIOInterface> SDSSFileIO<F> {
/// **IMPORTANT: File position: end-of-header-section**
pub fn open_or_create_perm_rw(
pub fn open_or_create_perm_rw<const REWRITE_MODIFY_COUNTER: bool>(
file_path: &str,
file_scope: FileScope,
file_specifier: FileSpecifier,
@ -180,13 +295,15 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
.ok_or(SDSSError::HeaderDecodeCorruptedHeader)?;
// now validate the header
header.verify(file_scope, file_specifier, file_specifier_version)?;
// since we updated this file, let us update the header
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
let mut f = Self::_new(f);
f.seek_from_start(0)?;
f.fsynced_write(new_header.encoded().array().as_ref())?;
f.seek_from_start(SDSSHeaderRaw::header_size() as _)?;
if REWRITE_MODIFY_COUNTER {
// since we updated this file, let us update the header
let mut new_header = header.clone();
new_header.dr_rs_mut().bump_modify_count();
f.seek_from_start(0)?;
f.fsynced_write(new_header.encoded().array().as_ref())?;
f.seek_from_start(SDSSHeaderRaw::header_size() as _)?;
}
Ok(FileOpen::Existing(f, header))
}
}
@ -223,6 +340,10 @@ impl<F: RawFileIOInterface> SDSSFileIO<F> {
pub fn retrieve_cursor(&mut self) -> SDSSResult<u64> {
self.f.fcursor()
}
pub fn read_byte(&mut self) -> SDSSResult<u8> {
let mut r = [0; 1];
self.read_to_buffer(&mut r).map(|_| r[0])
}
}
pub struct BufferedScanner<'a> {
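
One subtlety the tracked reader encodes, sketched standalone: a stored checksum must be read through the untracked inner file (pulling it through the tracker would fold it into the very checksum being verified), after which __cursor_ahead_by bumps the position mirror by hand so remaining() and is_eof() stay honest. Assumed simplification: a byte slice stands in for the file.

fn verify_summary(frame: &[u8], crc64: impl Fn(&[u8]) -> u64) -> bool {
// last 8 bytes: the stored checksum, written with fsynced_write (untracked)
let (tracked, stored) = frame.split_at(frame.len() - 8);
crc64(tracked) == u64::from_le_bytes(stored.try_into().unwrap())
}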

@ -69,6 +69,11 @@ impl VFile {
#[derive(Debug)]
pub struct VirtualFS(Box<str>);
impl VirtualFS {
pub fn get_file_data(f: &str) -> Option<Vec<u8>> {
VFS.read().get(f).map(|f| f.data.clone())
}
}
impl RawFileIOInterface for VirtualFS {
fn fopen_or_create_rw(file_path: &str) -> super::SDSSResult<RawFileOpen<Self>> {
@ -181,7 +186,7 @@ impl RawFileIOInterface for VirtualFS {
#[test]
fn sdss_file() {
let f = SDSSFileIO::<VirtualFS>::open_or_create_perm_rw(
let f = SDSSFileIO::<VirtualFS>::open_or_create_perm_rw::<false>(
"this_is_a_test_file.db",
FileScope::Journal,
FileSpecifier::TestTransactionLog,
@ -199,7 +204,7 @@ fn sdss_file() {
f.fsynced_write(b"hello, world\n").unwrap();
f.fsynced_write(b"hello, again\n").unwrap();
let f = SDSSFileIO::<VirtualFS>::open_or_create_perm_rw(
let f = SDSSFileIO::<VirtualFS>::open_or_create_perm_rw::<false>(
"this_is_a_test_file.db",
FileScope::Journal,
FileSpecifier::TestTransactionLog,

@ -26,237 +26,6 @@
type VirtualFS = super::test_util::VirtualFS;
mod rw {
use crate::engine::storage::v1::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
};
#[test]
fn create_delete() {
let f = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw(
"hello_world.db-tlog",
FileScope::TransactionLogCompacted,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
match f {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Created(_) => {}
};
let open = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw(
"hello_world.db-tlog",
FileScope::TransactionLogCompacted,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
let h = match open {
FileOpen::Existing(_, header) => header,
_ => panic!(),
};
assert_eq!(h.gr_mdr().file_scope(), FileScope::TransactionLogCompacted);
assert_eq!(h.gr_mdr().file_spec(), FileSpecifier::GNSTxnLog);
assert_eq!(h.gr_mdr().file_spec_id(), FileSpecifierVersion::__new(0));
assert_eq!(h.gr_hr().run_mode(), HostRunMode::Prod);
assert_eq!(h.gr_hr().setting_version(), 0);
assert_eq!(h.gr_hr().startup_counter(), 0);
}
}
mod tx {
use crate::engine::storage::v1::header_impl::{
FileSpecifier, FileSpecifierVersion, HostRunMode,
};
use {
crate::{
engine::storage::v1::{
journal::{self, JournalAdapter, JournalWriter},
SDSSError, SDSSResult,
},
util,
},
std::cell::RefCell,
};
pub struct Database {
data: RefCell<[u8; 10]>,
}
impl Database {
fn copy_data(&self) -> [u8; 10] {
*self.data.borrow()
}
fn new() -> Self {
Self {
data: RefCell::new([0; 10]),
}
}
fn reset(&self) {
*self.data.borrow_mut() = [0; 10];
}
fn txn_reset(
&self,
txn_writer: &mut JournalWriter<super::VirtualFS, DatabaseTxnAdapter>,
) -> SDSSResult<()> {
self.reset();
txn_writer.append_event(TxEvent::Reset)
}
fn set(&self, pos: usize, val: u8) {
self.data.borrow_mut()[pos] = val;
}
fn txn_set(
&self,
pos: usize,
val: u8,
txn_writer: &mut JournalWriter<super::VirtualFS, DatabaseTxnAdapter>,
) -> SDSSResult<()> {
self.set(pos, val);
txn_writer.append_event(TxEvent::Set(pos, val))
}
}
pub enum TxEvent {
Reset,
Set(usize, u8),
}
#[derive(Debug)]
pub enum TxError {
SDSS(SDSSError),
}
direct_from! {
TxError => {
SDSSError as SDSS
}
}
#[derive(Debug)]
pub struct DatabaseTxnAdapter;
impl JournalAdapter for DatabaseTxnAdapter {
const RECOVERY_PLUGIN: bool = false;
type Error = TxError;
type JournalEvent = TxEvent;
type GlobalState = Database;
fn encode(event: Self::JournalEvent) -> Box<[u8]> {
/*
[1B: opcode][8B:Index][1B: New value]
*/
let opcode = match event {
TxEvent::Reset => 0u8,
TxEvent::Set(_, _) => 1u8,
};
let index = match event {
TxEvent::Reset => 0u64,
TxEvent::Set(index, _) => index as u64,
};
let new_value = match event {
TxEvent::Reset => 0,
TxEvent::Set(_, val) => val,
};
let mut ret = Vec::with_capacity(10);
ret.push(opcode);
ret.extend(index.to_le_bytes());
ret.push(new_value);
ret.into_boxed_slice()
}
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), TxError> {
if payload.len() != 10 {
return Err(SDSSError::CorruptedFile("testtxn.log").into());
}
let opcode = payload[0];
let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9]));
let new_value = payload[9];
match opcode {
0 if index == 0 && new_value == 0 => gs.reset(),
1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value),
_ => return Err(SDSSError::JournalLogEntryCorrupted.into()),
}
Ok(())
}
}
fn open_log(
log_name: &str,
db: &Database,
) -> SDSSResult<JournalWriter<super::VirtualFS, DatabaseTxnAdapter>> {
journal::open_journal::<DatabaseTxnAdapter, super::VirtualFS>(
log_name,
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&db,
)
}
#[test]
fn first_boot_second_readonly() {
// create log
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("testtxn.log", &db1)?;
db1.txn_set(0, 20, &mut log)?;
db1.txn_set(9, 21, &mut log)?;
log.append_journal_close_and_close()
};
x().unwrap();
// backup original data
let original_data = db1.copy_data();
// restore log
let empty_db2 = Database::new();
open_log("testtxn.log", &empty_db2)
.unwrap()
.append_journal_close_and_close()
.unwrap();
assert_eq!(original_data, empty_db2.copy_data());
}
#[test]
fn oneboot_mod_twoboot_mod_thirdboot_read() {
// first boot: set all to 1
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("duatxn.db-tlog", &db1)?;
for i in 0..10 {
db1.txn_set(i, 1, &mut log)?;
}
log.append_journal_close_and_close()
};
x().unwrap();
let bkp_db1 = db1.copy_data();
drop(db1);
// second boot
let db2 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("duatxn.db-tlog", &db2)?;
assert_eq!(bkp_db1, db2.copy_data());
for i in 0..10 {
let current_val = db2.data.borrow()[i];
db2.txn_set(i, current_val + i as u8, &mut log)?;
}
log.append_journal_close_and_close()
};
x().unwrap();
let bkp_db2 = db2.copy_data();
drop(db2);
// third boot
let db3 = Database::new();
let log = open_log("duatxn.db-tlog", &db3).unwrap();
log.append_journal_close_and_close().unwrap();
assert_eq!(bkp_db2, db3.copy_data());
assert_eq!(
db3.copy_data(),
(1..=10)
.into_iter()
.map(u8::from)
.collect::<Box<[u8]>>()
.as_ref()
);
}
}
mod batch;
mod rw;
mod tx;

@ -0,0 +1,210 @@
/*
* Created on Wed Sep 06 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Field, Layer, Model,
},
},
data::{cell::Datacell, tag::TagSelector, uuid::Uuid},
storage::v1::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
},
header_meta::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
test_util::VirtualFS,
},
},
crossbeam_epoch::pin,
};
fn pkey(v: impl Into<Datacell>) -> PrimaryIndexKey {
PrimaryIndexKey::try_from_dc(v.into()).unwrap()
}
fn open_file(fpath: &str) -> FileOpen<SDSSFileIO<VirtualFS>> {
SDSSFileIO::open_or_create_perm_rw::<false>(
fpath,
FileScope::DataBatch,
FileSpecifier::TableDataBatch,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Dev,
1,
)
.unwrap()
}
fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver<VirtualFS> {
match open_file(fpath) {
FileOpen::Created(f) => DataBatchPersistDriver::new(f, true),
FileOpen::Existing(f, _) => {
let mut dbr = DataBatchRestoreDriver::new(f).unwrap();
dbr.read_data_batch_into_model(mdl).unwrap();
DataBatchPersistDriver::new(dbr.into_file(), false)
}
}
.unwrap()
}
fn new_delta(
schema: u64,
txnid: u64,
pk: Datacell,
data: DcFieldIndex,
change: DataDeltaKind,
) -> DataDelta {
new_delta_with_row(
schema,
txnid,
Row::new(
pkey(pk),
data,
DeltaVersion::test_new(schema),
DeltaVersion::test_new(txnid),
),
change,
)
}
fn new_delta_with_row(schema: u64, txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta {
DataDelta::new(
DeltaVersion::test_new(schema),
DeltaVersion::test_new(txnid),
row,
change,
)
}
#[test]
fn deltas_only_insert() {
// prepare model definition
let uuid = Uuid::new();
let mdl = Model::new_restore(
uuid,
"catname".into(),
TagSelector::Str.into_full(),
into_dict!(
"catname" => Field::new([Layer::str()].into(), false),
"is_good" => Field::new([Layer::bool()].into(), false),
"magical" => Field::new([Layer::bool()].into(), false),
),
);
let row = Row::new(
pkey("Schrödinger's cat"),
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
DeltaVersion::test_new(0),
DeltaVersion::test_new(2),
);
{
// update the row
let mut wl = row.d_data().write();
wl.set_txn_revised(DeltaVersion::test_new(3));
*wl.fields_mut().get_mut("magical").unwrap() = Datacell::new_bool(true);
}
// prepare deltas
let deltas = [
// insert catname: Schrödinger's cat, is_good: true
new_delta_with_row(0, 0, row.clone(), DataDeltaKind::Insert),
// insert catname: good cat, is_good: true, magical: false
new_delta(
0,
1,
Datacell::new_str("good cat".into()),
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
DataDeltaKind::Insert,
),
// insert catname: bad cat, is_good: false, magical: false
new_delta(
0,
2,
Datacell::new_str("bad cat".into()),
into_dict!("is_good" => Datacell::new_bool(false), "magical" => Datacell::new_bool(false)),
DataDeltaKind::Insert,
),
// update catname: Schrödinger's cat, is_good: true, magical: true
new_delta_with_row(0, 3, row.clone(), DataDeltaKind::Update),
];
// delta queue
let g = pin();
for delta in deltas.clone() {
mdl.delta_state().append_new_data_delta(delta, &g);
}
let file = open_file("deltas_only_insert.db-btlog")
.into_created()
.unwrap();
{
let mut persist_driver = DataBatchPersistDriver::new(file, true).unwrap();
persist_driver.write_new_batch(&mdl, deltas.len()).unwrap();
persist_driver.close().unwrap();
}
let mut restore_driver = DataBatchRestoreDriver::new(
open_file("deltas_only_insert.db-btlog")
.into_existing()
.unwrap()
.0,
)
.unwrap();
let batch = restore_driver.read_all_batches().unwrap();
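// note: the txn 0 insert is intentionally absent from the expected output below;
// the row had already been revised to version 3 before the flush, so
// write_new_batch counted it as an inconsistent read and committed only the
// remaining three events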
assert_eq!(
batch,
vec![NormalBatch::new(
vec![
DecodedBatchEvent::new(
1,
pkey("good cat"),
DecodedBatchEventKind::Insert(vec![
Datacell::new_bool(true),
Datacell::new_bool(false)
])
),
DecodedBatchEvent::new(
2,
pkey("bad cat"),
DecodedBatchEventKind::Insert(vec![
Datacell::new_bool(false),
Datacell::new_bool(false)
])
),
DecodedBatchEvent::new(
3,
pkey("Schrödinger's cat"),
DecodedBatchEventKind::Update(vec![
Datacell::new_bool(true),
Datacell::new_bool(true)
])
)
],
0
)]
)
}

@ -0,0 +1,68 @@
/*
* Created on Tue Sep 05 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::engine::storage::v1::{
header_impl::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
};
#[test]
fn create_delete() {
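    // first open: the file does not exist yet, so this must create it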
let f = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<false>(
"hello_world.db-tlog",
FileScope::Journal,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
match f {
FileOpen::Existing(_, _) => panic!(),
FileOpen::Created(_) => {}
};
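    // reopen the same file: this time we must get an existing file with a valid header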
let open = SDSSFileIO::<super::VirtualFS>::open_or_create_perm_rw::<false>(
"hello_world.db-tlog",
FileScope::Journal,
FileSpecifier::GNSTxnLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
0,
)
.unwrap();
let h = match open {
FileOpen::Existing(_, header) => header,
_ => panic!(),
};
assert_eq!(h.gr_mdr().file_scope(), FileScope::Journal);
assert_eq!(h.gr_mdr().file_spec(), FileSpecifier::GNSTxnLog);
assert_eq!(h.gr_mdr().file_spec_id(), FileSpecifierVersion::__new(0));
assert_eq!(h.gr_hr().run_mode(), HostRunMode::Prod);
assert_eq!(h.gr_hr().setting_version(), 0);
assert_eq!(h.gr_hr().startup_counter(), 0);
}

@ -0,0 +1,210 @@
/*
* Created on Tue Sep 05 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{
engine::storage::v1::{
header_impl::{FileSpecifier, FileSpecifierVersion, HostRunMode},
journal::{self, JournalAdapter, JournalWriter},
SDSSError, SDSSResult,
},
util,
},
std::cell::RefCell,
};
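/// A tiny in-memory "database" (ten byte-sized cells) used to exercise journal
/// write and replay round trips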
pub struct Database {
data: RefCell<[u8; 10]>,
}
impl Database {
fn copy_data(&self) -> [u8; 10] {
*self.data.borrow()
}
fn new() -> Self {
Self {
data: RefCell::new([0; 10]),
}
}
fn reset(&self) {
*self.data.borrow_mut() = [0; 10];
}
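    // txn_* wrappers: apply the change in memory, then append the matching
    // event to the journal so it can be replayed on recovery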
fn txn_reset(
&self,
txn_writer: &mut JournalWriter<super::VirtualFS, DatabaseTxnAdapter>,
) -> SDSSResult<()> {
self.reset();
txn_writer.append_event(TxEvent::Reset)
}
fn set(&self, pos: usize, val: u8) {
self.data.borrow_mut()[pos] = val;
}
fn txn_set(
&self,
pos: usize,
val: u8,
txn_writer: &mut JournalWriter<super::VirtualFS, DatabaseTxnAdapter>,
) -> SDSSResult<()> {
self.set(pos, val);
txn_writer.append_event(TxEvent::Set(pos, val))
}
}
pub enum TxEvent {
Reset,
Set(usize, u8),
}
#[derive(Debug)]
pub enum TxError {
SDSS(SDSSError),
}
direct_from! {
TxError => {
SDSSError as SDSS
}
}
#[derive(Debug)]
pub struct DatabaseTxnAdapter;
impl JournalAdapter for DatabaseTxnAdapter {
const RECOVERY_PLUGIN: bool = false;
type Error = TxError;
type JournalEvent = TxEvent;
type GlobalState = Database;
fn encode(event: Self::JournalEvent) -> Box<[u8]> {
        /*
            layout: [1B: opcode][8B: index, little-endian][1B: new value]
        */
let opcode = match event {
TxEvent::Reset => 0u8,
TxEvent::Set(_, _) => 1u8,
};
let index = match event {
TxEvent::Reset => 0u64,
TxEvent::Set(index, _) => index as u64,
};
let new_value = match event {
TxEvent::Reset => 0,
TxEvent::Set(_, val) => val,
};
let mut ret = Vec::with_capacity(10);
ret.push(opcode);
ret.extend(index.to_le_bytes());
ret.push(new_value);
ret.into_boxed_slice()
}
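    // worked example of the layout above:
    //   TxEvent::Set(3, 7) -> [1, 3, 0, 0, 0, 0, 0, 0, 0, 7]
    //   TxEvent::Reset     -> [0; 10]
    // (opcode, then the index as a little-endian u64, then the new value)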
fn decode_and_update_state(payload: &[u8], gs: &Self::GlobalState) -> Result<(), TxError> {
if payload.len() != 10 {
return Err(SDSSError::CorruptedFile("testtxn.log").into());
}
let opcode = payload[0];
let index = u64::from_le_bytes(util::copy_slice_to_array(&payload[1..9]));
let new_value = payload[9];
match opcode {
0 if index == 0 && new_value == 0 => gs.reset(),
1 if index < 10 && index < isize::MAX as u64 => gs.set(index as usize, new_value),
_ => return Err(SDSSError::JournalLogEntryCorrupted.into()),
}
Ok(())
}
}
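// the adapter wires events to the journal: `encode` serializes an event for the
// log, while `decode_and_update_state` replays a logged event against the
// database when the journal is reopened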
fn open_log(
log_name: &str,
db: &Database,
) -> SDSSResult<JournalWriter<super::VirtualFS, DatabaseTxnAdapter>> {
journal::open_journal::<DatabaseTxnAdapter, super::VirtualFS>(
log_name,
FileSpecifier::TestTransactionLog,
FileSpecifierVersion::__new(0),
0,
HostRunMode::Prod,
1,
&db,
)
}
#[test]
fn first_boot_second_readonly() {
// create log
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("testtxn.log", &db1)?;
db1.txn_set(0, 20, &mut log)?;
db1.txn_set(9, 21, &mut log)?;
log.append_journal_close_and_close()
};
x().unwrap();
// backup original data
let original_data = db1.copy_data();
    // restore log: reopening the journal replays all events into the fresh database
let empty_db2 = Database::new();
open_log("testtxn.log", &empty_db2)
.unwrap()
.append_journal_close_and_close()
.unwrap();
assert_eq!(original_data, empty_db2.copy_data());
}
#[test]
fn oneboot_mod_twoboot_mod_thirdboot_read() {
// first boot: set all to 1
let db1 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("duatxn.db-tlog", &db1)?;
for i in 0..10 {
db1.txn_set(i, 1, &mut log)?;
}
log.append_journal_close_and_close()
};
x().unwrap();
let bkp_db1 = db1.copy_data();
drop(db1);
// second boot
let db2 = Database::new();
let x = || -> SDSSResult<()> {
let mut log = open_log("duatxn.db-tlog", &db2)?;
assert_eq!(bkp_db1, db2.copy_data());
for i in 0..10 {
let current_val = db2.data.borrow()[i];
db2.txn_set(i, current_val + i as u8, &mut log)?;
}
log.append_journal_close_and_close()
};
x().unwrap();
let bkp_db2 = db2.copy_data();
drop(db2);
// third boot
let db3 = Database::new();
let log = open_log("duatxn.db-tlog", &db3).unwrap();
log.append_journal_close_and_close().unwrap();
assert_eq!(bkp_db2, db3.copy_data());
    assert_eq!(
        db3.copy_data(),
        (1..=10u8).collect::<Box<[u8]>>().as_ref()
    );
}
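// a minimal extra sketch (not part of the original suite) exercising the `Reset`
// event end-to-end: set a value, reset within the same journal, then verify that
// replaying the log lands back on the zeroed state
#[test]
fn reset_event_replay() {
    let db1 = Database::new();
    let x = || -> SDSSResult<()> {
        let mut log = open_log("resettxn.db-tlog", &db1)?;
        db1.txn_set(4, 42, &mut log)?;
        db1.txn_reset(&mut log)?;
        log.append_journal_close_and_close()
    };
    x().unwrap();
    // replay into a fresh database: the trailing reset must win
    let db2 = Database::new();
    open_log("resettxn.db-tlog", &db2)
        .unwrap()
        .append_journal_close_and_close()
        .unwrap();
    assert_eq!(db2.copy_data(), [0u8; 10]);
}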

@ -25,12 +25,8 @@
*/
use crate::{
engine::{
core::{index::PrimaryIndexKey, model::delta::DataDelta, GlobalNS},
data::cell::Datacell,
storage::v1::inf::obj,
},
util::{os, EndianQW},
engine::core::{model::delta::DataDelta, GlobalNS},
util::os,
};
type Buf = Vec<u8>;
@ -72,28 +68,3 @@ pub unsafe fn get_max_delta_queue_size() -> usize {
// TODO(@ohsayan): dynamically approximate this limit
MAX_NODES_IN_LL_CNT
}
/*
misc. methods
*/
fn encode_primary_key(buf: &mut Buf, pk: &PrimaryIndexKey) {
buf.push(pk.tag().d());
static EXEC: [unsafe fn(&mut Buf, &PrimaryIndexKey); 2] = [
|buf, pk| unsafe { buf.extend(pk.read_uint().to_le_bytes()) },
|buf, pk| unsafe {
let bin = pk.read_bin();
buf.extend(bin.len().u64_bytes_le());
buf.extend(bin);
},
];
unsafe {
// UNSAFE(@ohsayan): tag map
assert!((pk.tag().d() / 2) < 2);
EXEC[(pk.tag().d() / 2) as usize](buf, pk);
}
}
fn encode_dc(buf: &mut Buf, dc: &Datacell) {
obj::encode_element(buf, dc)
}

@ -47,6 +47,7 @@ pub enum TransactionError {
/// On restore, a certain item that was expected to match a certain value, has a different value
OnRestoreDataConflictMismatch,
SDSSError(SDSSError),
OutOfMemory,
}
direct_from! {

@ -52,6 +52,12 @@ impl From<std::io::Error> for SysIOError {
}
}
impl From<std::io::ErrorKind> for SysIOError {
fn from(e: std::io::ErrorKind) -> Self {
Self(e.into())
}
}
#[cfg(test)]
impl PartialEq for SysIOError {
fn eq(&self, other: &Self) -> bool {
