From 3eece2ca3df49c0b79c307515e26643b1f76a0f3 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Fri, 6 Jan 2023 23:18:10 +0800 Subject: [PATCH] skip rayon if only one rule --- cozo-core/src/query/eval.rs | 184 +++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 88 deletions(-) diff --git a/cozo-core/src/query/eval.rs b/cozo-core/src/query/eval.rs index eef67717..8751374b 100644 --- a/cozo-core/src/query/eval.rs +++ b/cozo-core/src/query/eval.rs @@ -122,25 +122,86 @@ impl<'a> SessionTx<'a> { let borrowed_stores = stores as &BTreeMap<_, _>; if epoch == 0 { #[allow(clippy::needless_borrow)] - let execution = - |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> { - let new_store = match compiled_ruleset { - CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() { + let execution = |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> { + let new_store = match compiled_ruleset { + CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() { + AggrKind::None => { + let res = self.initial_rule_non_aggr_eval( + k, + &ruleset, + borrowed_stores, + &limiter, + poison.clone(), + )?; + used_limiter.fetch_or(res.0, Ordering::Relaxed); + res.1.wrap() + } + AggrKind::Normal => { + let res = self.initial_rule_aggr_eval( + k, + &ruleset, + borrowed_stores, + &limiter, + poison.clone(), + )?; + used_limiter.fetch_or(res.0, Ordering::Relaxed); + res.1.wrap() + } + AggrKind::Meet => { + let new = self.initial_rule_meet_eval( + k, + &ruleset, + borrowed_stores, + poison.clone(), + )?; + new.wrap() + } + }, + CompiledRuleSet::Fixed(fixed) => { + let fixed_impl = fixed.fixed_impl.as_ref(); + let mut out = RegularTempStore::default(); + let payload = FixedRulePayload { + manifest: &fixed, + stores: borrowed_stores, + tx: self, + }; + fixed_impl.run(payload, &mut out, poison.clone())?; + out.wrap() + } + }; + Ok((k, new_store)) + }; + #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] + { + if prog.len() == 1 { + let (k, new_store) = execution(prog.iter().next().unwrap())?; + to_merge.insert(k, new_store); + } else { + let execs = prog.par_iter().map(execution); + for res in execs.collect::>() { + let (k, new_store) = res?; + to_merge.insert(k, new_store); + } + } + } + #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] + { + for res in prog.iter().map(execution) { + let (k, new_store) = res?; + to_merge.insert(k, new_store); + } + } + } else { + #[allow(clippy::needless_borrow)] + let execution = |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> { + let new_store = match compiled_ruleset { + CompiledRuleSet::Rules(ruleset) => { + match compiled_ruleset.aggr_kind() { AggrKind::None => { - let res = self.initial_rule_non_aggr_eval( - k, - &ruleset, - borrowed_stores, - &limiter, - poison.clone(), - )?; - used_limiter.fetch_or(res.0, Ordering::Relaxed); - res.1.wrap() - } - AggrKind::Normal => { - let res = self.initial_rule_aggr_eval( + let res = self.incremental_rule_non_aggr_eval( k, &ruleset, + epoch, borrowed_stores, &limiter, poison.clone(), @@ -149,7 +210,7 @@ impl<'a> SessionTx<'a> { res.1.wrap() } AggrKind::Meet => { - let new = self.initial_rule_meet_eval( + let new = self.incremental_rule_meet_eval( k, &ruleset, borrowed_stores, @@ -157,84 +218,31 @@ impl<'a> SessionTx<'a> { )?; new.wrap() } - }, - CompiledRuleSet::Fixed(fixed) => { - let fixed_impl = fixed.fixed_impl.as_ref(); - let mut out = RegularTempStore::default(); - let payload = FixedRulePayload { - manifest: &fixed, - stores: borrowed_stores, - tx: self, - }; - fixed_impl.run(payload, &mut out, poison.clone())?; - out.wrap() - } - }; - Ok((k, new_store)) - }; - #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - { - let execs = prog.par_iter().map(execution); - for res in execs.collect::>() { - let (k, new_store) = res?; - to_merge.insert(k, new_store); - } - } - #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - { - for res in prog.iter().map(execution) { - let (k, new_store) = res?; - to_merge.insert(k, new_store); - } - } - } else { - #[allow(clippy::needless_borrow)] - let execution = - |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> { - let new_store = match compiled_ruleset { - CompiledRuleSet::Rules(ruleset) => { - match compiled_ruleset.aggr_kind() { - AggrKind::None => { - let res = self.incremental_rule_non_aggr_eval( - k, - &ruleset, - epoch, - borrowed_stores, - &limiter, - poison.clone(), - )?; - used_limiter.fetch_or(res.0, Ordering::Relaxed); - res.1.wrap() - } - AggrKind::Meet => { - let new = self.incremental_rule_meet_eval( - k, - &ruleset, - borrowed_stores, - poison.clone(), - )?; - new.wrap() - } - AggrKind::Normal => { - // not doing anything - RegularTempStore::default().wrap() - } + AggrKind::Normal => { + // not doing anything + RegularTempStore::default().wrap() } } + } - CompiledRuleSet::Fixed(_) => { - // no need to do anything, algos are only calculated once - RegularTempStore::default().wrap() - } - }; - Ok((k, new_store)) + CompiledRuleSet::Fixed(_) => { + // no need to do anything, algos are only calculated once + RegularTempStore::default().wrap() + } }; + Ok((k, new_store)) + }; #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] { - let execs = prog.par_iter().map(execution); - for res in execs.collect::>() { - let (k, new_store) = res?; + if prog.len() == 1 { + let (k, new_store) = execution(prog.iter().next().unwrap())?; to_merge.insert(k, new_store); + } else { + let execs = prog.par_iter().map(execution); + for res in execs.collect::>() { + let (k, new_store) = res?; + to_merge.insert(k, new_store); + } } } #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]