brand new C++ interop

main
Ziyang Hu 2 years ago
parent 6c02737492
commit faf87a0439

@ -10,5 +10,6 @@
#include "tx.h"
#include "status.h"
#include "opts.h"
#include "iter.h"
#endif //COZOROCKS_BRIDGE_H

@ -0,0 +1,122 @@
//
// Created by Ziyang Hu on 2022/7/3.
//
#ifndef COZOROCKS_ITER_H
#define COZOROCKS_ITER_H
#include "common.h"
#include "slice.h"
#include "status.h"
struct IterBridge {
Transaction *tx;
unique_ptr<Iterator> iter;
Slice lower_bound;
Slice upper_bound;
unique_ptr<ReadOptions> r_opts;
IterBridge(Transaction *tx_) : tx(tx_), iter(nullptr), lower_bound(), upper_bound(), r_opts(new ReadOptions) {
r_opts->ignore_range_deletions = true;
}
// inline ReadOptions &get_r_opts() {
// return *r_opts;
// }
inline void verify_checksums(bool val) {
r_opts->verify_checksums = val;
}
inline void fill_cache(bool val) {
r_opts->fill_cache = val;
}
inline void tailing(bool val) {
r_opts->tailing = val;
}
inline void total_order_seek(bool val) {
r_opts->total_order_seek = val;
}
inline void auto_prefix_mode(bool val) {
r_opts->auto_prefix_mode = val;
}
inline void prefix_same_as_start(bool val) {
r_opts->prefix_same_as_start = val;
}
inline void pin_data(bool val) {
r_opts->pin_data = val;
}
inline void clear_bounds() {
r_opts->iterate_lower_bound = nullptr;
r_opts->iterate_upper_bound = nullptr;
lower_bound.clear();
upper_bound.clear();
}
inline void set_lower_bound(RustBytes bound) {
lower_bound = convert_slice(bound);
r_opts->iterate_lower_bound = &lower_bound;
}
inline void set_upper_bound(RustBytes bound) {
upper_bound = convert_slice(bound);
r_opts->iterate_upper_bound = &upper_bound;
}
inline void start() {
iter.reset(tx->GetIterator(*r_opts));
}
inline void reset() {
iter.reset();
clear_bounds();
}
inline void to_start() {
iter->SeekToFirst();
}
inline void to_end() {
iter->SeekToLast();
}
inline void seek(RustBytes key) {
iter->Seek(convert_slice(key));
}
inline void seek_backward(RustBytes key) {
iter->SeekForPrev(convert_slice(key));
}
inline bool is_valid() const {
return iter->Valid();
}
inline void next() {
iter->Next();
}
inline void prev() {
iter->Prev();
}
inline void status(RdbStatus &status) const {
write_status(iter->status(), status);
}
inline RustBytes key() const {
return convert_slice_back(iter->key());
}
inline RustBytes val() const {
return convert_slice_back(iter->value());
}
};
#endif //COZOROCKS_ITER_H

@ -15,5 +15,8 @@ inline RustBytes convert_slice_back(const Slice &s) {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size());
}
inline RustBytes convert_pinnable_slice_back(const PinnableSlice &s) {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size());
}
#endif //COZOROCKS_SLICE_H

@ -13,4 +13,5 @@ void TxBridge::start() {
Transaction *txn = tdb->BeginTransaction(*w_opts, *p_tx_opts);
tx.reset(txn);
}
assert(tx);
}

