diff --git a/cozorocks/bridge/db.cpp b/cozorocks/bridge/db.cpp index 00cee530..56018e7d 100644 --- a/cozorocks/bridge/db.cpp +++ b/cozorocks/bridge/db.cpp @@ -7,15 +7,15 @@ #include "db.h" #include "cozorocks/src/bridge/mod.rs.h" -unique_ptr default_db_options() { - auto options = make_unique(); - options->bottommost_compression = kZSTD; - options->compression = kLZ4Compression; - options->level_compaction_dynamic_level_bytes = true; - options->max_background_compactions = 4; - options->max_background_flushes = 2; - options->bytes_per_sync = 1048576; - options->compaction_pri = kMinOverlappingRatio; +Options default_db_options() { + Options options = Options(); + options.bottommost_compression = kZSTD; + options.compression = kLZ4Compression; + options.level_compaction_dynamic_level_bytes = true; + options.max_background_compactions = 4; + options.max_background_flushes = 2; + options.bytes_per_sync = 1048576; + options.compaction_pri = kMinOverlappingRatio; BlockBasedTableOptions table_options; table_options.block_size = 16 * 1024; table_options.cache_index_and_filter_blocks = true; @@ -23,42 +23,81 @@ unique_ptr default_db_options() { table_options.format_version = 5; auto table_factory = NewBlockBasedTableFactory(table_options); - options->table_factory.reset(table_factory); + options.table_factory.reset(table_factory); + + return options; +} + +ColumnFamilyOptions default_cf_options() { + ColumnFamilyOptions options = ColumnFamilyOptions(); + options.bottommost_compression = kZSTD; + options.compression = kLZ4Compression; + options.level_compaction_dynamic_level_bytes = true; + options.compaction_pri = kMinOverlappingRatio; + BlockBasedTableOptions table_options; + table_options.block_size = 16 * 1024; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.format_version = 5; + + auto table_factory = NewBlockBasedTableFactory(table_options); + options.table_factory.reset(table_factory); return options; } shared_ptr open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) { auto options = default_db_options(); + auto cf_pri_opts = default_cf_options(); + auto cf_snd_opts = default_cf_options(); if (opts.prepare_for_bulk_load) { - options->PrepareForBulkLoad(); + options.PrepareForBulkLoad(); } if (opts.increase_parallelism > 0) { - options->IncreaseParallelism(opts.increase_parallelism); + options.IncreaseParallelism(opts.increase_parallelism); } if (opts.optimize_level_style_compaction) { - options->OptimizeLevelStyleCompaction(); + options.OptimizeLevelStyleCompaction(); + cf_pri_opts.OptimizeLevelStyleCompaction(); + cf_snd_opts.OptimizeLevelStyleCompaction(); } - options->create_if_missing = opts.create_if_missing; - options->paranoid_checks = opts.paranoid_checks; + options.create_if_missing = opts.create_if_missing; + options.paranoid_checks = opts.paranoid_checks; if (opts.enable_blob_files) { - options->enable_blob_files = true; - options->min_blob_size = opts.min_blob_size; - options->blob_file_size = opts.blob_file_size; - options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection; + options.enable_blob_files = true; + cf_pri_opts.enable_blob_files = true; + cf_snd_opts.enable_blob_files = true; + + options.min_blob_size = opts.min_blob_size; + cf_pri_opts.min_blob_size = opts.min_blob_size; + cf_snd_opts.min_blob_size = opts.min_blob_size; + + options.blob_file_size = opts.blob_file_size; + cf_pri_opts.blob_file_size = opts.blob_file_size; + cf_snd_opts.blob_file_size = opts.blob_file_size; + + options.enable_blob_garbage_collection = opts.enable_blob_garbage_collection; + cf_pri_opts.enable_blob_garbage_collection = opts.enable_blob_garbage_collection; + cf_snd_opts.enable_blob_garbage_collection = opts.enable_blob_garbage_collection; } if (opts.use_bloom_filter) { BlockBasedTableOptions table_options; table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false)); table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering; - options->table_factory.reset(NewBlockBasedTableFactory(table_options)); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + cf_pri_opts.table_factory.reset(NewBlockBasedTableFactory(table_options)); + cf_snd_opts.table_factory.reset(NewBlockBasedTableFactory(table_options)); } if (opts.use_capped_prefix_extractor) { - options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); + options.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); + cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); + cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len)); } if (opts.use_fixed_prefix_extractor) { - options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); + options.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); + cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); + cf_snd_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len)); } RustComparator *cmp = nullptr; if (use_cmp) { @@ -66,25 +105,33 @@ shared_ptr open_db(const DbOpts &opts, RocksDbStatus &status, boo string(opts.comparator_name), opts.comparator_different_bytes_can_be_equal, cmp_impl); - options->comparator = cmp; + options.comparator = cmp; + cf_pri_opts.comparator = cmp; + cf_snd_opts.comparator = cmp; } + options.create_missing_column_families = true; - shared_ptr db_wrapper = shared_ptr(nullptr); + shared_ptr db = make_shared(); - auto db = new RocksDbBridge(); - db->options = std::move(options); db->db_path = string(opts.db_path); - db->tdb_opts = make_unique(); db->comparator.reset(cmp); + std::vector column_families; + column_families.emplace_back(ColumnFamilyDescriptor( + rocksdb::kDefaultColumnFamilyName, cf_pri_opts)); + column_families.emplace_back(ColumnFamilyDescriptor( + "relation", cf_snd_opts)); + TransactionDB *txn_db = nullptr; - write_status(TransactionDB::Open(*db->options, *db->tdb_opts, db->db_path, &txn_db), status); + write_status( + TransactionDB::Open(options, TransactionDBOptions(), db->db_path, column_families, &db->cf_handles, + &txn_db), + status); db->db.reset(txn_db); db->destroy_on_exit = opts.destroy_on_exit; - db_wrapper.reset(db); - return db_wrapper; + return db; } RocksDbBridge::~RocksDbBridge() { @@ -95,7 +142,8 @@ RocksDbBridge::~RocksDbBridge() { cerr << status.ToString() << endl; } db.reset(); - auto status2 = DestroyDB(db_path, *options); + Options options{}; + auto status2 = DestroyDB(db_path, options); if (!status2.ok()) { cerr << status2.ToString() << endl; } diff --git a/cozorocks/bridge/db.h b/cozorocks/bridge/db.h index e5ca5dd7..42707bb4 100644 --- a/cozorocks/bridge/db.h +++ b/cozorocks/bridge/db.h @@ -42,16 +42,16 @@ struct SstFileWriterBridge { struct RocksDbBridge { unique_ptr comparator; - unique_ptr options; - unique_ptr tdb_opts; unique_ptr db; + vector cf_handles; bool destroy_on_exit; string db_path; - inline unique_ptr get_sst_writer(rust::Str path, RocksDbStatus &status) const { + inline unique_ptr get_sst_writer(rust::Str path, size_t idx, RocksDbStatus &status) const { DB *db_ = get_base_db(); - Options options_ = db_->GetOptions(); + auto cf = cf_handles[idx]; + Options options_ = db_->GetOptions(cf); auto sst_file_writer = std::make_unique(EnvOptions(), options_); string path_(path); @@ -59,11 +59,12 @@ struct RocksDbBridge { return sst_file_writer; } - inline void ingest_sst(rust::Str path, RocksDbStatus &status) const { + inline void ingest_sst(rust::Str path, size_t idx, RocksDbStatus &status) const { IngestExternalFileOptions ifo; DB *db_ = get_base_db(); string path_(path); - write_status(db_->IngestExternalFile({std::move(path_)}, ifo), status); + auto cf = cf_handles[idx]; + write_status(db_->IngestExternalFile(cf, {std::move(path_)}, ifo), status); } [[nodiscard]] inline const string &get_db_path() const { @@ -72,13 +73,14 @@ struct RocksDbBridge { [[nodiscard]] inline unique_ptr transact() const { - auto ret = make_unique(&*this->db); + auto ret = make_unique(&*this->db, cf_handles); return ret; } - inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const { + inline void del_range(RustBytes start, RustBytes end, size_t idx, RocksDbStatus &status) const { WriteBatch batch; - auto s = batch.DeleteRange(db->DefaultColumnFamily(), convert_slice(start), convert_slice(end)); + auto cf = cf_handles[idx]; + auto s = batch.DeleteRange(cf, convert_slice(start), convert_slice(end)); if (!s.ok()) { write_status(s, status); return; @@ -91,11 +93,12 @@ struct RocksDbBridge { write_status(s2, status); } - void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const { + void compact_range(RustBytes start, RustBytes end, size_t idx, RocksDbStatus &status) const { CompactRangeOptions options; + auto cf = cf_handles[idx]; auto start_s = convert_slice(start); auto end_s = convert_slice(end); - auto s = db->CompactRange(options, &start_s, &end_s); + auto s = db->CompactRange(options, cf, &start_s, &end_s); write_status(s, status); } diff --git a/cozorocks/bridge/iter.h b/cozorocks/bridge/iter.h index e491e4b6..bff3c3e2 100644 --- a/cozorocks/bridge/iter.h +++ b/cozorocks/bridge/iter.h @@ -17,16 +17,13 @@ struct IterBridge { string upper_storage; Slice lower_bound; Slice upper_bound; + ColumnFamilyHandle *cf; unique_ptr r_opts; - explicit IterBridge(Transaction *tx_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(), upper_bound(), - r_opts(new ReadOptions) { - r_opts->ignore_range_deletions = true; - r_opts->auto_prefix_mode = true; - } - - explicit IterBridge(DB *db_) : db(db_), tx(nullptr), iter(nullptr), lower_bound(), upper_bound(), - r_opts(new ReadOptions) { + explicit IterBridge(Transaction *tx_, ColumnFamilyHandle *cf_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(), + upper_bound(), + cf(cf_), + r_opts(new ReadOptions) { r_opts->ignore_range_deletions = true; r_opts->auto_prefix_mode = true; } @@ -88,9 +85,9 @@ struct IterBridge { inline void start() { if (db == nullptr) { - iter.reset(tx->GetIterator(*r_opts)); + iter.reset(tx->GetIterator(*r_opts, cf)); } else { - iter.reset(db->NewIterator(*r_opts)); + iter.reset(db->NewIterator(*r_opts, cf)); } } diff --git a/cozorocks/bridge/tx.h b/cozorocks/bridge/tx.h index 39dacabd..6439e7ef 100644 --- a/cozorocks/bridge/tx.h +++ b/cozorocks/bridge/tx.h @@ -18,26 +18,17 @@ struct TxBridge { unique_ptr r_opts; unique_ptr o_tx_opts; unique_ptr p_tx_opts; + vector cf_handles; - explicit TxBridge(OptimisticTransactionDB *odb_) : - odb(odb_), - tdb(nullptr), - tx(), - w_opts(new WriteOptions), - r_opts(new ReadOptions), - o_tx_opts(new OptimisticTransactionOptions), - p_tx_opts(nullptr) { - r_opts->ignore_range_deletions = true; - } - - explicit TxBridge(TransactionDB *tdb_) : + explicit TxBridge(TransactionDB *tdb_, vector cf_handles_) : odb(nullptr), tdb(tdb_), tx(), w_opts(new WriteOptions), r_opts(new ReadOptions), o_tx_opts(nullptr), - p_tx_opts(new TransactionOptions) { + p_tx_opts(new TransactionOptions), + cf_handles(cf_handles_) { r_opts->ignore_range_deletions = true; } @@ -57,8 +48,9 @@ struct TxBridge { r_opts->fill_cache = val; } - inline unique_ptr iterator() const { - return make_unique(&*tx); + inline unique_ptr iterator(size_t idx) const { + auto cf = cf_handles[idx]; + return make_unique(&*tx, cf); }; inline void set_snapshot(bool val) { @@ -87,37 +79,41 @@ struct TxBridge { void start(); - inline unique_ptr get(RustBytes key, bool for_update, RocksDbStatus &status) const { + inline unique_ptr get(RustBytes key, bool for_update, size_t idx, RocksDbStatus &status) const { Slice key_ = convert_slice(key); auto ret = make_unique(); + auto cf = cf_handles[idx]; if (for_update) { - auto s = tx->GetForUpdate(*r_opts, get_db()->DefaultColumnFamily(), key_, &*ret); + auto s = tx->GetForUpdate(*r_opts, cf, key_, &*ret); write_status(s, status); } else { - auto s = tx->Get(*r_opts, key_, &*ret); + auto s = tx->Get(*r_opts, cf, key_, &*ret); write_status(s, status); } return ret; } - inline void exists(RustBytes key, bool for_update, RocksDbStatus &status) const { + inline void exists(RustBytes key, bool for_update, size_t idx, RocksDbStatus &status) const { Slice key_ = convert_slice(key); + auto cf = cf_handles[idx]; auto ret = PinnableSlice(); if (for_update) { - auto s = tx->GetForUpdate(*r_opts, get_db()->DefaultColumnFamily(), key_, &ret); + auto s = tx->GetForUpdate(*r_opts, cf, key_, &ret); write_status(s, status); } else { - auto s = tx->Get(*r_opts, key_, &ret); + auto s = tx->Get(*r_opts, cf, key_, &ret); write_status(s, status); } } - inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) { - write_status(tx->Put(convert_slice(key), convert_slice(val)), status); + inline void put(RustBytes key, RustBytes val, size_t idx, RocksDbStatus &status) { + auto cf = cf_handles[idx]; + write_status(tx->Put(cf, convert_slice(key), convert_slice(val)), status); } - inline void del(RustBytes key, RocksDbStatus &status) { - write_status(tx->Delete(convert_slice(key)), status); + inline void del(RustBytes key, size_t idx, RocksDbStatus &status) { + auto cf = cf_handles[idx]; + write_status(tx->Delete(cf, convert_slice(key)), status); } inline void commit(RocksDbStatus &status) { diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index 730f012f..04a82cfa 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -4,6 +4,7 @@ use cxx::*; use crate::bridge::ffi::*; use crate::bridge::tx::TxBuilder; +use crate::CfHandle; #[derive(Default, Clone)] pub struct DbBuilder<'a> { @@ -153,9 +154,9 @@ impl RocksDb { } } #[inline] - pub fn range_del(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> { + pub fn range_del(&self, lower: &[u8], upper: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.del_range(lower, upper, &mut status); + self.inner.del_range(lower, upper, handle.into(), &mut status); if status.is_ok() { Ok(()) } else { @@ -163,27 +164,27 @@ impl RocksDb { } } #[inline] - pub fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> { + pub fn range_compact(&self, lower: &[u8], upper: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.compact_range(lower, upper, &mut status); + self.inner.compact_range(lower, upper, handle.into(), &mut status); if status.is_ok() { Ok(()) } else { Err(status) } } - pub fn get_sst_writer(&self, path: &str) -> Result { + pub fn get_sst_writer(&self, path: &str, handle: CfHandle) -> Result { let mut status = RocksDbStatus::default(); - let ret = self.inner.get_sst_writer(path, &mut status); + let ret = self.inner.get_sst_writer(path, handle.into(), &mut status); if status.is_ok() { Ok(SstWriter { inner: ret }) } else { Err(status) } } - pub fn ingest_sst_file(&self, path: &str) -> Result<(), RocksDbStatus> { + pub fn ingest_sst_file(&self, path: &str, handle: CfHandle) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.ingest_sst(path, &mut status); + self.inner.ingest_sst(path, handle.into(), &mut status); if status.is_ok() { Ok(()) } else { diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index c6659ce9..9b4080e0 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -122,19 +122,27 @@ pub(crate) mod ffi { cmp_impl: fn(&[u8], &[u8]) -> i8, ) -> SharedPtr; fn transact(self: &RocksDbBridge) -> UniquePtr; - fn del_range(self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus); + fn del_range( + self: &RocksDbBridge, + lower: &[u8], + upper: &[u8], + idx: usize, + status: &mut RocksDbStatus, + ); fn compact_range( self: &RocksDbBridge, lower: &[u8], upper: &[u8], + idx: usize, status: &mut RocksDbStatus, ); fn get_sst_writer( self: &RocksDbBridge, path: &str, + idx: usize, status: &mut RocksDbStatus, ) -> UniquePtr; - fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus); + fn ingest_sst(self: &RocksDbBridge, path: &str, idx: usize, status: &mut RocksDbStatus); type SstFileWriterBridge; fn put( @@ -157,17 +165,30 @@ pub(crate) mod ffi { self: &TxBridge, key: &[u8], for_update: bool, + idx: usize, status: &mut RocksDbStatus, ) -> UniquePtr; - fn exists(self: &TxBridge, key: &[u8], for_update: bool, status: &mut RocksDbStatus); - fn put(self: Pin<&mut TxBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus); - fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus); + fn exists( + self: &TxBridge, + key: &[u8], + for_update: bool, + idx: usize, + status: &mut RocksDbStatus, + ); + fn put( + self: Pin<&mut TxBridge>, + key: &[u8], + val: &[u8], + idx: usize, + status: &mut RocksDbStatus, + ); + fn del(self: Pin<&mut TxBridge>, key: &[u8], idx: usize, status: &mut RocksDbStatus); fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn rollback(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus); fn set_savepoint(self: Pin<&mut TxBridge>); - fn iterator(self: &TxBridge) -> UniquePtr; + fn iterator(self: &TxBridge, idx: usize) -> UniquePtr; type IterBridge; fn start(self: Pin<&mut IterBridge>); diff --git a/cozorocks/src/bridge/tx.rs b/cozorocks/src/bridge/tx.rs index 19606a41..7217d238 100644 --- a/cozorocks/src/bridge/tx.rs +++ b/cozorocks/src/bridge/tx.rs @@ -5,6 +5,7 @@ use cxx::*; use crate::bridge::ffi::*; use crate::bridge::iter::IterBuilder; +use crate::CfHandle; pub struct TxBuilder { pub(crate) inner: UniquePtr, @@ -84,9 +85,9 @@ impl Tx { self.inner.pin_mut().clear_snapshot() } #[inline] - pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> { + pub fn put(&mut self, key: &[u8], val: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.pin_mut().put(key, val, &mut status); + self.inner.pin_mut().put(key, val, handle.into(), &mut status); if status.is_ok() { Ok(()) } else { @@ -94,9 +95,9 @@ impl Tx { } } #[inline] - pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> { + pub fn del(&mut self, key: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> { let mut status = RocksDbStatus::default(); - self.inner.pin_mut().del(key, &mut status); + self.inner.pin_mut().del(key, handle.into(), &mut status); if status.is_ok() { Ok(()) } else { @@ -104,9 +105,9 @@ impl Tx { } } #[inline] - pub fn get(&self, key: &[u8], for_update: bool) -> Result, RocksDbStatus> { + pub fn get(&self, key: &[u8], for_update: bool, handle: CfHandle) -> Result, RocksDbStatus> { let mut status = RocksDbStatus::default(); - let ret = self.inner.get(key, for_update, &mut status); + let ret = self.inner.get(key, for_update, handle.into(), &mut status); match status.code { StatusCode::kOk => Ok(Some(PinSlice { inner: ret })), StatusCode::kNotFound => Ok(None), @@ -114,9 +115,9 @@ impl Tx { } } #[inline] - pub fn exists(&self, key: &[u8], for_update: bool) -> Result { + pub fn exists(&self, key: &[u8], for_update: bool, handle: CfHandle) -> Result { let mut status = RocksDbStatus::default(); - self.inner.exists(key, for_update, &mut status); + self.inner.exists(key, for_update, handle.into(), &mut status); match status.code { StatusCode::kOk => Ok(true), StatusCode::kNotFound => Ok(false), @@ -168,9 +169,9 @@ impl Tx { } } #[inline] - pub fn iterator(&self) -> IterBuilder { + pub fn iterator(&self, handle: CfHandle) -> IterBuilder { IterBuilder { - inner: self.inner.iterator(), + inner: self.inner.iterator(handle.into()), } .auto_prefix_mode(true) } diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index d98cbc07..fc02011b 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -13,5 +13,20 @@ pub use bridge::tx::TxBuilder; pub(crate) mod bridge; +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum CfHandle { + Pri, + Snd, +} + +impl From for usize { + fn from(s: CfHandle) -> Self { + match s { + CfHandle::Pri => 0, + CfHandle::Snd => 1, + } + } +} + #[cfg(test)] mod tests; diff --git a/src/query/pull.rs b/src/query/pull.rs index f6549e9a..29b73609 100644 --- a/src/query/pull.rs +++ b/src/query/pull.rs @@ -4,6 +4,7 @@ use itertools::Itertools; use miette::{ensure, miette, IntoDiagnostic, Result}; use serde_json::{json, Map}; use tempfile::NamedTempFile; +use cozorocks::CfHandle::Snd; use crate::data::attr::Attribute; use crate::data::id::{EntityId, Validity}; @@ -72,7 +73,7 @@ impl SessionTx { for data in res_iter { let data = data?; let encoded = data.encode_as_key(view_store.metadata.id); - vtx.del(&encoded)?; + vtx.del(&encoded, Snd)?; } vtx.commit()?; @@ -80,14 +81,18 @@ impl SessionTx { let file = NamedTempFile::new().into_diagnostic()?; let path = file.into_temp_path(); let path = path.to_string_lossy(); - let mut writer = self.view_db.get_sst_writer(&path)?; + let mut writer = self.view_db.get_sst_writer(&path, Snd)?; + let mut written = false; for data in res_iter { let data = data?; let encoded = data.encode_as_key(view_store.metadata.id); writer.put(&encoded, &[])?; + written = true; + } + if written { + writer.finish()?; + self.view_db.ingest_sst_file(&path, Snd)?; } - writer.finish()?; - self.view_db.ingest_sst_file(&path)?; } Ok(()) } diff --git a/src/runtime/db.rs b/src/runtime/db.rs index 6fc46068..eca2cc19 100644 --- a/src/runtime/db.rs +++ b/src/runtime/db.rs @@ -14,6 +14,7 @@ use serde_json::json; use smartstring::SmartString; use cozorocks::{DbBuilder, DbIter, RocksDb}; +use cozorocks::CfHandle::{Pri, Snd}; use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN}; use crate::data::encode::{largest_key, smallest_key}; @@ -116,14 +117,14 @@ impl Db { pub fn compact_main(&self) -> Result<()> { let l = smallest_key(); let u = largest_key(); - self.db.range_compact(&l, &u)?; + self.db.range_compact(&l, &u, Pri)?; Ok(()) } pub fn compact_view(&self) -> Result<()> { let l = Tuple::default().encode_as_key(ViewRelId(0)); let u = Tuple(vec![DataValue::Bot]).encode_as_key(ViewRelId(u64::MAX)); - self.db.range_compact(&l, &u)?; + self.db.range_compact(&l, &u, Snd)?; Ok(()) } @@ -192,7 +193,7 @@ impl Db { Ok(ret) } pub fn total_iter(&self) -> DbIter { - let mut it = self.db.transact().start().iterator().start(); + let mut it = self.db.transact().start().iterator(Pri).start(); it.seek_to_start(); it } @@ -461,7 +462,7 @@ impl Db { } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let mut vtx = self.view_db.transact().start(); - vtx.put(&key, v)?; + vtx.put(&key, v, Snd)?; vtx.commit()?; Ok(()) } @@ -472,7 +473,7 @@ impl Db { } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let mut vtx = self.view_db.transact().start(); - vtx.del(&key)?; + vtx.del(&key, Snd)?; vtx.commit()?; Ok(()) } @@ -483,7 +484,7 @@ impl Db { } let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM); let vtx = self.view_db.transact().start(); - Ok(match vtx.get(&key, false)? { + Ok(match vtx.get(&key, false, Snd)? { None => None, Some(slice) => Some(slice.to_vec()), }) @@ -501,7 +502,7 @@ impl Db { .view_db .transact() .start() - .iterator() + .iterator(Snd) .upper_bound(&upper_bound.encode_as_key(ViewRelId::SYSTEM)) .start(); it.seek(&lower_bound.encode_as_key(ViewRelId::SYSTEM)); @@ -559,7 +560,7 @@ impl Db { .view_db .transact() .start() - .iterator() + .iterator(Snd) .upper_bound(&upper) .start(); it.seek(&lower); diff --git a/src/runtime/transact.rs b/src/runtime/transact.rs index 5dfb899d..95f199db 100644 --- a/src/runtime/transact.rs +++ b/src/runtime/transact.rs @@ -9,6 +9,7 @@ use serde::Serialize; use smallvec::SmallVec; use cozorocks::{DbIter, RocksDb, Tx}; +use cozorocks::CfHandle::{Pri, Snd}; use crate::data::attr::Attribute; use crate::data::encode::{ @@ -116,7 +117,7 @@ impl SessionTx { let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); let vtx = self.view_db.transact().start(); - let found = vtx.get(&t_encoded, false)?; + let found = vtx.get(&t_encoded, false, Snd)?; match found { None => Ok(ViewRelId::SYSTEM), Some(slice) => ViewRelId::raw_decode(&slice), @@ -138,7 +139,7 @@ impl SessionTx { let encoded = encode_tx(tx_id); let log = TxLog::new(tx_id, comment); - self.tx.put(&encoded, &log.encode())?; + self.tx.put(&encoded, &log.encode(), Pri)?; self.tx.commit()?; if refresh { let new_tx_id = TxId(self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1); @@ -155,7 +156,7 @@ impl SessionTx { } pub(crate) fn bounded_scan_first(&self, lower: &[u8], upper: &[u8]) -> DbIter { // this is tricky, must be written like this! - let mut it = self.tx.iterator().upper_bound(upper).start(); + let mut it = self.tx.iterator(Pri).upper_bound(upper).start(); it.seek(lower); it } @@ -164,7 +165,7 @@ impl SessionTx { // this is tricky, must be written like this! let mut it = self .tx - .iterator() + .iterator(Pri) .lower_bound(lower) .upper_bound(upper) .start(); diff --git a/src/runtime/view.rs b/src/runtime/view.rs index 21f82750..4e605979 100644 --- a/src/runtime/view.rs +++ b/src/runtime/view.rs @@ -5,6 +5,7 @@ use miette::{bail, miette, IntoDiagnostic, Result}; use rmp_serde::Serializer; use serde::Serialize; +use cozorocks::CfHandle::Snd; use cozorocks::{DbIter, RocksDb, Tx}; use crate::data::symb::Symbol; @@ -117,7 +118,12 @@ struct ViewRelIterator { impl ViewRelIterator { fn new(db: &RocksDb, lower: &[u8], upper: &[u8]) -> Self { - let mut inner = db.transact().start().iterator().upper_bound(upper).start(); + let mut inner = db + .transact() + .start() + .iterator(Snd) + .upper_bound(upper) + .start(); inner.seek(lower); Self { inner, @@ -149,14 +155,14 @@ impl SessionTx { let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let vtx = self.view_db.transact().start(); - Ok(vtx.exists(&encoded, false)?) + Ok(vtx.exists(&encoded, false, Snd)?) } pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result { let key = DataValue::Str(meta.name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let mut vtx = self.view_db.transact().set_snapshot(true).start(); - if vtx.exists(&encoded, true)? { + if vtx.exists(&encoded, true, Snd)? { bail!( "cannot create view {}: one with the same name already exists", meta.name @@ -164,17 +170,17 @@ impl SessionTx { }; let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst); meta.id = ViewRelId::new(last_id + 1)?; - vtx.put(&encoded, &meta.id.raw_encode())?; + vtx.put(&encoded, &meta.id.raw_encode(), Snd)?; let name_key = Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM); let mut meta_val = vec![]; meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap(); - vtx.put(&name_key, &meta_val)?; + vtx.put(&name_key, &meta_val, Snd)?; let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM); - vtx.put(&t_encoded, &meta.id.raw_encode())?; + vtx.put(&t_encoded, &meta.id.raw_encode(), Snd)?; vtx.commit()?; Ok(ViewRelStore { view_db: self.view_db.clone(), @@ -190,7 +196,7 @@ impl SessionTx { let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); let found = vtx - .get(&encoded, true)? + .get(&encoded, true, Snd)? .ok_or_else(|| miette!("cannot find stored view {}", name))?; let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?; Ok(ViewRelStore { @@ -203,10 +209,10 @@ impl SessionTx { let store = self.do_get_view_rel(name, &vtx)?; let key = DataValue::Str(name.0.clone()); let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM); - vtx.del(&encoded)?; + vtx.del(&encoded, Snd)?; let lower_bound = Tuple::default().encode_as_key(store.metadata.id); let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?); - self.view_db.range_del(&lower_bound, &upper_bound)?; + self.view_db.range_del(&lower_bound, &upper_bound, Snd)?; vtx.commit()?; Ok(()) } diff --git a/src/transact/meta.rs b/src/transact/meta.rs index 0da42308..c3fb2b6b 100644 --- a/src/transact/meta.rs +++ b/src/transact/meta.rs @@ -3,6 +3,7 @@ use std::sync::atomic::Ordering; use miette::{bail, ensure, miette, Result}; use cozorocks::{DbIter, IterBuilder}; +use cozorocks::CfHandle::Pri; use crate::data::attr::Attribute; use crate::data::encode::{ @@ -44,7 +45,7 @@ impl SessionTx { } let anchor = encode_sentinel_attr_by_id(aid); - Ok(match self.tx.get(&anchor, false)? { + Ok(match self.tx.get(&anchor, false, Pri)? { None => { self.attr_by_id_cache.borrow_mut().insert(aid, None); None @@ -76,7 +77,7 @@ impl SessionTx { } let anchor = encode_sentinel_attr_by_name(name); - Ok(match self.tx.get(&anchor, false)? { + Ok(match self.tx.get(&anchor, false, Pri)? { None => { self.attr_by_kw_cache .borrow_mut() @@ -108,7 +109,7 @@ impl SessionTx { } pub(crate) fn all_attrs(&mut self) -> impl Iterator> { - AttrIter::new(self.tx.iterator()) + AttrIter::new(self.tx.iterator(Pri)) } /// conflict if new attribute has same name as existing one @@ -153,7 +154,7 @@ impl SessionTx { ); let kw_sentinel = encode_sentinel_attr_by_name(&existing.name); let attr_data = existing.encode_with_op_and_tx(StoreOp::Retract, tx_id); - self.tx.put(&kw_sentinel, &attr_data)?; + self.tx.put(&kw_sentinel, &attr_data, Pri)?; } self.put_attr(&attr, StoreOp::Assert) } @@ -162,11 +163,11 @@ impl SessionTx { let tx_id = self.get_write_tx_id()?; let attr_data = attr.encode_with_op_and_tx(op, tx_id); let id_encoded = encode_attr_by_id(attr.id, tx_id); - self.tx.put(&id_encoded, &attr_data)?; + self.tx.put(&id_encoded, &attr_data, Pri)?; let id_sentinel = encode_sentinel_attr_by_id(attr.id); - self.tx.put(&id_sentinel, &attr_data)?; + self.tx.put(&id_sentinel, &attr_data, Pri)?; let kw_sentinel = encode_sentinel_attr_by_name(&attr.name); - self.tx.put(&kw_sentinel, &attr_data)?; + self.tx.put(&kw_sentinel, &attr_data, Pri)?; Ok(attr.id) } diff --git a/src/transact/triple.rs b/src/transact/triple.rs index a546a89e..91f9dd99 100644 --- a/src/transact/triple.rs +++ b/src/transact/triple.rs @@ -4,6 +4,7 @@ use std::sync::atomic::Ordering; use miette::{bail, ensure, miette, Result}; use smartstring::{LazyCompact, SmartString}; +use cozorocks::CfHandle::Pri; use cozorocks::{DbIter, IterBuilder}; use crate::data::attr::Attribute; @@ -145,23 +146,22 @@ impl SessionTx { let aev_encoded = encode_aev_key(attr.id, eid, v_in_key, vld_in_key); if real_delete { - self.tx.del(&aev_encoded)?; + self.tx.del(&aev_encoded, Pri)?; } else { - self.tx.put(&aev_encoded, &val_encoded)?; + self.tx.put(&aev_encoded, &val_encoded, Pri)?; } // vae for ref types if attr.val_type.is_ref_type() { let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, vld_in_key); if real_delete { - self.tx.del(&vae_encoded)?; + self.tx.del(&vae_encoded, Pri)?; } else { - self.tx - .put( - &vae_encoded, - &DataValue::Guard.encode_with_op_and_tx(op, tx_id), - ) - ?; + self.tx.put( + &vae_encoded, + &DataValue::Guard.encode_with_op_and_tx(op, tx_id), + Pri, + )?; } } @@ -202,7 +202,7 @@ impl SessionTx { v ); } - } else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? { + } else if let Some(v_slice) = self.tx.get(&ave_encoded, false, Pri)? { let found_eid = decode_value_from_val(&v_slice)?.get_entity_id()?; ensure!( found_eid == eid, @@ -214,27 +214,23 @@ impl SessionTx { } let e_in_val_encoded = eid.as_datavalue().encode_with_op_and_tx(op, tx_id); if real_delete { - self.tx.del(&ave_encoded)?; + self.tx.del(&ave_encoded, Pri)?; } else { - self.tx - .put(&ave_encoded, &e_in_val_encoded) - ?; + self.tx.put(&ave_encoded, &e_in_val_encoded, Pri)?; } - self.tx - .put( - &encode_sentinel_attr_val(attr.id, v), - &tx_id.bytes_with_op(op), - ) - ?; + self.tx.put( + &encode_sentinel_attr_val(attr.id, v), + &tx_id.bytes_with_op(op), + Pri, + )?; } - self.tx - .put( - &encode_sentinel_entity_attr(eid, attr.id), - &tx_id.bytes_with_op(op), - ) - ?; + self.tx.put( + &encode_sentinel_entity_attr(eid, attr.id), + &tx_id.bytes_with_op(op), + Pri, + )?; Ok(eid) } @@ -286,11 +282,7 @@ impl SessionTx { let lower = encode_ave_key_for_unique_v(attr.id, v, vld); let upper = encode_ave_key_for_unique_v(attr.id, v, Validity::MIN); Ok( - if let Some(v_slice) = self - .bounded_scan_first(&lower, &upper) - .val() - ? - { + if let Some(v_slice) = self.bounded_scan_first(&lower, &upper).val()? { if StoreOp::try_from(v_slice[0])?.is_assert() { let eid = decode_value(&v_slice[8..])?.get_entity_id()?; let ret = Some(eid); @@ -320,7 +312,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX); let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN); - TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) + TripleAttrEntityIter::new(self.tx.iterator(Pri), lower, upper) } pub(crate) fn triple_ae_range_scan( &self, @@ -331,7 +323,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_aev_key(aid, eid, &v_lower, Validity::MAX); let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN); - TripleAttrEntityRangeIter::new(self.tx.iterator(), lower, upper, v_upper) + TripleAttrEntityRangeIter::new(self.tx.iterator(Pri), lower, upper, v_upper) } pub(crate) fn triple_ae_before_scan( &self, @@ -341,7 +333,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX); let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN); - TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) + TripleAttrEntityBeforeIter::new(self.tx.iterator(Pri), lower, upper, before) } pub(crate) fn triple_ae_range_before_scan( &self, @@ -353,7 +345,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_aev_key(aid, eid, &v_lower, Validity::MAX); let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN); - TripleAttrEntityRangeBeforeIter::new(self.tx.iterator(), lower, upper, v_upper, before) + TripleAttrEntityRangeBeforeIter::new(self.tx.iterator(Pri), lower, upper, v_upper, before) } pub(crate) fn aev_exists( &self, @@ -378,7 +370,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_ave_key(aid, lower, EntityId::ZERO, Validity::MAX); let upper = encode_ave_key(aid, &DataValue::Bot, EntityId::MAX_PERM, Validity::MIN); - TripleAttrValueRangeIter::new(self.tx.iterator(), lower, upper, upper_inc.clone()) + TripleAttrValueRangeIter::new(self.tx.iterator(Pri), lower, upper, upper_inc.clone()) } pub(crate) fn triple_av_scan( &self, @@ -387,7 +379,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); - TripleAttrValueIter::new(self.tx.iterator(), lower, upper) + TripleAttrValueIter::new(self.tx.iterator(Pri), lower, upper) } pub(crate) fn triple_av_range_before_scan( &self, @@ -399,7 +391,7 @@ impl SessionTx { let lower = encode_ave_key(aid, lower, EntityId::ZERO, Validity::MAX); let upper = encode_ave_key(aid, &DataValue::Bot, EntityId::MAX_PERM, Validity::MIN); TripleAttrValueRangeBeforeIter::new( - self.tx.iterator(), + self.tx.iterator(Pri), lower, upper, upper_inc.clone(), @@ -414,7 +406,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); - TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before) + TripleAttrValueBeforeIter::new(self.tx.iterator(Pri), lower, upper, before) } pub(crate) fn triple_av_after_scan( &self, @@ -424,7 +416,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX); let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN); - TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after) + TripleAttrValueAfterIter::new(self.tx.iterator(Pri), lower, upper, after) } pub(crate) fn triple_vref_a_scan( &self, @@ -433,7 +425,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_vae_key(v_eid, aid, EntityId::ZERO, Validity::MAX); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); - TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper) + TripleValueRefAttrIter::new(self.tx.iterator(Pri), lower, upper) } pub(crate) fn triple_vref_a_before_scan( &self, @@ -443,7 +435,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_vae_key(v_eid, aid, EntityId::ZERO, Validity::MAX); let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN); - TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before) + TripleValueRefAttrBeforeIter::new(self.tx.iterator(Pri), lower, upper, before) } pub(crate) fn triple_a_scan( &self, @@ -451,7 +443,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_aev_key(aid, EntityId::ZERO, &DataValue::Null, Validity::MAX); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bot, Validity::MIN); - TripleAttrEntityIter::new(self.tx.iterator(), lower, upper) + TripleAttrEntityIter::new(self.tx.iterator(Pri), lower, upper) } pub(crate) fn triple_a_before_scan( &self, @@ -460,7 +452,7 @@ impl SessionTx { ) -> impl Iterator> { let lower = encode_aev_key(aid, EntityId::ZERO, &DataValue::Null, Validity::MAX); let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bot, Validity::MIN); - TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before) + TripleAttrEntityBeforeIter::new(self.tx.iterator(Pri), lower, upper, before) } }