raw rocksdb bridge

main
Ziyang Hu 2 years ago
parent a857cb11cc
commit bd077062e7

@ -3,9 +3,70 @@
//
#include <iostream>
#include <memory>
#include "db.h"
#include "cozorocks/src/bridge/mod.rs.h"
shared_ptr<RawRocksDbBridge>
open_raw_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl, bool no_wal) {
auto options = make_unique<Options>();
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
}
if (opts.increase_parallelism > 0) {
options->IncreaseParallelism(opts.increase_parallelism);
}
if (opts.optimize_level_style_compaction) {
options->OptimizeLevelStyleCompaction();
}
options->create_if_missing = opts.create_if_missing;
options->paranoid_checks = opts.paranoid_checks;
if (opts.enable_blob_files) {
options->enable_blob_files = true;
options->min_blob_size = opts.min_blob_size;
options->blob_file_size = opts.blob_file_size;
options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
}
if (opts.use_bloom_filter) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
}
if (opts.use_capped_prefix_extractor) {
options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
}
if (opts.use_fixed_prefix_extractor) {
options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *cmp = nullptr;
if (use_cmp) {
cmp = new RustComparator(
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
cmp_impl);
options->comparator = cmp;
}
shared_ptr<RawRocksDbBridge> db_wrapper = shared_ptr<RawRocksDbBridge>(nullptr);
auto db = new RawRocksDbBridge();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->comparator.reset(cmp);
DB *db_ptr = nullptr;
write_status(DB::Open(*db->options, db->db_path, &db_ptr), status);
db->db.reset(db_ptr);
db->destroy_on_exit = opts.destroy_on_exit;
db->r_opts = std::make_unique<ReadOptions>();
db->w_opts = std::make_unique<WriteOptions>();
if (no_wal) {
db->w_opts->disableWAL = true;
}
db_wrapper.reset(db);
return db_wrapper;
}
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) {
auto options = make_unique<Options>();
if (opts.prepare_for_bulk_load) {

@ -12,6 +12,67 @@
#include "tx.h"
#include "slice.h"
struct RawRocksDbBridge {
unique_ptr<DB> db;
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
unique_ptr<ReadOptions> r_opts;
unique_ptr<WriteOptions> w_opts;
bool destroy_on_exit;
string db_path;
inline ~RawRocksDbBridge() {
if (destroy_on_exit) {
cerr << "destroying database on exit: " << db_path << endl;
auto status = db->Close();
if (!status.ok()) {
cerr << status.ToString() << endl;
}
db.reset();
auto status2 = DestroyDB(db_path, *options);
if (!status2.ok()) {
cerr << status.ToString() << endl;
}
}
}
[[nodiscard]] inline const string &get_db_path() const {
return db_path;
}
inline unique_ptr<IterBridge> iterator() const {
return make_unique<IterBridge>(&*db);
};
inline unique_ptr<PinnableSlice> get(RustBytes key, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>();
auto s = db->Get(*r_opts, db->DefaultColumnFamily(), key_, &*ret);
write_status(s, status);
return ret;
}
inline void exists(RustBytes key, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto ret = PinnableSlice();
auto s = db->Get(*r_opts, db->DefaultColumnFamily(), key_, &ret);
write_status(s, status);
}
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) {
write_status(db->Put(*w_opts, convert_slice(key), convert_slice(val)), status);
}
inline void del(RustBytes key, RocksDbStatus &status) {
write_status(db->Delete(*w_opts, convert_slice(key)), status);
}
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) {
write_status(db->DeleteRange(*w_opts, db->DefaultColumnFamily(), convert_slice(start), convert_slice(end)),
status);
}
};
struct RocksDbBridge {
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
@ -81,6 +142,9 @@ public:
bool can_different_bytes_be_equal;
};
shared_ptr<RawRocksDbBridge>
open_raw_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl, bool no_wal);
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl);
#endif //COZOROCKS_DB_H

@ -10,6 +10,7 @@
#include "status.h"
struct IterBridge {
DB *db;
Transaction *tx;
unique_ptr<Iterator> iter;
string lower_storage;
@ -24,6 +25,12 @@ struct IterBridge {
r_opts->auto_prefix_mode = true;
}
explicit IterBridge(DB *db_) : db(db_), iter(nullptr), lower_bound(), upper_bound(),
r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
r_opts->auto_prefix_mode = true;
}
// inline ReadOptions &get_r_opts() {
// return *r_opts;
// }
@ -76,7 +83,11 @@ struct IterBridge {
}
inline void start() {
iter.reset(tx->GetIterator(*r_opts));
if (db == nullptr) {
iter.reset(tx->GetIterator(*r_opts));
} else {
iter.reset(db->NewIterator(*r_opts));
}
}
inline void reset() {

@ -107,6 +107,36 @@ pub(crate) mod ffi {
// type ReadOptions;
type RawRocksDbBridge;
fn get_db_path(self: &RawRocksDbBridge) -> &CxxString;
fn open_raw_db(
builder: &DbOpts,
status: &mut RocksDbStatus,
use_cmp: bool,
cmp_impl: fn(&[u8], &[u8]) -> i8,
no_wal: bool,
) -> SharedPtr<RawRocksDbBridge>;
fn iterator(self: &RawRocksDbBridge) -> UniquePtr<IterBridge>;
fn get(
self: &RawRocksDbBridge,
key: &[u8],
status: &mut RocksDbStatus,
) -> UniquePtr<PinnableSlice>;
fn exists(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus);
fn put(
self: Pin<&mut RawRocksDbBridge>,
key: &[u8],
val: &[u8],
status: &mut RocksDbStatus,
);
fn del(self: Pin<&mut RawRocksDbBridge>, key: &[u8], status: &mut RocksDbStatus);
fn del_range(
self: Pin<&mut RawRocksDbBridge>,
lower: &[u8],
upper: &[u8],
status: &mut RocksDbStatus,
);
type RocksDbBridge;
fn get_db_path(self: &RocksDbBridge) -> &CxxString;
fn open_db(

Loading…
Cancel
Save