deterministic ordering even with parallelism

main
Ziyang Hu 2 years ago
parent 5dba39143d
commit 46f4e8e0fe

@ -43,6 +43,13 @@ impl QueryLimiter {
false
}
}
pub(crate) fn is_stopped(&self) -> bool {
if let Some(limit) = self.total {
self.counter.load(Ordering::Acquire) >= limit
} else {
false
}
}
pub(crate) fn should_skip_next(&self) -> bool {
match self.skip {
None => false,
@ -173,7 +180,24 @@ impl<'a> SessionTx<'a> {
};
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
{
let execs = prog.par_iter().map(execution);
let limiter_enabled = limiter.total.is_some();
for res in prog
.iter()
.filter(|(symb, _)| limiter_enabled && symb.is_prog_entry())
.map(execution)
{
let (k, new_store) = res?;
to_merge.insert(k, new_store);
if limiter.is_stopped() {
break;
}
}
let execs = prog
.par_iter()
.filter(|(symb, _)| !(limiter_enabled && symb.is_prog_entry()))
.map(execution);
for res in execs.collect::<Vec<_>>() {
let (k, new_store) = res?;
to_merge.insert(k, new_store);
@ -229,7 +253,24 @@ impl<'a> SessionTx<'a> {
};
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
{
let execs = prog.par_iter().map(execution);
let limiter_enabled = limiter.total.is_some();
// entry rules with limiter must execute sequentially in order to get deterministic ordering
for res in prog
.iter()
.filter(|(symb, _)| limiter_enabled && symb.is_prog_entry())
.map(execution)
{
let (k, new_store) = res?;
to_merge.insert(k, new_store);
if limiter.is_stopped() {
break;
}
}
let execs = prog
.par_iter()
.filter(|(symb, _)| !(limiter_enabled && symb.is_prog_entry()))
.map(execution);
for res in execs.collect::<Vec<_>>() {
let (k, new_store) = res?;
to_merge.insert(k, new_store);

Loading…
Cancel
Save