|
|
|
@ -122,25 +122,86 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
let borrowed_stores = stores as &BTreeMap<_, _>;
|
|
|
|
|
if epoch == 0 {
|
|
|
|
|
#[allow(clippy::needless_borrow)]
|
|
|
|
|
let execution =
|
|
|
|
|
|(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> {
|
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
|
|
CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() {
|
|
|
|
|
let execution = |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> {
|
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
|
|
CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() {
|
|
|
|
|
AggrKind::None => {
|
|
|
|
|
let res = self.initial_rule_non_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter.fetch_or(res.0, Ordering::Relaxed);
|
|
|
|
|
res.1.wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Normal => {
|
|
|
|
|
let res = self.initial_rule_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter.fetch_or(res.0, Ordering::Relaxed);
|
|
|
|
|
res.1.wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Meet => {
|
|
|
|
|
let new = self.initial_rule_meet_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
new.wrap()
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
CompiledRuleSet::Fixed(fixed) => {
|
|
|
|
|
let fixed_impl = fixed.fixed_impl.as_ref();
|
|
|
|
|
let mut out = RegularTempStore::default();
|
|
|
|
|
let payload = FixedRulePayload {
|
|
|
|
|
manifest: &fixed,
|
|
|
|
|
stores: borrowed_stores,
|
|
|
|
|
tx: self,
|
|
|
|
|
};
|
|
|
|
|
fixed_impl.run(payload, &mut out, poison.clone())?;
|
|
|
|
|
out.wrap()
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok((k, new_store))
|
|
|
|
|
};
|
|
|
|
|
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
|
|
|
|
|
{
|
|
|
|
|
if prog.len() == 1 {
|
|
|
|
|
let (k, new_store) = execution(prog.iter().next().unwrap())?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
} else {
|
|
|
|
|
let execs = prog.par_iter().map(execution);
|
|
|
|
|
for res in execs.collect::<Vec<_>>() {
|
|
|
|
|
let (k, new_store) = res?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
|
|
|
|
|
{
|
|
|
|
|
for res in prog.iter().map(execution) {
|
|
|
|
|
let (k, new_store) = res?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
#[allow(clippy::needless_borrow)]
|
|
|
|
|
let execution = |(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> {
|
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
|
|
CompiledRuleSet::Rules(ruleset) => {
|
|
|
|
|
match compiled_ruleset.aggr_kind() {
|
|
|
|
|
AggrKind::None => {
|
|
|
|
|
let res = self.initial_rule_non_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter.fetch_or(res.0, Ordering::Relaxed);
|
|
|
|
|
res.1.wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Normal => {
|
|
|
|
|
let res = self.initial_rule_aggr_eval(
|
|
|
|
|
let res = self.incremental_rule_non_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
epoch,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
@ -149,7 +210,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
res.1.wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Meet => {
|
|
|
|
|
let new = self.initial_rule_meet_eval(
|
|
|
|
|
let new = self.incremental_rule_meet_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
borrowed_stores,
|
|
|
|
@ -157,84 +218,31 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
)?;
|
|
|
|
|
new.wrap()
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
CompiledRuleSet::Fixed(fixed) => {
|
|
|
|
|
let fixed_impl = fixed.fixed_impl.as_ref();
|
|
|
|
|
let mut out = RegularTempStore::default();
|
|
|
|
|
let payload = FixedRulePayload {
|
|
|
|
|
manifest: &fixed,
|
|
|
|
|
stores: borrowed_stores,
|
|
|
|
|
tx: self,
|
|
|
|
|
};
|
|
|
|
|
fixed_impl.run(payload, &mut out, poison.clone())?;
|
|
|
|
|
out.wrap()
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok((k, new_store))
|
|
|
|
|
};
|
|
|
|
|
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
|
|
|
|
|
{
|
|
|
|
|
let execs = prog.par_iter().map(execution);
|
|
|
|
|
for res in execs.collect::<Vec<_>>() {
|
|
|
|
|
let (k, new_store) = res?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
|
|
|
|
|
{
|
|
|
|
|
for res in prog.iter().map(execution) {
|
|
|
|
|
let (k, new_store) = res?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
#[allow(clippy::needless_borrow)]
|
|
|
|
|
let execution =
|
|
|
|
|
|(k, compiled_ruleset): (_, &CompiledRuleSet)| -> Result<_> {
|
|
|
|
|
let new_store = match compiled_ruleset {
|
|
|
|
|
CompiledRuleSet::Rules(ruleset) => {
|
|
|
|
|
match compiled_ruleset.aggr_kind() {
|
|
|
|
|
AggrKind::None => {
|
|
|
|
|
let res = self.incremental_rule_non_aggr_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
epoch,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
&limiter,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
used_limiter.fetch_or(res.0, Ordering::Relaxed);
|
|
|
|
|
res.1.wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Meet => {
|
|
|
|
|
let new = self.incremental_rule_meet_eval(
|
|
|
|
|
k,
|
|
|
|
|
&ruleset,
|
|
|
|
|
borrowed_stores,
|
|
|
|
|
poison.clone(),
|
|
|
|
|
)?;
|
|
|
|
|
new.wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Normal => {
|
|
|
|
|
// not doing anything
|
|
|
|
|
RegularTempStore::default().wrap()
|
|
|
|
|
}
|
|
|
|
|
AggrKind::Normal => {
|
|
|
|
|
// not doing anything
|
|
|
|
|
RegularTempStore::default().wrap()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CompiledRuleSet::Fixed(_) => {
|
|
|
|
|
// no need to do anything, algos are only calculated once
|
|
|
|
|
RegularTempStore::default().wrap()
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok((k, new_store))
|
|
|
|
|
CompiledRuleSet::Fixed(_) => {
|
|
|
|
|
// no need to do anything, algos are only calculated once
|
|
|
|
|
RegularTempStore::default().wrap()
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
Ok((k, new_store))
|
|
|
|
|
};
|
|
|
|
|
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
|
|
|
|
|
{
|
|
|
|
|
let execs = prog.par_iter().map(execution);
|
|
|
|
|
for res in execs.collect::<Vec<_>>() {
|
|
|
|
|
let (k, new_store) = res?;
|
|
|
|
|
if prog.len() == 1 {
|
|
|
|
|
let (k, new_store) = execution(prog.iter().next().unwrap())?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
} else {
|
|
|
|
|
let execs = prog.par_iter().map(execution);
|
|
|
|
|
for res in execs.collect::<Vec<_>>() {
|
|
|
|
|
let (k, new_store) = res?;
|
|
|
|
|
to_merge.insert(k, new_store);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
|
|
|
|
|