From 6c0273749224ea7ed1790c8a613acf2e9c71c12a Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Sun, 3 Jul 2022 19:29:20 +0800 Subject: [PATCH] new C++ interop --- cozorocks/bridge/bridge.h | 1 + cozorocks/bridge/common.h | 1 + cozorocks/bridge/db.cpp | 13 +----- cozorocks/bridge/db.h | 26 +++++++---- cozorocks/bridge/opts.h | 22 ++++++++++ cozorocks/bridge/slice.h | 9 ++++ cozorocks/bridge/tx.cpp | 16 +++++++ cozorocks/bridge/tx.h | 87 ++++++++++++++++++++++++++++++++++++- cozorocks/build.rs | 4 +- cozorocks/src/bridge/db.rs | 34 +++++++++++---- cozorocks/src/bridge/mod.rs | 47 ++++++++++++++++++-- cozorocks/src/bridge/tx.rs | 45 +++++++++++++++++++ 12 files changed, 272 insertions(+), 33 deletions(-) create mode 100644 cozorocks/bridge/opts.h create mode 100644 cozorocks/bridge/tx.cpp create mode 100644 cozorocks/src/bridge/tx.rs diff --git a/cozorocks/bridge/bridge.h b/cozorocks/bridge/bridge.h index bf651da6..20720f67 100644 --- a/cozorocks/bridge/bridge.h +++ b/cozorocks/bridge/bridge.h @@ -9,5 +9,6 @@ #include "slice.h" #include "tx.h" #include "status.h" +#include "opts.h" #endif //COZOROCKS_BRIDGE_H diff --git a/cozorocks/bridge/common.h b/cozorocks/bridge/common.h index 490e4d1a..bb6af9ed 100644 --- a/cozorocks/bridge/common.h +++ b/cozorocks/bridge/common.h @@ -25,6 +25,7 @@ struct DbOpts; typedef Status::Code StatusCode; typedef Status::SubCode StatusSubCode; typedef Status::Severity StatusSeverity; +typedef rust::Slice RustBytes; #endif //COZOROCKS_ROCKS_BRIDGE_H diff --git a/cozorocks/bridge/db.cpp b/cozorocks/bridge/db.cpp index ee5afa16..aec58b58 100644 --- a/cozorocks/bridge/db.cpp +++ b/cozorocks/bridge/db.cpp @@ -6,7 +6,7 @@ #include "db.h" #include "cozorocks/src/bridge/mod.rs.h" -shared_ptr open_db(const DbOpts &opts, RdbStatus &status) { +shared_ptr open_db(const DbOpts &opts, RdbStatus &status) { auto options = make_unique(); if (opts.prepare_for_bulk_load) { options->PrepareForBulkLoad(); @@ -46,7 +46,7 @@ shared_ptr open_db(const DbOpts &opts, RdbStatus &status) { options->comparator = cmp; } - shared_ptr db_wrapper = shared_ptr(nullptr); + shared_ptr db_wrapper = shared_ptr(nullptr); if (opts.optimistic) { auto db = new OptimisticRocksDb(); db->options = std::move(options); @@ -75,15 +75,6 @@ shared_ptr open_db(const DbOpts &opts, RdbStatus &status) { return db_wrapper; } -unique_ptr OptimisticRocksDb::start_txn() { - return unique_ptr(nullptr); -} - -unique_ptr PessimisticRocksDb::start_txn() { - return unique_ptr(nullptr); -} - - PessimisticRocksDb::~PessimisticRocksDb() { if (destroy_on_exit) { cerr << "destroying database on exit: " << db_path << endl; diff --git a/cozorocks/bridge/db.h b/cozorocks/bridge/db.h index 28c34da7..6446fdab 100644 --- a/cozorocks/bridge/db.h +++ b/cozorocks/bridge/db.h @@ -8,38 +8,46 @@ #include "iostream" #include "common.h" #include "tx.h" +#include "slice.h" -struct RocksDb { +struct RocksDbBridge { unique_ptr comparator; unique_ptr options; bool destroy_on_exit; string db_path; - virtual unique_ptr start_txn() = 0; + virtual unique_ptr transact() const = 0; inline const string &get_db_path() const { return db_path; } }; -struct OptimisticRocksDb : public RocksDb { +struct OptimisticRocksDb : public RocksDbBridge { unique_ptr db; - virtual unique_ptr start_txn(); + inline virtual unique_ptr transact() const { + auto ret = make_unique(&*this->db); + ret->o_tx_opts->cmp = &*comparator; + return ret; + } virtual ~OptimisticRocksDb(); }; -struct PessimisticRocksDb : public RocksDb { +struct PessimisticRocksDb : public RocksDbBridge { unique_ptr tdb_opts; unique_ptr db; - virtual unique_ptr start_txn(); + inline virtual unique_ptr transact() const { + auto ret = make_unique(&*this->db); + return ret; + } virtual ~PessimisticRocksDb(); }; -typedef int8_t (*CmpFn)(const Slice &a, const Slice &b); +typedef int8_t (*CmpFn)(RustBytes a, RustBytes b); class RustComparator : public Comparator { public: @@ -51,7 +59,7 @@ public: } inline int Compare(const Slice &a, const Slice &b) const { - return ext_cmp(a, b); + return ext_cmp(convert_slice_back(a), convert_slice_back(b)); } inline const char *Name() const { @@ -71,6 +79,6 @@ public: bool can_different_bytes_be_equal; }; -shared_ptr open_db(const DbOpts &opts, RdbStatus &status); +shared_ptr open_db(const DbOpts &opts, RdbStatus &status); #endif //COZOROCKS_DB_H diff --git a/cozorocks/bridge/opts.h b/cozorocks/bridge/opts.h new file mode 100644 index 00000000..25bcfb42 --- /dev/null +++ b/cozorocks/bridge/opts.h @@ -0,0 +1,22 @@ +// +// Created by Ziyang Hu on 2022/7/3. +// + +#ifndef COZOROCKS_OPTS_H +#define COZOROCKS_OPTS_H + +#include "common.h" + +inline void set_w_opts_sync(WriteOptions& opts, bool val) { + opts.sync = val; +} + +inline void set_w_opts_disable_wal(WriteOptions& opts, bool val) { + opts.disableWAL = val; +} + +inline void set_w_opts_no_slowdown(WriteOptions& opts, bool val) { + opts.no_slowdown = val; +} + +#endif //COZOROCKS_OPTS_H diff --git a/cozorocks/bridge/slice.h b/cozorocks/bridge/slice.h index ed7dbd60..205985fd 100644 --- a/cozorocks/bridge/slice.h +++ b/cozorocks/bridge/slice.h @@ -7,4 +7,13 @@ #include "common.h" +inline Slice convert_slice(RustBytes d) { + return Slice(reinterpret_cast(d.data()), d.size()); +} + +inline RustBytes convert_slice_back(const Slice &s) { + return rust::Slice(reinterpret_cast(s.data()), s.size()); +} + + #endif //COZOROCKS_SLICE_H diff --git a/cozorocks/bridge/tx.cpp b/cozorocks/bridge/tx.cpp new file mode 100644 index 00000000..cf13a53a --- /dev/null +++ b/cozorocks/bridge/tx.cpp @@ -0,0 +1,16 @@ +// +// Created by Ziyang Hu on 2022/7/3. +// + +#include "tx.h" +#include "cozorocks/src/bridge/mod.rs.h" + +void TxBridge::start() { + if (odb != nullptr) { + Transaction *txn = odb->BeginTransaction(*w_opts, *o_tx_opts); + tx.reset(txn); + } else if (tdb != nullptr) { + Transaction *txn = tdb->BeginTransaction(*w_opts, *p_tx_opts); + tx.reset(txn); + } +} \ No newline at end of file diff --git a/cozorocks/bridge/tx.h b/cozorocks/bridge/tx.h index 05f55dd8..f5e71957 100644 --- a/cozorocks/bridge/tx.h +++ b/cozorocks/bridge/tx.h @@ -6,9 +6,94 @@ #define COZOROCKS_TX_H #include "common.h" +#include "slice.h" +#include "status.h" -struct RdbTx { +struct TxBridge { + OptimisticTransactionDB *odb; + TransactionDB *tdb; + unique_ptr tx; + unique_ptr w_opts; + unique_ptr r_opts; + unique_ptr o_tx_opts; + unique_ptr p_tx_opts; + TxBridge(OptimisticTransactionDB *odb_) : odb(odb_), tdb(nullptr), w_opts(new WriteOptions), + r_opts(new ReadOptions), + o_tx_opts(new OptimisticTransactionOptions), p_tx_opts(nullptr), tx() {} + + TxBridge(TransactionDB *tdb_) : odb(nullptr), tdb(tdb_), w_opts(new WriteOptions), o_tx_opts(nullptr), + r_opts(new ReadOptions), + p_tx_opts(new TransactionOptions), tx() {} + + WriteOptions &get_w_opts() { + return *w_opts; + } + + inline void set_snapshot() { + if (tx != nullptr) { + tx->SetSnapshot(); + } else if (o_tx_opts != nullptr) { + o_tx_opts->set_snapshot = true; + } else if (p_tx_opts != nullptr) { + p_tx_opts->set_snapshot = true; + } + } + + inline void clear_snapshot() { + tx->ClearSnapshot(); + } + + inline DB *get_db() const { + if (tdb != nullptr) { + return tdb; + } else { + return odb; + } + } + + void start(); + + inline unique_ptr get(RustBytes key, bool for_update, RdbStatus &status) { + Slice key_ = convert_slice(key); + auto ret = make_unique(); + if (for_update) { + auto s = tx->GetForUpdate(*r_opts, get_db()->DefaultColumnFamily(), key_, &*ret); + write_status(s, status); + } else { + auto s = tx->Get(*r_opts, key_, &*ret); + write_status(s, status); + } + return ret; + } + + inline void put(RustBytes key, RustBytes val, RdbStatus &status) { + write_status(tx->Put(convert_slice(key), convert_slice(val)), status); + } + + inline void del(RustBytes key, RdbStatus &status) { + write_status(tx->Delete(convert_slice(key)), status); + } + + inline void commit(RdbStatus &status) { + write_status(tx->Commit(), status); + } + + inline void rollback(RdbStatus &status) { + write_status(tx->Rollback(), status); + } + + inline void rollback_to_savepoint(RdbStatus &status) { + write_status(tx->RollbackToSavePoint(), status); + } + + inline void pop_savepoint(RdbStatus &status) { + write_status(tx->PopSavePoint(), status); + } + + inline void set_savepoint() { + tx->SetSavePoint(); + } }; #endif //COZOROCKS_TX_H diff --git a/cozorocks/build.rs b/cozorocks/build.rs index 2e0db78b..95c579e4 100644 --- a/cozorocks/build.rs +++ b/cozorocks/build.rs @@ -16,10 +16,12 @@ fn main() { println!("cargo:rerun-if-changed=cozorocks/bridge/slice.h"); println!("cargo:rerun-if-changed=cozorocks/bridge/status.h"); println!("cargo:rerun-if-changed=cozorocks/bridge/status.cpp"); + println!("cargo:rerun-if-changed=cozorocks/bridge/opts.h"); println!("cargo:rerun-if-changed=cozorocks/bridge/tx.h"); + println!("cargo:rerun-if-changed=cozorocks/bridge/tx.cpp"); cxx_build::bridge("src/bridge/mod.rs") - .files(["bridge/status.cpp", "bridge/db.cpp"]) + .files(["bridge/status.cpp", "bridge/db.cpp", "bridge/tx.cpp"]) .include("../deps/include") .include("bridge") .flag_if_supported("-std=c++17") diff --git a/cozorocks/src/bridge/db.rs b/cozorocks/src/bridge/db.rs index 0a1c867d..4aa0d05d 100644 --- a/cozorocks/src/bridge/db.rs +++ b/cozorocks/src/bridge/db.rs @@ -1,6 +1,7 @@ use crate::bridge::ffi::*; use cxx::*; use std::ptr::null; +use crate::bridge::tx::TxBuilder; #[derive(Default, Debug)] struct DbBuilder<'a> { @@ -114,18 +115,33 @@ impl<'a> DbBuilder<'a> { self.opts.destroy_on_exit = destroy; self } - pub fn build(self) -> Result, RdbStatus> { - dbg!(&self.opts); + pub fn build(self) -> Result { let mut status = RdbStatus::default(); let result = open_db(&self.opts, &mut status); if status.is_ok() { - Ok(result) + Ok(RocksDb { inner: result }) } else { Err(status) } } } +#[derive(Clone)] +pub struct RocksDb { + inner: SharedPtr, +} + +impl RocksDb { + pub fn transact(&self) -> TxBuilder { + TxBuilder { + inner: self.inner.transact(), + } + } +} + +unsafe impl Send for RocksDb {} +unsafe impl Sync for RocksDb {} + #[cfg(test)] mod tests { use crate::bridge::db::DbBuilder; @@ -142,18 +158,20 @@ mod tests { #[test] fn creation() { - { + for optimistic in [true, false] { let db = DbBuilder::default() - .path("_test_db") - .optimistic(true) + .path(&format!("_test_db_{:?}", optimistic)) + .optimistic(optimistic) .create_if_missing(true) .use_custom_comparator("rusty_cmp", test_comparator, false) .destroy_on_exit(true) .build() .unwrap(); - } - for _ in 0..10000000 { + let mut tx = db.transact() + .disable_wal(true) + .start(false); + tx.set_snapshot(); } } } diff --git a/cozorocks/src/bridge/mod.rs b/cozorocks/src/bridge/mod.rs index afa11208..d4ecfbf0 100644 --- a/cozorocks/src/bridge/mod.rs +++ b/cozorocks/src/bridge/mod.rs @@ -1,4 +1,8 @@ +use std::error::Error; +use std::fmt::{Display, Formatter}; + pub(crate) mod db; +pub(crate) mod tx; #[cxx::bridge] pub(crate) mod ffi { @@ -93,11 +97,36 @@ pub(crate) mod ffi { type StatusCode; type StatusSubCode; type StatusSeverity; + type WriteOptions; + type PinnableSlice; + + fn set_w_opts_sync(o: Pin<&mut WriteOptions>, val: bool); + fn set_w_opts_disable_wal(o: Pin<&mut WriteOptions>, val: bool); + fn set_w_opts_no_slowdown(o: Pin<&mut WriteOptions>, val: bool); - type RocksDb; - fn get_db_path(self: &RocksDb) -> &CxxString; + type ReadOptions; - fn open_db(builder: &DbOpts, status: &mut RdbStatus) -> SharedPtr; + type RocksDbBridge; + fn get_db_path(self: &RocksDbBridge) -> &CxxString; + fn open_db(builder: &DbOpts, status: &mut RdbStatus) -> SharedPtr; + fn transact(self: &RocksDbBridge) -> UniquePtr; + + type TxBridge; + fn get_w_opts(self: Pin<&mut TxBridge>) -> Pin<&mut WriteOptions>; + fn set_snapshot(self: Pin<&mut TxBridge>); + fn clear_snapshot(self: Pin<&mut TxBridge>); + fn get( + self: Pin<&mut TxBridge>, + key: &[u8], + for_update: bool, + status: &mut RdbStatus, + ) -> UniquePtr; + fn put(self: Pin<&mut TxBridge>, key: &[u8], val: &[u8], status: &mut RdbStatus); + fn del(self: Pin<&mut TxBridge>, key: &[u8], status: &mut RdbStatus); + fn commit(self: Pin<&mut TxBridge>, status: &mut RdbStatus); + fn rollback(self: Pin<&mut TxBridge>, status: &mut RdbStatus); + fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RdbStatus); + fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RdbStatus); } } @@ -112,6 +141,18 @@ impl Default for ffi::RdbStatus { } } +impl Error for ffi::RdbStatus {} + +impl Display for ffi::RdbStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if self.message.is_empty() { + write!(f, "RocksDB error: {:?}", self) + } else { + write!(f, "RocksDB error: {}", self.message) + } + } +} + impl ffi::RdbStatus { #[inline(always)] pub fn is_ok(&self) -> bool { diff --git a/cozorocks/src/bridge/tx.rs b/cozorocks/src/bridge/tx.rs new file mode 100644 index 00000000..e4150450 --- /dev/null +++ b/cozorocks/src/bridge/tx.rs @@ -0,0 +1,45 @@ +use crate::bridge::ffi::*; +use cxx::*; +use std::ptr::null; + +pub struct TxBuilder { + pub(crate) inner: UniquePtr, +} + +impl TxBuilder { + #[inline] + pub fn start(mut self, with_snapshot: bool) -> Tx { + if with_snapshot { + self.inner.pin_mut().set_snapshot(); + } + Tx { inner: self.inner } + } + #[inline] + pub fn sync(mut self, val: bool) -> Self { + set_w_opts_sync(self.inner.pin_mut().get_w_opts(), val); + self + } + + #[inline] + pub fn no_slowdown(mut self, val: bool) -> Self { + set_w_opts_no_slowdown(self.inner.pin_mut().get_w_opts(), val); + self + } + + #[inline] + pub fn disable_wal(mut self, val: bool) -> Self { + set_w_opts_disable_wal(self.inner.pin_mut().get_w_opts(), val); + self + } +} + +pub struct Tx { + pub(crate) inner: UniquePtr, +} + +impl Tx { + #[inline] + pub fn set_snapshot(&mut self) { + self.inner.pin_mut().set_snapshot() + } +}