Change the way temp stores are passed

main
Ziyang Hu 2 years ago
parent f6f7aa9ccf
commit 2d6b56fdf3

@ -101,7 +101,7 @@ impl<'a> SessionTx<'a> {
for (name, ruleset) in &stratum.prog { for (name, ruleset) in &stratum.prog {
stores.insert( stores.insert(
name.clone(), name.clone(),
self.new_rule_store(name.clone(), ruleset.arity()?), self.new_rule_store(ruleset.arity()?),
); );
} }
} }
@ -165,15 +165,12 @@ impl<'a> SessionTx<'a> {
for atom in &rule.body { for atom in &rule.body {
match atom { match atom {
MagicAtom::Rule(rule_app) => { MagicAtom::Rule(rule_app) => {
let store = stores let store = stores.get(&rule_app.name).ok_or_else(|| {
.get(&rule_app.name) RuleNotFound(
.ok_or_else(|| { rule_app.name.symbol().to_string(),
RuleNotFound( rule_app.name.symbol().span,
rule_app.name.symbol().to_string(), )
rule_app.name.symbol().span, })?;
)
})?
.clone();
ensure!( ensure!(
store.arity == rule_app.args.len(), store.arity == rule_app.args.len(),
@ -200,7 +197,8 @@ impl<'a> SessionTx<'a> {
} }
} }
let right = RelAlgebra::derived(right_vars, store, rule_app.span); let right =
RelAlgebra::derived(right_vars, rule_app.name.clone(), rule_app.span);
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len()); debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rule_app.span); ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rule_app.span);
} }
@ -250,8 +248,7 @@ impl<'a> SessionTx<'a> {
rule_app.name.symbol().to_string(), rule_app.name.symbol().to_string(),
rule_app.name.symbol().span, rule_app.name.symbol().span,
) )
})? })?;
.clone();
ensure!( ensure!(
store.arity == rule_app.args.len(), store.arity == rule_app.args.len(),
ArityMismatch( ArityMismatch(
@ -277,7 +274,7 @@ impl<'a> SessionTx<'a> {
} }
} }
let right = RelAlgebra::derived(right_vars, store, rule_app.span); let right = RelAlgebra::derived(right_vars, rule_app.name.clone(), rule_app.span);
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len()); debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.neg_join(right, prev_joiner_vars, right_joiner_vars, rule_app.span); ret = ret.neg_join(right, prev_joiner_vars, right_joiner_vars, rule_app.span);
} }

