optimize materialized join

main
Ziyang Hu 2 years ago
parent b1ea7240e1
commit ab667a6ac0

36
Cargo.lock generated

@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "addr2line"
version = "0.17.0"
@ -55,6 +61,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
@ -554,6 +566,7 @@ dependencies = [
"nalgebra",
"num-traits",
"ordered-float",
"ouroboros",
"pest",
"pest_derive",
"priority-queue",
@ -2128,6 +2141,29 @@ version = "6.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee"
[[package]]
name = "ouroboros"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbb50b356159620db6ac971c6d5c9ab788c9cc38a6f49619fca2a27acb062ca"
dependencies = [
"aliasable",
"ouroboros_macro",
]
[[package]]
name = "ouroboros_macro"
version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a0d9d1a6191c4f391f87219d1ea42b23f09ee84d64763cd05ee6ea88d9f384d"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "owo-colors"
version = "3.5.0"

@ -123,6 +123,7 @@ thiserror = "1.0.34"
uuid = { version = "1.1.2", features = ["v1", "v4", "serde"] }
csv = "1.1.6"
document-features = "0.2.6"
ouroboros = "0.15.5"
rayon = { version = "1.5.3", optional = true }
nalgebra = { version = "0.31.1", optional = true }
minreq = { version = "2.6.0", features = ["https-rustls"], optional = true }

@ -6,14 +6,19 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::borrow::Borrow;
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::iter;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use either::{Left, Right};
use itertools::Itertools;
use log::{debug, error};
use miette::{Diagnostic, Result};
use ouroboros::self_referencing;
use thiserror::Error;
use crate::data::expr::{compute_bounds, Expr};
@ -450,6 +455,7 @@ impl RelAlgebra {
joiner,
to_eliminate,
span,
..
} = *inner;
for filter in filters {
let f_bindings = filter.bindings();
@ -467,6 +473,8 @@ impl RelAlgebra {
joiner,
to_eliminate,
span,
mat_right_cache: RefCell::new(Default::default()),
cached: Default::default(),
}));
if !remaining.is_empty() {
joined = RelAlgebra::Filter(FilteredRA {
@ -512,6 +520,8 @@ impl RelAlgebra {
},
to_eliminate: Default::default(),
span,
mat_right_cache: RefCell::new(Default::default()),
cached: Default::default(),
}))
}
pub(crate) fn neg_join(
@ -1440,6 +1450,8 @@ pub(crate) struct InnerJoin {
pub(crate) joiner: Joiner,
pub(crate) to_eliminate: BTreeSet<Symbol>,
pub(crate) span: SourceSpan,
mat_right_cache: RefCell<Rc<BTreeSet<Vec<DataValue>>>>,
cached: AtomicBool,
}
impl InnerJoin {
@ -1612,43 +1624,144 @@ impl InnerJoin {
.sorted_by_key(|(_, b)| **b)
.map(|(a, _)| a)
.collect_vec();
let throwaway = tx.new_temp_store(SourceSpan(0, 0));
for item in self.right.iter(tx, epoch, use_delta)? {
match item {
Ok(tuple) => {
let stored_tuple = Tuple(
right_store_indices
let throwaway = if self.cached.load(Ordering::Relaxed) {
self.mat_right_cache.borrow().clone()
} else {
let mut cache = BTreeSet::new();
for item in self.right.iter(tx, epoch, use_delta)? {
match item {
Ok(tuple) => {
let stored_tuple = right_store_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
throwaway.put(stored_tuple, 0);
.collect_vec();
cache.insert(stored_tuple);
}
Err(e) => return Ok(Box::new([Err(e)].into_iter())),
}
Err(e) => return Ok(Box::new([Err(e)].into_iter())),
}
*self.mat_right_cache.borrow_mut() = Rc::new(cache);
self.cached.store(true, Ordering::Relaxed);
self.mat_right_cache.borrow().clone()
};
let mut left_iter = self.left.iter(tx, epoch, use_delta)?;
let left_cache = match left_iter.next() {
None => return Ok(Box::new(iter::empty())),
Some(Err(err)) => return Ok(Box::new(iter::once(Err(err)))),
Some(Ok(data)) => data.0,
};
let it = ThrowAwayIterBuilder {
eliminate_indices,
left: left_iter,
left_cache: left_cache.clone(),
left_join_indices: left_join_indices.clone(),
materialized: throwaway,
right_invert_indices,
mat_iter_builder: |mat: &Rc<BTreeSet<Vec<DataValue>>>| {
build_mat_range_iter(mat, &left_join_indices, &left_cache)
},
}
Ok(Box::new(
self.left
.iter(tx, epoch, use_delta)?
.map_ok(move |tuple| {
let eliminate_indices = eliminate_indices.clone();
let prefix = Tuple(
left_join_indices
.iter()
.map(|i| tuple.0[*i].clone())
.collect_vec(),
);
let restore_indices = right_invert_indices.clone();
throwaway.scan_prefix(&prefix).map_ok(move |found| {
let mut ret = tuple.0.clone();
for i in restore_indices.iter() {
ret.push(found.0[*i].clone());
.build();
Ok(Box::new(it))
}
}
#[self_referencing]
struct ThrowAwayIter<'a> {
materialized: Rc<BTreeSet<Vec<DataValue>>>,
eliminate_indices: BTreeSet<usize>,
left_join_indices: Vec<usize>,
right_invert_indices: Vec<usize>,
#[borrows(materialized)]
#[covariant]
mat_iter: std::collections::btree_set::Range<'this, Vec<DataValue>>,
left: TupleIter<'a>,
left_cache: Vec<DataValue>,
}
impl<'a> ThrowAwayIter<'a> {
fn next_inner(&mut self) -> Result<Option<Tuple>> {
loop {
let right_nxt: Option<&Vec<DataValue>> = self.with_mat_iter_mut(|it| it.next());
match right_nxt {
Some(data) => {
let data = data.clone();
let mut ret = self.borrow_left_cache().clone();
for i in self.borrow_right_invert_indices() {
ret.push(data[*i].clone());
}
let tuple = eliminate_from_tuple(Tuple(ret), self.borrow_eliminate_indices());
return Ok(Some(tuple));
}
None => {
let next_left = self.with_left_mut(|l| l.next());
match next_left {
None => return Ok(None),
Some(l) => {
let left_tuple = l?.0;
self.with_left_cache_mut(|cache| {
*cache = left_tuple.clone();
});
self.with_mut(|fields| {
let it = build_mat_range_iter(
fields.materialized.borrow(),
&fields.left_join_indices,
&left_tuple,
);
*fields.mat_iter = it
});
}
eliminate_from_tuple(Tuple(ret), &eliminate_indices)
})
})
.flatten_ok()
.map(flatten_err),
))
}
}
}
}
}
}
fn build_mat_range_iter<'a>(
mat: &'a BTreeSet<Vec<DataValue>>,
left_join_indices: &[usize],
left_tuple: &[DataValue],
) -> std::collections::btree_set::Range<'a, Vec<DataValue>> {
let prefix = left_join_indices
.iter()
.map(|i| left_tuple[*i].clone())
.collect_vec();
let mut prefix_end = prefix.clone();
prefix_end.push(DataValue::Bot);
mat.range(prefix..prefix_end)
}
impl<'a> Iterator for ThrowAwayIter<'a> {
type Item = Result<Tuple>;
fn next(&mut self) -> Option<Self::Item> {
swap_option_result(self.next_inner())
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use crate::new_cozo_mem;
#[test]
fn test_mat_join() {
let db = new_cozo_mem().unwrap();
let res = db
.run_script(
r#"
data[a, b] <- [[1, 2], [1, 3], [2, 3]]
?[x] := a = 3, data[x, a]
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res, vec![vec![json!(1)], vec![json!(2)]])
}
}

@ -12,10 +12,8 @@ use std::sync::Arc;
use miette::Result;
use crate::data::program::MagicSymbol;
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::in_mem::{InMemRelation, StoredRelationId};
use crate::runtime::relation::RelationId;
use crate::storage::StoreTx;
@ -35,20 +33,6 @@ impl<'a> SessionTx<'a> {
ret
}
pub(crate) fn new_temp_store(&self, span: SourceSpan) -> InMemRelation {
let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
let ret = InMemRelation::new(
StoredRelationId(old_count),
MagicSymbol::Muggle {
inner: Symbol::new("", span),
},
0,
);
ret.ensure_mem_db_for_epoch(0);
ret
}
pub(crate) fn load_last_relation_store_id(&self) -> Result<RelationId> {
let tuple = Tuple(vec![DataValue::Null]);
let t_encoded = tuple.encode_as_key(RelationId::SYSTEM);

Loading…
Cancel
Save