parallel execution

main
Ziyang Hu 2 years ago
parent 16240209db
commit 07c9415f37

@ -32,12 +32,12 @@ impl Clone for Aggregation {
}
}
pub(crate) trait NormalAggrObj {
pub(crate) trait NormalAggrObj: Send + Sync {
fn set(&mut self, value: &DataValue) -> Result<()>;
fn get(&self) -> Result<DataValue>;
}
pub(crate) trait MeetAggrObj {
pub(crate) trait MeetAggrObj: Send + Sync {
fn init_val(&self) -> DataValue;
fn update(&self, left: &mut DataValue, right: &DataValue) -> Result<bool>;
}

@ -9,7 +9,6 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
use miette::{ensure, Diagnostic, Result};
@ -220,7 +219,7 @@ impl InputInlineRulesOrFixed {
pub(crate) struct FixedRuleApply {
pub(crate) fixed_handle: FixedRuleHandle,
pub(crate) rule_args: Vec<FixedRuleArg>,
pub(crate) options: Rc<BTreeMap<SmartString<LazyCompact>, Expr>>,
pub(crate) options: Arc<BTreeMap<SmartString<LazyCompact>, Expr>>,
pub(crate) head: Vec<Symbol>,
pub(crate) arity: usize,
pub(crate) span: SourceSpan,
@ -248,7 +247,7 @@ impl Debug for FixedRuleApply {
pub(crate) struct MagicFixedRuleApply {
pub(crate) fixed_handle: FixedRuleHandle,
pub(crate) rule_args: Vec<MagicFixedRuleRuleArg>,
pub(crate) options: Rc<BTreeMap<SmartString<LazyCompact>, Expr>>,
pub(crate) options: Arc<BTreeMap<SmartString<LazyCompact>, Expr>>,
pub(crate) span: SourceSpan,
pub(crate) arity: usize,
pub(crate) fixed_impl: Arc<Box<dyn FixedRule>>,

@ -11,7 +11,6 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
use either::{Left, Right};
@ -225,7 +224,7 @@ pub(crate) fn parse_query(
fixed: FixedRuleApply {
fixed_handle: handle,
rule_args: vec![],
options: Rc::new(options),
options: Arc::new(options),
head,
arity,
span,
@ -906,7 +905,7 @@ fn parse_fixed_rule(
FixedRuleApply {
fixed_handle: fixed,
rule_args,
options: Rc::new(options),
options: Arc::new(options),
head,
arity,
span: args_list_span,
@ -944,7 +943,7 @@ fn make_empty_const_rule(prog: &mut InputProgram, bindings: &[Symbol]) {
name: Symbol::new("Constant", Default::default()),
},
rule_args: vec![],
options: Rc::new(options),
options: Arc::new(options),
head: bindings.to_vec(),
arity: bindings.len(),
span: Default::default(),

@ -8,11 +8,12 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use itertools::Itertools;
use log::{debug, trace};
use miette::Result;
use rayon::prelude::*;
use crate::data::aggr::Aggregation;
use crate::data::program::{MagicSymbol, NoEntryError};
@ -112,42 +113,42 @@ impl<'a> SessionTx<'a> {
counter: 0.into(),
};
let mut used_limiter = false;
let used_limiter: AtomicBool = false.into();
for epoch in 0u32.. {
debug!("epoch {}", epoch);
let mut to_merge = BTreeMap::new();
let borrowed_stores = stores as &BTreeMap<_, _>;
if epoch == 0 {
let execs = prog.iter().map(|(k, compiled_ruleset)| -> Result<_> {
let execs = prog.par_iter().map(|(k, compiled_ruleset)| -> Result<_> {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() {
AggrKind::None => {
let res = self.initial_rule_non_aggr_eval(
k,
ruleset,
&ruleset,
borrowed_stores,
&limiter,
poison.clone(),
)?;
used_limiter = res.0 || used_limiter;
used_limiter.fetch_or(res.0, Ordering::Relaxed);
res.1.wrap()
}
AggrKind::Normal => {
let res = self.initial_rule_aggr_eval(
k,
ruleset,
&ruleset,
borrowed_stores,
&limiter,
poison.clone(),
)?;
used_limiter = res.0 || used_limiter;
used_limiter.fetch_or(res.0, Ordering::Relaxed);
res.1.wrap()
}
AggrKind::Meet => {
let new = self.initial_rule_meet_eval(
k,
ruleset,
&ruleset,
borrowed_stores,
poison.clone(),
)?;
@ -158,7 +159,7 @@ impl<'a> SessionTx<'a> {
let fixed_impl = fixed.fixed_impl.as_ref();
let mut out = RegularTempStore::default();
let payload = FixedRulePayload {
manifest: fixed,
manifest: &fixed,
stores: borrowed_stores,
tx: self,
};
@ -168,12 +169,12 @@ impl<'a> SessionTx<'a> {
};
Ok((k, new_store))
});
for res in execs {
for res in execs.collect::<Vec<_>>() {
let (k, new_store) = res?;
to_merge.insert(k, new_store);
}
} else {
let execs = prog.iter().map(|(k, compiled_ruleset)| -> Result<_> {
let execs = prog.par_iter().map(|(k, compiled_ruleset)| -> Result<_> {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => {
match compiled_ruleset.aggr_kind() {
@ -186,7 +187,7 @@ impl<'a> SessionTx<'a> {
&limiter,
poison.clone(),
)?;
used_limiter = res.0 || used_limiter;
used_limiter.fetch_or(res.0, Ordering::Relaxed);
res.1.wrap()
}
AggrKind::Meet => {
@ -212,7 +213,7 @@ impl<'a> SessionTx<'a> {
};
Ok((k, new_store))
});
for res in execs {
for res in execs.collect::<Vec<_>>() {
let (k, new_store) = res?;
to_merge.insert(k, new_store);
}
@ -228,7 +229,7 @@ impl<'a> SessionTx<'a> {
break;
}
}
Ok(used_limiter)
Ok(used_limiter.load(Ordering::Acquire))
}
/// returns true is early return is activated
fn initial_rule_non_aggr_eval(

@ -7,7 +7,6 @@
*/
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::Arc;
use itertools::Itertools;
@ -445,7 +444,7 @@ fn make_const_rule(
name: Symbol::new("Constant", Default::default()),
},
rule_args: vec![],
options: Rc::new(options),
options: Arc::new(options),
head: bindings,
arity: bindings_arity,
span: Default::default(),

@ -10,7 +10,6 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
#[allow(unused_imports)]
@ -1228,7 +1227,7 @@ fn propagate_previous_results(
name: Symbol::new("Constant", Default::default()),
},
rule_args: vec![],
options: Rc::new(BTreeMap::from([(
options: Arc::new(BTreeMap::from([(
SmartString::from("data"),
Expr::Const {
val: DataValue::List(

@ -55,7 +55,7 @@ pub trait Storage<'s>: Sync {
/// Trait for the associated transaction type of a storage engine.
/// A transaction needs to guarantee MVCC semantics for all operations.
pub trait StoreTx<'s> {
pub trait StoreTx<'s>: Sync {
/// Get a key. If `for_update` is `true` (only possible in a write transaction),
/// then the database needs to guarantee that `commit()` can only succeed if
/// the key has not been modified outside the transaction.

Loading…
Cancel
Save