From 1f86b966063a5ae246a610b1de90caab1f876c90 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 30 Jun 2022 21:37:36 +0800 Subject: [PATCH] more c++ API integration --- cozorocks/src/bridge.h | 76 +++++++++++++++++++---- cozorocks/src/lib.rs | 135 +++++++++++++++++++++++++++++++++++------ 2 files changed, 182 insertions(+), 29 deletions(-) diff --git a/cozorocks/src/bridge.h b/cozorocks/src/bridge.h index 80f6c029..2b938915 100644 --- a/cozorocks/src/bridge.h +++ b/cozorocks/src/bridge.h @@ -20,6 +20,17 @@ namespace rocksdb_additions { // for write options + // force generation + + unique_ptr _u1() { + return unique_ptr(nullptr); + } + + unique_ptr _u2() { + return unique_ptr(nullptr); + } + + inline void set_w_opts_sync(WriteOptions &opts, bool v) { Status s; opts.sync = v; @@ -145,17 +156,58 @@ namespace rocksdb_additions { opts.cmp = &cmp; } - // opening + // database + + enum DbKind { + RAW = 0, + PESSIMISTIC = 1, + OPTIMISTIC = 2, + }; + + struct DbBridge { + mutable unique_ptr db; + mutable TransactionDB *tdb; + mutable OptimisticTransactionDB *odb; + bool is_odb; + + DbBridge(DB *db_) : db(db_) {} + + DbBridge(TransactionDB *db_) : db(db_), tdb(db_) {} + + DbBridge(OptimisticTransactionDB *db_) : db(db_), odb(db_) {} + + DbKind kind() const { + if (tdb != nullptr) { + return DbKind::PESSIMISTIC; + } else if (odb != nullptr) { + return DbKind::OPTIMISTIC; + } else { + return DbKind::RAW; + } + } + + DB *inner_db() const { + return db.get(); + } + + TransactionDB *inner_tdb() const { + return tdb; + } + + OptimisticTransactionDB *inner_odb() const { + return odb; + } + }; - inline shared_ptr + inline shared_ptr open_db_raw(const Options &options, const string &path, Status &status) { DB *db = nullptr; status = DB::Open(options, path, &db); - return shared_ptr(db); + return make_shared(db); } - inline shared_ptr + inline shared_ptr open_tdb_raw(const Options &options, const TransactionDBOptions &txn_db_options, const string &path, @@ -164,11 +216,11 @@ namespace rocksdb_additions { status = TransactionDB::Open(options, txn_db_options, path, &txn_db); - return shared_ptr(txn_db); + return make_shared(txn_db); } - inline shared_ptr + inline shared_ptr open_odb_raw(const Options &options, const string &path, Status &status) { OptimisticTransactionDB *txn_db = nullptr; @@ -176,13 +228,13 @@ namespace rocksdb_additions { path, &txn_db); - return shared_ptr(txn_db); + return make_shared(txn_db); } // comparator - typedef int(*CmpFn)(const rocksdb::Slice &a, const rocksdb::Slice &b); + typedef int(*CmpFn)(const Slice &a, const Slice &b); class RustComparator : public Comparator { public: @@ -193,7 +245,7 @@ namespace rocksdb_additions { ext_cmp = f_; } - inline int Compare(const rocksdb::Slice &a, const rocksdb::Slice &b) const { + inline int Compare(const Slice &a, const Slice &b) const { return ext_cmp(a, b); } @@ -205,11 +257,11 @@ namespace rocksdb_additions { return can_different_bytes_be_equal; } - inline void FindShortestSeparator(std::string *, const rocksdb::Slice &) const {} + inline void FindShortestSeparator(string *, const Slice &) const {} - inline void FindShortSuccessor(std::string *) const {} + inline void FindShortSuccessor(string *) const {} - std::string name; + string name; CmpFn ext_cmp; bool can_different_bytes_be_equal; }; diff --git a/cozorocks/src/lib.rs b/cozorocks/src/lib.rs index e909b849..82594d57 100644 --- a/cozorocks/src/lib.rs +++ b/cozorocks/src/lib.rs @@ -67,17 +67,124 @@ pub fn convert_slice_back(src: &Slice) -> &[u8] { unsafe { std::slice::from_raw_parts(src.data() as *const u8, src.size()) } } -#[inline] -pub fn put( - db: Pin<&mut DB>, - opts: &WriteOptions, - key: impl AsRef<[u8]>, - val: impl AsRef<[u8]>, -) -> DbStatus { - let key = convert_slice(key.as_ref()); - let val = convert_slice(val.as_ref()); - moveit! { let status = db.Put2(opts, &key, &val); } - convert_status(&status) +impl DbBridge { + #[inline] + fn get_raw_db(&self) -> Pin<&mut DB> { + unsafe { Pin::new_unchecked(&mut *self.inner_db()) } + } + #[inline] + fn get_tdb(&self) -> Pin<&mut TransactionDB> { + debug_assert_eq!(self.kind(), DbKind::PESSIMISTIC); + unsafe { Pin::new_unchecked(&mut *self.inner_tdb()) } + } + #[inline] + fn get_odb(&self) -> Pin<&mut OptimisticTransactionDB> { + debug_assert_eq!(self.kind(), DbKind::OPTIMISTIC); + unsafe { Pin::new_unchecked(&mut *self.inner_odb()) } + } + + #[inline] + pub fn p_txn( + &self, + write_options: &WriteOptions, + txn_options: &TransactionOptions, + ) -> UniquePtr { + let tdb = self.get_tdb(); + unsafe { + UniquePtr::from_raw(tdb.BeginTransaction( + write_options, + txn_options, + std::ptr::null_mut(), + )) + } + } + + #[inline] + pub fn o_txn( + &self, + write_options: &WriteOptions, + txn_options: &OptimisticTransactionOptions, + ) -> UniquePtr { + let odb = self.get_odb(); + unsafe { + UniquePtr::from_raw(odb.BeginTransaction( + write_options, + txn_options, + std::ptr::null_mut(), + )) + } + } + + #[inline] + pub fn get( + &self, + opts: &ReadOptions, + key: impl AsRef<[u8]>, + val: Pin<&mut PinnableSlice>, + ) -> DbStatus { + let db = self.get_raw_db(); + let key = convert_slice(key.as_ref()); + let cf = db.DefaultColumnFamily(); + + moveit! { let status = unsafe { + let val = Pin::into_inner_unchecked(val) as *mut PinnableSlice; + db.Get1(opts, cf, &key, val) + }; } + convert_status(&status) + } + + #[inline] + pub fn put( + &self, + opts: &WriteOptions, + key: impl AsRef<[u8]>, + val: impl AsRef<[u8]>, + ) -> DbStatus { + let db = self.get_raw_db(); + let key = convert_slice(key.as_ref()); + let val = convert_slice(val.as_ref()); + moveit! { let status = db.Put2(opts, &key, &val); } + convert_status(&status) + } + + #[inline] + pub fn delete(&self, opts: &WriteOptions, key: impl AsRef<[u8]>) -> DbStatus { + let db = self.get_raw_db(); + let key = convert_slice(key.as_ref()); + moveit! { let status = db.Delete2(opts, &key); } + convert_status(&status) + } + #[inline] + pub fn delete_range( + &self, + opts: &WriteOptions, + start: impl AsRef<[u8]>, + end: impl AsRef<[u8]>, + ) -> DbStatus { + let db = self.get_raw_db(); + let start = convert_slice(start.as_ref()); + let end = convert_slice(end.as_ref()); + let cf = db.DefaultColumnFamily(); + moveit! { let status = unsafe { db.DeleteRange(opts, cf, &start, &end) }; } + convert_status(&status) + } + #[inline] + pub fn iterator(&self, opts: &ReadOptions) -> UniquePtr { + let db = self.get_raw_db(); + unsafe { UniquePtr::from_raw(db.NewIterator1(opts)) } + } + + #[inline] + pub fn get_snapshot(&self) -> *const Snapshot { + let db = self.get_raw_db(); + db.GetSnapshot() + } + + #[inline] + pub fn release_snapshot(&self, snapshot: *const Snapshot) { + let db = self.get_raw_db(); + unsafe { db.ReleaseSnapshot(snapshot) } + } } #[macro_export] @@ -166,12 +273,6 @@ macro_rules! let_read_opts { unsafe impl Send for RustComparator {} unsafe impl Sync for RustComparator {} -impl ReadOptions { - pub fn x(&self) -> bool { - true - } -} - #[cfg(test)] mod tests { use super::ffi::rocksdb::{Options, ReadOptions, WriteOptions};