new C++ interop

main
Ziyang Hu 2 years ago
parent 45522ee043
commit 41d89d0415

@ -6,9 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
autocxx = "0.22.3"
cxx = "1.0"
cxx = "1.0.69"
[build-dependencies]
autocxx-build = "0.22.3"
miette = { version="4.3", features=["fancy"] } # optional but gives nicer error messages!
cxx-build = "1.0.69"

@ -0,0 +1,13 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_BRIDGE_H
#define COZOROCKS_BRIDGE_H
#include "db.h"
#include "slice.h"
#include "tx.h"
#include "status.h"
#endif //COZOROCKS_BRIDGE_H

@ -0,0 +1,30 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_ROCKS_BRIDGE_H
#define COZOROCKS_ROCKS_BRIDGE_H
#include "rust/cxx.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/table.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/slice_transform.h"
using namespace rocksdb;
using namespace std;
struct RdbStatus;
struct DbOpts;
typedef Status::Code StatusCode;
typedef Status::SubCode StatusSubCode;
typedef Status::Severity StatusSeverity;
#endif //COZOROCKS_ROCKS_BRIDGE_H

@ -0,0 +1,115 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#include <iostream>
#include "db.h"
#include "cozorocks/src/bridge/mod.rs.h"
shared_ptr<RocksDb> open_db(const DbOpts &opts, RdbStatus &status) {
auto options = make_unique<Options>();
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
}
if (opts.increase_parallelism > 0) {
options->IncreaseParallelism(opts.increase_parallelism);
}
if (opts.optimize_level_style_compaction) {
options->OptimizeLevelStyleCompaction();
}
options->create_if_missing = opts.create_if_missing;
options->paranoid_checks = opts.paranoid_checks;
if (opts.enable_blob_files) {
options->enable_blob_files = true;
options->min_blob_size = opts.min_blob_size;
options->blob_file_size = opts.blob_file_size;
options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
}
if (opts.use_bloom_filter) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
}
if (opts.use_capped_prefix_extractor) {
options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
}
if (opts.use_fixed_prefix_extractor) {
options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *cmp = nullptr;
if (opts.comparator_impl != nullptr) {
cmp = new RustComparator(
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
opts.comparator_impl);
options->comparator = cmp;
}
shared_ptr<RocksDb> db_wrapper = shared_ptr<RocksDb>(nullptr);
if (opts.optimistic) {
auto db = new OptimisticRocksDb();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->comparator.reset(cmp);
OptimisticTransactionDB *txn_db = nullptr;
write_status(OptimisticTransactionDB::Open(*db->options, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
} else {
auto db = new PessimisticRocksDb();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->tdb_opts = make_unique<TransactionDBOptions>();
db->comparator.reset(cmp);
TransactionDB *txn_db = nullptr;
write_status(TransactionDB::Open(*db->options, *db->tdb_opts, db->db_path, &txn_db), status);
db->db.reset(txn_db);
db->destroy_on_exit = opts.destroy_on_exit;
db_wrapper.reset(db);
}
return db_wrapper;
}
unique_ptr<RdbTx> OptimisticRocksDb::start_txn() {
return unique_ptr<RdbTx>(nullptr);
}
unique_ptr<RdbTx> PessimisticRocksDb::start_txn() {
return unique_ptr<RdbTx>(nullptr);
}
PessimisticRocksDb::~PessimisticRocksDb() {
if (destroy_on_exit) {
cerr << "destroying database on exit: " << db_path << endl;
auto status = db->Close();
if (!status.ok()) {
cerr << status.ToString() << endl;
}
db.reset();
auto status2 = DestroyDB(db_path, *options);
if (!status2.ok()) {
cerr << status.ToString() << endl;
}
}
}
OptimisticRocksDb::~OptimisticRocksDb() {
if (destroy_on_exit) {
cerr << "destroying database on exit: " << db_path << endl;
auto status = db->Close();
if (!status.ok()) {
cerr << status.ToString() << endl;
}
db.reset();
auto status2 = DestroyDB(db_path, *options);
if (!status2.ok()) {
cerr << status.ToString() << endl;
}
}
}

@ -0,0 +1,76 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_DB_H
#define COZOROCKS_DB_H
#include "iostream"
#include "common.h"
#include "tx.h"
struct RocksDb {
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
bool destroy_on_exit;
string db_path;
virtual unique_ptr<RdbTx> start_txn() = 0;
inline const string &get_db_path() const {
return db_path;
}
};
struct OptimisticRocksDb : public RocksDb {
unique_ptr<OptimisticTransactionDB> db;
virtual unique_ptr<RdbTx> start_txn();
virtual ~OptimisticRocksDb();
};
struct PessimisticRocksDb : public RocksDb {
unique_ptr<TransactionDBOptions> tdb_opts;
unique_ptr<TransactionDB> db;
virtual unique_ptr<RdbTx> start_txn();
virtual ~PessimisticRocksDb();
};
typedef int8_t (*CmpFn)(const Slice &a, const Slice &b);
class RustComparator : public Comparator {
public:
inline RustComparator(string name_, bool can_different_bytes_be_equal_, uint8_t const *const f) :
name(name_),
can_different_bytes_be_equal(can_different_bytes_be_equal_) {
CmpFn f_ = CmpFn(f);
ext_cmp = f_;
}
inline int Compare(const Slice &a, const Slice &b) const {
return ext_cmp(a, b);
}
inline const char *Name() const {
return name.c_str();
}
inline virtual bool CanKeysWithDifferentByteContentsBeEqual() const {
return can_different_bytes_be_equal;
}
inline void FindShortestSeparator(string *, const Slice &) const {}
inline void FindShortSuccessor(string *) const {}
string name;
CmpFn ext_cmp;
bool can_different_bytes_be_equal;
};
shared_ptr<RocksDb> open_db(const DbOpts &opts, RdbStatus &status);
#endif //COZOROCKS_DB_H

@ -0,0 +1,10 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_SLICE_H
#define COZOROCKS_SLICE_H
#include "common.h"
#endif //COZOROCKS_SLICE_H

@ -0,0 +1,21 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#include "status.h"
#include "cozorocks/src/bridge/mod.rs.h"
void write_status(const Status &rstatus, RdbStatus &status) {
status.code = rstatus.code();
status.subcode = rstatus.subcode();
status.severity = rstatus.severity();
if (!rstatus.ok() && !rstatus.IsNotFound()) {
status.message = rust::String::lossy(rstatus.ToString());
}
}
RdbStatus convert_status(const Status &status) {
RdbStatus ret;
write_status(status, ret);
return ret;
}

@ -0,0 +1,14 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_STATUS_H
#define COZOROCKS_STATUS_H
#include "common.h"
void write_status(const Status &rstatus, RdbStatus &status);
RdbStatus convert_status(const Status &status);
#endif //COZOROCKS_STATUS_H

@ -0,0 +1,14 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_TX_H
#define COZOROCKS_TX_H
#include "common.h"
struct RdbTx {
};
#endif //COZOROCKS_TX_H

@ -1,8 +1,5 @@
fn main() -> miette::Result<()> {
let mut b = autocxx_build::Builder::new("src/lib.rs", &["../deps/include", "src"]).build()?;
b.flag_if_supported("-std=c++17").compile("cozorocks"); // arbitrary library name, pick anything
println!("cargo:rustc-link-search=../deps/lib/");
fn main() {
println!("cargo:rustc-link-search=deps/lib/");
println!("cargo:rustc-link-search=/opt/homebrew/lib/");
println!("cargo:rustc-link-lib=rocksdb");
println!("cargo:rustc-link-lib=z");
@ -11,7 +8,20 @@ fn main() -> miette::Result<()> {
println!("cargo:rustc-link-lib=snappy");
println!("cargo:rustc-link-lib=zstd");
println!("cargo:rustc-link-lib=jemalloc");
println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/bridge.h");
Ok(())
println!("cargo:rerun-if-changed=cozorocks/src/bridge/mod.rs");
println!("cargo:rerun-if-changed=cozorocks/bridge/bridge.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/common.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/db.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/db.cpp");
println!("cargo:rerun-if-changed=cozorocks/bridge/slice.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/status.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/status.cpp");
println!("cargo:rerun-if-changed=cozorocks/bridge/tx.h");
cxx_build::bridge("src/bridge/mod.rs")
.files(["bridge/status.cpp", "bridge/db.cpp"])
.include("../deps/include")
.include("bridge")
.flag_if_supported("-std=c++17")
.compile("cozorocks");
}

@ -1,288 +0,0 @@
//
// Created by Ziyang Hu on 2022/6/29.
//
#ifndef COZOROCKS_ADDITIONS_H
#define COZOROCKS_ADDITIONS_H
#include "rocksdb/db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/table.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb_additions {
using namespace std;
using namespace rocksdb;
// for write options
// force generation
unique_ptr<Iterator> _u1() {
return unique_ptr<Iterator>(nullptr);
}
unique_ptr<Transaction> _u2() {
return unique_ptr<Transaction>(nullptr);
}
inline void set_w_opts_sync(WriteOptions &opts, bool v) {
Status s;
opts.sync = v;
}
inline void set_w_opts_disable_wal(WriteOptions &opts, bool v) {
opts.disableWAL = v;
}
inline void set_w_opts_low_pri(WriteOptions &opts, bool v) {
opts.low_pri = v;
}
// for read options
inline void set_iterate_lower_bound(ReadOptions &opts, const Slice &lower_bound) {
opts.iterate_lower_bound = &lower_bound;
}
inline void set_iterate_upper_bound(ReadOptions &opts, const Slice &lower_bound) {
opts.iterate_upper_bound = &lower_bound;
}
inline void set_snapshot(ReadOptions &opts, const Snapshot &snapshot) {
opts.snapshot = &snapshot;
}
inline void set_r_opts_total_order_seek(ReadOptions &opts, bool total_order_seek) {
opts.total_order_seek = total_order_seek;
}
inline void set_r_opts_auto_prefix_mode(ReadOptions &opts, bool auto_prefix_mode) {
opts.auto_prefix_mode = auto_prefix_mode;
}
inline void set_r_opts_prefix_same_as_start(ReadOptions &opts, bool prefix_same_as_start) {
opts.prefix_same_as_start = prefix_same_as_start;
}
inline void set_r_opts_tailing(ReadOptions &opts, bool tailing) {
opts.tailing = tailing;
}
inline void set_r_opts_pin_data(ReadOptions &opts, bool pin_data) {
opts.pin_data = pin_data;
}
inline void set_r_opts_verify_checksums(ReadOptions &opts, bool verify_checksums) {
opts.verify_checksums = verify_checksums;
}
inline void set_r_opts_fill_cache(ReadOptions &opts, bool fill_cache) {
opts.fill_cache = fill_cache;
}
// for options
inline void set_opts_create_if_missing(Options &opts, bool v) {
opts.create_if_missing = v;
}
inline void set_opts_error_if_exists(Options &opts, bool v) {
opts.error_if_exists = v;
}
inline void set_opts_create_missing_column_families(Options &opts, bool v) {
opts.create_missing_column_families = v;
}
inline void set_opts_paranoid_checks(Options &opts, bool v) {
opts.paranoid_checks = v;
}
inline void set_opts_flush_verify_memtable_count(Options &opts, bool v) {
opts.flush_verify_memtable_count = v;
}
inline void set_opts_track_and_verify_wals_in_manifest(Options &opts, bool v) {
opts.track_and_verify_wals_in_manifest = v;
}
inline void set_opts_verify_sst_unique_id_in_manifest(Options &opts, bool v) {
opts.verify_sst_unique_id_in_manifest = v;
}
inline void set_opts_bloom_filter(Options &options, const double bits_per_key, const bool whole_key_filtering) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(bits_per_key, false));
table_options.whole_key_filtering = whole_key_filtering;
options.table_factory.reset(
NewBlockBasedTableFactory(
table_options));
}
inline void set_opts_capped_prefix_extractor(Options &options, const size_t cap_len) {
options.prefix_extractor.reset(NewCappedPrefixTransform(cap_len));
}
inline void set_opts_comparator(Options &inner, const Comparator &cmp_obj) {
inner.comparator = &cmp_obj;
}
inline void set_opts_enable_blob_files(Options &inner, bool v) {
inner.enable_blob_files = v;
}
inline void set_opts_min_blob_size(Options &inner, uint64_t size) {
inner.min_blob_size = size;
}
inline void set_opts_blob_file_size(Options &inner, uint64_t size) {
inner.blob_file_size = size;
}
inline void set_opts_enable_blob_garbage_collection(Options &inner, bool v) {
inner.enable_blob_garbage_collection = v;
}
inline unique_ptr<OptimisticTransactionDBOptions> new_odb_opts() {
return make_unique<OptimisticTransactionDBOptions>();
}
inline unique_ptr<TransactionDBOptions> new_tdb_opts() {
return make_unique<TransactionDBOptions>();
}
// otopts
inline void set_otopts_comparator(OptimisticTransactionOptions &opts, Comparator &cmp) {
opts.cmp = &cmp;
}
// database
enum DbKind {
RAW = 0,
PESSIMISTIC = 1,
OPTIMISTIC = 2,
};
struct DbBridge {
mutable unique_ptr<DB> db;
mutable TransactionDB *tdb;
mutable OptimisticTransactionDB *odb;
bool is_odb;
DbBridge(DB *db_) : db(db_) {}
DbBridge(TransactionDB *db_) : db(db_), tdb(db_) {}
DbBridge(OptimisticTransactionDB *db_) : db(db_), odb(db_) {}
DbKind kind() const {
if (tdb != nullptr) {
return DbKind::PESSIMISTIC;
} else if (odb != nullptr) {
return DbKind::OPTIMISTIC;
} else {
return DbKind::RAW;
}
}
DB *inner_db() const {
return db.get();
}
TransactionDB *inner_tdb() const {
return tdb;
}
OptimisticTransactionDB *inner_odb() const {
return odb;
}
};
inline shared_ptr<DbBridge>
open_db_raw(const Options &options, const string path, Status &status) {
DB *db = nullptr;
status = DB::Open(options, path, &db);
return make_shared<DbBridge>(db);
}
inline shared_ptr<DbBridge>
open_tdb_raw(const Options &options,
const TransactionDBOptions &txn_db_options,
const string path,
Status &status) {
TransactionDB *txn_db = nullptr;
status = TransactionDB::Open(options, txn_db_options, path, &txn_db);
return make_shared<DbBridge>(txn_db);
}
inline shared_ptr<DbBridge>
open_odb_raw(const Options &options, const string path, Status &status) {
OptimisticTransactionDB *txn_db = nullptr;
status = OptimisticTransactionDB::Open(options,
path,
&txn_db);
return make_shared<DbBridge>(txn_db);
}
// comparator
typedef int(*CmpFn)(const Slice &a, const Slice &b);
class RustComparator : public Comparator {
public:
inline RustComparator(string name_, bool can_different_bytes_be_equal_, void const *const f) :
name(name_),
can_different_bytes_be_equal(can_different_bytes_be_equal_) {
CmpFn f_ = CmpFn(f);
ext_cmp = f_;
}
inline int Compare(const Slice &a, const Slice &b) const {
return ext_cmp(a, b);
}
inline const char *Name() const {
return name.c_str();
}
inline virtual bool CanKeysWithDifferentByteContentsBeEqual() const {
return can_different_bytes_be_equal;
}
inline void FindShortestSeparator(string *, const Slice &) const {}
inline void FindShortSuccessor(string *) const {}
string name;
CmpFn ext_cmp;
bool can_different_bytes_be_equal;
};
inline unique_ptr<RustComparator>
new_rust_comparator(
string name_,
bool can_different_bytes_be_equal_,
void const *const f
) {
return make_unique<RustComparator>(name_, can_different_bytes_be_equal_, f);
}
}
#endif //COZOROCKS_ADDITIONS_H

@ -0,0 +1,159 @@
use crate::bridge::ffi::*;
use cxx::*;
use std::ptr::null;
#[derive(Default, Debug)]
struct DbBuilder<'a> {
opts: DbOpts<'a>,
}
impl<'a> Default for DbOpts<'a> {
fn default() -> Self {
Self {
db_path: "",
optimistic: false,
prepare_for_bulk_load: false,
increase_parallelism: 0,
optimize_level_style_compaction: false,
create_if_missing: false,
paranoid_checks: true,
enable_blob_files: false,
min_blob_size: 0,
blob_file_size: 1 << 28,
enable_blob_garbage_collection: false,
use_bloom_filter: false,
bloom_filter_bits_per_key: 0.0,
bloom_filter_whole_key_filtering: false,
use_capped_prefix_extractor: false,
capped_prefix_extractor_len: 0,
use_fixed_prefix_extractor: false,
fixed_prefix_extractor_len: 0,
comparator_impl: null(),
comparator_name: "",
comparator_different_bytes_can_be_equal: false,
destroy_on_exit: false,
}
}
}
impl<'a> DbBuilder<'a> {
pub fn path(mut self, path: &'a str) -> Self {
self.opts.db_path = path;
self
}
pub fn optimistic(mut self, val: bool) -> Self {
self.opts.optimistic = val;
self
}
pub fn prepare_for_bulk_load(mut self, val: bool) -> Self {
self.opts.prepare_for_bulk_load = val;
self
}
pub fn increase_parallelism(mut self, val: usize) -> Self {
self.opts.increase_parallelism = val;
self
}
pub fn optimize_level_style_compaction(mut self, val: bool) -> Self {
self.opts.optimize_level_style_compaction = val;
self
}
pub fn create_if_missing(mut self, val: bool) -> Self {
self.opts.create_if_missing = val;
self
}
pub fn paranoid_checks(mut self, val: bool) -> Self {
self.opts.paranoid_checks = val;
self
}
pub fn enable_blob_files(
mut self,
enable: bool,
min_blob_size: usize,
blob_file_size: usize,
garbage_collection: bool,
) -> Self {
self.opts.enable_blob_files = enable;
self.opts.min_blob_size = min_blob_size;
self.opts.blob_file_size = blob_file_size;
self.opts.enable_blob_garbage_collection = garbage_collection;
self
}
pub fn use_bloom_filter(
mut self,
enable: bool,
bits_per_key: f64,
whole_key_filtering: bool,
) -> Self {
self.opts.use_bloom_filter = enable;
self.opts.bloom_filter_bits_per_key = bits_per_key;
self.opts.bloom_filter_whole_key_filtering = whole_key_filtering;
self
}
pub fn use_capped_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.use_capped_prefix_extractor = enable;
self.opts.capped_prefix_extractor_len = len;
self
}
pub fn use_fixed_prefix_extractor(mut self, enable: bool, len: usize) -> Self {
self.opts.use_fixed_prefix_extractor = enable;
self.opts.fixed_prefix_extractor_len = len;
self
}
pub fn use_custom_comparator(
mut self,
name: &'a str,
cmp: fn(&[u8], &[u8]) -> i8,
different_bytes_can_be_equal: bool,
) -> Self {
self.opts.comparator_name = name;
self.opts.comparator_impl = unsafe { cmp as *const u8 };
self.opts.comparator_different_bytes_can_be_equal = different_bytes_can_be_equal;
self
}
pub fn destroy_on_exit(mut self, destroy: bool) -> Self {
self.opts.destroy_on_exit = destroy;
self
}
pub fn build(self) -> Result<SharedPtr<RocksDb>, RdbStatus> {
dbg!(&self.opts);
let mut status = RdbStatus::default();
let result = open_db(&self.opts, &mut status);
if status.is_ok() {
Ok(result)
} else {
Err(status)
}
}
}
#[cfg(test)]
mod tests {
use crate::bridge::db::DbBuilder;
fn test_comparator(a: &[u8], b: &[u8]) -> i8 {
use std::cmp::Ordering::*;
match a.cmp(b) {
Equal => 0,
Greater => 1,
Less => -1,
}
}
#[test]
fn creation() {
{
let db = DbBuilder::default()
.path("_test_db")
.optimistic(true)
.create_if_missing(true)
.use_custom_comparator("rusty_cmp", test_comparator, false)
.destroy_on_exit(true)
.build()
.unwrap();
}
for _ in 0..10000000 {
}
}
}

