db does not need to be Clone

main
Ziyang Hu 2 years ago
parent 6938b98be4
commit 5326a16595

@ -664,8 +664,8 @@ pub(crate) struct FixedRuleHandle {
} }
lazy_static! { lazy_static! {
pub(crate) static ref DEFAULT_FIXED_RULES: Arc<BTreeMap<String, Arc<Box<dyn FixedRule>>>> = { pub(crate) static ref DEFAULT_FIXED_RULES: BTreeMap<String, Arc<Box<dyn FixedRule>>> = {
Arc::new(BTreeMap::from([ BTreeMap::from([
#[cfg(feature = "graph-algo")] #[cfg(feature = "graph-algo")]
( (
"ClusteringCoefficients".to_string(), "ClusteringCoefficients".to_string(),
@ -792,7 +792,7 @@ lazy_static! {
"Constant".to_string(), "Constant".to_string(),
Arc::<Box<dyn FixedRule>>::new(Box::new(Constant)), Arc::<Box<dyn FixedRule>>::new(Box::new(Constant)),
), ),
])) ])
}; };
} }

@ -80,7 +80,6 @@ pub(crate) mod runtime;
pub(crate) mod storage; pub(crate) mod storage;
pub(crate) mod utils; pub(crate) mod utils;
#[derive(Clone)]
/// A dispatcher for concrete storage implementations, wrapping [Db]. This is done so that /// A dispatcher for concrete storage implementations, wrapping [Db]. This is done so that
/// client code does not have to deal with generic code constantly. You may prefer to use /// client code does not have to deal with generic code constantly. You may prefer to use
/// [Db] directly, especially if you provide a custom storage engine. /// [Db] directly, especially if you provide a custom storage engine.
@ -412,7 +411,7 @@ impl DbInstance {
} }
/// Dispatcher method. See [crate::Db::register_fixed_rule]. /// Dispatcher method. See [crate::Db::register_fixed_rule].
pub fn register_fixed_rule( pub fn register_fixed_rule(
&mut self, &self,
name: String, name: String,
rule_impl: Box<dyn FixedRule>, rule_impl: Box<dyn FixedRule>,
) -> Result<()> { ) -> Result<()> {

@ -79,7 +79,7 @@ impl<'a> SessionTx<'a> {
} }
for trigger in &old_handle.replace_triggers { for trigger in &old_handle.replace_triggers {
let program = let program =
parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? parse_script(trigger, &Default::default(), &db.algorithms.read().unwrap(), cur_vld)?
.get_single_program()?; .get_single_program()?;
let (_, cleanups) = db let (_, cleanups) = db
@ -203,7 +203,7 @@ impl<'a> SessionTx<'a> {
let mut program = parse_script( let mut program = parse_script(
trigger, trigger,
&Default::default(), &Default::default(),
&db.algorithms, &db.algorithms.read().unwrap(),
cur_vld, cur_vld,
)? )?
.get_single_program()?; .get_single_program()?;
@ -482,7 +482,7 @@ impl<'a> SessionTx<'a> {
let mut program = parse_script( let mut program = parse_script(
trigger, trigger,
&Default::default(), &Default::default(),
&db.algorithms, &db.algorithms.read().unwrap(),
cur_vld, cur_vld,
)? )?
.get_single_program()?; .get_single_program()?;

@ -82,22 +82,21 @@ pub struct DbManifest {
} }
/// The database object of Cozo. /// The database object of Cozo.
#[derive(Clone)]
pub struct Db<S> { pub struct Db<S> {
pub(crate) db: S, pub(crate) db: S,
temp_db: TempStorage, temp_db: TempStorage,
relation_store_id: Arc<AtomicU64>, relation_store_id: Arc<AtomicU64>,
queries_count: Arc<AtomicU64>, queries_count: AtomicU64,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>, running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) algorithms: Arc<BTreeMap<String, Arc<Box<dyn FixedRule>>>>, pub(crate) algorithms: ShardedLock<BTreeMap<String, Arc<Box<dyn FixedRule>>>>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
callback_count: Arc<AtomicU32>, callback_count: AtomicU32,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub(crate) callback_sender: pub(crate) callback_sender:
Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>, Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>,
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub(crate) event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>, pub(crate) event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>, relation_locks: ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>,
} }
impl<S> Debug for Db<S> { impl<S> Debug for Db<S> {
@ -189,18 +188,18 @@ impl<'s, S: Storage<'s>> Db<S> {
let ret = Self { let ret = Self {
db: storage, db: storage,
temp_db: Default::default(), temp_db: Default::default(),
relation_store_id: Arc::new(Default::default()), relation_store_id: Default::default(),
queries_count: Arc::new(Default::default()), queries_count: Default::default(),
running_queries: Arc::new(Mutex::new(Default::default())), running_queries: Default::default(),
algorithms: DEFAULT_FIXED_RULES.clone(), algorithms: ShardedLock::new(DEFAULT_FIXED_RULES.clone()),
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
callback_count: Arc::new(Default::default()), callback_count: Default::default(),
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
callback_sender: sender, callback_sender: sender,
// callback_receiver: Arc::new(receiver), // callback_receiver: Arc::new(receiver),
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
event_callbacks: Arc::new(Default::default()), event_callbacks: Default::default(),
relation_locks: Arc::new(Default::default()), relation_locks: Default::default(),
}; };
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
{ {
@ -549,16 +548,12 @@ impl<'s, S: Storage<'s>> Db<S> {
} }
} }
/// Register a custom fixed rule implementation. /// Register a custom fixed rule implementation.
///
/// You must register fixed rules BEFORE you clone the database,
/// otherwise already cloned instances will not get the new fixed rule.
pub fn register_fixed_rule( pub fn register_fixed_rule(
&mut self, &self,
name: String, name: String,
rule_impl: Box<dyn FixedRule>, rule_impl: Box<dyn FixedRule>,
) -> Result<()> { ) -> Result<()> {
let new = Arc::make_mut(&mut self.algorithms); match self.algorithms.write().unwrap().entry(name) {
match new.entry(name) {
Entry::Vacant(ent) => { Entry::Vacant(ent) => {
ent.insert(Arc::new(rule_impl)); ent.insert(Arc::new(rule_impl));
Ok(()) Ok(())
@ -713,7 +708,7 @@ impl<'s, S: Storage<'s>> Db<S> {
param_pool: &BTreeMap<String, DataValue>, param_pool: &BTreeMap<String, DataValue>,
cur_vld: ValidityTs, cur_vld: ValidityTs,
) -> Result<NamedRows> { ) -> Result<NamedRows> {
match parse_script(payload, param_pool, &self.algorithms, cur_vld)? { match parse_script(payload, param_pool, &self.algorithms.read().unwrap(), cur_vld)? {
CozoScript::Single(p) => self.execute_single(cur_vld, p), CozoScript::Single(p) => self.execute_single(cur_vld, p),
CozoScript::Imperative(ps) => self.execute_imperative(cur_vld, &ps), CozoScript::Imperative(ps) => self.execute_imperative(cur_vld, &ps),
CozoScript::Sys(op) => self.run_sys_op(op), CozoScript::Sys(op) => self.run_sys_op(op),

@ -36,7 +36,7 @@ pub fn new_cozo_mem() -> Result<crate::Db<MemStorage>> {
} }
/// The non-persistent storage /// The non-persistent storage
#[derive(Clone, Default)] #[derive(Default)]
pub struct MemStorage { pub struct MemStorage {
store: Arc<ShardedLock<BTreeMap<Vec<u8>, Vec<u8>>>>, store: Arc<ShardedLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
} }

@ -111,7 +111,6 @@ pub fn new_cozo_rocksdb(path: impl AsRef<Path>) -> Result<Db<RocksDbStorage>> {
} }
/// RocksDB storage engine /// RocksDB storage engine
#[derive(Clone)]
pub struct RocksDbStorage { pub struct RocksDbStorage {
db: RocksDb, db: RocksDb,
} }

@ -33,7 +33,6 @@ pub fn new_cozo_sled(path: impl AsRef<Path>) -> Result<crate::Db<SledStorage>> {
} }
/// Storage engine using Sled /// Storage engine using Sled
#[derive(Clone)]
pub struct SledStorage { pub struct SledStorage {
db: Db, db: Db,
} }

@ -22,11 +22,10 @@ use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result; use crate::utils::swap_option_result;
/// The Sqlite storage engine /// The Sqlite storage engine
#[derive(Clone)]
pub struct SqliteStorage { pub struct SqliteStorage {
lock: Arc<ShardedLock<()>>, lock: Arc<ShardedLock<()>>,
name: PathBuf, name: PathBuf,
pool: Arc<Mutex<Vec<ConnectionWithFullMutex>>>, pool: Mutex<Vec<ConnectionWithFullMutex>>,
} }
/// Create a sqlite backed database. /// Create a sqlite backed database.
@ -50,9 +49,9 @@ pub fn new_cozo_sqlite(path: impl AsRef<Path>) -> Result<crate::Db<SqliteStorage
while statement.next().into_diagnostic()? != State::Done {} while statement.next().into_diagnostic()? != State::Done {}
let ret = crate::Db::new(SqliteStorage { let ret = crate::Db::new(SqliteStorage {
lock: Arc::new(Default::default()), lock: Default::default(),
name: PathBuf::from(path.as_ref()), name: PathBuf::from(path.as_ref()),
pool: Arc::new(Mutex::new(vec![])), pool: Mutex::new(vec![]),
})?; })?;
ret.initialize()?; ret.initialize()?;

@ -17,7 +17,7 @@ use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::mem::SkipIterator; use crate::storage::mem::SkipIterator;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
#[derive(Clone, Default)] #[derive(Default)]
pub(crate) struct TempStorage; pub(crate) struct TempStorage;
impl<'s> Storage<'s> for TempStorage { impl<'s> Storage<'s> for TempStorage {

@ -52,7 +52,6 @@ lazy_static! {
} }
/// Storage engine based on TiKV /// Storage engine based on TiKV
#[derive(Clone)]
pub struct TiKvStorage { pub struct TiKvStorage {
client: Arc<TransactionClient>, client: Arc<TransactionClient>,
raw_client: Arc<RawClient>, raw_client: Arc<RawClient>,

@ -53,7 +53,7 @@ ColumnFamilyOptions default_cf_options() {
return options; return options;
} }
shared_ptr <RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) { unique_ptr <RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) {
auto options = default_db_options(); auto options = default_db_options();
shared_ptr<Cache> cache = nullptr; shared_ptr<Cache> cache = nullptr;
@ -120,7 +120,7 @@ shared_ptr <RocksDbBridge> open_db(const DbOpts &opts, RocksDbStatus &status) {
} }
options.create_missing_column_families = true; options.create_missing_column_families = true;
shared_ptr <RocksDbBridge> db = make_shared<RocksDbBridge>(); unique_ptr <RocksDbBridge> db = make_unique<RocksDbBridge>();
db->db_path = convert_vec_to_string(opts.db_path); db->db_path = convert_vec_to_string(opts.db_path);

@ -117,7 +117,7 @@ struct RocksDbBridge {
~RocksDbBridge(); ~RocksDbBridge();
}; };
shared_ptr<RocksDbBridge> unique_ptr<RocksDbBridge>
open_db(const DbOpts &opts, RocksDbStatus &status); open_db(const DbOpts &opts, RocksDbStatus &status);
#endif //COZOROCKS_DB_H #endif //COZOROCKS_DB_H

@ -133,9 +133,8 @@ impl DbBuilder {
} }
} }
#[derive(Clone)]
pub struct RocksDb { pub struct RocksDb {
inner: SharedPtr<RocksDbBridge>, inner: UniquePtr<RocksDbBridge>,
} }
impl RocksDb { impl RocksDb {

@ -125,7 +125,7 @@ pub(crate) mod ffi {
fn open_db( fn open_db(
builder: &DbOpts, builder: &DbOpts,
status: &mut RocksDbStatus, status: &mut RocksDbStatus,
) -> SharedPtr<RocksDbBridge>; ) -> UniquePtr<RocksDbBridge>;
fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>; fn transact(self: &RocksDbBridge) -> UniquePtr<TxBridge>;
fn del_range( fn del_range(
self: &RocksDbBridge, self: &RocksDbBridge,

Loading…
Cancel
Save