custom comparator

main
Ziyang Hu 2 years ago
parent 9a26171297
commit 023d32b6ec

@ -0,0 +1,8 @@
## Build
First build static lib for RocksDB
```bash
cd rocksdb
USE_RTTI=1 DEBUG_LEVEL=0 make static_lib
```

@ -15,14 +15,58 @@
struct Status;
typedef ROCKSDB_NAMESPACE::Status::Code StatusCode;
typedef ROCKSDB_NAMESPACE::Status::SubCode StatusSubCode;
typedef ROCKSDB_NAMESPACE::Status::Severity StatusSeverity;
namespace RDB = ROCKSDB_NAMESPACE;
std::unique_ptr<ROCKSDB_NAMESPACE::DB> new_db();
typedef RDB::Status::Code StatusCode;
typedef RDB::Status::SubCode StatusSubCode;
typedef RDB::Status::Severity StatusSeverity;
struct Options {
mutable ROCKSDB_NAMESPACE::Options inner;
std::unique_ptr<RDB::DB> new_db();
struct ReadOptionsBridge {
mutable RDB::ReadOptions inner;
};
struct WriteOptionsBridge {
mutable RDB::WriteOptions inner;
public:
inline void set_disable_wal(bool v) const {
inner.disableWAL = v;
}
};
typedef rust::Fn<std::int8_t(rust::Slice<const std::uint8_t>, rust::Slice<const std::uint8_t>)> RustComparatorFn;
class RustComparator: public RDB::Comparator {
public:
inline int Compare(const rocksdb::Slice& a, const rocksdb::Slice& b ) const {
auto ra = rust::Slice(reinterpret_cast<const std::uint8_t *>(a.data()), a.size());
auto rb = rust::Slice(reinterpret_cast<const std::uint8_t *>(b.data()), b.size());
return int(rust_compare(ra, rb));
}
const char* Name() const {
return "RustComparator";
}
void FindShortestSeparator(std::string*, const rocksdb::Slice&) const { }
void FindShortSuccessor(std::string*) const { }
void set_fn(RustComparatorFn f) const {
rust_compare = f;
}
void set_name(rust::Str name_) const {
name = std::string(name_);
}
mutable std::string name;
mutable RustComparatorFn rust_compare;
};
struct OptionsBridge {
mutable RDB::Options inner;
mutable RustComparator cmp_obj;
public:
inline void prepare_for_bulk_load() const {
@ -40,47 +84,85 @@ public:
inline void set_create_if_missing(bool v) const {
inner.create_if_missing = v;
}
inline void set_comparator(rust::Str name, RustComparatorFn f) const {
cmp_obj = RustComparator();
cmp_obj.set_name(name);
cmp_obj.set_fn(f);
inner.comparator = &cmp_obj;
}
};
inline std::unique_ptr<Options> new_options() {
return std::unique_ptr<Options>(new Options);
inline std::unique_ptr<ReadOptionsBridge> new_read_options() {
return std::unique_ptr<ReadOptionsBridge>(new ReadOptionsBridge);
}
inline std::unique_ptr<WriteOptionsBridge> new_write_options() {
return std::unique_ptr<WriteOptionsBridge>(new WriteOptionsBridge);
}
inline std::unique_ptr<OptionsBridge> new_options() {
return std::unique_ptr<OptionsBridge>(new OptionsBridge);
}
struct PinnableSlice {
ROCKSDB_NAMESPACE::PinnableSlice inner;
struct PinnableSliceBridge {
RDB::PinnableSlice inner;
inline rust::Slice<const std::uint8_t> as_bytes() const {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(inner.data()), inner.size());
}
};
void write_status_impl(Status &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity);
struct DB {
mutable ROCKSDB_NAMESPACE::DB *inner;
inline ~DB() {
if (inner != nullptr) {
delete inner;
}
inline void write_status(RDB::Status &&rstatus, Status &status) {
if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace ||
rstatus.severity() != StatusSeverity::kNoError) {
write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity());
}
}
void put(rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> val, Status &status) const;
struct DBBridge {
mutable std::unique_ptr<RDB::DB> inner;
DBBridge(RDB::DB *inner_) : inner(inner_) {}
inline void put(
const WriteOptionsBridge &options,
rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val,
Status &status
) const {
write_status(
inner->Put(options.inner,
RDB::Slice(reinterpret_cast<const char *>(key.data()), key.size()),
RDB::Slice(reinterpret_cast<const char *>(val.data()), val.size())),
status
);
}
inline std::unique_ptr<PinnableSlice> get(rust::Slice<const uint8_t> key) const {
auto pinnable_val = std::make_unique<PinnableSlice>();
inner->Get(ROCKSDB_NAMESPACE::ReadOptions(),
inline std::unique_ptr<PinnableSliceBridge> get(
const ReadOptionsBridge &options,
rust::Slice<const uint8_t> key,
Status &status
) const {
auto pinnable_val = std::make_unique<PinnableSliceBridge>();
write_status(
inner->Get(options.inner,
inner->DefaultColumnFamily(),
ROCKSDB_NAMESPACE::Slice(reinterpret_cast<const char *>(key.data()), key.size()),
&pinnable_val->inner);
RDB::Slice(reinterpret_cast<const char *>(key.data()), key.size()),
&pinnable_val->inner),
status
);
return pinnable_val;
}
};
inline std::unique_ptr<DB> open_db(const Options &options, const rust::Str path) {
ROCKSDB_NAMESPACE::DB *db_ptr;
ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::DB::Open(options.inner, std::string(path), &db_ptr);
auto db = std::unique_ptr<DB>(new DB);
db->inner = db_ptr;
return db;
inline std::unique_ptr<DBBridge> open_db(const OptionsBridge &options, const rust::Slice<const uint8_t> path) {
RDB::DB *db_ptr;
RDB::Status s = RDB::DB::Open(options.inner,
std::string(reinterpret_cast<const char *>(path.data()), path.size()),
&db_ptr);
return std::unique_ptr<DBBridge>(new DBBridge(db_ptr));
}

@ -2,133 +2,12 @@
// Created by Ziyang Hu on 2022/4/13.
//
#include <iostream>
#include "cozorocks.h"
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "cozo-rocks-sys/src/lib.rs.h"
//using ROCKSDB_NAMESPACE::DB;
//using ROCKSDB_NAMESPACE::Options;
//using ROCKSDB_NAMESPACE::PinnableSlice;
//using ROCKSDB_NAMESPACE::ReadOptions;
//using ROCKSDB_NAMESPACE::Status;
//using ROCKSDB_NAMESPACE::WriteBatch;
//using ROCKSDB_NAMESPACE::WriteOptions;
//using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
//using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
//using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
//using ROCKSDB_NAMESPACE::Slice;
//using ROCKSDB_NAMESPACE::Snapshot;
//using ROCKSDB_NAMESPACE::Transaction;
//using ROCKSDB_NAMESPACE::TransactionDB;
//using ROCKSDB_NAMESPACE::TransactionDBOptions;
//using ROCKSDB_NAMESPACE::TransactionOptions;
#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_simple_example";
#else
std::string kDBPath = "/tmp/rocksdb_simple_example";
#endif
//std::unique_ptr<DB> new_db() {
// DB *db_ptr;
// Options options;
// // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
// options.IncreaseParallelism();
// options.OptimizeLevelStyleCompaction();
// // create the DB if it's not already present
// options.create_if_missing = true;
//
// // open DB
// Status s = DB::Open(options, kDBPath, &db_ptr);
// std::unique_ptr<DB> db(db_ptr);
// assert(s.ok());
//
// // Put key-value
// s = db->Put(WriteOptions(), "key1", "value");
// assert(s.ok());
// std::string value;
// // get value
// s = db->Get(ReadOptions(), "key1", &value);
// assert(s.ok());
// assert(value == "value");
//
// // atomically apply a set of updates
// {
// WriteBatch batch;
// batch.Delete("key1");
// batch.Put("key2", value);
// s = db->Write(WriteOptions(), &batch);
// }
//
// s = db->Get(ReadOptions(), "key1", &value);
// assert(s.IsNotFound());
//
// db->Get(ReadOptions(), "key2", &value);
// assert(value == "value");
// std::cout << value << " and fuck!" << std::endl;
//
// {
// PinnableSlice pinnable_val;
// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
// assert(pinnable_val == "value");
// }
//
// {
// std::string string_val;
// // If it cannot pin the value, it copies the value to its internal buffer.
// // The intenral buffer could be set during construction.
// PinnableSlice pinnable_val(&string_val);
// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
// assert(pinnable_val == "value");
// // If the value is not pinned, the internal buffer must have the value.
// assert(pinnable_val.IsPinned() || string_val == "value");
// }
//
// PinnableSlice pinnable_val;
// s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &pinnable_val);
// assert(s.IsNotFound());
// // Reset PinnableSlice after each use and before each reuse
// pinnable_val.Reset();
// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
// assert(pinnable_val == "value");
// pinnable_val.Reset();
// // The Slice pointed by pinnable_val is not valid after this point
//
// std::cout << "hello from C++" << std::endl;
//// return std::unique_ptr<BlobstoreClient>(new BlobstoreClient());
// return db;
//}
//std::unique_ptr<CozoRocksDB> open_db(const Options& options, const std::string& path) {
// DB *db_ptr;
// // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
//// options.IncreaseParallelism();
//// options.OptimizeLevelStyleCompaction();
// // create the DB if it's not already present
//// options.create_if_missing = true;
//
// // open DB
// Status s = DB::Open(options, path, &db_ptr);
// std::unique_ptr<DB> db(db_ptr);
// std::unique_ptr<CozoRocksDB> cdb(new CozoRocksDB{});
// cdb->db = std::move(db);
// cdb->db_status = std::move(s);
// return cdb;
//}
void write_status_impl(Status &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity) {
status.code = code;
status.subcode = subcode;
status.severity = severity;
void DB::put(rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> val, Status &status) const {
auto s = inner->Put(ROCKSDB_NAMESPACE::WriteOptions(),
ROCKSDB_NAMESPACE::Slice(reinterpret_cast<const char *>(key.data()), key.size()),
ROCKSDB_NAMESPACE::Slice(reinterpret_cast<const char *>(val.data()), val.size()));
status.code = s.code();
status.subcode = s.subcode();
status.severity = s.severity();
}

@ -4,7 +4,7 @@ mod ffi {
struct Status {
code: StatusCode,
subcode: StatusSubCode,
severity: StatusSeverity
severity: StatusSeverity,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
@ -61,35 +61,187 @@ mod ffi {
unsafe extern "C++" {
include!("cozo-rocks-sys/include/cozorocks.h");
type DB;
type Options;
type PinnableSlice;
type StatusCode;
type StatusSubCode;
type StatusSeverity;
fn as_bytes(self: &PinnableSlice) -> &[u8];
type PinnableSliceBridge;
fn as_bytes(self: &PinnableSliceBridge) -> &[u8];
type ReadOptionsBridge;
fn new_read_options() -> UniquePtr<ReadOptionsBridge>;
fn new_options() -> UniquePtr<Options>;
fn prepare_for_bulk_load(self: &Options);
fn increase_parallelism(self: &Options);
fn optimize_level_style_compaction(self: &Options);
fn set_create_if_missing(self: &Options, v: bool);
type WriteOptionsBridge;
fn new_write_options() -> UniquePtr<WriteOptionsBridge>;
fn set_disable_wal(self: &WriteOptionsBridge, v: bool);
fn open_db(options: &Options, path: &str) -> UniquePtr<DB>;
fn put(self: &DB, key: &[u8], val: &[u8], status: &mut Status);
fn get(self: &DB, key: &[u8]) -> UniquePtr<PinnableSlice>;
type OptionsBridge;
fn new_options() -> UniquePtr<OptionsBridge>;
fn prepare_for_bulk_load(self: &OptionsBridge);
fn increase_parallelism(self: &OptionsBridge);
fn optimize_level_style_compaction(self: &OptionsBridge);
fn set_create_if_missing(self: &OptionsBridge, v: bool);
fn set_comparator(self: &OptionsBridge, name: &str, compare: fn(&[u8], &[u8]) -> i8);
type DBBridge;
fn open_db(options: &OptionsBridge, path: &[u8]) -> UniquePtr<DBBridge>;
fn put(self: &DBBridge, options: &WriteOptionsBridge, key: &[u8], val: &[u8], status: &mut Status);
fn get(self: &DBBridge, options: &ReadOptionsBridge, key: &[u8], status: &mut Status) -> UniquePtr<PinnableSliceBridge>;
}
}
use std::sync::atomic::Ordering;
use cxx::UniquePtr;
pub use ffi::*;
impl Status {
pub fn new() -> Self {
pub struct Options {
bridge: UniquePtr<OptionsBridge>,
}
impl Options {
#[inline]
pub fn prepare_for_bulk_load(self) -> Self {
self.bridge.prepare_for_bulk_load();
self
}
#[inline]
pub fn increase_parallelism(self) -> Self {
self.bridge.increase_parallelism();
self
}
#[inline]
pub fn optimize_level_style_compaction(self) -> Self {
self.bridge.optimize_level_style_compaction();
self
}
#[inline]
pub fn set_create_if_missing(self, v: bool) -> Self {
self.bridge.set_create_if_missing(v);
self
}
#[inline]
pub fn set_comparator(self, name: &str, compare: fn(&[u8], &[u8]) -> i8) -> Self {
self.bridge.set_comparator(name, compare);
self
}
}
impl Default for Options {
#[inline]
fn default() -> Self {
Self { bridge: new_options() }
}
}
pub struct PinnableSlice {
bridge: UniquePtr<PinnableSliceBridge>,
}
impl PinnableSlice {
pub fn as_bytes(&self) -> &[u8] {
self.bridge.as_bytes()
}
}
pub struct ReadOptions {
bridge: UniquePtr<ReadOptionsBridge>,
}
impl Default for ReadOptions {
fn default() -> Self {
Self { bridge: new_read_options() }
}
}
pub struct WriteOptions {
bridge: UniquePtr<WriteOptionsBridge>,
}
impl WriteOptions {
#[inline]
pub fn set_disable_wal(&self, v: bool) {
self.bridge.set_disable_wal(v);
}
}
impl Default for WriteOptions {
fn default() -> Self {
Self { bridge: new_write_options() }
}
}
pub struct DB {
bridge: UniquePtr<DBBridge>,
options: Options,
default_read_options: ReadOptions,
default_write_options: WriteOptions,
}
impl DB {
#[inline]
pub fn open(options: Options, path: impl AsRef<std::path::Path>) -> Self {
#[cfg(unix)]
{
use std::os::unix::ffi::OsStrExt;
Self {
bridge: open_db(
&options.bridge,
path.as_ref().as_os_str().as_bytes(),
),
default_read_options: ReadOptions::default(),
default_write_options: WriteOptions::default(),
options
}
}
#[cfg(not(unix))]
{
Self {
bridge: open_db(
&options.bridge,
path.as_ref().to_string_lossy().to_string().as_bytes())
}
}
}
#[inline]
pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, options: Option<&WriteOptions>) -> Result<Status, Status> {
let mut status = Status::default();
self.bridge.put(&options.unwrap_or(&self.default_write_options).bridge,
key.as_ref(), val.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
}
#[inline]
pub fn get(&self, key: impl AsRef<[u8]>, options: Option<&ReadOptions>) -> Result<PinnableSlice, Status> {
let mut status = Status::default();
let slice = self.bridge.get(
&options.unwrap_or(&self.default_read_options).bridge,
key.as_ref(), &mut status);
if status.code == StatusCode::kOk {
Ok(PinnableSlice { bridge: slice })
} else {
Err(status)
}
}
}
impl Default for Status {
#[inline]
fn default() -> Self {
Self {
code: StatusCode::kOk,
subcode: StatusSubCode::kNone,
severity: StatusSeverity::kNoError
severity: StatusSeverity::kNoError,
}
}
}

@ -70,22 +70,41 @@ impl Storage {
#[cfg(test)]
mod tests {
use std::str::from_utf8;
use crate::typing::BaseType::String;
use crate::value::{ByteArrayBuilder, Value};
use super::*;
#[test]
fn import() {
use cozo_rocks_sys::*;
let options = new_options();
options.increase_parallelism();
options.optimize_level_style_compaction();
options.set_create_if_missing(true);
let db = open_db(&options, "xxyyzz");
let mut status = Status::new();
db.put("A key".as_bytes(), "A motherfucking value!!! 👋👋👋".as_bytes(), &mut status);
let val = db.get("A key".as_bytes());
let db = DB::open(Options::default()
.increase_parallelism()
.optimize_level_style_compaction()
.set_create_if_missing(true)
.set_comparator("cozo_comparator_v1", cozo_comparator_v1),
"xxyyzz");
let mut x = vec![];
let mut builder = ByteArrayBuilder::new(&mut x);
builder.build_value(&Value::RefString("A key"));
let key = builder.get();
let mut x = vec![];
let mut builder = ByteArrayBuilder::new(&mut x);
builder.build_value(&Value::RefString("Another key"));
let key2 = builder.get();
db.put(&key, "A motherfucking value!!! 👋👋👋", None).unwrap();
db.put(&key2, "Another motherfucking value!!! 👋👋👋", None).unwrap();
// db.put("Yes man", "A motherfucking value!!! 👋👋👋", None).unwrap();
let val = db.get(&key, None).unwrap();
let val = val.as_bytes();
println!("{}", from_utf8(val).unwrap());
let val = db.get(&key2, None).unwrap();
let val = val.as_bytes();
println!("{}", from_utf8(val).unwrap());
let val = db.get(&key, None).unwrap();
let val = val.as_bytes();
println!("{:?} {}", status, from_utf8(val).unwrap());
println!("{}", from_utf8(val).unwrap());
}
}

@ -469,10 +469,14 @@ impl<T: Write> ByteArrayBuilder<T> {
}
}
pub fn cozo_comparator_v1(a: &[u8], b: &[u8]) -> Ordering {
cmp_data(&mut ByteArrayParser { bytes: a, current: 0 },
&mut ByteArrayParser { bytes: b, current: 0 })
pub fn cozo_comparator_v1(a: &[u8], b: &[u8]) -> i8 {
match cmp_data(&mut ByteArrayParser { bytes: a, current: 0 },
&mut ByteArrayParser { bytes: b, current: 0 }) {
Less => -1,
Equal => 0,
Greater => 1
}
}
pub fn cmp_data<'a>(pa: &mut ByteArrayParser<'a>, pb: &mut ByteArrayParser<'a>) -> Ordering {

Loading…
Cancel
Save