engine creation

main
Ziyang Hu 2 years ago
parent 4835a3d832
commit 2afb722683

@ -493,7 +493,7 @@ open_tdb_raw(const Options &options,
std::vector<ColumnFamilyHandle *> handles;
TransactionDB *txn_db;
TransactionDB *txn_db = nullptr;
Status s = TransactionDB::Open(options, txn_db_options, path,
column_families, &handles,
@ -524,7 +524,7 @@ open_odb_raw(const Options &options,
std::vector<ColumnFamilyHandle *> handles;
OptimisticTransactionDB *txn_db;
OptimisticTransactionDB *txn_db = nullptr;
Status s = OptimisticTransactionDB::Open(options, txn_db_options, path,
column_families, &handles,

@ -15,12 +15,14 @@ fn main() {
// b.flag_if_supported("-std=c++17")
// .compile("cozorocks-autocxx"); // arbitrary library name, pick anything
println!("cargo:rustc-link-search=deps/lib/");
println!("cargo:rustc-link-search=/opt/homebrew/lib/");
println!("cargo:rustc-link-lib=rocksdb");
println!("cargo:rustc-link-lib=z");
println!("cargo:rustc-link-lib=bz2");
println!("cargo:rustc-link-lib=lz4");
println!("cargo:rustc-link-lib=snappy");
println!("cargo:rustc-link-lib=zstd");
println!("cargo:rustc-link-lib=jemalloc");
println!("cargo:rerun-if-changed=src/bridge.rs");
println!("cargo:rerun-if-changed=bridge/cozorocks.cc");
println!("cargo:rerun-if-changed=bridge/cozorocks.h");

@ -62,7 +62,9 @@ impl BridgeStatus {
impl From<BridgeStatus> for Option<BridgeError> {
#[inline]
fn from(s: BridgeStatus) -> Self {
if s.severity == StatusSeverity::kNoError && s.bridge_code == StatusBridgeCode::OK {
if s.severity == StatusSeverity::kNoError &&
s.bridge_code == StatusBridgeCode::OK &&
s.code == StatusCode::kOk {
None
} else {
Some(BridgeError { status: s })
@ -533,36 +535,37 @@ unsafe impl Send for DBPtr {}
unsafe impl Sync for DBPtr {}
pub enum TransactOption {
pub enum TransactOptions {
Pessimistic(TransactionOptionsPtr),
Optimistic(OptimisticTransactionOptionsPtr),
}
impl DBPtr {
pub fn open_pessimistic(options: &OptionsPtr, t_options: &TransactionDBOptionsPtr, path: impl AsRef<str>) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
let ret = open_tdb_raw(options, t_options, &cname, &mut status);
status.check_err(Self(ret))
}
pub enum TDBOptions {
Pessimistic(TransactionDBOptionsPtr),
Optimistic(OptimisticTransactionDBOptionsPtr),
}
pub fn open_optimistic(options: &OptionsPtr, t_options: &OptimisticTransactionDBOptionsPtr, path: impl AsRef<str>) -> Result<Self> {
impl DBPtr {
pub fn open(options: &OptionsPtr, t_options: &TDBOptions, path: impl AsRef<str>) -> Result<Self> {
let_cxx_string!(cname = path.as_ref());
let mut status = BridgeStatus::default();
let ret = open_odb_raw(options, t_options, &cname, &mut status);
let ret = match t_options {
TDBOptions::Pessimistic(o) => open_tdb_raw(options, o, &cname, &mut status),
TDBOptions::Optimistic(o) => open_odb_raw(options, o, &cname, &mut status)
};
status.check_err(Self(ret))
}
#[inline]
pub fn make_transaction(&self,
options: TransactOption,
options: TransactOptions,
read_ops: ReadOptionsPtr,
raw_read_ops: ReadOptionsPtr,
write_ops: WriteOptionsPtr,
raw_write_ops: WriteOptionsPtr,
) -> TransactionPtr {
TransactionPtr(match options {
TransactOption::Optimistic(o) => {
TransactOptions::Optimistic(o) => {
self.begin_o_transaction(
write_ops.0,
raw_write_ops.0,
@ -571,7 +574,7 @@ impl DBPtr {
o.0,
)
}
TransactOption::Pessimistic(o) => {
TransactOptions::Pessimistic(o) => {
self.begin_t_transaction(
write_ops.0,
raw_write_ops.0,
@ -610,4 +613,9 @@ impl DBPtr {
pub fn cf_names(&self) -> Vec<String> {
self.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect()
}
pub fn drop_non_default_cfs(&self) {
for name in self.cf_names() {
self.drop_cf(name);
}
}
}

@ -2,19 +2,48 @@
// will be shared among threads
use cozorocks::*;
use std::sync::{Arc, RwLock};
use std::sync::atomic::AtomicUsize;
struct EngineOptions {
cmp: RustComparatorPtr,
options: OptionsPtr,
t_options: TDBOptions,
path: String,
}
pub struct Engine {
db: (),
session_handles: Vec<Arc<RwLock<SessionHandle>>>
db: DBPtr,
options_store: Box<EngineOptions>,
session_handles: Vec<Arc<SessionHandle>>,
}
unsafe impl Send for Engine {}
unsafe impl Sync for Engine {}
impl Engine {
pub fn new(_x: String) -> Self {
todo!()
pub fn new(path: String, optimistic: bool) -> Result<Self> {
let t_options = if optimistic {
TDBOptions::Optimistic(OptimisticTransactionDBOptionsPtr::default())
} else {
TDBOptions::Pessimistic(TransactionDBOptionsPtr::default())
};
let mut options = OptionsPtr::default();
let cmp = RustComparatorPtr::new("cozo_cmp_v1", crate::relation::key_order::compare);
options.set_comparator(&cmp).increase_parallelism().optimize_level_style_compaction().set_create_if_missing(true);
let e_options = Box::new(EngineOptions { cmp, options, t_options, path });
let db = DBPtr::open(&e_options.options, &e_options.t_options, &e_options.path)?;
db.drop_non_default_cfs();
Ok(Self {
db,
options_store: e_options,
session_handles: vec![]
})
}
pub fn session(&self) -> Session {
// find a handle if there is one available
// otherwise create
todo!()
}
}
@ -28,11 +57,44 @@ pub struct Session<'a> {
pub struct SessionHandle {
cf_ident: String,
status: SessionStatus
status: SessionStatus,
table_idx: AtomicUsize
}
pub enum SessionStatus {
Prepared,
Running,
Completed
Completed,
}
#[cfg(test)]
mod tests {
use std::fs;
use super::*;
#[test]
fn test_create() {
let p1= "_test_db_create1";
let p2 = "_test_db_create2";
let p3 = "_test_db_create3";
{
{
let engine = Engine::new(p1.to_string(), true);
assert!(engine.is_ok());
let engine = Engine::new(p2.to_string(), true);
assert!(engine.is_ok());
let engine = Engine::new(p3.to_string(), true);
assert!(engine.is_ok());
let engine2 = Engine::new(p1.to_string(), false);
assert!(engine2.is_err());
}
let engine2 = Engine::new(p2.to_string(), false);
assert!(engine2.is_ok());
let engine2 = Engine::new(p3.to_string(), false);
assert!(engine2.is_ok());
}
let _ = fs::remove_dir_all(p1);
let _ = fs::remove_dir_all(p2);
let _ = fs::remove_dir_all(p3);
}
}

@ -1,3 +1,5 @@
extern crate core;
// pub mod value;
// pub mod typing;
// pub mod env;

@ -329,6 +329,21 @@ impl Tuple<Vec<u8>> {
};
self.push_varint(u);
}
#[inline]
pub fn concat_data<T: AsRef<[u8]>>(&mut self, other: &Tuple<T>) {
let other_data_part = &other.as_ref()[4..];
self.data.extend_from_slice(other_data_part);
}
}
impl <'a> Extend<Value<'a>> for Tuple<Vec<u8>> {
#[inline]
fn extend<T: IntoIterator<Item=Value<'a>>>(&mut self, iter: T) {
for v in iter {
self.push_value(&v)
}
}
}
impl<T: AsRef<[u8]>> PartialEq for Tuple<T> {

Loading…
Cancel
Save