back to where we were, but proper support for column families

main
Ziyang Hu 2 years ago
parent 83b9e0852e
commit f8de57de91

@ -13,18 +13,31 @@
#include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/utilities/transaction_db.h"
struct Status; using namespace ROCKSDB_NAMESPACE;
using std::unique_ptr;
namespace RDB = ROCKSDB_NAMESPACE; using std::shared_ptr;
using std::make_unique;
typedef RDB::Status::Code StatusCode; using std::make_shared;
typedef RDB::Status::SubCode StatusSubCode; using std::string;
typedef RDB::Status::Severity StatusSeverity; using std::vector;
using std::unordered_map;
struct BridgeStatus;
typedef Status::Code StatusCode;
typedef Status::SubCode StatusSubCode;
typedef Status::Severity StatusSeverity;
inline Slice convert_slice(rust::Slice<const uint8_t> d) {
return Slice(reinterpret_cast<const char *>(d.data()), d.size());
}
std::unique_ptr <RDB::DB> new_db(); inline rust::Slice<const uint8_t> convert_slice_back(const Slice &s) {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(s.data()), s.size());
}
struct ReadOptionsBridge { struct ReadOptionsBridge {
mutable RDB::ReadOptions inner; mutable ReadOptions inner;
inline void do_set_verify_checksums(bool v) const { inline void do_set_verify_checksums(bool v) const {
inner.verify_checksums = v; inner.verify_checksums = v;
@ -36,7 +49,7 @@ struct ReadOptionsBridge {
}; };
struct WriteOptionsBridge { struct WriteOptionsBridge {
mutable RDB::WriteOptions inner; mutable WriteOptions inner;
public: public:
inline void do_set_disable_wal(bool v) const { inline void do_set_disable_wal(bool v) const {
@ -46,12 +59,10 @@ public:
typedef rust::Fn<std::int8_t(rust::Slice<const std::uint8_t>, rust::Slice<const std::uint8_t>)> RustComparatorFn; typedef rust::Fn<std::int8_t(rust::Slice<const std::uint8_t>, rust::Slice<const std::uint8_t>)> RustComparatorFn;
class RustComparator : public RDB::Comparator { class RustComparator : public Comparator {
public: public:
inline int Compare(const rocksdb::Slice &a, const rocksdb::Slice &b) const { inline int Compare(const rocksdb::Slice &a, const rocksdb::Slice &b) const {
auto ra = rust::Slice(reinterpret_cast<const std::uint8_t *>(a.data()), a.size()); return int(rust_compare(convert_slice_back(a), convert_slice_back(b)));
auto rb = rust::Slice(reinterpret_cast<const std::uint8_t *>(b.data()), b.size());
return int(rust_compare(ra, rb));
} }
const char *Name() const { const char *Name() const {
@ -75,10 +86,9 @@ public:
}; };
struct OptionsBridge { struct OptionsBridge {
mutable RDB::Options inner; mutable Options inner;
mutable RustComparator cmp_obj; mutable RustComparator cmp_obj;
public:
inline void do_prepare_for_bulk_load() const { inline void do_prepare_for_bulk_load() const {
inner.PrepareForBulkLoad(); inner.PrepareForBulkLoad();
} }
@ -117,40 +127,42 @@ inline std::unique_ptr <OptionsBridge> new_options() {
struct PinnableSliceBridge { struct PinnableSliceBridge {
RDB::PinnableSlice inner; PinnableSlice inner;
inline rust::Slice<const std::uint8_t> as_bytes() const { inline rust::Slice<const std::uint8_t> as_bytes() const {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(inner.data()), inner.size()); return convert_slice_back(inner);
} }
}; };
struct SliceBridge { struct SliceBridge {
RDB::Slice inner; Slice inner;
SliceBridge(RDB::Slice&& s) : inner(s) {}
SliceBridge(Slice &&s) : inner(s) {}
inline rust::Slice<const std::uint8_t> as_bytes() const { inline rust::Slice<const std::uint8_t> as_bytes() const {
return rust::Slice(reinterpret_cast<const std::uint8_t *>(inner.data()), inner.size()); return convert_slice_back(inner);
} }
}; };
void write_status_impl(Status &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity); void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code);
inline void write_status(RDB::Status &&rstatus, Status &status) { inline void write_status(Status &&rstatus, BridgeStatus &status) {
if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace || if (rstatus.code() != StatusCode::kOk || rstatus.subcode() != StatusSubCode::kNoSpace ||
rstatus.severity() != StatusSeverity::kNoError) { rstatus.severity() != StatusSeverity::kNoError) {
write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity()); write_status_impl(status, rstatus.code(), rstatus.subcode(), rstatus.severity(), 0);
} }
} }
struct WriteBatchBridge { //
mutable RDB::WriteBatch inner; //struct WriteBatchBridge {
std::vector<RDB::ColumnFamilyHandle *> *handles; // mutable WriteBatch inner;
}; // std::vector<ColumnFamilyHandle *> *handles;
//};
//
struct IteratorBridge { struct IteratorBridge {
mutable std::unique_ptr <RDB::Iterator> inner; mutable std::unique_ptr <Iterator> inner;
IteratorBridge(RDB::Iterator *it) : inner(it) {} IteratorBridge(Iterator *it) : inner(it) {}
inline void seek_to_first() const { inline void seek_to_first() const {
inner->SeekToFirst(); inner->SeekToFirst();
@ -169,121 +181,151 @@ struct IteratorBridge {
} }
inline void do_seek(rust::Slice<const uint8_t> key) const { inline void do_seek(rust::Slice<const uint8_t> key) const {
auto k = RDB::Slice(reinterpret_cast<const char *>(key.data()), key.size()); auto k = Slice(reinterpret_cast<const char *>(key.data()), key.size());
inner->Seek(k); inner->Seek(k);
} }
inline void do_seek_for_prev(rust::Slice<const uint8_t> key) const { inline void do_seek_for_prev(rust::Slice<const uint8_t> key) const {
auto k = RDB::Slice(reinterpret_cast<const char *>(key.data()), key.size()); auto k = Slice(reinterpret_cast<const char *>(key.data()), key.size());
inner->SeekForPrev(k); inner->SeekForPrev(k);
} }
inline std::unique_ptr<SliceBridge> key() const { inline std::unique_ptr <SliceBridge> key() const {
return std::make_unique<SliceBridge>(inner->key()); return std::make_unique<SliceBridge>(inner->key());
} }
inline std::unique_ptr<SliceBridge> value() const { inline std::unique_ptr <SliceBridge> value() const {
return std::make_unique<SliceBridge>(inner->value()); return std::make_unique<SliceBridge>(inner->value());
} }
Status status() const; BridgeStatus status() const;
}; };
struct DBBridge { struct DBBridge {
mutable std::unique_ptr <RDB::DB> inner; mutable unique_ptr <DB> db;
mutable unordered_map <string, shared_ptr<ColumnFamilyHandle>> handles;
mutable std::vector<RDB::ColumnFamilyHandle *> handles; DBBridge(DB *db_,
unordered_map <string, shared_ptr<ColumnFamilyHandle>> &&handles_) : db(db_), handles(handles_) {}
DBBridge(RDB::DB *inner_) : inner(inner_) {}
inline std::unique_ptr <std::vector<std::string>> cf_names() const { inline shared_ptr <ColumnFamilyHandle> get_cf_handle_raw(const string &name) const {
auto ret = std::make_unique < std::vector < std::string >> (); try {
for (auto h: handles) { return handles.at(name);
ret->push_back(h->GetName()); } catch (const std::out_of_range &) {
return shared_ptr<ColumnFamilyHandle>(nullptr);
} }
return ret;
}
inline std::unique_ptr <WriteBatchBridge> write_batch() const {
auto wb = std::make_unique<WriteBatchBridge>();
wb->handles = &handles;
return wb;
} }
inline void put( //
// inline std::unique_ptr <WriteBatchBridge> write_batch() const {
// auto wb = std::make_unique<WriteBatchBridge>();
// wb->handles = &handles;
// return wb;
// }
//
inline void put_raw(
const WriteOptionsBridge &options, const WriteOptionsBridge &options,
std::size_t cf_id, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
rust::Slice<const uint8_t> val, rust::Slice<const uint8_t> val,
Status &status BridgeStatus &status
) const { ) const {
write_status( write_status(
inner->Put(options.inner, db->Put(options.inner,
handles[cf_id], const_cast<ColumnFamilyHandle *>(&cf),
RDB::Slice(reinterpret_cast<const char *>(key.data()), key.size()), convert_slice(key),
RDB::Slice(reinterpret_cast<const char *>(val.data()), val.size())), convert_slice(val)),
status status
); );
} }
inline std::unique_ptr <PinnableSliceBridge> get( inline std::unique_ptr <PinnableSliceBridge> get_raw(
const ReadOptionsBridge &options, const ReadOptionsBridge &options,
std::size_t cf_id, const ColumnFamilyHandle &cf,
rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> key,
Status &status BridgeStatus &status
) const { ) const {
auto pinnable_val = std::make_unique<PinnableSliceBridge>(); auto pinnable_val = std::make_unique<PinnableSliceBridge>();
write_status( write_status(
inner->Get(options.inner, db->Get(options.inner,
handles[cf_id], const_cast<ColumnFamilyHandle *>(&cf),
RDB::Slice(reinterpret_cast<const char *>(key.data()), key.size()), convert_slice(key),
&pinnable_val->inner), &pinnable_val->inner),
status status
); );
return pinnable_val; return pinnable_val;
} }
inline std::unique_ptr <IteratorBridge> iterator(const ReadOptionsBridge &options, std::size_t cf_id) const { inline std::unique_ptr <IteratorBridge> iterator_raw(
return std::make_unique<IteratorBridge>(inner->NewIterator(options.inner, handles[cf_id])); const ReadOptionsBridge &options,
const ColumnFamilyHandle &cf) const {
return std::make_unique<IteratorBridge>(db->NewIterator(options.inner, const_cast<ColumnFamilyHandle *>(&cf)));
}
inline void create_column_family_raw(const OptionsBridge &options, const string &name, BridgeStatus &status) const {
if (handles.find(name) != handles.end()) {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2);
return;
}
ColumnFamilyHandle *handle;
auto s = db->CreateColumnFamily(options.inner, name, &handle);
write_status(std::move(s), status);
handles[name] = shared_ptr<ColumnFamilyHandle>(handle);
}
inline void drop_column_family_raw(const string &name, BridgeStatus &status) const {
auto cf_it = handles.find(name);
if (cf_it != handles.end()) {
auto s = db->DropColumnFamily(cf_it->second.get());
handles.erase(cf_it);
write_status(std::move(s), status);
} else {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3);
}
// When should we call DestroyColumnFamilyHandle?
}
inline unique_ptr<vector<string>> get_column_family_names_raw() const {
auto ret = make_unique<vector<string>>();
for (auto entry : handles) {
ret->push_back(entry.first);
}
return ret;
} }
}; };
inline std::unique_ptr <std::vector<std::string>> list_column_families(const OptionsBridge &options,
const rust::Slice<const uint8_t> path) {
auto column_families = std::make_unique < std::vector < std::string >> ();
RDB::DB::ListColumnFamilies(options.inner,
std::string(reinterpret_cast<const char *>(path.data()), path.size()),
&*column_families);
return column_families;
}
inline std::unique_ptr <DBBridge> inline std::unique_ptr <DBBridge>
open_db(const OptionsBridge &options, const rust::Slice<const uint8_t> path, Status &status) { open_db_raw(const OptionsBridge &options,
auto old_column_families = std::vector<std::string>(); const string &path,
RDB::DB::ListColumnFamilies(options.inner, BridgeStatus &status) {
std::string(reinterpret_cast<const char *>(path.data()), path.size()), auto cf_names = std::vector<std::string>();
&old_column_families); DB::ListColumnFamilies(options.inner, path, &cf_names);
if (old_column_families.empty()) { if (cf_names.empty()) {
old_column_families.push_back(RDB::kDefaultColumnFamilyName); cf_names.push_back(kDefaultColumnFamilyName);
} }
std::vector <RDB::ColumnFamilyDescriptor> column_families; std::vector <ColumnFamilyDescriptor> column_families;
for (auto el: old_column_families) { for (auto el: cf_names) {
column_families.push_back(RDB::ColumnFamilyDescriptor( column_families.push_back(ColumnFamilyDescriptor(
el, options.inner)); el, options.inner));
} }
std::vector < RDB::ColumnFamilyHandle * > handles; std::vector < ColumnFamilyHandle * > handles;
DB *db_ptr;
Status s = DB::Open(options.inner, path, column_families, &handles, &db_ptr);
RDB::DB *db_ptr; auto ok = s.ok();
RDB::Status s = RDB::DB::Open(options.inner,
std::string(reinterpret_cast<const char *>(path.data()), path.size()),
column_families,
&handles,
&db_ptr);
write_status(std::move(s), status); write_status(std::move(s), status);
auto ret = std::unique_ptr<DBBridge>(new DBBridge(db_ptr)); unordered_map <string, shared_ptr<ColumnFamilyHandle>> handle_map;
ret->handles = std::move(handles); if (ok) {
return ret; assert(handles.size() == cf_names.size());
} for (size_t i = 0; i < handles.size(); ++i) {
handle_map[cf_names[i]] = shared_ptr<ColumnFamilyHandle>(handles[i]);
}
}
return std::make_unique<DBBridge>(db_ptr, std::move(handle_map));
}

@ -1,18 +1,19 @@
////
//// Created by Ziyang Hu on 2022/4/13.
////
// //
// Created by Ziyang Hu on 2022/4/13.
//
#include "../include/cozorocks.h" #include "../include/cozorocks.h"
#include "cozo-rocks/src/lib.rs.h" #include "cozo-rocks/src/lib.rs.h"
void write_status_impl(Status &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity) { void write_status_impl(BridgeStatus &status, StatusCode code, StatusSubCode subcode, StatusSeverity severity, int bridge_code) {
status.code = code; status.code = code;
status.subcode = subcode; status.subcode = subcode;
status.severity = severity; status.severity = severity;
status.bridge_code = static_cast<StatusBridgeCode>(bridge_code);
} }
Status IteratorBridge::status() const { BridgeStatus IteratorBridge::status() const {
Status s; BridgeStatus s;
write_status(inner->status(), s); write_status(inner->status(), s);
return s; return s;
} }

@ -1,10 +1,19 @@
#[cxx::bridge] #[cxx::bridge]
mod ffi { mod ffi {
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Status { pub enum StatusBridgeCode {
OK = 0,
LOCK_ERROR = 1,
EXISTING_ERROR = 2,
NOT_FOUND_ERROR = 3,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct BridgeStatus {
pub code: StatusCode, pub code: StatusCode,
pub subcode: StatusSubCode, pub subcode: StatusSubCode,
pub severity: StatusSeverity, pub severity: StatusSeverity,
pub bridge_code: StatusBridgeCode,
} }
#[derive(Copy, Clone, Debug, Eq, PartialEq)] #[derive(Copy, Clone, Debug, Eq, PartialEq)]
@ -66,60 +75,66 @@ mod ffi {
type StatusSeverity; type StatusSeverity;
type PinnableSliceBridge; type PinnableSliceBridge;
pub fn as_bytes(self: &PinnableSliceBridge) -> &[u8]; fn as_bytes(self: &PinnableSliceBridge) -> &[u8];
type SliceBridge; type SliceBridge;
pub fn as_bytes(self: &SliceBridge) -> &[u8]; fn as_bytes(self: &SliceBridge) -> &[u8];
type ReadOptionsBridge; type ReadOptionsBridge;
fn new_read_options() -> UniquePtr<ReadOptionsBridge>; fn new_read_options() -> UniquePtr<ReadOptionsBridge>;
pub fn do_set_verify_checksums(self: &ReadOptionsBridge, v: bool); fn do_set_verify_checksums(self: &ReadOptionsBridge, v: bool);
pub fn do_set_total_order_seek(self: &ReadOptionsBridge, v: bool); fn do_set_total_order_seek(self: &ReadOptionsBridge, v: bool);
type WriteOptionsBridge; type WriteOptionsBridge;
fn new_write_options() -> UniquePtr<WriteOptionsBridge>; fn new_write_options() -> UniquePtr<WriteOptionsBridge>;
pub fn do_set_disable_wal(self: &WriteOptionsBridge, v: bool); fn do_set_disable_wal(self: &WriteOptionsBridge, v: bool);
type OptionsBridge; type OptionsBridge;
fn new_options() -> UniquePtr<OptionsBridge>; fn new_options() -> UniquePtr<OptionsBridge>;
pub fn do_prepare_for_bulk_load(self: &OptionsBridge); fn do_prepare_for_bulk_load(self: &OptionsBridge);
pub fn do_increase_parallelism(self: &OptionsBridge); fn do_increase_parallelism(self: &OptionsBridge);
pub fn do_optimize_level_style_compaction(self: &OptionsBridge); fn do_optimize_level_style_compaction(self: &OptionsBridge);
pub fn do_set_create_if_missing(self: &OptionsBridge, v: bool); fn do_set_create_if_missing(self: &OptionsBridge, v: bool);
pub fn do_set_comparator(self: &OptionsBridge, name: &str, compare: fn(&[u8], &[u8]) -> i8); fn do_set_comparator(self: &OptionsBridge, name: &str, compare: fn(&[u8], &[u8]) -> i8);
pub type ColumnFamilyHandle;
type DBBridge; type DBBridge;
fn list_column_families(options: &OptionsBridge, path: &[u8]) -> UniquePtr<CxxVector<CxxString>>; fn open_db_raw(options: &OptionsBridge, path: &CxxString, status: &mut BridgeStatus) -> UniquePtr<DBBridge>;
fn open_db(options: &OptionsBridge, path: &[u8], status: &mut Status) -> UniquePtr<DBBridge>; fn get_cf_handle_raw(self: &DBBridge, name: &CxxString) -> SharedPtr<ColumnFamilyHandle>;
fn cf_names(self: &DBBridge) -> UniquePtr<CxxVector<CxxString>>; fn put_raw(self: &DBBridge, options: &WriteOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], val: &[u8], status: &mut BridgeStatus);
fn put(self: &DBBridge, options: &WriteOptionsBridge, cf_id: usize, key: &[u8], val: &[u8], status: &mut Status); fn get_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle, key: &[u8], status: &mut BridgeStatus) -> UniquePtr<PinnableSliceBridge>;
fn get(self: &DBBridge, options: &ReadOptionsBridge, cf_id: usize, key: &[u8], status: &mut Status) -> UniquePtr<PinnableSliceBridge>; fn iterator_raw(self: &DBBridge, options: &ReadOptionsBridge, cf: &ColumnFamilyHandle) -> UniquePtr<IteratorBridge>;
fn write_batch(self: &DBBridge) -> UniquePtr<WriteBatchBridge>; fn create_column_family_raw(self: &DBBridge, options: &OptionsBridge, name: &CxxString, status: &mut BridgeStatus);
fn iterator(self: &DBBridge, options: &ReadOptionsBridge, cf_id: usize) -> UniquePtr<IteratorBridge>; fn drop_column_family_raw(self: &DBBridge, name: &CxxString, status: &mut BridgeStatus);
fn get_column_family_names_raw(self: &DBBridge) -> UniquePtr<CxxVector<CxxString>>;
type WriteBatchBridge;
// fn write_batch(self: &DBBridge) -> UniquePtr<WriteBatchBridge>;
//
// type WriteBatchBridge;
//
type IteratorBridge; type IteratorBridge;
pub fn seek_to_first(self: &IteratorBridge); fn seek_to_first(self: &IteratorBridge);
pub fn seek_to_last(self: &IteratorBridge); fn seek_to_last(self: &IteratorBridge);
pub fn next(self: &IteratorBridge); fn next(self: &IteratorBridge);
pub fn is_valid(self: &IteratorBridge) -> bool; fn is_valid(self: &IteratorBridge) -> bool;
fn do_seek(self: &IteratorBridge, key: &[u8]); fn do_seek(self: &IteratorBridge, key: &[u8]);
fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]); fn do_seek_for_prev(self: &IteratorBridge, key: &[u8]);
pub fn key(self: &IteratorBridge) -> UniquePtr<SliceBridge>; fn key(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
pub fn value(self: &IteratorBridge) -> UniquePtr<SliceBridge>; fn value(self: &IteratorBridge) -> UniquePtr<SliceBridge>;
pub fn status(self: &IteratorBridge) -> Status; fn status(self: &IteratorBridge) -> BridgeStatus;
} }
} }
use std::path::Path;
pub use ffi::{Status, StatusCode, StatusSubCode, StatusSeverity}; use cxx::{UniquePtr, SharedPtr, let_cxx_string};
use std::collections::BTreeMap;
use cxx::UniquePtr;
use ffi::*; use ffi::*;
type Result<T> = std::result::Result<T, BridgeStatus>;
pub type Options = UniquePtr<OptionsBridge>; pub type Options = UniquePtr<OptionsBridge>;
type ColumnFamilyHandle = SharedPtr<ffi::ColumnFamilyHandle>;
pub trait OptionsTrait { pub trait OptionsTrait {
fn prepare_for_bulk_load(self) -> Self; fn prepare_for_bulk_load(self) -> Self;
fn increase_parallelism(self) -> Self; fn increase_parallelism(self) -> Self;
@ -228,12 +243,12 @@ impl AsRef<[u8]> for Slice {
pub type Iterator = UniquePtr<IteratorBridge>; pub type Iterator = UniquePtr<IteratorBridge>;
pub trait IteratorTrait { pub trait IteratorImpl {
fn seek(&self, key: impl AsRef<[u8]>); fn seek(&self, key: impl AsRef<[u8]>);
fn seek_for_prev(&self, key: impl AsRef<[u8]>); fn seek_for_prev(&self, key: impl AsRef<[u8]>);
} }
impl IteratorTrait for Iterator { impl IteratorImpl for IteratorBridge {
fn seek(&self, key: impl AsRef<[u8]>) { fn seek(&self, key: impl AsRef<[u8]>) {
self.do_seek(key.as_ref()); self.do_seek(key.as_ref());
} }
@ -242,14 +257,6 @@ impl IteratorTrait for Iterator {
} }
} }
pub struct DB {
bridge: UniquePtr<DBBridge>,
pub options: Options,
pub default_read_options: ReadOptions,
pub default_write_options: WriteOptions,
pub column_families: BTreeMap<String, usize>,
}
fn get_path_bytes(path: &std::path::Path) -> &[u8] { fn get_path_bytes(path: &std::path::Path) -> &[u8] {
#[cfg(unix)] #[cfg(unix)]
{ {
@ -261,94 +268,191 @@ fn get_path_bytes(path: &std::path::Path) -> &[u8] {
{ path.to_string_lossy().to_string().as_bytes() } { path.to_string_lossy().to_string().as_bytes() }
} }
impl DB { //
#[inline] // #[inline]
pub fn list_column_families(options: &Options, path: impl AsRef<std::path::Path>) -> Vec<String> { // pub fn write_batch(&self) -> UniquePtr<WriteBatchBridge> {
let results = list_column_families(&options, get_path_bytes(path.as_ref())); // self.bridge.write_batch()
results.iter().map(|s| s.to_string_lossy().into_owned()).collect() // }
} //
// // #[inline]
// // pub fn get_column_family_id(&self, name: impl AsRef<str>) -> Result<Option<usize>, Status> {
// // let handles = self.cf_map.read()
// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?;
// // Ok(handles.get(name.as_ref()).copied())
// // }
//
// // #[inline]
// // pub fn create_column_family(&self, name: impl AsRef<str>) -> Result<(), Status> {
// // let mut s = Status::default();
// // let mut cf_map = self.cf_map.write()
// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?;
// // let mut cfs = self.cfs.write()
// // .map_err(|_| Status::bridge(StatusBridgeCode::LOCK_ERROR))?;
// // let v = self.bridge.create_column_family(&self.options, name.as_ref(), &mut s);
// // if v > 0 {
// // assert_eq!(v as usize, cfs.len());
// // cf_map.insert(name.as_ref().to_string(), v as usize);
// // cfs.push(name.as_ref().to_string());
// // Ok(())
// // } else {
// // Err(s)
// // }
// // }
//
// // #[inline]
// // pub fn drop_column_family(&self, _name: impl AsRef<str>) -> Result<(), Status> {
// // unimplemented!()
// // }
//
// pub fn destroy_data(self) -> Result<(), Status> {
// unimplemented!()
// }
// }
//
impl Default for BridgeStatus {
#[inline] #[inline]
pub fn open(options: Options, path: impl AsRef<std::path::Path>) -> Result<Self, Status> { fn default() -> Self {
let mut status = Status::default(); Self {
let bridge = open_db( code: StatusCode::kOk,
&options, subcode: StatusSubCode::kNone,
get_path_bytes(path.as_ref()), severity: StatusSeverity::kNoError,
&mut status, bridge_code: StatusBridgeCode::OK,
);
if status.code == StatusCode::kOk {
let column_families = bridge.cf_names().iter().enumerate().map(|(i, v)| (v.to_string_lossy().into_owned(), i)).collect();
Ok(Self {
bridge,
default_read_options: ReadOptions::default(),
default_write_options: WriteOptions::default(),
options,
column_families,
})
} else {
Err(status)
} }
} }
}
impl BridgeStatus {
#[inline] #[inline]
pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: usize, options: Option<&WriteOptions>) -> Result<Status, Status> { fn bridge(c: StatusBridgeCode) -> Self {
let mut status = Status::default(); Self {
self.bridge.put(options.unwrap_or(&self.default_write_options), cf, code: StatusCode::kMaxCode,
key.as_ref(), val.as_ref(), subcode: StatusSubCode::kMaxSubCode,
&mut status); severity: StatusSeverity::kMaxSeverity,
if status.code == StatusCode::kOk { bridge_code: c,
Ok(status)
} else {
Err(status)
} }
} }
}
pub trait DBRead {
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>)
-> Result<Option<PinnableSlice>>;
}
pub trait DBWrite {
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>;
}
pub struct DB {
inner: UniquePtr<DBBridge>,
pub options: Options,
pub default_read_options: ReadOptions,
pub default_write_options: WriteOptions,
}
impl DBRead for DB {
#[inline] #[inline]
pub fn get(&self, key: impl AsRef<[u8]>, cf: usize, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>, Status> { fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>> {
let mut status = Status::default(); let mut status = BridgeStatus::default();
let slice = self.bridge.get( let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status);
options.unwrap_or(&self.default_read_options), cf,
key.as_ref(), &mut status);
match status.code { match status.code {
StatusCode::kOk => Ok(Some(PinnableSlice(slice))), StatusCode::kOk => Ok(Some(PinnableSlice(slice))),
StatusCode::kNotFound => Ok(None), StatusCode::kNotFound => Ok(None),
_ => Err(status) _ => Err(status)
} }
} }
}
impl DBWrite for DB {
#[inline] #[inline]
pub fn iterator(&self, cf: usize, options: Option<&ReadOptions>) -> Iterator { fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
self.bridge.iterator(options.unwrap_or(&self.default_read_options), cf) let mut status = BridgeStatus::default();
self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf,
key.as_ref(), val.as_ref(),
&mut status);
if status.code == StatusCode::kOk {
Ok(status)
} else {
Err(status)
}
} }
}
#[inline]
pub fn write_batch(&self) -> UniquePtr<WriteBatchBridge> { pub trait DBImpl {
self.bridge.write_batch() fn open(options: Options, path: &Path) -> Result<DB>;
fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle>;
fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Iterator;
fn create_column_family(&self, name: impl AsRef<str>) -> Result<()>;
fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()>;
fn get_column_family_names(&self) -> Vec<String>;
}
impl DBImpl for DB {
fn open(options: Options, path: &Path) -> Result<DB> {
let_cxx_string!(path = get_path_bytes(path));
let mut status = BridgeStatus::default();
let bridge = open_db_raw(
&options,
&path,
&mut status,
);
if status.code == StatusCode::kOk {
Ok(DB {
inner: bridge,
options,
default_read_options: ReadOptions::default(),
default_write_options: WriteOptions::default(),
})
} else {
Err(status)
}
} }
#[inline] fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle> {
pub fn create_column_family(&self, _name: impl AsRef<str>) -> Result<usize, Status> { let_cxx_string!(name = name.as_ref());
unimplemented!() let ret = self.inner.get_cf_handle_raw(&name);
if ret.is_null() {
Err(BridgeStatus {
code: StatusCode::kMaxCode,
subcode: StatusSubCode::kMaxSubCode,
severity: StatusSeverity::kSoftError,
bridge_code: StatusBridgeCode::NOT_FOUND_ERROR
})
} else {
Ok(ret)
}
} }
#[inline] #[inline]
pub fn drop_column_family(&self, _name: impl AsRef<str>) -> Result<(), Status> { fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Iterator {
unimplemented!() self.inner.iterator_raw(options.unwrap_or(&self.default_read_options), cf)
} }
pub fn destroy_data(self) -> Result<(), Status> { fn create_column_family(&self, name: impl AsRef<str>) -> Result<()> {
unimplemented!() let_cxx_string!(name = name.as_ref());
let mut status = BridgeStatus::default();
self.inner.create_column_family_raw(&self.options, &name, &mut status);
if status.code == StatusCode::kOk {
Ok(())
} else {
Err(status)
}
} }
}
impl Default for Status { fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()> {
#[inline] let_cxx_string!(name = name.as_ref());
fn default() -> Self { let mut status = BridgeStatus::default();
Self { self.inner.drop_column_family_raw(&name, &mut status);
code: StatusCode::kOk, if status.code == StatusCode::kOk {
subcode: StatusSubCode::kNone, Ok(())
severity: StatusSeverity::kNoError, } else {
Err(status)
} }
} }
fn get_column_family_names(&self) -> Vec<String> {
self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect()
}
} }

@ -58,6 +58,7 @@ impl Storage {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::str::from_utf8; use std::str::from_utf8;
use cozo_rocks::DBImpl;
use crate::value::{ByteArrayBuilder, cozo_comparator_v1, Value}; use crate::value::{ByteArrayBuilder, cozo_comparator_v1, Value};
#[test] #[test]
@ -70,7 +71,7 @@ mod tests {
.set_comparator("cozo_comparator_v1", cozo_comparator_v1); .set_comparator("cozo_comparator_v1", cozo_comparator_v1);
let db = DB::open(options, let db = DB::open(options,
"xxyyzz.db").unwrap(); "xxyyzz.db".as_ref()).unwrap();
let mut builder = ByteArrayBuilder::default(); let mut builder = ByteArrayBuilder::default();
builder.build_value(&Value::RefString("A key")); builder.build_value(&Value::RefString("A key"));
@ -79,20 +80,22 @@ mod tests {
let mut builder = ByteArrayBuilder::default(); let mut builder = ByteArrayBuilder::default();
builder.build_value(&Value::RefString("Another key")); builder.build_value(&Value::RefString("Another key"));
let key2 = builder; let key2 = builder;
let cf = db.get_cf_handle("default").unwrap();
println!("{:?}", db.get_column_family_names());
let val = db.get(&key, 0, None).unwrap(); let val = db.get(&key, &cf, None).unwrap();
println!("before anything {}", val.is_none()); println!("before anything {}", val.is_none());
db.put(&key, "A motherfucking value!!! 👋👋👋", 0, None).unwrap(); db.put(&key, "A motherfucking value!!! 👋👋👋", &cf, None).unwrap();
db.put(&key2, "Another motherfucking value!!! 👋👋👋", 0, None).unwrap(); db.put(&key2, "Another motherfucking value!!! 👋👋👋", &cf, None).unwrap();
// db.put("Yes man", "A motherfucking value!!! 👋👋👋", None).unwrap(); // db.put("Yes man", "A motherfucking value!!! 👋👋👋", None).unwrap();
let val = db.get(&key, 0, None).unwrap().unwrap(); let val = db.get(&key, &cf, None).unwrap().unwrap();
println!("1 {}", from_utf8(val.as_ref()).unwrap()); println!("1 {}", from_utf8(val.as_ref()).unwrap());
let val = db.get(&key2, 0, None).unwrap().unwrap(); let val = db.get(&key2, &cf, None).unwrap().unwrap();
// let val = val.as_bytes(); // let val = val.as_bytes();
println!("2 {}", from_utf8(val.as_ref()).unwrap()); println!("2 {}", from_utf8(val.as_ref()).unwrap());
let val = db.get(&key, 0, None).unwrap().unwrap(); let val = db.get(&key, &cf, None).unwrap().unwrap();
println!("3 {}", from_utf8(val.as_ref()).unwrap()); println!("3 {}", from_utf8(val.as_ref()).unwrap());
println!("4 {}", from_utf8(db.get(&key, 0, None).unwrap().unwrap().as_ref()).unwrap()); println!("4 {}", from_utf8(db.get(&key, &cf, None).unwrap().unwrap().as_ref()).unwrap());
} }
} }
Loading…
Cancel
Save