optimize materialized join further

main
Ziyang Hu 2 years ago
parent ab667a6ac0
commit 01b0cb4e2b

36
Cargo.lock generated

@ -2,12 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "addr2line"
version = "0.17.0"
@ -61,12 +55,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@ -566,7 +554,6 @@ dependencies = [
"nalgebra",
"num-traits",
"ordered-float",
"ouroboros",
"pest",
"pest_derive",
"priority-queue",
@ -2141,29 +2128,6 @@ version = "6.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
[[package]]
name = "ouroboros"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca"
dependencies = [
"aliasable",
"ouroboros_macro",
]
[[package]]
name = "ouroboros_macro"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "owo-colors"
version = "3.5.0"

@ -123,7 +123,6 @@ thiserror = "1.0.34"
uuid = { version = "1.1.2", features = ["v1", "v4", "serde"] }
csv = "1.1.6"
document-features = "0.2.6"
ouroboros = "0.15.5"
rayon = { version = "1.5.3", optional = true }
nalgebra = { version = "0.31.1", optional = true }
minreq = { version = "2.6.0", features = ["https-rustls"], optional = true }

@ -6,7 +6,6 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
@ -18,7 +17,6 @@ use either::{Left, Right};
use itertools::Itertools;
use log::{debug, error};
use miette::{Diagnostic, Result};
use ouroboros::self_referencing;
use thiserror::Error;
use crate::data::expr::{compute_bounds, Expr};
@ -1450,7 +1448,7 @@ pub(crate) struct InnerJoin {
pub(crate) joiner: Joiner,
pub(crate) to_eliminate: BTreeSet<Symbol>,
pub(crate) span: SourceSpan,
mat_right_cache: RefCell<Rc<BTreeSet<Vec<DataValue>>>>,
mat_right_cache: RefCell<Rc<Vec<Vec<DataValue>>>>,
cached: AtomicBool,
}
@ -1611,6 +1609,14 @@ impl InnerJoin {
.joiner
.join_indices(&self.left.bindings_after_eliminate(), &right_bindings)
.unwrap();
let mut left_iter = self.left.iter(tx, epoch, use_delta)?;
let left_cache = match left_iter.next() {
None => return Ok(Box::new(iter::empty())),
Some(Err(err)) => return Err(err),
Some(Ok(data)) => data.0,
};
let right_join_indices_set = BTreeSet::from_iter(right_join_indices.iter().cloned());
let mut right_store_indices = right_join_indices;
for i in 0..right_bindings.len() {
@ -1618,13 +1624,14 @@ impl InnerJoin {
right_store_indices.push(i)
}
}
let right_invert_indices = right_store_indices
.iter()
.enumerate()
.sorted_by_key(|(_, b)| **b)
.map(|(a, _)| a)
.collect_vec();
let throwaway = if self.cached.load(Ordering::Relaxed) {
let cached_data = if self.cached.load(Ordering::Relaxed) {
self.mat_right_cache.borrow().clone()
} else {
let mut cache = BTreeSet::new();
@ -1637,82 +1644,85 @@ impl InnerJoin {
.collect_vec();
cache.insert(stored_tuple);
}
Err(e) => return Ok(Box::new([Err(e)].into_iter())),
Err(e) => return Err(e),
}
}
let cache = cache.into_iter().collect_vec();
*self.mat_right_cache.borrow_mut() = Rc::new(cache);
self.cached.store(true, Ordering::Relaxed);
self.mat_right_cache.borrow().clone()
};
let mut left_iter = self.left.iter(tx, epoch, use_delta)?;
let left_cache = match left_iter.next() {
None => return Ok(Box::new(iter::empty())),
Some(Err(err)) => return Ok(Box::new(iter::once(Err(err)))),
Some(Ok(data)) => data.0,
};
let it = ThrowAwayIterBuilder {
let (prefix, right_idx) =
build_mat_range_iter(&cached_data, &left_join_indices, &left_cache);
let it = CachedMaterializedIterator {
eliminate_indices,
left: left_iter,
left_cache: left_cache.clone(),
left_join_indices: left_join_indices.clone(),
materialized: throwaway,
left_cache,
left_join_indices,
materialized: cached_data,
right_invert_indices,
mat_iter_builder: |mat: &Rc<BTreeSet<Vec<DataValue>>>| {
build_mat_range_iter(mat, &left_join_indices, &left_cache)
},
}
.build();
right_idx,
prefix,
};
Ok(Box::new(it))
}
}
#[self_referencing]
struct ThrowAwayIter<'a> {
materialized: Rc<BTreeSet<Vec<DataValue>>>,
struct CachedMaterializedIterator<'a> {
materialized: Rc<Vec<Vec<DataValue>>>,
eliminate_indices: BTreeSet<usize>,
left_join_indices: Vec<usize>,
right_invert_indices: Vec<usize>,
#[borrows(materialized)]
#[covariant]
mat_iter: std::collections::btree_set::Range<'this, Vec<DataValue>>,
right_idx: usize,
prefix: Vec<DataValue>,
left: TupleIter<'a>,
left_cache: Vec<DataValue>,
}
impl<'a> ThrowAwayIter<'a> {
impl<'a> CachedMaterializedIterator<'a> {
fn advance_right(&mut self) -> Option<&Vec<DataValue>> {
if self.right_idx == self.materialized.len() {
None
} else {
let ret = &self.materialized[self.right_idx];
if ret.starts_with(&self.prefix) {
self.right_idx += 1;
Some(ret)
} else {
None
}
}
}
fn next_inner(&mut self) -> Result<Option<Tuple>> {
loop {
let right_nxt: Option<&Vec<DataValue>> = self.with_mat_iter_mut(|it| it.next());
let right_nxt = self.advance_right();
match right_nxt {
Some(data) => {
let data = data.clone();
let mut ret = self.borrow_left_cache().clone();
for i in self.borrow_right_invert_indices() {
let mut ret = self.left_cache.clone();
for i in &self.right_invert_indices {
ret.push(data[*i].clone());
}
let tuple = eliminate_from_tuple(Tuple(ret), self.borrow_eliminate_indices());
let tuple = eliminate_from_tuple(Tuple(ret), &self.eliminate_indices);
return Ok(Some(tuple));
}
None => {
let next_left = self.with_left_mut(|l| l.next());
let next_left = self.left.next();
match next_left {
None => return Ok(None),
Some(l) => {
let left_tuple = l?.0;
self.with_left_cache_mut(|cache| {
*cache = left_tuple.clone();
});
self.with_mut(|fields| {
let it = build_mat_range_iter(
fields.materialized.borrow(),
&fields.left_join_indices,
&left_tuple,
);
*fields.mat_iter = it
});
let (prefix, idx) = build_mat_range_iter(
&self.materialized,
&self.left_join_indices,
&left_tuple,
);
self.left_cache = left_tuple;
self.right_idx = idx;
self.prefix = prefix;
}
}
}
@ -1721,21 +1731,23 @@ impl<'a> ThrowAwayIter<'a> {
}
}
fn build_mat_range_iter<'a>(
mat: &'a BTreeSet<Vec<DataValue>>,
fn build_mat_range_iter(
mat: &[Vec<DataValue>],
left_join_indices: &[usize],
left_tuple: &[DataValue],
) -> std::collections::btree_set::Range<'a, Vec<DataValue>> {
) -> (Vec<DataValue>, usize) {
let prefix = left_join_indices
.iter()
.map(|i| left_tuple[*i].clone())
.collect_vec();
let mut prefix_end = prefix.clone();
prefix_end.push(DataValue::Bot);
mat.range(prefix..prefix_end)
let idx = match mat.binary_search(&prefix) {
Ok(i) => i,
Err(i) => i,
};
(prefix, idx)
}
impl<'a> Iterator for ThrowAwayIter<'a> {
impl<'a> Iterator for CachedMaterializedIterator<'a> {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {

Loading…
Cancel
Save