revamp C++ interop

main
Ziyang Hu 2 years ago
parent e18d72db5c
commit a8ff97d7c9

@ -6,7 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
#autocxx = "0.21.2"
cxx = "1.0.66" cxx = "1.0.66"
[build-dependencies] [build-dependencies]
cxx-build = "1.0.66" #autocxx-build = "0.21.2"
cxx-build = "1.0.66"
#miette = { version = "4.3", features = ["fancy"] }

@ -3,7 +3,7 @@
// //
#include "cozorocks.h" #include "cozorocks.h"
#include "cozorocks/src/lib.rs.h" #include "cozorocks/src/bridge.rs.h"
void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity,
int bridge_code) { int bridge_code) {

@ -13,6 +13,7 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
typedef std::shared_mutex Lock; typedef std::shared_mutex Lock;
@ -28,6 +29,7 @@ using std::make_shared;
using std::string; using std::string;
using std::vector; using std::vector;
using std::unordered_map; using std::unordered_map;
using std::tuple;
struct BridgeStatus; struct BridgeStatus;
@ -43,26 +45,34 @@ inline rust::Slice<const uint8_t> convert_slice_back(const Slice &s) {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size()); return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size());
} }
struct ReadOptionsBridge {
mutable ReadOptions inner;
inline void do_set_verify_checksums(bool v) const { inline rust::Slice<const uint8_t> convert_pinnable_slice_back(const PinnableSlice &s) {
inner.verify_checksums = v; return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size());
} }
void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity,
int bridge_code);
inline void do_set_total_order_seek(bool v) const { inline void write_status(Status &&rstatus, BridgeStatus &status) {
inner.total_order_seek = v; if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace ||
rstatus.severity() != StatusSeverity::kNoError) {
write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0);
} }
}; }
struct WriteOptionsBridge { void set_verify_checksums(ReadOptions &options, const bool v) {
mutable WriteOptions inner; options.verify_checksums = v;
}
void set_total_order_seek(ReadOptions &options, const bool v) {
options.total_order_seek = v;
}
void set_disable_wal(WriteOptions &options, const bool v) {
options.disableWAL = v;
}
public:
inline void do_set_disable_wal(bool v) const {
inner.disableWAL = v;
}
};
typedef rust::Fn<std::int8_t(rust::Slice<const std::uint8_t>, rust::Slice<const std::uint8_t>)> RustComparatorFn; typedef rust::Fn<std::int8_t(rust::Slice<const std::uint8_t>, rust::Slice<const std::uint8_t>)> RustComparatorFn;
@ -76,6 +86,7 @@ public:
return "RustComparator"; return "RustComparator";
} }
void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {}
void FindShortSuccessor(std::string *) const {} void FindShortSuccessor(std::string *) const {}
@ -92,73 +103,44 @@ public:
mutable RustComparatorFn rust_compare; mutable RustComparatorFn rust_compare;
}; };
struct OptionsBridge { inline unique_ptr<RustComparator> new_rust_comparator(rust::Str name, RustComparatorFn f) {
mutable Options inner; auto ret = make_unique<RustComparator>();
mutable RustComparator cmp_obj; ret->set_name(name);
ret->set_fn(f);
inline void do_prepare_for_bulk_load() const { return ret;
inner.PrepareForBulkLoad(); }
}
inline void do_increase_parallelism() const {
inner.IncreaseParallelism();
}
inline void do_optimize_level_style_compaction() const { inline void prepare_for_bulk_load(Options &inner) {
inner.OptimizeLevelStyleCompaction(); inner.PrepareForBulkLoad();
}; }
inline void do_set_create_if_missing(bool v) const { inline void increase_parallelism(Options &inner) {
inner.create_if_missing = v; inner.IncreaseParallelism();
} }
inline void do_set_comparator(rust::Str name, RustComparatorFn f) const { inline void optimize_level_style_compaction(Options &inner) {
cmp_obj = RustComparator(); inner.OptimizeLevelStyleCompaction();
cmp_obj.set_name(name);
cmp_obj.set_fn(f);
inner.comparator = &cmp_obj;
}
}; };
inline std::unique_ptr<ReadOptionsBridge> new_read_options() { inline void set_create_if_missing(Options &inner, bool v) {
return std::unique_ptr<ReadOptionsBridge>(new ReadOptionsBridge); inner.create_if_missing = v;
} }
inline std::unique_ptr<WriteOptionsBridge> new_write_options() { inline void set_comparator(Options &inner, const RustComparator &cmp_obj) {
return std::unique_ptr<WriteOptionsBridge>(new WriteOptionsBridge); inner.comparator = &cmp_obj;
} }
inline std::unique_ptr<OptionsBridge> new_options() { inline std::unique_ptr<ReadOptions> new_read_options() {
return std::unique_ptr<OptionsBridge>(new OptionsBridge); return std::make_unique<ReadOptions>();
} }
inline std::unique_ptr<WriteOptions> new_write_options() {
return std::make_unique<WriteOptions>();
}
struct PinnableSliceBridge { inline std::unique_ptr<Options> new_options() {
PinnableSlice inner; return std::make_unique<Options>();
inline rust::Slice<const std::uint8_t> as_bytes() const {
return convert_slice_back(inner);
}
};
struct SliceBridge {
Slice inner;
SliceBridge(Slice &&s) : inner(s) {}
inline rust::Slice<const std::uint8_t> as_bytes() const {
return convert_slice_back(inner);
}
};
void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity,
int bridge_code);
inline void write_status(Status &&rstatus, BridgeStatus &status) {
if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace ||
rstatus.severity() != StatusSeverity::kNoError) {
write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0);
}
} }
struct IteratorBridge { struct IteratorBridge {
@ -192,79 +174,139 @@ struct IteratorBridge {
inner->SeekForPrev(k); inner->SeekForPrev(k);
} }
inline std::unique_ptr<SliceBridge> key_raw() const { inline std::unique_ptr<Slice> key_raw() const {
return std::make_unique<SliceBridge>(inner->key()); return std::make_unique<Slice>(inner->key());
} }
inline std::unique_ptr<SliceBridge> value_raw() const { inline std::unique_ptr<Slice> value_raw() const {
return std::make_unique<SliceBridge>(inner->value()); return std::make_unique<Slice>(inner->value());
} }
BridgeStatus status() const; BridgeStatus status() const;
}; };
struct WriteBatchBridge { inline unique_ptr<TransactionOptions> new_transaction_options() {
mutable WriteBatch inner; return make_unique<TransactionOptions>();
}
inline void set_deadlock_detect(TransactionOptions &inner, bool v) {
inner.deadlock_detect = v;
}
inline unique_ptr<OptimisticTransactionOptions> new_optimistic_transaction_options(const RustComparator &compare) {
auto ret = make_unique<OptimisticTransactionOptions>();
ret->cmp = &compare;
return ret;
}
struct TransactionBridge {
DB *raw_db;
unique_ptr<Transaction> inner;
mutable unique_ptr<TransactionOptions> t_ops; // Put here to make sure ownership works
mutable unique_ptr<OptimisticTransactionOptions> o_ops; // same as above
mutable unique_ptr<ReadOptions> r_ops;
mutable unique_ptr<ReadOptions> raw_r_ops;
mutable unique_ptr<WriteOptions> w_ops;
mutable unique_ptr<WriteOptions> raw_w_ops;
inline void set_snapshot() const {
inner->SetSnapshot();
r_ops->snapshot = inner->GetSnapshot();
}
inline void commit(BridgeStatus &status) const {
write_status(inner->Commit(), status);
r_ops->snapshot = nullptr;
}
inline void rollback(BridgeStatus &status) const {
write_status(inner->Rollback(), status);
}
inline void batch_put_raw( inline void set_savepoint() const {
inner->SetSavePoint();
}
inline void rollback_to_savepoint(BridgeStatus &status) const {
write_status(inner->RollbackToSavePoint(), status);
}
inline void pop_savepoint(BridgeStatus &status) const {
write_status(inner->PopSavePoint(), status);
}
inline std::unique_ptr<PinnableSlice> get_txn(
const ColumnFamilyHandle &cf, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val,
BridgeStatus &status BridgeStatus &status
) const { ) const {
auto pinnable_val = std::make_unique<PinnableSlice>();
write_status( write_status(
inner.Put(const_cast<ColumnFamilyHandle *>(&cf), inner->Get(*r_ops,
convert_slice(key), const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(val)), convert_slice(key),
&*pinnable_val),
status status
); );
return pinnable_val;
} }
inline void batch_delete_raw( inline std::unique_ptr<PinnableSlice> get_for_update_txn(
const ColumnFamilyHandle &cf, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
BridgeStatus &status BridgeStatus &status
) const { ) const {
auto pinnable_val = std::make_unique<PinnableSlice>();
write_status( write_status(
inner.Delete(const_cast<ColumnFamilyHandle *>(&cf), inner->GetForUpdate(*r_ops,
convert_slice(key)), const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key),
&*pinnable_val),
status status
); );
return pinnable_val;
} }
};
inline unique_ptr<WriteBatchBridge> new_write_batch_raw() {
return make_unique<WriteBatchBridge>();
}
struct DBBridge {
mutable unique_ptr<DB> db;
mutable unordered_map<string, shared_ptr<ColumnFamilyHandle>> handles;
mutable Lock handle_lock;
DBBridge(DB *db_,
unordered_map<string, shared_ptr<ColumnFamilyHandle>> &&handles_) : db(db_), handles(handles_) {}
inline std::unique_ptr<PinnableSlice> get_raw(
const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key,
BridgeStatus &status
) const {
auto pinnable_val = std::make_unique<PinnableSlice>();
write_status(
raw_db->Get(*r_ops,
const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key),
&*pinnable_val),
status
);
return pinnable_val;
}
inline shared_ptr<ColumnFamilyHandle> get_cf_handle_raw(const string &name) const { inline void put_txn(
ReadLock r_lock(handle_lock); const ColumnFamilyHandle &cf,
try { rust::Slice<const uint8_t> key,
return handles.at(name); rust::Slice<const uint8_t> val,
} catch (const std::out_of_range &) { BridgeStatus &status
return shared_ptr<ColumnFamilyHandle>(nullptr); ) const {
} write_status(
inner->Put(const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key),
convert_slice(val)),
status
);
} }
inline void put_raw( inline void put_raw(
const WriteOptionsBridge &options,
const ColumnFamilyHandle &cf, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val, rust::Slice<const uint8_t> val,
BridgeStatus &status BridgeStatus &status
) const { ) const {
write_status( write_status(
db->Put(options.inner, raw_db->Put(
*raw_w_ops,
const_cast<ColumnFamilyHandle *>(&cf), const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key), convert_slice(key),
convert_slice(val)), convert_slice(val)),
@ -272,63 +314,138 @@ struct DBBridge {
); );
} }
inline void delete_raw( inline void del_txn(
const WriteOptionsBridge &options,
const ColumnFamilyHandle &cf, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
BridgeStatus &status BridgeStatus &status
) const { ) const {
write_status( write_status(
db->Delete(options.inner, inner->Delete(const_cast<ColumnFamilyHandle *>(&cf),
const_cast<ColumnFamilyHandle *>(&cf), convert_slice(key)),
convert_slice(key)),
status status
); );
} }
inline void write_raw( inline void del_raw(
const WriteOptionsBridge &options,
WriteBatchBridge &updates,
BridgeStatus &status
) const {
write_status(db->Write(options.inner, &updates.inner), status);
}
inline std::unique_ptr<PinnableSliceBridge> get_raw(
const ReadOptionsBridge &options,
const ColumnFamilyHandle &cf, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
BridgeStatus &status BridgeStatus &status
) const { ) const {
auto pinnable_val = std::make_unique<PinnableSliceBridge>();
write_status( write_status(
db->Get(options.inner, raw_db->Delete(
*raw_w_ops,
const_cast<ColumnFamilyHandle *>(&cf), const_cast<ColumnFamilyHandle *>(&cf),
convert_slice(key), convert_slice(key)),
&pinnable_val->inner),
status status
); );
return pinnable_val; }
inline std::unique_ptr<IteratorBridge> iterator_txn(
const ColumnFamilyHandle &cf) const {
return std::make_unique<IteratorBridge>(
inner->GetIterator(*r_ops, const_cast<ColumnFamilyHandle *>(&cf)));
} }
inline std::unique_ptr<IteratorBridge> iterator_raw( inline std::unique_ptr<IteratorBridge> iterator_raw(
const ReadOptionsBridge &options,
const ColumnFamilyHandle &cf) const { const ColumnFamilyHandle &cf) const {
return std::make_unique<IteratorBridge>(db->NewIterator(options.inner, const_cast<ColumnFamilyHandle *>(&cf))); return std::make_unique<IteratorBridge>(
raw_db->NewIterator(*raw_r_ops, const_cast<ColumnFamilyHandle *>(&cf)));
}
};
inline tuple<vector<string>, vector<ColumnFamilyDescriptor>>
get_cf_data(const Options &options,
const string &path) {
auto cf_names = std::vector<std::string>();
DB::ListColumnFamilies(options, path, &cf_names);
if (cf_names.empty()) {
cf_names.push_back(kDefaultColumnFamilyName);
}
std::vector<ColumnFamilyDescriptor> column_families;
for (auto el: cf_names) {
column_families.push_back(ColumnFamilyDescriptor(
el, options));
}
return std::make_tuple(cf_names, column_families);
}
struct TDBBridge {
mutable unique_ptr<StackableDB> db;
mutable TransactionDB *tdb;
mutable OptimisticTransactionDB *odb;
mutable unordered_map<string, shared_ptr<ColumnFamilyHandle>> handles;
mutable Lock handle_lock;
bool is_odb;
TDBBridge(StackableDB *db_,
TransactionDB *tdb_,
OptimisticTransactionDB *odb_,
unordered_map<string, shared_ptr<ColumnFamilyHandle>> &&handles_) :
db(db_), tdb(tdb_), odb(odb_), handles(handles_), handle_lock() {
is_odb = (tdb_ == nullptr);
}
inline unique_ptr<TransactionBridge> begin_t_transaction(
unique_ptr<WriteOptions> w_ops,
unique_ptr<WriteOptions> raw_w_ops,
unique_ptr<ReadOptions> r_ops,
unique_ptr<ReadOptions> raw_r_ops,
unique_ptr<TransactionOptions> txn_options) const {
auto ret = make_unique<TransactionBridge>();
ret->raw_db = tdb;
ret->r_ops = std::move(r_ops);
ret->w_ops = std::move(w_ops);
ret->raw_r_ops = std::move(raw_r_ops);
ret->raw_w_ops = std::move(raw_w_ops);
ret->t_ops = std::move(txn_options);
Transaction *txn = tdb->BeginTransaction(*ret->w_ops, *ret->t_ops);
ret->inner = unique_ptr<Transaction>(txn);
return ret;
} }
inline void create_column_family_raw(const OptionsBridge &options, const string &name, BridgeStatus &status) const { inline unique_ptr<TransactionBridge> begin_o_transaction(
unique_ptr<WriteOptions> w_ops,
unique_ptr<WriteOptions> raw_w_ops,
unique_ptr<ReadOptions> r_ops,
unique_ptr<ReadOptions> raw_r_ops,
unique_ptr<OptimisticTransactionOptions> txn_options) const {
auto ret = make_unique<TransactionBridge>();
ret->raw_db = odb;
ret->r_ops = std::move(r_ops);
ret->w_ops = std::move(w_ops);
ret->raw_r_ops = std::move(raw_r_ops);
ret->raw_w_ops = std::move(raw_w_ops);
ret->o_ops = std::move(txn_options);
Transaction *txn = odb->BeginTransaction(*ret->w_ops, *ret->o_ops);
ret->inner = unique_ptr<Transaction>(txn);
return ret;
}
inline shared_ptr<ColumnFamilyHandle> get_cf_handle_raw(const string &name) const {
ReadLock r_lock(handle_lock);
try {
return handles.at(name);
} catch (const std::out_of_range &) {
return shared_ptr<ColumnFamilyHandle>(nullptr);
}
}
inline void
create_column_family_raw(const Options &options, const string &name, BridgeStatus &status) const {
{ {
ReadLock r_lock(handle_lock); ReadLock r_lock(handle_lock);
if (handles.find(name) != handles.end()) { if (handles.find(name) != handles.end()) {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode,
StatusSeverity::kSoftError,
2); 2);
return; return;
} }
} }
WriteLock w_lock(handle_lock); WriteLock w_lock(handle_lock);
ColumnFamilyHandle *handle; ColumnFamilyHandle *handle;
auto s = db->CreateColumnFamily(options.inner, name, &handle); auto s = db->CreateColumnFamily(options, name, &handle);
write_status(std::move(s), status); write_status(std::move(s), status);
handles[name] = shared_ptr<ColumnFamilyHandle>(handle); handles[name] = shared_ptr<ColumnFamilyHandle>(handle);
} }
@ -341,7 +458,8 @@ struct DBBridge {
handles.erase(cf_it); handles.erase(cf_it);
write_status(std::move(s), status); write_status(std::move(s), status);
} else { } else {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError,
3);
} }
// When should we call DestroyColumnFamilyHandle? // When should we call DestroyColumnFamilyHandle?
} }
@ -356,31 +474,33 @@ struct DBBridge {
} }
}; };
inline unique_ptr<TransactionDBOptions> new_tdb_options() {
return make_unique<TransactionDBOptions>();
}
inline std::unique_ptr<DBBridge> inline unique_ptr<OptimisticTransactionDBOptions> new_odb_options() {
open_db_raw(const OptionsBridge &options, return make_unique<OptimisticTransactionDBOptions>();
const string &path, }
BridgeStatus &status) {
auto cf_names = std::vector<std::string>();
DB::ListColumnFamilies(options.inner, path, &cf_names);
if (cf_names.empty()) {
cf_names.push_back(kDefaultColumnFamilyName);
}
std::vector<ColumnFamilyDescriptor> column_families; inline unique_ptr<TDBBridge>
open_tdb_raw(const Options &options,
const TransactionDBOptions &txn_db_options,
const string &path,
BridgeStatus &status) {
auto cf_info = get_cf_data(options, path);
auto cf_names = std::get<0>(cf_info);
auto column_families = std::get<1>(cf_info);
for (auto el: cf_names) {
column_families.push_back(ColumnFamilyDescriptor(
el, options.inner));
}
std::vector<ColumnFamilyHandle *> handles; std::vector<ColumnFamilyHandle *> handles;
TransactionDB *txn_db;
DB *db_ptr; Status s = TransactionDB::Open(options, txn_db_options, path,
Status s = DB::Open(options.inner, path, column_families, &handles, &db_ptr); column_families, &handles,
&txn_db);
auto ok = s.ok(); auto ok = s.ok();
write_status(std::move(s), status); write_status(std::move(s), status);
unordered_map<string, shared_ptr<ColumnFamilyHandle>> handle_map; unordered_map<string, shared_ptr<ColumnFamilyHandle>> handle_map;
if (ok) { if (ok) {
assert(handles.size() == cf_names.size()); assert(handles.size() == cf_names.size());
@ -388,5 +508,38 @@ open_db_raw(const OptionsBridge &options,
handle_map[cf_names[i]] = shared_ptr<ColumnFamilyHandle>(handles[i]); handle_map[cf_names[i]] = shared_ptr<ColumnFamilyHandle>(handles[i]);
} }
} }
return std::make_unique<DBBridge>(db_ptr, std::move(handle_map));
return make_unique<TDBBridge>(txn_db, txn_db, nullptr, std::move(handle_map));
} }
inline unique_ptr<TDBBridge>
open_odb_raw(const Options &options,
const OptimisticTransactionDBOptions &txn_db_options,
const string &path,
BridgeStatus &status) {
auto cf_info = get_cf_data(options, path);
auto cf_names = std::get<0>(cf_info);
auto column_families = std::get<1>(cf_info);
std::vector<ColumnFamilyHandle *> handles;
OptimisticTransactionDB *txn_db;
Status s = OptimisticTransactionDB::Open(options, txn_db_options, path,
column_families, &handles,
&txn_db);
auto ok = s.ok();
write_status(std::move(s), status);
unordered_map<string, shared_ptr<ColumnFamilyHandle>> handle_map;
if (ok) {
assert(handles.size() == cf_names.size());
for (size_t i = 0; i < handles.size(); ++i) {
handle_map[cf_names[i]] = shared_ptr<ColumnFamilyHandle>(handles[i]);
}
}
return make_unique<TDBBridge>(txn_db, nullptr, txn_db, std::move(handle_map));
}

