From 023d32b6ecd6cfd3f100ca1d3bf00428acea7f27 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 14 Apr 2022 20:01:26 +0800 Subject: [PATCH] custom comparator --- README.md | 8 ++ cozo-rocks-sys/include/cozorocks.h | 142 +++++++++++++++++----- cozo-rocks-sys/src/cozorocks.cc | 129 +------------------- cozo-rocks-sys/src/lib.rs | 186 ++++++++++++++++++++++++++--- src/storage.rs | 39 ++++-- src/value.rs | 10 +- 6 files changed, 329 insertions(+), 185 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 00000000..df62a126 --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +## Build + +First build static lib for RocksDB + +```bash +cd rocksdb +USE_RTTI=1 DEBUG_LEVEL=0 make static_lib +``` \ No newline at end of file diff --git a/cozo-rocks-sys/include/cozorocks.h b/cozo-rocks-sys/include/cozorocks.h index 2d19154f..ae9b33e3 100644 --- a/cozo-rocks-sys/include/cozorocks.h +++ b/cozo-rocks-sys/include/cozorocks.h @@ -15,14 +15,58 @@ struct Status; -typedef ROCKSDB_NAMESPACE::Status::Code StatusCode; -typedef ROCKSDB_NAMESPACE::Status::SubCode StatusSubCode; -typedef ROCKSDB_NAMESPACE::Status::Severity StatusSeverity; +namespace RDB = ROCKSDB_NAMESPACE; -std::unique_ptr new_db(); +typedef RDB::Status::Code StatusCode; +typedef RDB::Status::SubCode StatusSubCode; +typedef RDB::Status::Severity StatusSeverity; -struct Options { - mutable ROCKSDB_NAMESPACE::Options inner; +std::unique_ptr new_db(); + +struct ReadOptionsBridge { + mutable RDB::ReadOptions inner; +}; + +struct WriteOptionsBridge { + mutable RDB::WriteOptions inner; + +public: + inline void set_disable_wal(bool v) const { + inner.disableWAL = v; + } +}; + +typedef rust::Fn, rust::Slice)> RustComparatorFn; + +class RustComparator: public RDB::Comparator { +public: + inline int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b ) const { + auto ra = rust::Slice(reinterpret_cast(a.data()), a.size()); + auto rb = rust::Slice(reinterpret_cast(b.data()), b.size()); + return int(rust_compare(ra, rb)); + } + + const char* Name() const { + return "RustComparator"; + } + void FindShortestSeparator(std::string*, const rocksdb::Slice&) const { } + void FindShortSuccessor(std::string*) const { } + + void set_fn(RustComparatorFn f) const { + rust_compare = f; + } + + void set_name(rust::Str name_) const { + name = std::string(name_); + } + + mutable std::string name; + mutable RustComparatorFn rust_compare; +}; + +struct OptionsBridge { + mutable RDB::Options inner; + mutable RustComparator cmp_obj; public: inline void prepare_for_bulk_load() const { @@ -40,47 +84,85 @@ public: inline void set_create_if_missing(bool v) const { inner.create_if_missing = v; } + + inline void set_comparator(rust::Str name, RustComparatorFn f) const { + cmp_obj = RustComparator(); + cmp_obj.set_name(name); + cmp_obj.set_fn(f); + inner.comparator = &cmp_obj; + } }; -inline std::unique_ptr new_options() { - return std::unique_ptr(new Options); +inline std::unique_ptr new_read_options() { + return std::unique_ptr(new ReadOptionsBridge); +} + +inline std::unique_ptr new_write_options() { + return std::unique_ptr(new WriteOptionsBridge); +} + +inline std::unique_ptr new_options() { + return std::unique_ptr(new OptionsBridge); } -struct PinnableSlice { - ROCKSDB_NAMESPACE::PinnableSlice inner; +struct PinnableSliceBridge { + RDB::PinnableSlice inner; inline rust::Slice as_bytes() const { return rust::Slice(reinterpret_cast(inner.data()), inner.size()); } }; +void write_status_impl(Status &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity); -struct DB { - mutable ROCKSDB_NAMESPACE::DB *inner; - - inline ~DB() { - if (inner != nullptr) { - delete inner; - } +inline void write_status(RDB::Status &&rstatus, Status &status) { + if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || + rstatus.severity() != StatusSeverity::kNoError) { + write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity()); } +} - void put(rust::Slice key, rust::Slice val, Status &status) const; +struct DBBridge { + mutable std::unique_ptr inner; + + DBBridge(RDB::DB *inner_) : inner(inner_) {} + + inline void put( + const WriteOptionsBridge &options, + rust::Slice key, + rust::Slice val, + Status &status + ) const { + write_status( + inner->Put(options.inner, + RDB::Slice(reinterpret_cast(key.data()), key.size()), + RDB::Slice(reinterpret_cast(val.data()), val.size())), + status + ); + } - inline std::unique_ptr get(rust::Slice key) const { - auto pinnable_val = std::make_unique(); - inner->Get(ROCKSDB_NAMESPACE::ReadOptions(), - inner->DefaultColumnFamily(), - ROCKSDB_NAMESPACE::Slice(reinterpret_cast(key.data()), key.size()), - &pinnable_val->inner); + inline std::unique_ptr get( + const ReadOptionsBridge &options, + rust::Slice key, + Status &status + ) const { + auto pinnable_val = std::make_unique(); + write_status( + inner->Get(options.inner, + inner->DefaultColumnFamily(), + RDB::Slice(reinterpret_cast(key.data()), key.size()), + &pinnable_val->inner), + status + ); return pinnable_val; } }; -inline std::unique_ptr open_db(const Options &options, const rust::Str path) { - ROCKSDB_NAMESPACE::DB *db_ptr; - ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::DB::Open(options.inner, std::string(path), &db_ptr); - auto db = std::unique_ptr(new DB); - db->inner = db_ptr; - return db; +inline std::unique_ptr open_db(const OptionsBridge &options, const rust::Slice path) { + RDB::DB *db_ptr; + RDB::Status s = RDB::DB::Open(options.inner, + std::string(reinterpret_cast(path.data()), path.size()), + &db_ptr); + return std::unique_ptr(new DBBridge(db_ptr)); } \ No newline at end of file diff --git a/cozo-rocks-sys/src/cozorocks.cc b/cozo-rocks-sys/src/cozorocks.cc index 19117234..295d04e2 100644 --- a/cozo-rocks-sys/src/cozorocks.cc +++ b/cozo-rocks-sys/src/cozorocks.cc @@ -2,133 +2,12 @@ // Created by Ziyang Hu on 2022/4/13. // -#include #include "cozorocks.h" - - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" -#include "rocksdb/utilities/transaction.h" -#include "rocksdb/utilities/transaction_db.h" #include "cozo-rocks-sys/src/lib.rs.h" -//using ROCKSDB_NAMESPACE::DB; -//using ROCKSDB_NAMESPACE::Options; -//using ROCKSDB_NAMESPACE::PinnableSlice; -//using ROCKSDB_NAMESPACE::ReadOptions; -//using ROCKSDB_NAMESPACE::Status; -//using ROCKSDB_NAMESPACE::WriteBatch; -//using ROCKSDB_NAMESPACE::WriteOptions; -//using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor; -//using ROCKSDB_NAMESPACE::ColumnFamilyHandle; -//using ROCKSDB_NAMESPACE::ColumnFamilyOptions; -//using ROCKSDB_NAMESPACE::Slice; -//using ROCKSDB_NAMESPACE::Snapshot; -//using ROCKSDB_NAMESPACE::Transaction; -//using ROCKSDB_NAMESPACE::TransactionDB; -//using ROCKSDB_NAMESPACE::TransactionDBOptions; -//using ROCKSDB_NAMESPACE::TransactionOptions; - - -#if defined(OS_WIN) -std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_simple_example"; -#else -std::string kDBPath = "/tmp/rocksdb_simple_example"; -#endif - -//std::unique_ptr new_db() { -// DB *db_ptr; -// Options options; -// // Optimize RocksDB. This is the easiest way to get RocksDB to perform well -// options.IncreaseParallelism(); -// options.OptimizeLevelStyleCompaction(); -// // create the DB if it's not already present -// options.create_if_missing = true; -// -// // open DB -// Status s = DB::Open(options, kDBPath, &db_ptr); -// std::unique_ptr db(db_ptr); -// assert(s.ok()); -// -// // Put key-value -// s = db->Put(WriteOptions(), "key1", "value"); -// assert(s.ok()); -// std::string value; -// // get value -// s = db->Get(ReadOptions(), "key1", &value); -// assert(s.ok()); -// assert(value == "value"); -// -// // atomically apply a set of updates -// { -// WriteBatch batch; -// batch.Delete("key1"); -// batch.Put("key2", value); -// s = db->Write(WriteOptions(), &batch); -// } -// -// s = db->Get(ReadOptions(), "key1", &value); -// assert(s.IsNotFound()); -// -// db->Get(ReadOptions(), "key2", &value); -// assert(value == "value"); -// std::cout << value << " and fuck!" << std::endl; -// -// { -// PinnableSlice pinnable_val; -// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val); -// assert(pinnable_val == "value"); -// } -// -// { -// std::string string_val; -// // If it cannot pin the value, it copies the value to its internal buffer. -// // The intenral buffer could be set during construction. -// PinnableSlice pinnable_val(&string_val); -// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val); -// assert(pinnable_val == "value"); -// // If the value is not pinned, the internal buffer must have the value. -// assert(pinnable_val.IsPinned() || string_val == "value"); -// } -// -// PinnableSlice pinnable_val; -// s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &pinnable_val); -// assert(s.IsNotFound()); -// // Reset PinnableSlice after each use and before each reuse -// pinnable_val.Reset(); -// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val); -// assert(pinnable_val == "value"); -// pinnable_val.Reset(); -// // The Slice pointed by pinnable_val is not valid after this point -// -// std::cout << "hello from C++" << std::endl; -//// return std::unique_ptr(new BlobstoreClient()); -// return db; -//} - -//std::unique_ptr open_db(const Options& options, const std::string& path) { -// DB *db_ptr; -// // Optimize RocksDB. This is the easiest way to get RocksDB to perform well -//// options.IncreaseParallelism(); -//// options.OptimizeLevelStyleCompaction(); -// // create the DB if it's not already present -//// options.create_if_missing = true; -// -// // open DB -// Status s = DB::Open(options, path, &db_ptr); -// std::unique_ptr db(db_ptr); -// std::unique_ptr cdb(new CozoRocksDB{}); -// cdb->db = std::move(db); -// cdb->db_status = std::move(s); -// return cdb; -//} +void write_status_impl(Status &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity) { + status.code = code; + status.subcode = subcode; + status.severity = severity; -void DB::put(rust::Slice key, rust::Slice val, Status &status) const { - auto s = inner->Put(ROCKSDB_NAMESPACE::WriteOptions(), - ROCKSDB_NAMESPACE::Slice(reinterpret_cast(key.data()), key.size()), - ROCKSDB_NAMESPACE::Slice(reinterpret_cast(val.data()), val.size())); - status.code = s.code(); - status.subcode = s.subcode(); - status.severity = s.severity(); } \ No newline at end of file diff --git a/cozo-rocks-sys/src/lib.rs b/cozo-rocks-sys/src/lib.rs index 96566130..58284fca 100644 --- a/cozo-rocks-sys/src/lib.rs +++ b/cozo-rocks-sys/src/lib.rs @@ -4,7 +4,7 @@ mod ffi { struct Status { code: StatusCode, subcode: StatusSubCode, - severity: StatusSeverity + severity: StatusSeverity, } #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -61,35 +61,187 @@ mod ffi { unsafe extern "C++" { include!("cozo-rocks-sys/include/cozorocks.h"); - type DB; - type Options; - type PinnableSlice; - type StatusCode; type StatusSubCode; type StatusSeverity; - fn as_bytes(self: &PinnableSlice) -> &[u8]; + type PinnableSliceBridge; + fn as_bytes(self: &PinnableSliceBridge) -> &[u8]; + + type ReadOptionsBridge; + fn new_read_options() -> UniquePtr; - fn new_options() -> UniquePtr; - fn prepare_for_bulk_load(self: &Options); - fn increase_parallelism(self: &Options); - fn optimize_level_style_compaction(self: &Options); - fn set_create_if_missing(self: &Options, v: bool); + type WriteOptionsBridge; + fn new_write_options() -> UniquePtr; + fn set_disable_wal(self: &WriteOptionsBridge, v: bool); - fn open_db(options: &Options, path: &str) -> UniquePtr; - fn put(self: &DB, key: &[u8], val: &[u8], status: &mut Status); - fn get(self: &DB, key: &[u8]) -> UniquePtr; + type OptionsBridge; + fn new_options() -> UniquePtr; + fn prepare_for_bulk_load(self: &OptionsBridge); + fn increase_parallelism(self: &OptionsBridge); + fn optimize_level_style_compaction(self: &OptionsBridge); + fn set_create_if_missing(self: &OptionsBridge, v: bool); + fn set_comparator(self: &OptionsBridge, name: &str, compare: fn(&[u8], &[u8]) -> i8); + + type DBBridge; + fn open_db(options: &OptionsBridge, path: &[u8]) -> UniquePtr; + fn put(self: &DBBridge, options: &WriteOptionsBridge, key: &[u8], val: &[u8], status: &mut Status); + fn get(self: &DBBridge, options: &ReadOptionsBridge, key: &[u8], status: &mut Status) -> UniquePtr; } } + + +use std::sync::atomic::Ordering; +use cxx::UniquePtr; pub use ffi::*; -impl Status { - pub fn new() -> Self { +pub struct Options { + bridge: UniquePtr, +} + +impl Options { + #[inline] + pub fn prepare_for_bulk_load(self) -> Self { + self.bridge.prepare_for_bulk_load(); + self + } + + #[inline] + pub fn increase_parallelism(self) -> Self { + self.bridge.increase_parallelism(); + self + } + + #[inline] + pub fn optimize_level_style_compaction(self) -> Self { + self.bridge.optimize_level_style_compaction(); + self + } + + #[inline] + pub fn set_create_if_missing(self, v: bool) -> Self { + self.bridge.set_create_if_missing(v); + self + } + + #[inline] + pub fn set_comparator(self, name: &str, compare: fn(&[u8], &[u8]) -> i8) -> Self { + self.bridge.set_comparator(name, compare); + self + } +} + +impl Default for Options { + #[inline] + fn default() -> Self { + Self { bridge: new_options() } + } +} + +pub struct PinnableSlice { + bridge: UniquePtr, +} + +impl PinnableSlice { + pub fn as_bytes(&self) -> &[u8] { + self.bridge.as_bytes() + } +} + +pub struct ReadOptions { + bridge: UniquePtr, +} + +impl Default for ReadOptions { + fn default() -> Self { + Self { bridge: new_read_options() } + } +} + +pub struct WriteOptions { + bridge: UniquePtr, +} + +impl WriteOptions { + #[inline] + pub fn set_disable_wal(&self, v: bool) { + self.bridge.set_disable_wal(v); + } +} + +impl Default for WriteOptions { + fn default() -> Self { + Self { bridge: new_write_options() } + } +} + +pub struct DB { + bridge: UniquePtr, + options: Options, + default_read_options: ReadOptions, + default_write_options: WriteOptions, +} + +impl DB { + #[inline] + pub fn open(options: Options, path: impl AsRef) -> Self { + #[cfg(unix)] + { + use std::os::unix::ffi::OsStrExt; + Self { + bridge: open_db( + &options.bridge, + path.as_ref().as_os_str().as_bytes(), + ), + default_read_options: ReadOptions::default(), + default_write_options: WriteOptions::default(), + options + } + } + #[cfg(not(unix))] + { + Self { + bridge: open_db( + &options.bridge, + path.as_ref().to_string_lossy().to_string().as_bytes()) + } + } + } + + #[inline] + pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, options: Option<&WriteOptions>) -> Result { + let mut status = Status::default(); + self.bridge.put(&options.unwrap_or(&self.default_write_options).bridge, + key.as_ref(), val.as_ref(), + &mut status); + if status.code == StatusCode::kOk { + Ok(status) + } else { + Err(status) + } + } + + #[inline] + pub fn get(&self, key: impl AsRef<[u8]>, options: Option<&ReadOptions>) -> Result { + let mut status = Status::default(); + let slice = self.bridge.get( + &options.unwrap_or(&self.default_read_options).bridge, + key.as_ref(), &mut status); + if status.code == StatusCode::kOk { + Ok(PinnableSlice { bridge: slice }) + } else { + Err(status) + } + } +} + +impl Default for Status { + #[inline] + fn default() -> Self { Self { code: StatusCode::kOk, subcode: StatusSubCode::kNone, - severity: StatusSeverity::kNoError + severity: StatusSeverity::kNoError, } } } \ No newline at end of file diff --git a/src/storage.rs b/src/storage.rs index c0153bd6..c3598f22 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -70,22 +70,41 @@ impl Storage { #[cfg(test)] mod tests { use std::str::from_utf8; - use crate::typing::BaseType::String; + use crate::value::{ByteArrayBuilder, Value}; use super::*; #[test] fn import() { use cozo_rocks_sys::*; - let options = new_options(); - options.increase_parallelism(); - options.optimize_level_style_compaction(); - options.set_create_if_missing(true); - let db = open_db(&options, "xxyyzz"); - let mut status = Status::new(); - db.put("A key".as_bytes(), "A motherfucking value!!! 👋👋👋".as_bytes(), &mut status); - let val = db.get("A key".as_bytes()); + let db = DB::open(Options::default() + .increase_parallelism() + .optimize_level_style_compaction() + .set_create_if_missing(true) + .set_comparator("cozo_comparator_v1", cozo_comparator_v1), + "xxyyzz"); + + let mut x = vec![]; + let mut builder = ByteArrayBuilder::new(&mut x); + builder.build_value(&Value::RefString("A key")); + let key = builder.get(); + + let mut x = vec![]; + let mut builder = ByteArrayBuilder::new(&mut x); + builder.build_value(&Value::RefString("Another key")); + let key2 = builder.get(); + + db.put(&key, "A motherfucking value!!! 👋👋👋", None).unwrap(); + db.put(&key2, "Another motherfucking value!!! 👋👋👋", None).unwrap(); + // db.put("Yes man", "A motherfucking value!!! 👋👋👋", None).unwrap(); + let val = db.get(&key, None).unwrap(); + let val = val.as_bytes(); + println!("{}", from_utf8(val).unwrap()); + let val = db.get(&key2, None).unwrap(); + let val = val.as_bytes(); + println!("{}", from_utf8(val).unwrap()); + let val = db.get(&key, None).unwrap(); let val = val.as_bytes(); - println!("{:?} {}", status, from_utf8(val).unwrap()); + println!("{}", from_utf8(val).unwrap()); } } \ No newline at end of file diff --git a/src/value.rs b/src/value.rs index ad9ceb2e..65aef352 100644 --- a/src/value.rs +++ b/src/value.rs @@ -469,10 +469,14 @@ impl ByteArrayBuilder { } } -pub fn cozo_comparator_v1(a: &[u8], b: &[u8]) -> Ordering { - cmp_data(&mut ByteArrayParser { bytes: a, current: 0 }, - &mut ByteArrayParser { bytes: b, current: 0 }) +pub fn cozo_comparator_v1(a: &[u8], b: &[u8]) -> i8 { + match cmp_data(&mut ByteArrayParser { bytes: a, current: 0 }, + &mut ByteArrayParser { bytes: b, current: 0 }) { + Less => -1, + Equal => 0, + Greater => 1 + } } pub fn cmp_data<'a>(pa: &mut ByteArrayParser<'a>, pb: &mut ByteArrayParser<'a>) -> Ordering {