@ -209,7 +209,7 @@ impl<'a> SessionTx<'a> {
for (rule_n, rule) in ruleset.iter().enumerate() { for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n); debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
for item_res in rule.relation.iter(self, Some(0), &use_delta)? { for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? {
let item = item_res?; let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0); trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
if should_check_limit { if should_check_limit {
@ -247,7 +247,7 @@ impl<'a> SessionTx<'a> {
for (aggr, args) in aggr.iter_mut().flatten() { for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?; aggr.meet_init(args)?;
} }
for item_res in rule.relation.iter(self, Some(0), &use_delta)? { for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? {
let item = item_res?; let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0); trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
store.aggr_meet_put(&item, &mut aggr, 0)?; store.aggr_meet_put(&item, &mut aggr, 0)?;
@ -313,7 +313,7 @@ impl<'a> SessionTx<'a> {
}) })
.collect_vec(); .collect_vec();
for item_res in rule.relation.iter(self, Some(0), &use_delta)? { for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? {
let item = item_res?; let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0); trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
@ -442,7 +442,7 @@ impl<'a> SessionTx<'a> {
delta_key, rule_symb, rule_n delta_key, rule_symb, rule_n
); );
let use_delta = BTreeSet::from([delta_store.id]); let use_delta = BTreeSet::from([delta_store.id]);
for item_res in rule.relation.iter(self, Some(epoch), &use_delta)? { for item_res in rule.relation.iter(self, Some(epoch), &use_delta, stores)? {
let item = item_res?; let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel // improvement: the clauses can actually be evaluated in parallel
if store.exists(&item, 0) { if store.exists(&item, 0) {
@ -514,7 +514,7 @@ impl<'a> SessionTx<'a> {
delta_key, rule_symb, rule_n delta_key, rule_symb, rule_n
); );
let use_delta = BTreeSet::from([delta_store.id]); let use_delta = BTreeSet::from([delta_store.id]);
for item_res in rule.relation.iter(self, Some(epoch), &use_delta)? { for item_res in rule.relation.iter(self, Some(epoch), &use_delta, stores)? {
let item = item_res?; let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel // improvement: the clauses can actually be evaluated in parallel
let aggr_changed = store.aggr_meet_put(&item, &mut aggr, epoch)?; let aggr_changed = store.aggr_meet_put(&item, &mut aggr, epoch)?;

@ -20,6 +20,7 @@ use miette::{Diagnostic, Result};
use thiserror::Error; use thiserror::Error;
use crate::data::expr::{compute_bounds, Expr}; use crate::data::expr::{compute_bounds, Expr};
use crate::data::program::MagicSymbol;
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleIter}; use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue; use crate::data::value::DataValue;
@ -31,7 +32,7 @@ use crate::utils::swap_option_result;
pub(crate) enum RelAlgebra { pub(crate) enum RelAlgebra {
Fixed(InlineFixedRA), Fixed(InlineFixedRA),
InMem(InMemRelationRA), TempStore(TempStoreRA),
Stored(StoredRA), Stored(StoredRA),
Join(Box<InnerJoin>), Join(Box<InnerJoin>),
NegJoin(Box<NegJoin>), NegJoin(Box<NegJoin>),
@ -44,7 +45,7 @@ impl RelAlgebra {
pub(crate) fn span(&self) -> SourceSpan { pub(crate) fn span(&self) -> SourceSpan {
match self { match self {
RelAlgebra::Fixed(i) => i.span, RelAlgebra::Fixed(i) => i.span,
RelAlgebra::InMem(i) => i.span, RelAlgebra::TempStore(i) => i.span,
RelAlgebra::Stored(i) => i.span, RelAlgebra::Stored(i) => i.span,
RelAlgebra::Join(i) => i.span, RelAlgebra::Join(i) => i.span,
RelAlgebra::NegJoin(i) => i.span, RelAlgebra::NegJoin(i) => i.span,
@ -114,6 +115,7 @@ impl UnificationRA {
tx: &'a SessionTx<'_>, tx: &'a SessionTx<'_>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let mut bindings = self.parent.bindings_after_eliminate(); let mut bindings = self.parent.bindings_after_eliminate();
bindings.push(self.binding.clone()); bindings.push(self.binding.clone());
@ -121,7 +123,7 @@ impl UnificationRA {
Ok(if self.is_multi { Ok(if self.is_multi {
let it = self let it = self
.parent .parent
.iter(tx, epoch, use_delta)? .iter(tx, epoch, use_delta, stores)?
.map_ok(move |tuple| -> Result<Vec<Tuple>> { .map_ok(move |tuple| -> Result<Vec<Tuple>> {
let result_list = self.expr.eval(&tuple)?; let result_list = self.expr.eval(&tuple)?;
let result_list = result_list.get_list().ok_or_else(|| { let result_list = result_list.get_list().ok_or_else(|| {
@ -149,7 +151,7 @@ impl UnificationRA {
} else { } else {
Box::new( Box::new(
self.parent self.parent
.iter(tx, epoch, use_delta)? .iter(tx, epoch, use_delta, stores)?
.map_ok(move |tuple| -> Result<Tuple> { .map_ok(move |tuple| -> Result<Tuple> {
let result = self.expr.eval(&tuple)?; let result = self.expr.eval(&tuple)?;
let mut ret = tuple; let mut ret = tuple;
@ -204,12 +206,13 @@ impl FilteredRA {
tx: &'a SessionTx<'_>, tx: &'a SessionTx<'_>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let bindings = self.parent.bindings_after_eliminate(); let bindings = self.parent.bindings_after_eliminate();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
Ok(Box::new( Ok(Box::new(
self.parent self.parent
.iter(tx, epoch, use_delta)? .iter(tx, epoch, use_delta, stores)?
.filter_map(move |tuple| match tuple { .filter_map(move |tuple| match tuple {
Ok(t) => { Ok(t) => {
for p in self.pred.iter() { for p in self.pred.iter() {
@ -256,10 +259,10 @@ impl Debug for RelAlgebra {
.finish() .finish()
} }
} }
RelAlgebra::InMem(r) => f RelAlgebra::TempStore(r) => f
.debug_tuple("Derived") .debug_tuple("Derived")
.field(&bindings) .field(&bindings)
.field(&r.storage.rule_name) .field(&r.storage_key)
.field(&r.filters) .field(&r.filters)
.finish(), .finish(),
RelAlgebra::Stored(r) => f RelAlgebra::Stored(r) => f
@ -313,7 +316,7 @@ impl RelAlgebra {
pub(crate) fn fill_binding_indices(&mut self) -> Result<()> { pub(crate) fn fill_binding_indices(&mut self) -> Result<()> {
match self { match self {
RelAlgebra::Fixed(_) => {} RelAlgebra::Fixed(_) => {}
RelAlgebra::InMem(d) => { RelAlgebra::TempStore(d) => {
d.fill_binding_indices()?; d.fill_binding_indices()?;
} }
RelAlgebra::Stored(v) => { RelAlgebra::Stored(v) => {
@ -353,10 +356,10 @@ impl RelAlgebra {
pub(crate) fn cartesian_join(self, right: RelAlgebra, span: SourceSpan) -> Self { pub(crate) fn cartesian_join(self, right: RelAlgebra, span: SourceSpan) -> Self {
self.join(right, vec![], vec![], span) self.join(right, vec![], vec![], span)
} }
pub(crate) fn derived(bindings: Vec<Symbol>, storage: InMemRelation, span: SourceSpan) -> Self { pub(crate) fn derived(bindings: Vec<Symbol>, storage_key: MagicSymbol, span: SourceSpan) -> Self {
Self::InMem(InMemRelationRA { Self::TempStore(TempStoreRA {
bindings, bindings,
storage, storage_key,
filters: vec![], filters: vec![],
span, span,
}) })
@ -407,16 +410,16 @@ impl RelAlgebra {
span, span,
}) })
} }
RelAlgebra::InMem(InMemRelationRA { RelAlgebra::TempStore(TempStoreRA {
bindings, bindings,
storage, storage_key,
mut filters, mut filters,
span, span,
}) => { }) => {
filters.push(filter); filters.push(filter);
RelAlgebra::InMem(InMemRelationRA { RelAlgebra::TempStore(TempStoreRA {
bindings, bindings,
storage, storage_key,
filters, filters,
span, span,
}) })
@ -555,6 +558,7 @@ impl ReorderRA {
tx: &'a SessionTx<'_>, tx: &'a SessionTx<'_>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let old_order = self.relation.bindings_after_eliminate(); let old_order = self.relation.bindings_after_eliminate();
let old_order_indices: BTreeMap<_, _> = old_order let old_order_indices: BTreeMap<_, _> = old_order
@ -571,7 +575,7 @@ impl ReorderRA {
.expect("program logic error: reorder indices mismatch") .expect("program logic error: reorder indices mismatch")
}) })
.collect_vec(); .collect_vec();
Ok(Box::new(self.relation.iter(tx, epoch, use_delta)?.map_ok( Ok(Box::new(self.relation.iter(tx, epoch, use_delta, stores)?.map_ok(
move |tuple| { move |tuple| {
let old = tuple; let old = tuple;
let new = reorder_indices let new = reorder_indices
@ -1041,14 +1045,14 @@ fn join_is_prefix(right_join_indices: &[usize]) -> bool {
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct InMemRelationRA { pub(crate) struct TempStoreRA {
pub(crate) bindings: Vec<Symbol>, pub(crate) bindings: Vec<Symbol>,
pub(crate) storage: InMemRelation, pub(crate) storage_key: MagicSymbol,
pub(crate) filters: Vec<Expr>, pub(crate) filters: Vec<Expr>,
pub(crate) span: SourceSpan, pub(crate) span: SourceSpan,
} }
impl InMemRelationRA { impl TempStoreRA {
fn fill_binding_indices(&mut self) -> Result<()> { fn fill_binding_indices(&mut self) -> Result<()> {
let bindings: BTreeMap<_, _> = self let bindings: BTreeMap<_, _> = self
.bindings .bindings
@ -1063,26 +1067,28 @@ impl InMemRelationRA {
Ok(()) Ok(())
} }
fn iter( fn iter<'a>(
&self, &'a self,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
) -> Result<TupleIter<'_>> { stores: &'a BTreeMap<MagicSymbol, InMemRelation>
if epoch == Some(0) && use_delta.contains(&self.storage.id) { ) -> Result<TupleIter<'a>> {
let storage = stores.get(&self.storage_key).unwrap();
if epoch == Some(0) && use_delta.contains(&storage.id) {
return Ok(Box::new(iter::empty())); return Ok(Box::new(iter::empty()));
} }
let scan_epoch = match epoch { let scan_epoch = match epoch {
None => 0, None => 0,
Some(ep) => { Some(ep) => {
if use_delta.contains(&self.storage.id) { if use_delta.contains(&storage.id) {
ep - 1 ep - 1
} else { } else {
0 0
} }
} }
}; };
let it = self.storage.scan_all_for_epoch(scan_epoch); let it = storage.scan_all_for_epoch(scan_epoch);
Ok(if self.filters.is_empty() { Ok(if self.filters.is_empty() {
Box::new(it) Box::new(it)
} else { } else {
@ -1094,7 +1100,9 @@ impl InMemRelationRA {
left_iter: TupleIter<'a>, left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>), (left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let storage = stores.get(&self.storage_key).unwrap();
debug_assert!(!right_join_indices.is_empty()); debug_assert!(!right_join_indices.is_empty());
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b); right_invert_indices.sort_by_key(|(_, b)| **b);
@ -1114,7 +1122,7 @@ impl InMemRelationRA {
.map(|i| tuple[*i].clone()) .map(|i| tuple[*i].clone())
.collect_vec(); .collect_vec();
'outer: for found in self.storage.scan_prefix(&prefix) { 'outer: for found in storage.scan_prefix(&prefix) {
let found = found?; let found = found?;
for (left_idx, right_idx) in for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter()) left_join_indices.iter().zip(right_join_indices.iter())
@ -1147,7 +1155,7 @@ impl InMemRelationRA {
)) ))
} else { } else {
let mut right_join_vals = BTreeSet::new(); let mut right_join_vals = BTreeSet::new();
for tuple in self.storage.scan_all() { for tuple in storage.scan_all() {
let tuple = tuple?; let tuple = tuple?;
let to_join: Box<[DataValue]> = right_join_indices let to_join: Box<[DataValue]> = right_join_indices
.iter() .iter()
@ -1194,8 +1202,10 @@ impl InMemRelationRA {
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
if epoch == Some(0) && use_delta.contains(&self.storage.id) { let storage = stores.get(&self.storage_key).unwrap();
if epoch == Some(0) && use_delta.contains(&storage.id) {
return Ok(Box::new(iter::empty())); return Ok(Box::new(iter::empty()));
} }
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
@ -1207,7 +1217,7 @@ impl InMemRelationRA {
let scan_epoch = match epoch { let scan_epoch = match epoch {
None => 0, None => 0,
Some(ep) => { Some(ep) => {
if use_delta.contains(&self.storage.id) { if use_delta.contains(&storage.id) {
ep - 1 ep - 1
} else { } else {
0 0
@ -1232,7 +1242,7 @@ impl InMemRelationRA {
|| !u_bound.iter().all(|v| *v == DataValue::Bot) || !u_bound.iter().all(|v| *v == DataValue::Bot)
{ {
return Left( return Left(
self.storage storage
.scan_bounded_prefix_for_epoch( .scan_bounded_prefix_for_epoch(
&prefix, &l_bound, &u_bound, scan_epoch, &prefix, &l_bound, &u_bound, scan_epoch,
) )
@ -1253,7 +1263,7 @@ impl InMemRelationRA {
} }
skip_range_check = true; skip_range_check = true;
Right( Right(
self.storage storage
.scan_prefix_for_epoch(&prefix, scan_epoch) .scan_prefix_for_epoch(&prefix, scan_epoch)
.map(move |res_found| -> Result<Option<Tuple>> { .map(move |res_found| -> Result<Option<Tuple>> {
let found = res_found?; let found = res_found?;
@ -1332,7 +1342,7 @@ impl RelAlgebra {
pub(crate) fn eliminate_temp_vars(&mut self, used: &BTreeSet<Symbol>) -> Result<()> { pub(crate) fn eliminate_temp_vars(&mut self, used: &BTreeSet<Symbol>) -> Result<()> {
match self { match self {
RelAlgebra::Fixed(r) => r.do_eliminate_temp_vars(used), RelAlgebra::Fixed(r) => r.do_eliminate_temp_vars(used),
RelAlgebra::InMem(_r) => Ok(()), RelAlgebra::TempStore(_r) => Ok(()),
RelAlgebra::Stored(_v) => Ok(()), RelAlgebra::Stored(_v) => Ok(()),
RelAlgebra::Join(r) => r.do_eliminate_temp_vars(used), RelAlgebra::Join(r) => r.do_eliminate_temp_vars(used),
RelAlgebra::Reorder(r) => r.relation.eliminate_temp_vars(used), RelAlgebra::Reorder(r) => r.relation.eliminate_temp_vars(used),
@ -1345,7 +1355,7 @@ impl RelAlgebra {
fn eliminate_set(&self) -> Option<&BTreeSet<Symbol>> { fn eliminate_set(&self) -> Option<&BTreeSet<Symbol>> {
match self { match self {
RelAlgebra::Fixed(r) => Some(&r.to_eliminate), RelAlgebra::Fixed(r) => Some(&r.to_eliminate),
RelAlgebra::InMem(_) => None, RelAlgebra::TempStore(_) => None,
RelAlgebra::Stored(_) => None, RelAlgebra::Stored(_) => None,
RelAlgebra::Join(r) => Some(&r.to_eliminate), RelAlgebra::Join(r) => Some(&r.to_eliminate),
RelAlgebra::Reorder(_) => None, RelAlgebra::Reorder(_) => None,
@ -1369,7 +1379,7 @@ impl RelAlgebra {
fn bindings_before_eliminate(&self) -> Vec<Symbol> { fn bindings_before_eliminate(&self) -> Vec<Symbol> {
match self { match self {
RelAlgebra::Fixed(f) => f.bindings.clone(), RelAlgebra::Fixed(f) => f.bindings.clone(),
RelAlgebra::InMem(d) => d.bindings.clone(), RelAlgebra::TempStore(d) => d.bindings.clone(),
RelAlgebra::Stored(v) => v.bindings.clone(), RelAlgebra::Stored(v) => v.bindings.clone(),
RelAlgebra::Join(j) => j.bindings(), RelAlgebra::Join(j) => j.bindings(),
RelAlgebra::Reorder(r) => r.bindings(), RelAlgebra::Reorder(r) => r.bindings(),
@ -1387,16 +1397,17 @@ impl RelAlgebra {
tx: &'a SessionTx<'_>, tx: &'a SessionTx<'_>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
match self { match self {
RelAlgebra::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(t.clone())))), RelAlgebra::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(t.clone())))),
RelAlgebra::InMem(r) => r.iter(epoch, use_delta), RelAlgebra::TempStore(r) => r.iter(epoch, use_delta, stores),
RelAlgebra::Stored(v) => v.iter(tx), RelAlgebra::Stored(v) => v.iter(tx),
RelAlgebra::Join(j) => j.iter(tx, epoch, use_delta), RelAlgebra::Join(j) => j.iter(tx, epoch, use_delta, stores),
RelAlgebra::Reorder(r) => r.iter(tx, epoch, use_delta), RelAlgebra::Reorder(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::Filter(r) => r.iter(tx, epoch, use_delta), RelAlgebra::Filter(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::NegJoin(r) => r.iter(tx, epoch, use_delta), RelAlgebra::NegJoin(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::Unification(r) => r.iter(tx, epoch, use_delta), RelAlgebra::Unification(r) => r.iter(tx, epoch, use_delta, stores),
} }
} }
} }
@ -1426,7 +1437,7 @@ impl NegJoin {
pub(crate) fn join_type(&self) -> &str { pub(crate) fn join_type(&self) -> &str {
match &self.right { match &self.right {
RelAlgebra::InMem(_) => { RelAlgebra::TempStore(_) => {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices( .join_indices(
@ -1465,11 +1476,12 @@ impl NegJoin {
tx: &'a SessionTx<'_>, tx: &'a SessionTx<'_>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let bindings = self.left.bindings_after_eliminate(); let bindings = self.left.bindings_after_eliminate();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
match &self.right { match &self.right {
RelAlgebra::InMem(r) => { RelAlgebra::TempStore(r) => {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices( .join_indices(
@ -1478,9 +1490,10 @@ impl NegJoin {
) )
.unwrap(); .unwrap();
r.neg_join( r.neg_join(
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta, stores)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,
stores
) )
} }
RelAlgebra::Stored(v) => { RelAlgebra::Stored(v) => {
@ -1493,7 +1506,7 @@ impl NegJoin {
.unwrap(); .unwrap();
v.neg_join( v.neg_join(
tx, tx,
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta, stores)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,
) )
@ -1526,7 +1539,7 @@ impl InnerJoin {
let mut left = used.clone(); let mut left = used.clone();
left.extend(self.joiner.left_keys.clone()); left.extend(self.joiner.left_keys.clone());
if let Some(filters) = match &self.right { if let Some(filters) = match &self.right {
RelAlgebra::InMem(r) => Some(&r.filters), RelAlgebra::TempStore(r) => Some(&r.filters),
_ => None, _ => None,
} { } {
for filter in filters { for filter in filters {
@ -1549,7 +1562,7 @@ impl InnerJoin {
pub(crate) fn join_type(&self) -> &str { pub(crate) fn join_type(&self) -> &str {
match &self.right { match &self.right {
RelAlgebra::Fixed(f) => f.join_type(), RelAlgebra::Fixed(f) => f.join_type(),
RelAlgebra::InMem(_) => { RelAlgebra::TempStore(_) => {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices( .join_indices(
@ -1593,6 +1606,7 @@ impl InnerJoin {
tx: &'a SessionTx<'_>, tx: &'a SessionTx<'_>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let bindings = self.bindings(); let bindings = self.bindings();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -1606,12 +1620,12 @@ impl InnerJoin {
) )
.unwrap(); .unwrap();
f.join( f.join(
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta, stores)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,
) )
} }
RelAlgebra::InMem(r) => { RelAlgebra::TempStore(r) => {
let join_indices = self let join_indices = self
.joiner .joiner
.join_indices( .join_indices(
@ -1621,14 +1635,15 @@ impl InnerJoin {
.unwrap(); .unwrap();
if join_is_prefix(&join_indices.1) { if join_is_prefix(&join_indices.1) {
r.prefix_join( r.prefix_join(
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta, stores)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,
epoch, epoch,
use_delta, use_delta,
stores
) )
} else { } else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta) self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores)
} }
} }
RelAlgebra::Stored(r) => { RelAlgebra::Stored(r) => {
@ -1643,17 +1658,17 @@ impl InnerJoin {
let left_len = self.left.bindings_after_eliminate().len(); let left_len = self.left.bindings_after_eliminate().len();
r.prefix_join( r.prefix_join(
tx, tx,
self.left.iter(tx, epoch, use_delta)?, self.left.iter(tx, epoch, use_delta, stores)?,
join_indices, join_indices,
eliminate_indices, eliminate_indices,
left_len, left_len,
) )
} else { } else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta) self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores)
} }
} }
RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => { RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => {
self.materialized_join(tx, eliminate_indices, epoch, use_delta) self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores)
} }
RelAlgebra::Reorder(_) => { RelAlgebra::Reorder(_) => {
panic!("joining on reordered") panic!("joining on reordered")
@ -1669,6 +1684,7 @@ impl InnerJoin {
eliminate_indices: BTreeSet<usize>, eliminate_indices: BTreeSet<usize>,
epoch: Option<u32>, epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>, use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
) -> Result<TupleIter<'a>> { ) -> Result<TupleIter<'a>> {
let right_bindings = self.right.bindings_after_eliminate(); let right_bindings = self.right.bindings_after_eliminate();
let (left_join_indices, right_join_indices) = self let (left_join_indices, right_join_indices) = self
@ -1676,7 +1692,7 @@ impl InnerJoin {
.join_indices(&self.left.bindings_after_eliminate(), &right_bindings) .join_indices(&self.left.bindings_after_eliminate(), &right_bindings)
.unwrap(); .unwrap();
let mut left_iter = self.left.iter(tx, epoch, use_delta)?; let mut left_iter = self.left.iter(tx, epoch, use_delta, stores)?;
let left_cache = match left_iter.next() { let left_cache = match left_iter.next() {
None => return Ok(Box::new(iter::empty())), None => return Ok(Box::new(iter::empty())),
Some(Err(err)) => return Err(err), Some(Err(err)) => return Err(err),
@ -1701,7 +1717,7 @@ impl InnerJoin {
self.mat_right_cache.borrow().clone() self.mat_right_cache.borrow().clone()
} else { } else {
let mut cache = BTreeSet::new(); let mut cache = BTreeSet::new();
for item in self.right.iter(tx, epoch, use_delta)? { for item in self.right.iter(tx, epoch, use_delta, stores)? {
match item { match item {
Ok(tuple) => { Ok(tuple) => {
let stored_tuple = right_store_indices let stored_tuple = right_store_indices

@ -31,7 +31,7 @@ use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::query::ra::{ use crate::query::ra::{
FilteredRA, InMemRelationRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, UnificationRA, FilteredRA, TempStoreRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, UnificationRA,
}; };
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId}; use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
@ -529,11 +529,11 @@ impl<'s, S: Storage<'s>> Db<S> {
} }
("fixed", json!(null), json!(null), json!(null)) ("fixed", json!(null), json!(null), json!(null))
} }
RelAlgebra::InMem(InMemRelationRA { RelAlgebra::TempStore(TempStoreRA {
storage, filters, .. storage_key, filters, ..
}) => ( }) => (
"load_mem", "load_mem",
json!(storage.rule_name.to_string()), json!(storage_key.to_string()),
json!(null), json!(null),
json!(filters.iter().map(|f| f.to_string()).collect_vec()), json!(filters.iter().map(|f| f.to_string()).collect_vec()),
), ),

@ -18,7 +18,6 @@ use itertools::Itertools;
use miette::Result; use miette::Result;
use crate::data::aggr::Aggregation; use crate::data::aggr::Aggregation;
use crate::data::program::MagicSymbol;
use crate::data::tuple::Tuple; use crate::data::tuple::Tuple;
use crate::data::value::DataValue; use crate::data::value::DataValue;
@ -36,7 +35,6 @@ pub(crate) struct InMemRelation {
mem_db: Rc<RefCell<Vec<Rc<RefCell<BTreeMap<Tuple, Tuple>>>>>>, mem_db: Rc<RefCell<Vec<Rc<RefCell<BTreeMap<Tuple, Tuple>>>>>>,
epoch_size: Arc<AtomicU32>, epoch_size: Arc<AtomicU32>,
pub(crate) id: StoredRelationId, pub(crate) id: StoredRelationId,
pub(crate) rule_name: MagicSymbol,
pub(crate) arity: usize, pub(crate) arity: usize,
} }
@ -47,12 +45,11 @@ impl Debug for InMemRelation {
} }
impl InMemRelation { impl InMemRelation {
pub(crate) fn new(id: StoredRelationId, rule_name: MagicSymbol, arity: usize) -> InMemRelation { pub(crate) fn new(id: StoredRelationId, arity: usize) -> InMemRelation {
Self { Self {
epoch_size: Default::default(), epoch_size: Default::default(),
mem_db: Default::default(), mem_db: Default::default(),
id, id,
rule_name,
arity, arity,
} }
} }

@ -11,7 +11,6 @@ use std::sync::Arc;
use miette::Result; use miette::Result;
use crate::data::program::MagicSymbol;
use crate::data::tuple::TupleT; use crate::data::tuple::TupleT;
use crate::data::value::DataValue; use crate::data::value::DataValue;
use crate::runtime::in_mem::{InMemRelation, StoredRelationId}; use crate::runtime::in_mem::{InMemRelation, StoredRelationId};
@ -25,10 +24,10 @@ pub struct SessionTx<'a> {
} }
impl<'a> SessionTx<'a> { impl<'a> SessionTx<'a> {
pub(crate) fn new_rule_store(&self, rule_name: MagicSymbol, arity: usize) -> InMemRelation { pub(crate) fn new_rule_store(&self, arity: usize) -> InMemRelation {
let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel); let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32; let old_count = old_count & 0x00ff_ffffu32;
let ret = InMemRelation::new(StoredRelationId(old_count), rule_name, arity); let ret = InMemRelation::new(StoredRelationId(old_count), arity);
ret.ensure_mem_db_for_epoch(0); ret.ensure_mem_db_for_epoch(0);
ret ret
} }

Loading…
Cancel
Save