RwLock -> ShardedLock

main
Ziyang Hu 2 years ago
parent 172280a3a7
commit 381f024ac4

@ -12,7 +12,8 @@ use std::default::Default;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex};
use crossbeam::sync::ShardedLock;
#[allow(unused_imports)] #[allow(unused_imports)]
use std::thread; use std::thread;
#[allow(unused_imports)] #[allow(unused_imports)]
@ -99,7 +100,7 @@ pub struct Db<S> {
callback_count: Arc<AtomicU32>, callback_count: Arc<AtomicU32>,
callback_sender: Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>, callback_sender: Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>,
event_callbacks: Arc< event_callbacks: Arc<
RwLock<( ShardedLock<(
BTreeMap<u32, CallbackDeclaration>, BTreeMap<u32, CallbackDeclaration>,
BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>, BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
)>, )>,

@ -13,7 +13,8 @@ use std::default::Default;
use std::iter::Fuse; use std::iter::Fuse;
use std::mem; use std::mem;
use std::ops::Bound; use std::ops::Bound;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::{Arc};
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
use itertools::Itertools; use itertools::Itertools;
use miette::{bail, Result}; use miette::{bail, Result};
@ -37,7 +38,7 @@ pub fn new_cozo_mem() -> Result<crate::Db<MemStorage>> {
/// The non-persistent storage /// The non-persistent storage
#[derive(Clone, Default)] #[derive(Clone, Default)]
pub struct MemStorage { pub struct MemStorage {
store: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>, store: Arc<ShardedLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
} }
impl<'s> Storage<'s> for MemStorage { impl<'s> Storage<'s> for MemStorage {
@ -98,9 +99,9 @@ impl<'s> Storage<'s> for MemStorage {
} }
pub enum MemTx<'s> { pub enum MemTx<'s> {
Reader(RwLockReadGuard<'s, BTreeMap<Vec<u8>, Vec<u8>>>), Reader(ShardedLockReadGuard<'s, BTreeMap<Vec<u8>, Vec<u8>>>),
Writer( Writer(
RwLockWriteGuard<'s, BTreeMap<Vec<u8>, Vec<u8>>>, ShardedLockWriteGuard<'s, BTreeMap<Vec<u8>, Vec<u8>>>,
BTreeMap<Vec<u8>, Option<Vec<u8>>>, BTreeMap<Vec<u8>, Option<Vec<u8>>>,
), ),
} }

@ -7,7 +7,8 @@
*/ */
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::{Arc, Mutex};
use crossbeam::sync::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard};
use ::sqlite::Connection; use ::sqlite::Connection;
use either::{Either, Left, Right}; use either::{Either, Left, Right};
@ -23,7 +24,7 @@ use crate::utils::swap_option_result;
/// The Sqlite storage engine /// The Sqlite storage engine
#[derive(Clone)] #[derive(Clone)]
pub struct SqliteStorage { pub struct SqliteStorage {
lock: Arc<RwLock<()>>, lock: Arc<ShardedLock<()>>,
name: PathBuf, name: PathBuf,
pool: Arc<Mutex<Vec<ConnectionWithFullMutex>>>, pool: Arc<Mutex<Vec<ConnectionWithFullMutex>>>,
} }
@ -139,7 +140,7 @@ impl<'s> Storage<'s> for SqliteStorage {
} }
pub struct SqliteTx<'a> { pub struct SqliteTx<'a> {
lock: Either<RwLockReadGuard<'a, ()>, RwLockWriteGuard<'a, ()>>, lock: Either<ShardedLockReadGuard<'a, ()>, ShardedLockWriteGuard<'a, ()>>,
storage: &'a SqliteStorage, storage: &'a SqliteStorage,
conn: Option<ConnectionWithFullMutex>, conn: Option<ConnectionWithFullMutex>,
stmts: [Mutex<Option<Statement<'a>>>; N_CACHED_QUERIES], stmts: [Mutex<Option<Statement<'a>>>; N_CACHED_QUERIES],
@ -168,7 +169,7 @@ const SKIP_RANGE_QUERY: usize = 5;
impl Drop for SqliteTx<'_> { impl Drop for SqliteTx<'_> {
fn drop(&mut self) { fn drop(&mut self) {
if let Right(RwLockWriteGuard { .. }) = self.lock { if let Right(ShardedLockWriteGuard { .. }) = self.lock {
if !self.committed { if !self.committed {
let query = r#"rollback;"#; let query = r#"rollback;"#;
let _ = self.conn.as_ref().unwrap().execute(query); let _ = self.conn.as_ref().unwrap().execute(query);
@ -252,7 +253,7 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
} }
fn commit(&mut self) -> Result<()> { fn commit(&mut self) -> Result<()> {
if let Right(RwLockWriteGuard { .. }) = self.lock { if let Right(ShardedLockWriteGuard { .. }) = self.lock {
if !self.committed { if !self.committed {
let query = r#"commit;"#; let query = r#"commit;"#;
let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap(); let mut statement = self.conn.as_ref().unwrap().prepare(query).unwrap();

Loading…
Cancel
Save