From 49d213f66eef7b70a655fb4b40f69d4746238c2f Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Tue, 6 Sep 2022 15:55:27 +0800 Subject: [PATCH] recombine main and view relations --- cozorocks/bridge/db.cpp | 46 +++++++++++-------- cozorocks/bridge/db.h | 7 ++- cozorocks/src/bridge/db.rs | 72 +++++++++++++++++++---------- cozorocks/src/bridge/mod.rs | 22 +++++---- src/algo/mod.rs | 4 +- src/query/pull.rs | 31 ++++--------- src/query/relation.rs | 16 ++++--- src/runtime/db.rs | 91 +++++++++++++------------------------ src/runtime/transact.rs | 6 +-- src/runtime/view.rs | 70 +++++++++++----------------- 10 files changed, 178 insertions(+), 187 deletions(-) diff --git a/cozorocks/bridge/db.cpp b/cozorocks/bridge/db.cpp index 56018e7d..c3faf203 100644 --- a/cozorocks/bridge/db.cpp +++ b/cozorocks/bridge/db.cpp @@ -46,7 +46,9 @@ ColumnFamilyOptions default_cf_options() { return options; } -shared_ptr open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) { +shared_ptr open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, + RustComparatorFn pri_cmp_impl, + RustComparatorFn snd_cmp_impl) { auto options = default_db_options(); auto cf_pri_opts = default_cf_options(); auto cf_snd_opts = default_cf_options(); @@ -89,32 +91,40 @@ shared_ptr open_db(const DbOpts &opts, RocksDbStatus &status, boo cf_pri_opts.table_factory.reset(NewBlockBasedTableFactory(table_options)); cf_snd_opts.table_factory.reset(NewBlockBasedTableFactory(table_options)); } - if (opts.use_capped_prefix_extractor) { - options.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); - cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); - cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); + if (opts.pri_use_capped_prefix_extractor) { + cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.pri_capped_prefix_extractor_len)); } - if (opts.use_fixed_prefix_extractor) { - options.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); - cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); - cf_snd_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); + if (opts.snd_use_capped_prefix_extractor) { + cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.snd_capped_prefix_extractor_len)); } - RustComparator *cmp = nullptr; + if (opts.pri_use_fixed_prefix_extractor) { + cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.pri_fixed_prefix_extractor_len)); + } + if (opts.snd_use_fixed_prefix_extractor) { + cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.snd_fixed_prefix_extractor_len)); + } + RustComparator *pri_cmp = nullptr; + RustComparator *snd_cmp = nullptr; if (use_cmp) { - cmp = new RustComparator( - string(opts.comparator_name), - opts.comparator_different_bytes_can_be_equal, - cmp_impl); - options.comparator = cmp; - cf_pri_opts.comparator = cmp; - cf_snd_opts.comparator = cmp; + pri_cmp = new RustComparator( + string(opts.pri_comparator_name), + opts.pri_comparator_different_bytes_can_be_equal, + pri_cmp_impl); + cf_pri_opts.comparator = pri_cmp; + + snd_cmp = new RustComparator( + string(opts.snd_comparator_name), + opts.snd_comparator_different_bytes_can_be_equal, + snd_cmp_impl); + cf_snd_opts.comparator = snd_cmp; } options.create_missing_column_families = true; shared_ptr db = make_shared(); db->db_path = string(opts.db_path); - db->comparator.reset(cmp); + db->pri_comparator.reset(pri_cmp); + db->snd_comparator.reset(snd_cmp); std::vector column_families; column_families.emplace_back(ColumnFamilyDescriptor( diff --git a/cozorocks/bridge/db.h b/cozorocks/bridge/db.h index 42707bb4..60a7911a 100644 --- a/cozorocks/bridge/db.h +++ b/cozorocks/bridge/db.h @@ -41,7 +41,8 @@ struct SstFileWriterBridge { }; struct RocksDbBridge { - unique_ptr comparator; + unique_ptr pri_comparator; + unique_ptr snd_comparator; unique_ptr db; vector cf_handles; @@ -141,6 +142,8 @@ public: bool can_different_bytes_be_equal; }; -shared_ptr open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl); +shared_ptr +open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn pri_cmp_impl, + RustComparatorFn snd_cmp_impl); #endif //COZOROCKS_DB_H diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index 04a82cfa..a29b08a5 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -8,7 +8,8 @@ use crate::CfHandle; #[derive(Default, Clone)] pub struct DbBuilder<'a> { - pub cmp_fn: Option i8>, + pub pri_cmp_fn: Option i8>, + pub snd_cmp_fn: Option i8>, pub opts: DbOpts<'a>, } @@ -16,7 +17,6 @@ impl<'a> Default for DbOpts<'a> { fn default() -> Self { Self { db_path: "", - optimistic: false, prepare_for_bulk_load: false, increase_parallelism: 0, optimize_level_style_compaction: false, @@ -29,12 +29,18 @@ impl<'a> Default for DbOpts<'a> { use_bloom_filter: false, bloom_filter_bits_per_key: 0.0, bloom_filter_whole_key_filtering: false, - use_capped_prefix_extractor: false, - capped_prefix_extractor_len: 0, - use_fixed_prefix_extractor: false, - fixed_prefix_extractor_len: 0, - comparator_name: "", - comparator_different_bytes_can_be_equal: false, + pri_use_capped_prefix_extractor: false, + pri_capped_prefix_extractor_len: 0, + pri_use_fixed_prefix_extractor: false, + pri_fixed_prefix_extractor_len: 0, + pri_comparator_name: "", + pri_comparator_different_bytes_can_be_equal: false, + snd_use_capped_prefix_extractor: false, + snd_capped_prefix_extractor_len: 0, + snd_use_fixed_prefix_extractor: false, + snd_fixed_prefix_extractor_len: 0, + snd_comparator_name: "", + snd_comparator_different_bytes_can_be_equal: false, destroy_on_exit: false, } } @@ -45,10 +51,6 @@ impl<'a> DbBuilder<'a> { self.opts.db_path = path; self } - pub fn optimistic(mut self, val: bool) -> Self { - self.opts.optimistic = val; - self - } pub fn prepare_for_bulk_load(mut self, val: bool) -> Self { self.opts.prepare_for_bulk_load = val; self @@ -93,25 +95,46 @@ impl<'a> DbBuilder<'a> { self.opts.bloom_filter_whole_key_filtering = whole_key_filtering; self } - pub fn use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self { - self.opts.use_capped_prefix_extractor = enable; - self.opts.capped_prefix_extractor_len = len; + pub fn pri_use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self { + self.opts.pri_use_capped_prefix_extractor = enable; + self.opts.pri_capped_prefix_extractor_len = len; + self + } + pub fn snd_use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self { + self.opts.snd_use_capped_prefix_extractor = enable; + self.opts.snd_capped_prefix_extractor_len = len; self } - pub fn use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self { - self.opts.use_fixed_prefix_extractor = enable; - self.opts.fixed_prefix_extractor_len = len; + pub fn pri_use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self { + self.opts.pri_use_fixed_prefix_extractor = enable; + self.opts.pri_fixed_prefix_extractor_len = len; + self + } + pub fn snd_use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self { + self.opts.snd_use_fixed_prefix_extractor = enable; + self.opts.snd_fixed_prefix_extractor_len = len; + self + } + pub fn pri_use_custom_comparator( + mut self, + name: &'a str, + cmp: fn(&[u8], &[u8]) -> i8, + different_bytes_can_be_equal: bool, + ) -> Self { + self.pri_cmp_fn = Some(cmp); + self.opts.pri_comparator_name = name; + self.opts.pri_comparator_different_bytes_can_be_equal = different_bytes_can_be_equal; self } - pub fn use_custom_comparator( + pub fn snd_use_custom_comparator( mut self, name: &'a str, cmp: fn(&[u8], &[u8]) -> i8, different_bytes_can_be_equal: bool, ) -> Self { - self.cmp_fn = Some(cmp); - self.opts.comparator_name = name; - self.opts.comparator_different_bytes_can_be_equal = different_bytes_can_be_equal; + self.snd_cmp_fn = Some(cmp); + self.opts.snd_comparator_name = name; + self.opts.snd_comparator_different_bytes_can_be_equal = different_bytes_can_be_equal; self } pub fn destroy_on_exit(mut self, destroy: bool) -> Self { @@ -128,8 +151,9 @@ impl<'a> DbBuilder<'a> { let result = open_db( &self.opts, &mut status, - self.cmp_fn.is_some(), - self.cmp_fn.unwrap_or(dummy), + self.pri_cmp_fn.is_some() || self.snd_cmp_fn.is_some(), + self.pri_cmp_fn.unwrap_or(dummy), + self.snd_cmp_fn.unwrap_or(dummy) ); if status.is_ok() { Ok(RocksDb { inner: result }) diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index 9b4080e0..f51add36 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -14,7 +14,6 @@ pub(crate) mod ffi { #[derive(Debug, Clone)] struct DbOpts<'a> { pub db_path: &'a str, - pub optimistic: bool, pub prepare_for_bulk_load: bool, pub increase_parallelism: usize, pub optimize_level_style_compaction: bool, @@ -27,12 +26,18 @@ pub(crate) mod ffi { pub use_bloom_filter: bool, pub bloom_filter_bits_per_key: f64, pub bloom_filter_whole_key_filtering: bool, - pub use_capped_prefix_extractor: bool, - pub capped_prefix_extractor_len: usize, - pub use_fixed_prefix_extractor: bool, - pub fixed_prefix_extractor_len: usize, - pub comparator_name: &'a str, - pub comparator_different_bytes_can_be_equal: bool, + pub pri_use_capped_prefix_extractor: bool, + pub pri_capped_prefix_extractor_len: usize, + pub pri_use_fixed_prefix_extractor: bool, + pub pri_fixed_prefix_extractor_len: usize, + pub snd_use_capped_prefix_extractor: bool, + pub snd_capped_prefix_extractor_len: usize, + pub snd_use_fixed_prefix_extractor: bool, + pub snd_fixed_prefix_extractor_len: usize, + pub pri_comparator_name: &'a str, + pub pri_comparator_different_bytes_can_be_equal: bool, + pub snd_comparator_name: &'a str, + pub snd_comparator_different_bytes_can_be_equal: bool, pub destroy_on_exit: bool, } @@ -119,7 +124,8 @@ pub(crate) mod ffi { builder: &DbOpts, status: &mut RocksDbStatus, use_cmp: bool, - cmp_impl: fn(&[u8], &[u8]) -> i8, + pri_cmp_impl: fn(&[u8], &[u8]) -> i8, + snd_cmp_impl: fn(&[u8], &[u8]) -> i8, ) -> SharedPtr; fn transact(self: &RocksDbBridge) -> UniquePtr; fn del_range( diff --git a/src/algo/mod.rs b/src/algo/mod.rs index 34a5bb31..4d4b3133 100644 --- a/src/algo/mod.rs +++ b/src/algo/mod.rs @@ -271,7 +271,7 @@ impl MagicAlgoRuleArg { MagicAlgoRuleArg::Stored(s, _) => { let view_rel = tx.get_view_rel(s)?; let t = Tuple(vec![prefix.clone()]); - Box::new(view_rel.scan_prefix(&t)) + Box::new(view_rel.scan_prefix(tx, &t)) } MagicAlgoRuleArg::Triple(attr, _, dir, vld) => { if *dir == TripleDir::Bwd && !attr.val_type.is_ref_type() { @@ -362,7 +362,7 @@ impl MagicAlgoRuleArg { } MagicAlgoRuleArg::Stored(s, _) => { let view_rel = tx.get_view_rel(s)?; - Box::new(view_rel.scan_all()?) + Box::new(view_rel.scan_all(tx)?) } MagicAlgoRuleArg::Triple(attr, _, dir, vld) => match dir { TripleDir::Fwd => { diff --git a/src/query/pull.rs b/src/query/pull.rs index 29b73609..58bcc57f 100644 --- a/src/query/pull.rs +++ b/src/query/pull.rs @@ -1,9 +1,9 @@ use std::collections::BTreeMap; use itertools::Itertools; -use miette::{ensure, miette, IntoDiagnostic, Result}; +use miette::{ensure, miette, Result}; use serde_json::{json, Map}; -use tempfile::NamedTempFile; + use cozorocks::CfHandle::Snd; use crate::data::attr::Attribute; @@ -47,9 +47,12 @@ impl SessionTx { res_iter: impl Iterator> + 'a, op: ViewOp, meta: &ViewRelMetadata, - ) -> Result<()> { + ) -> Result, Vec)>> { + let mut to_clear = None; if op == ViewOp::Rederive { - let _ = self.destroy_view_rel(&meta.name); + if let Ok(c) = self.destroy_view_rel(&meta.name) { + to_clear = Some(c); + } } let view_store = if op == ViewOp::Rederive || op == ViewOp::Create { self.create_view_rel(meta.clone())? @@ -68,33 +71,19 @@ impl SessionTx { found }; if op == ViewOp::Retract { - let mut vtx = self.view_db.transact().start(); - for data in res_iter { let data = data?; let encoded = data.encode_as_key(view_store.metadata.id); - vtx.del(&encoded, Snd)?; + self.tx.del(&encoded, Snd)?; } - - vtx.commit()?; } else { - let file = NamedTempFile::new().into_diagnostic()?; - let path = file.into_temp_path(); - let path = path.to_string_lossy(); - let mut writer = self.view_db.get_sst_writer(&path, Snd)?; - let mut written = false; for data in res_iter { let data = data?; let encoded = data.encode_as_key(view_store.metadata.id); - writer.put(&encoded, &[])?; - written = true; - } - if written { - writer.finish()?; - self.view_db.ingest_sst_file(&path, Snd)?; + self.tx.put(&encoded, &[], Snd)?; } } - Ok(()) + Ok(to_clear) } fn run_pull_on_item(&self, id: EntityId, specs: &[OutPullSpecWithAttr]) -> Result { let mut ret_map = Map::default(); diff --git a/src/query/relation.rs b/src/query/relation.rs index f6222171..64772f10 100644 --- a/src/query/relation.rs +++ b/src/query/relation.rs @@ -1350,6 +1350,7 @@ impl ViewRelation { fn prefix_join<'a>( &'a self, + tx: &'a SessionTx, left_iter: TupleIter<'a>, (left_join_indices, right_join_indices): (Vec, Vec), eliminate_indices: BTreeSet, @@ -1382,7 +1383,7 @@ impl ViewRelation { { return Left( self.storage - .scan_bounded_prefix(&prefix, &l_bound, &u_bound) + .scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound) .filter_map_ok(move |found| { // dbg!("filter", &tuple, &prefix, &found); let mut ret = tuple.0.clone(); @@ -1395,7 +1396,7 @@ impl ViewRelation { skip_range_check = true; Right( self.storage - .scan_prefix(&prefix) + .scan_prefix(tx, &prefix) .filter_map_ok(move |found| { // dbg!("filter", &tuple, &prefix, &found); let mut ret = tuple.0.clone(); @@ -1423,6 +1424,7 @@ impl ViewRelation { fn neg_join<'a>( &'a self, + tx: &'a SessionTx, left_iter: TupleIter<'a>, (left_join_indices, right_join_indices): (Vec, Vec), eliminate_indices: BTreeSet, @@ -1448,7 +1450,7 @@ impl ViewRelation { .collect_vec(), ); - 'outer: for found in self.storage.scan_prefix(&prefix) { + 'outer: for found in self.storage.scan_prefix(tx, &prefix) { let found = found?; for (left_idx, right_idx) in left_join_indices.iter().zip(right_join_indices.iter()) @@ -1483,8 +1485,8 @@ impl ViewRelation { )) } - fn iter(&self) -> Result> { - let it = self.storage.scan_all()?; + fn iter(&self, tx: &SessionTx) -> Result> { + let it = self.storage.scan_all(tx)?; Ok(if self.filters.is_empty() { Box::new(it) } else { @@ -1838,7 +1840,7 @@ impl Relation { Relation::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(Tuple(t.clone()))))), Relation::Triple(r) => r.iter(tx), Relation::Derived(r) => r.iter(epoch, use_delta), - Relation::View(v) => v.iter(), + Relation::View(v) => v.iter(tx), Relation::Join(j) => j.iter(tx, epoch, use_delta), Relation::Reorder(r) => r.iter(tx, epoch, use_delta), Relation::Filter(r) => r.iter(tx, epoch, use_delta), @@ -1917,6 +1919,7 @@ impl NegJoin { ) .unwrap(); v.neg_join( + tx, self.left.iter(tx, epoch, use_delta)?, join_indices, eliminate_indices, @@ -2036,6 +2039,7 @@ impl InnerJoin { .unwrap(); if r.join_is_prefix(&join_indices.1) { r.prefix_join( + tx, self.left.iter(tx, epoch, use_delta)?, join_indices, eliminate_indices, diff --git a/src/runtime/db.rs b/src/runtime/db.rs index eca2cc19..e048ed34 100644 --- a/src/runtime/db.rs +++ b/src/runtime/db.rs @@ -13,8 +13,8 @@ use miette::{bail, ensure, miette, IntoDiagnostic, Result}; use serde_json::json; use smartstring::SmartString; -use cozorocks::{DbBuilder, DbIter, RocksDb}; use cozorocks::CfHandle::{Pri, Snd}; +use cozorocks::{DbBuilder, DbIter, RocksDb}; use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; use crate::data::encode::{largest_key, smallest_key}; @@ -53,7 +53,6 @@ impl Drop for RunningQueryCleanup { pub struct Db { db: RocksDb, - view_db: RocksDb, last_attr_id: Arc, last_ent_id: Arc, last_tx_id: Arc, @@ -79,28 +78,19 @@ impl Db { let path = builder.opts.db_path; fs::create_dir_all(path).into_diagnostic()?; let path_buf = PathBuf::from(path); - let mut triple_path = path_buf.clone(); - triple_path.push("triple"); + let mut store_path = path_buf.clone(); + store_path.push("data"); let db_builder = builder - .use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN) - .optimistic(false) - .use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false) - .path(triple_path.to_str().unwrap()); - let mut rel_path = path_buf; - rel_path.push("rel"); - let view_db_builder = db_builder - .clone() - .optimistic(false) - .path(rel_path.to_str().unwrap()) - .use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN) - .use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false); + .pri_use_capped_prefix_extractor(true, DB_KEY_PREFIX_LEN) + .pri_use_custom_comparator("cozo_rusty_cmp", rusty_cmp, false) + .snd_use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN) + .snd_use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false) + .path(store_path.to_str().unwrap()); let db = db_builder.build()?; - let view_db = view_db_builder.build()?; let ret = Self { db, - view_db, last_attr_id: Arc::new(Default::default()), last_ent_id: Arc::new(Default::default()), last_tx_id: Arc::new(Default::default()), @@ -133,7 +123,6 @@ impl Db { Ok(Self { db: self.db.clone(), - view_db: self.view_db.clone(), last_attr_id: self.last_attr_id.clone(), last_ent_id: self.last_ent_id.clone(), last_tx_id: self.last_tx_id.clone(), @@ -160,7 +149,6 @@ impl Db { pub fn transact(&self) -> Result { let ret = SessionTx { tx: self.db.transact().set_snapshot(true).start(), - view_db: self.view_db.clone(), mem_store_id: Default::default(), view_store_id: self.view_store_id.clone(), w_tx_id: None, @@ -179,7 +167,6 @@ impl Db { let ret = SessionTx { tx: self.db.transact().set_snapshot(true).start(), - view_db: self.view_db.clone(), mem_store_id: Default::default(), view_store_id: self.view_store_id.clone(), w_tx_id: Some(cur_tx_id), @@ -197,34 +184,6 @@ impl Db { it.seek_to_start(); it } - // pub fn pull(&self, eid: &JsonValue, payload: &JsonValue, vld: &JsonValue) -> Result { - // let eid = EntityId::try_from(eid)?; - // let vld = match vld { - // JsonValue::Null => Validity::current(), - // v => Validity::try_from(v)?, - // }; - // let mut tx = self.transact()?; - // let specs = tx.parse_pull(payload, 0)?; - // let mut collected = Default::default(); - // let mut recursive_seen = Default::default(); - // for (idx, spec) in specs.iter().enumerate() { - // tx.pull( - // eid, - // vld, - // spec, - // 0, - // &specs, - // CurrentPath::new(idx)?, - // &mut collected, - // &mut recursive_seen, - // )?; - // } - // Ok(JsonValue::Object(collected)) - // } - // pub fn run_tx_triples(&self, payload: &str) -> Result { - // let payload = parse_tx_to_json(payload)?; - // self.transact_triples(&payload) - // } fn transact_triples(&self, payloads: Vec) -> Result { let mut tx = self.transact_write()?; let res: JsonValue = tx @@ -332,7 +291,11 @@ impl Db { } } fn run_query(&self, input_program: InputProgram) -> Result { - let mut tx = self.transact()?; + let mut tx = if input_program.out_opts.as_view.is_some() { + self.transact_write()? + } else { + self.transact()? + }; if let Some((meta, op)) = &input_program.out_opts.as_view { if *op == ViewOp::Create { ensure!( @@ -404,7 +367,11 @@ impl Db { Right(sorted_iter) }; if let Some((meta, view_op)) = input_program.out_opts.as_view { - tx.execute_view(sorted_iter, view_op, &meta)?; + let to_clear = tx.execute_view(sorted_iter, view_op, &meta)?; + tx.commit_tx("", false)?; + if let Some((lower, upper)) = to_clear { + self.db.range_del(&lower, &upper, Snd)?; + } Ok(json!({"view": "OK"})) } else { let ret: Vec<_> = tx.run_pull_on_query_results( @@ -427,7 +394,11 @@ impl Db { }; if let Some((meta, view_op)) = input_program.out_opts.as_view { - tx.execute_view(scan, view_op, &meta)?; + let to_clear = tx.execute_view(scan, view_op, &meta)?; + tx.commit_tx("", false)?; + if let Some((lower, upper)) = to_clear { + self.db.range_del(&lower, &upper, Snd)?; + } Ok(json!({"view": "OK"})) } else { let ret: Vec<_> = tx.run_pull_on_query_results( @@ -442,8 +413,10 @@ impl Db { } pub fn remove_view(&self, name: &str) -> Result<()> { let name = Symbol::from(name); - let tx = self.transact()?; - tx.destroy_view_rel(&name) + let mut tx = self.transact_write()?; + let (lower, upper) = tx.destroy_view_rel(&name)?; + self.db.range_del(&lower, &upper, Snd)?; + Ok(()) } pub fn list_running(&self) -> Result { let res = self @@ -461,7 +434,7 @@ impl Db { ks.push(DataValue::Str(SmartString::from(*el))); } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); - let mut vtx = self.view_db.transact().start(); + let mut vtx = self.db.transact().start(); vtx.put(&key, v, Snd)?; vtx.commit()?; Ok(()) @@ -472,7 +445,7 @@ impl Db { ks.push(DataValue::Str(SmartString::from(*el))); } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); - let mut vtx = self.view_db.transact().start(); + let mut vtx = self.db.transact().start(); vtx.del(&key, Snd)?; vtx.commit()?; Ok(()) @@ -483,7 +456,7 @@ impl Db { ks.push(DataValue::Str(SmartString::from(*el))); } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); - let vtx = self.view_db.transact().start(); + let vtx = self.db.transact().start(); Ok(match vtx.get(&key, false, Snd)? { None => None, Some(slice) => Some(slice.to_vec()), @@ -499,7 +472,7 @@ impl Db { } let upper_bound = Tuple(vec![DataValue::Bot]); let mut it = self - .view_db + .db .transact() .start() .iterator(Snd) @@ -557,7 +530,7 @@ impl Db { )))]) .encode_as_key(ViewRelId::SYSTEM); let mut it = self - .view_db + .db .transact() .start() .iterator(Snd) diff --git a/src/runtime/transact.rs b/src/runtime/transact.rs index 95f199db..668f0f2e 100644 --- a/src/runtime/transact.rs +++ b/src/runtime/transact.rs @@ -8,7 +8,7 @@ use rmp_serde::Serializer; use serde::Serialize; use smallvec::SmallVec; -use cozorocks::{DbIter, RocksDb, Tx}; +use cozorocks::{DbIter, Tx}; use cozorocks::CfHandle::{Pri, Snd}; use crate::data::attr::Attribute; @@ -25,7 +25,6 @@ use crate::runtime::view::ViewRelId; pub struct SessionTx { pub(crate) tx: Tx, - pub(crate) view_db: RocksDb, pub(crate) view_store_id: Arc, pub(crate) mem_store_id: Arc, pub(crate) w_tx_id: Option, @@ -116,8 +115,7 @@ impl SessionTx { pub(crate) fn load_last_view_store_id(&self) -> Result { let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); - let vtx = self.view_db.transact().start(); - let found = vtx.get(&t_encoded, false, Snd)?; + let found = self.tx.get(&t_encoded, false, Snd)?; match found { None => Ok(ViewRelId::SYSTEM), Some(slice) => ViewRelId::raw_decode(&slice), diff --git a/src/runtime/view.rs b/src/runtime/view.rs index 4e605979..7cf6ec8e 100644 --- a/src/runtime/view.rs +++ b/src/runtime/view.rs @@ -6,7 +6,7 @@ use rmp_serde::Serializer; use serde::Serialize; use cozorocks::CfHandle::Snd; -use cozorocks::{DbIter, RocksDb, Tx}; +use cozorocks::DbIter; use crate::data::symb::Symbol; use crate::data::tuple::{EncodedTuple, Tuple}; @@ -70,7 +70,6 @@ pub(crate) struct ViewRelMetadata { #[derive(Clone)] pub(crate) struct ViewRelStore { - view_db: RocksDb, pub(crate) metadata: ViewRelMetadata, } @@ -81,21 +80,26 @@ impl Debug for ViewRelStore { } impl ViewRelStore { - pub(crate) fn scan_all(&self) -> Result>> { + pub(crate) fn scan_all(&self, tx: &SessionTx) -> Result>> { let lower = Tuple::default().encode_as_key(self.metadata.id); let upper = Tuple::default().encode_as_key(self.metadata.id.next()?); - Ok(ViewRelIterator::new(&self.view_db, &lower, &upper)) + Ok(ViewRelIterator::new(tx, &lower, &upper)) } - pub(crate) fn scan_prefix(&self, prefix: &Tuple) -> impl Iterator> { + pub(crate) fn scan_prefix( + &self, + tx: &SessionTx, + prefix: &Tuple, + ) -> impl Iterator> { let mut upper = prefix.0.clone(); upper.push(DataValue::Bot); let prefix_encoded = prefix.encode_as_key(self.metadata.id); let upper_encoded = Tuple(upper).encode_as_key(self.metadata.id); - ViewRelIterator::new(&self.view_db, &prefix_encoded, &upper_encoded) + ViewRelIterator::new(tx, &prefix_encoded, &upper_encoded) } pub(crate) fn scan_bounded_prefix( &self, + tx: &SessionTx, prefix: &Tuple, lower: &[DataValue], upper: &[DataValue], @@ -107,7 +111,7 @@ impl ViewRelStore { upper_t.0.push(DataValue::Bot); let lower_encoded = lower_t.encode_as_key(self.metadata.id); let upper_encoded = upper_t.encode_as_key(self.metadata.id); - ViewRelIterator::new(&self.view_db, &lower_encoded, &upper_encoded) + ViewRelIterator::new(tx, &lower_encoded, &upper_encoded) } } @@ -117,13 +121,8 @@ struct ViewRelIterator { } impl ViewRelIterator { - fn new(db: &RocksDb, lower: &[u8], upper: &[u8]) -> Self { - let mut inner = db - .transact() - .start() - .iterator(Snd) - .upper_bound(upper) - .start(); + fn new(sess: &SessionTx, lower: &[u8], upper: &[u8]) -> Self { + let mut inner = sess.tx.iterator(Snd).upper_bound(upper).start(); inner.seek(lower); Self { inner, @@ -154,15 +153,13 @@ impl SessionTx { pub(crate) fn view_exists(&self, name: &Symbol) -> Result { let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); - let vtx = self.view_db.transact().start(); - Ok(vtx.exists(&encoded, false, Snd)?) + Ok(self.tx.exists(&encoded, false, Snd)?) } - pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result { + pub(crate) fn create_view_rel(&mut self, mut meta: ViewRelMetadata) -> Result { let key = DataValue::Str(meta.name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); - let mut vtx = self.view_db.transact().set_snapshot(true).start(); - if vtx.exists(&encoded, true, Snd)? { + if self.tx.exists(&encoded, true, Snd)? { bail!( "cannot create view {}: one with the same name already exists", meta.name @@ -170,50 +167,37 @@ impl SessionTx { }; let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst); meta.id = ViewRelId::new(last_id + 1)?; - vtx.put(&encoded, &meta.id.raw_encode(), Snd)?; + self.tx.put(&encoded, &meta.id.raw_encode(), Snd)?; let name_key = Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM); let mut meta_val = vec![]; meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap(); - vtx.put(&name_key, &meta_val, Snd)?; + self.tx.put(&name_key, &meta_val, Snd)?; let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); - vtx.put(&t_encoded, &meta.id.raw_encode(), Snd)?; - vtx.commit()?; - Ok(ViewRelStore { - view_db: self.view_db.clone(), - metadata: meta, - }) + self.tx.put(&t_encoded, &meta.id.raw_encode(), Snd)?; + Ok(ViewRelStore { metadata: meta }) } pub(crate) fn get_view_rel(&self, name: &Symbol) -> Result { - let vtx = self.view_db.transact().start(); - self.do_get_view_rel(name, &vtx) - } - fn do_get_view_rel(&self, name: &Symbol, vtx: &Tx) -> Result { let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); - let found = vtx + let found = self + .tx .get(&encoded, true, Snd)? .ok_or_else(|| miette!("cannot find stored view {}", name))?; let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?; - Ok(ViewRelStore { - view_db: self.view_db.clone(), - metadata, - }) + Ok(ViewRelStore { metadata }) } - pub(crate) fn destroy_view_rel(&self, name: &Symbol) -> Result<()> { - let mut vtx = self.view_db.transact().start(); - let store = self.do_get_view_rel(name, &vtx)?; + pub(crate) fn destroy_view_rel(&mut self, name: &Symbol) -> Result<(Vec, Vec)> { + let store = self.get_view_rel(name)?; let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); - vtx.del(&encoded, Snd)?; + self.tx.del(&encoded, Snd)?; let lower_bound = Tuple::default().encode_as_key(store.metadata.id); let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?); - self.view_db.range_del(&lower_bound, &upper_bound, Snd)?; - vtx.commit()?; - Ok(()) + Ok((lower_bound, upper_bound)) } }