diff --git a/cozo-core/src/data/aggr.rs b/cozo-core/src/data/aggr.rs index 0c6b6df4..381eef46 100644 --- a/cozo-core/src/data/aggr.rs +++ b/cozo-core/src/data/aggr.rs @@ -32,12 +32,12 @@ impl Clone for Aggregation { } } -pub(crate) trait NormalAggrObj { +pub(crate) trait NormalAggrObj: Send + Sync { fn set(&mut self, value: &DataValue) -> Result<()>; fn get(&self) -> Result; } -pub(crate) trait MeetAggrObj { +pub(crate) trait MeetAggrObj: Send + Sync { fn init_val(&self) -> DataValue; fn update(&self, left: &mut DataValue, right: &DataValue) -> Result; } diff --git a/cozo-core/src/data/program.rs b/cozo-core/src/data/program.rs index 9782cbf3..c28b5599 100644 --- a/cozo-core/src/data/program.rs +++ b/cozo-core/src/data/program.rs @@ -9,7 +9,6 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Display, Formatter}; -use std::rc::Rc; use std::sync::Arc; use miette::{ensure, Diagnostic, Result}; @@ -220,7 +219,7 @@ impl InputInlineRulesOrFixed { pub(crate) struct FixedRuleApply { pub(crate) fixed_handle: FixedRuleHandle, pub(crate) rule_args: Vec, - pub(crate) options: Rc, Expr>>, + pub(crate) options: Arc, Expr>>, pub(crate) head: Vec, pub(crate) arity: usize, pub(crate) span: SourceSpan, @@ -248,7 +247,7 @@ impl Debug for FixedRuleApply { pub(crate) struct MagicFixedRuleApply { pub(crate) fixed_handle: FixedRuleHandle, pub(crate) rule_args: Vec, - pub(crate) options: Rc, Expr>>, + pub(crate) options: Arc, Expr>>, pub(crate) span: SourceSpan, pub(crate) arity: usize, pub(crate) fixed_impl: Arc>, diff --git a/cozo-core/src/parse/query.rs b/cozo-core/src/parse/query.rs index b785fc71..7ba96617 100644 --- a/cozo-core/src/parse/query.rs +++ b/cozo-core/src/parse/query.rs @@ -11,7 +11,6 @@ use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; use std::error::Error; use std::fmt::{Display, Formatter}; -use std::rc::Rc; use std::sync::Arc; use either::{Left, Right}; @@ -225,7 +224,7 @@ pub(crate) fn parse_query( fixed: FixedRuleApply { fixed_handle: handle, rule_args: vec![], - options: Rc::new(options), + options: Arc::new(options), head, arity, span, @@ -906,7 +905,7 @@ fn parse_fixed_rule( FixedRuleApply { fixed_handle: fixed, rule_args, - options: Rc::new(options), + options: Arc::new(options), head, arity, span: args_list_span, @@ -944,7 +943,7 @@ fn make_empty_const_rule(prog: &mut InputProgram, bindings: &[Symbol]) { name: Symbol::new("Constant", Default::default()), }, rule_args: vec![], - options: Rc::new(options), + options: Arc::new(options), head: bindings.to_vec(), arity: bindings.len(), span: Default::default(), diff --git a/cozo-core/src/query/eval.rs b/cozo-core/src/query/eval.rs index 0305ad1d..bf22338a 100644 --- a/cozo-core/src/query/eval.rs +++ b/cozo-core/src/query/eval.rs @@ -8,11 +8,12 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use itertools::Itertools; use log::{debug, trace}; use miette::Result; +use rayon::prelude::*; use crate::data::aggr::Aggregation; use crate::data::program::{MagicSymbol, NoEntryError}; @@ -112,42 +113,42 @@ impl<'a> SessionTx<'a> { counter: 0.into(), }; - let mut used_limiter = false; + let used_limiter: AtomicBool = false.into(); for epoch in 0u32.. { debug!("epoch {}", epoch); let mut to_merge = BTreeMap::new(); let borrowed_stores = stores as &BTreeMap<_, _>; if epoch == 0 { - let execs = prog.iter().map(|(k, compiled_ruleset)| -> Result<_> { + let execs = prog.par_iter().map(|(k, compiled_ruleset)| -> Result<_> { let new_store = match compiled_ruleset { CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() { AggrKind::None => { let res = self.initial_rule_non_aggr_eval( k, - ruleset, + &ruleset, borrowed_stores, &limiter, poison.clone(), )?; - used_limiter = res.0 || used_limiter; + used_limiter.fetch_or(res.0, Ordering::Relaxed); res.1.wrap() } AggrKind::Normal => { let res = self.initial_rule_aggr_eval( k, - ruleset, + &ruleset, borrowed_stores, &limiter, poison.clone(), )?; - used_limiter = res.0 || used_limiter; + used_limiter.fetch_or(res.0, Ordering::Relaxed); res.1.wrap() } AggrKind::Meet => { let new = self.initial_rule_meet_eval( k, - ruleset, + &ruleset, borrowed_stores, poison.clone(), )?; @@ -158,7 +159,7 @@ impl<'a> SessionTx<'a> { let fixed_impl = fixed.fixed_impl.as_ref(); let mut out = RegularTempStore::default(); let payload = FixedRulePayload { - manifest: fixed, + manifest: &fixed, stores: borrowed_stores, tx: self, }; @@ -168,12 +169,12 @@ impl<'a> SessionTx<'a> { }; Ok((k, new_store)) }); - for res in execs { + for res in execs.collect::>() { let (k, new_store) = res?; to_merge.insert(k, new_store); } } else { - let execs = prog.iter().map(|(k, compiled_ruleset)| -> Result<_> { + let execs = prog.par_iter().map(|(k, compiled_ruleset)| -> Result<_> { let new_store = match compiled_ruleset { CompiledRuleSet::Rules(ruleset) => { match compiled_ruleset.aggr_kind() { @@ -186,7 +187,7 @@ impl<'a> SessionTx<'a> { &limiter, poison.clone(), )?; - used_limiter = res.0 || used_limiter; + used_limiter.fetch_or(res.0, Ordering::Relaxed); res.1.wrap() } AggrKind::Meet => { @@ -212,7 +213,7 @@ impl<'a> SessionTx<'a> { }; Ok((k, new_store)) }); - for res in execs { + for res in execs.collect::>() { let (k, new_store) = res?; to_merge.insert(k, new_store); } @@ -228,7 +229,7 @@ impl<'a> SessionTx<'a> { break; } } - Ok(used_limiter) + Ok(used_limiter.load(Ordering::Acquire)) } /// returns true is early return is activated fn initial_rule_non_aggr_eval( diff --git a/cozo-core/src/query/stored.rs b/cozo-core/src/query/stored.rs index 1f523e1d..46c5b58a 100644 --- a/cozo-core/src/query/stored.rs +++ b/cozo-core/src/query/stored.rs @@ -7,7 +7,6 @@ */ use std::collections::BTreeMap; -use std::rc::Rc; use std::sync::Arc; use itertools::Itertools; @@ -445,7 +444,7 @@ fn make_const_rule( name: Symbol::new("Constant", Default::default()), }, rule_args: vec![], - options: Rc::new(options), + options: Arc::new(options), head: bindings, arity: bindings_arity, span: Default::default(), diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index db331676..5cef4e48 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -10,7 +10,6 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::default::Default; use std::fmt::{Debug, Formatter}; -use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; #[allow(unused_imports)] @@ -1228,7 +1227,7 @@ fn propagate_previous_results( name: Symbol::new("Constant", Default::default()), }, rule_args: vec![], - options: Rc::new(BTreeMap::from([( + options: Arc::new(BTreeMap::from([( SmartString::from("data"), Expr::Const { val: DataValue::List( diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index 3d1e1dd2..1108703e 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -55,7 +55,7 @@ pub trait Storage<'s>: Sync { /// Trait for the associated transaction type of a storage engine. /// A transaction needs to guarantee MVCC semantics for all operations. -pub trait StoreTx<'s> { +pub trait StoreTx<'s>: Sync { /// Get a key. If `for_update` is `true` (only possible in a write transaction), /// then the database needs to guarantee that `commit()` can only succeed if /// the key has not been modified outside the transaction.