@ -0,0 +1,128 @@
pub(crate) mod db;
#[cxx::bridge]
pub(crate) mod ffi {
#[derive(Debug)]
pub struct DbOpts<'a> {
pub db_path: &'a str,
pub optimistic: bool,
pub prepare_for_bulk_load: bool,
pub increase_parallelism: usize,
pub optimize_level_style_compaction: bool,
pub create_if_missing: bool,
pub paranoid_checks: bool,
pub enable_blob_files: bool,
pub min_blob_size: usize,
pub blob_file_size: usize,
pub enable_blob_garbage_collection: bool,
pub use_bloom_filter: bool,
pub bloom_filter_bits_per_key: f64,
pub bloom_filter_whole_key_filtering: bool,
pub use_capped_prefix_extractor: bool,
pub capped_prefix_extractor_len: usize,
pub use_fixed_prefix_extractor: bool,
pub fixed_prefix_extractor_len: usize,
pub comparator_impl: *const u8,
pub comparator_name: &'a str,
pub comparator_different_bytes_can_be_equal: bool,
pub destroy_on_exit: bool,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RdbStatus {
pub code: StatusCode,
pub subcode: StatusSubCode,
pub severity: StatusSeverity,
pub message: String,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusCode {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5,
kMergeInProgress = 6,
kIncomplete = 7,
kShutdownInProgress = 8,
kTimedOut = 9,
kAborted = 10,
kBusy = 11,
kExpired = 12,
kTryAgain = 13,
kCompactionTooLarge = 14,
kColumnFamilyDropped = 15,
kMaxCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusSubCode {
kNone = 0,
kMutexTimeout = 1,
kLockTimeout = 2,
kLockLimit = 3,
kNoSpace = 4,
kDeadlock = 5,
kStaleFile = 6,
kMemoryLimit = 7,
kSpaceLimit = 8,
kPathNotFound = 9,
KMergeOperandsInsufficientCapacity = 10,
kManualCompactionPaused = 11,
kOverwritten = 12,
kTxnNotPrepared = 13,
kIOFenced = 14,
kMaxSubCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusSeverity {
kNoError = 0,
kSoftError = 1,
kHardError = 2,
kFatalError = 3,
kUnrecoverableError = 4,
kMaxSeverity,
}
unsafe extern "C++" {
include!("bridge.h");
type StatusCode;
type StatusSubCode;
type StatusSeverity;
type RocksDb;
fn get_db_path(self: &RocksDb) -> &CxxString;
fn open_db(builder: &DbOpts, status: &mut RdbStatus) -> SharedPtr<RocksDb>;
}
}
impl Default for ffi::RdbStatus {
fn default() -> Self {
ffi::RdbStatus {
code: ffi::StatusCode::kOk,
subcode: ffi::StatusSubCode::kNone,
severity: ffi::StatusSeverity::kNoError,
message: "".to_string(),
}
}
}
impl ffi::RdbStatus {
#[inline(always)]
pub fn is_ok(&self) -> bool {
self.code == ffi::StatusCode::kOk
}
#[inline(always)]
pub fn is_not_found(&self) -> bool {
self.code == ffi::StatusCode::kNotFound
}
#[inline(always)]
pub fn is_ok_or_not_found(&self) -> bool {
self.is_ok() || self.is_not_found()
}
}

@ -1,369 +1 @@
use autocxx::prelude::*;
use std::fmt::{Display, Formatter};
use std::os::raw::c_char;
use std::pin::Pin;
include_cpp! {
#include "bridge.h"
safety!(unsafe)
generate_ns!("rocksdb_additions")
generate_pod!("rocksdb::Slice")
generate!("rocksdb::ReadOptions")
generate!("rocksdb::WriteOptions")
generate!("rocksdb::Options")
generate!("rocksdb::DBOptions")
generate!("rocksdb::Status")
generate!("rocksdb::PinnableSlice")
generate!("rocksdb::TransactionOptions")
generate!("rocksdb::OptimisticTransactionOptions")
generate!("rocksdb::TransactionDBOptions")
generate!("rocksdb::OptimisticTransactionDBOptions")
generate!("rocksdb::FlushOptions")
generate!("rocksdb::Iterator")
generate!("rocksdb::Transaction")
generate!("rocksdb::TransactionDB")
generate!("rocksdb::OptimisticTransactionDB")
generate!("rocksdb::StackableDB")
generate!("rocksdb::DB")
generate!("rocksdb::Snapshot")
}
pub use autocxx::{c_int, c_void, WithinUniquePtr};
pub use cxx::{CxxString, CxxVector, SharedPtr, UniquePtr};
pub use ffi::rocksdb::Status_Code as StatusCode;
pub use ffi::rocksdb::Status_Severity as StatusSeverity;
pub use ffi::rocksdb::Status_SubCode as StatusSubCode;
pub use ffi::rocksdb::*;
pub use ffi::rocksdb_additions::*;
#[derive(Debug, Copy, Clone)]
pub struct DbStatus {
pub code: StatusCode,
pub subcode: StatusSubCode,
pub severity: StatusSeverity,
}
impl Display for DbStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for DbStatus {}
#[inline(always)]
fn convert_status(status: &ffi::rocksdb::Status) -> DbStatus {
let code = status.code();
let subcode = status.subcode();
let severity = status.severity();
DbStatus {
code,
subcode,
severity,
}
}
#[inline(always)]
pub fn convert_slice(src: &[u8]) -> Slice {
Slice {
data_: src.as_ptr() as *const c_char,
size_: src.len(),
}
}
#[inline(always)]
pub fn convert_slice_back(src: &Slice) -> &[u8] {
unsafe { std::slice::from_raw_parts(src.data() as *const u8, src.size()) }
}
pub type SnapshotPtr = *const Snapshot;
impl DbBridge {
pub fn new_raw_db(path: &str, opts: &Options) -> Result<SharedPtr<Self>, DbStatus> {
let mut status = Status::new().within_unique_ptr();
let bridge = open_db_raw(opts, path, status.pin_mut());
if !status.ok() {
Err(convert_status(&status))
} else {
Ok(bridge)
}
}
pub fn new_tdb(
path: &str,
opts: &Options,
tdb_opts: &TransactionDBOptions,
) -> Result<SharedPtr<Self>, DbStatus> {
let mut status = Status::new().within_unique_ptr();
let bridge = open_tdb_raw(opts, tdb_opts, path, status.pin_mut());
if !status.ok() {
Err(convert_status(&status))
} else {
Ok(bridge)
}
}
pub fn new_odb(
path: &str,
opts: &Options,
_odb_opts: &OptimisticTransactionDBOptions,
) -> Result<SharedPtr<Self>, DbStatus> {
let mut status = Status::new().within_unique_ptr();
let bridge = open_odb_raw(opts, path, status.pin_mut());
if !status.ok() {
Err(convert_status(&status))
} else {
Ok(bridge)
}
}
#[inline]
fn get_raw_db(&self) -> Pin<&mut DB> {
unsafe { Pin::new_unchecked(&mut *self.inner_db()) }
}
#[inline]
fn get_tdb(&self) -> Pin<&mut TransactionDB> {
debug_assert_eq!(self.kind(), DbKind::PESSIMISTIC);
unsafe { Pin::new_unchecked(&mut *self.inner_tdb()) }
}
#[inline]
fn get_odb(&self) -> Pin<&mut OptimisticTransactionDB> {
debug_assert_eq!(self.kind(), DbKind::OPTIMISTIC);
unsafe { Pin::new_unchecked(&mut *self.inner_odb()) }
}
#[inline]
pub fn p_txn(
&self,
write_options: &WriteOptions,
txn_options: &TransactionOptions,
) -> UniquePtr<Transaction> {
let tdb = self.get_tdb();
unsafe {
UniquePtr::from_raw(tdb.BeginTransaction(
write_options,
txn_options,
std::ptr::null_mut(),
))
}
}
#[inline]
pub fn o_txn(
&self,
write_options: &WriteOptions,
txn_options: &OptimisticTransactionOptions,
) -> UniquePtr<Transaction> {
let odb = self.get_odb();
unsafe {
UniquePtr::from_raw(odb.BeginTransaction(
write_options,
txn_options,
std::ptr::null_mut(),
))
}
}
#[inline]
pub fn get(
&self,
opts: &ReadOptions,
key: impl AsRef<[u8]>,
val: Pin<&mut PinnableSlice>,
) -> DbStatus {
let db = self.get_raw_db();
let key = convert_slice(key.as_ref());
let cf = db.DefaultColumnFamily();
moveit! { let status = unsafe {
let val = Pin::into_inner_unchecked(val) as *mut PinnableSlice;
db.Get1(opts, cf, &key, val)
}; }
convert_status(&status)
}
#[inline]
pub fn put(
&self,
opts: &WriteOptions,
key: impl AsRef<[u8]>,
val: impl AsRef<[u8]>,
) -> DbStatus {
let db = self.get_raw_db();
let key = convert_slice(key.as_ref());
let val = convert_slice(val.as_ref());
moveit! { let status = db.Put2(opts, &key, &val); }
convert_status(&status)
}
#[inline]
pub fn delete(&self, opts: &WriteOptions, key: impl AsRef<[u8]>) -> DbStatus {
let db = self.get_raw_db();
let key = convert_slice(key.as_ref());
moveit! { let status = db.Delete2(opts, &key); }
convert_status(&status)
}
#[inline]
pub fn delete_range(
&self,
opts: &WriteOptions,
start: impl AsRef<[u8]>,
end: impl AsRef<[u8]>,
) -> DbStatus {
let db = self.get_raw_db();
let start = convert_slice(start.as_ref());
let end = convert_slice(end.as_ref());
let cf = db.DefaultColumnFamily();
moveit! { let status = unsafe { db.DeleteRange(opts, cf, &start, &end) }; }
convert_status(&status)
}
#[inline]
pub fn iterator(&self, opts: &ReadOptions) -> UniquePtr<Iterator> {
let db = self.get_raw_db();
unsafe { UniquePtr::from_raw(db.NewIterator1(opts)) }
}
#[inline]
pub fn get_snapshot(&self) -> SnapshotPtr {
let db = self.get_raw_db();
db.GetSnapshot()
}
#[allow(clippy::not_unsafe_ptr_arg_deref)]
#[inline]
pub fn release_snapshot(&self, snapshot: SnapshotPtr) {
let db = self.get_raw_db();
unsafe { db.ReleaseSnapshot(snapshot) }
}
}
#[macro_export]
macro_rules! let_pinnable_slice {
($i:ident) => {
$crate::moveit! {
let mut $i = $crate::PinnableSlice::new();
}
};
}
#[macro_export]
macro_rules! let_write_opts {
($i:ident = {$( $opt_name:ident => $opt_val:expr ),*}) => {
$crate::moveit! {
let mut $i = $crate::WriteOptions::new();
}
let_write_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, ) => {};
( @let_opts, $i:ident, sync, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_w_opts_sync($i.as_mut(), $val);
let_write_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, disable_wal, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_w_opts_disable_wal($i.as_mut(), $val);
let_write_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, low_pri, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_w_opts_low_pri($i.as_mut(), $val);
let_write_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
}
#[macro_export]
macro_rules! let_read_opts {
($i:ident = {$( $opt_name:ident => $opt_val:expr ),*}) => {
$crate::moveit! {
let mut $i = $crate::ReadOptions::new();
}
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, ) => {};
( @let_opts, $i:ident, lower_bound, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_iterate_lower_bound($i.as_mut(), &$crate::convert_slice($val.as_ref()));
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, upper_bound, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_iterate_upper_bound($i.as_mut(), &$crate::convert_slice($val.as_ref()));
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, snapshot, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_snapshot($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, total_order_seek, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_total_order_seek($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, auto_prefix_mode, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_auto_prefix_mode($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, prefix_same_as_start, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_prefix_same_as_start($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, tailing, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_tailing($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, pin_data, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_pin_data($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, verify_checksums, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_verify_checksums($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
( @let_opts, $i:ident, fill_cache, $val:expr, $( $opt_name:ident, $opt_val:expr, )* ) => {
$crate::ffi::rocksdb_additions::set_r_opts_fill_cache($i.as_mut(), $val);
let_read_opts! { @let_opts, $i, $( $opt_name, $opt_val, )* }
};
}
unsafe impl Send for RustComparator {}
unsafe impl Sync for RustComparator {}
#[cfg(test)]
mod tests {
use super::ffi::rocksdb::{Options, ReadOptions, WriteOptions};
use super::*;
use std::mem::{size_of, size_of_val};
#[test]
fn it_works() {
dbg!(size_of::<ReadOptions>());
for i in 0..100000 {
let mut g_opts = Options::new().within_unique_ptr();
g_opts.pin_mut().OptimizeForSmallDb();
let lower_bound = "lower".as_bytes();
let upper_bound = "upper".as_bytes();
let should_tail = true;
let_read_opts!(r_opts = { lower_bound => lower_bound, tailing => should_tail, upper_bound => upper_bound });
let_write_opts!(w_opts = { disable_wal => true });
}
dbg!(size_of::<ReadOptions>());
// let cmp = RustComparator::new().within_unique_ptr();
#[no_mangle]
extern "C" fn rusty_cmp(
a: &ffi::rocksdb::Slice,
b: &ffi::rocksdb::Slice,
) -> autocxx::c_int {
dbg!(convert_slice_back(a));
dbg!(convert_slice_back(b));
autocxx::c_int(0)
}
let cmp = unsafe {
let f_ptr = rusty_cmp as *const autocxx::c_void;
new_rust_comparator("hello", false, f_ptr)
};
let a = convert_slice(&[1, 2, 3]);
let b = convert_slice(&[4, 5, 6, 7]);
cmp.Compare(&a, &b);
}
}
pub mod bridge;

@ -6,27 +6,27 @@ use crate::data::id::{EntityId, TxId};
use lazy_static::lazy_static;
use std::cmp::Ordering;
#[no_mangle]
extern "C" fn rusty_cmp(a: &cozorocks::Slice, b: &cozorocks::Slice) -> cozorocks::c_int {
let a = cozorocks::convert_slice_back(a);
let b = cozorocks::convert_slice_back(b);
cozorocks::c_int(match compare_key(a, b) {
Ordering::Greater => 1,
Ordering::Equal => 0,
Ordering::Less => -1,
})
}
// #[no_mangle]
// extern "C" fn rusty_cmp(a: &cozorocks::Slice, b: &cozorocks::Slice) -> cozorocks::c_int {
// let a = cozorocks::convert_slice_back(a);
// let b = cozorocks::convert_slice_back(b);
// cozorocks::c_int(match compare_key(a, b) {
// Ordering::Greater => 1,
// Ordering::Equal => 0,
// Ordering::Less => -1,
// })
// }
pub(crate) const DB_KEY_PREFIX_LEN: usize = 4;
lazy_static! {
pub(crate) static ref RUSTY_COMPARATOR: cozorocks::UniquePtr<cozorocks::RustComparator> = {
unsafe {
let f_ptr = rusty_cmp as *const cozorocks::c_void;
cozorocks::new_rust_comparator("cozo_rusty_cmp_v1", false, f_ptr)
}
};
}
//
// lazy_static! {
// pub(crate) static ref RUSTY_COMPARATOR: cozorocks::UniquePtr<cozorocks::RustComparator> = {
// unsafe {
// let f_ptr = rusty_cmp as *const cozorocks::c_void;
// cozorocks::new_rust_comparator("cozo_rusty_cmp_v1", false, f_ptr)
// }
// };
// }
macro_rules! return_if_resolved {
($o:expr) => {

@ -1,63 +1,93 @@
use crate::data::compare::DB_KEY_PREFIX_LEN;
use anyhow::Result;
use cozorocks::*;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::sync::{Arc, Mutex};
pub struct DbInstance {
pub destroy_on_close: bool,
db: SharedPtr<DbBridge>,
db_opts: UniquePtr<Options>,
tdb_opts: Option<UniquePtr<TransactionDBOptions>>,
odb_opts: Option<UniquePtr<OptimisticTransactionDBOptions>>,
path: String,
last_attr_id: Arc<AtomicU32>,
last_ent_id: Arc<AtomicU64>,
last_tx_id: Arc<AtomicU64>,
sessions: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
}
struct SessionHandle {
id: usize,
temp: SharedPtr<DbBridge>,
status: SessionStatus,
}
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub enum SessionStatus {
Prepared,
Running,
Completed,
}
impl DbInstance {
pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result<Self> {
let mut db_opts = Options::new().within_unique_ptr();
set_opts_create_if_missing(db_opts.pin_mut(), true);
set_opts_bloom_filter(db_opts.pin_mut(), 10., true);
set_opts_capped_prefix_extractor(db_opts.pin_mut(), DB_KEY_PREFIX_LEN);
let (db, tdb_opts, odb_opts) = if optimistic {
let o = new_odb_opts();
let db = DbBridge::new_odb(path, &db_opts, &o)?;
(db, None, Some(o))
} else {
let o = new_odb_opts();
let db = DbBridge::new_odb(path, &db_opts, &o)?;
(db, Some(new_tdb_opts()), None)
};
Ok(Self {
db,
db_opts,
tdb_opts,
odb_opts,
path: path.to_string(),
destroy_on_close,
last_attr_id: Arc::new(Default::default()),
last_ent_id: Arc::new(Default::default()),
last_tx_id: Arc::new(Default::default()),
sessions: Mutex::new(vec![]),
})
}
}
// use crate::data::compare::DB_KEY_PREFIX_LEN;
// use anyhow::Result;
// use cozorocks::*;
// use std::sync::atomic::{AtomicU32, AtomicU64};
// use std::sync::{Arc, Mutex};
//
// pub struct DbInstance {
// pub destroy_on_close: bool,
// db: SharedPtr<DbBridge>,
// db_opts: UniquePtr<Options>,
// tdb_opts: Option<UniquePtr<TransactionDBOptions>>,
// odb_opts: Option<UniquePtr<OptimisticTransactionDBOptions>>,
// path: String,
// last_attr_id: Arc<AtomicU32>,
// last_ent_id: Arc<AtomicU64>,
// last_tx_id: Arc<AtomicU64>,
// sessions: Mutex<Vec<Arc<Mutex<SessionHandle>>>>,
// }
//
// struct SessionHandle {
// id: usize,
// temp: SharedPtr<DbBridge>,
// status: SessionStatus,
// }
//
// #[derive(Eq, PartialEq, Debug, Clone, Copy)]
// pub enum SessionStatus {
// Prepared,
// Running,
// Completed,
// }
//
// impl DbInstance {
// pub fn new(path: &str, optimistic: bool, destroy_on_close: bool) -> Result<Self> {
// let mut db_opts = Options::new().within_unique_ptr();
// set_opts_create_if_missing(db_opts.pin_mut(), true);
// set_opts_bloom_filter(db_opts.pin_mut(), 10., true);
// set_opts_capped_prefix_extractor(db_opts.pin_mut(), DB_KEY_PREFIX_LEN);
//
// let (db, tdb_opts, odb_opts) = if optimistic {
// let o = new_odb_opts();
// // let db = DbBridge::new_odb(path, &db_opts, &o)?;
// let db = todo!();
// (db, None, Some(o))
// } else {
// let o = new_tdb_opts();
// let db = DbBridge::new_tdb(path, &db_opts, &o)?;
// // let db = todo!();
// (db, Some(new_tdb_opts()), None)
// };
// //
// // Ok(Self {
// // db,
// // db_opts,
// // tdb_opts,
// // odb_opts,
// // path: path.to_string(),
// // destroy_on_close,
// // last_attr_id: Arc::new(Default::default()),
// // last_ent_id: Arc::new(Default::default()),
// // last_tx_id: Arc::new(Default::default()),
// // sessions: Mutex::new(vec![]),
// // })
// todo!()
// }
// }
//
// #[cfg(test)]
// mod tests {
// use crate::data::compare::RUSTY_COMPARATOR;
// use super::*;
//
// #[test]
// fn test_create() {
// let mut opts = Options::new().within_unique_ptr();
// // set_opts_comparator(opts.pin_mut(), &RUSTY_COMPARATOR);
// set_opts_create_if_missing(opts.pin_mut(), true);
// set_opts_bloom_filter(opts.pin_mut(), 10., true);
// set_opts_capped_prefix_extractor(opts.pin_mut(), DB_KEY_PREFIX_LEN);
// let db_ = DbBridge::new_raw_db("_test", &opts).unwrap();
// //
// // let o = new_odb_opts();
// // let db = DbBridge::new_odb("_test2", &opts, &o).unwrap();
// //
// // let o = new_tdb_opts();
// // let db = DbBridge::new_tdb("_test21", &opts, &o).unwrap();
// //
// //
// // dbg!(12345);
//
// // let db = DbInstance::new("_test3", false, true).unwrap();
// }
// }

Loading…
Cancel
Save