refactor eval

main
Ziyang Hu 2 years ago
parent 0f84dfca8d
commit d95e8281c9

@ -100,18 +100,37 @@ impl<'a> SessionTx<'a> {
if epoch == 0 {
for (k, compiled_ruleset) in prog.iter().rev() {
match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => {
let aggr_kind = compiled_ruleset.aggr_kind();
used_limiter = self.initial_rule_eval(
k,
ruleset,
aggr_kind,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
}
CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() {
AggrKind::None => {
used_limiter = self.initial_rule_normal_eval(
k,
ruleset,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
}
AggrKind::Normal => {
used_limiter = self.initial_rule_aggr_eval(
k,
ruleset,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
}
AggrKind::Meet => {
self.initial_rule_meet_eval(
k,
ruleset,
stores,
&mut changed,
poison.clone(),
)?;
}
},
CompiledRuleSet::Algo(algo_apply) => {
self.algo_application_eval(k, algo_apply, stores, poison.clone())?;
}
@ -130,22 +149,34 @@ impl<'a> SessionTx<'a> {
for (k, compiled_ruleset) in prog.iter().rev() {
match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => {
let is_meet_aggr = match compiled_ruleset.aggr_kind() {
AggrKind::None => false,
AggrKind::Normal => false,
AggrKind::Meet => true,
};
used_limiter = self.incremental_rule_eval(
k,
ruleset,
epoch,
is_meet_aggr,
stores,
&prev_changed,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
match compiled_ruleset.aggr_kind() {
AggrKind::None => {
used_limiter = self.incremental_rule_normal_eval(
k,
ruleset,
epoch,
stores,
&prev_changed,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
}
AggrKind::Meet => {
self.incremental_rule_meet_eval(
k,
ruleset,
epoch,
stores,
&prev_changed,
&mut changed,
poison.clone(),
)?;
}
AggrKind::Normal => {
// not doing anything
}
}
}
CompiledRuleSet::Algo(_) => {
@ -172,11 +203,10 @@ impl<'a> SessionTx<'a> {
algo_impl.run(self, algo_apply, stores, out, poison)
}
/// Returns true if early return is activated
fn initial_rule_eval(
fn initial_rule_normal_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
aggr_kind: AggrKind,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
@ -184,180 +214,207 @@ impl<'a> SessionTx<'a> {
) -> Result<bool> {
let store = stores.get(rule_symb).unwrap();
let use_delta = BTreeSet::default();
let should_check_limit =
limiter.total.is_some() && rule_symb.is_prog_entry() && aggr_kind != AggrKind::Meet;
match aggr_kind {
AggrKind::None | AggrKind::Meet => {
let is_meet = aggr_kind == AggrKind::Meet;
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
let mut aggr = rule.aggr.clone();
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
}
for item_res in rule.relation.iter(self, Some(0), &use_delta)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
if is_meet {
store.aggr_meet_put(&item, &mut aggr, 0)?;
} else if should_check_limit {
if !store.exists(&item, 0) {
store.put_with_skip(item, limiter.should_skip_next());
if limiter.incr_and_should_stop() {
trace!("early stopping due to result count limit exceeded");
return Ok(true);
}
}
} else {
store.put(item, 0);
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
for item_res in rule.relation.iter(self, Some(0), &use_delta)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
if should_check_limit {
if !store.exists(&item, 0) {
store.put_with_skip(item, limiter.should_skip_next());
if limiter.incr_and_should_stop() {
trace!("early stopping due to result count limit exceeded");
return Ok(true);
}
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
if is_meet && store.is_empty() && ruleset[0].aggr.iter().all(|a| a.is_some()) {
let mut aggr = ruleset[0].aggr.clone();
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
}
let value: Vec<_> = aggr
.iter()
.map(|a| -> Result<DataValue> {
let (aggr, args) = a.as_ref().unwrap();
let op = aggr.meet_op.as_ref().unwrap();
Ok(op.init_val())
})
.try_collect()?;
store.aggr_meet_put(&value, &mut aggr, 0)?;
} else {
store.put(item, 0);
}
*changed.get_mut(rule_symb).unwrap() = true;
}
AggrKind::Normal => {
let mut aggr_work: BTreeMap<Vec<DataValue>, Vec<Aggregation>> = BTreeMap::new();
poison.check()?;
}
Ok(should_check_limit)
}
fn initial_rule_meet_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &BTreeMap<MagicSymbol, InMemRelation>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
poison: Poison,
) -> Result<()> {
let store = stores.get(rule_symb).unwrap();
let use_delta = BTreeSet::default();
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!(
"Calculation for normal aggr rule {:?}.{}",
rule_symb, rule_n
);
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
let mut aggr = rule.aggr.clone();
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
}
for item_res in rule.relation.iter(self, Some(0), &use_delta)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
store.aggr_meet_put(&item, &mut aggr, 0)?;
let keys_indices = rule
.aggr
.iter()
.enumerate()
.filter_map(|(i, a)| if a.is_none() { Some(i) } else { None })
.collect_vec();
let extract_keys = |t: &Tuple| -> Vec<DataValue> {
keys_indices.iter().map(|i| t[*i].clone()).collect_vec()
};
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
if store.is_empty() && ruleset[0].aggr.iter().all(|a| a.is_some()) {
let mut aggr = ruleset[0].aggr.clone();
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
}
let value: Vec<_> = aggr
.iter()
.map(|a| -> Result<DataValue> {
let (aggr, _) = a.as_ref().unwrap();
let op = aggr.meet_op.as_ref().unwrap();
Ok(op.init_val())
})
.try_collect()?;
store.aggr_meet_put(&value, &mut aggr, 0)?;
}
Ok(())
}
fn initial_rule_aggr_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &BTreeMap<MagicSymbol, InMemRelation>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<bool> {
let store = stores.get(rule_symb).unwrap();
let use_delta = BTreeSet::default();
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
let mut aggr_work: BTreeMap<Vec<DataValue>, Vec<Aggregation>> = BTreeMap::new();
let val_indices_and_aggrs = rule
.aggr
.iter()
.enumerate()
.filter_map(|(i, a)| match a {
None => None,
Some(aggr) => Some((i, aggr.clone())),
})
.collect_vec();
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!(
"Calculation for normal aggr rule {:?}.{}",
rule_symb, rule_n
);
for item_res in rule.relation.iter(self, Some(0), &use_delta)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
let keys_indices = rule
.aggr
.iter()
.enumerate()
.filter_map(|(i, a)| if a.is_none() { Some(i) } else { None })
.collect_vec();
let extract_keys = |t: &Tuple| -> Vec<DataValue> {
keys_indices.iter().map(|i| t[*i].clone()).collect_vec()
};
let keys = extract_keys(&item);
let val_indices_and_aggrs = rule
.aggr
.iter()
.enumerate()
.filter_map(|(i, a)| match a {
None => None,
Some(aggr) => Some((i, aggr.clone())),
})
.collect_vec();
match aggr_work.entry(keys) {
Entry::Occupied(mut ent) => {
let aggr_ops = ent.get_mut();
for (aggr_idx, (tuple_idx, _)) in
val_indices_and_aggrs.iter().enumerate()
{
aggr_ops[aggr_idx]
.normal_op
.as_mut()
.unwrap()
.set(&item[*tuple_idx])?;
}
}
Entry::Vacant(ent) => {
let mut aggr_ops = Vec::with_capacity(val_indices_and_aggrs.len());
for (i, (aggr, params)) in &val_indices_and_aggrs {
let mut cur_aggr = aggr.clone();
cur_aggr.normal_init(params)?;
cur_aggr.normal_op.as_mut().unwrap().set(&item[*i])?;
aggr_ops.push(cur_aggr)
}
ent.insert(aggr_ops);
}
}
for item_res in rule.relation.iter(self, Some(0), &use_delta)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
let keys = extract_keys(&item);
let mut inv_indices = Vec::with_capacity(ruleset[0].aggr.len());
let mut seen_keys = 0usize;
let mut seen_aggrs = 0usize;
for aggr in ruleset[0].aggr.iter() {
if aggr.is_some() {
inv_indices.push((true, seen_aggrs));
seen_aggrs += 1;
} else {
inv_indices.push((false, seen_keys));
seen_keys += 1;
match aggr_work.entry(keys) {
Entry::Occupied(mut ent) => {
let aggr_ops = ent.get_mut();
for (aggr_idx, (tuple_idx, _)) in val_indices_and_aggrs.iter().enumerate() {
aggr_ops[aggr_idx]
.normal_op
.as_mut()
.unwrap()
.set(&item[*tuple_idx])?;
}
}
Entry::Vacant(ent) => {
let mut aggr_ops = Vec::with_capacity(val_indices_and_aggrs.len());
for (i, (aggr, params)) in &val_indices_and_aggrs {
let mut cur_aggr = aggr.clone();
cur_aggr.normal_init(params)?;
cur_aggr.normal_op.as_mut().unwrap().set(&item[*i])?;
aggr_ops.push(cur_aggr)
}
ent.insert(aggr_ops);
}
}
if aggr_work.is_empty() && ruleset[0].aggr.iter().all(|v| v.is_some()) {
let empty_result: Vec<_> = ruleset[0]
.aggr
.iter()
.map(|a| {
let (aggr, args) = a.as_ref().unwrap();
let mut aggr = aggr.clone();
aggr.normal_init(args)?;
let op = aggr.normal_op.unwrap();
op.get()
})
.try_collect()?;
store.put(empty_result, 0);
}
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
for (keys, aggrs) in aggr_work {
let tuple_data: Vec<_> = inv_indices
.iter()
.map(|(is_aggr, idx)| {
if *is_aggr {
aggrs[*idx].normal_op.as_ref().unwrap().get()
} else {
Ok(keys[*idx].clone())
}
})
.try_collect()?;
let tuple = tuple_data;
if should_check_limit {
if !store.exists(&tuple, 0) {
store.put_with_skip(tuple, limiter.should_skip_next());
if limiter.incr_and_should_stop() {
return Ok(true);
}
}
// else, do nothing
let mut inv_indices = Vec::with_capacity(ruleset[0].aggr.len());
let mut seen_keys = 0usize;
let mut seen_aggrs = 0usize;
for aggr in ruleset[0].aggr.iter() {
if aggr.is_some() {
inv_indices.push((true, seen_aggrs));
seen_aggrs += 1;
} else {
inv_indices.push((false, seen_keys));
seen_keys += 1;
}
}
if aggr_work.is_empty() && ruleset[0].aggr.iter().all(|v| v.is_some()) {
let empty_result: Vec<_> = ruleset[0]
.aggr
.iter()
.map(|a| {
let (aggr, args) = a.as_ref().unwrap();
let mut aggr = aggr.clone();
aggr.normal_init(args)?;
let op = aggr.normal_op.unwrap();
op.get()
})
.try_collect()?;
store.put(empty_result, 0);
}
for (keys, aggrs) in aggr_work {
let tuple_data: Vec<_> = inv_indices
.iter()
.map(|(is_aggr, idx)| {
if *is_aggr {
aggrs[*idx].normal_op.as_ref().unwrap().get()
} else {
store.put(tuple, 0);
Ok(keys[*idx].clone())
}
})
.try_collect()?;
let tuple = tuple_data;
if should_check_limit {
if !store.exists(&tuple, 0) {
store.put_with_skip(tuple, limiter.should_skip_next());
if limiter.incr_and_should_stop() {
return Ok(true);
}
}
// else, do nothing
} else {
store.put(tuple, 0);
}
}
Ok(should_check_limit)
}
fn incremental_rule_eval(
fn incremental_rule_normal_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
epoch: u32,
is_meet_aggr: bool,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
prev_changed: &BTreeMap<&MagicSymbol, bool>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
@ -365,8 +422,7 @@ impl<'a> SessionTx<'a> {
poison: Poison,
) -> Result<bool> {
let store = stores.get(rule_symb).unwrap();
let should_check_limit =
limiter.total.is_some() && rule_symb.is_prog_entry() && !is_meet_aggr;
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
for (rule_n, rule) in ruleset.iter().enumerate() {
let mut should_do_calculation = false;
for d_rule in &rule.contained_rules {
@ -398,12 +454,7 @@ impl<'a> SessionTx<'a> {
for item_res in rule.relation.iter(self, Some(epoch), &use_delta)? {
let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel
if is_meet_aggr {
let aggr_changed = store.aggr_meet_put(&item, &mut aggr, epoch)?;
if aggr_changed {
*changed.get_mut(rule_symb).unwrap() = true;
}
} else if store.exists(&item, 0) {
if store.exists(&item, 0) {
trace!(
"item for {:?}.{}: {:?} at {}, rederived",
rule_symb,
@ -433,4 +484,56 @@ impl<'a> SessionTx<'a> {
}
Ok(should_check_limit)
}
/// Runs one incremental evaluation pass, at `epoch`, for a rule set that is
/// evaluated with meet aggregation.
///
/// A rule is skipped unless at least one entry of its `contained_rules` is
/// flagged `true` in `prev_changed`. For every remaining rule, each store in
/// `stores` whose key appears in the rule's `contained_rules` is used in turn
/// as the sole delta store: the rule's relation is iterated with that store's
/// id as the delta set, and every produced item is passed to `aggr_meet_put`
/// on the store registered under `rule_symb`. Whenever `aggr_meet_put` reports
/// a change, the `changed` entry for `rule_symb` is set to `true`.
///
/// # Errors
///
/// Propagates any error from `meet_init`, from creating or advancing the
/// relation iterator, from `aggr_meet_put`, and from `poison.check()`, which
/// is called once after each delta store has been processed.
///
/// # Panics
///
/// Panics if `stores` or `changed` has no entry for `rule_symb`.
fn incremental_rule_meet_eval(
    &self,
    rule_symb: &MagicSymbol,
    ruleset: &[CompiledRule],
    epoch: u32,
    stores: &BTreeMap<MagicSymbol, InMemRelation>,
    prev_changed: &BTreeMap<&MagicSymbol, bool>,
    changed: &mut BTreeMap<&MagicSymbol, bool>,
    poison: Poison,
) -> Result<()> {
    let target_store = stores.get(rule_symb).unwrap();
    for (rule_idx, rule) in ruleset.iter().enumerate() {
        // Only re-run a rule when something it depends on changed last epoch.
        let any_dependency_changed = rule
            .contained_rules
            .iter()
            .any(|dep| prev_changed.get(dep).copied().unwrap_or(false));
        if !any_dependency_changed {
            continue;
        }

        // A fresh copy of the rule's aggregations, initialised for meet
        // evaluation and shared across all delta stores of this rule.
        let mut meet_aggrs = rule.aggr.clone();
        for (op, op_args) in meet_aggrs.iter_mut().flatten() {
            op.meet_init(op_args)?;
        }

        let dependency_stores = stores
            .iter()
            .filter(|(key, _)| rule.contained_rules.contains(*key));
        for (delta_key, delta_store) in dependency_stores {
            debug!(
                "with delta {:?} for rule {:?}.{}",
                delta_key, rule_symb, rule_idx
            );
            let use_delta = BTreeSet::from([delta_store.id]);
            // Possible improvement: the clauses could be evaluated in parallel.
            for item_res in rule.relation.iter(self, Some(epoch), &use_delta)? {
                let item = item_res?;
                if target_store.aggr_meet_put(&item, &mut meet_aggrs, epoch)? {
                    *changed.get_mut(rule_symb).unwrap() = true;
                }
            }
            poison.check()?;
        }
    }
    Ok(())
}
}

@ -299,6 +299,7 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(())
}
/// Backup the running database into an Sqlite file
#[allow(unused_variables)]
pub fn backup_db(&'s self, out_file: String) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
{
@ -318,6 +319,7 @@ impl<'s, S: Storage<'s>> Db<S> {
bail!("backup requires the 'storage-sqlite' feature to be enabled")
}
/// Restore from an Sqlite backup
#[allow(unused_variables)]
pub fn restore_backup(&'s self, in_file: &str) -> Result<()> {
#[cfg(feature = "storage-sqlite")]
{
@ -346,6 +348,7 @@ impl<'s, S: Storage<'s>> Db<S> {
///
/// Note that triggers are _not_ run for the relations, if any exist.
/// If you need to activate triggers, use queries with parameters.
#[allow(unused_variables)]
pub fn import_from_backup(&'s self, in_file: &str, relations: &[String]) -> Result<()> {
#[cfg(not(feature = "storage-sqlite"))]
bail!("backup requires the 'storage-sqlite' feature to be enabled");
@ -1085,7 +1088,6 @@ mod tests {
use itertools::Itertools;
use serde_json::json;
use crate::data::value::DataValue;
use crate::new_cozo_mem;
#[test]

@ -10,4 +10,4 @@ pub(crate) mod db;
pub(crate) mod transact;
pub(crate) mod in_mem;
pub(crate) mod relation;
pub(crate) mod temp_store;
// pub(crate) mod temp_store;

@ -123,6 +123,7 @@ impl RelationHandle {
ret.extend(prefix_bytes);
ret
}
#[allow(dead_code)]
pub(crate) fn amend_key_prefix(&self, data: &mut [u8]) {
let prefix_bytes = self.id.0.to_be_bytes();
data[0..8].copy_from_slice(&prefix_bytes);

@ -309,7 +309,7 @@ impl<'a> IntoIterator for TupleInIter<'a> {
}
}
struct TupleInIterIterator<'a> {
pub(crate) struct TupleInIterIterator<'a> {
inner: TupleInIter<'a>,
idx: usize,
}

Loading…
Cancel
Save