revert to single column family implementation

main
Ziyang Hu 2 years ago
parent 718a2001dd
commit 656311ed78

@ -47,11 +47,8 @@ ColumnFamilyOptions default_cf_options() {
}
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp,
RustComparatorFn pri_cmp_impl,
RustComparatorFn snd_cmp_impl) {
RustComparatorFn cmp_impl) {
auto options = default_db_options();
auto cf_pri_opts = default_cf_options();
auto cf_snd_opts = default_cf_options();
if (opts.prepare_for_bulk_load) {
options.PrepareForBulkLoad();
@ -61,62 +58,37 @@ shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, boo
}
if (opts.optimize_level_style_compaction) {
options.OptimizeLevelStyleCompaction();
cf_pri_opts.OptimizeLevelStyleCompaction();
cf_snd_opts.OptimizeLevelStyleCompaction();
}
options.create_if_missing = opts.create_if_missing;
options.paranoid_checks = opts.paranoid_checks;
if (opts.enable_blob_files) {
options.enable_blob_files = true;
cf_pri_opts.enable_blob_files = true;
cf_snd_opts.enable_blob_files = true;
options.min_blob_size = opts.min_blob_size;
cf_pri_opts.min_blob_size = opts.min_blob_size;
cf_snd_opts.min_blob_size = opts.min_blob_size;
options.blob_file_size = opts.blob_file_size;
cf_pri_opts.blob_file_size = opts.blob_file_size;
cf_snd_opts.blob_file_size = opts.blob_file_size;
options.enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
cf_pri_opts.enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
cf_snd_opts.enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
}
if (opts.use_bloom_filter) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
cf_snd_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
cf_pri_opts.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
}
if (opts.pri_use_capped_prefix_extractor) {
cf_pri_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.pri_capped_prefix_extractor_len));
if (opts.use_capped_prefix_extractor) {
options.prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
}
if (opts.snd_use_capped_prefix_extractor) {
cf_snd_opts.prefix_extractor.reset(NewCappedPrefixTransform(opts.snd_capped_prefix_extractor_len));
}
if (opts.pri_use_fixed_prefix_extractor) {
cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.pri_fixed_prefix_extractor_len));
}
if (opts.snd_use_fixed_prefix_extractor) {
cf_pri_opts.prefix_extractor.reset(NewFixedPrefixTransform(opts.snd_fixed_prefix_extractor_len));
if (opts.use_fixed_prefix_extractor) {
options.prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *pri_cmp = nullptr;
RustComparator *snd_cmp = nullptr;
if (use_cmp) {
pri_cmp = new RustComparator(
string(opts.pri_comparator_name),
opts.pri_comparator_different_bytes_can_be_equal,
pri_cmp_impl);
cf_pri_opts.comparator = pri_cmp;
snd_cmp = new RustComparator(
string(opts.snd_comparator_name),
opts.snd_comparator_different_bytes_can_be_equal,
snd_cmp_impl);
cf_snd_opts.comparator = snd_cmp;
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
cmp_impl);
options.comparator = pri_cmp;
}
options.create_missing_column_families = true;
@ -124,18 +96,10 @@ shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, boo
db->db_path = string(opts.db_path);
db->pri_comparator.reset(pri_cmp);
db->snd_comparator.reset(snd_cmp);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(ColumnFamilyDescriptor(
rocksdb::kDefaultColumnFamilyName, cf_pri_opts));
column_families.emplace_back(ColumnFamilyDescriptor(
"relation", cf_snd_opts));
TransactionDB *txn_db = nullptr;
write_status(
TransactionDB::Open(options, TransactionDBOptions(), db->db_path, column_families, &db->cf_handles,
&txn_db),
TransactionDB::Open(options, TransactionDBOptions(), db->db_path,&txn_db),
status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;

@ -42,16 +42,14 @@ struct SstFileWriterBridge {
struct RocksDbBridge {
unique_ptr<Comparator> pri_comparator;
unique_ptr<Comparator> snd_comparator;
unique_ptr<TransactionDB> db;
vector<ColumnFamilyHandle *> cf_handles;
bool destroy_on_exit;
string db_path;
inline unique_ptr<SstFileWriterBridge> get_sst_writer(rust::Str path, size_t idx, RocksDbStatus &status) const {
inline unique_ptr<SstFileWriterBridge> get_sst_writer(rust::Str path, RocksDbStatus &status) const {
DB *db_ = get_base_db();
auto cf = cf_handles[idx];
auto cf = db->DefaultColumnFamily();
Options options_ = db_->GetOptions(cf);
auto sst_file_writer = std::make_unique<SstFileWriterBridge>(EnvOptions(), options_);
string path_(path);
@ -60,11 +58,11 @@ struct RocksDbBridge {
return sst_file_writer;
}
inline void ingest_sst(rust::Str path, size_t idx, RocksDbStatus &status) const {
inline void ingest_sst(rust::Str path, RocksDbStatus &status) const {
IngestExternalFileOptions ifo;
DB *db_ = get_base_db();
string path_(path);
auto cf = cf_handles[idx];
auto cf = db->DefaultColumnFamily();
write_status(db_->IngestExternalFile(cf, {std::move(path_)}, ifo), status);
}
@ -74,13 +72,13 @@ struct RocksDbBridge {
[[nodiscard]] inline unique_ptr<TxBridge> transact() const {
auto ret = make_unique<TxBridge>(&*this->db, cf_handles);
auto ret = make_unique<TxBridge>(&*this->db, db->DefaultColumnFamily());
return ret;
}
inline void del_range(RustBytes start, RustBytes end, size_t idx, RocksDbStatus &status) const {
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
WriteBatch batch;
auto cf = cf_handles[idx];
auto cf = db->DefaultColumnFamily();
auto s = batch.DeleteRange(cf, convert_slice(start), convert_slice(end));
if (!s.ok()) {
write_status(s, status);
@ -94,9 +92,9 @@ struct RocksDbBridge {
write_status(s2, status);
}
void compact_range(RustBytes start, RustBytes end, size_t idx, RocksDbStatus &status) const {
void compact_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
CompactRangeOptions options;
auto cf = cf_handles[idx];
auto cf = db->DefaultColumnFamily();
auto start_s = convert_slice(start);
auto end_s = convert_slice(end);
auto s = db->CompactRange(options, cf, &start_s, &end_s);
@ -143,7 +141,6 @@ public:
};
shared_ptr<RocksDbBridge>
open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn pri_cmp_impl,
RustComparatorFn snd_cmp_impl);
open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl);
#endif //COZOROCKS_DB_H

@ -17,12 +17,10 @@ struct IterBridge {
string upper_storage;
Slice lower_bound;
Slice upper_bound;
ColumnFamilyHandle *cf;
unique_ptr<ReadOptions> r_opts;
explicit IterBridge(Transaction *tx_, ColumnFamilyHandle *cf_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(),
explicit IterBridge(Transaction *tx_) : db(nullptr), tx(tx_), iter(nullptr), lower_bound(),
upper_bound(),
cf(cf_),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
@ -85,9 +83,9 @@ struct IterBridge {
inline void start() {
if (db == nullptr) {
iter.reset(tx->GetIterator(*r_opts, cf));
iter.reset(tx->GetIterator(*r_opts));
} else {
iter.reset(db->NewIterator(*r_opts, cf));
iter.reset(db->NewIterator(*r_opts));
}
}

@ -18,9 +18,9 @@ struct TxBridge {
unique_ptr<ReadOptions> r_opts;
unique_ptr<OptimisticTransactionOptions> o_tx_opts;
unique_ptr<TransactionOptions> p_tx_opts;
vector<ColumnFamilyHandle *> cf_handles;
ColumnFamilyHandle * cf_handle;
explicit TxBridge(TransactionDB *tdb_, vector<ColumnFamilyHandle *> cf_handles_) :
explicit TxBridge(TransactionDB *tdb_, ColumnFamilyHandle * cf_handle_) :
odb(nullptr),
tdb(tdb_),
tx(),
@ -28,7 +28,7 @@ struct TxBridge {
r_opts(new ReadOptions),
o_tx_opts(nullptr),
p_tx_opts(new TransactionOptions),
cf_handles(cf_handles_) {
cf_handle(cf_handle_) {
r_opts->ignore_range_deletions = true;
}
@ -48,9 +48,8 @@ struct TxBridge {
r_opts->fill_cache = val;
}
inline unique_ptr<IterBridge> iterator(size_t idx) const {
auto cf = cf_handles[idx];
return make_unique<IterBridge>(&*tx, cf);
inline unique_ptr<IterBridge> iterator() const {
return make_unique<IterBridge>(&*tx);
};
inline void set_snapshot(bool val) {
@ -79,41 +78,37 @@ struct TxBridge {
void start();
inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, size_t idx, RocksDbStatus &status) const {
inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>();
auto cf = cf_handles[idx];
if (for_update) {
auto s = tx->GetForUpdate(*r_opts, cf, key_, &*ret);
auto s = tx->GetForUpdate(*r_opts, cf_handle, key_, &*ret);
write_status(s, status);
} else {
auto s = tx->Get(*r_opts, cf, key_, &*ret);
auto s = tx->Get(*r_opts, key_, &*ret);
write_status(s, status);
}
return ret;
}
inline void exists(RustBytes key, bool for_update, size_t idx, RocksDbStatus &status) const {
inline void exists(RustBytes key, bool for_update, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto cf = cf_handles[idx];
auto ret = PinnableSlice();
if (for_update) {
auto s = tx->GetForUpdate(*r_opts, cf, key_, &ret);
auto s = tx->GetForUpdate(*r_opts, cf_handle, key_, &ret);
write_status(s, status);
} else {
auto s = tx->Get(*r_opts, cf, key_, &ret);
auto s = tx->Get(*r_opts, key_, &ret);
write_status(s, status);
}
}
inline void put(RustBytes key, RustBytes val, size_t idx, RocksDbStatus &status) {
auto cf = cf_handles[idx];
write_status(tx->Put(cf, convert_slice(key), convert_slice(val)), status);
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) {
write_status(tx->Put(convert_slice(key), convert_slice(val)), status);
}
inline void del(RustBytes key, size_t idx, RocksDbStatus &status) {
auto cf = cf_handles[idx];
write_status(tx->Delete(cf, convert_slice(key)), status);
inline void del(RustBytes key, RocksDbStatus &status) {
write_status(tx->Delete(convert_slice(key)), status);
}
inline void commit(RocksDbStatus &status) {

@ -4,12 +4,10 @@ use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use crate::CfHandle;
#[derive(Default, Clone)]
pub struct DbBuilder<'a> {
pub pri_cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
pub snd_cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
pub cmp_fn: Option<fn(&[u8], &[u8]) -> i8>,
pub opts: DbOpts<'a>,
}
@ -29,18 +27,12 @@ impl<'a> Default for DbOpts<'a> {
use_bloom_filter: false,
bloom_filter_bits_per_key: 0.0,
bloom_filter_whole_key_filtering: false,
pri_use_capped_prefix_extractor: false,
pri_capped_prefix_extractor_len: 0,
pri_use_fixed_prefix_extractor: false,
pri_fixed_prefix_extractor_len: 0,
pri_comparator_name: "",
pri_comparator_different_bytes_can_be_equal: false,
snd_use_capped_prefix_extractor: false,
snd_capped_prefix_extractor_len: 0,
snd_use_fixed_prefix_extractor: false,
snd_fixed_prefix_extractor_len: 0,
snd_comparator_name: "",
snd_comparator_different_bytes_can_be_equal: false,
use_capped_prefix_extractor: false,
capped_prefix_extractor_len: 0,
use_fixed_prefix_extractor: false,
fixed_prefix_extractor_len: 0,
comparator_name: "",
comparator_different_bytes_can_be_equal: false,
destroy_on_exit: false,
}
}
@ -95,52 +87,27 @@ impl<'a> DbBuilder<'a> {
self.opts.bloom_filter_whole_key_filtering = whole_key_filtering;
self
}
pub fn pri_use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.pri_use_capped_prefix_extractor = enable;
self.opts.pri_capped_prefix_extractor_len = len;
pub fn use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.use_capped_prefix_extractor = enable;
self.opts.capped_prefix_extractor_len = len;
self
}
pub fn snd_use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.snd_use_capped_prefix_extractor = enable;
self.opts.snd_capped_prefix_extractor_len = len;
pub fn use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.use_fixed_prefix_extractor = enable;
self.opts.fixed_prefix_extractor_len = len;
self
}
pub fn pri_use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.pri_use_fixed_prefix_extractor = enable;
self.opts.pri_fixed_prefix_extractor_len = len;
self
}
pub fn snd_use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.snd_use_fixed_prefix_extractor = enable;
self.opts.snd_fixed_prefix_extractor_len = len;
self
}
pub fn pri_use_custom_comparator(
mut self,
name: &'a str,
cmp: fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool,
) -> Self {
self.pri_cmp_fn = Some(cmp);
self.opts.pri_comparator_name = name;
self.opts.pri_comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self
}
pub fn snd_use_custom_comparator(
pub fn use_custom_comparator(
mut self,
name: &'a str,
cmp: fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool,
) -> Self {
self.snd_cmp_fn = Some(cmp);
self.opts.snd_comparator_name = name;
self.opts.snd_comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self.cmp_fn = Some(cmp);
self.opts.comparator_name = name;
self.opts.comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self
}
// pub fn destroy_on_exit(mut self, destroy: bool) -> Self {
// self.opts.destroy_on_exit = destroy;
// self
// }
pub fn build(self) -> Result<RocksDb, RocksDbStatus> {
let mut status = RocksDbStatus::default();
@ -151,9 +118,8 @@ impl<'a> DbBuilder<'a> {
let result = open_db(
&self.opts,
&mut status,
self.pri_cmp_fn.is_some() || self.snd_cmp_fn.is_some(),
self.pri_cmp_fn.unwrap_or(dummy),
self.snd_cmp_fn.unwrap_or(dummy)
self.cmp_fn.is_some(),
self.cmp_fn.unwrap_or(dummy),
);
if status.is_ok() {
Ok(RocksDb { inner: result })
@ -178,9 +144,9 @@ impl RocksDb {
}
}
#[inline]
pub fn range_del(&self, lower: &[u8], upper: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
pub fn range_del(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del_range(lower, upper, handle.into(), &mut status);
self.inner.del_range(lower, upper, &mut status);
if status.is_ok() {
Ok(())
} else {
@ -188,27 +154,27 @@ impl RocksDb {
}
}
#[inline]
pub fn range_compact(&self, lower: &[u8], upper: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
pub fn range_compact(&self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.compact_range(lower, upper, handle.into(), &mut status);
self.inner.compact_range(lower, upper, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
pub fn get_sst_writer(&self, path: &str, handle: CfHandle) -> Result<SstWriter, RocksDbStatus> {
pub fn get_sst_writer(&self, path: &str) -> Result<SstWriter, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get_sst_writer(path, handle.into(), &mut status);
let ret = self.inner.get_sst_writer(path, &mut status);
if status.is_ok() {
Ok(SstWriter { inner: ret })
} else {
Err(status)
}
}
pub fn ingest_sst_file(&self, path: &str, handle: CfHandle) -> Result<(), RocksDbStatus> {
pub fn ingest_sst_file(&self, path: &str) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.ingest_sst(path, handle.into(), &mut status);
self.inner.ingest_sst(path, &mut status);
if status.is_ok() {
Ok(())
} else {

@ -26,18 +26,12 @@ pub(crate) mod ffi {
pub use_bloom_filter: bool,
pub bloom_filter_bits_per_key: f64,
pub bloom_filter_whole_key_filtering: bool,
pub pri_use_capped_prefix_extractor: bool,
pub pri_capped_prefix_extractor_len: usize,
pub pri_use_fixed_prefix_extractor: bool,
pub pri_fixed_prefix_extractor_len: usize,
pub snd_use_capped_prefix_extractor: bool,
pub snd_capped_prefix_extractor_len: usize,
pub snd_use_fixed_prefix_extractor: bool,
pub snd_fixed_prefix_extractor_len: usize,
pub pri_comparator_name: &'a str,
pub pri_comparator_different_bytes_can_be_equal: bool,
pub snd_comparator_name: &'a str,
pub snd_comparator_different_bytes_can_be_equal: bool,
pub use_capped_prefix_extractor: bool,
pub capped_prefix_extractor_len: usize,
pub use_fixed_prefix_extractor: bool,
pub fixed_prefix_extractor_len: usize,
pub comparator_name: &'a str,
pub comparator_different_bytes_can_be_equal: bool,
pub destroy_on_exit: bool,
}
@ -124,31 +118,27 @@ pub(crate) mod ffi {
builder: &DbOpts,
status: &mut RocksDbStatus,
use_cmp: bool,
pri_cmp_impl: fn(&[u8], &[u8]) -> i8,
snd_cmp_impl: fn(&[u8], &[u8]) -> i8,
cmp_impl: fn(&[u8], &[u8]) -> i8,
) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
idx: usize,
status: &mut RocksDbStatus,
);
fn compact_range(
self: &RocksDbBridge,
lower: &[u8],
upper: &[u8],
idx: usize,
status: &mut RocksDbStatus,
);
fn get_sst_writer(
self: &RocksDbBridge,
path: &str,
idx: usize,
status: &mut RocksDbStatus,
) -> UniquePtr<SstFileWriterBridge>;
fn ingest_sst(self: &RocksDbBridge, path: &str, idx: usize, status: &mut RocksDbStatus);
fn ingest_sst(self: &RocksDbBridge, path: &str, status: &mut RocksDbStatus);
type SstFileWriterBridge;
fn put(
@ -171,30 +161,27 @@ pub(crate) mod ffi {
self: &TxBridge,
key: &[u8],
for_update: bool,
idx: usize,
status: &mut RocksDbStatus,
) -> UniquePtr<PinnableSlice>;
fn exists(
self: &TxBridge,
key: &[u8],
for_update: bool,
idx: usize,
status: &mut RocksDbStatus,
);
fn put(
self: Pin<&mut TxBridge>,
key: &[u8],
val: &[u8],
idx: usize,
status: &mut RocksDbStatus,
);
fn del(self: Pin<&mut TxBridge>, key: &[u8], idx: usize, status: &mut RocksDbStatus);
fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RocksDbStatus);
fn commit(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn rollback(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RocksDbStatus);
fn set_savepoint(self: Pin<&mut TxBridge>);
fn iterator(self: &TxBridge, idx: usize) -> UniquePtr<IterBridge>;
fn iterator(self: &TxBridge) -> UniquePtr<IterBridge>;
type IterBridge;
fn start(self: Pin<&mut IterBridge>);

@ -5,7 +5,6 @@ use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::iter::IterBuilder;
use crate::CfHandle;
pub struct TxBuilder {
pub(crate) inner: UniquePtr<TxBridge>,
@ -85,9 +84,9 @@ impl Tx {
self.inner.pin_mut().clear_snapshot()
}
#[inline]
pub fn put(&mut self, key: &[u8], val: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.pin_mut().put(key, val, handle.into(), &mut status);
self.inner.pin_mut().put(key, val, &mut status);
if status.is_ok() {
Ok(())
} else {
@ -95,9 +94,9 @@ impl Tx {
}
}
#[inline]
pub fn del(&mut self, key: &[u8], handle: CfHandle) -> Result<(), RocksDbStatus> {
pub fn del(&mut self, key: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.pin_mut().del(key, handle.into(), &mut status);
self.inner.pin_mut().del(key, &mut status);
if status.is_ok() {
Ok(())
} else {
@ -105,9 +104,9 @@ impl Tx {
}
}
#[inline]
pub fn get(&self, key: &[u8], for_update: bool, handle: CfHandle) -> Result<Option<PinSlice>, RocksDbStatus> {
pub fn get(&self, key: &[u8], for_update: bool) -> Result<Option<PinSlice>, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get(key, for_update, handle.into(), &mut status);
let ret = self.inner.get(key, for_update, &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinSlice { inner: ret })),
StatusCode::kNotFound => Ok(None),
@ -115,9 +114,9 @@ impl Tx {
}
}
#[inline]
pub fn exists(&self, key: &[u8], for_update: bool, handle: CfHandle) -> Result<bool, RocksDbStatus> {
pub fn exists(&self, key: &[u8], for_update: bool) -> Result<bool, RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.exists(key, for_update, handle.into(), &mut status);
self.inner.exists(key, for_update, &mut status);
match status.code {
StatusCode::kOk => Ok(true),
StatusCode::kNotFound => Ok(false),
@ -169,9 +168,9 @@ impl Tx {
}
}
#[inline]
pub fn iterator(&self, handle: CfHandle) -> IterBuilder {
pub fn iterator(&self) -> IterBuilder {
IterBuilder {
inner: self.inner.iterator(handle.into()),
inner: self.inner.iterator(),
}
.auto_prefix_mode(true)
}

@ -12,18 +12,3 @@ pub use bridge::tx::Tx;
pub use bridge::tx::TxBuilder;
pub(crate) mod bridge;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum CfHandle {
Pri,
Snd,
}
impl From<CfHandle> for usize {
fn from(s: CfHandle) -> Self {
match s {
CfHandle::Pri => 0,
CfHandle::Snd => 1,
}
}
}

@ -2,8 +2,6 @@ use itertools::Itertools;
use miette::{Diagnostic, Result};
use thiserror::Error;
use cozorocks::CfHandle::Snd;
use crate::data::expr::Expr;
use crate::data::program::RelationOp;
use crate::data::relation::{ColumnDef, NullableColType};
@ -58,7 +56,7 @@ impl SessionTx {
.map(|ex| ex.extract_data(&tuple))
.try_collect()?;
let key = relation_store.adhoc_encode_key(&Tuple(extracted), *span)?;
self.tx.del(&key, Snd)?;
self.tx.del(&key)?;
}
} else {
let mut key_extractors = make_extractors(
@ -88,7 +86,7 @@ impl SessionTx {
let key = relation_store.adhoc_encode_key(&extracted, *span)?;
let val = relation_store.adhoc_encode_val(&extracted, *span)?;
self.tx.put(&key, &val, Snd)?;
self.tx.put(&key, &val)?;
}
}

@ -15,7 +15,6 @@ use serde_json::json;
use smartstring::SmartString;
use thiserror::Error;
use cozorocks::CfHandle::{Pri, Snd};
use cozorocks::{DbBuilder, DbIter, RocksDb};
use crate::data::json::JsonValue;
@ -120,12 +119,9 @@ impl Db {
store_path.push("data");
let db_builder = builder
.create_if_missing(is_new)
// TODO
.pri_use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.pri_use_custom_comparator("cozo_rusty_cmp", rusty_scratch_cmp, false)
.use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.use_custom_comparator("cozo_rusty_cmp", rusty_scratch_cmp, false)
.use_bloom_filter(true, 9.9, true)
.snd_use_capped_prefix_extractor(true, SCRATCH_DB_KEY_PREFIX_LEN)
.snd_use_custom_comparator("cozo_rusty_scratch_cmp", rusty_scratch_cmp, false)
.path(store_path.to_str().unwrap());
let db = db_builder.build()?;
@ -145,7 +141,7 @@ impl Db {
pub fn compact_relation(&self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));
let u = Tuple(vec![DataValue::Bot]).encode_as_key(RelationId(u64::MAX));
self.db.range_compact(&l, &u, Snd)?;
self.db.range_compact(&l, &u)?;
Ok(())
}
@ -184,11 +180,6 @@ impl Db {
};
Ok(ret)
}
pub fn total_iter(&self) -> DbIter {
let mut it = self.db.transact().start().iterator(Pri).start();
it.seek_to_start();
it
}
pub fn run_script(
&self,
payload: &str,
@ -232,7 +223,7 @@ impl Db {
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
for (lower, upper) in cleanups {
self.db.range_del(&lower, &upper, Snd)?;
self.db.range_del(&lower, &upper)?;
}
Ok(res)
}
@ -458,7 +449,7 @@ impl Db {
let mut tx = self.transact_write()?;
let (lower, upper) = tx.destroy_relation(name)?;
tx.commit_tx()?;
self.db.range_del(&lower, &upper, Snd)?;
self.db.range_del(&lower, &upper)?;
Ok(())
}
pub(crate) fn list_running(&self) -> Result<JsonValue> {
@ -478,7 +469,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(RelationId::SYSTEM);
let mut vtx = self.db.transact().start();
vtx.put(&key, v, Snd)?;
vtx.put(&key, v)?;
vtx.commit()?;
Ok(())
}
@ -489,7 +480,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(RelationId::SYSTEM);
let mut vtx = self.db.transact().start();
vtx.del(&key, Snd)?;
vtx.del(&key)?;
vtx.commit()?;
Ok(())
}
@ -500,7 +491,7 @@ impl Db {
}
let key = Tuple(ks).encode_as_key(RelationId::SYSTEM);
let vtx = self.db.transact().start();
Ok(vtx.get(&key, false, Snd)?.map(|slice| slice.to_vec()))
Ok(vtx.get(&key, false)?.map(|slice| slice.to_vec()))
}
pub fn meta_range_scan(
&self,
@ -516,7 +507,7 @@ impl Db {
.db
.transact()
.start()
.iterator(Snd)
.iterator()
.upper_bound(&upper_bound)
.start();
it.seek(&lower_bound.encode_as_key(RelationId::SYSTEM));
@ -616,7 +607,7 @@ impl Db {
.db
.transact()
.start()
.iterator(Snd)
.iterator()
.upper_bound(&upper)
.start();
it.seek(&lower);

@ -10,7 +10,6 @@ use serde::Serialize;
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use cozorocks::CfHandle::Snd;
use cozorocks::DbIter;
use crate::data::relation::StoredRelationMetadata;
@ -227,7 +226,7 @@ struct RelationIterator {
impl RelationIterator {
fn new(sess: &SessionTx, lower: &[u8], upper: &[u8]) -> Self {
let mut inner = sess.tx.iterator(Snd).upper_bound(upper).start();
let mut inner = sess.tx.iterator().upper_bound(upper).start();
inner.seek(lower);
Self {
inner,
@ -277,7 +276,7 @@ impl SessionTx {
pub(crate) fn relation_exists(&self, name: &str) -> Result<bool> {
let key = DataValue::Str(SmartString::from(name));
let encoded = Tuple(vec![key]).encode_as_key(RelationId::SYSTEM);
Ok(self.tx.exists(&encoded, false, Snd)?)
Ok(self.tx.exists(&encoded, false)?)
}
pub(crate) fn create_relation(
&mut self,
@ -286,7 +285,7 @@ impl SessionTx {
let key = DataValue::Str(input_meta.name.name.clone());
let encoded = Tuple(vec![key]).encode_as_key(RelationId::SYSTEM);
if self.tx.exists(&encoded, true, Snd)? {
if self.tx.exists(&encoded, true)? {
bail!(RelNameConflictError(input_meta.name.to_string()))
};
@ -298,17 +297,17 @@ impl SessionTx {
metadata,
};
self.tx.put(&encoded, &meta.id.raw_encode(), Snd)?;
self.tx.put(&encoded, &meta.id.raw_encode())?;
let name_key =
Tuple(vec![DataValue::Str(meta.name.clone())]).encode_as_key(RelationId::SYSTEM);
let mut meta_val = vec![];
meta.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
self.tx.put(&name_key, &meta_val, Snd)?;
self.tx.put(&name_key, &meta_val)?;
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(RelationId::SYSTEM);
self.tx.put(&t_encoded, &meta.id.raw_encode(), Snd)?;
self.tx.put(&t_encoded, &meta.id.raw_encode())?;
Ok(meta)
}
pub(crate) fn get_relation(&self, name: &str) -> Result<RelationHandle> {
@ -322,7 +321,7 @@ impl SessionTx {
let found = self
.tx
.get(&encoded, true, Snd)?
.get(&encoded, true)?
.ok_or_else(|| StoredRelationNotFoundError(name.to_string()))?;
let metadata = RelationHandle::decode(&found)?;
Ok(metadata)
@ -331,7 +330,7 @@ impl SessionTx {
let store = self.get_relation(name)?;
let key = DataValue::Str(SmartString::from(name as &str));
let encoded = Tuple(vec![key]).encode_as_key(RelationId::SYSTEM);
self.tx.del(&encoded, Snd)?;
self.tx.del(&encoded)?;
let lower_bound = Tuple::default().encode_as_key(store.id);
let upper_bound = Tuple::default().encode_as_key(store.id.next());
Ok((lower_bound, upper_bound))
@ -340,7 +339,7 @@ impl SessionTx {
let new_key = DataValue::Str(new.name.clone());
let new_encoded = Tuple(vec![new_key]).encode_as_key(RelationId::SYSTEM);
if self.tx.exists(&new_encoded, true, Snd)? {
if self.tx.exists(&new_encoded, true)? {
bail!(RelNameConflictError(new.name.to_string()))
};
@ -352,8 +351,8 @@ impl SessionTx {
let mut meta_val = vec![];
rel.serialize(&mut Serializer::new(&mut meta_val)).unwrap();
self.tx.del(&old_encoded, Snd)?;
self.tx.put(&new_encoded, &meta_val, Snd)?;
self.tx.del(&old_encoded)?;
self.tx.put(&new_encoded, &meta_val)?;
Ok(())
}

@ -4,7 +4,6 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use miette::Result;
use cozorocks::Tx;
use cozorocks::CfHandle::Snd;
use crate::data::program::MagicSymbol;
use crate::data::symb::Symbol;
@ -42,7 +41,7 @@ impl SessionTx {
pub(crate) fn load_last_relation_store_id(&self) -> Result<RelationId> {
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(RelationId::SYSTEM);
let found = self.tx.get(&t_encoded, false, Snd)?;
let found = self.tx.get(&t_encoded, false)?;
Ok(match found {
None => RelationId::SYSTEM,
Some(slice) => RelationId::raw_decode(&slice),

Loading…
Cancel
Save