|
|
|
@ -8,6 +8,7 @@
|
|
|
|
|
|
|
|
|
|
use std::collections::btree_map::Entry;
|
|
|
|
|
use std::collections::BTreeMap;
|
|
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
|
|
|
|
|
|
use itertools::Itertools;
|
|
|
|
|
use log::{debug, trace};
|
|
|
|
@ -28,14 +29,14 @@ use crate::runtime::transact::SessionTx;
|
|
|
|
|
pub(crate) struct QueryLimiter {
|
|
|
|
|
total: Option<usize>,
|
|
|
|
|
skip: Option<usize>,
|
|
|
|
|
counter: usize,
|
|
|
|
|
counter: AtomicUsize,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl QueryLimiter {
|
|
|
|
|
pub(crate) fn incr_and_should_stop(&mut self) -> bool {
|
|
|
|
|
pub(crate) fn incr_and_should_stop(&self) -> bool {
|
|
|
|
|
if let Some(limit) = self.total {
|
|
|
|
|
self.counter += 1;
|
|
|
|
|
self.counter >= limit
|
|
|
|
|
let old_count = self.counter.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
old_count + 1 >= limit
|
|
|
|
|
} else {
|
|
|
|
|
false
|
|
|
|
|
}
|
|
|
|
@ -43,7 +44,7 @@ impl QueryLimiter {
|
|
|
|
|
pub(crate) fn should_skip_next(&self) -> bool {
|
|
|
|
|
match self.skip {
|
|
|
|
|
None => false,
|
|
|
|
|
Some(i) => i > self.counter,
|
|
|
|
|
Some(i) => i > self.counter.load(Ordering::Relaxed),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -105,10 +106,10 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
num_to_skip: Option<usize>,
|
|
|
|
|
poison: Poison,
|
|
|
|
|
) -> Result<bool> {
|
|
|
|
|
let mut limiter = QueryLimiter {
|
|
|
|
|
let limiter = QueryLimiter {
|
|
|
|
|
total: total_num_to_take,
|
|
|
|
|
skip: num_to_skip,
|
|
|
|
|
counter: 0,
|
|
|
|
|
counter: 0.into(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut used_limiter = false;
|
|
|
|
@ -116,6 +117,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
for epoch in 0u32.. {
|
|
|
|
|
debug!("epoch {}", epoch);
|
|
|
|
|
let mut to_merge = BTreeMap::new();
|
|
|
|
|
let borrowed_stores = stores as &BTreeMap<_, _>;
|
|
|
|
|
if epoch == 0 {
|
|
|
|
|
for (k, compiled_ruleset) in prog.iter().rev() {
|
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
|
@ -124,8 +126,8 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let res = self.initial_rule_non_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
ruleset,
|
|
|
|
|
stores,
|
|
|
|
|
&mut limiter,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter = res.0 || used_limiter;
|
|
|
|
@ -135,8 +137,8 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let res = self.initial_rule_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
ruleset,
|
|
|
|
|
stores,
|
|
|
|
|
&mut limiter,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter = res.0 || used_limiter;
|
|
|
|
@ -146,7 +148,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let new = self.initial_rule_meet_eval(
|
|
|
|
|
k,
|
|
|
|
|
ruleset,
|
|
|
|
|
stores,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
new.wrap()
|
|
|
|
@ -157,7 +159,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let mut out = RegularTempStore::default();
|
|
|
|
|
let payload = FixedRulePayload {
|
|
|
|
|
manifest: fixed,
|
|
|
|
|
stores,
|
|
|
|
|
stores: borrowed_stores,
|
|
|
|
|
tx: self,
|
|
|
|
|
};
|
|
|
|
|
fixed_impl.run(payload, &mut out, poison.clone())?;
|
|
|
|
@ -176,8 +178,8 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
k,
|
|
|
|
|
ruleset,
|
|
|
|
|
epoch,
|
|
|
|
|
stores,
|
|
|
|
|
&mut limiter,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter = res.0 || used_limiter;
|
|
|
|
@ -187,7 +189,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let new = self.incremental_rule_meet_eval(
|
|
|
|
|
k,
|
|
|
|
|
ruleset,
|
|
|
|
|
stores,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
new.wrap()
|
|
|
|
@ -225,8 +227,8 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
&self,
|
|
|
|
|
rule_symb: &MagicSymbol,
|
|
|
|
|
ruleset: &[CompiledRule],
|
|
|
|
|
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
limiter: &mut QueryLimiter,
|
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
limiter: &QueryLimiter,
|
|
|
|
|
poison: Poison,
|
|
|
|
|
) -> Result<(bool, RegularTempStore)> {
|
|
|
|
|
let mut out_store = RegularTempStore::default();
|
|
|
|
@ -262,7 +264,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
&self,
|
|
|
|
|
rule_symb: &MagicSymbol,
|
|
|
|
|
ruleset: &[CompiledRule],
|
|
|
|
|
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
poison: Poison,
|
|
|
|
|
) -> Result<MeetAggrStore> {
|
|
|
|
|
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
|
|
|
|
@ -301,8 +303,8 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
&self,
|
|
|
|
|
rule_symb: &MagicSymbol,
|
|
|
|
|
ruleset: &[CompiledRule],
|
|
|
|
|
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
limiter: &mut QueryLimiter,
|
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
limiter: &QueryLimiter,
|
|
|
|
|
poison: Poison,
|
|
|
|
|
) -> Result<(bool, RegularTempStore)> {
|
|
|
|
|
let mut out_store = RegularTempStore::default();
|
|
|
|
@ -428,8 +430,8 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
rule_symb: &MagicSymbol,
|
|
|
|
|
ruleset: &[CompiledRule],
|
|
|
|
|
epoch: u32,
|
|
|
|
|
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
limiter: &mut QueryLimiter,
|
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
limiter: &QueryLimiter,
|
|
|
|
|
poison: Poison,
|
|
|
|
|
) -> Result<(bool, RegularTempStore)> {
|
|
|
|
|
let prev_store = stores.get(rule_symb).unwrap();
|
|
|
|
@ -492,7 +494,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
&self,
|
|
|
|
|
rule_symb: &MagicSymbol,
|
|
|
|
|
ruleset: &[CompiledRule],
|
|
|
|
|
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
stores: &BTreeMap<MagicSymbol, EpochStore>,
|
|
|
|
|
poison: Poison,
|
|
|
|
|
) -> Result<MeetAggrStore> {
|
|
|
|
|
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
|
|
|
|
|