Fix delta diffing sync and restore algorithm

next
Sayan Nandan 7 months ago
parent ff1067d648
commit 092d4c8eaa

@ -53,8 +53,6 @@ pub struct RowData {
fields: DcFieldIndex,
txn_revised_data: DeltaVersion,
txn_revised_schema_version: DeltaVersion,
// pretty useless from an operational POV; only used during restore
restore_txn_id: DeltaVersion,
}
impl RowData {
@ -70,9 +68,6 @@ impl RowData {
pub fn get_txn_revised(&self) -> DeltaVersion {
self.txn_revised_data
}
pub fn get_restored_txn_revised(&self) -> DeltaVersion {
self.restore_txn_id
}
}
impl TreeElement for Row {
@ -105,20 +100,13 @@ impl Row {
schema_version: DeltaVersion,
txn_revised_data: DeltaVersion,
) -> Self {
Self::new_restored(
pk,
data,
schema_version,
txn_revised_data,
DeltaVersion::genesis(),
)
Self::new_restored(pk, data, schema_version, txn_revised_data)
}
pub fn new_restored(
pk: PrimaryIndexKey,
data: DcFieldIndex,
schema_version: DeltaVersion,
txn_revised_data: DeltaVersion,
restore_txn_id: DeltaVersion,
) -> Self {
Self {
__pk: ManuallyDrop::new(pk),
@ -128,8 +116,6 @@ impl Row {
fields: data,
txn_revised_schema_version: schema_version,
txn_revised_data,
// pretty useless here
restore_txn_id,
}))
},
}

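The net effect of this first hunk: a row's restore version and its live revision now share the txn_revised_data field instead of carrying a separate restore_txn_id. A minimal sketch of the simplified shape, assuming DeltaVersion wraps a u64 (as value_u64() elsewhere in this commit suggests); these are stand-ins, not the engine's real types:

// Sketch only: stand-ins for the simplified row shape after this commit.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct DeltaVersion(u64);

impl DeltaVersion {
    pub const fn genesis() -> Self {
        Self(0)
    }
    pub const fn value_u64(&self) -> u64 {
        self.0
    }
}

pub struct RowData {
    // this field now doubles as the restore txn id: fresh rows start at
    // genesis(), restored rows carry the txn id of the batch event
    pub txn_revised_data: DeltaVersion,
}
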
@ -62,6 +62,10 @@ impl DeltaState {
data_deltas_size: AtomicUsize::new(0),
}
}
pub fn __set_delta_version(&self, version: DeltaVersion) {
self.data_current_version
.store(version.value_u64(), Ordering::Relaxed)
}
}
// data direct

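A hedged sketch of why the restore path needs this raw setter, assuming the live path allocates versions with a fetch-add (the "+1 since it is a fetch add" note later in this commit points the same way); names here are stand-ins, not the engine's real API:

use std::sync::atomic::{AtomicU64, Ordering};

// Stand-in shape: after a restore, the version counter must be seeded one
// past the last restored txn id so the next live write gets a fresh id.
struct DeltaState {
    data_current_version: AtomicU64,
}

impl DeltaState {
    fn __set_delta_version(&self, version: u64) {
        self.data_current_version.store(version, Ordering::Relaxed)
    }
    fn next_version(&self) -> u64 {
        self.data_current_version.fetch_add(1, Ordering::Relaxed)
    }
}

fn main() {
    let state = DeltaState { data_current_version: AtomicU64::new(0) };
    let last_restored_txn = 41; // hypothetical value
    state.__set_delta_version(last_restored_txn + 1);
    // the next live write observes a version newer than anything restored
    assert_eq!(state.next_version(), last_restored_txn + 1);
}
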
@ -131,11 +131,11 @@ pub trait GlobalInstanceLike {
model: &Model,
hint: QueryExecMeta,
) {
let current_delta_size = hint.delta_hint();
let index_size = model.primary_index().count();
let five = (index_size as f64 * 0.05) as usize;
let max_delta = five.max(self.get_max_delta_size());
if current_delta_size >= max_delta {
// check if we need to sync
let r_tolerated_change = hint.delta_hint() >= self.get_max_delta_size();
let r_percent_change = (hint.delta_hint() >= ((model.primary_index().count() / 100) * 5))
& (r_tolerated_change);
if r_tolerated_change | r_percent_change {
let obtained_delta_size = model
.delta_state()
.__fractal_take_full_from_data_delta(FractalToken::new());

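Read as a predicate, the rewritten trigger is: sync once the pending delta count reaches the configured maximum, with the 5%-of-index check only consulted alongside that threshold. A runnable sketch mirroring the hunk above (the free function and test values are assumptions, not the engine's API):

// Mirrors the revised condition above; names are local stand-ins.
fn should_sync(delta_hint: usize, max_delta_size: usize, index_count: usize) -> bool {
    let r_tolerated_change = delta_hint >= max_delta_size;
    let r_percent_change =
        (delta_hint >= ((index_count / 100) * 5)) & r_tolerated_change;
    r_tolerated_change | r_percent_change
}

fn main() {
    // 10 pending deltas against a max of 8: sync
    assert!(should_sync(10, 8, 1_000));
    // below the configured max: no sync, whatever the index size
    assert!(!should_sync(10, 100, 40));
}
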
@ -76,18 +76,19 @@ impl<Fs: FSInterface> TestGlobal<Fs> {
let space_idx = self.gns.idx().read();
for (model_name, model) in self.gns.idx_models().read().iter() {
let space_uuid = space_idx.get(model_name.space()).unwrap().get_uuid();
let driver = ModelDriver::open_model_driver(
model,
&paths_v1::model_path(
model_name.space(),
space_uuid,
model_name.entity(),
model.get_uuid(),
),
)?;
assert!(mdl_drivers
.insert(
ModelUniqueID::new(model_name.space(), model_name.entity(), model.get_uuid()),
FractalModelDriver::init(ModelDriver::open_model_driver(
model,
&paths_v1::model_path(
model_name.space(),
space_uuid,
model_name.entity(),
model.get_uuid(),
),
)?)
FractalModelDriver::init(driver)
)
.is_none());
}
@ -152,7 +153,7 @@ impl<Fs: FSInterface> GlobalInstanceLike for TestGlobal<Fs> {
.batch_driver()
.lock()
.commit_event(StdModelBatch::new(mdl, count))
.unwrap()
.unwrap();
}
}
}

@ -216,7 +216,7 @@ impl<F: FSInterface> DataBatchRestoreDriver<F> {
DecodedBatchEventKind::Insert(new_row) | DecodedBatchEventKind::Update(new_row) => {
// this is more like a "newrow"
match p_index.mt_get_element(&pk, &g) {
Some(row) if row.d_data().read().get_restored_txn_revised() > txn_id => {
Some(row) if row.d_data().read().get_txn_revised() > txn_id => {
// skewed
// resolve deltas if any
let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
@ -244,7 +244,6 @@ impl<F: FSInterface> DataBatchRestoreDriver<F> {
pk,
data,
DeltaVersion::__new(schema_version),
DeltaVersion::__new(0),
txn_id,
);
// resolve any deltas
@ -276,7 +275,7 @@ impl<F: FSInterface> DataBatchRestoreDriver<F> {
for (pk, txn_id) in pending_delete {
match p_index.mt_get(&pk, &g) {
Some(row) => {
if row.read().get_restored_txn_revised() > txn_id {
if row.read().get_txn_revised() > txn_id {
// our delete "happened before" this row was inserted
continue;
}

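Both match arms above apply the same ordering rule: an event from the batch is skipped only when the row already sitting in the index is strictly newer. A self-contained sketch of that rule, assuming txn ids increase monotonically per committed event (TxnId is a stand-in for DeltaVersion):

// Illustrative only: the happened-before rule used in both branches above.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TxnId(u64);

// true if an incoming insert/update/delete should apply over the row
// version currently sitting in the index
fn incoming_event_applies(row_txn: TxnId, incoming_txn: TxnId) -> bool {
    // skip only if the indexed row is strictly newer ("skewed"); ties and
    // newer incoming events win
    !(row_txn > incoming_txn)
}

fn main() {
    // row restored at txn 7: a delete logged at txn 5 is stale ...
    assert!(!incoming_event_applies(TxnId(7), TxnId(5)));
    // ... while an update at txn 9 applies
    assert!(incoming_event_applies(TxnId(7), TxnId(9)));
}
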
@ -510,54 +510,66 @@ impl BatchAdapterSpec for ModelDataAdapter {
let mut pending_delete = HashMap::new();
let p_index = gs.primary_index().__raw_index();
let m = gs;
let mut real_last_txn_id = DeltaVersion::genesis();
for DecodedBatchEvent { txn_id, pk, kind } in batch_state.events {
match kind {
DecodedBatchEventKind::Insert(new_row) | DecodedBatchEventKind::Update(new_row) => {
// this is more like a "newrow"
match p_index.mt_get_element(&pk, &g) {
Some(row) if row.d_data().read().get_restored_txn_revised() > txn_id => {
// skewed
// resolve deltas if any
let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
continue;
}
Some(_) | None => {
// new row (logically)
let _ = p_index.mt_delete(&pk, &g);
let mut data = DcFieldIndex::default();
for (field_name, new_data) in m
.fields()
.stseq_ord_key()
.filter(|key| key.as_str() != m.p_key())
.zip(new_row)
{
data.st_insert(
unsafe {
// UNSAFE(@ohsayan): model in scope, we're good
field_name.clone()
},
new_data,
);
}
let row = Row::new_restored(
pk,
data,
DeltaVersion::__new(batch_md.schema_version),
DeltaVersion::__new(0),
txn_id,
);
// resolve any deltas
let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
// put it back in (lol); blame @ohsayan for this joke
p_index.mt_insert(row, &g);
}
let popped_row = p_index.mt_delete_return(&pk, &g);
if let Some(row) = popped_row {
/*
if a newer version of the row was synced first while an older version was still pending, the older
version is never synced; the delta diff algorithm statically guarantees this ordering,
which is what keeps restore consistent.
*/
let row_txn_revised = row.read().get_txn_revised();
assert!(
row_txn_revised.value_u64() == 0 || row_txn_revised < txn_id,
"revised ID is {} but our row has version {}",
row.read().get_txn_revised().value_u64(),
txn_id.value_u64()
);
}
if txn_id > real_last_txn_id {
real_last_txn_id = txn_id;
}
let mut data = DcFieldIndex::default();
for (field_name, new_data) in m
.fields()
.stseq_ord_key()
.filter(|key| key.as_str() != m.p_key())
.zip(new_row)
{
data.st_insert(
unsafe {
// UNSAFE(@ohsayan): model in scope, we're good
field_name.clone()
},
new_data,
);
}
let row = Row::new_restored(
pk,
data,
DeltaVersion::__new(batch_md.schema_version),
txn_id,
);
// resolve any deltas
let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
// put it back in (lol); blame @ohsayan for this joke
p_index.mt_insert(row, &g);
}
DecodedBatchEventKind::Delete => {
/*
due to the concurrent nature of the engine, deletes can "appear before" an insert or update and since
we don't store deleted txn ids, we just put this in a pending list.
*/
match pending_delete.entry(pk) {
HMEntry::Occupied(mut existing_delete) => {
if *existing_delete.get() > txn_id {
// the existing delete "happened after" our delete, so it takes precedence
/*
this is a "newer delete" and it takes precedence. basically the same key was
deleted by two txns but they were only synced much later, and out of order.
*/
continue;
}
// the existing delete happened before our delete, so our delete takes precedence
@ -572,11 +584,14 @@ impl BatchAdapterSpec for ModelDataAdapter {
}
}
}
// apply pending deletes; are our conflicts would have been resolved by now
// apply pending deletes; all our conflicts would have been resolved by now
for (pk, txn_id) in pending_delete {
if txn_id > real_last_txn_id {
real_last_txn_id = txn_id;
}
match p_index.mt_get(&pk, &g) {
Some(row) => {
if row.read().get_restored_txn_revised() > txn_id {
if row.read().get_txn_revised() > txn_id {
// our delete "happened before" this row was inserted
continue;
}
@ -584,11 +599,15 @@ impl BatchAdapterSpec for ModelDataAdapter {
let _ = p_index.mt_delete(&pk, &g);
}
None => {
// since we never delete rows until here, this is impossible
unreachable!()
// reaching here means that both an insert/update and a delete for the same key
// were part of this batch and the delta algorithm skipped the insert/update;
// in that case there is nothing to remove
}
}
}
// +1 since it is a fetch add!
m.delta_state()
.__set_delta_version(DeltaVersion::__new(real_last_txn_id.value_u64() + 1));
Ok(())
}
}

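The delete handling above is a two-phase pass: first collapse out-of-order deletes to the newest one per key, then apply each surviving delete against the index under the same happened-before rule. A runnable sketch with assumed types (plain u64 keys and txn ids in a HashMap, in place of the concurrent primary index):

use std::collections::HashMap;

// Sketch of the two-phase delete resolution above. The index maps a
// primary key to the txn id of the row version it currently holds.
fn resolve_deletes(deletes: &[(u64, u64)], index: &mut HashMap<u64, u64>) {
    // phase 1: keep only the newest delete per key (deletes can arrive
    // out of order relative to each other)
    let mut pending: HashMap<u64, u64> = HashMap::new();
    for &(pk, txn_id) in deletes {
        let slot = pending.entry(pk).or_insert(txn_id);
        if txn_id > *slot {
            *slot = txn_id;
        }
    }
    // phase 2: apply, honoring happened-before against the live index
    for (pk, del_txn) in pending {
        match index.get(&pk) {
            // the delete "happened before" this row was inserted; skip
            Some(&row_txn) if row_txn > del_txn => continue,
            Some(_) => {
                index.remove(&pk);
            }
            // the matching insert/update was itself skipped by the diff
            // algorithm, so there is nothing to remove
            None => {}
        }
    }
}

fn main() {
    let mut index = HashMap::from([(1, 10u64), (2, 3u64)]);
    // delete for key 1 at txn 4 is stale; delete for key 2 at txn 9 wins
    resolve_deletes(&[(1, 4), (2, 9)], &mut index);
    assert!(index.contains_key(&1));
    assert!(!index.contains_key(&2));
}
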
@ -34,7 +34,7 @@ use {
ql::{
ast,
ddl::crt::{CreateModel, CreateSpace},
dml::ins::InsertStatement,
dml::{ins::InsertStatement, upd::UpdateStatement},
tests::lex_insecure,
},
storage::common::interface::fs_test::VirtualFS,
@ -61,6 +61,12 @@ fn run_insert(global: &TestGlobal<VirtualFS>, insert: &str) -> QueryResult<()> {
dml::insert(global, insert)
}
fn run_update(global: &TestGlobal<VirtualFS>, update: &str) -> QueryResult<()> {
let tokens = lex_insecure(update.as_bytes()).unwrap();
let update: UpdateStatement = ast::parse_ast_node_full(&tokens[1..]).unwrap();
dml::update(global, update)
}
#[test]
fn empty_model_data() {
test_utils::with_variable("empty_model_data", |log_name| {
@ -81,9 +87,21 @@ fn empty_model_data() {
})
}
fn create_test_kv(change_count: usize) -> Vec<(String, String)> {
(1..=change_count)
.map(|i| {
(
format!("user-{i:0>change_count$}"),
format!("password-{i:0>change_count$}"),
)
})
.collect()
}
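
A quick runnable check of the helper above (repeated here so the snippet stands alone): the {i:0>change_count$} width specifier zero-pads each index to change_count characters, generalizing the old hard-coded {i:0>1000}.

fn create_test_kv(change_count: usize) -> Vec<(String, String)> {
    (1..=change_count)
        .map(|i| {
            (
                format!("user-{i:0>change_count$}"),
                format!("password-{i:0>change_count$}"),
            )
        })
        .collect()
}

fn main() {
    let kv = create_test_kv(3);
    // indices zero-padded to width 3
    assert_eq!(kv[0].0, "user-001");
    assert_eq!(kv[2].1, "password-003");
}
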
#[test]
fn model_data_deltas() {
test_utils::with_variable(("model_data_deltas", 1000), |(log_name, change_count)| {
fn model_data_inserts() {
test_utils::with_variable(("model_data_inserts", 1000), |(log_name, change_count)| {
let key_values = create_test_kv(change_count);
// create, insert and close
{
let mut global = TestGlobal::new_with_vfs_driver(log_name);
@ -93,15 +111,15 @@ fn model_data_deltas() {
"create model apps.social(user_name: string, password: string)",
)
.unwrap();
for i in 1..=change_count {
for (username, password) in key_values.iter() {
run_insert(
&global,
&format!("insert into apps.social('user-{i:0>1000}', 'password-{i:0>1000}')"),
&format!("insert into apps.social('{username}', '{password}')"),
)
.unwrap();
}
}
// reopen and verify a 100 times
// reopen and verify 100 times
test_utils::multi_run(100, || {
let global = TestGlobal::new_with_vfs_driver(log_name);
global.load_model_drivers().unwrap();
@ -109,21 +127,19 @@ fn model_data_deltas() {
.state()
.with_model(EntityIDRef::new("apps", "social"), |model| {
let g = pin();
for i in 1..=change_count {
for (username, password) in key_values.iter() {
assert_eq!(
model
.primary_index()
.select(Lit::new_string(format!("user-{i:0>1000}")), &g)
.select(Lit::new_str(username.as_str()), &g)
.unwrap()
.d_data()
.read()
.fields()
.get("password")
.cloned()
.unwrap()
.into_str()
.unwrap(),
format!("password-{i:0>1000}")
.str(),
password.as_str()
)
}
Ok(())
@ -132,3 +148,90 @@ fn model_data_deltas() {
})
})
}
#[test]
fn model_data_updates() {
test_utils::with_variable(("model_data_updates", 50), |(log_name, n)| {
let key_values = create_test_kv(n);
/*
- we first open the log and insert n values
- we then reopen the log 10 times, updating n / 10 values each cycle (setting the password to an empty string)
- we finally reopen the log and check that every key has an empty string as the password
*/
{
// insert n values
let mut global = TestGlobal::new_with_vfs_driver(log_name);
global.set_max_data_pressure(n);
let _ = create_model_and_space(
&global,
"create model apps.social(user_name: string, password: string)",
)
.unwrap();
for (username, password) in key_values.iter() {
run_insert(
&global,
&format!("insert into apps.social('{username}', '{password}')"),
)
.unwrap();
}
}
{
// reopen and update in cycles
// with these parameters the log is reopened 10 times
let changes_per_cycle = n / 10;
let reopen_count = n / changes_per_cycle;
// now update values
let mut actual_position = 0;
for _ in 0..reopen_count {
let mut global = TestGlobal::new_with_vfs_driver(log_name);
global.set_max_data_pressure(changes_per_cycle);
global.load_model_drivers().unwrap();
let mut j = 0;
for _ in 0..changes_per_cycle {
let (username, _) = &key_values[actual_position];
run_update(
&global,
&format!(
"update apps.social set password = '' where user_name = '{username}'"
),
)
.unwrap();
actual_position += 1;
j += 1;
}
assert_eq!(j, changes_per_cycle);
drop(global);
}
assert_eq!(actual_position, n);
}
{
let global = TestGlobal::new_with_vfs_driver(log_name);
global.load_model_drivers().unwrap();
for (txn_id, (username, _)) in key_values
.iter()
.enumerate()
.map(|(i, x)| ((i + n) as u64, x))
{
global
.state()
.with_model(EntityIDRef::new("apps", "social"), |model| {
let g = pin();
let row = model
.primary_index()
.select(Lit::new_str(username.as_str()), &g)
.unwrap()
.d_data()
.read();
let pass = row.fields().get("password").unwrap().str();
assert!(
pass.is_empty(),
"failed for {username} because pass is {pass}",
);
assert_eq!(row.get_txn_revised().value_u64(), txn_id);
Ok(())
})
.unwrap();
}
}
})
}

@ -607,6 +607,22 @@ pub struct RawJournalReader<J: RawJournalAdapter, Fs: FSInterface> {
last_txn_id: u64,
last_txn_offset: u64,
last_txn_checksum: u64,
stats: JournalStats,
}
#[derive(Debug)]
pub struct JournalStats {
server_events: usize,
driver_events: usize,
}
impl JournalStats {
fn new() -> Self {
Self {
server_events: 0,
driver_events: 0,
}
}
}
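
Hypothetical read-back accessors, not part of this commit, sketching how the new counters could be consumed once a scan completes; the stand-in struct mirrors the one above:

#[derive(Debug, Default)]
struct JournalStats {
    server_events: usize,
    driver_events: usize,
}

impl JournalStats {
    fn server_events(&self) -> usize {
        self.server_events
    }
    fn driver_events(&self) -> usize {
        self.driver_events
    }
}

fn main() {
    let mut stats = JournalStats::default();
    // during a scan, each applied event bumps exactly one counter
    stats.server_events += 1;
    stats.driver_events += 1;
    assert_eq!((stats.server_events(), stats.driver_events()), (1, 1));
}
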
impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
@ -624,7 +640,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
jtrace_reader!(Initialized);
let mut me = Self::new(reader, 0, 0, 0, 0);
loop {
if me._next_event(gs)? {
if me._apply_next_event_and_stop(gs)? {
jtrace_reader!(Completed);
let initializer = JournalInitializer::new(
me.tr.cursor(),
@ -654,6 +670,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
last_txn_id,
last_txn_offset,
last_txn_checksum,
stats: JournalStats::new(),
}
}
fn __refresh_known_txn(me: &mut Self) {
@ -665,7 +682,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
}
impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
fn _next_event(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
let txn_id = u128::from_le_bytes(self.tr.read_block()?);
let meta = u64::from_le_bytes(self.tr.read_block()?);
if txn_id != self.txn_id as u128 {
@ -691,6 +708,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
Ok(()) => {
jtrace_reader!(ServerEventAppliedSuccess);
Self::__refresh_known_txn(self);
self.stats.server_events += 1;
return Ok(false);
}
Err(e) => return Err(e),
@ -745,6 +763,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
// we're going to refuse to read this
return Err(StorageError::RawJournalCorrupted.into());
}
self.stats.driver_events += 1;
// update
Self::__refresh_known_txn(self);
// full metadata validated; this is a valid close event but is it actually a close
@ -774,6 +793,7 @@ impl<J: RawJournalAdapter, Fs: FSInterface> RawJournalReader<J, Fs> {
if valid_meta {
// valid meta, update all
Self::__refresh_known_txn(self);
self.stats.driver_events += 1;
jtrace_reader!(ReopenSuccess);
Ok(false)
} else {
