// // Created by Ziyang Hu on 2022/4/13. // #pragma once #include #include #include #include "rust/cxx.h" #include "rocksdb/db.h" #include "rocksdb/slice.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/table.h" #include "rocksdb/filter_policy.h" #include "rocksdb/slice_transform.h" typedef std::shared_mutex Lock; typedef std::unique_lock WriteLock; typedef std::shared_lock ReadLock; using namespace ROCKSDB_NAMESPACE; using std::unique_ptr; using std::shared_ptr; using std::make_unique; using std::make_shared; using std::string; using std::vector; using std::unordered_map; using std::tuple; struct BridgeStatus; typedef Status::Code StatusCode; typedef Status::SubCode StatusSubCode; typedef Status::Severity StatusSeverity; inline Slice convert_slice(rust::Slice d) { return Slice(reinterpret_cast(d.data()), d.size()); } inline rust::Slice convert_slice_back(const Slice &s) { return rust::Slice(reinterpret_cast(s.data()), s.size()); } inline rust::Slice convert_pinnable_slice_back(const PinnableSlice &s) { return rust::Slice(reinterpret_cast(s.data()), s.size()); } void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code); inline void write_status(Status &&rstatus, BridgeStatus &status) { if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || rstatus.severity() != StatusSeverity::kNoError) { write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0); } } void set_verify_checksums(ReadOptions &options, const bool v) { options.verify_checksums = v; } void set_total_order_seek(ReadOptions &options, const bool v) { options.total_order_seek = v; } void set_prefix_same_as_start(ReadOptions &options, const bool v) { options.prefix_same_as_start = v; } void set_auto_prefix_mode(ReadOptions &options, const bool v) { options.auto_prefix_mode = v; } void set_disable_wal(WriteOptions &options, const bool v) { options.disableWAL = v; } typedef rust::Fn, rust::Slice)> RustComparatorFn; class RustComparator : public Comparator { public: inline int Compare(const rocksdb::Slice &a, const rocksdb::Slice &b) const { return int(rust_compare(convert_slice_back(a), convert_slice_back(b))); } const char *Name() const { return name.c_str(); } virtual bool CanKeysWithDifferentByteContentsBeEqual() const { return can_different_bytes_be_equal; } void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} void FindShortSuccessor(std::string *) const {} void set_fn(RustComparatorFn f) { rust_compare = f; } void set_name(rust::Str name_) { name = std::string(name_); } void set_can_different_bytes_be_equal(bool v) { can_different_bytes_be_equal = v; } std::string name; RustComparatorFn rust_compare; bool can_different_bytes_be_equal; }; inline unique_ptr new_rust_comparator(rust::Str name, RustComparatorFn f, bool diff_bytes_can_equal) { auto ret = make_unique(); ret->set_name(name); ret->set_fn(f); ret->set_can_different_bytes_be_equal(diff_bytes_can_equal); return ret; } inline void prepare_for_bulk_load(Options &inner) { inner.PrepareForBulkLoad(); } inline void increase_parallelism(Options &inner) { inner.IncreaseParallelism(); } inline void optimize_level_style_compaction(Options &inner) { inner.OptimizeLevelStyleCompaction(); }; inline void set_create_if_missing(Options &inner, bool v) { inner.create_if_missing = v; } inline void set_comparator(Options &inner, const RustComparator &cmp_obj) { inner.comparator = &cmp_obj; } inline void set_paranoid_checks(Options &inner, bool v) { inner.paranoid_checks = v; } inline std::unique_ptr new_read_options() { return std::make_unique(); } inline std::unique_ptr new_write_options() { return std::make_unique(); } inline std::unique_ptr new_options() { return std::make_unique(); } inline void set_bloom_filter(Options &options, const double bits_per_key, const bool whole_key_filtering) { BlockBasedTableOptions table_options; table_options.filter_policy.reset(NewBloomFilterPolicy(bits_per_key, false)); table_options.whole_key_filtering = whole_key_filtering; options.table_factory.reset( NewBlockBasedTableFactory( table_options)); } inline void set_capped_prefix_extractor(Options &options, const size_t cap_len) { options.prefix_extractor.reset(NewCappedPrefixTransform(cap_len)); } inline void set_fixed_prefix_extractor(Options &options, const size_t prefix_len) { options.prefix_extractor.reset(NewFixedPrefixTransform(prefix_len)); } struct IteratorBridge { mutable std::unique_ptr inner; IteratorBridge(Iterator *it) : inner(it) {} inline void seek_to_first() const { inner->SeekToFirst(); } inline void seek_to_last() const { inner->SeekToLast(); } inline void next() const { inner->Next(); } inline bool is_valid() const { return inner->Valid(); } inline void do_seek(rust::Slice key) const { auto k = Slice(reinterpret_cast(key.data()), key.size()); inner->Seek(k); } inline void do_seek_for_prev(rust::Slice key) const { auto k = Slice(reinterpret_cast(key.data()), key.size()); inner->SeekForPrev(k); } inline std::shared_ptr key_raw() const { // std::cout << "c++ get " << inner->key().size() << std::endl; return std::make_shared(inner->key()); } inline std::shared_ptr value_raw() const { return std::make_shared(inner->value()); } BridgeStatus status() const; }; inline unique_ptr new_transaction_options() { return make_unique(); } inline void set_deadlock_detect(TransactionOptions &inner, bool v) { inner.deadlock_detect = v; } inline unique_ptr new_optimistic_transaction_options(const RustComparator &compare) { auto ret = make_unique(); ret->cmp = &compare; return ret; } struct TransactionBridge { DB *raw_db; unique_ptr inner; mutable unique_ptr t_ops; // Put here to make sure ownership works mutable unique_ptr o_ops; // same as above mutable unique_ptr r_ops; mutable unique_ptr raw_r_ops; mutable unique_ptr w_ops; mutable unique_ptr raw_w_ops; inline void set_snapshot() const { inner->SetSnapshot(); r_ops->snapshot = inner->GetSnapshot(); } inline void commit(BridgeStatus &status) const { write_status(inner->Commit(), status); r_ops->snapshot = nullptr; } inline void rollback(BridgeStatus &status) const { write_status(inner->Rollback(), status); } inline void set_savepoint() const { inner->SetSavePoint(); } inline void rollback_to_savepoint(BridgeStatus &status) const { write_status(inner->RollbackToSavePoint(), status); } inline void pop_savepoint(BridgeStatus &status) const { write_status(inner->PopSavePoint(), status); } unique_ptr> multiget_txn( const ColumnFamilyHandle &cf, rust::Slice> keys, rust::Slice statuses) const; unique_ptr> multiget_raw( const ColumnFamilyHandle &cf, rust::Slice> keys, rust::Slice statuses) const; inline shared_ptr get_txn( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { auto pinnable_val = std::make_shared(); write_status( inner->Get(*r_ops, const_cast(&cf), convert_slice(key), &*pinnable_val), status ); return pinnable_val; } inline shared_ptr get_for_update_txn( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { auto pinnable_val = std::make_shared(); write_status( inner->GetForUpdate(*r_ops, const_cast(&cf), convert_slice(key), &*pinnable_val), status ); return pinnable_val; } inline std::shared_ptr get_raw( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { auto pinnable_val = std::make_shared(); write_status( raw_db->Get(*r_ops, const_cast(&cf), convert_slice(key), &*pinnable_val), status ); return pinnable_val; } inline void put_txn( const ColumnFamilyHandle &cf, rust::Slice key, rust::Slice val, BridgeStatus &status ) const { write_status( inner->Put(const_cast(&cf), convert_slice(key), convert_slice(val)), status ); } inline void put_raw( const ColumnFamilyHandle &cf, rust::Slice key, rust::Slice val, BridgeStatus &status ) const { auto k = convert_slice(key); auto v = convert_slice(val); // std::cout << "c++ put " << key.size() << " " << k.size() << std::endl; write_status( raw_db->Put( *raw_w_ops, const_cast(&cf), k, v), status ); } inline void del_txn( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { write_status( inner->Delete(const_cast(&cf), convert_slice(key)), status ); } inline void del_raw( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { write_status( raw_db->Delete( *raw_w_ops, const_cast(&cf), convert_slice(key)), status ); } inline void del_range_raw( const ColumnFamilyHandle &cf, rust::Slice start_key, rust::Slice end_key, BridgeStatus &status ) const { write_status( raw_db->GetRootDB()->DeleteRange( *raw_w_ops, const_cast(&cf), convert_slice(start_key), convert_slice(end_key)), status); } inline void flush_raw(const ColumnFamilyHandle &cf, const FlushOptions &options, BridgeStatus &status) const { write_status(raw_db->Flush(options, const_cast(&cf)), status); } inline void compact_all_raw(const ColumnFamilyHandle &cf, BridgeStatus &status) const { auto options = CompactRangeOptions(); options.change_level = true; options.target_level = 0; options.exclusive_manual_compaction = false; write_status(raw_db->CompactRange(options, const_cast(&cf), nullptr, nullptr), status); } inline std::unique_ptr iterator_txn( const ColumnFamilyHandle &cf) const { return std::make_unique( inner->GetIterator(*r_ops, const_cast(&cf))); } inline std::unique_ptr iterator_raw( const ColumnFamilyHandle &cf) const { return std::make_unique( raw_db->NewIterator(*raw_r_ops, const_cast(&cf))); } }; inline tuple, vector> get_cf_data(const Options &options, const string &path) { auto cf_names = std::vector(); DB::ListColumnFamilies(options, path, &cf_names); if (cf_names.empty()) { cf_names.push_back(kDefaultColumnFamilyName); } std::vector column_families; for (auto el: cf_names) { column_families.push_back(ColumnFamilyDescriptor( el, options)); } return std::make_tuple(cf_names, column_families); } struct TDBBridge { mutable unique_ptr db; mutable TransactionDB *tdb; mutable OptimisticTransactionDB *odb; mutable unordered_map> handles; mutable Lock handle_lock; bool is_odb; TDBBridge(StackableDB *db_, TransactionDB *tdb_, OptimisticTransactionDB *odb_, unordered_map> &&handles_) : db(db_), tdb(tdb_), odb(odb_), handles(handles_), handle_lock() { is_odb = (tdb_ == nullptr); } inline unique_ptr begin_t_transaction( unique_ptr w_ops, unique_ptr raw_w_ops, unique_ptr r_ops, unique_ptr raw_r_ops, unique_ptr txn_options) const { if (tdb == nullptr) { return unique_ptr(nullptr); } auto ret = make_unique(); ret->raw_db = tdb; ret->r_ops = std::move(r_ops); ret->w_ops = std::move(w_ops); ret->raw_r_ops = std::move(raw_r_ops); ret->raw_w_ops = std::move(raw_w_ops); ret->t_ops = std::move(txn_options); Transaction *txn = tdb->BeginTransaction(*ret->w_ops, *ret->t_ops); ret->inner = unique_ptr(txn); return ret; } inline unique_ptr begin_o_transaction( unique_ptr w_ops, unique_ptr raw_w_ops, unique_ptr r_ops, unique_ptr raw_r_ops, unique_ptr txn_options) const { if (odb == nullptr) { return unique_ptr(nullptr); } auto ret = make_unique(); ret->raw_db = odb; ret->r_ops = std::move(r_ops); ret->w_ops = std::move(w_ops); ret->raw_r_ops = std::move(raw_r_ops); ret->raw_w_ops = std::move(raw_w_ops); ret->o_ops = std::move(txn_options); Transaction *txn = odb->BeginTransaction(*ret->w_ops, *ret->o_ops); ret->inner = unique_ptr(txn); return ret; } inline shared_ptr get_cf_handle_raw(const string &name) const { ReadLock r_lock(handle_lock); try { return handles.at(name); } catch (const std::out_of_range &) { return shared_ptr(nullptr); } } inline shared_ptr get_default_cf_handle_raw() const { return handles.at("default"); } inline shared_ptr create_column_family_raw(const Options &options, const string &name, BridgeStatus &status) const { { ReadLock r_lock(handle_lock); if (handles.find(name) != handles.end()) { write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2); return shared_ptr(nullptr); } } { WriteLock w_lock(handle_lock); ColumnFamilyHandle *handle; auto s = db->CreateColumnFamily(options, name, &handle); write_status(std::move(s), status); auto ret = shared_ptr(handle); handles[name] = ret; return ret; } } inline void drop_column_family_raw(const string &name, BridgeStatus &status) const { WriteLock w_lock(handle_lock); auto cf_it = handles.find(name); if (cf_it != handles.end()) { auto s = db->DropColumnFamily(cf_it->second.get()); handles.erase(cf_it); write_status(std::move(s), status); } else { write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); } // When should we call DestroyColumnFamilyHandle? } inline unique_ptr> get_column_family_names_raw() const { ReadLock r_lock(handle_lock); auto ret = make_unique>(); for (auto entry: handles) { ret->push_back(entry.first); } return ret; } }; inline unique_ptr new_tdb_options() { return make_unique(); } inline unique_ptr new_odb_options() { return make_unique(); } inline unique_ptr new_flush_options() { return make_unique(); } void set_flush_wait(FlushOptions &options, bool v) { options.wait = v; } void set_allow_write_stall(FlushOptions &options, bool v) { options.allow_write_stall = v; } inline unique_ptr open_tdb_raw(const Options &options, const TransactionDBOptions &txn_db_options, const string &path, BridgeStatus &status) { auto cf_info = get_cf_data(options, path); auto cf_names = std::get<0>(cf_info); auto column_families = std::get<1>(cf_info); std::vector handles; TransactionDB *txn_db = nullptr; Status s = TransactionDB::Open(options, txn_db_options, path, column_families, &handles, &txn_db); auto ok = s.ok(); write_status(std::move(s), status); unordered_map> handle_map; if (ok) { assert(handles.size() == cf_names.size()); for (size_t i = 0; i < handles.size(); ++i) { handle_map[cf_names[i]] = shared_ptr(handles[i]); } } return make_unique(txn_db, txn_db, nullptr, std::move(handle_map)); } inline unique_ptr open_odb_raw(const Options &options, const OptimisticTransactionDBOptions &txn_db_options, const string &path, BridgeStatus &status) { auto cf_info = get_cf_data(options, path); auto cf_names = std::get<0>(cf_info); auto column_families = std::get<1>(cf_info); std::vector handles; OptimisticTransactionDB *txn_db = nullptr; Status s = OptimisticTransactionDB::Open(options, txn_db_options, path, column_families, &handles, &txn_db); auto ok = s.ok(); write_status(std::move(s), status); unordered_map> handle_map; if (ok) { assert(handles.size() == cf_names.size()); for (size_t i = 0; i < handles.size(); ++i) { handle_map[cf_names[i]] = shared_ptr(handles[i]); } } return make_unique(txn_db, nullptr, txn_db, std::move(handle_map)); }