make cf creation/deletion thread-safe

main
Ziyang Hu 2 years ago
parent be6f922738
commit f9fa75765d

@ -5,6 +5,7 @@
#pragma once #pragma once
#include <memory> #include <memory>
#include <mutex>
#include "rust/cxx.h" #include "rust/cxx.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -234,6 +235,7 @@ inline unique_ptr <WriteBatchBridge> new_write_batch_raw() {
struct DBBridge { struct DBBridge {
mutable unique_ptr <DB> db; mutable unique_ptr <DB> db;
mutable unordered_map <string, shared_ptr<ColumnFamilyHandle>> handles; mutable unordered_map <string, shared_ptr<ColumnFamilyHandle>> handles;
mutable std::mutex handle_lock;
DBBridge(DB *db_, DBBridge(DB *db_,
unordered_map <string, shared_ptr<ColumnFamilyHandle>> &&handles_) : db(db_), handles(handles_) {} unordered_map <string, shared_ptr<ColumnFamilyHandle>> &&handles_) : db(db_), handles(handles_) {}
@ -313,13 +315,16 @@ struct DBBridge {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2); write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 2);
return; return;
} }
handle_lock.lock();
ColumnFamilyHandle *handle; ColumnFamilyHandle *handle;
auto s = db->CreateColumnFamily(options.inner, name, &handle); auto s = db->CreateColumnFamily(options.inner, name, &handle);
write_status(std::move(s), status); write_status(std::move(s), status);
handles[name] = shared_ptr<ColumnFamilyHandle>(handle); handles[name] = shared_ptr<ColumnFamilyHandle>(handle);
handle_lock.unlock();
} }
inline void drop_column_family_raw(const string &name, BridgeStatus &status) const { inline void drop_column_family_raw(const string &name, BridgeStatus &status) const {
handle_lock.lock();
auto cf_it = handles.find(name); auto cf_it = handles.find(name);
if (cf_it != handles.end()) { if (cf_it != handles.end()) {
auto s = db->DropColumnFamily(cf_it->second.get()); auto s = db->DropColumnFamily(cf_it->second.get());
@ -328,6 +333,7 @@ struct DBBridge {
} else { } else {
write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3); write_status_impl(status, StatusCode::kMaxCode, StatusSubCode::kMaxSubCode, StatusSeverity::kSoftError, 3);
} }
handle_lock.unlock();
// When should we call DestroyColumnFamilyHandle? // When should we call DestroyColumnFamilyHandle?
} }

