new tempstore now passes all tests

main
Ziyang Hu 2 years ago
parent ae730c006f
commit 104d14a1a4

@ -96,7 +96,7 @@ fn astar<'a>(
let goal_node = &goal[0];
let eval_heuristic = |node: &Tuple| -> Result<f64> {
let mut v = node.clone();
v.extend_from_slice(&goal);
v.extend_from_slice(goal);
let t = v;
let cost_val = heuristic.eval(&t)?;
let cost = cost_val.get_float().ok_or_else(|| {

@ -78,7 +78,7 @@ impl AlgoImpl for RandomWalk {
.iter()
.map(|t| -> Result<f64> {
let mut cand = current_tuple.clone();
cand.extend_from_slice(&t);
cand.extend_from_slice(t);
Ok(match weight_expr.eval(&cand)? {
DataValue::Num(n) => {
let f = n.get_float();

@ -1035,7 +1035,7 @@ impl MeetAggrObj for MeetAggrBitAnd {
if left == right {
return Ok(false);
}
if left.len() == 0 {
if left.is_empty() {
*left = right.clone();
return Ok(true);
}
@ -1104,7 +1104,7 @@ impl MeetAggrObj for MeetAggrBitOr {
if left == right {
return Ok(false);
}
if left.len() == 0 {
if left.is_empty() {
*left = right.clone();
return Ok(true);
}

@ -72,7 +72,7 @@ pub(crate) fn op_coalesce(args: &[DataValue]) -> Result<DataValue> {
return Ok(val.clone());
}
}
return Ok(DataValue::Null);
Ok(DataValue::Null)
}
define_op!(OP_EQ, 2, false);
@ -1280,8 +1280,8 @@ define_op!(OP_TO_UNITY, 1, false);
pub(crate) fn op_to_unity(args: &[DataValue]) -> Result<DataValue> {
Ok(DataValue::from(match &args[0] {
DataValue::Null => 0,
DataValue::Bool(b) => if *b {1} else {0},
DataValue::Num(n) => if n.get_float() != 0. {1} else {0},
DataValue::Bool(b) => *b as i64,
DataValue::Num(n) => (n.get_float() != 0.) as i64,
DataValue::Str(s) => if s.is_empty() {0} else { 1},
DataValue::Bytes(b) => if b.is_empty() {0} else { 1},
DataValue::Uuid(u) => if u.0.is_nil() {0} else { 1 },

@ -36,6 +36,7 @@
#![allow(clippy::too_many_arguments)]
use std::collections::BTreeMap;
#[allow(unused_imports)]
use std::time::Instant;
use lazy_static::lazy_static;

@ -8,7 +8,6 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::mem;
use itertools::Itertools;
use log::{debug, trace};
@ -62,13 +61,10 @@ impl<'a> SessionTx<'a> {
for (stratum, cur_prog) in strata.iter().enumerate() {
if stratum > 0 {
// remove stores that have outlived their usefulness!
stores = stores
.into_iter()
.filter(|(name, _)| match store_lifetimes.get(name) {
None => false,
Some(n) => *n >= stratum,
})
.collect();
stores.retain(|name, _| match store_lifetimes.get(name) {
None => false,
Some(n) => *n >= stratum,
});
trace!("{:?}", stores);
}
for (rule_name, rule_set) in cur_prog {
@ -108,8 +104,6 @@ impl<'a> SessionTx<'a> {
num_to_skip: Option<usize>,
poison: Poison,
) -> Result<bool> {
let mut changed: BTreeMap<_, _> = prog.keys().map(|k| (k, false)).collect();
let mut prev_changed = changed.clone();
let mut limiter = QueryLimiter {
total: total_num_to_take,
skip: num_to_skip,
@ -130,7 +124,6 @@ impl<'a> SessionTx<'a> {
k,
ruleset,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)?;
@ -142,7 +135,6 @@ impl<'a> SessionTx<'a> {
k,
ruleset,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)?;
@ -154,7 +146,6 @@ impl<'a> SessionTx<'a> {
k,
ruleset,
stores,
&mut changed,
poison.clone(),
)?;
new.wrap()
@ -170,11 +161,6 @@ impl<'a> SessionTx<'a> {
to_merge.insert(k, new_store);
}
} else {
mem::swap(&mut changed, &mut prev_changed);
for (_k, v) in changed.iter_mut() {
*v = false;
}
for (k, compiled_ruleset) in prog.iter().rev() {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => {
@ -185,8 +171,6 @@ impl<'a> SessionTx<'a> {
ruleset,
epoch,
stores,
&prev_changed,
&mut changed,
&mut limiter,
poison.clone(),
)?;
@ -198,8 +182,6 @@ impl<'a> SessionTx<'a> {
k,
ruleset,
stores,
&prev_changed,
&mut changed,
poison.clone(),
)?;
new.wrap()
@ -219,11 +201,14 @@ impl<'a> SessionTx<'a> {
to_merge.insert(k, new_store);
}
}
let mut changed = false;
for (k, new_store) in to_merge {
let old_store = stores.get_mut(k).unwrap();
old_store.merge(new_store)?;
old_store.merge_in(new_store)?;
trace!("delta for {}: {}", k, old_store.has_delta());
changed |= old_store.has_delta();
}
if changed.values().all(|rule_changed| !*rule_changed) {
if !changed {
break;
}
}
@ -235,7 +220,6 @@ impl<'a> SessionTx<'a> {
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<(bool, NormalTempStore)> {
@ -262,7 +246,6 @@ impl<'a> SessionTx<'a> {
} else {
out_store.put(item);
}
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
@ -274,7 +257,6 @@ impl<'a> SessionTx<'a> {
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
poison: Poison,
) -> Result<MeetAggrStore> {
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
@ -289,8 +271,6 @@ impl<'a> SessionTx<'a> {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
out_store.meet_put(item)?;
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
@ -316,7 +296,6 @@ impl<'a> SessionTx<'a> {
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<(bool, NormalTempStore)> {
@ -345,10 +324,7 @@ impl<'a> SessionTx<'a> {
.aggr
.iter()
.enumerate()
.filter_map(|(i, a)| match a {
None => None,
Some(aggr) => Some((i, aggr.clone())),
})
.filter_map(|(i, a)| a.as_ref().map(|aggr| (i, aggr.clone())))
.collect_vec();
for item_res in rule.relation.iter(self, None, stores)? {
@ -379,8 +355,6 @@ impl<'a> SessionTx<'a> {
ent.insert(aggr_ops);
}
}
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
@ -449,8 +423,6 @@ impl<'a> SessionTx<'a> {
ruleset: &[CompiledRule],
epoch: u32,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
prev_changed: &BTreeMap<&MagicSymbol, bool>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<(bool, NormalTempStore)> {
@ -458,16 +430,12 @@ impl<'a> SessionTx<'a> {
let mut out_store = NormalTempStore::default();
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
for (rule_n, rule) in ruleset.iter().enumerate() {
let mut should_do_calculation = false;
for d_rule in &rule.contained_rules {
if let Some(changed) = prev_changed.get(d_rule) {
if *changed {
should_do_calculation = true;
break;
}
}
}
if !should_do_calculation {
let dependencies_changed = rule
.contained_rules
.iter()
.map(|symb| stores.get(symb).unwrap().has_delta())
.any(|v| v);
if !dependencies_changed {
continue;
}
@ -498,7 +466,6 @@ impl<'a> SessionTx<'a> {
item,
epoch
);
*changed.get_mut(rule_symb).unwrap() = true;
if limiter.should_skip_next() {
out_store.put_with_skip(item);
} else {
@ -520,23 +487,16 @@ impl<'a> SessionTx<'a> {
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
prev_changed: &BTreeMap<&MagicSymbol, bool>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
poison: Poison,
) -> Result<MeetAggrStore> {
// let store = stores.get(rule_symb).unwrap();
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
for (rule_n, rule) in ruleset.iter().enumerate() {
let mut should_do_calculation = false;
for d_rule in &rule.contained_rules {
if let Some(changed) = prev_changed.get(d_rule) {
if *changed {
should_do_calculation = true;
break;
}
}
}
if !should_do_calculation {
let dependencies_changed = rule
.contained_rules
.iter()
.map(|symb| stores.get(symb).unwrap().has_delta())
.any(|v| v);
if !dependencies_changed {
continue;
}
@ -554,12 +514,7 @@ impl<'a> SessionTx<'a> {
delta_key, rule_symb, rule_n
);
for item_res in rule.relation.iter(self, Some(delta_key), stores)? {
let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel
let aggr_changed = out_store.meet_put(item)?;
if aggr_changed {
*changed.get_mut(rule_symb).unwrap() = true;
}
out_store.meet_put(item_res?)?;
}
poison.check()?;
}

@ -775,7 +775,7 @@ impl StoredRA {
let val_len = self.storage.metadata.non_keys.len();
let all_right_val_indices: BTreeSet<usize> =
(0..val_len).map(|i| left_tuple_len + key_len + i).collect();
return if self.filters.is_empty() && eliminate_indices.is_superset(&all_right_val_indices) {
if self.filters.is_empty() && eliminate_indices.is_superset(&all_right_val_indices) {
let it = left_iter
.map_ok(move |tuple| -> Result<Option<Tuple>> {
let prefix = left_to_prefix_indices
@ -835,7 +835,7 @@ impl StoredRA {
} else {
Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
})
};
}
}
fn prefix_join<'a>(
@ -1229,7 +1229,7 @@ impl TempStoreRA {
{
let mut lower_bound = prefix.clone();
lower_bound.extend(l_bound);
let mut upper_bound = prefix.clone();
let mut upper_bound = prefix;
upper_bound.extend(u_bound);
let it = if scan_epoch {
Left(storage.delta_range_iter(&lower_bound, &upper_bound, true))
@ -1744,7 +1744,7 @@ impl InnerJoin {
let it = CachedMaterializedIterator {
eliminate_indices,
left: left_iter,
left_cache: left_cache,
left_cache,
left_join_indices,
materialized: cached_data,
right_invert_indices,

@ -13,7 +13,10 @@ use itertools::Itertools;
use miette::{ensure, Diagnostic, Result};
use thiserror::Error;
use crate::data::program::{AlgoRuleArg, MagicSymbol, NormalFormAlgoOrRules, NormalFormAtom, NormalFormProgram, StratifiedNormalFormProgram};
use crate::data::program::{
AlgoRuleArg, MagicSymbol, NormalFormAlgoOrRules, NormalFormAtom, NormalFormProgram,
StratifiedNormalFormProgram,
};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::parse::SourceSpan;
use crate::query::graph::{
@ -212,7 +215,9 @@ fn make_scc_reduced_graph<'a>(
impl NormalFormProgram {
/// returns the stratified program and the store lifetimes of the intermediate relations
pub(crate) fn stratify(self) -> Result<(StratifiedNormalFormProgram, BTreeMap<MagicSymbol, usize>)> {
pub(crate) fn stratify(
self,
) -> Result<(StratifiedNormalFormProgram, BTreeMap<MagicSymbol, usize>)> {
// prerequisite: the program is already in disjunctive normal form
// 0. build a graph of the program
let prog_entry: &Symbol = &Symbol::new(PROG_ENTRY, SourceSpan(0, 0));
@ -257,9 +262,11 @@ impl NormalFormProgram {
for (fr, tos) in &stratified_graph {
if let Some(fr_idx) = invert_indices.get(fr) {
if let Some(fr_stratum) = invert_sort_result.get(fr_idx) {
for (to, _) in tos {
for to in tos.keys() {
let used_in = n_strata - 1 - *fr_stratum;
let magic_to = MagicSymbol::Muggle { inner: (*to).clone() };
let magic_to = MagicSymbol::Muggle {
inner: (*to).clone(),
};
match store_lifetimes.entry(magic_to) {
Entry::Vacant(e) => {
e.insert(used_in);

@ -11,11 +11,14 @@ use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
#[allow(unused_imports)]
use std::thread;
#[allow(unused_imports)]
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use either::{Left, Right};
use itertools::Itertools;
#[allow(unused_imports)]
use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr};
use serde_json::json;
use smartstring::SmartString;
@ -439,6 +442,7 @@ impl<'s, S: Storage<'s>> Db<S> {
};
for p in ps {
#[allow(unused_variables)]
let sleep_opt = p.out_opts.sleep;
let (q_res, q_cleanups) = self.run_query(&mut tx, p)?;
res = q_res;

@ -19,7 +19,7 @@ use thiserror::Error;
use crate::data::memcmp::MemCmpEncoder;
use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleT, ENCODED_KEY_MIN_LEN, decode_tuple_from_key};
use crate::data::tuple::{decode_tuple_from_key, Tuple, TupleT, ENCODED_KEY_MIN_LEN};
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::transact::SessionTx;
@ -226,17 +226,21 @@ impl RelationHandle {
tx.tx.range_scan_tuple(&lower, &upper)
}
pub(crate) fn get<'a>(&self, tx: &'a SessionTx<'_>, key: &[DataValue]) -> Result<Option<Tuple>> {
pub(crate) fn get<'a>(
&self,
tx: &'a SessionTx<'_>,
key: &[DataValue],
) -> Result<Option<Tuple>> {
let key_data = key.encode_as_key(self.id);
Ok(match tx.tx.get(&key_data, false)? {
None => None,
Some(val_data) => Some(decode_tuple_from_kv(&key_data, &val_data)),
})
Ok(tx
.tx
.get(&key_data, false)?
.map(|val_data| decode_tuple_from_kv(&key_data, &val_data)))
}
pub(crate) fn exists<'a>(&self, tx: &'a SessionTx<'_>, key: &[DataValue]) -> Result<bool> {
let key_data = key.encode_as_key(self.id);
Ok(tx.tx.exists(&key_data, false)?)
tx.tx.exists(&key_data, false)
}
pub(crate) fn scan_prefix<'a>(

@ -60,13 +60,29 @@ impl NormalTempStore {
/// Inserts `tuple` into `inner` with its associated flag set to `true`.
///
/// NOTE(review): going by the method name, the flag presumably marks the tuple as one
/// to be skipped by readers of the store — confirm against the code that reads `inner`.
pub(crate) fn put_with_skip(&mut self, tuple: Tuple) {
self.inner.insert(tuple, true);
}
fn merge(&mut self, mut other: Self) {
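// Clears `prev`, then merges `new` into `self`.
// Returns true when `self` was empty and has simply taken over the contents of `new`:
// the delta is then all of `self`, and `prev` is left empty rather than filled with a copy.
// Returns false otherwise, in which case `prev` holds exactly the entries of `new` whose
// keys were not already present in `self` (none at all when `new` is empty).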
pub(crate) fn merge_in(&mut self, prev: &mut Self, mut new: Self) -> bool {
prev.inner.clear();
if new.inner.is_empty() {
return false;
}
if self.inner.is_empty() {
mem::swap(&mut self.inner, &mut other.inner);
return;
mem::swap(&mut new, self);
return true;
}
for (k, v) in new.inner {
match self.inner.entry(k) {
Entry::Vacant(ent) => {
prev.inner.insert(ent.key().clone(), v);
ent.insert(v);
}
Entry::Occupied(mut ent) => {
ent.insert(v);
}
}
}
// must do it in this order! cannot swap!
self.inner.extend(other.inner)
false
}
}
@ -103,27 +119,6 @@ impl MeetAggrStore {
grouping_len,
})
}
/// Folds every entry of `other` into `self`.
///
/// Keys missing from `self` are inserted as-is. For keys present on both sides, each
/// aggregation's meet operator is applied position by position to combine the incoming
/// value into the stored one.
///
/// # Errors
/// Propagates the first error returned by a meet operator's `update`; entries handled
/// before that point stay merged.
///
/// # Panics
/// Panics if an aggregation has no `meet_op`.
fn merge(&mut self, mut other: Self) -> Result<()> {
    // Iterate over the smaller map: exchanging the two sides is allowed here because
    // we are dealing with meet aggregations.
    let self_is_smaller = self.inner.len() < other.inner.len();
    if self_is_smaller {
        mem::swap(&mut self.inner, &mut other.inner);
    }
    for (key, incoming) in other.inner {
        match self.inner.entry(key) {
            Entry::Occupied(mut slot) => {
                let existing = slot.get_mut();
                for (idx, (aggregation, _)) in self.aggregations.iter().enumerate() {
                    let meet = aggregation.meet_op.as_ref().unwrap();
                    meet.update(&mut existing[idx], &incoming[idx])?;
                }
            }
            Entry::Vacant(slot) => {
                slot.insert(incoming);
            }
        }
    }
    Ok(())
}
// also need to check if value exists beforehand! use the idempotency!
// need to think this through more carefully.
pub(crate) fn meet_put(&mut self, tuple: Tuple) -> Result<bool> {
@ -182,6 +177,40 @@ impl MeetAggrStore {
}
})
}
/// Merges `new` into `self`, recording in `prev` what this merge changed.
///
/// `prev` is always cleared first. Then:
/// * if `new` is empty, nothing else happens and `Ok(false)` is returned;
/// * if `self` is empty, `self` and `new` are swapped wholesale and `Ok(true)` is
///   returned. `prev` stays empty in this case: the change is all of `self`, so the
///   caller is expected to read the delta from `self` instead of from `prev`;
/// * otherwise every entry of `new` is folded into `self`. Entries with a new key are
///   copied into `prev`; entries with an existing key are combined through the meet
///   operators and, if any operator reports a change, the updated row is copied into
///   `prev`. `Ok(false)` is returned.
///
/// # Errors
/// Propagates the first error returned by a meet operator's `update`. `self` and `prev`
/// may already contain the entries processed before the failure.
///
/// # Panics
/// Panics if an aggregation has no `meet_op`.
pub(crate) fn merge_in(&mut self, prev: &mut Self, mut new: Self) -> Result<bool> {
    prev.inner.clear();
    if new.inner.is_empty() {
        return Ok(false);
    }
    if self.inner.is_empty() {
        mem::swap(self, &mut new);
        return Ok(true);
    }
    for (key, incoming) in new.inner {
        match self.inner.entry(key) {
            Entry::Occupied(mut slot) => {
                let current = slot.get_mut();
                let mut any_updated = false;
                for (idx, (aggregation, _)) in self.aggregations.iter().enumerate() {
                    let meet = aggregation.meet_op.as_ref().unwrap();
                    // Every operator runs even after one has reported a change.
                    if meet.update(&mut current[idx], &incoming[idx])? {
                        any_updated = true;
                    }
                }
                if any_updated {
                    prev.inner.insert(slot.key().clone(), slot.get().clone());
                }
            }
            Entry::Vacant(slot) => {
                prev.inner.insert(slot.key().clone(), incoming.clone());
                slot.insert(incoming);
            }
        }
    }
    Ok(false)
}
}
#[derive(Debug)]
@ -191,32 +220,12 @@ pub(crate) enum TempStore {
}
impl TempStore {
// TODO
// pub(crate) fn new() -> Self {
// Self::Normal(NormalTempStore::default())
// }
/// Delegates to the `exists` method of whichever concrete store this variant wraps
/// and returns its result for `key`.
fn exists(&self, key: &Tuple) -> bool {
match self {
TempStore::Normal(n) => n.exists(key),
TempStore::MeetAggr(m) => m.exists(key),
}
}
/// Merges `other` into `self` by delegating to the wrapped store's own `merge`.
///
/// # Errors
/// Returns whatever error the meet-aggregation store's `merge` produces; the normal
/// store's `merge` yields no result, so that arm always returns `Ok(())`.
///
/// # Panics
/// Hits `unreachable!()` if `self` and `other` are different variants.
fn merge(&mut self, other: Self) -> Result<()> {
    match (self, other) {
        (TempStore::MeetAggr(mine), TempStore::MeetAggr(theirs)) => mine.merge(theirs),
        (TempStore::Normal(mine), TempStore::Normal(theirs)) => {
            mine.merge(theirs);
            Ok(())
        }
        _ => unreachable!(),
    }
}
// fn is_empty(&self) -> bool {
// match self {
// TempStore::Normal(n) => n.inner.is_empty(),
// TempStore::MeetAggr(m) => m.inner.is_empty(),
// }
// }
fn range_iter(
&self,
lower: &Tuple,
@ -228,36 +237,60 @@ impl TempStore {
TempStore::MeetAggr(m) => Right(m.range_iter(lower, upper, upper_inclusive)),
}
}
/// Returns `true` when the `inner` collection of the wrapped store is empty,
/// whichever variant this is.
fn is_empty(&self) -> bool {
match self {
TempStore::Normal(n) => n.inner.is_empty(),
TempStore::MeetAggr(m) => m.inner.is_empty(),
}
}
}
#[derive(Debug)]
pub(crate) struct EpochStore {
prev: TempStore,
total: TempStore,
delta: TempStore,
use_total_for_delta: bool,
pub(crate) arity: usize,
}
impl EpochStore {
pub(crate) fn exists(&self, key: &Tuple) -> bool {
self.prev.exists(key) || self.delta.exists(key)
self.total.exists(key)
}
pub(crate) fn new_normal(arity: usize) -> Self {
Self {
prev: TempStore::Normal(NormalTempStore::default()),
total: TempStore::Normal(NormalTempStore::default()),
delta: TempStore::Normal(NormalTempStore::default()),
use_total_for_delta: true,
arity,
}
}
pub(crate) fn new_meet(aggrs: &[Option<(Aggregation, Vec<DataValue>)>]) -> Result<Self> {
Ok(Self {
prev: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?),
total: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?),
delta: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?),
use_total_for_delta: true,
arity: aggrs.len(),
})
}
pub(crate) fn merge(&mut self, mut new: TempStore) -> Result<()> {
mem::swap(&mut new, &mut self.delta);
self.prev.merge(new)
/// Merges `new` into `total`, handing `delta` to the underlying store as the place to
/// record what changed, and stores the merge's boolean result in `use_total_for_delta`.
///
/// # Errors
/// Propagates any error from the meet-aggregation merge; `use_total_for_delta` is left
/// untouched in that case.
///
/// # Panics
/// Hits `unreachable!()` unless `total`, `delta` and `new` are all the same variant.
pub(crate) fn merge_in(&mut self, new: TempStore) -> Result<()> {
    let delta_is_total = match (&mut self.total, &mut self.delta, new) {
        (TempStore::Normal(total), TempStore::Normal(delta), TempStore::Normal(incoming)) => {
            total.merge_in(delta, incoming)
        }
        (
            TempStore::MeetAggr(total),
            TempStore::MeetAggr(delta),
            TempStore::MeetAggr(incoming),
        ) => total.merge_in(delta, incoming)?,
        _ => unreachable!(),
    };
    self.use_total_for_delta = delta_is_total;
    Ok(())
}
/// Reports whether the store currently standing in for the delta is non-empty:
/// `total` when `use_total_for_delta` is set, `delta` otherwise.
pub(crate) fn has_delta(&self) -> bool {
    let effective_delta = if self.use_total_for_delta {
        &self.total
    } else {
        &self.delta
    };
    !effective_delta.is_empty()
}
pub(crate) fn range_iter(
&self,
@ -265,9 +298,7 @@ impl EpochStore {
upper: &Tuple,
upper_inclusive: bool,
) -> impl Iterator<Item = TupleInIter<'_>> {
self.delta
.range_iter(lower, upper, upper_inclusive)
.merge(self.prev.range_iter(lower, upper, upper_inclusive))
self.total.range_iter(lower, upper, upper_inclusive)
}
pub(crate) fn delta_range_iter(
&self,
@ -275,7 +306,11 @@ impl EpochStore {
upper: &Tuple,
upper_inclusive: bool,
) -> impl Iterator<Item = TupleInIter<'_>> {
self.delta.range_iter(lower, upper, upper_inclusive)
if self.use_total_for_delta {
self.total.range_iter(lower, upper, upper_inclusive)
} else {
self.delta.range_iter(lower, upper, upper_inclusive)
}
}
pub(crate) fn prefix_iter(&self, prefix: &Tuple) -> impl Iterator<Item = TupleInIter<'_>> {
let mut upper = prefix.to_vec();
@ -363,7 +398,7 @@ impl PartialEq<[DataValue]> for TupleInIter<'_> {
impl PartialOrd<[DataValue]> for TupleInIter<'_> {
fn partial_cmp(&self, other: &'_ [DataValue]) -> Option<Ordering> {
self.into_iter().partial_cmp(other.into_iter())
self.into_iter().partial_cmp(other.iter())
}
}

Loading…
Cancel
Save