write our own FFI
parent
e61265d2a6
commit
85eac84a27
@ -0,0 +1 @@
|
||||
rocksdb
|
@ -1,15 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="CPP_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$">
|
||||
<sourceFolder url="file://$MODULE_DIR$/cozo_parser/src" isTestSource="false" />
|
||||
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
|
||||
<sourceFolder url="file://$MODULE_DIR$/src/ast/op/src" isTestSource="false" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/target" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/cozo_parser/target" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/src/ast/op/target" />
|
||||
</content>
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="CMakeWorkspace" PROJECT_DIR="$PROJECT_DIR$/rocksdb">
|
||||
<contentRoot DIR="$PROJECT_DIR$" />
|
||||
</component>
|
||||
</project>
|
@ -0,0 +1,2 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module classpath="CMake" type="CPP_MODULE" version="4" />
|
@ -0,0 +1,12 @@
|
||||
[package]
|
||||
name = "cozo-rocks-sys"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
cxx = "1.0.66"
|
||||
|
||||
[build-dependencies]
|
||||
cxx-build = "1.0.66"
|
@ -0,0 +1,16 @@
|
||||
fn main() {
|
||||
cxx_build::bridge("src/lib.rs")
|
||||
.file("src/cozorocks.cc")
|
||||
.include("../rocksdb/include")
|
||||
.include("include")
|
||||
.flag_if_supported("-std=c++17")
|
||||
.compile("rocksdb-sys");
|
||||
|
||||
println!("cargo:rustc-link-search=rocksdb/");
|
||||
println!("cargo:rustc-link-lib=rocksdb");
|
||||
println!("cargo:rustc-link-lib=z");
|
||||
println!("cargo:rustc-link-lib=bz2");
|
||||
println!("cargo:rerun-if-changed=src/main.rs");
|
||||
println!("cargo:rerun-if-changed=src/cozorocks.cc");
|
||||
println!("cargo:rerun-if-changed=include/cozorocks.h");
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
//
|
||||
// Created by Ziyang Hu on 2022/4/13.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include "rust/cxx.h"
|
||||
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/utilities/transaction.h"
|
||||
#include "rocksdb/utilities/transaction_db.h"
|
||||
|
||||
struct Status;
|
||||
|
||||
typedef ROCKSDB_NAMESPACE::Status::Code StatusCode;
|
||||
typedef ROCKSDB_NAMESPACE::Status::SubCode StatusSubCode;
|
||||
typedef ROCKSDB_NAMESPACE::Status::Severity StatusSeverity;
|
||||
|
||||
std::unique_ptr<ROCKSDB_NAMESPACE::DB> new_db();
|
||||
|
||||
struct Options {
|
||||
mutable ROCKSDB_NAMESPACE::Options inner;
|
||||
|
||||
public:
|
||||
inline void prepare_for_bulk_load() const {
|
||||
inner.PrepareForBulkLoad();
|
||||
}
|
||||
|
||||
inline void increase_parallelism() const {
|
||||
inner.IncreaseParallelism();
|
||||
}
|
||||
|
||||
inline void optimize_level_style_compaction() const {
|
||||
inner.OptimizeLevelStyleCompaction();
|
||||
};
|
||||
|
||||
inline void set_create_if_missing(bool v) const {
|
||||
inner.create_if_missing = v;
|
||||
}
|
||||
};
|
||||
|
||||
inline std::unique_ptr<Options> new_options() {
|
||||
return std::unique_ptr<Options>(new Options);
|
||||
}
|
||||
|
||||
|
||||
struct PinnableSlice {
|
||||
ROCKSDB_NAMESPACE::PinnableSlice inner;
|
||||
|
||||
inline rust::Slice<const std::uint8_t> as_bytes() const {
|
||||
return rust::Slice(reinterpret_cast<const std::uint8_t *>(inner.data()), inner.size());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
struct DB {
|
||||
mutable ROCKSDB_NAMESPACE::DB *inner;
|
||||
|
||||
inline ~DB() {
|
||||
if (inner != nullptr) {
|
||||
delete inner;
|
||||
}
|
||||
}
|
||||
|
||||
void put(rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> val, Status &status) const;
|
||||
|
||||
inline std::unique_ptr<PinnableSlice> get(rust::Slice<const uint8_t> key) const {
|
||||
auto pinnable_val = std::make_unique<PinnableSlice>();
|
||||
inner->Get(ROCKSDB_NAMESPACE::ReadOptions(),
|
||||
inner->DefaultColumnFamily(),
|
||||
ROCKSDB_NAMESPACE::Slice(reinterpret_cast<const char *>(key.data()), key.size()),
|
||||
&pinnable_val->inner);
|
||||
return pinnable_val;
|
||||
}
|
||||
};
|
||||
|
||||
inline std::unique_ptr<DB> open_db(const Options &options, const rust::Str path) {
|
||||
ROCKSDB_NAMESPACE::DB *db_ptr;
|
||||
ROCKSDB_NAMESPACE::Status s = ROCKSDB_NAMESPACE::DB::Open(options.inner, std::string(path), &db_ptr);
|
||||
auto db = std::unique_ptr<DB>(new DB);
|
||||
db->inner = db_ptr;
|
||||
return db;
|
||||
}
|
@ -0,0 +1,134 @@
|
||||
//
|
||||
// Created by Ziyang Hu on 2022/4/13.
|
||||
//
|
||||
|
||||
#include <iostream>
|
||||
#include "cozorocks.h"
|
||||
|
||||
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/utilities/transaction.h"
|
||||
#include "rocksdb/utilities/transaction_db.h"
|
||||
#include "cozo-rocks-sys/src/lib.rs.h"
|
||||
|
||||
//using ROCKSDB_NAMESPACE::DB;
|
||||
//using ROCKSDB_NAMESPACE::Options;
|
||||
//using ROCKSDB_NAMESPACE::PinnableSlice;
|
||||
//using ROCKSDB_NAMESPACE::ReadOptions;
|
||||
//using ROCKSDB_NAMESPACE::Status;
|
||||
//using ROCKSDB_NAMESPACE::WriteBatch;
|
||||
//using ROCKSDB_NAMESPACE::WriteOptions;
|
||||
//using ROCKSDB_NAMESPACE::ColumnFamilyDescriptor;
|
||||
//using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
|
||||
//using ROCKSDB_NAMESPACE::ColumnFamilyOptions;
|
||||
//using ROCKSDB_NAMESPACE::Slice;
|
||||
//using ROCKSDB_NAMESPACE::Snapshot;
|
||||
//using ROCKSDB_NAMESPACE::Transaction;
|
||||
//using ROCKSDB_NAMESPACE::TransactionDB;
|
||||
//using ROCKSDB_NAMESPACE::TransactionDBOptions;
|
||||
//using ROCKSDB_NAMESPACE::TransactionOptions;
|
||||
|
||||
|
||||
#if defined(OS_WIN)
|
||||
std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_simple_example";
|
||||
#else
|
||||
std::string kDBPath = "/tmp/rocksdb_simple_example";
|
||||
#endif
|
||||
|
||||
//std::unique_ptr<DB> new_db() {
|
||||
// DB *db_ptr;
|
||||
// Options options;
|
||||
// // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
|
||||
// options.IncreaseParallelism();
|
||||
// options.OptimizeLevelStyleCompaction();
|
||||
// // create the DB if it's not already present
|
||||
// options.create_if_missing = true;
|
||||
//
|
||||
// // open DB
|
||||
// Status s = DB::Open(options, kDBPath, &db_ptr);
|
||||
// std::unique_ptr<DB> db(db_ptr);
|
||||
// assert(s.ok());
|
||||
//
|
||||
// // Put key-value
|
||||
// s = db->Put(WriteOptions(), "key1", "value");
|
||||
// assert(s.ok());
|
||||
// std::string value;
|
||||
// // get value
|
||||
// s = db->Get(ReadOptions(), "key1", &value);
|
||||
// assert(s.ok());
|
||||
// assert(value == "value");
|
||||
//
|
||||
// // atomically apply a set of updates
|
||||
// {
|
||||
// WriteBatch batch;
|
||||
// batch.Delete("key1");
|
||||
// batch.Put("key2", value);
|
||||
// s = db->Write(WriteOptions(), &batch);
|
||||
// }
|
||||
//
|
||||
// s = db->Get(ReadOptions(), "key1", &value);
|
||||
// assert(s.IsNotFound());
|
||||
//
|
||||
// db->Get(ReadOptions(), "key2", &value);
|
||||
// assert(value == "value");
|
||||
// std::cout << value << " and fuck!" << std::endl;
|
||||
//
|
||||
// {
|
||||
// PinnableSlice pinnable_val;
|
||||
// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
|
||||
// assert(pinnable_val == "value");
|
||||
// }
|
||||
//
|
||||
// {
|
||||
// std::string string_val;
|
||||
// // If it cannot pin the value, it copies the value to its internal buffer.
|
||||
// // The intenral buffer could be set during construction.
|
||||
// PinnableSlice pinnable_val(&string_val);
|
||||
// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
|
||||
// assert(pinnable_val == "value");
|
||||
// // If the value is not pinned, the internal buffer must have the value.
|
||||
// assert(pinnable_val.IsPinned() || string_val == "value");
|
||||
// }
|
||||
//
|
||||
// PinnableSlice pinnable_val;
|
||||
// s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key1", &pinnable_val);
|
||||
// assert(s.IsNotFound());
|
||||
// // Reset PinnableSlice after each use and before each reuse
|
||||
// pinnable_val.Reset();
|
||||
// db->Get(ReadOptions(), db->DefaultColumnFamily(), "key2", &pinnable_val);
|
||||
// assert(pinnable_val == "value");
|
||||
// pinnable_val.Reset();
|
||||
// // The Slice pointed by pinnable_val is not valid after this point
|
||||
//
|
||||
// std::cout << "hello from C++" << std::endl;
|
||||
//// return std::unique_ptr<BlobstoreClient>(new BlobstoreClient());
|
||||
// return db;
|
||||
//}
|
||||
|
||||
//std::unique_ptr<CozoRocksDB> open_db(const Options& options, const std::string& path) {
|
||||
// DB *db_ptr;
|
||||
// // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
|
||||
//// options.IncreaseParallelism();
|
||||
//// options.OptimizeLevelStyleCompaction();
|
||||
// // create the DB if it's not already present
|
||||
//// options.create_if_missing = true;
|
||||
//
|
||||
// // open DB
|
||||
// Status s = DB::Open(options, path, &db_ptr);
|
||||
// std::unique_ptr<DB> db(db_ptr);
|
||||
// std::unique_ptr<CozoRocksDB> cdb(new CozoRocksDB{});
|
||||
// cdb->db = std::move(db);
|
||||
// cdb->db_status = std::move(s);
|
||||
// return cdb;
|
||||
//}
|
||||
|
||||
void DB::put(rust::Slice<const uint8_t> key, rust::Slice<const uint8_t> val, Status &status) const {
|
||||
auto s = inner->Put(ROCKSDB_NAMESPACE::WriteOptions(),
|
||||
ROCKSDB_NAMESPACE::Slice(reinterpret_cast<const char *>(key.data()), key.size()),
|
||||
ROCKSDB_NAMESPACE::Slice(reinterpret_cast<const char *>(val.data()), val.size()));
|
||||
status.code = s.code();
|
||||
status.subcode = s.subcode();
|
||||
status.severity = s.severity();
|
||||
}
|
@ -0,0 +1,95 @@
|
||||
#[cxx::bridge]
|
||||
mod ffi {
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||
struct Status {
|
||||
code: StatusCode,
|
||||
subcode: StatusSubCode,
|
||||
severity: StatusSeverity
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||
enum StatusCode {
|
||||
kOk = 0,
|
||||
kNotFound = 1,
|
||||
kCorruption = 2,
|
||||
kNotSupported = 3,
|
||||
kInvalidArgument = 4,
|
||||
kIOError = 5,
|
||||
kMergeInProgress = 6,
|
||||
kIncomplete = 7,
|
||||
kShutdownInProgress = 8,
|
||||
kTimedOut = 9,
|
||||
kAborted = 10,
|
||||
kBusy = 11,
|
||||
kExpired = 12,
|
||||
kTryAgain = 13,
|
||||
kCompactionTooLarge = 14,
|
||||
kColumnFamilyDropped = 15,
|
||||
kMaxCode,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||
enum StatusSubCode {
|
||||
kNone = 0,
|
||||
kMutexTimeout = 1,
|
||||
kLockTimeout = 2,
|
||||
kLockLimit = 3,
|
||||
kNoSpace = 4,
|
||||
kDeadlock = 5,
|
||||
kStaleFile = 6,
|
||||
kMemoryLimit = 7,
|
||||
kSpaceLimit = 8,
|
||||
kPathNotFound = 9,
|
||||
KMergeOperandsInsufficientCapacity = 10,
|
||||
kManualCompactionPaused = 11,
|
||||
kOverwritten = 12,
|
||||
kTxnNotPrepared = 13,
|
||||
kIOFenced = 14,
|
||||
kMaxSubCode,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||
enum StatusSeverity {
|
||||
kNoError = 0,
|
||||
kSoftError = 1,
|
||||
kHardError = 2,
|
||||
kFatalError = 3,
|
||||
kUnrecoverableError = 4,
|
||||
kMaxSeverity,
|
||||
}
|
||||
|
||||
unsafe extern "C++" {
|
||||
include!("cozo-rocks-sys/include/cozorocks.h");
|
||||
|
||||
type DB;
|
||||
type Options;
|
||||
type PinnableSlice;
|
||||
|
||||
type StatusCode;
|
||||
type StatusSubCode;
|
||||
type StatusSeverity;
|
||||
|
||||
fn as_bytes(self: &PinnableSlice) -> &[u8];
|
||||
|
||||
fn new_options() -> UniquePtr<Options>;
|
||||
fn prepare_for_bulk_load(self: &Options);
|
||||
fn increase_parallelism(self: &Options);
|
||||
fn optimize_level_style_compaction(self: &Options);
|
||||
fn set_create_if_missing(self: &Options, v: bool);
|
||||
|
||||
fn open_db(options: &Options, path: &str) -> UniquePtr<DB>;
|
||||
fn put(self: &DB, key: &[u8], val: &[u8], status: &mut Status);
|
||||
fn get(self: &DB, key: &[u8]) -> UniquePtr<PinnableSlice>;
|
||||
}
|
||||
}
|
||||
pub use ffi::*;
|
||||
|
||||
impl Status {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
code: StatusCode::kOk,
|
||||
subcode: StatusSubCode::kNone,
|
||||
severity: StatusSeverity::kNoError
|
||||
}
|
||||
}
|
||||
}
|
@ -1,75 +1,91 @@
|
||||
use rocksdb::{ColumnFamilyDescriptor, DB, Options, WriteOptions};
|
||||
use crate::error::CozoError::DatabaseClosed;
|
||||
use crate::error::Result;
|
||||
use crate::value::cozo_comparator_v1;
|
||||
|
||||
|
||||
pub struct Storage {
|
||||
pub db: Option<DB>,
|
||||
pub db: Option<()>,
|
||||
path: String,
|
||||
}
|
||||
//
|
||||
// fn make_options() -> Options {
|
||||
// let mut options = Options::default();
|
||||
//
|
||||
// options.create_missing_column_families(true);
|
||||
// options.create_if_missing(true);
|
||||
// options.set_comparator("cozo_comparator_v1", cozo_comparator_v1);
|
||||
// options
|
||||
// }
|
||||
|
||||
fn make_options() -> Options {
|
||||
let mut options = Options::default();
|
||||
|
||||
options.create_missing_column_families(true);
|
||||
options.create_if_missing(true);
|
||||
options.set_comparator("cozo_comparator_v1", cozo_comparator_v1);
|
||||
options
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn make_write_options(global: bool) -> WriteOptions {
|
||||
let mut options = WriteOptions::new();
|
||||
options.disable_wal(!global);
|
||||
options
|
||||
}
|
||||
// #[allow(dead_code)]
|
||||
// fn make_write_options(global: bool) -> WriteOptions {
|
||||
// let mut options = WriteOptions::new();
|
||||
// options.disable_wal(!global);
|
||||
// options
|
||||
// }
|
||||
|
||||
impl Storage {
|
||||
pub fn no_storage() -> Self {
|
||||
Self { db: None, path: "".to_string() }
|
||||
}
|
||||
pub fn new(path: String) -> Result<Self> {
|
||||
let options = make_options();
|
||||
let cfs = match DB::list_cf(&options, &path) {
|
||||
Ok(cfs) => { cfs }
|
||||
Err(_) => { vec![] }
|
||||
};
|
||||
let cfs = cfs.into_iter().map(|name| {
|
||||
ColumnFamilyDescriptor::new(name, make_options())
|
||||
});
|
||||
let db = DB::open_cf_descriptors(&options, &path, cfs)?;
|
||||
Ok(Storage { db: Some(db), path })
|
||||
unimplemented!()
|
||||
// let options = make_options();
|
||||
// let cfs = match DB::list_cf(&options, &path) {
|
||||
// Ok(cfs) => { cfs }
|
||||
// Err(_) => { vec![] }
|
||||
// };
|
||||
// let cfs = cfs.into_iter().map(|name| {
|
||||
// ColumnFamilyDescriptor::new(name, make_options())
|
||||
// });
|
||||
// let db = DB::open_cf_descriptors(&options, &path, cfs)?;
|
||||
// Ok(Storage { db: Some(db), path })
|
||||
}
|
||||
pub fn delete(&mut self) -> Result<()> {
|
||||
drop(self.db.take());
|
||||
DB::destroy(&make_options(), &self.path)?;
|
||||
unimplemented!();
|
||||
// drop(self.db.take());
|
||||
// DB::destroy(&make_options(), &self.path)?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn put_global(&self, k: &[u8], v: &[u8]) -> Result<()> {
|
||||
let db = self.db.as_ref().ok_or(DatabaseClosed)?;
|
||||
db.put(k, v)?;
|
||||
// let db = self.db.as_ref().ok_or(DatabaseClosed)?;
|
||||
// db.put(k, v)?;
|
||||
unimplemented!();
|
||||
Ok(())
|
||||
}
|
||||
pub fn create_table(&mut self, name: &str, _global: bool) -> Result<()> {
|
||||
let db = self.db.as_mut().ok_or(DatabaseClosed)?;
|
||||
db.create_cf(name, &make_options())?;
|
||||
unimplemented!();
|
||||
// let db = self.db.as_mut().ok_or(DatabaseClosed)?;
|
||||
// db.create_cf(name, &make_options())?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn drop_table(&mut self, name: &str, _global: bool) -> Result<()> {
|
||||
let db = self.db.as_mut().ok_or(DatabaseClosed)?;
|
||||
db.drop_cf(name)?;
|
||||
unimplemented!();
|
||||
// let db = self.db.as_mut().ok_or(DatabaseClosed)?;
|
||||
// db.drop_cf(name)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::str::from_utf8;
|
||||
use crate::typing::BaseType::String;
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn storage() {
|
||||
let mut s = Storage::new("_path_for_rocksdb_storage".to_string()).unwrap();
|
||||
s.delete().unwrap();
|
||||
fn import() {
|
||||
use cozo_rocks_sys::*;
|
||||
|
||||
let options = new_options();
|
||||
options.increase_parallelism();
|
||||
options.optimize_level_style_compaction();
|
||||
options.set_create_if_missing(true);
|
||||
let db = open_db(&options, "xxyyzz");
|
||||
let mut status = Status::new();
|
||||
db.put("A key".as_bytes(), "A motherfucking value!!! 👋👋👋".as_bytes(), &mut status);
|
||||
let val = db.get("A key".as_bytes());
|
||||
let val = val.as_bytes();
|
||||
println!("{:?} {}", status, from_utf8(val).unwrap());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue