From ab667a6ac034dd40d78948c4e6b5dd83b5d25b27 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Mon, 5 Dec 2022 23:40:39 +0800 Subject: [PATCH] optimize materialized join --- Cargo.lock | 36 ++++++ cozo-core/Cargo.toml | 1 + cozo-core/src/query/relation.rs | 177 ++++++++++++++++++++++++------ cozo-core/src/runtime/transact.rs | 16 --- 4 files changed, 182 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 72136edc..a782550c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" + [[package]] name = "addr2line" version = "0.17.0" @@ -55,6 +61,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "aliasable" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -554,6 +566,7 @@ dependencies = [ "nalgebra", "num-traits", "ordered-float", + "ouroboros", "pest", "pest_derive", "priority-queue", @@ -2128,6 +2141,29 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "ouroboros" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca" +dependencies = [ + "aliasable", + "ouroboros_macro", +] + +[[package]] +name = "ouroboros_macro" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d" +dependencies = [ + "Inflector", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "owo-colors" version = "3.5.0" diff --git a/cozo-core/Cargo.toml b/cozo-core/Cargo.toml index 45942cfa..ac1f6ac5 100644 --- a/cozo-core/Cargo.toml +++ b/cozo-core/Cargo.toml @@ -123,6 +123,7 @@ thiserror = "1.0.34" uuid = { version = "1.1.2", features = ["v1", "v4", "serde"] } csv = "1.1.6" document-features = "0.2.6" +ouroboros = "0.15.5" rayon = { version = "1.5.3", optional = true } nalgebra = { version = "0.31.1", optional = true } minreq = { version = "2.6.0", features = ["https-rustls"], optional = true } diff --git a/cozo-core/src/query/relation.rs b/cozo-core/src/query/relation.rs index 99c0ef96..308e20d9 100644 --- a/cozo-core/src/query/relation.rs +++ b/cozo-core/src/query/relation.rs @@ -6,14 +6,19 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ +use std::borrow::Borrow; +use std::cell::RefCell; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Formatter}; use std::iter; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; use either::{Left, Right}; use itertools::Itertools; use log::{debug, error}; use miette::{Diagnostic, Result}; +use ouroboros::self_referencing; use thiserror::Error; use crate::data::expr::{compute_bounds, Expr}; @@ -450,6 +455,7 @@ impl RelAlgebra { joiner, to_eliminate, span, + .. } = *inner; for filter in filters { let f_bindings = filter.bindings(); @@ -467,6 +473,8 @@ impl RelAlgebra { joiner, to_eliminate, span, + mat_right_cache: RefCell::new(Default::default()), + cached: Default::default(), })); if !remaining.is_empty() { joined = RelAlgebra::Filter(FilteredRA { @@ -512,6 +520,8 @@ impl RelAlgebra { }, to_eliminate: Default::default(), span, + mat_right_cache: RefCell::new(Default::default()), + cached: Default::default(), })) } pub(crate) fn neg_join( @@ -1440,6 +1450,8 @@ pub(crate) struct InnerJoin { pub(crate) joiner: Joiner, pub(crate) to_eliminate: BTreeSet, pub(crate) span: SourceSpan, + mat_right_cache: RefCell>>>, + cached: AtomicBool, } impl InnerJoin { @@ -1612,43 +1624,144 @@ impl InnerJoin { .sorted_by_key(|(_, b)| **b) .map(|(a, _)| a) .collect_vec(); - let throwaway = tx.new_temp_store(SourceSpan(0, 0)); - for item in self.right.iter(tx, epoch, use_delta)? { - match item { - Ok(tuple) => { - let stored_tuple = Tuple( - right_store_indices + let throwaway = if self.cached.load(Ordering::Relaxed) { + self.mat_right_cache.borrow().clone() + } else { + let mut cache = BTreeSet::new(); + for item in self.right.iter(tx, epoch, use_delta)? { + match item { + Ok(tuple) => { + let stored_tuple = right_store_indices .iter() .map(|i| tuple.0[*i].clone()) - .collect_vec(), - ); - throwaway.put(stored_tuple, 0); + .collect_vec(); + cache.insert(stored_tuple); + } + Err(e) => return Ok(Box::new([Err(e)].into_iter())), } - Err(e) => return Ok(Box::new([Err(e)].into_iter())), } + *self.mat_right_cache.borrow_mut() = Rc::new(cache); + self.cached.store(true, Ordering::Relaxed); + self.mat_right_cache.borrow().clone() + }; + + let mut left_iter = self.left.iter(tx, epoch, use_delta)?; + let left_cache = match left_iter.next() { + None => return Ok(Box::new(iter::empty())), + Some(Err(err)) => return Ok(Box::new(iter::once(Err(err)))), + Some(Ok(data)) => data.0, + }; + let it = ThrowAwayIterBuilder { + eliminate_indices, + left: left_iter, + left_cache: left_cache.clone(), + left_join_indices: left_join_indices.clone(), + materialized: throwaway, + right_invert_indices, + mat_iter_builder: |mat: &Rc>>| { + build_mat_range_iter(mat, &left_join_indices, &left_cache) + }, } - Ok(Box::new( - self.left - .iter(tx, epoch, use_delta)? - .map_ok(move |tuple| { - let eliminate_indices = eliminate_indices.clone(); - let prefix = Tuple( - left_join_indices - .iter() - .map(|i| tuple.0[*i].clone()) - .collect_vec(), - ); - let restore_indices = right_invert_indices.clone(); - throwaway.scan_prefix(&prefix).map_ok(move |found| { - let mut ret = tuple.0.clone(); - for i in restore_indices.iter() { - ret.push(found.0[*i].clone()); + .build(); + Ok(Box::new(it)) + } +} + +#[self_referencing] +struct ThrowAwayIter<'a> { + materialized: Rc>>, + eliminate_indices: BTreeSet, + left_join_indices: Vec, + right_invert_indices: Vec, + #[borrows(materialized)] + #[covariant] + mat_iter: std::collections::btree_set::Range<'this, Vec>, + left: TupleIter<'a>, + left_cache: Vec, +} + +impl<'a> ThrowAwayIter<'a> { + fn next_inner(&mut self) -> Result> { + loop { + let right_nxt: Option<&Vec> = self.with_mat_iter_mut(|it| it.next()); + match right_nxt { + Some(data) => { + let data = data.clone(); + let mut ret = self.borrow_left_cache().clone(); + for i in self.borrow_right_invert_indices() { + ret.push(data[*i].clone()); + } + let tuple = eliminate_from_tuple(Tuple(ret), self.borrow_eliminate_indices()); + return Ok(Some(tuple)); + } + None => { + let next_left = self.with_left_mut(|l| l.next()); + match next_left { + None => return Ok(None), + Some(l) => { + let left_tuple = l?.0; + + self.with_left_cache_mut(|cache| { + *cache = left_tuple.clone(); + }); + + self.with_mut(|fields| { + let it = build_mat_range_iter( + fields.materialized.borrow(), + &fields.left_join_indices, + &left_tuple, + ); + *fields.mat_iter = it + }); } - eliminate_from_tuple(Tuple(ret), &eliminate_indices) - }) - }) - .flatten_ok() - .map(flatten_err), - )) + } + } + } + } + } +} + +fn build_mat_range_iter<'a>( + mat: &'a BTreeSet>, + left_join_indices: &[usize], + left_tuple: &[DataValue], +) -> std::collections::btree_set::Range<'a, Vec> { + let prefix = left_join_indices + .iter() + .map(|i| left_tuple[*i].clone()) + .collect_vec(); + let mut prefix_end = prefix.clone(); + prefix_end.push(DataValue::Bot); + mat.range(prefix..prefix_end) +} + +impl<'a> Iterator for ThrowAwayIter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + swap_option_result(self.next_inner()) + } +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use crate::new_cozo_mem; + + #[test] + fn test_mat_join() { + let db = new_cozo_mem().unwrap(); + let res = db + .run_script( + r#" + data[a, b] <- [[1, 2], [1, 3], [2, 3]] + ?[x] := a = 3, data[x, a] + "#, + Default::default(), + ) + .unwrap() + .rows; + assert_eq!(res, vec![vec![json!(1)], vec![json!(2)]]) } } diff --git a/cozo-core/src/runtime/transact.rs b/cozo-core/src/runtime/transact.rs index 73525ff4..e72e1cf8 100644 --- a/cozo-core/src/runtime/transact.rs +++ b/cozo-core/src/runtime/transact.rs @@ -12,10 +12,8 @@ use std::sync::Arc; use miette::Result; use crate::data::program::MagicSymbol; -use crate::data::symb::Symbol; use crate::data::tuple::Tuple; use crate::data::value::DataValue; -use crate::parse::SourceSpan; use crate::runtime::in_mem::{InMemRelation, StoredRelationId}; use crate::runtime::relation::RelationId; use crate::storage::StoreTx; @@ -35,20 +33,6 @@ impl<'a> SessionTx<'a> { ret } - pub(crate) fn new_temp_store(&self, span: SourceSpan) -> InMemRelation { - let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel); - let old_count = old_count & 0x00ff_ffffu32; - let ret = InMemRelation::new( - StoredRelationId(old_count), - MagicSymbol::Muggle { - inner: Symbol::new("", span), - }, - 0, - ); - ret.ensure_mem_db_for_epoch(0); - ret - } - pub(crate) fn load_last_relation_store_id(&self) -> Result { let tuple = Tuple(vec![DataValue::Null]); let t_encoded = tuple.encode_as_key(RelationId::SYSTEM);