@ -8,6 +8,7 @@
#include "common.h"
#include "slice.h"
#include "status.h"
#include "iter.h"
struct TxBridge {
OptimisticTransactionDB *odb;
@ -20,23 +21,45 @@ struct TxBridge {
TxBridge(OptimisticTransactionDB *odb_) : odb(odb_), tdb(nullptr), w_opts(new WriteOptions),
r_opts(new ReadOptions),
o_tx_opts(new OptimisticTransactionOptions), p_tx_opts(nullptr), tx() {}
o_tx_opts(new OptimisticTransactionOptions), p_tx_opts(nullptr), tx() {
r_opts->ignore_range_deletions = true;
}
TxBridge(TransactionDB *tdb_) : odb(nullptr), tdb(tdb_), w_opts(new WriteOptions), o_tx_opts(nullptr),
r_opts(new ReadOptions),
p_tx_opts(new TransactionOptions), tx() {}
p_tx_opts(new TransactionOptions), tx() {
r_opts->ignore_range_deletions = true;
}
WriteOptions &get_w_opts() {
inline WriteOptions &get_w_opts() {
return *w_opts;
}
inline void set_snapshot() {
// inline ReadOptions &get_r_opts() {
// return *r_opts;
// }
inline void verify_checksums(bool val) {
r_opts->verify_checksums = val;
}
inline void fill_cache(bool val) {
r_opts->fill_cache = val;
}
inline unique_ptr<IterBridge> iterator() {
return make_unique<IterBridge>(&*tx);
};
inline void set_snapshot(bool val) {
if (tx != nullptr) {
tx->SetSnapshot();
if (val) {
tx->SetSnapshot();
}
} else if (o_tx_opts != nullptr) {
o_tx_opts->set_snapshot = true;
o_tx_opts->set_snapshot = val;
} else if (p_tx_opts != nullptr) {
p_tx_opts->set_snapshot = true;
p_tx_opts->set_snapshot = val;
}
}

@ -17,6 +17,7 @@ fn main() {
println!("cargo:rerun-if-changed=cozorocks/bridge/status.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/status.cpp");
println!("cargo:rerun-if-changed=cozorocks/bridge/opts.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/iter.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/tx.h");
println!("cargo:rerun-if-changed=cozorocks/bridge/tx.cpp");

@ -1,10 +1,10 @@
use crate::bridge::ffi::*;
use crate::bridge::tx::TxBuilder;
use cxx::*;
use std::ptr::null;
use crate::bridge::tx::TxBuilder;
#[derive(Default, Debug)]
struct DbBuilder<'a> {
pub struct DbBuilder<'a> {
opts: DbOpts<'a>,
}
@ -142,36 +142,3 @@ impl RocksDb {
unsafe impl Send for RocksDb {}
unsafe impl Sync for RocksDb {}
#[cfg(test)]
mod tests {
use crate::bridge::db::DbBuilder;
fn test_comparator(a: &[u8], b: &[u8]) -> i8 {
use std::cmp::Ordering::*;
match a.cmp(b) {
Equal => 0,
Greater => 1,
Less => -1,
}
}
#[test]
fn creation() {
for optimistic in [true, false] {
let db = DbBuilder::default()
.path(&format!("_test_db_{:?}", optimistic))
.optimistic(optimistic)
.create_if_missing(true)
.use_custom_comparator("rusty_cmp", test_comparator, false)
.destroy_on_exit(true)
.build()
.unwrap();
let mut tx = db.transact()
.disable_wal(true)
.start(false);
tx.set_snapshot();
}
}
}

@ -0,0 +1,149 @@
use crate::bridge::ffi::*;
use cxx::UniquePtr;
use std::os::macos::raw::stat;
pub struct IterBuilder {
pub(crate) inner: UniquePtr<IterBridge>,
}
pub struct DbIter {
pub(crate) inner: UniquePtr<IterBridge>,
}
impl IterBuilder {
pub fn start(mut self) -> DbIter {
self.inner.pin_mut().start();
DbIter { inner: self.inner }
}
pub fn clear_bounds(&mut self) {
self.inner.pin_mut().clear_bounds();
}
pub fn lower_bound(mut self, bound: &[u8]) -> Self {
self.inner.pin_mut().set_lower_bound(bound);
self
}
pub fn upper_bound(mut self, bound: &[u8]) -> Self {
self.inner.pin_mut().set_upper_bound(bound);
self
}
#[inline]
pub fn verify_checksums(mut self, val: bool) -> Self {
self.inner.pin_mut().verify_checksums(val);
self
}
#[inline]
pub fn fill_cache(mut self, val: bool) -> Self {
self.inner.pin_mut().fill_cache(val);
self
}
#[inline]
pub fn tailing(mut self, val: bool) -> Self {
self.inner.pin_mut().tailing(val);
self
}
#[inline]
pub fn total_order_seek(mut self, val: bool) -> Self {
self.inner.pin_mut().total_order_seek(val);
self
}
#[inline]
pub fn auto_prefix_mode(mut self, val: bool) -> Self {
self.inner.pin_mut().auto_prefix_mode(val);
self
}
#[inline]
pub fn prefix_same_as_start(mut self, val: bool) -> Self {
self.inner.pin_mut().prefix_same_as_start(val);
self
}
#[inline]
pub fn pin_data(mut self, val: bool) -> Self {
self.inner.pin_mut().pin_data(val);
self
}
}
impl DbIter {
#[inline]
pub fn reset(mut self) -> IterBuilder {
self.inner.pin_mut().reset();
IterBuilder { inner: self.inner }
}
#[inline]
pub fn seek_to_start(&mut self) {
self.inner.pin_mut().to_start();
}
#[inline]
pub fn seek_to_end(&mut self) {
self.inner.pin_mut().to_end();
}
#[inline]
pub fn seek(&mut self, key: &[u8]) {
self.inner.pin_mut().seek(key);
}
#[inline]
pub fn seek_back(&mut self, key: &[u8]) {
self.inner.pin_mut().seek_backward(key);
}
#[inline]
pub fn is_valid(&self) -> bool {
self.inner.is_valid()
}
#[inline]
pub fn next(&mut self) {
self.inner.pin_mut().next();
}
#[inline]
pub fn prev(&mut self) {
self.inner.pin_mut().prev();
}
#[inline]
pub fn status(&self) -> RdbStatus {
let mut status = RdbStatus::default();
self.inner.status(&mut status);
status
}
#[inline]
pub fn key(&self) -> Result<Option<&[u8]>, RdbStatus> {
if self.is_valid() {
Ok(Some(self.inner.key()))
} else {
let status = self.status();
if status.is_ok() {
Ok(None)
} else {
Err(status)
}
}
}
#[inline]
pub fn val(&self) -> Result<Option<&[u8]>, RdbStatus> {
if self.is_valid() {
Ok(Some(self.inner.val()))
} else {
let status = self.status();
if status.is_ok() {
Ok(None)
} else {
Err(status)
}
}
}
#[inline]
pub fn pair(&self) -> Result<Option<(&[u8], &[u8])>, RdbStatus> {
if self.is_valid() {
Ok(Some((self.inner.key(), self.inner.val())))
} else {
let status = self.status();
if status.is_ok() {
Ok(None)
} else {
Err(status)
}
}
}
}

@ -2,6 +2,7 @@ use std::error::Error;
use std::fmt::{Display, Formatter};
pub(crate) mod db;
pub(crate) mod iter;
pub(crate) mod tx;
#[cxx::bridge]
@ -99,12 +100,13 @@ pub(crate) mod ffi {
type StatusSeverity;
type WriteOptions;
type PinnableSlice;
fn convert_pinnable_slice_back(s: &PinnableSlice) -> &[u8];
fn set_w_opts_sync(o: Pin<&mut WriteOptions>, val: bool);
fn set_w_opts_disable_wal(o: Pin<&mut WriteOptions>, val: bool);
fn set_w_opts_no_slowdown(o: Pin<&mut WriteOptions>, val: bool);
type ReadOptions;
// type ReadOptions;
type RocksDbBridge;
fn get_db_path(self: &RocksDbBridge) -> &CxxString;
@ -112,8 +114,12 @@ pub(crate) mod ffi {
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
type TxBridge;
// fn get_r_opts(self: Pin<&mut TxBridge>) -> Pin<&mut ReadOptions>;
fn verify_checksums(self: Pin<&mut TxBridge>, val: bool);
fn fill_cache(self: Pin<&mut TxBridge>, val: bool);
fn get_w_opts(self: Pin<&mut TxBridge>) -> Pin<&mut WriteOptions>;
fn set_snapshot(self: Pin<&mut TxBridge>);
fn start(self: Pin<&mut TxBridge>);
fn set_snapshot(self: Pin<&mut TxBridge>, val: bool);
fn clear_snapshot(self: Pin<&mut TxBridge>);
fn get(
self: Pin<&mut TxBridge>,
@ -127,10 +133,39 @@ pub(crate) mod ffi {
fn rollback(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
fn rollback_to_savepoint(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
fn pop_savepoint(self: Pin<&mut TxBridge>, status: &mut RdbStatus);
fn set_savepoint(self: Pin<&mut TxBridge>);
fn iterator(self: Pin<&mut TxBridge>) -> UniquePtr<IterBridge>;
type IterBridge;
fn start(self: Pin<&mut IterBridge>);
fn reset(self: Pin<&mut IterBridge>);
// fn get_r_opts(self: Pin<&mut IterBridge>) -> Pin<&mut ReadOptions>;
fn clear_bounds(self: Pin<&mut IterBridge>);
fn set_lower_bound(self: Pin<&mut IterBridge>, bound: &[u8]);
fn set_upper_bound(self: Pin<&mut IterBridge>, bound: &[u8]);
fn verify_checksums(self: Pin<&mut IterBridge>, val: bool);
fn fill_cache(self: Pin<&mut IterBridge>, val: bool);
fn tailing(self: Pin<&mut IterBridge>, val: bool);
fn total_order_seek(self: Pin<&mut IterBridge>, val: bool);
fn auto_prefix_mode(self: Pin<&mut IterBridge>, val: bool);
fn prefix_same_as_start(self: Pin<&mut IterBridge>, val: bool);
fn pin_data(self: Pin<&mut IterBridge>, val: bool);
fn to_start(self: Pin<&mut IterBridge>);
fn to_end(self: Pin<&mut IterBridge>);
fn seek(self: Pin<&mut IterBridge>, key: &[u8]);
fn seek_backward(self: Pin<&mut IterBridge>, key: &[u8]);
fn is_valid(self: &IterBridge) -> bool;
fn next(self: Pin<&mut IterBridge>);
fn prev(self: Pin<&mut IterBridge>);
fn status(self: &IterBridge, status: &mut RdbStatus);
fn key(self: &IterBridge) -> &[u8];
fn val(self: &IterBridge) -> &[u8];
}
}
impl Default for ffi::RdbStatus {
#[inline]
fn default() -> Self {
ffi::RdbStatus {
code: ffi::StatusCode::kOk,

@ -1,20 +1,43 @@
use crate::bridge::ffi::*;
use crate::bridge::iter::IterBuilder;
use cxx::*;
use std::ptr::null;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
pub struct TxBuilder {
pub(crate) inner: UniquePtr<TxBridge>,
}
pub struct PinSlice {
inner: UniquePtr<PinnableSlice>,
}
impl Deref for PinSlice {
type Target = [u8];
fn deref(&self) -> &Self::Target {
convert_pinnable_slice_back(&self.inner)
}
}
impl Debug for PinSlice {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let to_d: &[u8] = &*self;
write!(f, "{:?}", to_d)
}
}
impl TxBuilder {
#[inline]
pub fn start(mut self, with_snapshot: bool) -> Tx {
if with_snapshot {
self.inner.pin_mut().set_snapshot();
}
pub fn start(mut self) -> Tx {
self.inner.pin_mut().start();
Tx { inner: self.inner }
}
#[inline]
pub fn set_snapshot(mut self, val: bool) -> Self {
self.inner.pin_mut().set_snapshot(val);
self
}
#[inline]
pub fn sync(mut self, val: bool) -> Self {
set_w_opts_sync(self.inner.pin_mut().get_w_opts(), val);
self
@ -31,6 +54,18 @@ impl TxBuilder {
set_w_opts_disable_wal(self.inner.pin_mut().get_w_opts(), val);
self
}
#[inline]
pub fn verify_checksums(mut self, val: bool) -> Self {
self.inner.pin_mut().verify_checksums(val);
self
}
#[inline]
pub fn fill_cache(mut self, val: bool) -> Self {
self.inner.pin_mut().fill_cache(val);
self
}
}
pub struct Tx {
@ -40,6 +75,90 @@ pub struct Tx {
impl Tx {
#[inline]
pub fn set_snapshot(&mut self) {
self.inner.pin_mut().set_snapshot()
self.inner.pin_mut().set_snapshot(true)
}
#[inline]
pub fn clear_snapshot(&mut self) {
self.inner.pin_mut().clear_snapshot()
}
#[inline]
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), RdbStatus> {
let mut status = RdbStatus::default();
self.inner.pin_mut().put(key, val, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn del(&mut self, key: &[u8]) -> Result<(), RdbStatus> {
let mut status = RdbStatus::default();
self.inner.pin_mut().del(key, &mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn get(&mut self, key: &[u8], for_update: bool) -> Result<Option<PinSlice>, RdbStatus> {
let mut status = RdbStatus::default();
let ret = self.inner.pin_mut().get(key, for_update, &mut status);
match status.code {
StatusCode::kOk => Ok(Some(PinSlice { inner: ret })),
StatusCode::kNotFound => Ok(None),
_ => Err(status),
}
}
#[inline]
pub fn commit(&mut self) -> Result<(), RdbStatus> {
let mut status = RdbStatus::default();
self.inner.pin_mut().commit(&mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn rollback(&mut self) -> Result<(), RdbStatus> {
let mut status = RdbStatus::default();
self.inner.pin_mut().rollback(&mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn rollback_to_save(&mut self) -> Result<(), RdbStatus> {
let mut status = RdbStatus::default();
self.inner.pin_mut().rollback_to_savepoint(&mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn save(&mut self) {
self.inner.pin_mut().set_savepoint();
}
#[inline]
pub fn pop_save(&mut self) -> Result<(), RdbStatus> {
let mut status = RdbStatus::default();
self.inner.pin_mut().pop_savepoint(&mut status);
if status.is_ok() {
Ok(())
} else {
Err(status)
}
}
#[inline]
pub fn iterator(&mut self) -> IterBuilder {
IterBuilder {
inner: self.inner.pin_mut().iterator(),
}
}
}

@ -1 +1,17 @@
pub mod bridge;
pub(crate) mod bridge;
#[cfg(test)]
mod tests;
pub use bridge::db::DbBuilder;
pub use bridge::db::RocksDb;
pub use bridge::ffi::DbOpts;
pub use bridge::ffi::RdbStatus;
pub use bridge::ffi::StatusCode;
pub use bridge::ffi::StatusSeverity;
pub use bridge::ffi::StatusSubCode;
pub use bridge::iter::DbIter;
pub use bridge::iter::IterBuilder;
pub use bridge::tx::PinSlice;
pub use bridge::tx::Tx;
pub use bridge::tx::TxBuilder;

@ -0,0 +1,52 @@
use crate::*;
fn test_comparator(a: &[u8], b: &[u8]) -> i8 {
use std::cmp::Ordering::*;
match a.cmp(b) {
Equal => 0,
Greater => 1,
Less => -1,
}
}
#[test]
fn creation() {
for optimistic in [true, false] {
let db = DbBuilder::default()
.path(&format!("_test_db_{:?}", optimistic))
.optimistic(optimistic)
.create_if_missing(true)
.use_custom_comparator("rusty_cmp_test", test_comparator, false)
.destroy_on_exit(true)
.build()
.unwrap();
let mut tx = db.transact().disable_wal(true).start();
tx.set_snapshot();
tx.put("hello".as_bytes(), "world".as_bytes()).unwrap();
tx.put("你好".as_bytes(), "世界".as_bytes()).unwrap();
assert_eq!(
"world".as_bytes(),
tx.get("hello".as_bytes(), false).unwrap().unwrap().as_ref()
);
assert_eq!(
"世界".as_bytes(),
tx.get("你好".as_bytes(), false).unwrap().unwrap().as_ref()
);
assert!(tx.get("bye".as_bytes(), false).unwrap().is_none());
let mut it = tx.iterator()
.total_order_seek(true).start();
it.seek_to_start();
while let Some((k, v)) = it.pair().unwrap() {
let mut res = String::from_utf8_lossy(k);
res += ": ";
res += String::from_utf8_lossy(v);
dbg!(res);
it.next();
}
tx.commit().unwrap();
}
}
Loading…
Cancel
Save