From a3d87e48506bfe151e2a11aa87e6f3b8f014529b Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Mon, 20 Nov 2023 16:47:13 +0530 Subject: [PATCH] Make sure that model drops are synchronized across the system --- Cargo.lock | 2 +- server/src/engine/core/dml/del.rs | 2 - server/src/engine/core/dml/ins.rs | 8 +- server/src/engine/core/dml/upd.rs | 9 +- server/src/engine/core/index/row.rs | 2 - server/src/engine/core/model/delta.rs | 12 +- server/src/engine/core/model/mod.rs | 8 +- server/src/engine/fractal/mgr.rs | 6 +- server/src/engine/fractal/mod.rs | 24 ++++ server/src/engine/fractal/test_utils.rs | 16 +++ server/src/engine/storage/v1/tests/batch.rs | 22 ++-- sky-bench/src/bench.rs | 115 +++++++++++++++----- sky-bench/src/runtime.rs | 11 +- 13 files changed, 152 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4dc37767..b49ca2a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,7 +1232,7 @@ dependencies = [ [[package]] name = "skytable" version = "0.8.0" -source = "git+https://github.com/skytable/client-rust.git?branch=octave#ce16be88204044cd8358a5aa48521bd30e60ae33" +source = "git+https://github.com/skytable/client-rust.git?branch=octave#18aad43142fbe013ace64d92c3ffd71ab1394a18" dependencies = [ "async-trait", "bb8", diff --git a/server/src/engine/core/dml/del.rs b/server/src/engine/core/dml/del.rs index 0cea6aaa..119d1c92 100644 --- a/server/src/engine/core/dml/del.rs +++ b/server/src/engine/core/dml/del.rs @@ -44,7 +44,6 @@ pub fn delete_resp( pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) -> QueryResult<()> { core::with_model_for_data_update(global, delete.entity(), |model| { let g = sync::atm::cpin(); - let schema_version = model.delta_state().schema_current_version(); let delta_state = model.delta_state(); // create new version let new_version = delta_state.create_new_data_delta_version(); @@ -57,7 +56,6 @@ pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) -> let dp = delta_state.append_new_data_delta_with( DataDeltaKind::Delete, row.clone(), - schema_version, new_version, &g, ); diff --git a/server/src/engine/core/dml/ins.rs b/server/src/engine/core/dml/ins.rs index 33242746..2ff9723b 100644 --- a/server/src/engine/core/dml/ins.rs +++ b/server/src/engine/core/dml/ins.rs @@ -57,13 +57,7 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer let row = Row::new(pk, data, ds.schema_current_version(), new_version); if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) { // append delta for new version - let dp = ds.append_new_data_delta_with( - DataDeltaKind::Insert, - row, - ds.schema_current_version(), - new_version, - &g, - ); + let dp = ds.append_new_data_delta_with(DataDeltaKind::Insert, row, new_version, &g); Ok(QueryExecMeta::new(dp)) } else { Err(QueryError::QExecDmlDuplicate) diff --git a/server/src/engine/core/dml/upd.rs b/server/src/engine/core/dml/upd.rs index 296a227e..b3810a5d 100644 --- a/server/src/engine/core/dml/upd.rs +++ b/server/src/engine/core/dml/upd.rs @@ -372,13 +372,8 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) -> // update revised tag row_data_wl.set_txn_revised(new_version); // publish delta - let dp = ds.append_new_data_delta_with( - DataDeltaKind::Update, - row.clone(), - ds.schema_current_version(), - new_version, - &g, - ); + let dp = + ds.append_new_data_delta_with(DataDeltaKind::Update, row.clone(), new_version, &g); ret = Ok(QueryExecMeta::new(dp)) } ret diff --git a/server/src/engine/core/index/row.rs b/server/src/engine/core/index/row.rs index c44d8a4f..b667edd4 100644 --- a/server/src/engine/core/index/row.rs +++ b/server/src/engine/core/index/row.rs @@ -43,7 +43,6 @@ pub type DcFieldIndex = IndexST, Datacell, HasherNativeFx>; #[derive(Debug)] pub struct Row { - __genesis_schema_version: DeltaVersion, __pk: ManuallyDrop, __rc: RawRC>, } @@ -121,7 +120,6 @@ impl Row { restore_txn_id: DeltaVersion, ) -> Self { Self { - __genesis_schema_version: schema_version, __pk: ManuallyDrop::new(pk), __rc: unsafe { // UNSAFE(@ohsayan): we free this up later diff --git a/server/src/engine/core/model/delta.rs b/server/src/engine/core/model/delta.rs index a3760b24..a09c5514 100644 --- a/server/src/engine/core/model/delta.rs +++ b/server/src/engine/core/model/delta.rs @@ -193,11 +193,10 @@ impl DeltaState { &self, kind: DataDeltaKind, row: Row, - schema_version: DeltaVersion, data_version: DeltaVersion, g: &Guard, ) -> usize { - self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g) + self.append_new_data_delta(DataDelta::new(data_version, row, kind), g) } pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) -> usize { self.data_deltas.blocking_enqueue(delta, g); @@ -348,21 +347,14 @@ impl<'a> SchemaDeltaIndexRGuard<'a> { #[derive(Debug, Clone)] pub struct DataDelta { - _schema_version: DeltaVersion, data_version: DeltaVersion, row: Row, change: DataDeltaKind, } impl DataDelta { - pub const fn new( - schema_version: DeltaVersion, - data_version: DeltaVersion, - row: Row, - change: DataDeltaKind, - ) -> Self { + pub const fn new(data_version: DeltaVersion, row: Row, change: DataDeltaKind) -> Self { Self { - _schema_version: schema_version, data_version, row, change, diff --git a/server/src/engine/core/model/mod.rs b/server/src/engine/core/model/mod.rs index e1b222e0..71203879 100644 --- a/server/src/engine/core/model/mod.rs +++ b/server/src/engine/core/model/mod.rs @@ -290,12 +290,12 @@ impl Model { // commit txn global.namespace_txn_driver().lock().try_commit(txn)?; // request cleanup - global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( - &space_name, + global.purge_model_driver( + space_name.as_str(), space.get_uuid(), - &model_name, + model_name.as_str(), model.get_uuid(), - ))); + ); } // update global state let _ = models_idx.remove(&EntityIDRef::new(&space_name, &model_name)); diff --git a/server/src/engine/fractal/mgr.rs b/server/src/engine/fractal/mgr.rs index 2abc9b2d..3440dbf7 100644 --- a/server/src/engine/fractal/mgr.rs +++ b/server/src/engine/fractal/mgr.rs @@ -340,13 +340,13 @@ impl FractalMgr { _ = sigterm.recv() => { info!("flp: finishing any pending maintenance tasks"); let global = global.clone(); - tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap(); + tokio::task::spawn_blocking(|| self.general_executor(global)).await.unwrap(); info!("flp: exited executor service"); break; }, _ = tokio::time::sleep(dur) => { let global = global.clone(); - tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap() + tokio::task::spawn_blocking(|| self.general_executor(global)).await.unwrap() } task = lpq.recv() => { let Task { threshold, task } = match task { @@ -377,7 +377,7 @@ impl FractalMgr { } } } - fn general_executor_model_maintenance(&'static self, global: super::Global) { + fn general_executor(&'static self, global: super::Global) { let mdl_drivers = global.get_state().get_mdl_drivers().read(); for (model_id, driver) in mdl_drivers.iter() { let mut observed_len = 0; diff --git a/server/src/engine/fractal/mod.rs b/server/src/engine/fractal/mod.rs index 1645f2e6..168593cb 100644 --- a/server/src/engine/fractal/mod.rs +++ b/server/src/engine/fractal/mod.rs @@ -120,6 +120,13 @@ pub trait GlobalInstanceLike { model_name: &str, model_uuid: Uuid, ) -> RuntimeResult<()>; + fn purge_model_driver( + &self, + space_name: &str, + space_uuid: Uuid, + model_name: &str, + model_uuid: Uuid, + ); // taskmgr fn taskmgr_post_high_priority(&self, task: Task); fn taskmgr_post_standard_priority(&self, task: Task); @@ -174,6 +181,23 @@ impl GlobalInstanceLike for Global { &self.get_state().config } // model + fn purge_model_driver( + &self, + space_name: &str, + space_uuid: Uuid, + model_name: &str, + model_uuid: Uuid, + ) { + let id = ModelUniqueID::new(space_name, model_name, model_uuid); + self.get_state() + .mdl_driver + .write() + .remove(&id) + .expect("tried to remove non existent driver"); + self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( + space_name, space_uuid, model_name, model_uuid, + ))); + } fn initialize_model_driver( &self, space_name: &str, diff --git a/server/src/engine/fractal/test_utils.rs b/server/src/engine/fractal/test_utils.rs index 504a5773..91e28852 100644 --- a/server/src/engine/fractal/test_utils.rs +++ b/server/src/engine/fractal/test_utils.rs @@ -120,6 +120,22 @@ impl GlobalInstanceLike for TestGlobal { fn sys_store(&self) -> &SystemStore { &self.sys_cfg } + fn purge_model_driver( + &self, + space_name: &str, + space_uuid: Uuid, + model_name: &str, + model_uuid: Uuid, + ) { + let id = ModelUniqueID::new(space_name, model_name, model_uuid); + self.model_drivers + .write() + .remove(&id) + .expect("tried to remove non-existent model"); + self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir( + space_name, space_uuid, model_name, model_uuid, + ))); + } fn initialize_model_driver( &self, space_name: &str, diff --git a/server/src/engine/storage/v1/tests/batch.rs b/server/src/engine/storage/v1/tests/batch.rs index a0e6ad1e..0cb4d24c 100644 --- a/server/src/engine/storage/v1/tests/batch.rs +++ b/server/src/engine/storage/v1/tests/batch.rs @@ -81,7 +81,6 @@ fn new_delta( change: DataDeltaKind, ) -> DataDelta { new_delta_with_row( - schema, txnid, Row::new( pkey(pk), @@ -93,13 +92,8 @@ fn new_delta( ) } -fn new_delta_with_row(schema: u64, txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta { - DataDelta::new( - DeltaVersion::__new(schema), - DeltaVersion::__new(txnid), - row, - change, - ) +fn new_delta_with_row(txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta { + DataDelta::new(DeltaVersion::__new(txnid), row, change) } fn flush_deltas_and_re_read( @@ -247,7 +241,7 @@ fn skewed_delta() { // prepare deltas let deltas = [ // insert catname: Schrödinger's cat, is_good: true - new_delta_with_row(0, 0, row.clone(), DataDeltaKind::Insert), + new_delta_with_row(0, row.clone(), DataDeltaKind::Insert), // insert catname: good cat, is_good: true, magical: false new_delta( 0, @@ -265,7 +259,7 @@ fn skewed_delta() { DataDeltaKind::Insert, ), // update catname: Schrödinger's cat, is_good: true, magical: true - new_delta_with_row(0, 3, row.clone(), DataDeltaKind::Update), + new_delta_with_row(3, row.clone(), DataDeltaKind::Update), ]; let batch = flush_deltas_and_re_read(&mdl, deltas, "skewed_delta.db-btlog"); assert_eq!( @@ -352,10 +346,10 @@ fn skewed_shuffled_persist_restore() { into_dict!("password" => "pwd456789"), DataDeltaKind::Insert, ), - new_delta_with_row(0, 4, mongobongo.clone(), DataDeltaKind::Insert), - new_delta_with_row(0, 5, rds.clone(), DataDeltaKind::Insert), - new_delta_with_row(0, 6, mongobongo.clone(), DataDeltaKind::Delete), - new_delta_with_row(0, 7, rds.clone(), DataDeltaKind::Delete), + new_delta_with_row(4, mongobongo.clone(), DataDeltaKind::Insert), + new_delta_with_row(5, rds.clone(), DataDeltaKind::Insert), + new_delta_with_row(6, mongobongo.clone(), DataDeltaKind::Delete), + new_delta_with_row(7, rds.clone(), DataDeltaKind::Delete), ]; for i in 0..deltas.len() { // prepare pretest diff --git a/sky-bench/src/bench.rs b/sky-bench/src/bench.rs index 7325734c..dc725c66 100644 --- a/sky-bench/src/bench.rs +++ b/sky-bench/src/bench.rs @@ -91,22 +91,28 @@ impl BombardTaskSpec { let mut q = query!(&self.base_query); let resp = match self.kind { BombardTaskKind::Insert(second_column) => { - q.push_param(format!("{:0>width$}", current, width = self.pk_len)); + self.push_pk(&mut q, current); q.push_param(second_column); Response::Empty } BombardTaskKind::Update => { q.push_param(1u64); - q.push_param(format!("{:0>width$}", current, width = self.pk_len)); + self.push_pk(&mut q, current); Response::Empty } BombardTaskKind::Delete => { - q.push_param(format!("{:0>width$}", current, width = self.pk_len)); + self.push_pk(&mut q, current); Response::Empty } }; (q, resp) } + fn push_pk(&self, q: &mut Query, current: u64) { + q.push_param(self.get_primary_key(current)); + } + fn get_primary_key(&self, current: u64) -> String { + format!("{:0>width$}", current, width = self.pk_len) + } } /// Errors while running a bombard @@ -229,36 +235,89 @@ fn print_table(data: Vec<(&'static str, RuntimeStats)>) { bench runner */ +struct BenchItem { + name: &'static str, + spec: BombardTaskSpec, + count: usize, +} + +impl BenchItem { + fn new(name: &'static str, spec: BombardTaskSpec, count: usize) -> Self { + Self { name, spec, count } + } + fn print_log_start(&self) { + info!( + "benchmarking `{}`. average payload size = {} bytes. queries = {}", + self.name, + self.spec.generate(0).0.debug_encode_packet().len(), + self.count + ) + } + fn run(self, pool: &mut BombardPool) -> BenchResult { + pool.blocking_bombard(self.spec, self.count) + .map_err(From::from) + } +} + fn bench_internal( config: BombardTask, bench: BenchConfig, ) -> BenchResult> { - let mut ret = vec![]; // initialize pool - info!("initializing connection pool"); - let mut pool = BombardPool::new(bench.threads, config)?; - // bench INSERT - info!("benchmarking `INSERT`"); - let insert = BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0); - let insert_stats = pool.blocking_bombard(insert, bench.query_count)?; - ret.push(("INSERT", insert_stats)); - // bench UPDATE - info!("benchmarking `UPDATE`"); - let update = BombardTaskSpec::update( - "update bench.bench set pw += ? where un = ?".into(), - bench.key_size, + info!( + "initializing connections. threads={}, primary key size ={} bytes", + bench.threads, bench.key_size ); - let update_stats = pool.blocking_bombard(update, bench.query_count)?; - ret.push(("UPDATE", update_stats)); - // bench DELETE - info!("benchmarking `DELETE`"); - let delete = BombardTaskSpec::delete( - "delete from bench.bench where un = ?".into(), - bench.key_size, + let mut pool = BombardPool::new(bench.threads, config)?; + // prepare benches + let benches = vec![ + BenchItem::new( + "INSERT", + BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0), + bench.query_count, + ), + BenchItem::new( + "UPDATE", + BombardTaskSpec::update( + "update bench.bench set pw += ? where un = ?".into(), + bench.key_size, + ), + bench.query_count, + ), + BenchItem::new( + "DELETE", + BombardTaskSpec::delete( + "delete from bench.bench where un = ?".into(), + bench.key_size, + ), + bench.query_count, + ), + ]; + // bench + let total_queries = bench.query_count as u64 * benches.len() as u64; + let mut results = vec![]; + for task in benches { + let name = task.name; + task.print_log_start(); + let this_result = task.run(&mut pool)?; + results.push((name, this_result)); + } + info!( + "benchmark complete. finished executing {} queries", + fmt_u64(total_queries) ); - let delete_stats = pool.blocking_bombard(delete, bench.query_count)?; - ret.push(("DELETE", delete_stats)); - info!("completed benchmarks. closing pool"); - drop(pool); - Ok(ret) + Ok(results) +} + +fn fmt_u64(n: u64) -> String { + let num_str = n.to_string(); + let mut result = String::new(); + let chars_rev: Vec<_> = num_str.chars().rev().collect(); + for (i, ch) in chars_rev.iter().enumerate() { + if i % 3 == 0 && i != 0 { + result.push(','); + } + result.push(*ch); + } + result.chars().rev().collect() } diff --git a/sky-bench/src/runtime.rs b/sky-bench/src/runtime.rs index 4386e1a9..5675c11e 100644 --- a/sky-bench/src/runtime.rs +++ b/sky-bench/src/runtime.rs @@ -320,13 +320,10 @@ impl BombardPool { let mut global_stop = None; let mut global_head = u128::MAX; let mut global_tail = 0u128; - let messages: Vec<::WorkerTaskSpec> = - (0..self.workers.len()) - .into_iter() - .map(|_| task_description.clone()) - .collect(); - for ((_, sender), msg) in self.workers.iter().zip(messages) { - sender.send(WorkerTask::Task(msg)).unwrap(); + for (_, sender) in self.workers.iter() { + sender + .send(WorkerTask::Task(task_description.clone())) + .unwrap(); } // wait for all workers to complete let mut received = 0;