|
|
@ -23,7 +23,9 @@ use crate::data::tuple::Tuple;
|
|
|
|
use crate::data::value::DataValue;
|
|
|
|
use crate::data::value::DataValue;
|
|
|
|
use crate::fixed_rule::FixedRulePayload;
|
|
|
|
use crate::fixed_rule::FixedRulePayload;
|
|
|
|
use crate::parse::SourceSpan;
|
|
|
|
use crate::parse::SourceSpan;
|
|
|
|
use crate::query::compile::{AggrKind, CompiledProgram, CompiledRule, CompiledRuleSet};
|
|
|
|
use crate::query::compile::{
|
|
|
|
|
|
|
|
AggrKind, CompiledProgram, CompiledRule, CompiledRuleSet, ContainedRuleMultiplicity,
|
|
|
|
|
|
|
|
};
|
|
|
|
use crate::runtime::db::Poison;
|
|
|
|
use crate::runtime::db::Poison;
|
|
|
|
use crate::runtime::temp_store::{EpochStore, MeetAggrStore, RegularTempStore};
|
|
|
|
use crate::runtime::temp_store::{EpochStore, MeetAggrStore, RegularTempStore};
|
|
|
|
use crate::runtime::transact::SessionTx;
|
|
|
|
use crate::runtime::transact::SessionTx;
|
|
|
@ -212,6 +214,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
|
|
|
|
// Follow up epoch > 0
|
|
|
|
#[allow(clippy::needless_borrow)]
|
|
|
|
#[allow(clippy::needless_borrow)]
|
|
|
|
let execution = |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> {
|
|
|
|
let execution = |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> {
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
@ -510,21 +513,64 @@ impl<'a> SessionTx<'a> {
|
|
|
|
limiter: &QueryLimiter,
|
|
|
|
limiter: &QueryLimiter,
|
|
|
|
poison: Poison,
|
|
|
|
poison: Poison,
|
|
|
|
) -> Result<(bool, RegularTempStore)> {
|
|
|
|
) -> Result<(bool, RegularTempStore)> {
|
|
|
|
|
|
|
|
// TODO: handle the case where self-join is involved
|
|
|
|
let prev_store = stores.get(rule_symb).unwrap();
|
|
|
|
let prev_store = stores.get(rule_symb).unwrap();
|
|
|
|
let mut out_store = RegularTempStore::default();
|
|
|
|
let mut out_store = RegularTempStore::default();
|
|
|
|
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
|
|
|
|
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
|
|
|
|
for (rule_n, rule) in ruleset.iter().enumerate() {
|
|
|
|
for (rule_n, rule) in ruleset.iter().enumerate() {
|
|
|
|
let dependencies_changed = rule
|
|
|
|
let mut need_complete_run = false;
|
|
|
|
.contained_rules
|
|
|
|
let mut dependencies_changed = false;
|
|
|
|
.iter()
|
|
|
|
|
|
|
|
.map(|symb| stores.get(symb).unwrap().has_delta())
|
|
|
|
for (symb, multiplicity) in rule.contained_rules.iter() {
|
|
|
|
.any(|v| v);
|
|
|
|
if stores.get(symb).unwrap().has_delta() {
|
|
|
|
|
|
|
|
dependencies_changed = true;
|
|
|
|
|
|
|
|
if *multiplicity == ContainedRuleMultiplicity::Many {
|
|
|
|
|
|
|
|
need_complete_run = true;
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if !dependencies_changed {
|
|
|
|
if !dependencies_changed {
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if need_complete_run {
|
|
|
|
|
|
|
|
debug!("complete rule for rule {:?}.{}", rule_symb, rule_n);
|
|
|
|
|
|
|
|
for item_res in rule.relation.iter(self, None, stores)? {
|
|
|
|
|
|
|
|
let item = item_res?;
|
|
|
|
|
|
|
|
// improvement: the clauses can actually be evaluated in parallel
|
|
|
|
|
|
|
|
if prev_store.exists(&item) {
|
|
|
|
|
|
|
|
trace!(
|
|
|
|
|
|
|
|
"item for {:?}.{}: {:?} at {}, rederived",
|
|
|
|
|
|
|
|
rule_symb,
|
|
|
|
|
|
|
|
rule_n,
|
|
|
|
|
|
|
|
item,
|
|
|
|
|
|
|
|
epoch
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
trace!(
|
|
|
|
|
|
|
|
"item for {:?}.{}: {:?} at {}",
|
|
|
|
|
|
|
|
rule_symb,
|
|
|
|
|
|
|
|
rule_n,
|
|
|
|
|
|
|
|
item,
|
|
|
|
|
|
|
|
epoch
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
if limiter.should_skip_next() {
|
|
|
|
|
|
|
|
out_store.put_with_skip(item);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
out_store.put(item);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if should_check_limit && limiter.incr_and_should_stop() {
|
|
|
|
|
|
|
|
trace!("early stopping due to result count limit exceeded");
|
|
|
|
|
|
|
|
return Ok((true, out_store));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
poison.check()?;
|
|
|
|
|
|
|
|
} else {
|
|
|
|
for (delta_key, _) in stores.iter() {
|
|
|
|
for (delta_key, _) in stores.iter() {
|
|
|
|
if !rule.contained_rules.contains(delta_key) {
|
|
|
|
if !rule.contained_rules.contains_key(delta_key) {
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
debug!(
|
|
|
|
debug!(
|
|
|
@ -564,6 +610,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
poison.check()?;
|
|
|
|
poison.check()?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
Ok((should_check_limit, out_store))
|
|
|
|
Ok((should_check_limit, out_store))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fn incremental_rule_meet_eval(
|
|
|
|
fn incremental_rule_meet_eval(
|
|
|
@ -573,13 +620,22 @@ impl<'a> SessionTx<'a> {
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
poison: Poison,
|
|
|
|
poison: Poison,
|
|
|
|
) -> Result<MeetAggrStore> {
|
|
|
|
) -> Result<MeetAggrStore> {
|
|
|
|
|
|
|
|
// TODO handle the case where self-joins are involved
|
|
|
|
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
|
|
|
|
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
|
|
|
|
for (rule_n, rule) in ruleset.iter().enumerate() {
|
|
|
|
for (rule_n, rule) in ruleset.iter().enumerate() {
|
|
|
|
let dependencies_changed = rule
|
|
|
|
let mut need_complete_run = false;
|
|
|
|
.contained_rules
|
|
|
|
let mut dependencies_changed = false;
|
|
|
|
.iter()
|
|
|
|
|
|
|
|
.map(|symb| stores.get(symb).unwrap().has_delta())
|
|
|
|
for (symb, multiplicity) in rule.contained_rules.iter() {
|
|
|
|
.any(|v| v);
|
|
|
|
if stores.get(symb).unwrap().has_delta() {
|
|
|
|
|
|
|
|
dependencies_changed = true;
|
|
|
|
|
|
|
|
if *multiplicity == ContainedRuleMultiplicity::Many {
|
|
|
|
|
|
|
|
need_complete_run = true;
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if !dependencies_changed {
|
|
|
|
if !dependencies_changed {
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -589,8 +645,15 @@ impl<'a> SessionTx<'a> {
|
|
|
|
aggr.meet_init(args)?;
|
|
|
|
aggr.meet_init(args)?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if need_complete_run {
|
|
|
|
|
|
|
|
debug!("complete run for rule {:?}.{}", rule_symb, rule_n);
|
|
|
|
|
|
|
|
for item_res in rule.relation.iter(self, None, stores)? {
|
|
|
|
|
|
|
|
out_store.meet_put(item_res?)?;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
poison.check()?;
|
|
|
|
|
|
|
|
} else {
|
|
|
|
for (delta_key, _) in stores.iter() {
|
|
|
|
for (delta_key, _) in stores.iter() {
|
|
|
|
if !rule.contained_rules.contains(delta_key) {
|
|
|
|
if !rule.contained_rules.contains_key(delta_key) {
|
|
|
|
continue;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
debug!(
|
|
|
|
debug!(
|
|
|
@ -603,6 +666,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
poison.check()?;
|
|
|
|
poison.check()?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
Ok(out_store)
|
|
|
|
Ok(out_store)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|