Make sure that model drops are synchronized across the system

next
Sayan Nandan 10 months ago
parent 250a2b3c16
commit a3d87e4850
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

2
Cargo.lock generated

@ -1232,7 +1232,7 @@ dependencies = [
[[package]]
name = "skytable"
version = "0.8.0"
source = "git+https://github.com/skytable/client-rust.git?branch=octave#ce16be88204044cd8358a5aa48521bd30e60ae33"
source = "git+https://github.com/skytable/client-rust.git?branch=octave#18aad43142fbe013ace64d92c3ffd71ab1394a18"
dependencies = [
"async-trait",
"bb8",

@ -44,7 +44,6 @@ pub fn delete_resp(
pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) -> QueryResult<()> {
core::with_model_for_data_update(global, delete.entity(), |model| {
let g = sync::atm::cpin();
let schema_version = model.delta_state().schema_current_version();
let delta_state = model.delta_state();
// create new version
let new_version = delta_state.create_new_data_delta_version();
@ -57,7 +56,6 @@ pub fn delete(global: &impl GlobalInstanceLike, mut delete: DeleteStatement) ->
let dp = delta_state.append_new_data_delta_with(
DataDeltaKind::Delete,
row.clone(),
schema_version,
new_version,
&g,
);

@ -57,13 +57,7 @@ pub fn insert(global: &impl GlobalInstanceLike, insert: InsertStatement) -> Quer
let row = Row::new(pk, data, ds.schema_current_version(), new_version);
if mdl.primary_index().__raw_index().mt_insert(row.clone(), &g) {
// append delta for new version
let dp = ds.append_new_data_delta_with(
DataDeltaKind::Insert,
row,
ds.schema_current_version(),
new_version,
&g,
);
let dp = ds.append_new_data_delta_with(DataDeltaKind::Insert, row, new_version, &g);
Ok(QueryExecMeta::new(dp))
} else {
Err(QueryError::QExecDmlDuplicate)

@ -372,13 +372,8 @@ pub fn update(global: &impl GlobalInstanceLike, mut update: UpdateStatement) ->
// update revised tag
row_data_wl.set_txn_revised(new_version);
// publish delta
let dp = ds.append_new_data_delta_with(
DataDeltaKind::Update,
row.clone(),
ds.schema_current_version(),
new_version,
&g,
);
let dp =
ds.append_new_data_delta_with(DataDeltaKind::Update, row.clone(), new_version, &g);
ret = Ok(QueryExecMeta::new(dp))
}
ret

@ -43,7 +43,6 @@ pub type DcFieldIndex = IndexST<Box<str>, Datacell, HasherNativeFx>;
#[derive(Debug)]
pub struct Row {
__genesis_schema_version: DeltaVersion,
__pk: ManuallyDrop<PrimaryIndexKey>,
__rc: RawRC<RwLock<RowData>>,
}
@ -121,7 +120,6 @@ impl Row {
restore_txn_id: DeltaVersion,
) -> Self {
Self {
__genesis_schema_version: schema_version,
__pk: ManuallyDrop::new(pk),
__rc: unsafe {
// UNSAFE(@ohsayan): we free this up later

@ -193,11 +193,10 @@ impl DeltaState {
&self,
kind: DataDeltaKind,
row: Row,
schema_version: DeltaVersion,
data_version: DeltaVersion,
g: &Guard,
) -> usize {
self.append_new_data_delta(DataDelta::new(schema_version, data_version, row, kind), g)
self.append_new_data_delta(DataDelta::new(data_version, row, kind), g)
}
pub fn append_new_data_delta(&self, delta: DataDelta, g: &Guard) -> usize {
self.data_deltas.blocking_enqueue(delta, g);
@ -348,21 +347,14 @@ impl<'a> SchemaDeltaIndexRGuard<'a> {
#[derive(Debug, Clone)]
pub struct DataDelta {
_schema_version: DeltaVersion,
data_version: DeltaVersion,
row: Row,
change: DataDeltaKind,
}
impl DataDelta {
pub const fn new(
schema_version: DeltaVersion,
data_version: DeltaVersion,
row: Row,
change: DataDeltaKind,
) -> Self {
pub const fn new(data_version: DeltaVersion, row: Row, change: DataDeltaKind) -> Self {
Self {
_schema_version: schema_version,
data_version,
row,
change,

@ -290,12 +290,12 @@ impl Model {
// commit txn
global.namespace_txn_driver().lock().try_commit(txn)?;
// request cleanup
global.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
&space_name,
global.purge_model_driver(
space_name.as_str(),
space.get_uuid(),
&model_name,
model_name.as_str(),
model.get_uuid(),
)));
);
}
// update global state
let _ = models_idx.remove(&EntityIDRef::new(&space_name, &model_name));

@ -340,13 +340,13 @@ impl FractalMgr {
_ = sigterm.recv() => {
info!("flp: finishing any pending maintenance tasks");
let global = global.clone();
tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap();
tokio::task::spawn_blocking(|| self.general_executor(global)).await.unwrap();
info!("flp: exited executor service");
break;
},
_ = tokio::time::sleep(dur) => {
let global = global.clone();
tokio::task::spawn_blocking(|| self.general_executor_model_maintenance(global)).await.unwrap()
tokio::task::spawn_blocking(|| self.general_executor(global)).await.unwrap()
}
task = lpq.recv() => {
let Task { threshold, task } = match task {
@ -377,7 +377,7 @@ impl FractalMgr {
}
}
}
fn general_executor_model_maintenance(&'static self, global: super::Global) {
fn general_executor(&'static self, global: super::Global) {
let mdl_drivers = global.get_state().get_mdl_drivers().read();
for (model_id, driver) in mdl_drivers.iter() {
let mut observed_len = 0;

@ -120,6 +120,13 @@ pub trait GlobalInstanceLike {
model_name: &str,
model_uuid: Uuid,
) -> RuntimeResult<()>;
fn purge_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
);
// taskmgr
fn taskmgr_post_high_priority(&self, task: Task<CriticalTask>);
fn taskmgr_post_standard_priority(&self, task: Task<GenericTask>);
@ -174,6 +181,23 @@ impl GlobalInstanceLike for Global {
&self.get_state().config
}
// model
fn purge_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) {
let id = ModelUniqueID::new(space_name, model_name, model_uuid);
self.get_state()
.mdl_driver
.write()
.remove(&id)
.expect("tried to remove non existent driver");
self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
space_name, space_uuid, model_name, model_uuid,
)));
}
fn initialize_model_driver(
&self,
space_name: &str,

@ -120,6 +120,22 @@ impl<Fs: RawFSInterface> GlobalInstanceLike for TestGlobal<Fs> {
fn sys_store(&self) -> &SystemStore<Fs> {
&self.sys_cfg
}
fn purge_model_driver(
&self,
space_name: &str,
space_uuid: Uuid,
model_name: &str,
model_uuid: Uuid,
) {
let id = ModelUniqueID::new(space_name, model_name, model_uuid);
self.model_drivers
.write()
.remove(&id)
.expect("tried to remove non-existent model");
self.taskmgr_post_standard_priority(Task::new(GenericTask::delete_model_dir(
space_name, space_uuid, model_name, model_uuid,
)));
}
fn initialize_model_driver(
&self,
space_name: &str,

@ -81,7 +81,6 @@ fn new_delta(
change: DataDeltaKind,
) -> DataDelta {
new_delta_with_row(
schema,
txnid,
Row::new(
pkey(pk),
@ -93,13 +92,8 @@ fn new_delta(
)
}
fn new_delta_with_row(schema: u64, txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta {
DataDelta::new(
DeltaVersion::__new(schema),
DeltaVersion::__new(txnid),
row,
change,
)
fn new_delta_with_row(txnid: u64, row: Row, change: DataDeltaKind) -> DataDelta {
DataDelta::new(DeltaVersion::__new(txnid), row, change)
}
fn flush_deltas_and_re_read<const N: usize>(
@ -247,7 +241,7 @@ fn skewed_delta() {
// prepare deltas
let deltas = [
// insert catname: Schrödinger's cat, is_good: true
new_delta_with_row(0, 0, row.clone(), DataDeltaKind::Insert),
new_delta_with_row(0, row.clone(), DataDeltaKind::Insert),
// insert catname: good cat, is_good: true, magical: false
new_delta(
0,
@ -265,7 +259,7 @@ fn skewed_delta() {
DataDeltaKind::Insert,
),
// update catname: Schrödinger's cat, is_good: true, magical: true
new_delta_with_row(0, 3, row.clone(), DataDeltaKind::Update),
new_delta_with_row(3, row.clone(), DataDeltaKind::Update),
];
let batch = flush_deltas_and_re_read(&mdl, deltas, "skewed_delta.db-btlog");
assert_eq!(
@ -352,10 +346,10 @@ fn skewed_shuffled_persist_restore() {
into_dict!("password" => "pwd456789"),
DataDeltaKind::Insert,
),
new_delta_with_row(0, 4, mongobongo.clone(), DataDeltaKind::Insert),
new_delta_with_row(0, 5, rds.clone(), DataDeltaKind::Insert),
new_delta_with_row(0, 6, mongobongo.clone(), DataDeltaKind::Delete),
new_delta_with_row(0, 7, rds.clone(), DataDeltaKind::Delete),
new_delta_with_row(4, mongobongo.clone(), DataDeltaKind::Insert),
new_delta_with_row(5, rds.clone(), DataDeltaKind::Insert),
new_delta_with_row(6, mongobongo.clone(), DataDeltaKind::Delete),
new_delta_with_row(7, rds.clone(), DataDeltaKind::Delete),
];
for i in 0..deltas.len() {
// prepare pretest

@ -91,22 +91,28 @@ impl BombardTaskSpec {
let mut q = query!(&self.base_query);
let resp = match self.kind {
BombardTaskKind::Insert(second_column) => {
q.push_param(format!("{:0>width$}", current, width = self.pk_len));
self.push_pk(&mut q, current);
q.push_param(second_column);
Response::Empty
}
BombardTaskKind::Update => {
q.push_param(1u64);
q.push_param(format!("{:0>width$}", current, width = self.pk_len));
self.push_pk(&mut q, current);
Response::Empty
}
BombardTaskKind::Delete => {
q.push_param(format!("{:0>width$}", current, width = self.pk_len));
self.push_pk(&mut q, current);
Response::Empty
}
};
(q, resp)
}
fn push_pk(&self, q: &mut Query, current: u64) {
q.push_param(self.get_primary_key(current));
}
fn get_primary_key(&self, current: u64) -> String {
format!("{:0>width$}", current, width = self.pk_len)
}
}
/// Errors while running a bombard
@ -229,36 +235,89 @@ fn print_table(data: Vec<(&'static str, RuntimeStats)>) {
bench runner
*/
struct BenchItem {
name: &'static str,
spec: BombardTaskSpec,
count: usize,
}
impl BenchItem {
fn new(name: &'static str, spec: BombardTaskSpec, count: usize) -> Self {
Self { name, spec, count }
}
fn print_log_start(&self) {
info!(
"benchmarking `{}`. average payload size = {} bytes. queries = {}",
self.name,
self.spec.generate(0).0.debug_encode_packet().len(),
self.count
)
}
fn run(self, pool: &mut BombardPool<BombardTask>) -> BenchResult<RuntimeStats> {
pool.blocking_bombard(self.spec, self.count)
.map_err(From::from)
}
}
fn bench_internal(
config: BombardTask,
bench: BenchConfig,
) -> BenchResult<Vec<(&'static str, RuntimeStats)>> {
let mut ret = vec![];
// initialize pool
info!("initializing connection pool");
let mut pool = BombardPool::new(bench.threads, config)?;
// bench INSERT
info!("benchmarking `INSERT`");
let insert = BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0);
let insert_stats = pool.blocking_bombard(insert, bench.query_count)?;
ret.push(("INSERT", insert_stats));
// bench UPDATE
info!("benchmarking `UPDATE`");
let update = BombardTaskSpec::update(
"update bench.bench set pw += ? where un = ?".into(),
bench.key_size,
info!(
"initializing connections. threads={}, primary key size ={} bytes",
bench.threads, bench.key_size
);
let update_stats = pool.blocking_bombard(update, bench.query_count)?;
ret.push(("UPDATE", update_stats));
// bench DELETE
info!("benchmarking `DELETE`");
let delete = BombardTaskSpec::delete(
"delete from bench.bench where un = ?".into(),
bench.key_size,
let mut pool = BombardPool::new(bench.threads, config)?;
// prepare benches
let benches = vec![
BenchItem::new(
"INSERT",
BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0),
bench.query_count,
),
BenchItem::new(
"UPDATE",
BombardTaskSpec::update(
"update bench.bench set pw += ? where un = ?".into(),
bench.key_size,
),
bench.query_count,
),
BenchItem::new(
"DELETE",
BombardTaskSpec::delete(
"delete from bench.bench where un = ?".into(),
bench.key_size,
),
bench.query_count,
),
];
// bench
let total_queries = bench.query_count as u64 * benches.len() as u64;
let mut results = vec![];
for task in benches {
let name = task.name;
task.print_log_start();
let this_result = task.run(&mut pool)?;
results.push((name, this_result));
}
info!(
"benchmark complete. finished executing {} queries",
fmt_u64(total_queries)
);
let delete_stats = pool.blocking_bombard(delete, bench.query_count)?;
ret.push(("DELETE", delete_stats));
info!("completed benchmarks. closing pool");
drop(pool);
Ok(ret)
Ok(results)
}
fn fmt_u64(n: u64) -> String {
let num_str = n.to_string();
let mut result = String::new();
let chars_rev: Vec<_> = num_str.chars().rev().collect();
for (i, ch) in chars_rev.iter().enumerate() {
if i % 3 == 0 && i != 0 {
result.push(',');
}
result.push(*ch);
}
result.chars().rev().collect()
}

@ -320,13 +320,10 @@ impl<Bt: ThreadedBombardTask> BombardPool<Bt> {
let mut global_stop = None;
let mut global_head = u128::MAX;
let mut global_tail = 0u128;
let messages: Vec<<Bt as ThreadedBombardTask>::WorkerTaskSpec> =
(0..self.workers.len())
.into_iter()
.map(|_| task_description.clone())
.collect();
for ((_, sender), msg) in self.workers.iter().zip(messages) {
sender.send(WorkerTask::Task(msg)).unwrap();
for (_, sender) in self.workers.iter() {
sender
.send(WorkerTask::Task(task_description.clone()))
.unwrap();
}
// wait for all workers to complete
let mut received = 0;

Loading…
Cancel
Save