remove raw db logic

main
Ziyang Hu 2 years ago
parent 658784b358
commit f619e8f860

@ -28,67 +28,6 @@ unique_ptr<Options> default_db_options() {
return options;
}
shared_ptr<RawRocksDbBridge>
open_raw_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl, bool no_wal) {
auto options = default_db_options();
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
}
if (opts.increase_parallelism > 0) {
options->IncreaseParallelism(opts.increase_parallelism);
}
if (opts.optimize_level_style_compaction) {
options->OptimizeLevelStyleCompaction();
}
options->create_if_missing = opts.create_if_missing;
options->paranoid_checks = opts.paranoid_checks;
if (opts.enable_blob_files) {
options->enable_blob_files = true;
options->min_blob_size = opts.min_blob_size;
options->blob_file_size = opts.blob_file_size;
options->enable_blob_garbage_collection = opts.enable_blob_garbage_collection;
}
if (opts.use_bloom_filter) {
BlockBasedTableOptions table_options;
table_options.filter_policy.reset(NewBloomFilterPolicy(opts.bloom_filter_bits_per_key, false));
table_options.whole_key_filtering = opts.bloom_filter_whole_key_filtering;
options->table_factory.reset(NewBlockBasedTableFactory(table_options));
}
if (opts.use_capped_prefix_extractor) {
options->prefix_extractor.reset(NewCappedPrefixTransform(opts.capped_prefix_extractor_len));
}
if (opts.use_fixed_prefix_extractor) {
options->prefix_extractor.reset(NewFixedPrefixTransform(opts.fixed_prefix_extractor_len));
}
RustComparator *cmp = nullptr;
if (use_cmp) {
cmp = new RustComparator(
string(opts.comparator_name),
opts.comparator_different_bytes_can_be_equal,
cmp_impl);
options->comparator = cmp;
}
shared_ptr<RawRocksDbBridge> db_wrapper = shared_ptr<RawRocksDbBridge>(nullptr);
auto db = new RawRocksDbBridge();
db->options = std::move(options);
db->db_path = string(opts.db_path);
db->comparator.reset(cmp);
DB *db_ptr = nullptr;
write_status(DB::Open(*db->options, db->db_path, &db_ptr), status);
db->db.reset(db_ptr);
db->destroy_on_exit = opts.destroy_on_exit;
db->r_opts = std::make_unique<ReadOptions>();
db->w_opts = std::make_unique<WriteOptions>();
if (no_wal) {
db->w_opts->disableWAL = true;
}
db_wrapper.reset(db);
return db_wrapper;
}
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl) {
auto options = default_db_options();

@ -24,81 +24,6 @@ struct SnapshotBridge {
}
};
struct RawRocksDbBridge {
unique_ptr<DB> db;
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
unique_ptr<ReadOptions> r_opts;
unique_ptr<WriteOptions> w_opts;
bool destroy_on_exit;
string db_path;
inline ~RawRocksDbBridge() {
if (destroy_on_exit) {
auto status = db->Close();
if (!status.ok()) {
cerr << status.ToString() << endl;
}
db.reset();
auto status2 = DestroyDB(db_path, *options);
if (!status2.ok()) {
cerr << status.ToString() << endl;
}
}
}
shared_ptr<SnapshotBridge> make_snapshot() const {
const Snapshot *snapshot = db->GetSnapshot();
return make_shared<SnapshotBridge>(snapshot, &*db);
}
inline void set_ignore_range_deletions(bool v) const {
r_opts->ignore_range_deletions = v;
}
[[nodiscard]] inline const string &get_db_path() const {
return db_path;
}
inline unique_ptr<IterBridge> iterator() const {
return make_unique<IterBridge>(&*db);
};
inline unique_ptr<IterBridge> iterator_with_snapshot(const SnapshotBridge &sb) const {
auto ret = make_unique<IterBridge>(&*db);
ret->set_snapshot(sb.snapshot);
return ret;
};
inline unique_ptr<PinnableSlice> get(RustBytes key, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>();
auto s = db->Get(*r_opts, db->DefaultColumnFamily(), key_, &*ret);
write_status(s, status);
return ret;
}
inline void exists(RustBytes key, RocksDbStatus &status) const {
Slice key_ = convert_slice(key);
auto ret = PinnableSlice();
auto s = db->Get(*r_opts, db->DefaultColumnFamily(), key_, &ret);
write_status(s, status);
}
inline void put(RustBytes key, RustBytes val, RocksDbStatus &status) const {
write_status(db->Put(*w_opts, convert_slice(key), convert_slice(val)), status);
}
inline void del(RustBytes key, RocksDbStatus &status) const {
write_status(db->Delete(*w_opts, convert_slice(key)), status);
}
inline void del_range(RustBytes start, RustBytes end, RocksDbStatus &status) const {
write_status(db->DeleteRange(*w_opts, db->DefaultColumnFamily(), convert_slice(start), convert_slice(end)),
status);
}
};
struct SstFileWriterBridge {
SstFileWriter inner;
@ -249,9 +174,6 @@ public:
bool can_different_bytes_be_equal;
};
shared_ptr<RawRocksDbBridge>
open_raw_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl, bool no_wal);
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status, bool use_cmp, RustComparatorFn cmp_impl);
#endif //COZOROCKS_DB_H

