From 092d4c8eaa9e2972fbfd8311862dfaaac2b9ccb0 Mon Sep 17 00:00:00 2001
From: Sayan Nandan
Date: Fri, 23 Feb 2024 09:08:43 +0530
Subject: [PATCH] Fix delta diffing sync and restore algorithm

---
 server/src/engine/core/index/row.rs             |  16 +--
 server/src/engine/core/model/delta.rs           |   4 +
 server/src/engine/fractal/mod.rs                |  10 +-
 server/src/engine/fractal/test_utils.rs         |  21 +--
 .../storage/v1/raw/batch_jrnl/restore.rs        |   5 +-
 .../engine/storage/v2/impls/mdl_journal.rs      | 105 +++++++++------
 .../storage/v2/impls/tests/model_driver.rs      | 127 ++++++++++++++++--
 .../engine/storage/v2/raw/journal/raw/mod.rs    |  24 +++-
 8 files changed, 222 insertions(+), 90 deletions(-)

diff --git a/server/src/engine/core/index/row.rs b/server/src/engine/core/index/row.rs
index dab50a7b..656422d5 100644
--- a/server/src/engine/core/index/row.rs
+++ b/server/src/engine/core/index/row.rs
@@ -53,8 +53,6 @@ pub struct RowData {
     fields: DcFieldIndex,
     txn_revised_data: DeltaVersion,
     txn_revised_schema_version: DeltaVersion,
-    // pretty useless from an operational POV; only used during restore
-    restore_txn_id: DeltaVersion,
 }
 
 impl RowData {
@@ -70,9 +68,6 @@
     pub fn get_txn_revised(&self) -> DeltaVersion {
         self.txn_revised_data
     }
-    pub fn get_restored_txn_revised(&self) -> DeltaVersion {
-        self.restore_txn_id
-    }
 }
 
 impl TreeElement for Row {
@@ -105,20 +100,13 @@ impl Row {
         schema_version: DeltaVersion,
         txn_revised_data: DeltaVersion,
     ) -> Self {
-        Self::new_restored(
-            pk,
-            data,
-            schema_version,
-            txn_revised_data,
-            DeltaVersion::genesis(),
-        )
+        Self::new_restored(pk, data, schema_version, txn_revised_data)
     }
     pub fn new_restored(
         pk: PrimaryIndexKey,
         data: DcFieldIndex,
         schema_version: DeltaVersion,
         txn_revised_data: DeltaVersion,
-        restore_txn_id: DeltaVersion,
     ) -> Self {
         Self {
             __pk: ManuallyDrop::new(pk),
@@ -128,8 +116,6 @@
                     fields: data,
                     txn_revised_schema_version: schema_version,
                     txn_revised_data,
-                    // pretty useless here
-                    restore_txn_id,
                 }))
             },
         }
diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs
index e902f073..661a5bce 100644
--- a/server/src/engine/core/model/delta.rs
+++ b/server/src/engine/core/model/delta.rs
@@ -62,6 +62,10 @@ impl DeltaState {
             data_deltas_size: AtomicUsize::new(0),
         }
     }
+    pub fn __set_delta_version(&self, version: DeltaVersion) {
+        self.data_current_version
+            .store(version.value_u64(), Ordering::Relaxed)
+    }
 }
 
 // data direct
diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs
index 1ae612cb..b9cdb465 100644
--- a/server/src/engine/fractal/mod.rs
+++ b/server/src/engine/fractal/mod.rs
@@ -131,11 +131,11 @@ pub trait GlobalInstanceLike {
         model: &Model,
         hint: QueryExecMeta,
     ) {
-        let current_delta_size = hint.delta_hint();
-        let index_size = model.primary_index().count();
-        let five = (index_size as f64 * 0.05) as usize;
-        let max_delta = five.max(self.get_max_delta_size());
-        if current_delta_size >= max_delta {
+        // check if we need to sync
+        let r_tolerated_change = hint.delta_hint() >= self.get_max_delta_size();
+        let r_percent_change = (hint.delta_hint() >= ((model.primary_index().count() / 100) * 5))
+            & (r_tolerated_change);
+        if r_tolerated_change | r_percent_change {
             let obtained_delta_size = model
                 .delta_state()
                 .__fractal_take_full_from_data_delta(FractalToken::new());
diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs
index 5b1bb646..f78db5f9 100644
--- a/server/src/engine/fractal/test_utils.rs
+++ b/server/src/engine/fractal/test_utils.rs
@@ -76,18 +76,19 @@ impl TestGlobal {
         let space_idx = self.gns.idx().read();
         for (model_name, model) in self.gns.idx_models().read().iter() {
             let space_uuid = space_idx.get(model_name.space()).unwrap().get_uuid();
+            let driver = ModelDriver::open_model_driver(
+                model,
+                &paths_v1::model_path(
+                    model_name.space(),
+                    space_uuid,
+                    model_name.entity(),
+                    model.get_uuid(),
+                ),
+            )?;
             assert!(mdl_drivers
                 .insert(
                     ModelUniqueID::new(model_name.space(), model_name.entity(), model.get_uuid()),
-                    FractalModelDriver::init(ModelDriver::open_model_driver(
-                        model,
-                        &paths_v1::model_path(
-                            model_name.space(),
-                            space_uuid,
-                            model_name.entity(),
-                            model.get_uuid(),
-                        ),
-                    )?)
+                    FractalModelDriver::init(driver)
                 )
                 .is_none());
         }
@@ -152,7 +153,7 @@ impl GlobalInstanceLike for TestGlobal {
                 .batch_driver()
                .lock()
                .commit_event(StdModelBatch::new(mdl, count))
-               .unwrap()
+               .unwrap();
        }
    }
}
diff --git a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs
index c7a7bd0e..0116ee9b 100644
--- a/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs
+++ b/server/src/engine/storage/v1/raw/batch_jrnl/restore.rs
@@ -216,7 +216,7 @@ impl DataBatchRestoreDriver {
                 DecodedBatchEventKind::Insert(new_row) | DecodedBatchEventKind::Update(new_row) => {
                     // this is more like a "newrow"
                     match p_index.mt_get_element(&pk, &g) {
-                        Some(row) if row.d_data().read().get_restored_txn_revised() > txn_id => {
+                        Some(row) if row.d_data().read().get_txn_revised() > txn_id => {
                             // skewed
                             // resolve deltas if any
                             let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
@@ -244,7 +244,6 @@ impl DataBatchRestoreDriver {
                                 pk,
                                 data,
                                 DeltaVersion::__new(schema_version),
-                                DeltaVersion::__new(0),
                                 txn_id,
                             );
                             // resolve any deltas
@@ -276,7 +275,7 @@ impl DataBatchRestoreDriver {
             for (pk, txn_id) in pending_delete {
                 match p_index.mt_get(&pk, &g) {
                     Some(row) => {
-                        if row.read().get_restored_txn_revised() > txn_id {
+                        if row.read().get_txn_revised() > txn_id {
                             // our delete "happened before" this row was inserted
                             continue;
                         }
diff --git a/server/src/engine/storage/v2/impls/mdl_journal.rs b/server/src/engine/storage/v2/impls/mdl_journal.rs
index 398fef9c..2aae8aa9 100644
--- a/server/src/engine/storage/v2/impls/mdl_journal.rs
+++ b/server/src/engine/storage/v2/impls/mdl_journal.rs
@@ -510,54 +510,66 @@ impl BatchAdapterSpec for ModelDataAdapter {
         let mut pending_delete = HashMap::new();
         let p_index = gs.primary_index().__raw_index();
         let m = gs;
+        let mut real_last_txn_id = DeltaVersion::genesis();
         for DecodedBatchEvent { txn_id, pk, kind } in batch_state.events {
             match kind {
                 DecodedBatchEventKind::Insert(new_row) | DecodedBatchEventKind::Update(new_row) => {
-                    // this is more like a "newrow"
-                    match p_index.mt_get_element(&pk, &g) {
-                        Some(row) if row.d_data().read().get_restored_txn_revised() > txn_id => {
-                            // skewed
-                            // resolve deltas if any
-                            let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
-                            continue;
-                        }
-                        Some(_) | None => {
-                            // new row (logically)
-                            let _ = p_index.mt_delete(&pk, &g);
-                            let mut data = DcFieldIndex::default();
-                            for (field_name, new_data) in m
-                                .fields()
-                                .stseq_ord_key()
-                                .filter(|key| key.as_str() != m.p_key())
-                                .zip(new_row)
-                            {
-                                data.st_insert(
-                                    unsafe {
-                                        // UNSAFE(@ohsayan): model in scope, we're good
-                                        field_name.clone()
-                                    },
-                                    new_data,
-                                );
-                            }
-                            let row = Row::new_restored(
-                                pk,
-                                data,
-                                DeltaVersion::__new(batch_md.schema_version),
-                                DeltaVersion::__new(0),
-                                txn_id,
-                            );
-                            // resolve any deltas
-                            let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
-                            // put it back in (lol); blame @ohsayan for this joke
-                            p_index.mt_insert(row, &g);
-                        }
+                    let popped_row = p_index.mt_delete_return(&pk, &g);
+                    if let Some(row) = popped_row {
+                        /*
+                            if a newer version of the row is received first and the older version is pending to be synced, the older
+                            version is never synced. this is how the diffing algorithm works to ensure consistency.
+                            the delta diff algorithm statically guarantees this.
+                        */
+                        let row_txn_revised = row.read().get_txn_revised();
+                        assert!(
+                            row_txn_revised.value_u64() == 0 || row_txn_revised < txn_id,
+                            "existing row has revised ID {} but the batch event has txn ID {}",
+                            row.read().get_txn_revised().value_u64(),
+                            txn_id.value_u64()
+                        );
+                    }
+                    if txn_id > real_last_txn_id {
+                        real_last_txn_id = txn_id;
                     }
+                    let mut data = DcFieldIndex::default();
+                    for (field_name, new_data) in m
+                        .fields()
+                        .stseq_ord_key()
+                        .filter(|key| key.as_str() != m.p_key())
+                        .zip(new_row)
+                    {
+                        data.st_insert(
+                            unsafe {
+                                // UNSAFE(@ohsayan): model in scope, we're good
+                                field_name.clone()
+                            },
+                            new_data,
+                        );
+                    }
+                    let row = Row::new_restored(
+                        pk,
+                        data,
+                        DeltaVersion::__new(batch_md.schema_version),
+                        txn_id,
+                    );
+                    // resolve any deltas
+                    let _ = row.resolve_schema_deltas_and_freeze(m.delta_state());
+                    // put it back in (lol); blame @ohsayan for this joke
+                    p_index.mt_insert(row, &g);
                 }
                 DecodedBatchEventKind::Delete => {
+                    /*
+                        due to the concurrent nature of the engine, deletes can "appear before" an insert or update and since
+                        we don't store deleted txn ids, we just put this in a pending list.
+                    */
                     match pending_delete.entry(pk) {
                         HMEntry::Occupied(mut existing_delete) => {
                             if *existing_delete.get() > txn_id {
-                                // the existing delete "happened after" our delete, so it takes precedence
+                                /*
+                                    this is a "newer delete" and it takes precedence. basically the same key was
+                                    deleted by two txns but they were only synced much later, and out of order.
+                                */
                                 continue;
                             }
                             // the existing delete happened before our delete, so our delete takes precedence
@@ -572,11 +584,14 @@ impl BatchAdapterSpec for ModelDataAdapter {
                    }
                }
            }
-            // apply pending deletes; are our conflicts would have been resolved by now
+            // apply pending deletes; all our conflicts would have been resolved by now
            for (pk, txn_id) in pending_delete {
+                if txn_id > real_last_txn_id {
+                    real_last_txn_id = txn_id;
+                }
                match p_index.mt_get(&pk, &g) {
                    Some(row) => {
-                        if row.read().get_restored_txn_revised() > txn_id {
+                        if row.read().get_txn_revised() > txn_id {
                            // our delete "happened before" this row was inserted
                            continue;
                        }
@@ -584,11 +599,15 @@ impl BatchAdapterSpec for ModelDataAdapter {
                        let _ = p_index.mt_delete(&pk, &g);
                    }
                    None => {
-                        // since we never delete rows until here, this is impossible
-                        unreachable!()
+                        // if we reach here it basically means that both an (insert and/or update) and a delete
+                        // were present as part of the same diff and the delta algorithm ignored the insert/update
+                        // in this case, we do nothing
                    }
                }
            }
+            // +1 since it is a fetch add!
+            m.delta_state()
+                .__set_delta_version(DeltaVersion::__new(real_last_txn_id.value_u64() + 1));
            Ok(())
        }
    }
diff --git a/server/src/engine/storage/v2/impls/tests/model_driver.rs b/server/src/engine/storage/v2/impls/tests/model_driver.rs
index fcb927b3..2188244a 100644
--- a/server/src/engine/storage/v2/impls/tests/model_driver.rs
+++ b/server/src/engine/storage/v2/impls/tests/model_driver.rs
@@ -34,7 +34,7 @@ use {
         ql::{
             ast,
             ddl::crt::{CreateModel, CreateSpace},
-            dml::ins::InsertStatement,
+            dml::{ins::InsertStatement, upd::UpdateStatement},
             tests::lex_insecure,
         },
         storage::common::interface::fs_test::VirtualFS,
@@ -61,6 +61,12 @@ fn run_insert(global: &TestGlobal, insert: &str) -> QueryResult<()> {
     dml::insert(global, insert)
 }
 
+fn run_update(global: &TestGlobal, update: &str) -> QueryResult<()> {
+    let tokens = lex_insecure(update.as_bytes()).unwrap();
+    let insert: UpdateStatement = ast::parse_ast_node_full(&tokens[1..]).unwrap();
+    dml::update(global, insert)
+}
+
 #[test]
 fn empty_model_data() {
     test_utils::with_variable("empty_model_data", |log_name| {
@@ -81,9 +87,21 @@ fn empty_model_data() {
     })
 }
 
+fn create_test_kv(change_count: usize) -> Vec<(String, String)> {
+    (1..=change_count)
+        .map(|i| {
+            (
+                format!("user-{i:0>change_count$}"),
+                format!("password-{i:0>change_count$}"),
+            )
+        })
+        .collect()
+}
+
 #[test]
-fn model_data_deltas() {
-    test_utils::with_variable(("model_data_deltas", 1000), |(log_name, change_count)| {
+fn model_data_inserts() {
+    test_utils::with_variable(("model_data_inserts", 1000), |(log_name, change_count)| {
+        let key_values = create_test_kv(change_count);
         // create, insert and close
         {
             let mut global = TestGlobal::new_with_vfs_driver(log_name);
@@ -93,15 +111,15 @@ fn model_data_deltas() {
                 "create model apps.social(user_name: string, password: string)",
             )
             .unwrap();
-            for i in 1..=change_count {
+            for (username, password) in key_values.iter() {
                 run_insert(
                     &global,
-                    &format!("insert into apps.social('user-{i:0>1000}', 'password-{i:0>1000}')"),
+                    &format!("insert into apps.social('{username}', '{password}')"),
                 )
                 .unwrap();
             }
         }
-        // reopen and verify a 100 times
+        // reopen and verify 100 times
         test_utils::multi_run(100, || {
             let global = TestGlobal::new_with_vfs_driver(log_name);
             global.load_model_drivers().unwrap();
@@ -109,21 +127,19 @@ fn model_data_deltas() {
                 .state()
                 .with_model(EntityIDRef::new("apps", "social"), |model| {
                     let g = pin();
-                    for i in 1..=change_count {
+                    for (username, password) in key_values.iter() {
                         assert_eq!(
                             model
                                 .primary_index()
-                                .select(Lit::new_string(format!("user-{i:0>1000}")), &g)
+                                .select(Lit::new_str(username.as_str()), &g)
                                 .unwrap()
                                 .d_data()
                                 .read()
                                 .fields()
                                 .get("password")
-                                .cloned()
                                 .unwrap()
-                                .into_str()
-                                .unwrap(),
-                            format!("password-{i:0>1000}")
+                                .str(),
+                            password.as_str()
                         )
                     }
                     Ok(())
@@ -132,3 +148,90 @@ fn model_data_deltas() {
     })
     })
 }
+
+#[test]
+fn model_data_updates() {
+    test_utils::with_variable(("model_data_updates", 50), |(log_name, n)| {
+        let key_values = create_test_kv(n);
+        /*
+            - we first open the log and then insert n values
+            - we then reopen the log multiple times, updating n / 10 of the values on every reopen (we set the password to an empty string)
+            - we finally reopen the log and check that every key now has an empty string as the password
+        */
+        {
+            // insert n values
+            let mut global = TestGlobal::new_with_vfs_driver(log_name);
+            global.set_max_data_pressure(n);
+            let _ = create_model_and_space(
+                &global,
+                "create model apps.social(user_name: string, password: string)",
+            )
+            .unwrap();
+            for (username, password) in key_values.iter() {
+                run_insert(
+                    &global,
+                    &format!("insert into apps.social('{username}', '{password}')"),
+                )
+                .unwrap();
+            }
+        }
+        {
+            // reopen and update multiple times
+            // this effectively reopens the log n / changes_per_cycle times
+            let changes_per_cycle = n / 10;
+            let reopen_count = n / changes_per_cycle;
+            // now update values
+            let mut actual_position = 0;
+            for _ in 0..reopen_count {
+                let mut global = TestGlobal::new_with_vfs_driver(log_name);
+                global.set_max_data_pressure(changes_per_cycle);
+                global.load_model_drivers().unwrap();
+                let mut j = 0;
+                for _ in 0..changes_per_cycle {
+                    let (username, _) = &key_values[actual_position];
+                    run_update(
+                        &global,
+                        &format!(
+                            "update apps.social set password = '' where user_name = '{username}'"
+                        ),
+                    )
+                    .unwrap();
+                    actual_position += 1;
+                    j += 1;
+                }
+                assert_eq!(j, changes_per_cycle);
+                drop(global);
+            }
+            assert_eq!(actual_position, n);
+        }
+        {
+            let global = TestGlobal::new_with_vfs_driver(log_name);
+            global.load_model_drivers().unwrap();
+            for (txn_id, (username, _)) in key_values
+                .iter()
+                .enumerate()
+                .map(|(i, x)| ((i + n) as u64, x))
+            {
+                global
+                    .state()
+                    .with_model(EntityIDRef::new("apps", "social"), |model| {
+                        let g = pin();
+                        let row = model
+                            .primary_index()
+                            .select(Lit::new_str(username.as_str()), &g)
+                            .unwrap()
+                            .d_data()
+                            .read();
+                        let pass = row.fields().get("password").unwrap().str();
+                        assert!(
+                            pass.is_empty(),
+                            "failed for {username} because pass is {pass}",
+                        );
+                        assert_eq!(row.get_txn_revised().value_u64(), txn_id);
+                        Ok(())
+                    })
+                    .unwrap();
+            }
+        }
+    })
+}
diff --git a/server/src/engine/storage/v2/raw/journal/raw/mod.rs b/server/src/engine/storage/v2/raw/journal/raw/mod.rs
index 63a60bf6..c28cf2db 100644
--- a/server/src/engine/storage/v2/raw/journal/raw/mod.rs
+++ b/server/src/engine/storage/v2/raw/journal/raw/mod.rs
@@ -607,6 +607,22 @@ pub struct RawJournalReader {
     last_txn_id: u64,
     last_txn_offset: u64,
     last_txn_checksum: u64,
+    stats: JournalStats,
+}
+
+#[derive(Debug)]
+pub struct JournalStats {
+    server_events: usize,
+    driver_events: usize,
+}
+
+impl JournalStats {
+    fn new() -> Self {
+        Self {
+            server_events: 0,
+            driver_events: 0,
+        }
+    }
 }
 
 impl RawJournalReader {
@@ -624,7 +640,7 @@ impl RawJournalReader {
         jtrace_reader!(Initialized);
         let mut me = Self::new(reader, 0, 0, 0, 0);
         loop {
-            if me._next_event(gs)? {
+            if me._apply_next_event_and_stop(gs)? {
                 jtrace_reader!(Completed);
                 let initializer = JournalInitializer::new(
                     me.tr.cursor(),
@@ -654,6 +670,7 @@ impl RawJournalReader {
             last_txn_id,
             last_txn_offset,
             last_txn_checksum,
+            stats: JournalStats::new(),
         }
     }
     fn __refresh_known_txn(me: &mut Self) {
@@ -665,7 +682,7 @@ impl RawJournalReader {
 }
 
 impl RawJournalReader {
-    fn _next_event(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
+    fn _apply_next_event_and_stop(&mut self, gs: &J::GlobalState) -> RuntimeResult<bool> {
         let txn_id = u128::from_le_bytes(self.tr.read_block()?);
         let meta = u64::from_le_bytes(self.tr.read_block()?);
         if txn_id != self.txn_id as u128 {
@@ -691,6 +708,7 @@ impl RawJournalReader {
             Ok(()) => {
                 jtrace_reader!(ServerEventAppliedSuccess);
                 Self::__refresh_known_txn(self);
+                self.stats.server_events += 1;
                 return Ok(false);
             }
             Err(e) => return Err(e),
@@ -745,6 +763,7 @@ impl RawJournalReader {
             // we're going to refuse to read this
             return Err(StorageError::RawJournalCorrupted.into());
         }
+        self.stats.driver_events += 1;
         // update
         Self::__refresh_known_txn(self);
         // full metadata validated; this is a valid close event but is it actually a close
@@ -774,6 +793,7 @@ impl RawJournalReader {
         if valid_meta {
             // valid meta, update all
             Self::__refresh_known_txn(self);
+            self.stats.driver_events += 1;
             jtrace_reader!(ReopenSuccess);
             Ok(false)
         } else {