rewrite semi-naive, suggestive of parallel execution

main
Ziyang Hu 2 years ago
parent 83683fcdd5
commit 16240209db

@ -14,12 +14,12 @@ use itertools::Itertools;
use log::{debug, trace};
use miette::Result;
use crate::fixed_rule::FixedRulePayload;
use crate::data::aggr::Aggregation;
use crate::data::program::{MagicSymbol, NoEntryError};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::fixed_rule::FixedRulePayload;
use crate::parse::SourceSpan;
use crate::query::compile::{AggrKind, CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::runtime::db::Poison;
@ -119,7 +119,7 @@ impl<'a> SessionTx<'a> {
let mut to_merge = BTreeMap::new();
let borrowed_stores = stores as &BTreeMap<_, _>;
if epoch == 0 {
for (k, compiled_ruleset) in prog.iter().rev() {
let execs = prog.iter().map(|(k, compiled_ruleset)| -> Result<_> {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() {
AggrKind::None => {
@ -166,10 +166,14 @@ impl<'a> SessionTx<'a> {
out.wrap()
}
};
Ok((k, new_store))
});
for res in execs {
let (k, new_store) = res?;
to_merge.insert(k, new_store);
}
} else {
for (k, compiled_ruleset) in prog.iter().rev() {
let execs = prog.iter().map(|(k, compiled_ruleset)| -> Result<_> {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => {
match compiled_ruleset.aggr_kind() {
@ -206,6 +210,10 @@ impl<'a> SessionTx<'a> {
RegularTempStore::default().wrap()
}
};
Ok((k, new_store))
});
for res in execs {
let (k, new_store) = res?;
to_merge.insert(k, new_store);
}
}

Loading…
Cancel
Save