diff --git a/cozo-rocks/include/cozorocks.h b/cozo-rocks/include/cozorocks.h index b8a37860..41008cc0 100644 --- a/cozo-rocks/include/cozorocks.h +++ b/cozo-rocks/include/cozorocks.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include "rust/cxx.h" #include "rocksdb/db.h" @@ -234,6 +235,7 @@ inline unique_ptr new_write_batch_raw() { struct DBBridge { mutable unique_ptr db; mutable unordered_map > handles; + mutable std::mutex handle_lock; DBBridge(DB *db_, unordered_map > &&handles_) : db(db_), handles(handles_) {} @@ -313,13 +315,16 @@ struct DBBridge { write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2); return; } + handle_lock.lock(); ColumnFamilyHandle *handle; auto s = db->CreateColumnFamily(options.inner, name, &handle); write_status(std::move(s), status); handles[name] = shared_ptr(handle); + handle_lock.unlock(); } inline void drop_column_family_raw(const string &name, BridgeStatus &status) const { + handle_lock.lock(); auto cf_it = handles.find(name); if (cf_it != handles.end()) { auto s = db->DropColumnFamily(cf_it->second.get()); @@ -328,6 +333,7 @@ struct DBBridge { } else { write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); } + handle_lock.unlock(); // When should we call DestroyColumnFamilyHandle? } diff --git a/cozo-rocks/src/lib.rs b/cozo-rocks/src/lib.rs index 326c1d63..ade17ffc 100644 --- a/cozo-rocks/src/lib.rs +++ b/cozo-rocks/src/lib.rs @@ -131,7 +131,7 @@ mod ffi { use std::fmt::Formatter; use std::fmt::Debug; use std::path::Path; -use cxx::{UniquePtr, SharedPtr, let_cxx_string}; +use cxx::{UniquePtr, SharedPtr, let_cxx_string, CxxString}; pub use ffi::BridgeStatus; pub use ffi::StatusBridgeCode; pub use ffi::StatusCode; @@ -288,15 +288,15 @@ impl IteratorImpl for IteratorBridge { } } -fn get_path_bytes(path: &std::path::Path) -> &[u8] { +fn get_path_bytes(path: &std::path::Path) -> Vec { #[cfg(unix)] { use std::os::unix::ffi::OsStrExt; - path.as_os_str().as_bytes() + path.as_os_str().as_bytes().to_vec() } #[cfg(not(unix))] - { path.to_string_lossy().to_string().as_bytes() } + { path.to_string_lossy().to_string().as_bytes().to_vec() } } impl Default for BridgeStatus { @@ -313,6 +313,7 @@ impl Default for BridgeStatus { pub struct DB { inner: UniquePtr, + pub path: Vec, pub options: Options, pub default_read_options: ReadOptions, pub default_write_options: WriteOptions, @@ -322,35 +323,21 @@ unsafe impl Send for DB {} unsafe impl Sync for DB {} -pub trait DBImpl { - fn open(options: Options, path: &Path) -> Result; - fn get_cf_handle(&self, name: impl AsRef) -> Result; - fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator; - fn create_column_family(&self, name: impl AsRef) -> Result<()>; - fn drop_column_family(&self, name: impl AsRef) -> Result<()>; - fn all_cf_names(&self) -> Vec; - fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) - -> Result>; - fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) - -> Result; - fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) - -> Result; - fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result; -} - -impl DBImpl for DB { - fn open(options: Options, path: &Path) -> Result { - let_cxx_string!(path = get_path_bytes(path)); +impl DB { + pub fn open(options: Options, path: &Path) -> Result { + let path = get_path_bytes(path); + let_cxx_string!(cpp_path = path.clone()); let mut status = BridgeStatus::default(); let bridge = open_db_raw( &options, - &path, + &cpp_path, &mut status, ); if status.code == StatusCode::kOk { Ok(DB { inner: bridge, + path, options, default_read_options: ReadOptions::default(), default_write_options: WriteOptions::default(), @@ -360,7 +347,7 @@ impl DBImpl for DB { } } - fn get_cf_handle(&self, name: impl AsRef) -> Result { + pub fn get_cf_handle(&self, name: impl AsRef) -> Result { let_cxx_string!(name = name.as_ref()); let ret = self.inner.get_cf_handle_raw(&name); if ret.is_null() { @@ -376,11 +363,11 @@ impl DBImpl for DB { } #[inline] - fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator { + pub fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator { self.inner.iterator_raw(options.unwrap_or(&self.default_read_options), cf) } - fn create_column_family(&self, name: impl AsRef) -> Result<()> { + pub fn create_column_family(&self, name: impl AsRef) -> Result<()> { let_cxx_string!(name = name.as_ref()); let mut status = BridgeStatus::default(); self.inner.create_column_family_raw(&self.options, &name, &mut status); @@ -391,7 +378,7 @@ impl DBImpl for DB { } } - fn drop_column_family(&self, name: impl AsRef) -> Result<()> { + pub fn drop_column_family(&self, name: impl AsRef) -> Result<()> { let_cxx_string!(name = name.as_ref()); let mut status = BridgeStatus::default(); self.inner.drop_column_family_raw(&name, &mut status); @@ -402,12 +389,12 @@ impl DBImpl for DB { } } - fn all_cf_names(&self) -> Vec { + pub fn all_cf_names(&self) -> Vec { self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect() } #[inline] - fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result> { + pub fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result> { let mut status = BridgeStatus::default(); let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status); match status.code { @@ -418,7 +405,7 @@ impl DBImpl for DB { } #[inline] - fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { + pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { let mut status = BridgeStatus::default(); self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf, key.as_ref(), val.as_ref(), @@ -431,7 +418,7 @@ impl DBImpl for DB { } #[inline] - fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { + pub fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result { let mut status = BridgeStatus::default(); self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf, key.as_ref(), @@ -444,7 +431,7 @@ impl DBImpl for DB { } #[inline] - fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result { + pub fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result { let mut status = BridgeStatus::default(); self.inner.write_raw(options.unwrap_or(&self.default_write_options), updates.pin_mut(), diff --git a/src/definition.rs b/src/definition.rs index 8d45275f..ff6ae2a4 100644 --- a/src/definition.rs +++ b/src/definition.rs @@ -572,7 +572,7 @@ mod tests { let mut eval = EvaluatorWithStorage::new("_path_for_rocksdb_storagex".to_string()).unwrap(); eval.build_table(parsed).unwrap(); eval.restore_metadata().unwrap(); - eval.storage.delete().unwrap(); + eval.storage.delete_storage().unwrap(); println!("{:#?}", eval.env.resolve("Person")); println!("{:#?}", eval.env.resolve("Friend")); } diff --git a/src/error.rs b/src/error.rs index 94235e0c..8db7e603 100644 --- a/src/error.rs +++ b/src/error.rs @@ -31,6 +31,9 @@ pub enum CozoError { #[error("Value required")] ValueRequired, + #[error("Incompatible value")] + IncompatibleValue, + #[error("Wrong type")] WrongType, diff --git a/src/mutation.rs b/src/mutation.rs index 8be6d13c..9933355b 100644 --- a/src/mutation.rs +++ b/src/mutation.rs @@ -3,7 +3,7 @@ use pest::iterators::Pair; use crate::ast::{build_expr, Expr, ExprVisitor}; use crate::definition::build_name_in_def; use crate::env::Env; -use crate::error::CozoError::{UndefinedTable, ValueRequired}; +use crate::error::CozoError::{IncompatibleValue, UndefinedTable, ValueRequired}; use crate::eval::Evaluator; use crate::storage::{RocksStorage}; use crate::error::Result; @@ -44,12 +44,13 @@ impl Evaluator { Expr::Const(v) => v, _ => return Err(ValueRequired) }; + let val = val.get_list().ok_or(IncompatibleValue)?; println!("{:#?}", val); let coerced_values = self.coerce_table_values(&val, main_target); Ok(()) } - fn coerce_table_values(&self, values: &Value, table: Option<&Structured>) -> BTreeMap<&Structured, Vec> { + fn coerce_table_values(&self, values: &[Value], default_table: Option<&Structured>) -> BTreeMap<&Structured, Vec> { todo!() } } @@ -62,7 +63,7 @@ mod tests { use crate::ast::{Expr, ExprVisitor, parse_expr_from_str}; use crate::eval::{BareEvaluator, EvaluatorWithStorage}; use pest::Parser as PestParser; - use cozo_rocks::DBImpl; + use cozo_rocks::*; use crate::env::Env; use crate::typing::Structured; @@ -80,7 +81,7 @@ mod tests { let data = fs::read_to_string("test_data/hr.json")?; let parsed = parse_expr_from_str(&data)?; - let mut ev = BareEvaluator::default(); + let ev = BareEvaluator::default(); let evaluated = ev.visit_expr(&parsed)?; let bound_value = match evaluated { Expr::Const(v) => v, diff --git a/src/storage.rs b/src/storage.rs index c201ff79..037a95d3 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,3 +1,4 @@ +use std::fs; use crate::error::{CozoError, Result}; use cozo_rocks::*; use crate::value::{cozo_comparator_v1}; @@ -23,10 +24,13 @@ impl RocksStorage { } #[allow(unused_variables)] - pub fn delete(&mut self) -> Result<()> { + pub fn delete_storage(self) -> Result<()> { // unimplemented!() // drop(self.db.take()); // DB::destroy(&make_options(), &self.path)?; + let path = self.path.clone(); + drop(self); + fs::remove_dir_all(path); Ok(()) }