@ -1,11 +1,19 @@
fn main() { fn main() {
cxx_build::bridge("src/lib.rs") cxx_build::bridge("src/bridge.rs")
.file("bridge/cozorocks.cc") .file("bridge/cozorocks.cc")
.include("../deps/include") .include("../deps/include")
.include("bridge") .include("bridge")
.flag_if_supported("-std=c++17") .flag_if_supported("-std=c++17")
.compile("cozorocks"); .compile("cozorocks-cxx");
// let mut b = autocxx_build::Builder::new(
// "src/bridge.rs",
// &["../deps/include", "bridge"])
// .extra_clang_args(&["-std=c++17"])
// .build()?;
// // This assumes all your C++ bindings are in main.rs
// b.flag_if_supported("-std=c++17")
// .compile("cozorocks-autocxx"); // arbitrary library name, pick anything
println!("cargo:rustc-link-search=deps/lib/"); println!("cargo:rustc-link-search=deps/lib/");
println!("cargo:rustc-link-lib=rocksdb"); println!("cargo:rustc-link-lib=rocksdb");
println!("cargo:rustc-link-lib=z"); println!("cargo:rustc-link-lib=z");
@ -13,7 +21,7 @@ fn main() {
println!("cargo:rustc-link-lib=lz4"); println!("cargo:rustc-link-lib=lz4");
println!("cargo:rustc-link-lib=snappy"); println!("cargo:rustc-link-lib=snappy");
println!("cargo:rustc-link-lib=zstd"); println!("cargo:rustc-link-lib=zstd");
println!("cargo:rerun-if-changed=src/main.rs"); println!("cargo:rerun-if-changed=src/bridge.rs");
println!("cargo:rerun-if-changed=bridge/cozorocks.cc"); println!("cargo:rerun-if-changed=bridge/cozorocks.cc");
println!("cargo:rerun-if-changed=bridge/cozorocks.h"); println!("cargo:rerun-if-changed=bridge/cozorocks.h");
} }

@ -0,0 +1,175 @@
#[cxx::bridge]
mod ffi {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusBridgeCode {
OK = 0,
LOCK_ERROR = 1,
EXISTING_ERROR = 2,
NOT_FOUND_ERROR = 3,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct BridgeStatus {
pub code: StatusCode,
pub subcode: StatusSubCode,
pub severity: StatusSeverity,
pub bridge_code: StatusBridgeCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusCode {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5,
kMergeInProgress = 6,
kIncomplete = 7,
kShutdownInProgress = 8,
kTimedOut = 9,
kAborted = 10,
kBusy = 11,
kExpired = 12,
kTryAgain = 13,
kCompactionTooLarge = 14,
kColumnFamilyDropped = 15,
kMaxCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusSubCode {
kNone = 0,
kMutexTimeout = 1,
kLockTimeout = 2,
kLockLimit = 3,
kNoSpace = 4,
kDeadlock = 5,
kStaleFile = 6,
kMemoryLimit = 7,
kSpaceLimit = 8,
kPathNotFound = 9,
KMergeOperandsInsufficientCapacity = 10,
kManualCompactionPaused = 11,
kOverwritten = 12,
kTxnNotPrepared = 13,
kIOFenced = 14,
kMaxSubCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusSeverity {
kNoError = 0,
kSoftError = 1,
kHardError = 2,
kFatalError = 3,
kUnrecoverableError = 4,
kMaxSeverity,
}
unsafe extern "C++" {
include!("cozorocks.h");
type StatusCode;
type StatusSubCode;
type StatusSeverity;
type Slice;
type PinnableSlice;
fn convert_slice_back(s: &Slice) -> &[u8];
fn convert_pinnable_slice_back(s: &PinnableSlice) -> &[u8];
type Options;
fn new_options() -> UniquePtr<Options>;
fn prepare_for_bulk_load(o: Pin<&mut Options>);
fn increase_parallelism(o: Pin<&mut Options>);
fn optimize_level_style_compaction(o: Pin<&mut Options>);
fn set_create_if_missing(o: Pin<&mut Options>, v: bool);
fn set_comparator(o: Pin<&mut Options>, cmp: &RustComparator);
type ReadOptions;
fn new_read_options() -> UniquePtr<ReadOptions>;
fn set_verify_checksums(o: Pin<&mut ReadOptions>, v: bool);
fn set_total_order_seek(o: Pin<&mut ReadOptions>, v: bool);
type WriteOptions;
fn new_write_options() -> UniquePtr<WriteOptions>;
fn set_disable_wal(o: Pin<&mut WriteOptions>, v: bool);
type TransactionOptions;
fn new_transaction_options() -> UniquePtr<TransactionOptions>;
fn set_deadlock_detect(o: Pin<&mut TransactionOptions>, v: bool);
type OptimisticTransactionOptions;
fn new_optimistic_transaction_options(cmp: &RustComparator) -> UniquePtr<OptimisticTransactionOptions>;
type TransactionDBOptions;
fn new_tdb_options() -> UniquePtr<TransactionDBOptions>;
type OptimisticTransactionDBOptions;
fn new_odb_options() -> UniquePtr<OptimisticTransactionDBOptions>;
type RustComparator;
fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> UniquePtr<RustComparator>;
pub type IteratorBridge;
fn seek_to_first(self: &IteratorBridge);
fn seek_to_last(self: &IteratorBridge);
fn next(self: &IteratorBridge);
fn is_valid(self: &IteratorBridge) -> bool;
fn do_seek(self: &IteratorBridge, key: &[u8]);
fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]);
fn key_raw(self: &IteratorBridge) -> UniquePtr<Slice>;
fn value_raw(self: &IteratorBridge) -> UniquePtr<Slice>;
fn status(self: &IteratorBridge) -> BridgeStatus;
type TransactionBridge;
fn set_snapshot(self: &TransactionBridge);
fn commit(self: &TransactionBridge, status: &mut BridgeStatus);
fn rollback(self: &TransactionBridge, status: &mut BridgeStatus);
fn set_savepoint(self: &TransactionBridge);
fn rollback_to_savepoint(self: &TransactionBridge, status: &mut BridgeStatus);
fn pop_savepoint(self: &TransactionBridge, status: &mut BridgeStatus);
fn get_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8],
status: &mut BridgeStatus) -> UniquePtr<PinnableSlice>;
fn get_for_update_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8],
status: &mut BridgeStatus) -> UniquePtr<PinnableSlice>;
fn get_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8],
status: &mut BridgeStatus) -> UniquePtr<PinnableSlice>;
fn put_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8],
status: &mut BridgeStatus);
fn put_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8],
status: &mut BridgeStatus);
fn del_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8],
status: &mut BridgeStatus);
fn del_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8],
status: &mut BridgeStatus);
fn iterator_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;
fn iterator_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;
pub type ColumnFamilyHandle;
type TDBBridge;
fn begin_t_transaction(self: &TDBBridge,
w_ops: UniquePtr<WriteOptions>,
raw_w_ops: UniquePtr<WriteOptions>,
r_ops: UniquePtr<ReadOptions>,
raw_r_ops: UniquePtr<ReadOptions>,
txn_options: UniquePtr<TransactionOptions>) -> UniquePtr<TransactionBridge>;
fn begin_o_transaction(self: &TDBBridge,
w_ops: UniquePtr<WriteOptions>,
raw_w_ops: UniquePtr<WriteOptions>,
r_ops: UniquePtr<ReadOptions>,
raw_r_ops: UniquePtr<ReadOptions>,
txn_options: UniquePtr<OptimisticTransactionOptions>) -> UniquePtr<TransactionBridge>;
fn get_cf_handle_raw(self: &TDBBridge, name: &CxxString) -> SharedPtr<ColumnFamilyHandle>;
fn create_column_family_raw(self: &TDBBridge, options: &Options, name: &CxxString, status: &mut BridgeStatus);
fn drop_column_family_raw(self: &TDBBridge, name: &CxxString, status: &mut BridgeStatus);
fn get_column_family_names_raw(self: &TDBBridge) -> UniquePtr<CxxVector<CxxString>>;
fn open_tdb_raw(options: &Options,
txn_options: &TransactionDBOptions,
path: &CxxString,
status: &mut BridgeStatus) -> UniquePtr<TDBBridge>;
fn open_odb_raw(options: &Options,
txn_options: &OptimisticTransactionDBOptions,
path: &CxxString,
status: &mut BridgeStatus) -> UniquePtr<TDBBridge>;
}
}
pub use ffi::*;

