Add tests for skewed and shuffled deltas

next
Sayan Nandan 1 year ago
parent f4d2adc4e8
commit cbac4e6739
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -53,6 +53,8 @@ pub struct RowData {
fields: DcFieldIndex,
txn_revised_data: DeltaVersion,
txn_revised_schema_version: DeltaVersion,
// pretty useless from an operational POV; only used during restore
restore_txn_id: DeltaVersion,
}
impl RowData {
@ -68,6 +70,12 @@ impl RowData {
pub fn get_txn_revised(&self) -> DeltaVersion {
self.txn_revised_data
}
/// Records the restore txn id for this row.
/// NOTE(review): per the field comment, `restore_txn_id` is only meaningful
/// during restore — confirm callers never rely on it operationally.
pub fn set_restored_txn_revised(&mut self, new: DeltaVersion) {
    self.restore_txn_id = new;
}
/// Returns the restore txn id recorded for this row (used by batch restore
/// to order conflicting events for the same primary key).
pub fn get_restored_txn_revised(&self) -> DeltaVersion {
    self.restore_txn_id
}
}
impl TreeElement for Row {
@ -99,6 +107,21 @@ impl Row {
data: DcFieldIndex,
schema_version: DeltaVersion,
txn_revised_data: DeltaVersion,
) -> Self {
Self::new_restored(
pk,
data,
schema_version,
txn_revised_data,
DeltaVersion::__new(0),
)
}
pub fn new_restored(
pk: PrimaryIndexKey,
data: DcFieldIndex,
schema_version: DeltaVersion,
txn_revised_data: DeltaVersion,
restore_txn_id: DeltaVersion,
) -> Self {
Self {
__genesis_schema_version: schema_version,
@ -109,6 +132,8 @@ impl Row {
fields: data,
txn_revised_schema_version: schema_version,
txn_revised_data,
// pretty useless here
restore_txn_id,
}))
},
}

@ -265,8 +265,7 @@ impl DeltaVersion {
/// The initial (zero) delta version.
pub const fn genesis() -> Self {
    Self(0)
}
#[cfg(test)]
pub fn test_new(v: u64) -> Self {
/// Constructs a `DeltaVersion` from a raw `u64`.
/// Dunder-prefixed to signal internal use; replaces the former
/// `#[cfg(test)] test_new` so restore code can also build raw versions.
pub const fn __new(v: u64) -> Self {
    Self(v)
}
fn step(&self) -> Self {

@ -384,7 +384,7 @@ mod exec {
);
assert_eq!(
model.delta_state().schema_current_version(),
DeltaVersion::test_new(2)
DeltaVersion::__new(2)
);
},
)
@ -411,7 +411,7 @@ mod exec {
);
assert_eq!(
mdl.delta_state().schema_current_version(),
DeltaVersion::test_new(4)
DeltaVersion::__new(4)
);
}
).unwrap();

