new C++ interop

main
Ziyang Hu 2 years ago
parent 41d89d0415
commit 6c02737492

@ -9,5 +9,6 @@
#include "slice.h"
#include "tx.h"
#include "status.h"
#include "opts.h"
#endif //COZOROCKS_BRIDGE_H

@ -25,6 +25,7 @@ struct DbOpts;
typedef Status::Code StatusCode;
typedef Status::SubCode StatusSubCode;
typedef Status::Severity StatusSeverity;
typedef rust::Slice<const uint8_t> RustBytes;
#endif //COZOROCKS_ROCKS_BRIDGE_H

@ -6,7 +6,7 @@
#include "db.h"
#include "cozorocks/src/bridge/mod.rs.h"
shared_ptr<RocksDb> open_db(const DbOpts &opts, RdbStatus &status) {
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RdbStatus &status) {
auto options = make_unique<Options>();
if (opts.prepare_for_bulk_load) {
options->PrepareForBulkLoad();
@ -46,7 +46,7 @@ shared_ptr<RocksDb> open_db(const DbOpts &opts, RdbStatus &status) {
options->comparator = cmp;
}
shared_ptr<RocksDb> db_wrapper = shared_ptr<RocksDb>(nullptr);
shared_ptr<RocksDbBridge> db_wrapper = shared_ptr<RocksDbBridge>(nullptr);
if (opts.optimistic) {
auto db = new OptimisticRocksDb();
db->options = std::move(options);
@ -75,15 +75,6 @@ shared_ptr<RocksDb> open_db(const DbOpts &opts, RdbStatus &status) {
return db_wrapper;
}
unique_ptr<RdbTx> OptimisticRocksDb::start_txn() {
return unique_ptr<RdbTx>(nullptr);
}
unique_ptr<RdbTx> PessimisticRocksDb::start_txn() {
return unique_ptr<RdbTx>(nullptr);
}
PessimisticRocksDb::~PessimisticRocksDb() {
if (destroy_on_exit) {
cerr << "destroying database on exit: " << db_path << endl;

@ -8,38 +8,46 @@
#include "iostream"
#include "common.h"
#include "tx.h"
#include "slice.h"
struct RocksDb {
struct RocksDbBridge {
unique_ptr<Comparator> comparator;
unique_ptr<Options> options;
bool destroy_on_exit;
string db_path;
virtual unique_ptr<RdbTx> start_txn() = 0;
virtual unique_ptr<TxBridge> transact() const = 0;
inline const string &get_db_path() const {
return db_path;
}
};
struct OptimisticRocksDb : public RocksDb {
struct OptimisticRocksDb : public RocksDbBridge {
unique_ptr<OptimisticTransactionDB> db;
virtual unique_ptr<RdbTx> start_txn();
inline virtual unique_ptr<TxBridge> transact() const {
auto ret = make_unique<TxBridge>(&*this->db);
ret->o_tx_opts->cmp = &*comparator;
return ret;
}
virtual ~OptimisticRocksDb();
};
struct PessimisticRocksDb : public RocksDb {
struct PessimisticRocksDb : public RocksDbBridge {
unique_ptr<TransactionDBOptions> tdb_opts;
unique_ptr<TransactionDB> db;
virtual unique_ptr<RdbTx> start_txn();
inline virtual unique_ptr<TxBridge> transact() const {
auto ret = make_unique<TxBridge>(&*this->db);
return ret;
}
virtual ~PessimisticRocksDb();
};
typedef int8_t (*CmpFn)(const Slice &a, const Slice &b);
typedef int8_t (*CmpFn)(RustBytes a, RustBytes b);
class RustComparator : public Comparator {
public:
@ -51,7 +59,7 @@ public:
}
inline int Compare(const Slice &a, const Slice &b) const {
return ext_cmp(a, b);
return ext_cmp(convert_slice_back(a), convert_slice_back(b));
}
inline const char *Name() const {
@ -71,6 +79,6 @@ public:
bool can_different_bytes_be_equal;
};
shared_ptr<RocksDb> open_db(const DbOpts &opts, RdbStatus &status);
shared_ptr<RocksDbBridge> open_db(const DbOpts &opts, RdbStatus &status);
#endif //COZOROCKS_DB_H

@ -0,0 +1,22 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_OPTS_H
#define COZOROCKS_OPTS_H
#include "common.h"
inline void set_w_opts_sync(WriteOptions& opts, bool val) {
opts.sync = val;
}
inline void set_w_opts_disable_wal(WriteOptions& opts, bool val) {
opts.disableWAL = val;
}
inline void set_w_opts_no_slowdown(WriteOptions& opts, bool val) {
opts.no_slowdown = val;
}
#endif //COZOROCKS_OPTS_H

@ -7,4 +7,13 @@
#include "common.h"
inline Slice convert_slice(RustBytes d) {
return Slice(reinterpret_cast<const char *>(d.data()), d.size());
}
inline RustBytes convert_slice_back(const Slice &s) {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size());
}
#endif //COZOROCKS_SLICE_H

@ -0,0 +1,16 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#include "tx.h"
#include "cozorocks/src/bridge/mod.rs.h"
void TxBridge::start() {
if (odb != nullptr) {
Transaction *txn = odb->BeginTransaction(*w_opts, *o_tx_opts);
tx.reset(txn);
} else if (tdb != nullptr) {
Transaction *txn = tdb->BeginTransaction(*w_opts, *p_tx_opts);
tx.reset(txn);
}
}

@ -6,9 +6,94 @@
#define COZOROCKS_TX_H
#include "common.h"
#include "slice.h"
#include "status.h"
struct RdbTx {
struct TxBridge {
OptimisticTransactionDB *odb;
TransactionDB *tdb;
unique_ptr<Transaction> tx;
unique_ptr<WriteOptions> w_opts;
unique_ptr<ReadOptions> r_opts;
unique_ptr<OptimisticTransactionOptions> o_tx_opts;
unique_ptr<TransactionOptions> p_tx_opts;
TxBridge(OptimisticTransactionDB *odb_) : odb(odb_), tdb(nullptr), w_opts(new WriteOptions),
r_opts(new ReadOptions),
o_tx_opts(new OptimisticTransactionOptions), p_tx_opts(nullptr), tx() {}
TxBridge(TransactionDB *tdb_) : odb(nullptr), tdb(tdb_), w_opts(new WriteOptions), o_tx_opts(nullptr),
r_opts(new ReadOptions),
p_tx_opts(new TransactionOptions), tx() {}
WriteOptions &get_w_opts() {
return *w_opts;
}
inline void set_snapshot() {
if (tx != nullptr) {
tx->SetSnapshot();
} else if (o_tx_opts != nullptr) {
o_tx_opts->set_snapshot = true;
} else if (p_tx_opts != nullptr) {
p_tx_opts->set_snapshot = true;
}
}
inline void clear_snapshot() {
tx->ClearSnapshot();
}
inline DB *get_db() const {
if (tdb != nullptr) {
return tdb;
} else {
return odb;
}
}
void start();
inline unique_ptr<PinnableSlice> get(RustBytes key, bool for_update, RdbStatus &status) {
Slice key_ = convert_slice(key);
auto ret = make_unique<PinnableSlice>();
if (for_update) {
auto s = tx->GetForUpdate(*r_opts, get_db()->DefaultColumnFamily(), key_, &*ret);
write_status(s, status);
} else {
auto s = tx->Get(*r_opts, key_, &*ret);
write_status(s, status);
}
return ret;
}
inline void put(RustBytes key, RustBytes val, RdbStatus &status) {
write_status(tx->Put(convert_slice(key), convert_slice(val)), status);
}
inline void del(RustBytes key, RdbStatus &status) {
write_status(tx->Delete(convert_slice(key)), status);
}
inline void commit(RdbStatus &status) {
write_status(tx->Commit(), status);
}
inline void rollback(RdbStatus &status) {
write_status(tx->Rollback(), status);
}
inline void rollback_to_savepoint(RdbStatus &status) {
write_status(tx->RollbackToSavePoint(), status);
}
inline void pop_savepoint(RdbStatus &status) {
write_status(tx->PopSavePoint(), status);
}
inline void set_savepoint() {
tx->SetSavePoint();
}
};
#endif //COZOROCKS_TX_H

@ -16,10 +16,12 @@ fn main() {
println!("cargo:rerun-if-changed=cozorocks/bridge/slice.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/status.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/status.cpp");
println!("cargo:rerun-if-changed=cozorocks/bridge/opts.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/tx.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/tx.cpp");
cxx_build::bridge("src/bridge/mod.rs")
.files(["bridge/status.cpp", "bridge/db.cpp"])
.files(["bridge/status.cpp", "bridge/db.cpp", "bridge/tx.cpp"])
.include("../deps/include")
.include("bridge")
.flag_if_supported("-std=c++17")

@ -1,6 +1,7 @@
use crate::bridge::ffi::*;
use cxx::*;
use std::ptr::null;
use crate::bridge::tx::TxBuilder;
#[derive(Default, Debug)]
struct DbBuilder<'a> {
@ -114,18 +115,33 @@ impl<'a> DbBuilder<'a> {
self.opts.destroy_on_exit = destroy;
self
}
pub fn build(self) -> Result<SharedPtr<RocksDb>, RdbStatus> {
dbg!(&self.opts);
pub fn build(self) -> Result<RocksDb, RdbStatus> {
let mut status = RdbStatus::default();
let result = open_db(&self.opts, &mut status);
if status.is_ok() {
Ok(result)
Ok(RocksDb { inner: result })
} else {
Err(status)
}
}
}
#[derive(Clone)]
pub struct RocksDb {
inner: SharedPtr<RocksDbBridge>,
}
impl RocksDb {
pub fn transact(&self) -> TxBuilder {
TxBuilder {
inner: self.inner.transact(),
}
}
}
unsafe impl Send for RocksDb {}
unsafe impl Sync for RocksDb {}
#[cfg(test)]
mod tests {
use crate::bridge::db::DbBuilder;
@ -142,18 +158,20 @@ mod tests {
#[test]
fn creation() {
{
for optimistic in [true, false] {
let db = DbBuilder::default()
.path("_test_db")
.optimistic(true)
.path(&format!("_test_db_{:?}", optimistic))
.optimistic(optimistic)
.create_if_missing(true)
.use_custom_comparator("rusty_cmp", test_comparator, false)
.destroy_on_exit(true)
.build()
.unwrap();
}
for _ in 0..10000000 {
let mut tx = db.transact()
.disable_wal(true)
.start(false);
tx.set_snapshot();
}
}
}

@ -1,4 +1,8 @@
use std::error::Error;
use std::fmt::{Display, Formatter};
pub(crate) mod db;
pub(crate) mod tx;
#[cxx::bridge]
pub(crate) mod ffi {
@ -93,11 +97,36 @@ pub(crate) mod ffi {
type StatusCode;
type StatusSubCode;
type StatusSeverity;
type WriteOptions;
type PinnableSlice;
fn set_w_opts_sync(o: Pin<&mut WriteOptions>, val: bool);
fn set_w_opts_disable_wal(o: Pin<&mut WriteOptions>, val: bool);
fn set_w_opts_no_slowdown(o: Pin<&mut WriteOptions>, val: bool);
type RocksDb;
fn get_db_path(self: &RocksDb) -> &CxxString;
type ReadOptions;
fn open_db(builder: &DbOpts, status: &mut RdbStatus) -> SharedPtr<RocksDb>;
type RocksDbBridge;
fn get_db_path(self: &RocksDbBridge) -> &CxxString;
fn open_db(builder: &DbOpts, status: &mut RdbStatus) -> SharedPtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
type TxBridge;
fn get_w_opts(self: Pin<&mut TxBridge>) -> Pin<&mut WriteOptions>;
fn set_snapshot(self: Pin<&mut TxBridge>);
fn clear_snapshot(self: Pin<&mut TxBridge>);
fn get(
self: Pin<&mut TxBridge>,
key: &[u8],
for_update: bool,
status: &mut RdbStatus,
) -> UniquePtr<PinnableSlice>;
fn put(self: Pin<&mut TxBridge>, key: &[u8], val: &[u8], status: &mut RdbStatus);
fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RdbStatus);
fn commit(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
fn rollback(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
}
}
@ -112,6 +141,18 @@ impl Default for ffi::RdbStatus {
}
}
impl Error for ffi::RdbStatus {}
impl Display for ffi::RdbStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.message.is_empty() {
write!(f, "RocksDB error: {:?}", self)
} else {
write!(f, "RocksDB error: {}", self.message)
}
}
}
impl ffi::RdbStatus {
#[inline(always)]
pub fn is_ok(&self) -> bool {

@ -0,0 +1,45 @@
use crate::bridge::ffi::*;
use cxx::*;
use std::ptr::null;
pub struct TxBuilder {
pub(crate) inner: UniquePtr<TxBridge>,
}
impl TxBuilder {
#[inline]
pub fn start(mut self, with_snapshot: bool) -> Tx {
if with_snapshot {
self.inner.pin_mut().set_snapshot();
}
Tx { inner: self.inner }
}
#[inline]
pub fn sync(mut self, val: bool) -> Self {
set_w_opts_sync(self.inner.pin_mut().get_w_opts(), val);
self
}
#[inline]
pub fn no_slowdown(mut self, val: bool) -> Self {
set_w_opts_no_slowdown(self.inner.pin_mut().get_w_opts(), val);
self
}
#[inline]
pub fn disable_wal(mut self, val: bool) -> Self {
set_w_opts_disable_wal(self.inner.pin_mut().get_w_opts(), val);
self
}
}
pub struct Tx {
pub(crate) inner: UniquePtr<TxBridge>,
}
impl Tx {
#[inline]
pub fn set_snapshot(&mut self) {
self.inner.pin_mut().set_snapshot()
}
}
Loading…
Cancel
Save