@ -4,7 +4,6 @@ use cxx::*;
use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use crate::{IterBuilder, PinSlice};
#[derive(Default, Clone)]
pub struct DbBuilder<'a> {
@ -137,109 +136,6 @@ impl<'a> DbBuilder<'a> {
Err(status)
}
}
pub fn build_raw(self, no_wal: bool) -> Result<RawRocksDb, RocksDbStatus> {
let mut status = RocksDbStatus::default();
fn dummy(_a: &[u8], _b: &[u8]) -> i8 {
0
}
let result = open_raw_db(
&self.opts,
&mut status,
self.cmp_fn.is_some(),
self.cmp_fn.unwrap_or(dummy),
no_wal,
);
if status.is_ok() {
Ok(RawRocksDb { inner: result })
} else {
Err(status)
}
}
}
#[derive(Clone)]
pub struct RawRocksDb {
inner: SharedPtr<RawRocksDbBridge>,
}
impl RawRocksDb {
pub fn db_path(&self) -> Cow<str> {
self.inner.get_db_path().to_string_lossy()
}
pub fn ignore_range_deletions(self, val: bool) -> Self {
self.inner.set_ignore_range_deletions(val);
self
}
#[inline]
pub fn make_snapshot(&self) -> SharedPtr<SnapshotBridge> {
self.inner.make_snapshot()
}
#[inline]
pub fn iterator(&self) -> IterBuilder {
IterBuilder {
inner: self.inner.iterator(),
}
.auto_prefix_mode(true)
}
#[inline]
pub fn iterator_with_snapshot(&self, snapshot: &SnapshotBridge) -> IterBuilder {
IterBuilder {
inner: self.inner.iterator_with_snapshot(snapshot),
}
.auto_prefix_mode(true)
}
#[inline]
pub fn put(&self, key: &[u8], val: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.put(key, val, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn del(&self, key: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del(key, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn range_del(&mut self, lower: &[u8], upper: &[u8]) -> Result<(), RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.del_range(lower, upper, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn get(&self, key: &[u8]) -> Result<Option<PinSlice>, RocksDbStatus> {
let mut status = RocksDbStatus::default();
let ret = self.inner.get(key, &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinSlice { inner: ret })),
StatusCode::kNotFound => Ok(None),
_ => Err(status),
}
}
#[inline]
pub fn exists(&self, key: &[u8]) -> Result<bool, RocksDbStatus> {
let mut status = RocksDbStatus::default();
self.inner.exists(key, &mut status);
match status.code {
StatusCode::kOk => Ok(true),
StatusCode::kNotFound => Ok(false),
_ => Err(status),
}
}
}
#[derive(Clone)]
@ -325,7 +221,3 @@ impl SstWriter {
unsafe impl Send for RocksDb {}
unsafe impl Sync for RocksDb {}
unsafe impl Send for RawRocksDb {}
unsafe impl Sync for RawRocksDb {}

@ -111,37 +111,7 @@ pub(crate) mod ffi {
// type ReadOptions;
type RawRocksDbBridge;
pub type SnapshotBridge;
fn get_db_path(self: &RawRocksDbBridge) -> &CxxString;
fn open_raw_db(
builder: &DbOpts,
status: &mut RocksDbStatus,
use_cmp: bool,
cmp_impl: fn(&[u8], &[u8]) -> i8,
no_wal: bool,
) -> SharedPtr<RawRocksDbBridge>;
fn set_ignore_range_deletions(self: &RawRocksDbBridge, val: bool);
fn make_snapshot(self: &RawRocksDbBridge) -> SharedPtr<SnapshotBridge>;
fn iterator(self: &RawRocksDbBridge) -> UniquePtr<IterBridge>;
fn iterator_with_snapshot(
self: &RawRocksDbBridge,
snapshot: &SnapshotBridge,
) -> UniquePtr<IterBridge>;
fn get(
self: &RawRocksDbBridge,
key: &[u8],
status: &mut RocksDbStatus,
) -> UniquePtr<PinnableSlice>;
fn exists(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus);
fn put(self: &RawRocksDbBridge, key: &[u8], val: &[u8], status: &mut RocksDbStatus);
fn del(self: &RawRocksDbBridge, key: &[u8], status: &mut RocksDbStatus);
fn del_range(
self: &RawRocksDbBridge,
lower: &[u8],
upper: &[u8],
status: &mut RocksDbStatus,
);
type RocksDbBridge;
fn get_db_path(self: &RocksDbBridge) -> &CxxString;

@ -1,5 +1,4 @@
pub use bridge::db::DbBuilder;
pub use bridge::db::RawRocksDb;
pub use bridge::db::RocksDb;
pub use bridge::ffi::RocksDbStatus;
pub use bridge::ffi::SnapshotBridge;

Loading…
Cancel
Save