support for column families

main
Ziyang Hu 2 years ago
parent 07d25e4c7e
commit 80df573cee

@ -7,15 +7,15 @@
#include "db.h"
#include "cozorocks/src/bridge/mod.rs.h"
unique_ptr<Options> default_db_options() {
auto options = make_unique<Options>();
options->bottommost_compression = kZSTD;
options->compression = kLZ4Compression;
options->level_compaction_dynamic_level_bytes = true;
options->max_background_compactions = 4;
options->max_background_flushes = 2;
options->bytes_per_sync = 1048576;
options->compaction_pri = kMinOverlappingRatio;
Options default_db_options() {
Options options = Options();
options.bottommost_compression = kZSTD;
options.compression = kLZ4Compression;
options.level_compaction_dynamic_level_bytes = true;
options.max_background_compactions = 4;
options.max_background_flushes = 2;
options.bytes_per_sync = 1048576;
options.compaction_pri = kMinOverlappingRatio;
BlockBasedTableOptions table_options;
table_options.block_size = 16 * 1024;
table_options.cache_index_and_filter_blocks = true;
@ -23,42 +23,81 @@ unique_ptr<Options> default_db_options() {
table_options.format_version = 5;
auto table_factory = NewBlockBasedTableFactory(table_options);
options->table_factory.reset(table_factory);
options.table_factory.reset(table_factory);
return options;
}
// Builds the baseline ColumnFamilyOptions that open_db() copies for each
// column family before applying caller-supplied tuning.
ColumnFamilyOptions default_cf_options() {
    ColumnFamilyOptions cf_opts;

    // Compression: LZ4 in general, ZSTD for the bottommost level.
    cf_opts.compression = kLZ4Compression;
    cf_opts.bottommost_compression = kZSTD;

    // Compaction behaviour.
    cf_opts.compaction_pri = kMinOverlappingRatio;
    cf_opts.level_compaction_dynamic_level_bytes = true;

    // Block-based table layout: 16 KiB blocks, format version 5, with index
    // and filter blocks kept in the block cache (L0 ones pinned).
    BlockBasedTableOptions block_opts;
    block_opts.format_version = 5;
    block_opts.block_size = 16 * 1024;
    block_opts.cache_index_and_filter_blocks = true;
    block_opts.pin_l0_filter_and_index_blocks_in_cache = true;

    // table_factory takes ownership of the newly created factory.
    cf_opts.table_factory.reset(NewBlockBasedTableFactory(block_opts));
    return cf_opts;
}
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) {
auto options = default_db_options();
auto cf_pri_opts = default_cf_options();
auto cf_snd_opts = default_cf_options();
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
options.PrepareForBulkLoad();
}
if (opts.increase_parallelism > 0) {
options->IncreaseParallelism(opts.increase_parallelism);
options.IncreaseParallelism(opts.increase_parallelism);
}
if (opts.optimize_level_style_compaction) {
options->OptimizeLevelStyleCompaction();
options.OptimizeLevelStyleCompaction();
cf_pri_opts.OptimizeLevelStyleCompaction();
cf_snd_opts.OptimizeLevelStyleCompaction();
}
options->create_if_missing = opts.create_if_missing;
options->paranoid_checks = opts.paranoid_checks;
options.create_if_missing = opts.create_if_missing;
options.paranoid_checks = opts.paranoid_checks;
if (opts.enable_blob_files) {
options->enable_blob_files = true;
options->min_blob_size = opts.min_blob_size;
options->blob_file_size = opts.blob_file_size;
options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
options.enable_blob_files = true;
cf_pri_opts.enable_blob_files = true;
cf_snd_opts.enable_blob_files = true;
options.min_blob_size = opts.min_blob_size;
cf_pri_opts.min_blob_size = opts.min_blob_size;
cf_snd_opts.min_blob_size = opts.min_blob_size;
options.blob_file_size = opts.blob_file_size;
cf_pri_opts.blob_file_size = opts.blob_file_size;
cf_snd_opts.blob_file_size = opts.blob_file_size;
options.enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
cf_pri_opts.enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
cf_snd_opts.enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
}
if (opts.use_bloom_filter) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
cf_pri_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
cf_snd_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
}
if (opts.use_capped_prefix_extractor) {
options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
options.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
}
if (opts.use_fixed_prefix_extractor) {
options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
options.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
cf_snd_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *cmp = nullptr;
if (use_cmp) {
@ -66,25 +105,33 @@ shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, boo
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
cmp_impl);
options->comparator = cmp;
options.comparator = cmp;
cf_pri_opts.comparator = cmp;
cf_snd_opts.comparator = cmp;
}
options.create_missing_column_families = true;
shared_ptr<RocksDbBridge> db_wrapper = shared_ptr<RocksDbBridge>(nullptr);
shared_ptr<RocksDbBridge> db = make_shared<RocksDbBridge>();
auto db = new RocksDbBridge();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->tdb_opts = make_unique<TransactionDBOptions>();
db->comparator.reset(cmp);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(ColumnFamilyDescriptor(
rocksdb::kDefaultColumnFamilyName, cf_pri_opts));
column_families.emplace_back(ColumnFamilyDescriptor(
"relation", cf_snd_opts));
TransactionDB *txn_db = nullptr;
write_status(TransactionDB::Open(*db->options, *db->tdb_opts, db->db_path, &txn_db), status);
write_status(
TransactionDB::Open(options, TransactionDBOptions(), db->db_path, column_families, &db->cf_handles,
&txn_db),
status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
return db_wrapper;
return db;
}
RocksDbBridge::~RocksDbBridge() {
@ -95,7 +142,8 @@ RocksDbBridge::~RocksDbBridge() {
cerr << status.ToString() << endl;
}
db.reset();
auto status2 = DestroyDB(db_path, *options);
Options options{};
auto status2 = DestroyDB(db_path, options);
if (!status2.ok()) {
cerr << status2.ToString() << endl;
}

@ -42,16 +42,16 @@ struct SstFileWriterBridge {
struct RocksDbBridge {
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
unique_ptr<TransactionDBOptions> tdb_opts;
unique_ptr<TransactionDB> db;
vector<ColumnFamilyHandle *> cf_handles;
bool destroy_on_exit;
string db_path;
inline unique_ptr<SstFileWriterBridge> get_sst_writer(rust::Str path, RocksDbStatus &status) const {
inline unique_ptr<SstFileWriterBridge> get_sst_writer(rust::Str path, size_t idx, RocksDbStatus &status) const {
DB *db_ = get_base_db();
Options options_ = db_->GetOptions();
auto cf = cf_handles[idx];
Options options_ = db_->GetOptions(cf);
auto sst_file_writer = std::make_unique<SstFileWriterBridge>(EnvOptions(), options_);
string path_(path);
@ -59,11 +59,12 @@ struct RocksDbBridge {
return sst_file_writer;
}
inline void ingest_sst(rust::Str path, RocksDbStatus &status) const {
inline void ingest_sst(rust::Str path, size_t idx, RocksDbStatus &status) const {
IngestExternalFileOptions ifo;
DB *db_ = get_base_db();
string path_(path);
write_status(db_->IngestExternalFile({std::move(path_)}, ifo), status);
auto cf = cf_handles[idx];
write_status(db_->IngestExternalFile(cf, {std::move(path_)}, ifo), status);
}
[[nodiscard]] inline const string &get_db_path() const {
@ -72,13 +73,14 @@ struct RocksDbBridge {
[[nodiscard]] inline unique_ptr<TxBridge> transact() const {
auto ret = make_unique<TxBridge>(&*this->db);
auto ret = make_unique<TxBridge>(&*this->db, cf_handles);
return ret;
}
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
inline void del_range(RustBytes start, RustBytes end, size_t idx, RocksDbStatus &status) const {
WriteBatch batch;
auto s = batch.DeleteRange(db->DefaultColumnFamily(), convert_slice(start), convert_slice(end));
auto cf = cf_handles[idx];
auto s = batch.DeleteRange(cf, convert_slice(start), convert_slice(end));
if (!s.ok()) {
write_status(s, status);
return;
@ -91,11 +93,12 @@ struct RocksDbBridge {
write_status(s2, status);
}
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
void compact_range(RustBytes start, RustBytes end, size_t idx, RocksDbStatus &status) const {
CompactRangeOptions options;
auto cf = cf_handles[idx];
auto start_s = convert_slice(start);
auto end_s = convert_slice(end);
auto s = db->CompactRange(options, &start_s, &end_s);
auto s = db->CompactRange(options, cf, &start_s, &end_s);
write_status(s, status);
}

@ -17,16 +17,13 @@ struct IterBridge {
string upper_storage;
Slice lower_bound;
Slice upper_bound;
ColumnFamilyHandle *cf;
unique_ptr<ReadOptions> r_opts;
explicit IterBridge(Transaction *tx_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(), upper_bound(),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
}
explicit IterBridge(DB *db_) : db(db_), tx(nullptr), iter(nullptr), lower_bound(), upper_bound(),
r_opts(new ReadOptions) {
explicit IterBridge(Transaction *tx_, ColumnFamilyHandle *cf_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(),
upper_bound(),
cf(cf_),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
}
@ -88,9 +85,9 @@ struct IterBridge {
inline void start() {
if (db == nullptr) {
iter.reset(tx->GetIterator(*r_opts));
iter.reset(tx->GetIterator(*r_opts, cf));
} else {
iter.reset(db->NewIterator(*r_opts));
iter.reset(db->NewIterator(*r_opts, cf));
}
}

@ -18,26 +18,17 @@ struct TxBridge {
unique_ptr<ReadOptions> r_opts;
unique_ptr<OptimisticTransactionOptions> o_tx_opts;
unique_ptr<TransactionOptions> p_tx_opts;
vector<ColumnFamilyHandle *> cf_handles;
explicit TxBridge(OptimisticTransactionDB *odb_) :
odb(odb_),
tdb(nullptr),
tx(),
w_opts(new WriteOptions),
r_opts(new ReadOptions),
o_tx_opts(new OptimisticTransactionOptions),
p_tx_opts(nullptr) {
r_opts->ignore_range_deletions = true;
}
explicit TxBridge(TransactionDB *tdb_) :
explicit TxBridge(TransactionDB *tdb_, vector<ColumnFamilyHandle *> cf_handles_) :
odb(nullptr),
tdb(tdb_),
tx(),
w_opts(new WriteOptions),
r_opts(new ReadOptions),
o_tx_opts(nullptr),
p_tx_opts(new TransactionOptions) {
p_tx_opts(new TransactionOptions),
cf_handles(cf_handles_) {
r_opts->ignore_range_deletions = true;
}
@ -57,8 +48,9 @@ struct TxBridge {
r_opts->fill_cache = val;
}
inline unique_ptr<IterBridge> iterator() const {
return make_unique<IterBridge>(&*tx);
inline unique_ptr<IterBridge> iterator(size_t idx) const {
auto cf = cf_handles[idx];
return make_unique<IterBridge>(&*tx, cf);
};
inline void set_snapshot(bool val) {
@ -87,37 +79,41 @@ struct TxBridge {
void start();
inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, RocksDbStatus &status) const {
inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, size_t idx, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>();
auto cf = cf_handles[idx];
if (for_update) {
auto s = tx->GetForUpdate(*r_opts, get_db()->DefaultColumnFamily(), key_, &*ret);
auto s = tx->GetForUpdate(*r_opts, cf, key_, &*ret);
write_status(s, status);
} else {
auto s = tx->Get(*r_opts, key_, &*ret);
auto s = tx->Get(*r_opts, cf, key_, &*ret);
write_status(s, status);
}
return ret;
}
inline void exists(RustBytes key, bool for_update, RocksDbStatus &status) const {
inline void exists(RustBytes key, bool for_update, size_t idx, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto cf = cf_handles[idx];
auto ret = PinnableSlice();
if (for_update) {
auto s = tx->GetForUpdate(*r_opts, get_db()->DefaultColumnFamily(), key_, &ret);
auto s = tx->GetForUpdate(*r_opts, cf, key_, &ret);
write_status(s, status);
} else {
auto s = tx->Get(*r_opts, key_, &ret);
auto s = tx->Get(*r_opts, cf, key_, &ret);
write_status(s, status);
}
}
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) {
write_status(tx->Put(convert_slice(key), convert_slice(val)), status);
inline void put(RustBytes key, RustBytes val, size_t idx, RocksDbStatus &status) {
auto cf = cf_handles[idx];
write_status(tx->Put(cf, convert_slice(key), convert_slice(val)), status);
}
inline void del(RustBytes key, RocksDbStatus &status) {
write_status(tx->Delete(convert_slice(key)), status);
inline void del(RustBytes key, size_t idx, RocksDbStatus &status) {
auto cf = cf_handles[idx];
write_status(tx->Delete(cf, convert_slice(key)), status);
}
inline void commit(RocksDbStatus &status) {

@ -4,6 +4,7 @@ use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use crate::CfHandle;
#[derive(Default, Clone)]
pub struct DbBuilder<'a> {
@ -153,9 +154,9 @@ impl RocksDb {
}
}
#[inline]
pub fn range_del(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
pub fn range_del(&self, lower: &[u8], upper: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del_range(lower, upper, &mut status);
self.inner.del_range(lower, upper, handle.into(), &mut status);
if status.is_ok() {
Ok(())
} else {
@ -163,27 +164,27 @@ impl RocksDb {
}
}
#[inline]
pub fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
pub fn range_compact(&self, lower: &[u8], upper: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.compact_range(lower, upper, &mut status);
self.inner.compact_range(lower, upper, handle.into(), &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
pub fn get_sst_writer(&self, path: &str) -> Result<SstWriter, RocksDbStatus> {
pub fn get_sst_writer(&self, path: &str, handle: CfHandle) -> Result<SstWriter, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get_sst_writer(path, &mut status);
let ret = self.inner.get_sst_writer(path, handle.into(), &mut status);
if status.is_ok() {
Ok(SstWriter { inner: ret })
} else {
Err(status)
}
}
pub fn ingest_sst_file(&self, path: &str) -> Result<(), RocksDbStatus> {
pub fn ingest_sst_file(&self, path: &str, handle: CfHandle) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.ingest_sst(path, &mut status);
self.inner.ingest_sst(path, handle.into(), &mut status);
if status.is_ok() {
Ok(())
} else {

@ -122,19 +122,27 @@ pub(crate) mod ffi {
cmp_impl: fn(&[u8], &[u8]) -> i8,
) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range(self: &RocksDbBridge, lower: &[u8], upper: &[u8], status: &mut RocksDbStatus);
fn del_range(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
idx: usize,
status: &mut RocksDbStatus,
);
fn compact_range(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
idx: usize,
status: &mut RocksDbStatus,
);
fn get_sst_writer(
self: &RocksDbBridge,
path: &str,
idx: usize,
status: &mut RocksDbStatus,
) -> UniquePtr<SstFileWriterBridge>;
fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus);
fn ingest_sst(self: &RocksDbBridge, path: &str, idx: usize, status: &mut RocksDbStatus);
type SstFileWriterBridge;
fn put(
@ -157,17 +165,30 @@ pub(crate) mod ffi {
self: &TxBridge,
key: &[u8],
for_update: bool,
idx: usize,
status: &mut RocksDbStatus,
) -> UniquePtr<PinnableSlice>;
fn exists(self: &TxBridge, key: &[u8], for_update: bool, status: &mut RocksDbStatus);
fn put(self: Pin<&mut TxBridge>, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus);
fn exists(
self: &TxBridge,
key: &[u8],
for_update: bool,
idx: usize,
status: &mut RocksDbStatus,
);
fn put(
self: Pin<&mut TxBridge>,
key: &[u8],
val: &[u8],
idx: usize,
status: &mut RocksDbStatus,
);
fn del(self: Pin<&mut TxBridge>, key: &[u8], idx: usize, status: &mut RocksDbStatus);
fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn rollback(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn set_savepoint(self: Pin<&mut TxBridge>);
fn iterator(self: &TxBridge) -> UniquePtr<IterBridge>;
fn iterator(self: &TxBridge, idx: usize) -> UniquePtr<IterBridge>;
type IterBridge;
fn start(self: Pin<&mut IterBridge>);

@ -5,6 +5,7 @@ use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::iter::IterBuilder;
use crate::CfHandle;
pub struct TxBuilder {
pub(crate) inner: UniquePtr<TxBridge>,
@ -84,9 +85,9 @@ impl Tx {
self.inner.pin_mut().clear_snapshot()
}
#[inline]
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
pub fn put(&mut self, key: &[u8], val: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.pin_mut().put(key, val, &mut status);
self.inner.pin_mut().put(key, val, handle.into(), &mut status);
if status.is_ok() {
Ok(())
} else {
@ -94,9 +95,9 @@ impl Tx {
}
}
#[inline]
pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> {
pub fn del(&mut self, key: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.pin_mut().del(key, &mut status);
self.inner.pin_mut().del(key, handle.into(), &mut status);
if status.is_ok() {
Ok(())
} else {
@ -104,9 +105,9 @@ impl Tx {
}
}
#[inline]
pub fn get(&self, key: &[u8], for_update: bool) -> Result<Option<PinSlice>, RocksDbStatus> {
pub fn get(&self, key: &[u8], for_update: bool, handle: CfHandle) -> Result<Option<PinSlice>, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get(key, for_update, &mut status);
let ret = self.inner.get(key, for_update, handle.into(), &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinSlice { inner: ret })),
StatusCode::kNotFound => Ok(None),
@ -114,9 +115,9 @@ impl Tx {
}
}
#[inline]
pub fn exists(&self, key: &[u8], for_update: bool) -> Result<bool, RocksDbStatus> {
pub fn exists(&self, key: &[u8], for_update: bool, handle: CfHandle) -> Result<bool, RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.exists(key, for_update, &mut status);
self.inner.exists(key, for_update, handle.into(), &mut status);
match status.code {
StatusCode::kOk => Ok(true),
StatusCode::kNotFound => Ok(false),
@ -168,9 +169,9 @@ impl Tx {
}
}
#[inline]
pub fn iterator(&self) -> IterBuilder {
pub fn iterator(&self, handle: CfHandle) -> IterBuilder {
IterBuilder {
inner: self.inner.iterator(),
inner: self.inner.iterator(handle.into()),
}
.auto_prefix_mode(true)
}

@ -13,5 +13,20 @@ pub use bridge::tx::TxBuilder;
pub(crate) mod bridge;
/// Selects which column family an operation targets.
///
/// Each variant converts to a `usize` index (`Pri` is 0, `Snd` is 1); the
/// bridge functions take that index as their `idx` argument.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CfHandle {
    /// First column family, index 0.
    Pri = 0,
    /// Second column family, index 1.
    Snd = 1,
}

impl From<CfHandle> for usize {
    /// Returns the index associated with `handle`.
    fn from(handle: CfHandle) -> usize {
        // The discriminants are declared explicitly above, so the cast is
        // exactly the index mapping.
        handle as usize
    }
}
#[cfg(test)]
mod tests;

@ -4,6 +4,7 @@ use itertools::Itertools;
use miette::{ensure, miette, IntoDiagnostic, Result};
use serde_json::{json, Map};
use tempfile::NamedTempFile;
use cozorocks::CfHandle::Snd;
use crate::data::attr::Attribute;
use crate::data::id::{EntityId, Validity};
@ -72,7 +73,7 @@ impl SessionTx {
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
vtx.del(&encoded)?;
vtx.del(&encoded, Snd)?;
}
vtx.commit()?;
@ -80,14 +81,18 @@ impl SessionTx {
let file = NamedTempFile::new().into_diagnostic()?;
let path = file.into_temp_path();
let path = path.to_string_lossy();
let mut writer = self.view_db.get_sst_writer(&path)?;
let mut writer = self.view_db.get_sst_writer(&path, Snd)?;
let mut written = false;
for data in res_iter {
let data = data?;
let encoded = data.encode_as_key(view_store.metadata.id);
writer.put(&encoded, &[])?;
written = true;
}
if written {
writer.finish()?;
self.view_db.ingest_sst_file(&path, Snd)?;
}
writer.finish()?;
self.view_db.ingest_sst_file(&path)?;
}
Ok(())
}

@ -14,6 +14,7 @@ use serde_json::json;
use smartstring::SmartString;
use cozorocks::{DbBuilder, DbIter, RocksDb};
use cozorocks::CfHandle::{Pri, Snd};
use crate::data::compare::{rusty_cmp, DB_KEY_PREFIX_LEN};
use crate::data::encode::{largest_key, smallest_key};
@ -116,14 +117,14 @@ impl Db {
pub fn compact_main(&self) -> Result<()> {
let l = smallest_key();
let u = largest_key();
self.db.range_compact(&l, &u)?;
self.db.range_compact(&l, &u, Pri)?;
Ok(())
}
pub fn compact_view(&self) -> Result<()> {
let l = Tuple::default().encode_as_key(ViewRelId(0));
let u = Tuple(vec![DataValue::Bot]).encode_as_key(ViewRelId(u64::MAX));
self.db.range_compact(&l, &u)?;
self.db.range_compact(&l, &u, Snd)?;
Ok(())
}
@ -192,7 +193,7 @@ impl Db {
Ok(ret)
}
pub fn total_iter(&self) -> DbIter {
let mut it = self.db.transact().start().iterator().start();
let mut it = self.db.transact().start().iterator(Pri).start();
it.seek_to_start();
it
}
@ -461,7 +462,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().start();
vtx.put(&key, v)?;
vtx.put(&key, v, Snd)?;
vtx.commit()?;
Ok(())
}
@ -472,7 +473,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().start();
vtx.del(&key)?;
vtx.del(&key, Snd)?;
vtx.commit()?;
Ok(())
}
@ -483,7 +484,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
Ok(match vtx.get(&key, false)? {
Ok(match vtx.get(&key, false, Snd)? {
None => None,
Some(slice) => Some(slice.to_vec()),
})
@ -501,7 +502,7 @@ impl Db {
.view_db
.transact()
.start()
.iterator()
.iterator(Snd)
.upper_bound(&upper_bound.encode_as_key(ViewRelId::SYSTEM))
.start();
it.seek(&lower_bound.encode_as_key(ViewRelId::SYSTEM));
@ -559,7 +560,7 @@ impl Db {
.view_db
.transact()
.start()
.iterator()
.iterator(Snd)
.upper_bound(&upper)
.start();
it.seek(&lower);

@ -9,6 +9,7 @@ use serde::Serialize;
use smallvec::SmallVec;
use cozorocks::{DbIter, RocksDb, Tx};
use cozorocks::CfHandle::{Pri, Snd};
use crate::data::attr::Attribute;
use crate::data::encode::{
@ -116,7 +117,7 @@ impl SessionTx {
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
let found = vtx.get(&t_encoded, false)?;
let found = vtx.get(&t_encoded, false, Snd)?;
match found {
None => Ok(ViewRelId::SYSTEM),
Some(slice) => ViewRelId::raw_decode(&slice),
@ -138,7 +139,7 @@ impl SessionTx {
let encoded = encode_tx(tx_id);
let log = TxLog::new(tx_id, comment);
self.tx.put(&encoded, &log.encode())?;
self.tx.put(&encoded, &log.encode(), Pri)?;
self.tx.commit()?;
if refresh {
let new_tx_id = TxId(self.last_tx_id.fetch_add(1, Ordering::AcqRel) + 1);
@ -155,7 +156,7 @@ impl SessionTx {
}
pub(crate) fn bounded_scan_first(&self, lower: &[u8], upper: &[u8]) -> DbIter {
// this is tricky, must be written like this!
let mut it = self.tx.iterator().upper_bound(upper).start();
let mut it = self.tx.iterator(Pri).upper_bound(upper).start();
it.seek(lower);
it
}
@ -164,7 +165,7 @@ impl SessionTx {
// this is tricky, must be written like this!
let mut it = self
.tx
.iterator()
.iterator(Pri)
.lower_bound(lower)
.upper_bound(upper)
.start();

@ -5,6 +5,7 @@ use miette::{bail, miette, IntoDiagnostic, Result};
use rmp_serde::Serializer;
use serde::Serialize;
use cozorocks::CfHandle::Snd;
use cozorocks::{DbIter, RocksDb, Tx};
use crate::data::symb::Symbol;
@ -117,7 +118,12 @@ struct ViewRelIterator {
impl ViewRelIterator {
fn new(db: &RocksDb, lower: &[u8], upper: &[u8]) -> Self {
let mut inner = db.transact().start().iterator().upper_bound(upper).start();
let mut inner = db
.transact()
.start()
.iterator(Snd)
.upper_bound(upper)
.start();
inner.seek(lower);
Self {
inner,
@ -149,14 +155,14 @@ impl SessionTx {
let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let vtx = self.view_db.transact().start();
Ok(vtx.exists(&encoded, false)?)
Ok(vtx.exists(&encoded, false, Snd)?)
}
pub(crate) fn create_view_rel(&self, mut meta: ViewRelMetadata) -> Result<ViewRelStore> {
let key = DataValue::Str(meta.name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let mut vtx = self.view_db.transact().set_snapshot(true).start();
if vtx.exists(&encoded, true)? {
if vtx.exists(&encoded, true, Snd)? {
bail!(
"cannot create view {}: one with the same name already exists",
meta.name
@ -164,17 +170,17 @@ impl SessionTx {
};
let last_id = self.view_store_id.fetch_add(1, Ordering::SeqCst);
meta.id = ViewRelId::new(last_id + 1)?;
vtx.put(&encoded, &meta.id.raw_encode())?;
vtx.put(&encoded, &meta.id.raw_encode(), Snd)?;
let name_key =
Tuple(vec![DataValue::Str(meta.name.0.clone())]).encode_as_key(ViewRelId::SYSTEM);
let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
vtx.put(&name_key, &meta_val)?;
vtx.put(&name_key, &meta_val, Snd)?;
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(ViewRelId::SYSTEM);
vtx.put(&t_encoded, &meta.id.raw_encode())?;
vtx.put(&t_encoded, &meta.id.raw_encode(), Snd)?;
vtx.commit()?;
Ok(ViewRelStore {
view_db: self.view_db.clone(),
@ -190,7 +196,7 @@ impl SessionTx {
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
let found = vtx
.get(&encoded, true)?
.get(&encoded, true, Snd)?
.ok_or_else(|| miette!("cannot find stored view {}", name))?;
let metadata: ViewRelMetadata = rmp_serde::from_slice(&found).into_diagnostic()?;
Ok(ViewRelStore {
@ -203,10 +209,10 @@ impl SessionTx {
let store = self.do_get_view_rel(name, &vtx)?;
let key = DataValue::Str(name.0.clone());
let encoded = Tuple(vec![key]).encode_as_key(ViewRelId::SYSTEM);
vtx.del(&encoded)?;
vtx.del(&encoded, Snd)?;
let lower_bound = Tuple::default().encode_as_key(store.metadata.id);
let upper_bound = Tuple::default().encode_as_key(store.metadata.id.next()?);
self.view_db.range_del(&lower_bound, &upper_bound)?;
self.view_db.range_del(&lower_bound, &upper_bound, Snd)?;
vtx.commit()?;
Ok(())
}

@ -3,6 +3,7 @@ use std::sync::atomic::Ordering;
use miette::{bail, ensure, miette, Result};
use cozorocks::{DbIter, IterBuilder};
use cozorocks::CfHandle::Pri;
use crate::data::attr::Attribute;
use crate::data::encode::{
@ -44,7 +45,7 @@ impl SessionTx {
}
let anchor = encode_sentinel_attr_by_id(aid);
Ok(match self.tx.get(&anchor, false)? {
Ok(match self.tx.get(&anchor, false, Pri)? {
None => {
self.attr_by_id_cache.borrow_mut().insert(aid, None);
None
@ -76,7 +77,7 @@ impl SessionTx {
}
let anchor = encode_sentinel_attr_by_name(name);
Ok(match self.tx.get(&anchor, false)? {
Ok(match self.tx.get(&anchor, false, Pri)? {
None => {
self.attr_by_kw_cache
.borrow_mut()
@ -108,7 +109,7 @@ impl SessionTx {
}
pub(crate) fn all_attrs(&mut self) -> impl Iterator<Item = Result<Attribute>> {
AttrIter::new(self.tx.iterator())
AttrIter::new(self.tx.iterator(Pri))
}
/// conflict if new attribute has same name as existing one
@ -153,7 +154,7 @@ impl SessionTx {
);
let kw_sentinel = encode_sentinel_attr_by_name(&existing.name);
let attr_data = existing.encode_with_op_and_tx(StoreOp::Retract, tx_id);
self.tx.put(&kw_sentinel, &attr_data)?;
self.tx.put(&kw_sentinel, &attr_data, Pri)?;
}
self.put_attr(&attr, StoreOp::Assert)
}
@ -162,11 +163,11 @@ impl SessionTx {
let tx_id = self.get_write_tx_id()?;
let attr_data = attr.encode_with_op_and_tx(op, tx_id);
let id_encoded = encode_attr_by_id(attr.id, tx_id);
self.tx.put(&id_encoded, &attr_data)?;
self.tx.put(&id_encoded, &attr_data, Pri)?;
let id_sentinel = encode_sentinel_attr_by_id(attr.id);
self.tx.put(&id_sentinel, &attr_data)?;
self.tx.put(&id_sentinel, &attr_data, Pri)?;
let kw_sentinel = encode_sentinel_attr_by_name(&attr.name);
self.tx.put(&kw_sentinel, &attr_data)?;
self.tx.put(&kw_sentinel, &attr_data, Pri)?;
Ok(attr.id)
}

@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
use miette::{bail, ensure, miette, Result};
use smartstring::{LazyCompact, SmartString};
use cozorocks::CfHandle::Pri;
use cozorocks::{DbIter, IterBuilder};
use crate::data::attr::Attribute;
@ -145,23 +146,22 @@ impl SessionTx {
let aev_encoded = encode_aev_key(attr.id, eid, v_in_key, vld_in_key);
if real_delete {
self.tx.del(&aev_encoded)?;
self.tx.del(&aev_encoded, Pri)?;
} else {
self.tx.put(&aev_encoded, &val_encoded)?;
self.tx.put(&aev_encoded, &val_encoded, Pri)?;
}
// vae for ref types
if attr.val_type.is_ref_type() {
let vae_encoded = encode_vae_key(v.get_entity_id()?, attr.id, eid, vld_in_key);
if real_delete {
self.tx.del(&vae_encoded)?;
self.tx.del(&vae_encoded, Pri)?;
} else {
self.tx
.put(
&vae_encoded,
&DataValue::Guard.encode_with_op_and_tx(op, tx_id),
)
?;
self.tx.put(
&vae_encoded,
&DataValue::Guard.encode_with_op_and_tx(op, tx_id),
Pri,
)?;
}
}
@ -202,7 +202,7 @@ impl SessionTx {
v
);
}
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false)? {
} else if let Some(v_slice) = self.tx.get(&ave_encoded, false, Pri)? {
let found_eid = decode_value_from_val(&v_slice)?.get_entity_id()?;
ensure!(
found_eid == eid,
@ -214,27 +214,23 @@ impl SessionTx {
}
let e_in_val_encoded = eid.as_datavalue().encode_with_op_and_tx(op, tx_id);
if real_delete {
self.tx.del(&ave_encoded)?;
self.tx.del(&ave_encoded, Pri)?;
} else {
self.tx
.put(&ave_encoded, &e_in_val_encoded)
?;
self.tx.put(&ave_encoded, &e_in_val_encoded, Pri)?;
}
self.tx
.put(
&encode_sentinel_attr_val(attr.id, v),
&tx_id.bytes_with_op(op),
)
?;
self.tx.put(
&encode_sentinel_attr_val(attr.id, v),
&tx_id.bytes_with_op(op),
Pri,
)?;
}
self.tx
.put(
&encode_sentinel_entity_attr(eid, attr.id),
&tx_id.bytes_with_op(op),
)
?;
self.tx.put(
&encode_sentinel_entity_attr(eid, attr.id),
&tx_id.bytes_with_op(op),
Pri,
)?;
Ok(eid)
}
@ -286,11 +282,7 @@ impl SessionTx {
let lower = encode_ave_key_for_unique_v(attr.id, v, vld);
let upper = encode_ave_key_for_unique_v(attr.id, v, Validity::MIN);
Ok(
if let Some(v_slice) = self
.bounded_scan_first(&lower, &upper)
.val()
?
{
if let Some(v_slice) = self.bounded_scan_first(&lower, &upper).val()? {
if StoreOp::try_from(v_slice[0])?.is_assert() {
let eid = decode_value(&v_slice[8..])?.get_entity_id()?;
let ret = Some(eid);
@ -320,7 +312,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
TripleAttrEntityIter::new(self.tx.iterator(Pri), lower, upper)
}
pub(crate) fn triple_ae_range_scan(
&self,
@ -331,7 +323,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, eid, &v_lower, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN);
TripleAttrEntityRangeIter::new(self.tx.iterator(), lower, upper, v_upper)
TripleAttrEntityRangeIter::new(self.tx.iterator(Pri), lower, upper, v_upper)
}
pub(crate) fn triple_ae_before_scan(
&self,
@ -341,7 +333,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, eid, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
TripleAttrEntityBeforeIter::new(self.tx.iterator(Pri), lower, upper, before)
}
pub(crate) fn triple_ae_range_before_scan(
&self,
@ -353,7 +345,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, eid, &v_lower, Validity::MAX);
let upper = encode_aev_key(aid, eid, &DataValue::Bot, Validity::MIN);
TripleAttrEntityRangeBeforeIter::new(self.tx.iterator(), lower, upper, v_upper, before)
TripleAttrEntityRangeBeforeIter::new(self.tx.iterator(Pri), lower, upper, v_upper, before)
}
pub(crate) fn aev_exists(
&self,
@ -378,7 +370,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, lower, EntityId::ZERO, Validity::MAX);
let upper = encode_ave_key(aid, &DataValue::Bot, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueRangeIter::new(self.tx.iterator(), lower, upper, upper_inc.clone())
TripleAttrValueRangeIter::new(self.tx.iterator(Pri), lower, upper, upper_inc.clone())
}
pub(crate) fn triple_av_scan(
&self,
@ -387,7 +379,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueIter::new(self.tx.iterator(), lower, upper)
TripleAttrValueIter::new(self.tx.iterator(Pri), lower, upper)
}
pub(crate) fn triple_av_range_before_scan(
&self,
@ -399,7 +391,7 @@ impl SessionTx {
let lower = encode_ave_key(aid, lower, EntityId::ZERO, Validity::MAX);
let upper = encode_ave_key(aid, &DataValue::Bot, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueRangeBeforeIter::new(
self.tx.iterator(),
self.tx.iterator(Pri),
lower,
upper,
upper_inc.clone(),
@ -414,7 +406,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueBeforeIter::new(self.tx.iterator(), lower, upper, before)
TripleAttrValueBeforeIter::new(self.tx.iterator(Pri), lower, upper, before)
}
pub(crate) fn triple_av_after_scan(
&self,
@ -424,7 +416,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, DataValue, EntityId)>> {
let lower = encode_ave_key(aid, v, EntityId::ZERO, Validity::MAX);
let upper = encode_ave_key(aid, v, EntityId::MAX_PERM, Validity::MIN);
TripleAttrValueAfterIter::new(self.tx.iterator(), lower, upper, after)
TripleAttrValueAfterIter::new(self.tx.iterator(Pri), lower, upper, after)
}
pub(crate) fn triple_vref_a_scan(
&self,
@ -433,7 +425,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::ZERO, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrIter::new(self.tx.iterator(), lower, upper)
TripleValueRefAttrIter::new(self.tx.iterator(Pri), lower, upper)
}
pub(crate) fn triple_vref_a_before_scan(
&self,
@ -443,7 +435,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(EntityId, AttrId, EntityId)>> {
let lower = encode_vae_key(v_eid, aid, EntityId::ZERO, Validity::MAX);
let upper = encode_vae_key(v_eid, aid, EntityId::MAX_PERM, Validity::MIN);
TripleValueRefAttrBeforeIter::new(self.tx.iterator(), lower, upper, before)
TripleValueRefAttrBeforeIter::new(self.tx.iterator(Pri), lower, upper, before)
}
pub(crate) fn triple_a_scan(
&self,
@ -451,7 +443,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, EntityId::ZERO, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bot, Validity::MIN);
TripleAttrEntityIter::new(self.tx.iterator(), lower, upper)
TripleAttrEntityIter::new(self.tx.iterator(Pri), lower, upper)
}
pub(crate) fn triple_a_before_scan(
&self,
@ -460,7 +452,7 @@ impl SessionTx {
) -> impl Iterator<Item = Result<(AttrId, EntityId, DataValue)>> {
let lower = encode_aev_key(aid, EntityId::ZERO, &DataValue::Null, Validity::MAX);
let upper = encode_aev_key(aid, EntityId::MAX_PERM, &DataValue::Bot, Validity::MIN);
TripleAttrEntityBeforeIter::new(self.tx.iterator(), lower, upper, before)
TripleAttrEntityBeforeIter::new(self.tx.iterator(Pri), lower, upper, before)
}
}

Loading…
Cancel
Save