|
|
@ -79,7 +79,7 @@ fn open_batch_data(fpath: &str, mdl: &Model) -> DataBatchPersistDriver<VirtualFS
|
|
|
|
fn new_delta(
|
|
|
|
fn new_delta(
|
|
|
|
schema: u64,
|
|
|
|
schema: u64,
|
|
|
|
txnid: u64,
|
|
|
|
txnid: u64,
|
|
|
|
pk: Datacell,
|
|
|
|
pk: impl Into<Datacell>,
|
|
|
|
data: DcFieldIndex,
|
|
|
|
data: DcFieldIndex,
|
|
|
|
change: DataDeltaKind,
|
|
|
|
change: DataDeltaKind,
|
|
|
|
) -> DataDelta {
|
|
|
|
) -> DataDelta {
|
|
|
@ -105,8 +105,99 @@ fn new_delta_with_row(schema: u64, txnid: u64, row: Row, change: DataDeltaKind)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn flush_deltas_and_re_read<const N: usize>(
|
|
|
|
|
|
|
|
mdl: &Model,
|
|
|
|
|
|
|
|
dt: [DataDelta; N],
|
|
|
|
|
|
|
|
fname: &str,
|
|
|
|
|
|
|
|
) -> Vec<NormalBatch> {
|
|
|
|
|
|
|
|
// delta queue
|
|
|
|
|
|
|
|
let g = pin();
|
|
|
|
|
|
|
|
for delta in dt {
|
|
|
|
|
|
|
|
mdl.delta_state().append_new_data_delta(delta, &g);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let file = open_file(fname).into_created().unwrap();
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
let mut persist_driver = DataBatchPersistDriver::new(file, true).unwrap();
|
|
|
|
|
|
|
|
persist_driver.write_new_batch(&mdl, N).unwrap();
|
|
|
|
|
|
|
|
persist_driver.close().unwrap();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut restore_driver =
|
|
|
|
|
|
|
|
DataBatchRestoreDriver::new(open_file(fname).into_existing().unwrap().0).unwrap();
|
|
|
|
|
|
|
|
let batch = restore_driver.read_all_batches().unwrap();
|
|
|
|
|
|
|
|
batch
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
#[test]
|
|
|
|
fn deltas_only_insert() {
|
|
|
|
fn unskewed_delta() {
|
|
|
|
|
|
|
|
let uuid = Uuid::new();
|
|
|
|
|
|
|
|
let mdl = Model::new_restore(
|
|
|
|
|
|
|
|
uuid,
|
|
|
|
|
|
|
|
"username".into(),
|
|
|
|
|
|
|
|
TagSelector::Str.into_full(),
|
|
|
|
|
|
|
|
into_dict!(
|
|
|
|
|
|
|
|
"username" => Field::new([Layer::str()].into(), false),
|
|
|
|
|
|
|
|
"password" => Field::new([Layer::bin()].into(), false)
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
let deltas = [
|
|
|
|
|
|
|
|
new_delta(
|
|
|
|
|
|
|
|
0,
|
|
|
|
|
|
|
|
0,
|
|
|
|
|
|
|
|
"sayan",
|
|
|
|
|
|
|
|
into_dict!("password" => Datacell::new_bin("37ae4b773a9fc7a20164eb16".as_bytes().into())),
|
|
|
|
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
new_delta(
|
|
|
|
|
|
|
|
0,
|
|
|
|
|
|
|
|
1,
|
|
|
|
|
|
|
|
"badguy",
|
|
|
|
|
|
|
|
into_dict!("password" => Datacell::new_bin("5fe3cbdc470b667cb1ba288a".as_bytes().into())),
|
|
|
|
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
new_delta(
|
|
|
|
|
|
|
|
0,
|
|
|
|
|
|
|
|
2,
|
|
|
|
|
|
|
|
"doggo",
|
|
|
|
|
|
|
|
into_dict!("password" => Datacell::new_bin("c80403f9d0ae4d5d0e829dd0".as_bytes().into())),
|
|
|
|
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
new_delta(0, 3, "badguy", into_dict!(), DataDeltaKind::Delete),
|
|
|
|
|
|
|
|
];
|
|
|
|
|
|
|
|
let batches = flush_deltas_and_re_read(&mdl, deltas, "unskewed_delta.db-btlog");
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
|
|
|
batches,
|
|
|
|
|
|
|
|
vec![NormalBatch::new(
|
|
|
|
|
|
|
|
vec![
|
|
|
|
|
|
|
|
DecodedBatchEvent::new(
|
|
|
|
|
|
|
|
0,
|
|
|
|
|
|
|
|
pkey("sayan"),
|
|
|
|
|
|
|
|
DecodedBatchEventKind::Insert(vec![Datacell::new_bin(
|
|
|
|
|
|
|
|
b"37ae4b773a9fc7a20164eb16".to_vec().into_boxed_slice()
|
|
|
|
|
|
|
|
)])
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
DecodedBatchEvent::new(
|
|
|
|
|
|
|
|
1,
|
|
|
|
|
|
|
|
pkey("badguy"),
|
|
|
|
|
|
|
|
DecodedBatchEventKind::Insert(vec![Datacell::new_bin(
|
|
|
|
|
|
|
|
b"5fe3cbdc470b667cb1ba288a".to_vec().into_boxed_slice()
|
|
|
|
|
|
|
|
)])
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
DecodedBatchEvent::new(
|
|
|
|
|
|
|
|
2,
|
|
|
|
|
|
|
|
pkey("doggo"),
|
|
|
|
|
|
|
|
DecodedBatchEventKind::Insert(vec![Datacell::new_bin(
|
|
|
|
|
|
|
|
b"c80403f9d0ae4d5d0e829dd0".to_vec().into_boxed_slice()
|
|
|
|
|
|
|
|
)])
|
|
|
|
|
|
|
|
),
|
|
|
|
|
|
|
|
DecodedBatchEvent::new(3, pkey("badguy"), DecodedBatchEventKind::Delete)
|
|
|
|
|
|
|
|
],
|
|
|
|
|
|
|
|
0
|
|
|
|
|
|
|
|
)]
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
|
|
|
fn skewed_delta() {
|
|
|
|
// prepare model definition
|
|
|
|
// prepare model definition
|
|
|
|
let uuid = Uuid::new();
|
|
|
|
let uuid = Uuid::new();
|
|
|
|
let mdl = Model::new_restore(
|
|
|
|
let mdl = Model::new_restore(
|
|
|
@ -139,7 +230,7 @@ fn deltas_only_insert() {
|
|
|
|
new_delta(
|
|
|
|
new_delta(
|
|
|
|
0,
|
|
|
|
0,
|
|
|
|
1,
|
|
|
|
1,
|
|
|
|
Datacell::new_str("good cat".into()),
|
|
|
|
"good cat",
|
|
|
|
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
|
|
|
|
into_dict!("is_good" => Datacell::new_bool(true), "magical" => Datacell::new_bool(false)),
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
),
|
|
|
|
),
|
|
|
@ -147,34 +238,14 @@ fn deltas_only_insert() {
|
|
|
|
new_delta(
|
|
|
|
new_delta(
|
|
|
|
0,
|
|
|
|
0,
|
|
|
|
2,
|
|
|
|
2,
|
|
|
|
Datacell::new_str("bad cat".into()),
|
|
|
|
"bad cat",
|
|
|
|
into_dict!("is_good" => Datacell::new_bool(false), "magical" => Datacell::new_bool(false)),
|
|
|
|
into_dict!("is_good" => Datacell::new_bool(false), "magical" => Datacell::new_bool(false)),
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
DataDeltaKind::Insert,
|
|
|
|
),
|
|
|
|
),
|
|
|
|
// update catname: Schrödinger's cat, is_good: true, magical: true
|
|
|
|
// update catname: Schrödinger's cat, is_good: true, magical: true
|
|
|
|
new_delta_with_row(0, 3, row.clone(), DataDeltaKind::Update),
|
|
|
|
new_delta_with_row(0, 3, row.clone(), DataDeltaKind::Update),
|
|
|
|
];
|
|
|
|
];
|
|
|
|
// delta queue
|
|
|
|
let batch = flush_deltas_and_re_read(&mdl, deltas, "skewed_delta.db-btlog");
|
|
|
|
let g = pin();
|
|
|
|
|
|
|
|
for delta in deltas.clone() {
|
|
|
|
|
|
|
|
mdl.delta_state().append_new_data_delta(delta, &g);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let file = open_file("deltas_only_insert.db-btlog")
|
|
|
|
|
|
|
|
.into_created()
|
|
|
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
let mut persist_driver = DataBatchPersistDriver::new(file, true).unwrap();
|
|
|
|
|
|
|
|
persist_driver.write_new_batch(&mdl, deltas.len()).unwrap();
|
|
|
|
|
|
|
|
persist_driver.close().unwrap();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut restore_driver = DataBatchRestoreDriver::new(
|
|
|
|
|
|
|
|
open_file("deltas_only_insert.db-btlog")
|
|
|
|
|
|
|
|
.into_existing()
|
|
|
|
|
|
|
|
.unwrap()
|
|
|
|
|
|
|
|
.0,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let batch = restore_driver.read_all_batches().unwrap();
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
assert_eq!(
|
|
|
|
batch,
|
|
|
|
batch,
|
|
|
|
vec![NormalBatch::new(
|
|
|
|
vec![NormalBatch::new(
|
|
|
|