// // Created by Ziyang Hu on 2022/4/13. // #pragma once #include #include #include "rust/cxx.h" #include "rocksdb/db.h" #include "rocksdb/slice.h" #include "rocksdb/options.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" typedef std::shared_mutex Lock; typedef std::unique_lock WriteLock; typedef std::shared_lock ReadLock; using namespace ROCKSDB_NAMESPACE; using std::unique_ptr; using std::shared_ptr; using std::make_unique; using std::make_shared; using std::string; using std::vector; using std::unordered_map; struct BridgeStatus; typedef Status::Code StatusCode; typedef Status::SubCode StatusSubCode; typedef Status::Severity StatusSeverity; inline Slice convert_slice(rust::Slice d) { return Slice(reinterpret_cast(d.data()), d.size()); } inline rust::Slice convert_slice_back(const Slice &s) { return rust::Slice(reinterpret_cast(s.data()), s.size()); } struct ReadOptionsBridge { mutable ReadOptions inner; inline void do_set_verify_checksums(bool v) const { inner.verify_checksums = v; } inline void do_set_total_order_seek(bool v) const { inner.total_order_seek = v; } }; struct WriteOptionsBridge { mutable WriteOptions inner; public: inline void do_set_disable_wal(bool v) const { inner.disableWAL = v; } }; typedef rust::Fn, rust::Slice)> RustComparatorFn; class RustComparator : public Comparator { public: inline int Compare(const rocksdb::Slice &a, const rocksdb::Slice &b) const { return int(rust_compare(convert_slice_back(a), convert_slice_back(b))); } const char *Name() const { return "RustComparator"; } void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} void FindShortSuccessor(std::string *) const {} void set_fn(RustComparatorFn f) const { rust_compare = f; } void set_name(rust::Str name_) const { name = std::string(name_); } mutable std::string name; mutable RustComparatorFn rust_compare; }; struct OptionsBridge { mutable Options inner; mutable RustComparator cmp_obj; inline void do_prepare_for_bulk_load() const { inner.PrepareForBulkLoad(); } inline void do_increase_parallelism() const { inner.IncreaseParallelism(); } inline void do_optimize_level_style_compaction() const { inner.OptimizeLevelStyleCompaction(); }; inline void do_set_create_if_missing(bool v) const { inner.create_if_missing = v; } inline void do_set_comparator(rust::Str name, RustComparatorFn f) const { cmp_obj = RustComparator(); cmp_obj.set_name(name); cmp_obj.set_fn(f); inner.comparator = &cmp_obj; } }; inline std::unique_ptr new_read_options() { return std::unique_ptr(new ReadOptionsBridge); } inline std::unique_ptr new_write_options() { return std::unique_ptr(new WriteOptionsBridge); } inline std::unique_ptr new_options() { return std::unique_ptr(new OptionsBridge); } struct PinnableSliceBridge { PinnableSlice inner; inline rust::Slice as_bytes() const { return convert_slice_back(inner); } }; struct SliceBridge { Slice inner; SliceBridge(Slice &&s) : inner(s) {} inline rust::Slice as_bytes() const { return convert_slice_back(inner); } }; void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code); inline void write_status(Status &&rstatus, BridgeStatus &status) { if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || rstatus.severity() != StatusSeverity::kNoError) { write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0); } } struct IteratorBridge { mutable std::unique_ptr inner; IteratorBridge(Iterator *it) : inner(it) {} inline void seek_to_first() const { inner->SeekToFirst(); } inline void seek_to_last() const { inner->SeekToLast(); } inline void next() const { inner->Next(); } inline bool is_valid() const { return inner->Valid(); } inline void do_seek(rust::Slice key) const { auto k = Slice(reinterpret_cast(key.data()), key.size()); inner->Seek(k); } inline void do_seek_for_prev(rust::Slice key) const { auto k = Slice(reinterpret_cast(key.data()), key.size()); inner->SeekForPrev(k); } inline std::unique_ptr key_raw() const { return std::make_unique(inner->key()); } inline std::unique_ptr value_raw() const { return std::make_unique(inner->value()); } BridgeStatus status() const; }; struct WriteBatchBridge { mutable WriteBatch inner; inline void batch_put_raw( const ColumnFamilyHandle &cf, rust::Slice key, rust::Slice val, BridgeStatus &status ) const { write_status( inner.Put(const_cast(&cf), convert_slice(key), convert_slice(val)), status ); } inline void batch_delete_raw( const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { write_status( inner.Delete(const_cast(&cf), convert_slice(key)), status ); } }; inline unique_ptr new_write_batch_raw() { return make_unique(); } struct DBBridge { mutable unique_ptr db; mutable unordered_map> handles; mutable Lock handle_lock; DBBridge(DB *db_, unordered_map> &&handles_) : db(db_), handles(handles_) {} inline shared_ptr get_cf_handle_raw(const string &name) const { ReadLock r_lock(handle_lock); try { return handles.at(name); } catch (const std::out_of_range &) { return shared_ptr(nullptr); } } inline void put_raw( const WriteOptionsBridge &options, const ColumnFamilyHandle &cf, rust::Slice key, rust::Slice val, BridgeStatus &status ) const { write_status( db->Put(options.inner, const_cast(&cf), convert_slice(key), convert_slice(val)), status ); } inline void delete_raw( const WriteOptionsBridge &options, const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { write_status( db->Delete(options.inner, const_cast(&cf), convert_slice(key)), status ); } inline void write_raw( const WriteOptionsBridge &options, WriteBatchBridge &updates, BridgeStatus &status ) const { write_status(db->Write(options.inner, &updates.inner), status); } inline std::unique_ptr get_raw( const ReadOptionsBridge &options, const ColumnFamilyHandle &cf, rust::Slice key, BridgeStatus &status ) const { auto pinnable_val = std::make_unique(); write_status( db->Get(options.inner, const_cast(&cf), convert_slice(key), &pinnable_val->inner), status ); return pinnable_val; } inline std::unique_ptr iterator_raw( const ReadOptionsBridge &options, const ColumnFamilyHandle &cf) const { return std::make_unique(db->NewIterator(options.inner, const_cast(&cf))); } inline void create_column_family_raw(const OptionsBridge &options, const string &name, BridgeStatus &status) const { { ReadLock r_lock(handle_lock); if (handles.find(name) != handles.end()) { write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2); return; } } WriteLock w_lock(handle_lock); ColumnFamilyHandle *handle; auto s = db->CreateColumnFamily(options.inner, name, &handle); write_status(std::move(s), status); handles[name] = shared_ptr(handle); } inline void drop_column_family_raw(const string &name, BridgeStatus &status) const { WriteLock w_lock(handle_lock); auto cf_it = handles.find(name); if (cf_it != handles.end()) { auto s = db->DropColumnFamily(cf_it->second.get()); handles.erase(cf_it); write_status(std::move(s), status); } else { write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); } // When should we call DestroyColumnFamilyHandle? } inline unique_ptr> get_column_family_names_raw() const { ReadLock r_lock(handle_lock); auto ret = make_unique>(); for (auto entry: handles) { ret->push_back(entry.first); } return ret; } }; inline std::unique_ptr open_db_raw(const OptionsBridge &options, const string &path, BridgeStatus &status) { auto cf_names = std::vector(); DB::ListColumnFamilies(options.inner, path, &cf_names); if (cf_names.empty()) { cf_names.push_back(kDefaultColumnFamilyName); } std::vector column_families; for (auto el: cf_names) { column_families.push_back(ColumnFamilyDescriptor( el, options.inner)); } std::vector handles; DB *db_ptr; Status s = DB::Open(options.inner, path, column_families, &handles, &db_ptr); auto ok = s.ok(); write_status(std::move(s), status); unordered_map> handle_map; if (ok) { assert(handles.size() == cf_names.size()); for (size_t i = 0; i < handles.size(); ++i) { handle_map[cf_names[i]] = shared_ptr(handles[i]); } } return std::make_unique(db_ptr, std::move(handle_map)); }