push validity spec to RA

main
Ziyang Hu 2 years ago
parent 8042eb0246
commit 30070590e0

@ -238,7 +238,7 @@ impl<'a> SessionTx<'a> {
}
}
let right = RelAlgebra::relation(right_vars, store, rel_app.span);
let right = RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at);
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span);
}
@ -306,7 +306,7 @@ impl<'a> SessionTx<'a> {
}
}
let right = RelAlgebra::relation(right_vars, store, relation_app.span);
let right = RelAlgebra::relation(right_vars, store, relation_app.span, relation_app.valid_at);
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.neg_join(
right,
@ -366,7 +366,7 @@ impl<'a> SessionTx<'a> {
#[error("Symbol '{0}' in rule head is unbound")]
#[diagnostic(code(eval::unbound_symb_in_head))]
#[diagnostic(help(
"Note that symbols occurring only in negated positions are not considered bound"
"Note that symbols occurring only in negated positions are not considered bound"
))]
struct UnboundSymbolInRuleHead(String, #[label] SourceSpan);

@ -6,6 +6,7 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::iter;
@ -31,6 +32,7 @@ pub(crate) enum RelAlgebra {
Fixed(InlineFixedRA),
TempStore(TempStoreRA),
Stored(StoredRA),
StoredWithValidity(StoredWithValidityRA),
Join(Box<InnerJoin>),
NegJoin(Box<NegJoin>),
Reorder(ReorderRA),
@ -49,6 +51,7 @@ impl RelAlgebra {
RelAlgebra::Reorder(i) => i.relation.span(),
RelAlgebra::Filter(i) => i.span,
RelAlgebra::Unification(i) => i.span,
RelAlgebra::StoredWithValidity(i) => i.span,
}
}
}
@ -266,6 +269,13 @@ impl Debug for RelAlgebra {
.field(&r.storage.name)
.field(&r.filters)
.finish(),
RelAlgebra::StoredWithValidity(r) => f
.debug_tuple("StoredWithValidity")
.field(&bindings)
.field(&r.storage.name)
.field(&r.filters)
.field(&r.valid_at)
.finish(),
RelAlgebra::Join(r) => {
if r.left.is_unit() {
r.right.fmt(f)
@ -317,6 +327,9 @@ impl RelAlgebra {
RelAlgebra::Stored(v) => {
v.fill_binding_indices()?;
}
RelAlgebra::StoredWithValidity(v) => {
v.fill_binding_indices()?;
}
RelAlgebra::Reorder(r) => {
r.relation.fill_binding_indices()?;
}
@ -367,13 +380,27 @@ impl RelAlgebra {
bindings: Vec<Symbol>,
storage: RelationHandle,
span: SourceSpan,
validity: Option<Reverse<i64>>,
) -> Self {
Self::Stored(StoredRA {
bindings,
storage,
filters: vec![],
span,
})
match validity {
None => {
Self::Stored(StoredRA {
bindings,
storage,
filters: vec![],
span,
})
}
Some(vld) => {
Self::StoredWithValidity(StoredWithValidityRA {
bindings,
storage,
filters: vec![],
valid_at: vld,
span,
})
}
}
}
pub(crate) fn reorder(self, new_order: Vec<Symbol>) -> Self {
Self::Reorder(ReorderRA {
@ -396,11 +423,11 @@ impl RelAlgebra {
})
}
RelAlgebra::Filter(FilteredRA {
parent,
mut pred,
to_eliminate,
span,
}) => {
parent,
mut pred,
to_eliminate,
span,
}) => {
pred.push(filter);
RelAlgebra::Filter(FilteredRA {
parent,
@ -410,11 +437,11 @@ impl RelAlgebra {
})
}
RelAlgebra::TempStore(TempStoreRA {
bindings,
storage_key,
mut filters,
span,
}) => {
bindings,
storage_key,
mut filters,
span,
}) => {
filters.push(filter);
RelAlgebra::TempStore(TempStoreRA {
bindings,
@ -424,11 +451,11 @@ impl RelAlgebra {
})
}
RelAlgebra::Stored(StoredRA {
bindings,
storage,
mut filters,
span,
}) => {
bindings,
storage,
mut filters,
span,
}) => {
filters.push(filter);
RelAlgebra::Stored(StoredRA {
bindings,
@ -437,6 +464,22 @@ impl RelAlgebra {
span,
})
}
RelAlgebra::StoredWithValidity(StoredWithValidityRA {
bindings,
storage,
mut filters,
span,
valid_at,
}) => {
filters.push(filter);
RelAlgebra::StoredWithValidity(StoredWithValidityRA {
bindings,
storage,
filters,
span,
valid_at,
})
}
RelAlgebra::Join(inner) => {
let filters = filter.to_conjunction();
let left_bindings: BTreeSet<Symbol> =
@ -701,8 +744,8 @@ fn invert_option_err<T>(v: Result<Option<T>>) -> Option<Result<T>> {
fn filter_iter(
filters: Vec<Expr>,
it: impl Iterator<Item = Result<Tuple>>,
) -> impl Iterator<Item = Result<Tuple>> {
it: impl Iterator<Item=Result<Tuple>>,
) -> impl Iterator<Item=Result<Tuple>> {
it.filter_map_ok(move |t| -> Option<Result<Tuple>> {
for p in filters.iter() {
match p.eval_pred(&t) {
@ -716,7 +759,7 @@ fn filter_iter(
}
Some(Ok(t))
})
.map(flatten_err)
.map(flatten_err)
}
fn get_eliminate_indices(bindings: &[Symbol], eliminate: &BTreeSet<Symbol>) -> BTreeSet<usize> {
@ -741,6 +784,130 @@ pub(crate) struct StoredRA {
pub(crate) span: SourceSpan,
}
#[derive(Debug)]
pub(crate) struct StoredWithValidityRA {
pub(crate) bindings: Vec<Symbol>,
pub(crate) storage: RelationHandle,
pub(crate) filters: Vec<Expr>,
pub(crate) valid_at: Reverse<i64>,
pub(crate) span: SourceSpan,
}
impl StoredWithValidityRA {
fn fill_binding_indices(&mut self) -> Result<()> {
let bindings: BTreeMap<_, _> = self
.bindings
.iter()
.cloned()
.enumerate()
.map(|(a, b)| (b, a))
.collect();
for e in self.filters.iter_mut() {
e.fill_binding_indices(&bindings)?;
}
Ok(())
}
fn iter<'a>(&'a self, tx: &'a SessionTx<'_>) -> Result<TupleIter<'a>> {
todo!()
// let it = self.storage.scan_all(tx);
// Ok(if self.filters.is_empty() {
// Box::new(it)
// } else {
// Box::new(filter_iter(self.filters.clone(), it))
// })
}
fn prefix_join<'a>(
&'a self,
tx: &'a SessionTx<'_>,
left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
left_tuple_len: usize,
) -> Result<TupleIter<'a>> {
todo!()
// let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
// right_invert_indices.sort_by_key(|(_, b)| **b);
// let left_to_prefix_indices = right_invert_indices
// .into_iter()
// .map(|(a, _)| left_join_indices[a])
// .collect_vec();
//
// let key_len = self.storage.metadata.keys.len();
// if left_to_prefix_indices.len() >= key_len {
// return self.point_lookup_join(
// tx,
// left_iter,
// key_len,
// left_to_prefix_indices,
// eliminate_indices,
// left_tuple_len,
// );
// }
//
// let mut skip_range_check = false;
// // In some cases, maybe we can stop as soon as we get one result?
// let it = left_iter
// .map_ok(move |tuple| {
// let prefix = left_to_prefix_indices
// .iter()
// .map(|i| tuple[*i].clone())
// .collect_vec();
//
// if !skip_range_check && !self.filters.is_empty() {
// let other_bindings = &self.bindings[right_join_indices.len()..];
// let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) {
// Ok(b) => b,
// _ => (vec![], vec![]),
// };
// if !l_bound.iter().all(|v| *v == DataValue::Null)
// || !u_bound.iter().all(|v| *v == DataValue::Bot)
// {
// return Left(
// self.storage
// .scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound)
// .map(move |res_found| -> Result<Option<Tuple>> {
// let found = res_found?;
// for p in self.filters.iter() {
// if !p.eval_pred(&found)? {
// return Ok(None);
// }
// }
// let mut ret = tuple.clone();
// ret.extend(found);
// Ok(Some(ret))
// })
// .filter_map(swap_option_result),
// );
// }
// }
// skip_range_check = true;
// Right(
// self.storage
// .scan_prefix(tx, &prefix)
// .map(move |res_found| -> Result<Option<Tuple>> {
// let found = res_found?;
// for p in self.filters.iter() {
// if !p.eval_pred(&found)? {
// return Ok(None);
// }
// }
// let mut ret = tuple.clone();
// ret.extend(found);
// Ok(Some(ret))
// })
// .filter_map(swap_option_result),
// )
// })
// .flatten_ok()
// .map(flatten_err);
// Ok(if eliminate_indices.is_empty() {
// Box::new(it)
// } else {
// Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
// })
}
}
impl StoredRA {
fn fill_binding_indices(&mut self) -> Result<()> {
let bindings: BTreeMap<_, _> = self
@ -951,7 +1118,7 @@ impl StoredRA {
'outer: for found in self.storage.scan_prefix(tx, &prefix) {
let found = found?;
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple[*left_idx] != found[*right_idx] {
continue 'outer;
@ -1114,7 +1281,7 @@ impl TempStoreRA {
'outer: for found in storage.prefix_iter(&prefix) {
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple[*left_idx] != *found.get(*right_idx) {
continue 'outer;
@ -1247,7 +1414,7 @@ impl TempStoreRA {
Ok(Some(ret))
}
})
.filter_map(swap_option_result),
.filter_map(swap_option_result),
);
}
}
@ -1277,7 +1444,7 @@ impl TempStoreRA {
Ok(Some(ret))
}
})
.filter_map(swap_option_result),
.filter_map(swap_option_result),
)
})
.flatten_ok()
@ -1300,7 +1467,7 @@ impl Debug for Joiner {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let left_bindings = BindingFormatter(self.left_keys.clone());
let right_bindings = BindingFormatter(self.right_keys.clone());
write!(f, "{:?}<->{:?}", left_bindings, right_bindings,)
write!(f, "{:?}<->{:?}", left_bindings, right_bindings, )
}
}
@ -1345,6 +1512,7 @@ impl RelAlgebra {
RelAlgebra::Fixed(r) => r.do_eliminate_temp_vars(used),
RelAlgebra::TempStore(_r) => Ok(()),
RelAlgebra::Stored(_v) => Ok(()),
RelAlgebra::StoredWithValidity(_v) => Ok(()),
RelAlgebra::Join(r) => r.do_eliminate_temp_vars(used),
RelAlgebra::Reorder(r) => r.relation.eliminate_temp_vars(used),
RelAlgebra::Filter(r) => r.do_eliminate_temp_vars(used),
@ -1358,6 +1526,7 @@ impl RelAlgebra {
RelAlgebra::Fixed(r) => Some(&r.to_eliminate),
RelAlgebra::TempStore(_) => None,
RelAlgebra::Stored(_) => None,
RelAlgebra::StoredWithValidity(_) => None,
RelAlgebra::Join(r) => Some(&r.to_eliminate),
RelAlgebra::Reorder(_) => None,
RelAlgebra::Filter(r) => Some(&r.to_eliminate),
@ -1382,6 +1551,7 @@ impl RelAlgebra {
RelAlgebra::Fixed(f) => f.bindings.clone(),
RelAlgebra::TempStore(d) => d.bindings.clone(),
RelAlgebra::Stored(v) => v.bindings.clone(),
RelAlgebra::StoredWithValidity(v) => v.bindings.clone(),
RelAlgebra::Join(j) => j.bindings(),
RelAlgebra::Reorder(r) => r.bindings(),
RelAlgebra::Filter(r) => r.parent.bindings_after_eliminate(),
@ -1403,6 +1573,7 @@ impl RelAlgebra {
RelAlgebra::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(t.clone())))),
RelAlgebra::TempStore(r) => r.iter(delta_rule, stores),
RelAlgebra::Stored(v) => v.iter(tx),
RelAlgebra::StoredWithValidity(v) => v.iter(tx),
RelAlgebra::Join(j) => j.iter(tx, delta_rule, stores),
RelAlgebra::Reorder(r) => r.iter(tx, delta_rule, stores),
RelAlgebra::Filter(r) => r.iter(tx, delta_rule, stores),
@ -1587,6 +1758,20 @@ impl InnerJoin {
"stored_mat_join"
}
}
RelAlgebra::StoredWithValidity(_) => {
let join_indices = self
.joiner
.join_indices(
&self.left.bindings_after_eliminate(),
&self.right.bindings_after_eliminate(),
)
.unwrap();
if join_is_prefix(&join_indices.1) {
"stored_prefix_join"
} else {
"stored_mat_join"
}
}
RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => {
"generic_mat_join"
}
@ -1662,6 +1847,27 @@ impl InnerJoin {
self.materialized_join(tx, eliminate_indices, delta_rule, stores)
}
}
RelAlgebra::StoredWithValidity(r) => {
let join_indices = self
.joiner
.join_indices(
&self.left.bindings_after_eliminate(),
&self.right.bindings_after_eliminate(),
)
.unwrap();
if join_is_prefix(&join_indices.1) {
let left_len = self.left.bindings_after_eliminate().len();
r.prefix_join(
tx,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
left_len,
)
} else {
self.materialized_join(tx, eliminate_indices, delta_rule, stores)
}
}
RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => {
self.materialized_join(tx, eliminate_indices, delta_rule, stores)
}

@ -34,9 +34,7 @@ use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::query::ra::{
FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, TempStoreRA, UnificationRA,
};
use crate::query::ra::{FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA, TempStoreRA, UnificationRA};
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
use crate::runtime::transact::SessionTx;
use crate::storage::{Storage, StoreTx};
@ -612,6 +610,14 @@ impl<'s, S: Storage<'s>> Db<S> {
json!(null),
json!(filters.iter().map(|f| f.to_string()).collect_vec()),
),
RelAlgebra::StoredWithValidity(StoredWithValidityRA {
storage, filters, ..
}) => (
"load_stored_with_validity",
json!(format!(":{}", storage.name)),
json!(null),
json!(filters.iter().map(|f| f.to_string()).collect_vec()),
),
RelAlgebra::Join(inner) => {
if inner.left.is_unit() {
rel_stack.push(&inner.right);

Loading…
Cancel
Save