@ -24,30 +24,37 @@
*
*/
use super::{MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN};
use {
super::{
MARKER_ACTUAL_BATCH_EVENT, MARKER_END_OF_BATCH, MARKER_RECOVERY_EVENT, RECOVERY_THRESHOLD,
MARKER_ACTUAL_BATCH_EVENT, MARKER_BATCH_CLOSED, MARKER_BATCH_REOPEN, MARKER_END_OF_BATCH,
MARKER_RECOVERY_EVENT, RECOVERY_THRESHOLD,
},
crate::engine::{
core::{index::PrimaryIndexKey, model::Model},
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{delta::DeltaVersion, Model},
},
data::{
cell::Datacell,
tag::{CUTag, TagClass, TagUnique},
},
idx::{MTIndex, STIndex, STIndexSeq},
storage::v1::{
inf::PersistTypeDscr,
rw::{RawFileIOInterface, SDSSFileIO, SDSSFileTrackedReader},
SDSSError, SDSSResult,
},
},
std::mem::ManuallyDrop,
crossbeam_epoch::pin,
std::{
collections::{hash_map::Entry as HMEntry, HashMap},
mem::ManuallyDrop,
},
};
#[derive(Debug, PartialEq)]
pub(in crate::engine::storage::v1) struct DecodedBatchEvent {
txn_id: u64,
txn_id: DeltaVersion,
pk: PrimaryIndexKey,
kind: DecodedBatchEventKind,
}
@ -58,7 +65,11 @@ impl DecodedBatchEvent {
pk: PrimaryIndexKey,
kind: DecodedBatchEventKind,
) -> Self {
Self { txn_id, pk, kind }
Self {
txn_id: DeltaVersion::__new(txn_id),
pk,
kind,
}
}
}
@ -196,8 +207,93 @@ impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
}
impl<F: RawFileIOInterface> DataBatchRestoreDriver<F> {
fn apply_batch(_: &Model, _: NormalBatch) -> SDSSResult<()> {
todo!()
/// Applies a decoded batch of row events to the in-memory model during restore.
///
/// Events may arrive skewed/shuffled relative to commit order; conflicts for
/// the same primary key are resolved via each row's restore txn id
/// (`get_restored_txn_revised`): a strictly newer id wins. Deletes are
/// deferred to a second pass so an earlier delete event cannot clobber a
/// later insert of the same key within the batch.
fn apply_batch(
    m: &Model,
    NormalBatch {
        events,
        schema_version,
    }: NormalBatch,
) -> SDSSResult<()> {
    // NOTE(@ohsayan): current complexity is O(n) which is good enough (in the future I might revise this to a fancier impl)
    // pin model
    let irm = m.intent_read_model();
    let g = pin();
    // key -> newest delete txn id seen so far; applied after the scan
    let mut pending_delete = HashMap::new();
    let p_index = m.primary_index().__raw_index();
    // scan rows
    for DecodedBatchEvent { txn_id, pk, kind } in events {
        match kind {
            DecodedBatchEventKind::Insert(new_row) | DecodedBatchEventKind::Update(new_row) => {
                // this is more like a "newrow"
                match p_index.mt_get_element(&pk, &g) {
                    Some(row) if row.d_data().read().get_restored_txn_revised() > txn_id => {
                        // skewed
                        // resolve deltas if any
                        let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
                        continue;
                    }
                    Some(_) | None => {
                        // new row (logically)
                        let _ = p_index.mt_delete(&pk, &g);
                        let mut data = DcFieldIndex::default();
                        // pair the model's non-key fields (in stable order) with
                        // the decoded cells; the primary key column is skipped
                        for (field_name, new_data) in irm
                            .fields()
                            .stseq_ord_key()
                            .filter(|key| key.as_ref() != m.p_key())
                            .zip(new_row)
                        {
                            data.st_insert(field_name.clone(), new_data);
                        }
                        let row = Row::new_restored(
                            pk,
                            data,
                            DeltaVersion::__new(schema_version),
                            DeltaVersion::__new(0),
                            txn_id,
                        );
                        // resolve any deltas
                        let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
                        // put it back in (lol); blame @ohsayan for this joke
                        p_index.mt_insert(row, &g);
                    }
                }
            }
            DecodedBatchEventKind::Delete => {
                match pending_delete.entry(pk) {
                    HMEntry::Occupied(mut existing_delete) => {
                        if *existing_delete.get() > txn_id {
                            // the existing delete "happened after" our delete, so it takes precedence
                            continue;
                        }
                        // the existing delete happened before our delete, so our delete takes precedence
                        // we have a newer delete for the same key
                        *existing_delete.get_mut() = txn_id;
                    }
                    HMEntry::Vacant(new) => {
                        // we never deleted this
                        new.insert(txn_id);
                    }
                }
            }
        }
    }
    // second pass: apply the newest delete per key, unless the surviving row
    // carries a strictly newer restore txn id (delete happened before it)
    for (pk, txn_id) in pending_delete {
        match p_index.mt_get(&pk, &g) {
            Some(row) => {
                if row.read().get_restored_txn_revised() > txn_id {
                    // our delete "happened before" this row was inserted
                    continue;
                }
                // yup, go ahead and chuck it
                let _ = p_index.mt_delete(&pk, &g);
            }
            None => {
                // since we never delete rows until here, this is quite impossible
                unreachable!()
            }
        }
    }
    Ok(())
}
}

@ -25,24 +25,28 @@
*/
use {
crate::engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Field, Layer, Model,
crate::{
engine::{
core::{
index::{DcFieldIndex, PrimaryIndexKey, Row},
model::{
delta::{DataDelta, DataDeltaKind, DeltaVersion},
Field, Layer, Model,
},
},
},
data::{cell::Datacell, tag::TagSelector, uuid::Uuid},
storage::v1::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
data::{cell::Datacell, tag::TagSelector, uuid::Uuid},
idx::MTIndex,
storage::v1::{
batch_jrnl::{
DataBatchPersistDriver, DataBatchRestoreDriver, DecodedBatchEvent,
DecodedBatchEventKind, NormalBatch,
},
header_meta::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
test_util::VirtualFS,
},
header_meta::{FileScope, FileSpecifier, FileSpecifierVersion, HostRunMode},
rw::{FileOpen, SDSSFileIO},
test_util::VirtualFS,
},
util::test_utils,
},
crossbeam_epoch::pin,
};
@ -89,8 +93,8 @@ fn new_delta(
Row::new(
pkey(pk),
data,
DeltaVersion::test_new(schema),
DeltaVersion::test_new(txnid),
DeltaVersion::__new(schema),
DeltaVersion::__new(txnid),
),
change,
)
@ -98,8 +102,8 @@ fn new_delta(
fn new_delta_with_row(schema: u64, txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta {
DataDelta::new(
DeltaVersion::test_new(schema),
DeltaVersion::test_new(txnid),
DeltaVersion::__new(schema),
DeltaVersion::__new(txnid),
row,
change,
)
@ -110,6 +114,16 @@ fn flush_deltas_and_re_read<const N: usize>(
dt: [DataDelta; N],
fname: &str,
) -> Vec<NormalBatch> {
let mut restore_driver = flush_batches_and_return_restore_driver(dt, mdl, fname);
let batch = restore_driver.read_all_batches().unwrap();
batch
}
fn flush_batches_and_return_restore_driver<const N: usize>(
dt: [DataDelta; N],
mdl: &Model,
fname: &str,
) -> DataBatchRestoreDriver<VirtualFS> {
// delta queue
let g = pin();
for delta in dt {
@ -121,10 +135,25 @@ fn flush_deltas_and_re_read<const N: usize>(
persist_driver.write_new_batch(&mdl, N).unwrap();
persist_driver.close().unwrap();
}
let mut restore_driver =
DataBatchRestoreDriver::new(open_file(fname).into_existing().unwrap().0).unwrap();
let batch = restore_driver.read_all_batches().unwrap();
batch
DataBatchRestoreDriver::new(open_file(fname).into_existing().unwrap().0).unwrap()
}
#[test]
fn empty_multi_open_reopen() {
    // Regression-style check: a batch journal that never receives data must
    // survive being opened and cleanly closed (reopened) many times in a row.
    let uuid = Uuid::new();
    let mdl = Model::new_restore(
        uuid,
        "username".into(),
        TagSelector::Str.into_full(),
        into_dict!(
            "username" => Field::new([Layer::str()].into(), false),
            "password" => Field::new([Layer::bin()].into(), false)
        ),
    );
    // 100 open/close cycles against the same (virtual) journal file
    for _ in 0..100 {
        let writer = open_batch_data("empty_multi_open_reopen.db-btlog", &mdl);
        writer.close().unwrap();
    }
}
#[test]
@ -213,13 +242,13 @@ fn skewed_delta() {
let row = Row::new(
pkey("Schrödinger's cat"),
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
DeltaVersion::test_new(0),
DeltaVersion::test_new(2),
DeltaVersion::__new(0),
DeltaVersion::__new(2),
);
{
// update the row
let mut wl = row.d_data().write();
wl.set_txn_revised(DeltaVersion::test_new(3));
wl.set_txn_revised(DeltaVersion::__new(3));
*wl.fields_mut().get_mut("magical").unwrap() = Datacell::new_bool(true);
}
// prepare deltas
@ -279,3 +308,90 @@ fn skewed_delta() {
)]
)
}
#[test]
fn skewed_shuffled_persist_restore() {
    // End-to-end restore test: four plain inserts plus two rows that are
    // inserted and then deleted. Before each persist/restore round the delta
    // slice is shuffled, so events arrive out of commit order; the restored
    // model must still converge to the same final state every round.
    let uuid = Uuid::new();
    let model = Model::new_restore(
        uuid,
        "username".into(),
        TagSelector::Str.into_full(),
        into_dict!("username" => Field::new([Layer::str()].into(), false), "password" => Field::new([Layer::str()].into(), false)),
    );
    // these two rows are later deleted (txn ids 6 and 7 below)
    let mongobongo = Row::new(
        pkey("mongobongo"),
        into_dict!("password" => "dumbo"),
        DeltaVersion::__new(0),
        DeltaVersion::__new(4),
    );
    let rds = Row::new(
        pkey("rds"),
        into_dict!("password" => "snail"),
        DeltaVersion::__new(0),
        DeltaVersion::__new(5),
    );
    let deltas = [
        new_delta(
            0,
            0,
            "sayan",
            into_dict!("password" => "pwd123456"),
            DataDeltaKind::Insert,
        ),
        new_delta(
            0,
            1,
            "joseph",
            into_dict!("password" => "pwd234567"),
            DataDeltaKind::Insert,
        ),
        new_delta(
            0,
            2,
            "haley",
            into_dict!("password" => "pwd345678"),
            DataDeltaKind::Insert,
        ),
        new_delta(
            0,
            3,
            "charlotte",
            into_dict!("password" => "pwd456789"),
            DataDeltaKind::Insert,
        ),
        new_delta_with_row(0, 4, mongobongo.clone(), DataDeltaKind::Insert),
        new_delta_with_row(0, 5, rds.clone(), DataDeltaKind::Insert),
        new_delta_with_row(0, 6, mongobongo.clone(), DataDeltaKind::Delete),
        new_delta_with_row(0, 7, rds.clone(), DataDeltaKind::Delete),
    ];
    // one persist/restore round per delta, each with a fresh file and a
    // freshly shuffled copy of the delta set
    for i in 0..deltas.len() {
        // prepare pretest
        let fname = format!("skewed_shuffled_persist_restore_round{i}.db-btlog");
        let mut deltas = deltas.clone();
        let mut randomizer = test_utils::randomizer();
        test_utils::shuffle_slice(&mut deltas, &mut randomizer);
        // restore
        let mut restore_driver = flush_batches_and_return_restore_driver(deltas, &model, &fname);
        restore_driver.read_data_batch_into_model(&model).unwrap();
    }
    // only the first four rows (the never-deleted inserts) must survive,
    // each with exactly its one non-key field matching the source delta
    let g = pin();
    for delta in &deltas[..4] {
        let row = model
            .primary_index()
            .__raw_index()
            .mt_get(delta.row().d_key(), &g)
            .unwrap();
        let row_data = row.read();
        assert_eq!(row_data.fields().len(), 1);
        assert_eq!(
            row_data.fields().get("password").unwrap(),
            delta
                .row()
                .d_data()
                .read()
                .fields()
                .get("password")
                .unwrap()
        );
    }
}

@ -415,6 +415,13 @@ impl<'a> GNSEvent for AlterModelAddTxn<'a> {
return Err(TransactionError::OnRestoreDataConflictMismatch);
}
}
// TODO(@ohsayan): avoid double iteration
// publish deltas
for field_name in new_fields.stseq_ord_key() {
model
.delta_state()
.schema_append_unresolved_field_add(field_name);
}
Ok(())
})
}
@ -527,6 +534,13 @@ impl<'a> GNSEvent for AlterModelRemoveTxn<'a> {
}
}
}
// TODO(@ohsayan): avoid double iteration
// publish deltas
for field_name in removed_fields.iter() {
model
.delta_state()
.schema_append_unresolved_field_rem(field_name);
}
Ok(())
})
}

@ -26,6 +26,8 @@
use std::io::Read;
use rand::seq::SliceRandom;
use {
rand::{
distributions::{uniform::SampleUniform, Alphanumeric},
@ -38,6 +40,10 @@ use {
},
};
/// Shuffles `slice` in place using the given RNG (thin test-utility wrapper
/// over `rand`'s `SliceRandom::shuffle`).
pub fn shuffle_slice<T>(slice: &mut [T], rng: &mut impl Rng) {
    slice.shuffle(rng)
}
pub fn wait_for_key(msg: &str) {
use std::io::{self, Write};
print!("{msg}");

Loading…
Cancel
Save