diff --git a/cozorocks/Cargo.toml b/cozorocks/Cargo.toml index 46ba526d..4cab17c2 100644 --- a/cozorocks/Cargo.toml +++ b/cozorocks/Cargo.toml @@ -6,7 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +#autocxx = "0.21.2" cxx = "1.0.66" [build-dependencies] -cxx-build = "1.0.66" \ No newline at end of file +#autocxx-build = "0.21.2" +cxx-build = "1.0.66" +#miette = { version = "4.3", features = ["fancy"] } \ No newline at end of file diff --git a/cozorocks/bridge/cozorocks.cc b/cozorocks/bridge/cozorocks.cc index ac3c2353..4811e02f 100644 --- a/cozorocks/bridge/cozorocks.cc +++ b/cozorocks/bridge/cozorocks.cc @@ -3,7 +3,7 @@ // #include "cozorocks.h" -#include "cozorocks/src/lib.rs.h" +#include "cozorocks/src/bridge.rs.h" void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code) { diff --git a/cozorocks/bridge/cozorocks.h b/cozorocks/bridge/cozorocks.h index 2d209f5d..7ead1b1a 100644 --- a/cozorocks/bridge/cozorocks.h +++ b/cozorocks/bridge/cozorocks.h @@ -13,6 +13,7 @@ #include "rocksdb/options.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" +#include "rocksdb/utilities/optimistic_transaction_db.h" typedef std::shared_mutex Lock; @@ -28,6 +29,7 @@ using std::make_shared; using std::string; using std::vector; using std::unordered_map; +using std::tuple; struct BridgeStatus; @@ -43,26 +45,34 @@ inline rust::Slice convert_slice_back(const Slice &s) { return rust::Slice(reinterpret_cast(s.data()), s.size()); } -struct ReadOptionsBridge { - mutable ReadOptions inner; - inline void do_set_verify_checksums(bool v) const { - inner.verify_checksums = v; - } +inline rust::Slice convert_pinnable_slice_back(const PinnableSlice &s) { + return rust::Slice(reinterpret_cast(s.data()), s.size()); +} + +void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, + int bridge_code); - inline void do_set_total_order_seek(bool v) const { - inner.total_order_seek = v; +inline void write_status(Status &&rstatus, BridgeStatus &status) { + if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || + rstatus.severity() != StatusSeverity::kNoError) { + write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0); } -}; +} -struct WriteOptionsBridge { - mutable WriteOptions inner; +void set_verify_checksums(ReadOptions &options, const bool v) { + options.verify_checksums = v; +} + +void set_total_order_seek(ReadOptions &options, const bool v) { + options.total_order_seek = v; +} + + +void set_disable_wal(WriteOptions &options, const bool v) { + options.disableWAL = v; +} -public: - inline void do_set_disable_wal(bool v) const { - inner.disableWAL = v; - } -}; typedef rust::Fn, rust::Slice)> RustComparatorFn; @@ -76,6 +86,7 @@ public: return "RustComparator"; } + void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} void FindShortSuccessor(std::string *) const {} @@ -92,73 +103,44 @@ public: mutable RustComparatorFn rust_compare; }; -struct OptionsBridge { - mutable Options inner; - mutable RustComparator cmp_obj; - - inline void do_prepare_for_bulk_load() const { - inner.PrepareForBulkLoad(); - } +inline unique_ptr new_rust_comparator(rust::Str name, RustComparatorFn f) { + auto ret = make_unique(); + ret->set_name(name); + ret->set_fn(f); + return ret; +} - inline void do_increase_parallelism() const { - inner.IncreaseParallelism(); - } - inline void do_optimize_level_style_compaction() const { - inner.OptimizeLevelStyleCompaction(); - }; +inline void prepare_for_bulk_load(Options &inner) { + inner.PrepareForBulkLoad(); +} - inline void do_set_create_if_missing(bool v) const { - inner.create_if_missing = v; - } +inline void increase_parallelism(Options &inner) { + inner.IncreaseParallelism(); +} - inline void do_set_comparator(rust::Str name, RustComparatorFn f) const { - cmp_obj = RustComparator(); - cmp_obj.set_name(name); - cmp_obj.set_fn(f); - inner.comparator = &cmp_obj; - } +inline void optimize_level_style_compaction(Options &inner) { + inner.OptimizeLevelStyleCompaction(); }; -inline std::unique_ptr new_read_options() { - return std::unique_ptr(new ReadOptionsBridge); +inline void set_create_if_missing(Options &inner, bool v) { + inner.create_if_missing = v; } -inline std::unique_ptr new_write_options() { - return std::unique_ptr(new WriteOptionsBridge); +inline void set_comparator(Options &inner, const RustComparator &cmp_obj) { + inner.comparator = &cmp_obj; } -inline std::unique_ptr new_options() { - return std::unique_ptr(new OptionsBridge); +inline std::unique_ptr new_read_options() { + return std::make_unique(); } +inline std::unique_ptr new_write_options() { + return std::make_unique(); +} -struct PinnableSliceBridge { - PinnableSlice inner; - - inline rust::Slice as_bytes() const { - return convert_slice_back(inner); - } -}; - -struct SliceBridge { - Slice inner; - - SliceBridge(Slice &&s) : inner(s) {} - - inline rust::Slice as_bytes() const { - return convert_slice_back(inner); - } -}; - -void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, - int bridge_code); - -inline void write_status(Status &&rstatus, BridgeStatus &status) { - if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || - rstatus.severity() != StatusSeverity::kNoError) { - write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0); - } +inline std::unique_ptr new_options() { + return std::make_unique(); } struct IteratorBridge { @@ -192,79 +174,139 @@ struct IteratorBridge { inner->SeekForPrev(k); } - inline std::unique_ptr key_raw() const { - return std::make_unique(inner->key()); + inline std::unique_ptr key_raw() const { + return std::make_unique(inner->key()); } - inline std::unique_ptr value_raw() const { - return std::make_unique(inner->value()); + inline std::unique_ptr value_raw() const { + return std::make_unique(inner->value()); } BridgeStatus status() const; }; -struct WriteBatchBridge { - mutable WriteBatch inner; +inline unique_ptr new_transaction_options() { + return make_unique(); +} + +inline void set_deadlock_detect(TransactionOptions &inner, bool v) { + inner.deadlock_detect = v; +} + +inline unique_ptr new_optimistic_transaction_options(const RustComparator &compare) { + auto ret = make_unique(); + ret->cmp = &compare; + return ret; +} + +struct TransactionBridge { + DB *raw_db; + unique_ptr inner; + mutable unique_ptr t_ops; // Put here to make sure ownership works + mutable unique_ptr o_ops; // same as above + mutable unique_ptr r_ops; + mutable unique_ptr raw_r_ops; + mutable unique_ptr w_ops; + mutable unique_ptr raw_w_ops; + + inline void set_snapshot() const { + inner->SetSnapshot(); + r_ops->snapshot = inner->GetSnapshot(); + } + + inline void commit(BridgeStatus &status) const { + write_status(inner->Commit(), status); + r_ops->snapshot = nullptr; + } + + inline void rollback(BridgeStatus &status) const { + write_status(inner->Rollback(), status); + } - inline void batch_put_raw( + inline void set_savepoint() const { + inner->SetSavePoint(); + } + + inline void rollback_to_savepoint(BridgeStatus &status) const { + write_status(inner->RollbackToSavePoint(), status); + } + + inline void pop_savepoint(BridgeStatus &status) const { + write_status(inner->PopSavePoint(), status); + } + + inline std::unique_ptr get_txn( const ColumnFamilyHandle &cf, rust::Slice key, - rust::Slice val, BridgeStatus &status ) const { + auto pinnable_val = std::make_unique(); write_status( - inner.Put(const_cast(&cf), - convert_slice(key), - convert_slice(val)), + inner->Get(*r_ops, + const_cast(&cf), + convert_slice(key), + &*pinnable_val), status ); + return pinnable_val; } - inline void batch_delete_raw( + inline std::unique_ptr get_for_update_txn( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { + auto pinnable_val = std::make_unique(); write_status( - inner.Delete(const_cast(&cf), - convert_slice(key)), + inner->GetForUpdate(*r_ops, + const_cast(&cf), + convert_slice(key), + &*pinnable_val), status ); + return pinnable_val; } -}; - -inline unique_ptr new_write_batch_raw() { - return make_unique(); -} - -struct DBBridge { - mutable unique_ptr db; - mutable unordered_map> handles; - mutable Lock handle_lock; - - DBBridge(DB *db_, - unordered_map> &&handles_) : db(db_), handles(handles_) {} + inline std::unique_ptr get_raw( + const ColumnFamilyHandle &cf, + rust::Slice key, + BridgeStatus &status + ) const { + auto pinnable_val = std::make_unique(); + write_status( + raw_db->Get(*r_ops, + const_cast(&cf), + convert_slice(key), + &*pinnable_val), + status + ); + return pinnable_val; + } - inline shared_ptr get_cf_handle_raw(const string &name) const { - ReadLock r_lock(handle_lock); - try { - return handles.at(name); - } catch (const std::out_of_range &) { - return shared_ptr(nullptr); - } + inline void put_txn( + const ColumnFamilyHandle &cf, + rust::Slice key, + rust::Slice val, + BridgeStatus &status + ) const { + write_status( + inner->Put(const_cast(&cf), + convert_slice(key), + convert_slice(val)), + status + ); } inline void put_raw( - const WriteOptionsBridge &options, const ColumnFamilyHandle &cf, rust::Slice key, rust::Slice val, BridgeStatus &status ) const { write_status( - db->Put(options.inner, + raw_db->Put( + *raw_w_ops, const_cast(&cf), convert_slice(key), convert_slice(val)), @@ -272,63 +314,138 @@ struct DBBridge { ); } - inline void delete_raw( - const WriteOptionsBridge &options, + inline void del_txn( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { write_status( - db->Delete(options.inner, - const_cast(&cf), - convert_slice(key)), + inner->Delete(const_cast(&cf), + convert_slice(key)), status ); } - inline void write_raw( - const WriteOptionsBridge &options, - WriteBatchBridge &updates, - BridgeStatus &status - ) const { - write_status(db->Write(options.inner, &updates.inner), status); - } - - inline std::unique_ptr get_raw( - const ReadOptionsBridge &options, + inline void del_raw( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { - auto pinnable_val = std::make_unique(); write_status( - db->Get(options.inner, + raw_db->Delete( + *raw_w_ops, const_cast(&cf), - convert_slice(key), - &pinnable_val->inner), + convert_slice(key)), status ); - return pinnable_val; + } + + inline std::unique_ptr iterator_txn( + const ColumnFamilyHandle &cf) const { + return std::make_unique( + inner->GetIterator(*r_ops, const_cast(&cf))); } inline std::unique_ptr iterator_raw( - const ReadOptionsBridge &options, const ColumnFamilyHandle &cf) const { - return std::make_unique(db->NewIterator(options.inner, const_cast(&cf))); + return std::make_unique( + raw_db->NewIterator(*raw_r_ops, const_cast(&cf))); + } +}; + +inline tuple, vector> +get_cf_data(const Options &options, + const string &path) { + auto cf_names = std::vector(); + DB::ListColumnFamilies(options, path, &cf_names); + if (cf_names.empty()) { + cf_names.push_back(kDefaultColumnFamilyName); + } + + std::vector column_families; + + for (auto el: cf_names) { + column_families.push_back(ColumnFamilyDescriptor( + el, options)); + } + return std::make_tuple(cf_names, column_families); +} + +struct TDBBridge { + mutable unique_ptr db; + mutable TransactionDB *tdb; + mutable OptimisticTransactionDB *odb; + mutable unordered_map> handles; + mutable Lock handle_lock; + bool is_odb; + + TDBBridge(StackableDB *db_, + TransactionDB *tdb_, + OptimisticTransactionDB *odb_, + unordered_map> &&handles_) : + db(db_), tdb(tdb_), odb(odb_), handles(handles_), handle_lock() { + is_odb = (tdb_ == nullptr); + } + + inline unique_ptr begin_t_transaction( + unique_ptr w_ops, + unique_ptr raw_w_ops, + unique_ptr r_ops, + unique_ptr raw_r_ops, + unique_ptr txn_options) const { + auto ret = make_unique(); + ret->raw_db = tdb; + ret->r_ops = std::move(r_ops); + ret->w_ops = std::move(w_ops); + ret->raw_r_ops = std::move(raw_r_ops); + ret->raw_w_ops = std::move(raw_w_ops); + ret->t_ops = std::move(txn_options); + Transaction *txn = tdb->BeginTransaction(*ret->w_ops, *ret->t_ops); + ret->inner = unique_ptr(txn); + return ret; } - inline void create_column_family_raw(const OptionsBridge &options, const string &name, BridgeStatus &status) const { + inline unique_ptr begin_o_transaction( + unique_ptr w_ops, + unique_ptr raw_w_ops, + unique_ptr r_ops, + unique_ptr raw_r_ops, + unique_ptr txn_options) const { + auto ret = make_unique(); + ret->raw_db = odb; + ret->r_ops = std::move(r_ops); + ret->w_ops = std::move(w_ops); + ret->raw_r_ops = std::move(raw_r_ops); + ret->raw_w_ops = std::move(raw_w_ops); + ret->o_ops = std::move(txn_options); + Transaction *txn = odb->BeginTransaction(*ret->w_ops, *ret->o_ops); + ret->inner = unique_ptr(txn); + return ret; + } + + inline shared_ptr get_cf_handle_raw(const string &name) const { + ReadLock r_lock(handle_lock); + try { + return handles.at(name); + } catch (const std::out_of_range &) { + return shared_ptr(nullptr); + } + } + + inline void + create_column_family_raw(const Options &options, const string &name, BridgeStatus &status) const { { ReadLock r_lock(handle_lock); if (handles.find(name) != handles.end()) { - write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, + write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, + StatusSeverity::kSoftError, 2); return; } } WriteLock w_lock(handle_lock); ColumnFamilyHandle *handle; - auto s = db->CreateColumnFamily(options.inner, name, &handle); + auto s = db->CreateColumnFamily(options, name, &handle); write_status(std::move(s), status); handles[name] = shared_ptr(handle); } @@ -341,7 +458,8 @@ struct DBBridge { handles.erase(cf_it); write_status(std::move(s), status); } else { - write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); + write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, + 3); } // When should we call DestroyColumnFamilyHandle? } @@ -356,31 +474,33 @@ struct DBBridge { } }; +inline unique_ptr new_tdb_options() { + return make_unique(); +} -inline std::unique_ptr -open_db_raw(const OptionsBridge &options, - const string &path, - BridgeStatus &status) { - auto cf_names = std::vector(); - DB::ListColumnFamilies(options.inner, path, &cf_names); - if (cf_names.empty()) { - cf_names.push_back(kDefaultColumnFamilyName); - } +inline unique_ptr new_odb_options() { + return make_unique(); +} - std::vector column_families; +inline unique_ptr +open_tdb_raw(const Options &options, + const TransactionDBOptions &txn_db_options, + const string &path, + BridgeStatus &status) { + auto cf_info = get_cf_data(options, path); + auto cf_names = std::get<0>(cf_info); + auto column_families = std::get<1>(cf_info); - for (auto el: cf_names) { - column_families.push_back(ColumnFamilyDescriptor( - el, options.inner)); - } std::vector handles; + TransactionDB *txn_db; - DB *db_ptr; - Status s = DB::Open(options.inner, path, column_families, &handles, &db_ptr); - + Status s = TransactionDB::Open(options, txn_db_options, path, + column_families, &handles, + &txn_db); auto ok = s.ok(); write_status(std::move(s), status); + unordered_map> handle_map; if (ok) { assert(handles.size() == cf_names.size()); @@ -388,5 +508,38 @@ open_db_raw(const OptionsBridge &options, handle_map[cf_names[i]] = shared_ptr(handles[i]); } } - return std::make_unique(db_ptr, std::move(handle_map)); + + return make_unique(txn_db, txn_db, nullptr, std::move(handle_map)); } + + +inline unique_ptr +open_odb_raw(const Options &options, + const OptimisticTransactionDBOptions &txn_db_options, + const string &path, + BridgeStatus &status) { + auto cf_info = get_cf_data(options, path); + auto cf_names = std::get<0>(cf_info); + auto column_families = std::get<1>(cf_info); + + + std::vector handles; + OptimisticTransactionDB *txn_db; + + Status s = OptimisticTransactionDB::Open(options, txn_db_options, path, + column_families, &handles, + &txn_db); + auto ok = s.ok(); + write_status(std::move(s), status); + + + unordered_map> handle_map; + if (ok) { + assert(handles.size() == cf_names.size()); + for (size_t i = 0; i < handles.size(); ++i) { + handle_map[cf_names[i]] = shared_ptr(handles[i]); + } + } + + return make_unique(txn_db, nullptr, txn_db, std::move(handle_map)); +} \ No newline at end of file diff --git a/cozorocks/build.rs b/cozorocks/build.rs index b4966d26..77e5f7cd 100644 --- a/cozorocks/build.rs +++ b/cozorocks/build.rs @@ -1,11 +1,19 @@ fn main() { - cxx_build::bridge("src/lib.rs") + cxx_build::bridge("src/bridge.rs") .file("bridge/cozorocks.cc") .include("../deps/include") .include("bridge") .flag_if_supported("-std=c++17") - .compile("cozorocks"); + .compile("cozorocks-cxx"); + // let mut b = autocxx_build::Builder::new( + // "src/bridge.rs", + // &["../deps/include", "bridge"]) + // .extra_clang_args(&["-std=c++17"]) + // .build()?; + // // This assumes all your C++ bindings are in main.rs + // b.flag_if_supported("-std=c++17") + // .compile("cozorocks-autocxx"); // arbitrary library name, pick anything println!("cargo:rustc-link-search=deps/lib/"); println!("cargo:rustc-link-lib=rocksdb"); println!("cargo:rustc-link-lib=z"); @@ -13,7 +21,7 @@ fn main() { println!("cargo:rustc-link-lib=lz4"); println!("cargo:rustc-link-lib=snappy"); println!("cargo:rustc-link-lib=zstd"); - println!("cargo:rerun-if-changed=src/main.rs"); + println!("cargo:rerun-if-changed=src/bridge.rs"); println!("cargo:rerun-if-changed=bridge/cozorocks.cc"); println!("cargo:rerun-if-changed=bridge/cozorocks.h"); } \ No newline at end of file diff --git a/cozorocks/src/bridge.rs b/cozorocks/src/bridge.rs new file mode 100644 index 00000000..682e8ad8 --- /dev/null +++ b/cozorocks/src/bridge.rs @@ -0,0 +1,175 @@ +#[cxx::bridge] +mod ffi { + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub enum StatusBridgeCode { + OK = 0, + LOCK_ERROR = 1, + EXISTING_ERROR = 2, + NOT_FOUND_ERROR = 3, + } + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub struct BridgeStatus { + pub code: StatusCode, + pub subcode: StatusSubCode, + pub severity: StatusSeverity, + pub bridge_code: StatusBridgeCode, + } + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub enum StatusCode { + kOk = 0, + kNotFound = 1, + kCorruption = 2, + kNotSupported = 3, + kInvalidArgument = 4, + kIOError = 5, + kMergeInProgress = 6, + kIncomplete = 7, + kShutdownInProgress = 8, + kTimedOut = 9, + kAborted = 10, + kBusy = 11, + kExpired = 12, + kTryAgain = 13, + kCompactionTooLarge = 14, + kColumnFamilyDropped = 15, + kMaxCode, + } + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub enum StatusSubCode { + kNone = 0, + kMutexTimeout = 1, + kLockTimeout = 2, + kLockLimit = 3, + kNoSpace = 4, + kDeadlock = 5, + kStaleFile = 6, + kMemoryLimit = 7, + kSpaceLimit = 8, + kPathNotFound = 9, + KMergeOperandsInsufficientCapacity = 10, + kManualCompactionPaused = 11, + kOverwritten = 12, + kTxnNotPrepared = 13, + kIOFenced = 14, + kMaxSubCode, + } + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub enum StatusSeverity { + kNoError = 0, + kSoftError = 1, + kHardError = 2, + kFatalError = 3, + kUnrecoverableError = 4, + kMaxSeverity, + } + + unsafe extern "C++" { + include!("cozorocks.h"); + + type StatusCode; + type StatusSubCode; + type StatusSeverity; + + type Slice; + type PinnableSlice; + fn convert_slice_back(s: &Slice) -> &[u8]; + fn convert_pinnable_slice_back(s: &PinnableSlice) -> &[u8]; + + type Options; + fn new_options() -> UniquePtr; + fn prepare_for_bulk_load(o: Pin<&mut Options>); + fn increase_parallelism(o: Pin<&mut Options>); + fn optimize_level_style_compaction(o: Pin<&mut Options>); + fn set_create_if_missing(o: Pin<&mut Options>, v: bool); + fn set_comparator(o: Pin<&mut Options>, cmp: &RustComparator); + + type ReadOptions; + fn new_read_options() -> UniquePtr; + fn set_verify_checksums(o: Pin<&mut ReadOptions>, v: bool); + fn set_total_order_seek(o: Pin<&mut ReadOptions>, v: bool); + type WriteOptions; + fn new_write_options() -> UniquePtr; + fn set_disable_wal(o: Pin<&mut WriteOptions>, v: bool); + type TransactionOptions; + fn new_transaction_options() -> UniquePtr; + fn set_deadlock_detect(o: Pin<&mut TransactionOptions>, v: bool); + type OptimisticTransactionOptions; + fn new_optimistic_transaction_options(cmp: &RustComparator) -> UniquePtr; + type TransactionDBOptions; + fn new_tdb_options() -> UniquePtr; + type OptimisticTransactionDBOptions; + fn new_odb_options() -> UniquePtr; + + type RustComparator; + fn new_rust_comparator(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> UniquePtr; + + pub type IteratorBridge; + fn seek_to_first(self: &IteratorBridge); + fn seek_to_last(self: &IteratorBridge); + fn next(self: &IteratorBridge); + fn is_valid(self: &IteratorBridge) -> bool; + fn do_seek(self: &IteratorBridge, key: &[u8]); + fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]); + fn key_raw(self: &IteratorBridge) -> UniquePtr; + fn value_raw(self: &IteratorBridge) -> UniquePtr; + fn status(self: &IteratorBridge) -> BridgeStatus; + + type TransactionBridge; + fn set_snapshot(self: &TransactionBridge); + fn commit(self: &TransactionBridge, status: &mut BridgeStatus); + fn rollback(self: &TransactionBridge, status: &mut BridgeStatus); + fn set_savepoint(self: &TransactionBridge); + fn rollback_to_savepoint(self: &TransactionBridge, status: &mut BridgeStatus); + fn pop_savepoint(self: &TransactionBridge, status: &mut BridgeStatus); + fn get_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], + status: &mut BridgeStatus) -> UniquePtr; + fn get_for_update_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], + status: &mut BridgeStatus) -> UniquePtr; + fn get_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], + status: &mut BridgeStatus) -> UniquePtr; + fn put_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], + status: &mut BridgeStatus); + fn put_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], + status: &mut BridgeStatus); + fn del_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], + status: &mut BridgeStatus); + fn del_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle, key: &[u8], + status: &mut BridgeStatus); + fn iterator_txn(self: &TransactionBridge, cf: &ColumnFamilyHandle) -> UniquePtr; + fn iterator_raw(self: &TransactionBridge, cf: &ColumnFamilyHandle) -> UniquePtr; + + pub type ColumnFamilyHandle; + + type TDBBridge; + fn begin_t_transaction(self: &TDBBridge, + w_ops: UniquePtr, + raw_w_ops: UniquePtr, + r_ops: UniquePtr, + raw_r_ops: UniquePtr, + txn_options: UniquePtr) -> UniquePtr; + fn begin_o_transaction(self: &TDBBridge, + w_ops: UniquePtr, + raw_w_ops: UniquePtr, + r_ops: UniquePtr, + raw_r_ops: UniquePtr, + txn_options: UniquePtr) -> UniquePtr; + fn get_cf_handle_raw(self: &TDBBridge, name: &CxxString) -> SharedPtr; + fn create_column_family_raw(self: &TDBBridge, options: &Options, name: &CxxString, status: &mut BridgeStatus); + fn drop_column_family_raw(self: &TDBBridge, name: &CxxString, status: &mut BridgeStatus); + fn get_column_family_names_raw(self: &TDBBridge) -> UniquePtr>; + fn open_tdb_raw(options: &Options, + txn_options: &TransactionDBOptions, + path: &CxxString, + status: &mut BridgeStatus) -> UniquePtr; + fn open_odb_raw(options: &Options, + txn_options: &OptimisticTransactionDBOptions, + path: &CxxString, + status: &mut BridgeStatus) -> UniquePtr; + } +} + +pub use ffi::*; diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index aca37a54..a9120389 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -1,144 +1,20 @@ -#[cxx::bridge] -mod ffi { - #[derive(Copy, Clone, Debug, Eq, PartialEq)] - pub enum StatusBridgeCode { - OK = 0, - LOCK_ERROR = 1, - EXISTING_ERROR = 2, - NOT_FOUND_ERROR = 3, - } - - #[derive(Copy, Clone, Debug, Eq, PartialEq)] - pub struct BridgeStatus { - pub code: StatusCode, - pub subcode: StatusSubCode, - pub severity: StatusSeverity, - pub bridge_code: StatusBridgeCode, - } - - #[derive(Copy, Clone, Debug, Eq, PartialEq)] - pub enum StatusCode { - kOk = 0, - kNotFound = 1, - kCorruption = 2, - kNotSupported = 3, - kInvalidArgument = 4, - kIOError = 5, - kMergeInProgress = 6, - kIncomplete = 7, - kShutdownInProgress = 8, - kTimedOut = 9, - kAborted = 10, - kBusy = 11, - kExpired = 12, - kTryAgain = 13, - kCompactionTooLarge = 14, - kColumnFamilyDropped = 15, - kMaxCode, - } - - #[derive(Copy, Clone, Debug, Eq, PartialEq)] - pub enum StatusSubCode { - kNone = 0, - kMutexTimeout = 1, - kLockTimeout = 2, - kLockLimit = 3, - kNoSpace = 4, - kDeadlock = 5, - kStaleFile = 6, - kMemoryLimit = 7, - kSpaceLimit = 8, - kPathNotFound = 9, - KMergeOperandsInsufficientCapacity = 10, - kManualCompactionPaused = 11, - kOverwritten = 12, - kTxnNotPrepared = 13, - kIOFenced = 14, - kMaxSubCode, - } - - #[derive(Copy, Clone, Debug, Eq, PartialEq)] - pub enum StatusSeverity { - kNoError = 0, - kSoftError = 1, - kHardError = 2, - kFatalError = 3, - kUnrecoverableError = 4, - kMaxSeverity, - } - - unsafe extern "C++" { - include!("cozorocks.h"); - - type StatusCode; - type StatusSubCode; - type StatusSeverity; - - type PinnableSliceBridge; - fn as_bytes(self: &PinnableSliceBridge) -> &[u8]; - - type SliceBridge; - fn as_bytes(self: &SliceBridge) -> &[u8]; - - type ReadOptionsBridge; - fn new_read_options() -> UniquePtr; - fn do_set_verify_checksums(self: &ReadOptionsBridge, v: bool); - fn do_set_total_order_seek(self: &ReadOptionsBridge, v: bool); - - type WriteOptionsBridge; - fn new_write_options() -> UniquePtr; - fn do_set_disable_wal(self: &WriteOptionsBridge, v: bool); - - type OptionsBridge; - fn new_options() -> UniquePtr; - fn do_prepare_for_bulk_load(self: &OptionsBridge); - fn do_increase_parallelism(self: &OptionsBridge); - fn do_optimize_level_style_compaction(self: &OptionsBridge); - fn do_set_create_if_missing(self: &OptionsBridge, v: bool); - fn do_set_comparator(self: &OptionsBridge, name: &str, compare: fn(&[u8], &[u8]) -> i8); - - pub type ColumnFamilyHandle; - type DBBridge; - fn open_db_raw(options: &OptionsBridge, path: &CxxString, status: &mut BridgeStatus) -> UniquePtr; - fn get_cf_handle_raw(self: &DBBridge, name: &CxxString) -> SharedPtr; - fn write_raw(self: &DBBridge, options: &WriteOptionsBridge, updates: Pin<&mut WriteBatchBridge>, status: &mut BridgeStatus); - fn put_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus); - fn delete_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus); - fn get_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus) -> UniquePtr; - fn iterator_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle) -> UniquePtr; - fn create_column_family_raw(self: &DBBridge, options: &OptionsBridge, name: &CxxString, status: &mut BridgeStatus); - fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus); - fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr>; - - pub type IteratorBridge; - fn seek_to_first(self: &IteratorBridge); - fn seek_to_last(self: &IteratorBridge); - fn next(self: &IteratorBridge); - fn is_valid(self: &IteratorBridge) -> bool; - fn do_seek(self: &IteratorBridge, key: &[u8]); - fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]); - fn key_raw(self: &IteratorBridge) -> UniquePtr; - fn value_raw(self: &IteratorBridge) -> UniquePtr; - fn status(self: &IteratorBridge) -> BridgeStatus; - - pub type WriteBatchBridge; - fn new_write_batch_raw() -> UniquePtr; - fn batch_put_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus); - fn batch_delete_raw(self: &WriteBatchBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus); - } -} - -use std::fmt::Formatter; +mod bridge; + +use bridge::*; + +use std::fmt::{Display, Formatter}; use std::fmt::Debug; -use std::path::Path; -use cxx::{UniquePtr, SharedPtr, let_cxx_string}; -pub use ffi::BridgeStatus; -pub use ffi::StatusBridgeCode; -pub use ffi::StatusCode; -pub use ffi::StatusSubCode; -pub use ffi::StatusSeverity; -pub use ffi::IteratorBridge; -use ffi::*; +use std::ops::{Deref, DerefMut}; +use cxx::{let_cxx_string}; +pub use cxx::{UniquePtr, SharedPtr}; +pub use bridge::BridgeStatus; +pub use bridge::StatusBridgeCode; +pub use bridge::StatusCode; +pub use bridge::StatusSubCode; +pub use bridge::StatusSeverity; +pub use bridge::Slice; +pub use bridge::PinnableSlice; + impl std::fmt::Display for BridgeStatus { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { @@ -146,344 +22,486 @@ impl std::fmt::Display for BridgeStatus { } } -impl std::error::Error for BridgeStatus {} - -type Result = std::result::Result; +#[derive(Debug)] +pub struct BridgeError { + pub status: BridgeStatus, +} -pub type Options = UniquePtr; +impl Display for BridgeError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Debug::fmt(self, f) + } +} -type ColumnFamilyHandle = SharedPtr; +impl std::error::Error for BridgeError {} -pub trait OptionsTrait { - fn prepare_for_bulk_load(self) -> Self; - fn increase_parallelism(self) -> Self; - fn optimize_level_style_compaction(self) -> Self; - fn set_create_if_missing(self, v: bool) -> Self; - fn set_comparator(self, name: &str, compare: fn(&[u8], &[u8]) -> i8) -> Self; - fn default() -> Self; +impl Default for BridgeStatus { + fn default() -> Self { + BridgeStatus { + code: StatusCode::kOk, + subcode: StatusSubCode::kNone, + severity: StatusSeverity::kNoError, + bridge_code: StatusBridgeCode::OK, + } + } } -impl OptionsTrait for Options { - #[inline] - fn prepare_for_bulk_load(self) -> Self { - self.do_prepare_for_bulk_load(); - self +impl BridgeStatus { + fn check_err(self, data: T) -> Result { + let err: Option = self.into(); + match err { + Some(e) => Err(e), + None => Ok(data) + } } +} - #[inline] - fn increase_parallelism(self) -> Self { - self.do_increase_parallelism(); - self +impl From for Option { + fn from(s: BridgeStatus) -> Self { + if s.severity == StatusSeverity::kNoError && s.bridge_code == StatusBridgeCode::OK { + None + } else { + Some(BridgeError { status: s }) + } } +} - #[inline] - fn optimize_level_style_compaction(self) -> Self { - self.do_optimize_level_style_compaction(); - self +pub type Result = std::result::Result; + +pub trait SlicePtr { + fn as_bytes(&self) -> &[u8]; +} + +impl SlicePtr for UniquePtr { + fn as_bytes(&self) -> &[u8] { + convert_slice_back(self) } +} - #[inline] - fn set_create_if_missing(self, v: bool) -> Self { - self.do_set_create_if_missing(v); - self +impl SlicePtr for UniquePtr { + fn as_bytes(&self) -> &[u8] { + convert_pinnable_slice_back(self) } +} - #[inline] - fn set_comparator(self, name: &str, compare: fn(&[u8], &[u8]) -> i8) -> Self { - self.do_set_comparator(name, compare); - self +pub struct RustComparatorPtr(UniquePtr); + +impl RustComparatorPtr { + pub fn new(name: &str, cmp: fn(&[u8], &[u8]) -> i8) -> Self { + Self(new_rust_comparator(name, cmp)) } +} - #[inline] - fn default() -> Self { - new_options() +impl Deref for RustComparatorPtr { + type Target = UniquePtr; + + fn deref(&self) -> &Self::Target { + &self.0 } } -pub type ReadOptions = UniquePtr; +pub struct OptionsPtr(UniquePtr); -pub trait ReadOptionsTrait { - fn set_total_order_seek(self, v: bool) -> Self; - fn set_verify_checksums(self, v: bool) -> Self; - fn default() -> Self; +impl Deref for OptionsPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for OptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } -impl ReadOptionsTrait for ReadOptions { - fn set_total_order_seek(self, v: bool) -> Self { - self.do_set_total_order_seek(v); +impl OptionsPtr { + pub fn default() -> Self { + Self(new_options()) + } + pub fn prepare_for_bulk_load(&mut self) -> &mut Self { + prepare_for_bulk_load(self.pin_mut()); self } - fn set_verify_checksums(self, v: bool) -> Self { - self.do_set_verify_checksums(v); + pub fn increase_parallelism(&mut self) -> &mut Self { + increase_parallelism(self.pin_mut()); self } - - fn default() -> Self { - new_read_options() + pub fn optimize_level_style_compaction(&mut self) -> &mut Self { + optimize_level_style_compaction(self.pin_mut()); + self + } + pub fn set_create_if_missing(&mut self, v: bool) -> &mut Self { + set_create_if_missing(self.pin_mut(), v); + self + } + pub fn set_comparator(&mut self, cmp: &RustComparatorPtr) -> &mut Self { + set_comparator(self.pin_mut(), cmp); + self } } -pub type WriteOptions = UniquePtr; -pub trait WriteOptionsTrait { - fn set_disable_wal(self, v: bool) -> Self; - fn default() -> Self; +pub struct ReadOptionsPtr(UniquePtr); + +impl Deref for ReadOptionsPtr { + type Target = UniquePtr; + + fn deref(&self) -> &Self::Target { + &self.0 + } } -impl WriteOptionsTrait for WriteOptions { - #[inline] - fn set_disable_wal(self, v: bool) -> Self { - self.do_set_disable_wal(v); +impl DerefMut for ReadOptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + + +impl ReadOptionsPtr { + pub fn default() -> Self { + Self(new_read_options()) + } + pub fn set_verify_checksums(&mut self, v: bool) -> &mut Self { + set_verify_checksums(self.pin_mut(), v); self } - fn default() -> Self { - new_write_options() + pub fn set_total_order_seek(&mut self, v: bool) -> &mut Self { + set_total_order_seek(self.pin_mut(), v); + self } } -pub struct PinnableSlice(UniquePtr); +pub struct WriteOptionsPtr(UniquePtr); -impl AsRef<[u8]> for PinnableSlice { - #[inline] - fn as_ref(&self) -> &[u8] { - self.0.as_bytes() +impl Deref for WriteOptionsPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 } } -pub struct Slice(UniquePtr); +impl DerefMut for WriteOptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} -impl AsRef<[u8]> for Slice { - #[inline] - fn as_ref(&self) -> &[u8] { - self.0.as_bytes() +impl WriteOptionsPtr { + pub fn default() -> Self { + Self(new_write_options()) + } + pub fn set_disable_wal(&mut self, v: bool) -> &mut Self { + set_disable_wal(self.pin_mut(), v); + self } } +pub struct TransactionOptionsPtr(UniquePtr); -pub type DBIterator = UniquePtr; +impl Deref for TransactionOptionsPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 + } +} -pub trait IteratorImpl { - fn seek(&self, key: impl AsRef<[u8]>); - fn seek_for_prev(&self, key: impl AsRef<[u8]>); - fn key(&self) -> Slice; - fn value(&self) -> Slice; +impl DerefMut for TransactionOptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } -impl IteratorImpl for IteratorBridge { - #[inline] - fn seek(&self, key: impl AsRef<[u8]>) { - self.do_seek(key.as_ref()); +impl TransactionOptionsPtr { + pub fn default() -> Self { + Self(new_transaction_options()) } - #[inline] - fn seek_for_prev(&self, key: impl AsRef<[u8]>) { - self.do_seek_for_prev(key.as_ref()) + pub fn set_deadlock_detect(&mut self, v: bool) -> &mut Self { + set_deadlock_detect(self.pin_mut(), v); + self } - #[inline] - fn key(&self) -> Slice { - Slice(self.key_raw()) +} + +pub struct OptimisticTransactionOptionsPtr(UniquePtr); + +impl Deref for OptimisticTransactionOptionsPtr { + type Target = UniquePtr; + + fn deref(&self) -> &Self::Target { + &self.0 } - #[inline] - fn value(&self) -> Slice { - Slice(self.value_raw()) +} + +impl DerefMut for OptimisticTransactionOptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } -fn get_path_bytes(path: &std::path::Path) -> Vec { - #[cfg(unix)] - { - use std::os::unix::ffi::OsStrExt; - path.as_os_str().as_bytes().to_vec() +impl OptimisticTransactionOptionsPtr { + pub fn new(cmp: &RustComparator) -> Self { + Self(new_optimistic_transaction_options(cmp)) } +} - #[cfg(not(unix))] - { path.to_string_lossy().to_string().as_bytes().to_vec() } +pub struct TransactionDBOptionsPtr(UniquePtr); + +impl Deref for TransactionDBOptionsPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 + } } -impl Default for BridgeStatus { - #[inline] - fn default() -> Self { - Self { - code: StatusCode::kOk, - subcode: StatusSubCode::kNone, - severity: StatusSeverity::kNoError, - bridge_code: StatusBridgeCode::OK, - } +impl DerefMut for TransactionDBOptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } -pub struct DB { - inner: UniquePtr, - pub path: Vec, - pub options: Options, - pub default_read_options: ReadOptions, - pub default_write_options: WriteOptions, +impl TransactionDBOptionsPtr { + pub fn default() -> Self { + Self(new_tdb_options()) + } } -unsafe impl Send for DB {} +pub struct OptimisticTransactionDBOptionsPtr(UniquePtr); -unsafe impl Sync for DB {} +impl Deref for OptimisticTransactionDBOptionsPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 + } +} -impl DB { - pub fn open(options: Options, path: &Path) -> Result { - let path = get_path_bytes(path); - let_cxx_string!(cpp_path = path.clone()); - let mut status = BridgeStatus::default(); - let bridge = open_db_raw( - &options, - &cpp_path, - &mut status, - ); - - if status.code == StatusCode::kOk { - Ok(DB { - inner: bridge, - path, - options, - default_read_options: ReadOptions::default(), - default_write_options: WriteOptions::default(), - }) - } else { - Err(status) - } +impl DerefMut for OptimisticTransactionDBOptionsPtr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } +} - pub fn get_cf_handle(&self, name: impl AsRef) -> Result { - let_cxx_string!(name = name.as_ref()); - let ret = self.inner.get_cf_handle_raw(&name); - if ret.is_null() { - Err(BridgeStatus { - code: StatusCode::kMaxCode, - subcode: StatusSubCode::kMaxSubCode, - severity: StatusSeverity::kSoftError, - bridge_code: StatusBridgeCode::NOT_FOUND_ERROR, - }) - } else { - Ok(ret) - } +impl OptimisticTransactionDBOptionsPtr { + pub fn default() -> Self { + Self(new_odb_options()) } +} + +pub struct IteratorPtr(UniquePtr); - #[inline] - pub fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator { - self.inner.iterator_raw(options.unwrap_or(&self.default_read_options), cf) +impl Deref for IteratorPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 } +} - pub fn create_column_family(&self, name: impl AsRef) -> Result<()> { - let_cxx_string!(name = name.as_ref()); - let mut status = BridgeStatus::default(); - self.inner.create_column_family_raw(&self.options, &name, &mut status); - if status.code == StatusCode::kOk { - Ok(()) +impl IteratorPtr { + pub fn to_first(&self) { + IteratorBridge::seek_to_first(self) + } + pub fn to_last(&self) { + IteratorBridge::seek_to_last(self) + } + pub fn next(&self) { + IteratorBridge::next(self) + } + pub fn is_valid(&self) -> bool { + IteratorBridge::is_valid(self) + } + pub fn seek(&self, key: impl AsRef<[u8]>) { + IteratorBridge::do_seek(self, key.as_ref()) + } + pub fn seek_for_prev(&self, key: impl AsRef<[u8]>) { + IteratorBridge::do_seek_for_prev(self, key.as_ref()) + } + pub fn key(&self) -> UniquePtr { + IteratorBridge::key_raw(self) + } + pub fn val(&self) -> UniquePtr { + IteratorBridge::value_raw(self) + } + pub fn status(&self) -> BridgeStatus { + IteratorBridge::status(self) + } + pub fn iter(&self) -> KVIterator { + KVIterator { it: self } + } + pub fn keys(&self) -> KeyIterator { + KeyIterator { it: self } + } +} + +pub struct KVIterator<'a> { + it: &'a IteratorPtr, +} + +impl Iterator for KVIterator<'_> { + type Item = (UniquePtr, UniquePtr); + fn next(&mut self) -> Option { + if self.it.is_valid() { + let ret = (self.it.key(), self.it.val()); + self.next(); + Some(ret) } else { - Err(status) + None } } +} - pub fn drop_column_family(&self, name: impl AsRef) -> Result<()> { - let_cxx_string!(name = name.as_ref()); - let mut status = BridgeStatus::default(); - self.inner.drop_column_family_raw(&name, &mut status); - if status.code == StatusCode::kOk { - Ok(()) + +pub struct KeyIterator<'a> { + it: &'a IteratorPtr, +} + +impl Iterator for KeyIterator<'_> { + type Item = UniquePtr; + fn next(&mut self) -> Option { + if self.it.is_valid() { + let ret = self.it.key(); + self.next(); + Some(ret) } else { - Err(status) + None } } +} + +pub struct TransactionPtr(UniquePtr); - pub fn all_cf_names(&self) -> Vec { - self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect() +impl Deref for TransactionPtr { + type Target = UniquePtr; + fn deref(&self) -> &Self::Target { + &self.0 } +} - #[inline] - pub fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result> { + +impl TransactionPtr { + pub fn set_snapshot(&self) { + TransactionBridge::set_snapshot(self) + } + pub fn commit(&self) -> Result<()> { let mut status = BridgeStatus::default(); - let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status); - match status.code { - StatusCode::kOk => Ok(Some(PinnableSlice(slice))), - StatusCode::kNotFound => Ok(None), - _ => Err(status) - } + TransactionBridge::commit(self, &mut status); + status.check_err(()) } - - #[inline] - pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { + pub fn rollback(&self) -> Result<()> { + let mut status = BridgeStatus::default(); + TransactionBridge::rollback(self, &mut status); + status.check_err(()) + } + pub fn set_savepoint(&self) { + TransactionBridge::set_savepoint(self); + } + pub fn rollback_to_savepoint(&self) -> Result<()> { let mut status = BridgeStatus::default(); - self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf, - key.as_ref(), val.as_ref(), - &mut status); - if status.code == StatusCode::kOk { - Ok(status) + TransactionBridge::rollback_to_savepoint(self, &mut status); + status.check_err(()) + } + pub fn pop_savepoint(&self) -> Result<()> { + let mut status = BridgeStatus::default(); + TransactionBridge::pop_savepoint(self, &mut status); + status.check_err(()) + } + pub fn get(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result> { + let mut status = BridgeStatus::default(); + if transact { + let ret = self.get_txn(cf, key.as_ref(), &mut status); + status.check_err(ret) } else { - Err(status) + let ret = self.get_raw(cf, key.as_ref(), &mut status); + status.check_err(ret) } } - - #[inline] - pub fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { + pub fn get_for_update(&self, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result> { let mut status = BridgeStatus::default(); - self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf, - key.as_ref(), - &mut status); - if status.code == StatusCode::kOk { - Ok(status) + let ret = self.get_for_update_txn(cf, key.as_ref(), &mut status); + status.check_err(ret) + } + pub fn del(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>) -> Result<()> { + let mut status = BridgeStatus::default(); + if transact { + let ret = self.del_txn(cf, key.as_ref(), &mut status); + status.check_err(ret) } else { - Err(status) + let ret = self.del_raw(cf, key.as_ref(), &mut status); + status.check_err(ret) } } - - #[inline] - pub fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result { + pub fn put(&self, transact: bool, cf: &ColumnFamilyHandle, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>) -> Result<()> { let mut status = BridgeStatus::default(); - self.inner.write_raw(options.unwrap_or(&self.default_write_options), - updates.pin_mut(), - &mut status); - if status.code == StatusCode::kOk { - Ok(status) + if transact { + let ret = self.put_txn(cf, key.as_ref(), val.as_ref(), &mut status); + status.check_err(ret) } else { - Err(status) + let ret = self.put_raw(cf, key.as_ref(), val.as_ref(), &mut status); + status.check_err(ret) + } + } + pub fn iterator(&self, transact: bool, cf: &ColumnFamilyHandle) -> IteratorPtr { + if transact { + IteratorPtr(self.iterator_txn(cf)) + } else { + IteratorPtr(self.iterator_raw(cf)) } } } -pub type WriteBatch = UniquePtr; +pub struct DBPtr(UniquePtr); -pub trait WriteBatchWrapperImp { - fn default() -> WriteBatch; -} +impl Deref for DBPtr { + type Target = UniquePtr; -impl WriteBatchWrapperImp for WriteBatch { - fn default() -> WriteBatch { - new_write_batch_raw() + fn deref(&self) -> &Self::Target { + &self.0 } } -pub trait WriteBatchImpl { - fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result; - fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result; -} +unsafe impl Send for DBPtr {} -impl WriteBatchImpl for WriteBatchBridge { - #[inline] - fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result { +unsafe impl Sync for DBPtr {} + +impl DBPtr { + pub fn open_pessimistic(options: &Options, t_options: &TransactionDBOptions, path: impl AsRef) -> Result { + let_cxx_string!(cname = path.as_ref()); let mut status = BridgeStatus::default(); - self.batch_put_raw(cf, - key.as_ref(), val.as_ref(), - &mut status); - if status.code == StatusCode::kOk { - Ok(status) - } else { - Err(status) - } + let ret = open_tdb_raw(options, t_options, &cname, &mut status); + status.check_err(Self(ret)) } - #[inline] - fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle) -> Result { + + pub fn open_optimistic(options: &Options, t_options: &OptimisticTransactionDBOptions, path: impl AsRef) -> Result { + let_cxx_string!(cname = path.as_ref()); let mut status = BridgeStatus::default(); - self.batch_delete_raw(cf, - key.as_ref(), - &mut status); - if status.code == StatusCode::kOk { - Ok(status) + let ret = open_odb_raw(options, t_options, &cname, &mut status); + status.check_err(Self(ret)) + } + + pub fn get_cf(&self, name: impl AsRef) -> Option> { + let_cxx_string!(cname = name.as_ref()); + let spt = self.get_cf_handle_raw(&cname); + if spt.is_null() { + None } else { - Err(status) + Some(spt) } } + + pub fn create_cf(&self, options: &Options, name: impl AsRef) -> Result<()> { + let_cxx_string!(name = name.as_ref()); + let mut status = BridgeStatus::default(); + self.create_column_family_raw(options, &name, &mut status); + status.check_err(()) + } + + pub fn drop_cf(&self, name: impl AsRef) -> Result<()> { + let_cxx_string!(name = name.as_ref()); + let mut status = BridgeStatus::default(); + self.drop_column_family_raw(&name, &mut status); + status.check_err(()) + } + + pub fn cf_names(&self) -> Vec { + self.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect() + } } \ No newline at end of file