diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index bbef8047..3a407b3b 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -12,7 +12,8 @@ use std::default::Default; use std::fmt::{Debug, Formatter}; use std::path::Path; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex}; +use crossbeam::sync::ShardedLock; #[allow(unused_imports)] use std::thread; #[allow(unused_imports)] @@ -99,7 +100,7 @@ pub struct Db { callback_count: Arc, callback_sender: Sender<(SmartString, CallbackOp, NamedRows, NamedRows)>, event_callbacks: Arc< - RwLock<( + ShardedLock<( BTreeMap, BTreeMap, BTreeSet>, )>, diff --git a/cozo-core/src/storage/mem.rs b/cozo-core/src/storage/mem.rs index 3834a262..e5fbc887 100644 --- a/cozo-core/src/storage/mem.rs +++ b/cozo-core/src/storage/mem.rs @@ -13,7 +13,8 @@ use std::default::Default; use std::iter::Fuse; use std::mem; use std::ops::Bound; -use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::sync::{Arc}; +use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; use itertools::Itertools; use miette::{bail, Result}; @@ -37,7 +38,7 @@ pub fn new_cozo_mem() -> Result> { /// The non-persistent storage #[derive(Clone, Default)] pub struct MemStorage { - store: Arc, Vec>>>, + store: Arc, Vec>>>, } impl<'s> Storage<'s> for MemStorage { @@ -98,9 +99,9 @@ impl<'s> Storage<'s> for MemStorage { } pub enum MemTx<'s> { - Reader(RwLockReadGuard<'s, BTreeMap, Vec>>), + Reader(ShardedLockReadGuard<'s, BTreeMap, Vec>>), Writer( - RwLockWriteGuard<'s, BTreeMap, Vec>>, + ShardedLockWriteGuard<'s, BTreeMap, Vec>>, BTreeMap, Option>>, ), } diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 7e7c04b7..13ac6e06 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -7,7 +7,8 @@ */ use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::sync::{Arc, Mutex}; +use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; use ::sqlite::Connection; use either::{Either, Left, Right}; @@ -23,7 +24,7 @@ use crate::utils::swap_option_result; /// The Sqlite storage engine #[derive(Clone)] pub struct SqliteStorage { - lock: Arc>, + lock: Arc>, name: PathBuf, pool: Arc>>, } @@ -139,7 +140,7 @@ impl<'s> Storage<'s> for SqliteStorage { } pub struct SqliteTx<'a> { - lock: Either, RwLockWriteGuard<'a, ()>>, + lock: Either, ShardedLockWriteGuard<'a, ()>>, storage: &'a SqliteStorage, conn: Option, stmts: [Mutex>>; N_CACHED_QUERIES], @@ -168,7 +169,7 @@ const SKIP_RANGE_QUERY: usize = 5; impl Drop for SqliteTx<'_> { fn drop(&mut self) { - if let Right(RwLockWriteGuard { .. }) = self.lock { + if let Right(ShardedLockWriteGuard { .. }) = self.lock { if !self.committed { let query = r#"rollback;"#; let _ = self.conn.as_ref().unwrap().execute(query); @@ -252,7 +253,7 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { } fn commit(&mut self) -> Result<()> { - if let Right(RwLockWriteGuard { .. }) = self.lock { + if let Right(ShardedLockWriteGuard { .. }) = self.lock { if !self.committed { let query = r#"commit;"#; let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();