@ -131,7 +131,7 @@ mod ffi {
use std::fmt::Formatter; use std::fmt::Formatter;
use std::fmt::Debug; use std::fmt::Debug;
use std::path::Path; use std::path::Path;
use cxx::{UniquePtr, SharedPtr, let_cxx_string}; use cxx::{UniquePtr, SharedPtr, let_cxx_string, CxxString};
pub use ffi::BridgeStatus; pub use ffi::BridgeStatus;
pub use ffi::StatusBridgeCode; pub use ffi::StatusBridgeCode;
pub use ffi::StatusCode; pub use ffi::StatusCode;
@ -288,15 +288,15 @@ impl IteratorImpl for IteratorBridge {
} }
} }
fn get_path_bytes(path: &std::path::Path) -> &[u8] { fn get_path_bytes(path: &std::path::Path) -> Vec<u8> {
#[cfg(unix)] #[cfg(unix)]
{ {
use std::os::unix::ffi::OsStrExt; use std::os::unix::ffi::OsStrExt;
path.as_os_str().as_bytes() path.as_os_str().as_bytes().to_vec()
} }
#[cfg(not(unix))] #[cfg(not(unix))]
{ path.to_string_lossy().to_string().as_bytes() } { path.to_string_lossy().to_string().as_bytes().to_vec() }
} }
impl Default for BridgeStatus { impl Default for BridgeStatus {
@ -313,6 +313,7 @@ impl Default for BridgeStatus {
pub struct DB { pub struct DB {
inner: UniquePtr<DBBridge>, inner: UniquePtr<DBBridge>,
pub path: Vec<u8>,
pub options: Options, pub options: Options,
pub default_read_options: ReadOptions, pub default_read_options: ReadOptions,
pub default_write_options: WriteOptions, pub default_write_options: WriteOptions,
@ -322,35 +323,21 @@ unsafe impl Send for DB {}
unsafe impl Sync for DB {} unsafe impl Sync for DB {}
pub trait DBImpl { impl DB {
fn open(options: Options, path: &Path) -> Result<DB>; pub fn open(options: Options, path: &Path) -> Result<DB> {
fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle>; let path = get_path_bytes(path);
fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator; let_cxx_string!(cpp_path = path.clone());
fn create_column_family(&self, name: impl AsRef<str>) -> Result<()>;
fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()>;
fn all_cf_names(&self) -> Vec<String>;
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>)
-> Result<Option<PinnableSlice>>;
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>;
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>)
-> Result<BridgeStatus>;
fn write(&self, updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus>;
}
impl DBImpl for DB {
fn open(options: Options, path: &Path) -> Result<DB> {
let_cxx_string!(path = get_path_bytes(path));
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
let bridge = open_db_raw( let bridge = open_db_raw(
&options, &options,
&path, &cpp_path,
&mut status, &mut status,
); );
if status.code == StatusCode::kOk { if status.code == StatusCode::kOk {
Ok(DB { Ok(DB {
inner: bridge, inner: bridge,
path,
options, options,
default_read_options: ReadOptions::default(), default_read_options: ReadOptions::default(),
default_write_options: WriteOptions::default(), default_write_options: WriteOptions::default(),
@ -360,7 +347,7 @@ impl DBImpl for DB {
} }
} }
fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle> { pub fn get_cf_handle(&self, name: impl AsRef<str>) -> Result<ColumnFamilyHandle> {
let_cxx_string!(name = name.as_ref()); let_cxx_string!(name = name.as_ref());
let ret = self.inner.get_cf_handle_raw(&name); let ret = self.inner.get_cf_handle_raw(&name);
if ret.is_null() { if ret.is_null() {
@ -376,11 +363,11 @@ impl DBImpl for DB {
} }
#[inline] #[inline]
fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator { pub fn iterator(&self, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> DBIterator {
self.inner.iterator_raw(options.unwrap_or(&self.default_read_options), cf) self.inner.iterator_raw(options.unwrap_or(&self.default_read_options), cf)
} }
fn create_column_family(&self, name: impl AsRef<str>) -> Result<()> { pub fn create_column_family(&self, name: impl AsRef<str>) -> Result<()> {
let_cxx_string!(name = name.as_ref()); let_cxx_string!(name = name.as_ref());
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.create_column_family_raw(&self.options, &name, &mut status); self.inner.create_column_family_raw(&self.options, &name, &mut status);
@ -391,7 +378,7 @@ impl DBImpl for DB {
} }
} }
fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()> { pub fn drop_column_family(&self, name: impl AsRef<str>) -> Result<()> {
let_cxx_string!(name = name.as_ref()); let_cxx_string!(name = name.as_ref());
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.drop_column_family_raw(&name, &mut status); self.inner.drop_column_family_raw(&name, &mut status);
@ -402,12 +389,12 @@ impl DBImpl for DB {
} }
} }
fn all_cf_names(&self) -> Vec<String> { pub fn all_cf_names(&self) -> Vec<String> {
self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect() self.inner.get_column_family_names_raw().iter().map(|v| v.to_string_lossy().to_string()).collect()
} }
#[inline] #[inline]
fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>> { pub fn get(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&ReadOptions>) -> Result<Option<PinnableSlice>> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status); let slice = self.inner.get_raw(options.unwrap_or(&self.default_read_options), cf, key.as_ref(), &mut status);
match status.code { match status.code {
@ -418,7 +405,7 @@ impl DBImpl for DB {
} }
#[inline] #[inline]
fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> { pub fn put(&self, key: impl AsRef<[u8]>, val: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf, self.inner.put_raw(options.unwrap_or(&self.default_write_options), cf,
key.as_ref(), val.as_ref(), key.as_ref(), val.as_ref(),
@ -431,7 +418,7 @@ impl DBImpl for DB {
} }
#[inline] #[inline]
fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> { pub fn delete(&self, key: impl AsRef<[u8]>, cf: &ColumnFamilyHandle, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf, self.inner.delete_raw(options.unwrap_or(&self.default_write_options), cf,
key.as_ref(), key.as_ref(),
@ -444,7 +431,7 @@ impl DBImpl for DB {
} }
#[inline] #[inline]
fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus> { pub fn write(&self, mut updates: WriteBatch, options: Option<&WriteOptions>) -> Result<BridgeStatus> {
let mut status = BridgeStatus::default(); let mut status = BridgeStatus::default();
self.inner.write_raw(options.unwrap_or(&self.default_write_options), self.inner.write_raw(options.unwrap_or(&self.default_write_options),
updates.pin_mut(), updates.pin_mut(),

@ -572,7 +572,7 @@ mod tests {
let mut eval = EvaluatorWithStorage::new("_path_for_rocksdb_storagex".to_string()).unwrap(); let mut eval = EvaluatorWithStorage::new("_path_for_rocksdb_storagex".to_string()).unwrap();
eval.build_table(parsed).unwrap(); eval.build_table(parsed).unwrap();
eval.restore_metadata().unwrap(); eval.restore_metadata().unwrap();
eval.storage.delete().unwrap(); eval.storage.delete_storage().unwrap();
println!("{:#?}", eval.env.resolve("Person")); println!("{:#?}", eval.env.resolve("Person"));
println!("{:#?}", eval.env.resolve("Friend")); println!("{:#?}", eval.env.resolve("Friend"));
} }

@ -31,6 +31,9 @@ pub enum CozoError {
#[error("Value required")] #[error("Value required")]
ValueRequired, ValueRequired,
#[error("Incompatible value")]
IncompatibleValue,
#[error("Wrong type")] #[error("Wrong type")]
WrongType, WrongType,

@ -3,7 +3,7 @@ use pest::iterators::Pair;
use crate::ast::{build_expr, Expr, ExprVisitor}; use crate::ast::{build_expr, Expr, ExprVisitor};
use crate::definition::build_name_in_def; use crate::definition::build_name_in_def;
use crate::env::Env; use crate::env::Env;
use crate::error::CozoError::{UndefinedTable, ValueRequired}; use crate::error::CozoError::{IncompatibleValue, UndefinedTable, ValueRequired};
use crate::eval::Evaluator; use crate::eval::Evaluator;
use crate::storage::{RocksStorage}; use crate::storage::{RocksStorage};
use crate::error::Result; use crate::error::Result;
@ -44,12 +44,13 @@ impl Evaluator<RocksStorage> {
Expr::Const(v) => v, Expr::Const(v) => v,
_ => return Err(ValueRequired) _ => return Err(ValueRequired)
}; };
let val = val.get_list().ok_or(IncompatibleValue)?;
println!("{:#?}", val); println!("{:#?}", val);
let coerced_values = self.coerce_table_values(&val, main_target); let coerced_values = self.coerce_table_values(&val, main_target);
Ok(()) Ok(())
} }
fn coerce_table_values(&self, values: &Value, table: Option<&Structured>) -> BTreeMap<&Structured, Vec<Value>> { fn coerce_table_values(&self, values: &[Value], default_table: Option<&Structured>) -> BTreeMap<&Structured, Vec<Value>> {
todo!() todo!()
} }
} }
@ -62,7 +63,7 @@ mod tests {
use crate::ast::{Expr, ExprVisitor, parse_expr_from_str}; use crate::ast::{Expr, ExprVisitor, parse_expr_from_str};
use crate::eval::{BareEvaluator, EvaluatorWithStorage}; use crate::eval::{BareEvaluator, EvaluatorWithStorage};
use pest::Parser as PestParser; use pest::Parser as PestParser;
use cozo_rocks::DBImpl; use cozo_rocks::*;
use crate::env::Env; use crate::env::Env;
use crate::typing::Structured; use crate::typing::Structured;
@ -80,7 +81,7 @@ mod tests {
let data = fs::read_to_string("test_data/hr.json")?; let data = fs::read_to_string("test_data/hr.json")?;
let parsed = parse_expr_from_str(&data)?; let parsed = parse_expr_from_str(&data)?;
let mut ev = BareEvaluator::default(); let ev = BareEvaluator::default();
let evaluated = ev.visit_expr(&parsed)?; let evaluated = ev.visit_expr(&parsed)?;
let bound_value = match evaluated { let bound_value = match evaluated {
Expr::Const(v) => v, Expr::Const(v) => v,

@ -1,3 +1,4 @@
use std::fs;
use crate::error::{CozoError, Result}; use crate::error::{CozoError, Result};
use cozo_rocks::*; use cozo_rocks::*;
use crate::value::{cozo_comparator_v1}; use crate::value::{cozo_comparator_v1};
@ -23,10 +24,13 @@ impl RocksStorage {
} }
#[allow(unused_variables)] #[allow(unused_variables)]
pub fn delete(&mut self) -> Result<()> { pub fn delete_storage(self) -> Result<()> {
// unimplemented!() // unimplemented!()
// drop(self.db.take()); // drop(self.db.take());
// DB::destroy(&make_options(), &self.path)?; // DB::destroy(&make_options(), &self.path)?;
let path = self.path.clone();
drop(self);
fs::remove_dir_all(path);
Ok(()) Ok(())
} }

Loading…
Cancel
Save