diff --git a/cozo-core/src/query/compile.rs b/cozo-core/src/query/compile.rs index 9f5515cb..0b94b3ee 100644 --- a/cozo-core/src/query/compile.rs +++ b/cozo-core/src/query/compile.rs @@ -238,7 +238,7 @@ impl<'a> SessionTx<'a> { } } - let right = RelAlgebra::relation(right_vars, store, rel_app.span); + let right = RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at); debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len()); ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span); } @@ -306,7 +306,7 @@ impl<'a> SessionTx<'a> { } } - let right = RelAlgebra::relation(right_vars, store, relation_app.span); + let right = RelAlgebra::relation(right_vars, store, relation_app.span, relation_app.valid_at); debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len()); ret = ret.neg_join( right, @@ -366,7 +366,7 @@ impl<'a> SessionTx<'a> { #[error("Symbol '{0}' in rule head is unbound")] #[diagnostic(code(eval::unbound_symb_in_head))] #[diagnostic(help( - "Note that symbols occurring only in negated positions are not considered bound" + "Note that symbols occurring only in negated positions are not considered bound" ))] struct UnboundSymbolInRuleHead(String, #[label] SourceSpan); diff --git a/cozo-core/src/query/ra.rs b/cozo-core/src/query/ra.rs index d1fc5ece..a28c87f7 100644 --- a/cozo-core/src/query/ra.rs +++ b/cozo-core/src/query/ra.rs @@ -6,6 +6,7 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ +use std::cmp::Reverse; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Formatter}; use std::iter; @@ -31,6 +32,7 @@ pub(crate) enum RelAlgebra { Fixed(InlineFixedRA), TempStore(TempStoreRA), Stored(StoredRA), + StoredWithValidity(StoredWithValidityRA), Join(Box), NegJoin(Box), Reorder(ReorderRA), @@ -49,6 +51,7 @@ impl RelAlgebra { RelAlgebra::Reorder(i) => i.relation.span(), RelAlgebra::Filter(i) => i.span, RelAlgebra::Unification(i) => i.span, + RelAlgebra::StoredWithValidity(i) => i.span, } } } @@ -266,6 +269,13 @@ impl Debug for RelAlgebra { .field(&r.storage.name) .field(&r.filters) .finish(), + RelAlgebra::StoredWithValidity(r) => f + .debug_tuple("StoredWithValidity") + .field(&bindings) + .field(&r.storage.name) + .field(&r.filters) + .field(&r.valid_at) + .finish(), RelAlgebra::Join(r) => { if r.left.is_unit() { r.right.fmt(f) @@ -317,6 +327,9 @@ impl RelAlgebra { RelAlgebra::Stored(v) => { v.fill_binding_indices()?; } + RelAlgebra::StoredWithValidity(v) => { + v.fill_binding_indices()?; + } RelAlgebra::Reorder(r) => { r.relation.fill_binding_indices()?; } @@ -367,13 +380,27 @@ impl RelAlgebra { bindings: Vec, storage: RelationHandle, span: SourceSpan, + validity: Option>, ) -> Self { - Self::Stored(StoredRA { - bindings, - storage, - filters: vec![], - span, - }) + match validity { + None => { + Self::Stored(StoredRA { + bindings, + storage, + filters: vec![], + span, + }) + } + Some(vld) => { + Self::StoredWithValidity(StoredWithValidityRA { + bindings, + storage, + filters: vec![], + valid_at: vld, + span, + }) + } + } } pub(crate) fn reorder(self, new_order: Vec) -> Self { Self::Reorder(ReorderRA { @@ -396,11 +423,11 @@ impl RelAlgebra { }) } RelAlgebra::Filter(FilteredRA { - parent, - mut pred, - to_eliminate, - span, - }) => { + parent, + mut pred, + to_eliminate, + span, + }) => { pred.push(filter); RelAlgebra::Filter(FilteredRA { parent, @@ -410,11 +437,11 @@ impl RelAlgebra { }) } RelAlgebra::TempStore(TempStoreRA { - bindings, - storage_key, - mut filters, - span, - }) => { + bindings, + storage_key, + mut filters, + span, + }) => { filters.push(filter); RelAlgebra::TempStore(TempStoreRA { bindings, @@ -424,11 +451,11 @@ impl RelAlgebra { }) } RelAlgebra::Stored(StoredRA { - bindings, - storage, - mut filters, - span, - }) => { + bindings, + storage, + mut filters, + span, + }) => { filters.push(filter); RelAlgebra::Stored(StoredRA { bindings, @@ -437,6 +464,22 @@ impl RelAlgebra { span, }) } + RelAlgebra::StoredWithValidity(StoredWithValidityRA { + bindings, + storage, + mut filters, + span, + valid_at, + }) => { + filters.push(filter); + RelAlgebra::StoredWithValidity(StoredWithValidityRA { + bindings, + storage, + filters, + span, + valid_at, + }) + } RelAlgebra::Join(inner) => { let filters = filter.to_conjunction(); let left_bindings: BTreeSet = @@ -701,8 +744,8 @@ fn invert_option_err(v: Result>) -> Option> { fn filter_iter( filters: Vec, - it: impl Iterator>, -) -> impl Iterator> { + it: impl Iterator>, +) -> impl Iterator> { it.filter_map_ok(move |t| -> Option> { for p in filters.iter() { match p.eval_pred(&t) { @@ -716,7 +759,7 @@ fn filter_iter( } Some(Ok(t)) }) - .map(flatten_err) + .map(flatten_err) } fn get_eliminate_indices(bindings: &[Symbol], eliminate: &BTreeSet) -> BTreeSet { @@ -741,6 +784,130 @@ pub(crate) struct StoredRA { pub(crate) span: SourceSpan, } +#[derive(Debug)] +pub(crate) struct StoredWithValidityRA { + pub(crate) bindings: Vec, + pub(crate) storage: RelationHandle, + pub(crate) filters: Vec, + pub(crate) valid_at: Reverse, + pub(crate) span: SourceSpan, +} + +impl StoredWithValidityRA { + fn fill_binding_indices(&mut self) -> Result<()> { + let bindings: BTreeMap<_, _> = self + .bindings + .iter() + .cloned() + .enumerate() + .map(|(a, b)| (b, a)) + .collect(); + for e in self.filters.iter_mut() { + e.fill_binding_indices(&bindings)?; + } + Ok(()) + } + fn iter<'a>(&'a self, tx: &'a SessionTx<'_>) -> Result> { + todo!() + // let it = self.storage.scan_all(tx); + // Ok(if self.filters.is_empty() { + // Box::new(it) + // } else { + // Box::new(filter_iter(self.filters.clone(), it)) + // }) + } + fn prefix_join<'a>( + &'a self, + tx: &'a SessionTx<'_>, + left_iter: TupleIter<'a>, + (left_join_indices, right_join_indices): (Vec, Vec), + eliminate_indices: BTreeSet, + left_tuple_len: usize, + ) -> Result> { + todo!() + // let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); + // right_invert_indices.sort_by_key(|(_, b)| **b); + // let left_to_prefix_indices = right_invert_indices + // .into_iter() + // .map(|(a, _)| left_join_indices[a]) + // .collect_vec(); + // + // let key_len = self.storage.metadata.keys.len(); + // if left_to_prefix_indices.len() >= key_len { + // return self.point_lookup_join( + // tx, + // left_iter, + // key_len, + // left_to_prefix_indices, + // eliminate_indices, + // left_tuple_len, + // ); + // } + // + // let mut skip_range_check = false; + // // In some cases, maybe we can stop as soon as we get one result? + // let it = left_iter + // .map_ok(move |tuple| { + // let prefix = left_to_prefix_indices + // .iter() + // .map(|i| tuple[*i].clone()) + // .collect_vec(); + // + // if !skip_range_check && !self.filters.is_empty() { + // let other_bindings = &self.bindings[right_join_indices.len()..]; + // let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) { + // Ok(b) => b, + // _ => (vec![], vec![]), + // }; + // if !l_bound.iter().all(|v| *v == DataValue::Null) + // || !u_bound.iter().all(|v| *v == DataValue::Bot) + // { + // return Left( + // self.storage + // .scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound) + // .map(move |res_found| -> Result> { + // let found = res_found?; + // for p in self.filters.iter() { + // if !p.eval_pred(&found)? { + // return Ok(None); + // } + // } + // let mut ret = tuple.clone(); + // ret.extend(found); + // Ok(Some(ret)) + // }) + // .filter_map(swap_option_result), + // ); + // } + // } + // skip_range_check = true; + // Right( + // self.storage + // .scan_prefix(tx, &prefix) + // .map(move |res_found| -> Result> { + // let found = res_found?; + // for p in self.filters.iter() { + // if !p.eval_pred(&found)? { + // return Ok(None); + // } + // } + // let mut ret = tuple.clone(); + // ret.extend(found); + // Ok(Some(ret)) + // }) + // .filter_map(swap_option_result), + // ) + // }) + // .flatten_ok() + // .map(flatten_err); + // Ok(if eliminate_indices.is_empty() { + // Box::new(it) + // } else { + // Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices))) + // }) + } +} + impl StoredRA { fn fill_binding_indices(&mut self) -> Result<()> { let bindings: BTreeMap<_, _> = self @@ -951,7 +1118,7 @@ impl StoredRA { 'outer: for found in self.storage.scan_prefix(tx, &prefix) { let found = found?; for (left_idx, right_idx) in - left_join_indices.iter().zip(right_join_indices.iter()) + left_join_indices.iter().zip(right_join_indices.iter()) { if tuple[*left_idx] != found[*right_idx] { continue 'outer; @@ -1114,7 +1281,7 @@ impl TempStoreRA { 'outer: for found in storage.prefix_iter(&prefix) { for (left_idx, right_idx) in - left_join_indices.iter().zip(right_join_indices.iter()) + left_join_indices.iter().zip(right_join_indices.iter()) { if tuple[*left_idx] != *found.get(*right_idx) { continue 'outer; @@ -1247,7 +1414,7 @@ impl TempStoreRA { Ok(Some(ret)) } }) - .filter_map(swap_option_result), + .filter_map(swap_option_result), ); } } @@ -1277,7 +1444,7 @@ impl TempStoreRA { Ok(Some(ret)) } }) - .filter_map(swap_option_result), + .filter_map(swap_option_result), ) }) .flatten_ok() @@ -1300,7 +1467,7 @@ impl Debug for Joiner { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let left_bindings = BindingFormatter(self.left_keys.clone()); let right_bindings = BindingFormatter(self.right_keys.clone()); - write!(f, "{:?}<->{:?}", left_bindings, right_bindings,) + write!(f, "{:?}<->{:?}", left_bindings, right_bindings, ) } } @@ -1345,6 +1512,7 @@ impl RelAlgebra { RelAlgebra::Fixed(r) => r.do_eliminate_temp_vars(used), RelAlgebra::TempStore(_r) => Ok(()), RelAlgebra::Stored(_v) => Ok(()), + RelAlgebra::StoredWithValidity(_v) => Ok(()), RelAlgebra::Join(r) => r.do_eliminate_temp_vars(used), RelAlgebra::Reorder(r) => r.relation.eliminate_temp_vars(used), RelAlgebra::Filter(r) => r.do_eliminate_temp_vars(used), @@ -1358,6 +1526,7 @@ impl RelAlgebra { RelAlgebra::Fixed(r) => Some(&r.to_eliminate), RelAlgebra::TempStore(_) => None, RelAlgebra::Stored(_) => None, + RelAlgebra::StoredWithValidity(_) => None, RelAlgebra::Join(r) => Some(&r.to_eliminate), RelAlgebra::Reorder(_) => None, RelAlgebra::Filter(r) => Some(&r.to_eliminate), @@ -1382,6 +1551,7 @@ impl RelAlgebra { RelAlgebra::Fixed(f) => f.bindings.clone(), RelAlgebra::TempStore(d) => d.bindings.clone(), RelAlgebra::Stored(v) => v.bindings.clone(), + RelAlgebra::StoredWithValidity(v) => v.bindings.clone(), RelAlgebra::Join(j) => j.bindings(), RelAlgebra::Reorder(r) => r.bindings(), RelAlgebra::Filter(r) => r.parent.bindings_after_eliminate(), @@ -1403,6 +1573,7 @@ impl RelAlgebra { RelAlgebra::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(t.clone())))), RelAlgebra::TempStore(r) => r.iter(delta_rule, stores), RelAlgebra::Stored(v) => v.iter(tx), + RelAlgebra::StoredWithValidity(v) => v.iter(tx), RelAlgebra::Join(j) => j.iter(tx, delta_rule, stores), RelAlgebra::Reorder(r) => r.iter(tx, delta_rule, stores), RelAlgebra::Filter(r) => r.iter(tx, delta_rule, stores), @@ -1587,6 +1758,20 @@ impl InnerJoin { "stored_mat_join" } } + RelAlgebra::StoredWithValidity(_) => { + let join_indices = self + .joiner + .join_indices( + &self.left.bindings_after_eliminate(), + &self.right.bindings_after_eliminate(), + ) + .unwrap(); + if join_is_prefix(&join_indices.1) { + "stored_prefix_join" + } else { + "stored_mat_join" + } + } RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => { "generic_mat_join" } @@ -1662,6 +1847,27 @@ impl InnerJoin { self.materialized_join(tx, eliminate_indices, delta_rule, stores) } } + RelAlgebra::StoredWithValidity(r) => { + let join_indices = self + .joiner + .join_indices( + &self.left.bindings_after_eliminate(), + &self.right.bindings_after_eliminate(), + ) + .unwrap(); + if join_is_prefix(&join_indices.1) { + let left_len = self.left.bindings_after_eliminate().len(); + r.prefix_join( + tx, + self.left.iter(tx, delta_rule, stores)?, + join_indices, + eliminate_indices, + left_len, + ) + } else { + self.materialized_join(tx, eliminate_indices, delta_rule, stores) + } + } RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => { self.materialized_join(tx, eliminate_indices, delta_rule, stores) } diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 57eff8b2..b8018181 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -34,9 +34,7 @@ use crate::fixed_rule::DEFAULT_FIXED_RULES; use crate::parse::sys::SysOp; use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; -use crate::query::ra::{ - FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, TempStoreRA, UnificationRA, -}; +use crate::query::ra::{FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA, TempStoreRA, UnificationRA}; use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId}; use crate::runtime::transact::SessionTx; use crate::storage::{Storage, StoreTx}; @@ -612,6 +610,14 @@ impl<'s, S: Storage<'s>> Db { json!(null), json!(filters.iter().map(|f| f.to_string()).collect_vec()), ), + RelAlgebra::StoredWithValidity(StoredWithValidityRA { + storage, filters, .. + }) => ( + "load_stored_with_validity", + json!(format!(":{}", storage.name)), + json!(null), + json!(filters.iter().map(|f| f.to_string()).collect_vec()), + ), RelAlgebra::Join(inner) => { if inner.left.is_unit() { rel_stack.push(&inner.right);