prepare for index locks

main
Ziyang Hu 2 years ago
parent 381f024ac4
commit 4e99083664

@ -518,15 +518,15 @@ struct EntryHeadNotExplicitlyDefinedError(#[label] SourceSpan);
pub(crate) struct NoEntryError;
impl InputProgram {
// pub(crate) fn used_rule(&self, rule_name: &Symbol) -> bool {
// self.prog.values().any(|rule| rule.used_rule(rule_name))
// }
pub(crate) fn needs_write_tx(&self) -> bool {
pub(crate) fn needs_write_lock(&self) -> Option<SmartString<LazyCompact>> {
if let Some((h, _)) = &self.out_opts.store_relation {
!h.name.name.starts_with('_')
if !h.name.name.starts_with('_') {
Some(h.name.name.clone())
} else {
None
}
} else {
false
None
}
}

@ -8,7 +8,7 @@
use either::Either;
use std::cmp::{max, min};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
@ -96,30 +96,41 @@ pub(crate) type ImperativeCondition = Either<SmartString<LazyCompact>, InputProg
pub(crate) type ImperativeProgram = Vec<ImperativeStmt>;
impl ImperativeStmt {
pub(crate) fn needs_write_tx(&self) -> bool {
pub(crate) fn needs_write_locks(&self, collector: &mut BTreeSet<SmartString<LazyCompact>>) {
match self {
ImperativeStmt::ReturnProgram { prog, .. }
| ImperativeStmt::Program { prog, .. }
| ImperativeStmt::IgnoreErrorProgram { prog, .. } => prog.needs_write_tx(),
| ImperativeStmt::IgnoreErrorProgram { prog, .. } => {
if let Some(name) = prog.needs_write_lock() {
collector.insert(name);
}
},
ImperativeStmt::If {
condition,
then_branch,
else_branch,
..
} => {
(match condition {
ImperativeCondition::Left(_) => false,
ImperativeCondition::Right(prog) => prog.needs_write_tx(),
}) || then_branch.iter().any(|p| p.needs_write_tx())
|| else_branch.iter().any(|p| p.needs_write_tx())
if let ImperativeCondition::Right(prog) = condition {
if let Some(name) = prog.needs_write_lock() {
collector.insert(name);
}
}
for prog in then_branch.iter().chain(else_branch.iter()) {
prog.needs_write_locks(collector);
}
}
ImperativeStmt::Loop { body, .. } => body.iter().any(|p| p.needs_write_tx()),
ImperativeStmt::Loop { body, .. } => {
for prog in body {
prog.needs_write_locks(collector);
}
},
ImperativeStmt::TempDebug { .. }
| ImperativeStmt::ReturnTemp { .. }
| ImperativeStmt::Break { .. }
| ImperativeStmt::Continue { .. }
| ImperativeStmt::ReturnNil { .. }
| ImperativeStmt::TempSwap { .. } => false,
| ImperativeStmt::TempSwap { .. } => {},
}
}
}

@ -6,6 +6,7 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use crossbeam::sync::ShardedLock;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::default::Default;
@ -13,7 +14,6 @@ use std::fmt::{Debug, Formatter};
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use crossbeam::sync::ShardedLock;
#[allow(unused_imports)]
use std::thread;
#[allow(unused_imports)]
@ -99,14 +99,15 @@ pub struct Db<S> {
pub(crate) algorithms: Arc<BTreeMap<String, Arc<Box<dyn FixedRule>>>>,
callback_count: Arc<AtomicU32>,
callback_sender: Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>,
event_callbacks: Arc<
ShardedLock<(
BTreeMap<u32, CallbackDeclaration>,
BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
)>,
>,
event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>,
}
type EventCallbackRegistry = (
BTreeMap<u32, CallbackDeclaration>,
BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
);
impl<S> Debug for Db<S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Db")
@ -172,6 +173,7 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_sender: sender,
// callback_receiver: Arc::new(receiver),
event_callbacks: Arc::new(Default::default()),
relation_locks: Arc::new(Default::default()),
};
let callbacks = ret.event_callbacks.clone();
thread::spawn(move || {
@ -482,7 +484,10 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(())
}
Entry::Occupied(ent) => {
bail!("A fixed rule with the name {} is already registered", ent.key())
bail!(
"A fixed rule with the name {} is already registered",
ent.key()
)
}
}
}
@ -526,6 +531,33 @@ impl<'s, S: Storage<'s>> Db<S> {
ret.is_some()
}
fn obtain_relation_locks<'a, T: Iterator<Item = &'a SmartString<LazyCompact>>>(
&'s self,
rels: T,
) -> Vec<Arc<ShardedLock<()>>> {
let mut collected = vec![];
let mut pending = vec![];
{
let locks = self.relation_locks.read().unwrap();
for rel in rels {
match locks.get(rel) {
None => {
pending.push(rel);
}
Some(lock) => collected.push(lock.clone()),
}
}
}
if !pending.is_empty() {
let mut locks = self.relation_locks.write().unwrap();
for rel in pending {
let lock = locks.entry(rel.clone()).or_default().clone();
collected.push(lock);
}
}
return collected;
}
fn compact_relation(&'s self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));
let u = vec![DataValue::Bot].encode_as_key(RelationId(u64::MAX));
@ -651,7 +683,14 @@ impl<'s, S: Storage<'s>> Db<S> {
match parse_script(payload, param_pool, &self.algorithms, cur_vld)? {
CozoScript::Single(p) => {
let mut callback_collector = BTreeMap::new();
let is_write = p.needs_write_tx();
let write_lock_names = p.needs_write_lock();
let is_write = write_lock_names.is_some();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
let _write_lock_guards = if is_write {
Some(write_lock[0].read().unwrap())
} else {
None
};
let callback_targets = if is_write {
self.current_callback_targets()
} else {
@ -693,7 +732,14 @@ impl<'s, S: Storage<'s>> Db<S> {
}
CozoScript::Imperative(ps) => {
let mut callback_collector = BTreeMap::new();
let is_write = ps.iter().any(|p| p.needs_write_tx());
let mut write_lock_names = BTreeSet::new();
for p in &ps {
p.needs_write_locks(&mut write_lock_names);
}
let is_write = !write_lock_names.is_empty();
let write_lock = self.obtain_relation_locks(write_lock_names.iter());
let _write_lock_guards = write_lock.iter().map(|l| l.read()).collect_vec();
let callback_targets = if is_write {
self.current_callback_targets()
} else {

Loading…
Cancel
Save