@ -1,144 +1,20 @@
#[cxx::bridge] mod bridge;
mod ffi {
#[derive(Copy, Clone, Debug, Eq, PartialEq)] use bridge::*;
pub enum StatusBridgeCode {
OK = 0, use std::fmt::{Display, Formatter};
LOCK_ERROR = 1,
EXISTING_ERROR = 2,
NOT_FOUND_ERROR = 3,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct BridgeStatus {
pub code: StatusCode,
pub subcode: StatusSubCode,
pub severity: StatusSeverity,
pub bridge_code: StatusBridgeCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusCode {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5,
kMergeInProgress = 6,
kIncomplete = 7,
kShutdownInProgress = 8,
kTimedOut = 9,
kAborted = 10,
kBusy = 11,
kExpired = 12,
kTryAgain = 13,
kCompactionTooLarge = 14,
kColumnFamilyDropped = 15,
kMaxCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusSubCode {
kNone = 0,
kMutexTimeout = 1,
kLockTimeout = 2,
kLockLimit = 3,
kNoSpace = 4,
kDeadlock = 5,
kStaleFile = 6,
kMemoryLimit = 7,
kSpaceLimit = 8,
kPathNotFound = 9,
KMergeOperandsInsufficientCapacity = 10,
kManualCompactionPaused = 11,
kOverwritten = 12,
kTxnNotPrepared = 13,
kIOFenced = 14,
kMaxSubCode,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum StatusSeverity {
kNoError = 0,
kSoftError = 1,
kHardError = 2,
kFatalError = 3,
kUnrecoverableError = 4,
kMaxSeverity,
}
unsafe extern "C++" {
include!("cozorocks.h");
type StatusCode;
type StatusSubCode;
type StatusSeverity;
type PinnableSliceBridge;
fn as_bytes(self: &PinnableSliceBridge) -> &[u8];
type SliceBridge;
fn as_bytes(self: &SliceBridge) -> &[u8];
type ReadOptionsBridge;
fn new_read_options() -> UniquePtr<ReadOptionsBridge>;
fn do_set_verify_checksums(self: &ReadOptionsBridge, v: bool);
fn do_set_total_order_seek(self: &ReadOptionsBridge, v: bool);
type WriteOptionsBridge;
fn new_write_options() -> UniquePtr<WriteOptionsBridge>;
fn do_set_disable_wal(self: &WriteOptionsBridge, v: bool);
type OptionsBridge;
fn new_options() -> UniquePtr<OptionsBridge>;
fn do_prepare_for_bulk_load(self: &OptionsBridge);
fn do_increase_parallelism(self: &OptionsBridge);
fn do_optimize_level_style_compaction(self: &OptionsBridge);
fn do_set_create_if_missing(self: &OptionsBridge, v: bool);
fn do_set_comparator(self: &OptionsBridge, name: &str, compare: fn(&[u8], &[u8]) -> i8);
pub type ColumnFamilyHandle;
type DBBridge;
fn open_db_raw(options: &OptionsBridge, path: &CxxString, status: &mut BridgeStatus) -> UniquePtr<DBBridge>;
fn get_cf_handle_raw(self: &DBBridge, name: &CxxString) -> SharedPtr<ColumnFamilyHandle>;
fn write_raw(self: &DBBridge, options: &WriteOptionsBridge, updates: Pin<&mut WriteBatchBridge>, status: &mut BridgeStatus);
fn put_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus);
fn delete_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus);
fn get_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus) -> UniquePtr<PinnableSliceBridge>;
fn iterator_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;
fn create_column_family_raw(self: &DBBridge, options: &OptionsBridge, name: &CxxString, status: &mut BridgeStatus);
fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus);
fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr<CxxVector<CxxString>>;
pub type IteratorBridge;
fn seek_to_first(self: &IteratorBridge);
fn seek_to_last(self: &IteratorBridge);
fn next(self: &IteratorBridge);
fn is_valid(self: &IteratorBridge) -> bool;
fn do_seek(self: &IteratorBridge, key: &[u8]);
fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]);
fn key_raw(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
fn value_raw(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
fn status(self: &IteratorBridge) -> BridgeStatus;
pub type WriteBatchBridge;
fn new_write_batch_raw() -> UniquePtr<WriteBatchBridge>;
fn batch_put_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus);
fn batch_delete_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus);
}
}
use std::fmt::Formatter;
use std::fmt::Debug; use std::fmt::Debug;
use std::path::Path; use std::ops::{Deref, DerefMut};
use cxx::{UniquePtr, SharedPtr, let_cxx_string}; use cxx::{let_cxx_string};
pub use ffi::BridgeStatus; pub use cxx::{UniquePtr, SharedPtr};
pub use ffi::StatusBridgeCode; pub use bridge::BridgeStatus;
pub use ffi::StatusCode; pub use bridge::StatusBridgeCode;
pub use ffi::StatusSubCode; pub use bridge::StatusCode;
pub use ffi::StatusSeverity; pub use bridge::StatusSubCode;
pub use ffi::IteratorBridge; pub use bridge::StatusSeverity;
use ffi::*; pub use bridge::Slice;
pub use bridge::PinnableSlice;
impl std::fmt::Display for BridgeStatus { impl std::fmt::Display for BridgeStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
@ -146,344 +22,486 @@ impl std::fmt::Display for BridgeStatus {
} }
} }
impl std::error::Error for BridgeStatus {} #[derive(Debug)]
pub struct BridgeError {
type Result<T> = std::result::Result<T, BridgeStatus>; pub status: BridgeStatus,
}
pub type Options = UniquePtr<OptionsBridge>; impl Display for BridgeError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}
type ColumnFamilyHandle = SharedPtr<ffi::ColumnFamilyHandle>; impl std::error::Error for BridgeError {}
pub trait OptionsTrait { impl Default for BridgeStatus {
fn prepare_for_bulk_load(self) -> Self; fn default() -> Self {
fn increase_parallelism(self) -> Self; BridgeStatus {
fn optimize_level_style_compaction(self) -> Self; code: StatusCode::kOk,
fn set_create_if_missing(self, v: bool) -> Self; subcode: StatusSubCode::kNone,
fn set_comparator(self, name: &str, compare: fn(&[u8], &[u8]) -> i8) -> Self; severity: StatusSeverity::kNoError,
fn default() -> Self; bridge_code: StatusBridgeCode::OK,
}
}
} }
impl OptionsTrait for Options { impl BridgeStatus {
#[inline] fn check_err<T>(self, data: T) -> Result<T> {
fn prepare_for_bulk_load(self) -> Self { let err: Option<BridgeError> = self.into();
self.do_prepare_for_bulk_load(); match err {
self Some(e) => Err(e),
None => Ok(data)
}
} }
}
#[inline] impl From<BridgeStatus> for Option<BridgeError> {
fn increase_parallelism(self) -> Self { fn from(s: BridgeStatus) -> Self {
self.do_increase_parallelism(); if s.severity == StatusSeverity::kNoError && s.bridge_code == StatusBridgeCode::OK {
self None
} else {
Some(BridgeError { status: s })
}
} }
}
#[inline] pub type Result<T> = std::result::Result<T, BridgeError>;
fn optimize_level_style_compaction(self) -> Self {
self.do_optimize_level_style_compaction(); pub trait SlicePtr {
self fn as_bytes(&self) -> &[u8];
}
impl SlicePtr for UniquePtr<Slice> {
fn as_bytes(&self) -> &[u8] {
convert_slice_back(self)
} }
}
#[inline] impl SlicePtr for UniquePtr<PinnableSlice> {
fn set_create_if_missing(self, v: bool) -> Self { fn as_bytes(&self) -> &[u8] {
self.do_set_create_if_missing(v); convert_pinnable_slice_back(self)
self
} }
}
#[inline] pub struct RustComparatorPtr(UniquePtr<RustComparator>);
fn set_comparator(self, name: &str, compare: fn(&[u8], &[u8]) -> i8) -> Self {
self.do_set_comparator(name, compare); impl RustComparatorPtr {
self pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> Self {
Self(new_rust_comparator(name, cmp))
} }
}
#[inline] impl Deref for RustComparatorPtr {
fn default() -> Self { type Target = UniquePtr<RustComparator>;
new_options()
fn deref(&self) -> &Self::Target {
&self.0
} }
} }
pub type ReadOptions = UniquePtr<ReadOptionsBridge>; pub struct OptionsPtr(UniquePtr<Options>);
pub trait ReadOptionsTrait { impl Deref for OptionsPtr {
fn set_total_order_seek(self, v: bool) -> Self; type Target = UniquePtr<Options>;
fn set_verify_checksums(self, v: bool) -> Self; fn deref(&self) -> &Self::Target {
fn default() -> Self; &self.0
}
}
impl DerefMut for OptionsPtr {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
} }
impl ReadOptionsTrait for ReadOptions { impl OptionsPtr {
fn set_total_order_seek(self, v: bool) -> Self { pub fn default() -> Self {
self.do_set_total_order_seek(v); Self(new_options())
}
pub fn prepare_for_bulk_load(&mut self) -> &mut Self {
prepare_for_bulk_load(self.pin_mut());
self self
} }
fn set_verify_checksums(self, v: bool) -> Self { pub fn increase_parallelism(&mut self) -> &mut Self {
self.do_set_verify_checksums(v); increase_parallelism(self.pin_mut());
self self
} }
pub fn optimize_level_style_compaction(&mut self) -> &mut Self {
fn default() -> Self { optimize_level_style_compaction(self.pin_mut());
new_read_options() self
}
pub fn set_create_if_missing(&mut self, v: bool) -> &mut Self {
set_create_if_missing(self.pin_mut(), v);
self
}
pub fn set_comparator(&mut self, cmp: &RustComparatorPtr) -> &mut Self {
set_comparator(self.pin_mut(), cmp);
self
} }
} }
pub type WriteOptions = UniquePtr<WriteOptionsBridge>;
pub trait WriteOptionsTrait { pub struct ReadOptionsPtr(UniquePtr<ReadOptions>);
fn set_disable_wal(self, v: bool) -> Self;
fn default() -> Self; impl Deref for ReadOptionsPtr {
type Target = UniquePtr<ReadOptions>;
fn deref(&self) -> &Self::Target {
&self.0
}
} }
impl WriteOptionsTrait for WriteOptions { impl DerefMut for ReadOptionsPtr {
#[inline] fn deref_mut(&mut self) -> &mut Self::Target {
fn set_disable_wal(self, v: bool) -> Self { &mut self.0
self.do_set_disable_wal(v); }
}
impl ReadOptionsPtr {
pub fn default() -> Self {
Self(new_read_options())
}
pub fn set_verify_checksums(&mut self, v: bool) -> &mut Self {
set_verify_checksums(self.pin_mut(), v);
self self
} }
fn default() -> Self { pub fn set_total_order_seek(&mut self, v: bool) -> &mut Self {
new_write_options() set_total_order_seek(self.pin_mut(), v);
self
} }
} }
pub struct PinnableSlice(UniquePtr<PinnableSliceBridge>); pub struct WriteOptionsPtr(UniquePtr<WriteOptions>);
impl AsRef<[u8]> for PinnableSlice { impl Deref for WriteOptionsPtr {
#[inline] type Target = UniquePtr<WriteOptions>;
fn as_ref(&self) -> &[u8] { fn deref(&self) -> &Self::Target {
self.0.as_bytes() &self.0
} }
} }
pub struct Slice(UniquePtr<SliceBridge>); impl DerefMut for WriteOptionsPtr {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl AsRef<[u8]> for Slice { impl WriteOptionsPtr {
#[inline] pub fn default() -> Self {
fn as_ref(&self) -> &[u8] { Self(new_write_options())
self.0.as_bytes() }
pub fn set_disable_wal(&mut self, v: bool) -> &mut Self {
set_disable_wal(self.pin_mut(), v);
self
} }
} }
pub struct TransactionOptionsPtr(UniquePtr<TransactionOptions>);
pub type DBIterator = UniquePtr<IteratorBridge>; impl Deref for TransactionOptionsPtr {
type Target = UniquePtr<TransactionOptions>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub trait IteratorImpl { impl DerefMut for TransactionOptionsPtr {
fn seek(&self, key: impl AsRef<[u8]>); fn deref_mut(&mut self) -> &mut Self::Target {
fn seek_for_prev(&self, key: impl AsRef<[u8]>); &mut self.0
fn key(&self) -> Slice; }
fn value(&self) -> Slice;
} }
impl IteratorImpl for IteratorBridge { impl TransactionOptionsPtr {
#[inline] pub fn default() -> Self {
fn seek(&self, key: impl AsRef<[u8]>) { Self(new_transaction_options())
self.do_seek(key.as_ref());
} }
#[inline] pub fn set_deadlock_detect(&mut self, v: bool) -> &mut Self {
fn seek_for_prev(&self, key: impl AsRef<[u8]>) { set_deadlock_detect(self.pin_mut(), v);
self.do_seek_for_prev(key.as_ref()) self
} }
#[inline] }
fn key(&self) -> Slice {
Slice(self.key_raw()) pub struct OptimisticTransactionOptionsPtr(UniquePtr<OptimisticTransactionOptions>);
impl Deref for OptimisticTransactionOptionsPtr {
type Target = UniquePtr<OptimisticTransactionOptions>;
fn deref(&self) -> &Self::Target {
&self.0
} }
#[inline] }
fn value(&self) -> Slice {
Slice(self.value_raw()) impl DerefMut for OptimisticTransactionOptionsPtr {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
} }
} }
fn get_path_bytes(path: &std::path::Path) -> Vec<u8> { impl OptimisticTransactionOptionsPtr {
#[cfg(unix)] pub fn new(cmp: &RustComparator) -> Self {
{ Self(new_optimistic_transaction_options(cmp))
use std::os::unix::ffi::OsStrExt;
path.as_os_str().as_bytes().to_vec()
} }
}
#[cfg(not(unix))] pub struct TransactionDBOptionsPtr(UniquePtr<TransactionDBOptions>);
{ path.to_string_lossy().to_string().as_bytes().to_vec() }
impl Deref for TransactionDBOptionsPtr {
type Target = UniquePtr<TransactionDBOptions>;
fn deref(&self) -> &Self::Target {
&self.0
}
} }
impl Default for BridgeStatus { impl DerefMut for TransactionDBOptionsPtr {
#[inline] fn deref_mut(&mut self) -> &mut Self::Target {
fn default() -> Self { &mut self.0
Self {
code: StatusCode::kOk,
subcode: StatusSubCode::kNone,
severity: StatusSeverity::kNoError,
bridge_code: StatusBridgeCode::OK,
}
} }
} }
pub struct DB { impl TransactionDBOptionsPtr {
inner: UniquePtr<DBBridge>, pub fn default() -> Self {
pub path: Vec<u8>, Self(new_tdb_options())
pub options: Options, }
pub default_read_options: ReadOptions,
pub default_write_options: WriteOptions,
} }
unsafe impl Send for DB {} pub struct OptimisticTransactionDBOptionsPtr(UniquePtr<OptimisticTransactionDBOptions>);
unsafe impl Sync for DB {} impl Deref for OptimisticTransactionDBOptionsPtr {
type Target = UniquePtr<OptimisticTransactionDBOptions>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DB { impl DerefMut for OptimisticTransactionDBOptionsPtr {
pub fn open(options: Options, path: &Path) -> Result<DB> { fn deref_mut(&mut self) -> &mut Self::Target {
let path = get_path_bytes(path); &mut self.0
let_cxx_string!(cpp_path = path.clone());
let mut status = BridgeStatus::default();
let bridge = open_db_raw(
&options,
&cpp_path,
&mut status,
);
if status.code == StatusCode::kOk {
Ok(DB {
inner: bridge,
path,
options,
default_read_options: ReadOptions::default(),
default_write_options: WriteOptions::default(),
})
} else {
Err(status)
}
} }
}
pub fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle> { impl OptimisticTransactionDBOptionsPtr {
let_cxx_string!(name = name.as_ref()); pub fn default() -> Self {
let ret = self.inner.get_cf_handle_raw(&name); Self(new_odb_options())
if ret.is_null() {
Err(BridgeStatus {
code: StatusCode::kMaxCode,
subcode: StatusSubCode::kMaxSubCode,
severity: StatusSeverity::kSoftError,
bridge_code: StatusBridgeCode::NOT_FOUND_ERROR,
})
} else {
Ok(ret)
}
} }
}
pub struct IteratorPtr(UniquePtr<IteratorBridge>);
#[inline] impl Deref for IteratorPtr {
pub fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator { type Target = UniquePtr<IteratorBridge>;
self.inner.iterator_raw(options.unwrap_or(&self.default_read_options), cf) fn deref(&self) -> &Self::Target {
&self.0
} }
}
pub fn create_column_family(&self, name: impl AsRef<str>) -> Result<()> { impl IteratorPtr {
let_cxx_string!(name = name.as_ref()); pub fn to_first(&self) {
let mut status = BridgeStatus::default(); IteratorBridge::seek_to_first(self)
self.inner.create_column_family_raw(&self.options, &name, &mut status); }
if status.code == StatusCode::kOk { pub fn to_last(&self) {
Ok(()) IteratorBridge::seek_to_last(self)
}
pub fn next(&self) {
IteratorBridge::next(self)
}
pub fn is_valid(&self) -> bool {
IteratorBridge::is_valid(self)
}
pub fn seek(&self, key: impl AsRef<[u8]>) {
IteratorBridge::do_seek(self, key.as_ref())
}
pub fn seek_for_prev(&self, key: impl AsRef<[u8]>) {
IteratorBridge::do_seek_for_prev(self, key.as_ref())
}
pub fn key(&self) -> UniquePtr<Slice> {
IteratorBridge::key_raw(self)
}
pub fn val(&self) -> UniquePtr<Slice> {
IteratorBridge::value_raw(self)
}
pub fn status(&self) -> BridgeStatus {
IteratorBridge::status(self)
}
pub fn iter(&self) -> KVIterator {
KVIterator { it: self }
}
pub fn keys(&self) -> KeyIterator {
KeyIterator { it: self }
}
}
pub struct KVIterator<'a> {
it: &'a IteratorPtr,
}
impl Iterator for KVIterator<'_> {
type Item = (UniquePtr<Slice>, UniquePtr<Slice>);
fn next(&mut self) -> Option<Self::Item> {
if self.it.is_valid() {
let ret = (self.it.key(), self.it.val());
self.next();
Some(ret)
} else { } else {
Err(status) None
} }
} }
}
pub fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()> {
let_cxx_string!(name = name.as_ref()); pub struct KeyIterator<'a> {
let mut status = BridgeStatus::default(); it: &'a IteratorPtr,
self.inner.drop_column_family_raw(&name, &mut status); }
if status.code == StatusCode::kOk {
Ok(()) impl Iterator for KeyIterator<'_> {
type Item = UniquePtr<Slice>;
fn next(&mut self) -> Option<Self::Item> {
if self.it.is_valid() {
let ret = self.it.key();
self.next();
Some(ret)
} else { } else {
Err(status) None
} }
} }
}
pub struct TransactionPtr(UniquePtr<TransactionBridge>);
pub fn all_cf_names(&self) -> Vec<String> { impl Deref for TransactionPtr {
self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect() type Target = UniquePtr<TransactionBridge>;
fn deref(&self) -> &Self::Target {
&self.0
} }
}
#[inline]
pub fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>> { impl TransactionPtr {
pub fn set_snapshot(&self) {
TransactionBridge::set_snapshot(self)
}
pub fn commit(&self) -> Result<()> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status); TransactionBridge::commit(self, &mut status);
match status.code { status.check_err(())
StatusCode::kOk => Ok(Some(PinnableSlice(slice))),
StatusCode::kNotFound => Ok(None),
_ => Err(status)
}
} }
pub fn rollback(&self) -> Result<()> {
#[inline] let mut status = BridgeStatus::default();
pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> { TransactionBridge::rollback(self, &mut status);
status.check_err(())
}
pub fn set_savepoint(&self) {
TransactionBridge::set_savepoint(self);
}
pub fn rollback_to_savepoint(&self) -> Result<()> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf, TransactionBridge::rollback_to_savepoint(self, &mut status);
key.as_ref(), val.as_ref(), status.check_err(())
&mut status); }
if status.code == StatusCode::kOk { pub fn pop_savepoint(&self) -> Result<()> {
Ok(status) let mut status = BridgeStatus::default();
TransactionBridge::pop_savepoint(self, &mut status);
status.check_err(())
}
pub fn get(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<UniquePtr<PinnableSlice>> {
let mut status = BridgeStatus::default();
if transact {
let ret = self.get_txn(cf, key.as_ref(), &mut status);
status.check_err(ret)
} else { } else {
Err(status) let ret = self.get_raw(cf, key.as_ref(), &mut status);
status.check_err(ret)
} }
} }
pub fn get_for_update(&self, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<UniquePtr<PinnableSlice>> {
#[inline]
pub fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf, let ret = self.get_for_update_txn(cf, key.as_ref(), &mut status);
key.as_ref(), status.check_err(ret)
&mut status); }
if status.code == StatusCode::kOk { pub fn del(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<()> {
Ok(status) let mut status = BridgeStatus::default();
if transact {
let ret = self.del_txn(cf, key.as_ref(), &mut status);
status.check_err(ret)
} else { } else {
Err(status) let ret = self.del_raw(cf, key.as_ref(), &mut status);
status.check_err(ret)
} }
} }
pub fn put(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> {
#[inline]
pub fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.write_raw(options.unwrap_or(&self.default_write_options), if transact {
updates.pin_mut(), let ret = self.put_txn(cf, key.as_ref(), val.as_ref(), &mut status);
&mut status); status.check_err(ret)
if status.code == StatusCode::kOk {
Ok(status)
} else { } else {
Err(status) let ret = self.put_raw(cf, key.as_ref(), val.as_ref(), &mut status);
status.check_err(ret)
}
}
pub fn iterator(&self, transact: bool, cf: &ColumnFamilyHandle) -> IteratorPtr {
if transact {
IteratorPtr(self.iterator_txn(cf))
} else {
IteratorPtr(self.iterator_raw(cf))
} }
} }
} }
pub type WriteBatch = UniquePtr<WriteBatchBridge>; pub struct DBPtr(UniquePtr<TDBBridge>);
pub trait WriteBatchWrapperImp { impl Deref for DBPtr {
fn default() -> WriteBatch; type Target = UniquePtr<TDBBridge>;
}
impl WriteBatchWrapperImp for WriteBatch { fn deref(&self) -> &Self::Target {
fn default() -> WriteBatch { &self.0
new_write_batch_raw()
} }
} }
pub trait WriteBatchImpl { unsafe impl Send for DBPtr {}
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus>;
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus>;
}
impl WriteBatchImpl for WriteBatchBridge { unsafe impl Sync for DBPtr {}
#[inline]
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus> { impl DBPtr {
pub fn open_pessimistic(options: &Options, t_options: &TransactionDBOptions, path: impl AsRef<str>) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.batch_put_raw(cf, let ret = open_tdb_raw(options, t_options, &cname, &mut status);
key.as_ref(), val.as_ref(), status.check_err(Self(ret))
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
} }
#[inline]
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result<BridgeStatus> { pub fn open_optimistic(options: &Options, t_options: &OptimisticTransactionDBOptions, path: impl AsRef<str>) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.batch_delete_raw(cf, let ret = open_odb_raw(options, t_options, &cname, &mut status);
key.as_ref(), status.check_err(Self(ret))
&mut status); }
if status.code == StatusCode::kOk {
Ok(status) pub fn get_cf(&self, name: impl AsRef<str>) -> Option<SharedPtr<ColumnFamilyHandle>> {
let_cxx_string!(cname = name.as_ref());
let spt = self.get_cf_handle_raw(&cname);
if spt.is_null() {
None
} else { } else {
Err(status) Some(spt)
} }
} }
pub fn create_cf(&self, options: &Options, name: impl AsRef<str>) -> Result<()> {
let_cxx_string!(name = name.as_ref());
let mut status = BridgeStatus::default();
self.create_column_family_raw(options, &name, &mut status);
status.check_err(())
}
pub fn drop_cf(&self, name: impl AsRef<str>) -> Result<()> {
let_cxx_string!(name = name.as_ref());
let mut status = BridgeStatus::default();
self.drop_column_family_raw(&name, &mut status);
status.check_err(())
}
pub fn cf_names(&self) -> Vec<String> {
self.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect()
}
} }
Loading…
Cancel
Save