merge join

main
Ziyang Hu 2 years ago
parent 50e3eebfa3
commit b201e79029

@ -4,7 +4,7 @@ use std::collections::{BTreeMap, BTreeSet};
use crate::db::cnf_transform::{cnf_transform, extract_tables};
use crate::db::engine::{Session};
use crate::db::plan::AccessorMap;
use crate::db::table::TableId;
use crate::db::table::{ColId, TableId};
use crate::relation::value::{Value};
use crate::error::{CozoError, Result};
use crate::error::CozoError::{InvalidArgument, LogicError};
@ -13,6 +13,19 @@ use crate::relation::table::MegaTuple;
use crate::relation::value;
/// Resolves a `(table, column)` reference against a `MegaTuple`.
///
/// `cid.is_key` picks which side of the mega tuple is consulted (`keys` or
/// `vals`); `tid.id` indexes the tuple within that side and `cid.id` indexes
/// the column within that tuple.
///
/// Returns `Value::Null` when the addressed tuple reports `DataKind::Empty`.
///
/// # Errors
///
/// Returns `LogicError("Tuple ref out of bound")` when either the tuple index
/// or the column index does not exist.
pub fn extract_table_ref<'a>(tuples: &'a MegaTuple, tid: &TableId, cid: &ColId) -> Result<Value<'a>> {
    let side = if cid.is_key { &tuples.keys } else { &tuples.vals };

    let tuple = match side.get(tid.id as usize) {
        Some(found) => found,
        None => return Err(LogicError("Tuple ref out of bound".to_string())),
    };

    // An empty tuple has no columns to index into; every reference into it is null.
    if let Ok(DataKind::Empty) = tuple.data_kind() {
        return Ok(Value::Null);
    }

    match tuple.get(cid.id as usize) {
        Some(column) => Ok(column),
        None => Err(LogicError("Tuple ref out of bound".to_string())),
    }
}
pub fn tuple_eval<'a>(value: &'a Value<'a>, tuples: &'a MegaTuple) -> Result<Value<'a>> {
let res: Value = match value {
v @ (Value::Null |
@ -36,16 +49,17 @@ pub fn tuple_eval<'a>(value: &'a Value<'a>, tuples: &'a MegaTuple) -> Result<Val
return Err(LogicError(format!("Cannot resolve variable {}", v)));
}
Value::TupleRef(tid, cid) => {
let targets = if cid.is_key { &tuples.keys } else { &tuples.vals };
let target = targets.get(tid.id as usize).ok_or_else(|| {
LogicError("Tuple ref out of bound".to_string())
})?;
if matches!(target.data_kind(), Ok(DataKind::Empty)) {
Value::Null
} else {
target.get(cid.id as usize)
.ok_or_else(|| LogicError("Tuple ref out of bound".to_string()))?
}
extract_table_ref(tuples, tid, cid)?
// let targets = if cid.is_key { &tuples.keys } else { &tuples.vals };
// let target = targets.get(tid.id as usize).ok_or_else(|| {
// LogicError("Tuple ref out of bound".to_string())
// })?;
// if matches!(target.data_kind(), Ok(DataKind::Empty)) {
// Value::Null
// } else {
// target.get(cid.id as usize)
// .ok_or_else(|| LogicError("Tuple ref out of bound".to_string()))?
// }
}
Value::Apply(op, args) => {
match op.as_ref() {

@ -4,7 +4,7 @@ use std::cmp::Ordering;
use pest::iterators::Pair;
use cozorocks::{IteratorPtr};
use crate::db::engine::Session;
use crate::db::eval::tuple_eval;
use crate::db::eval::{extract_table_ref, tuple_eval};
use crate::db::query::{FromEl, Selection};
use crate::db::table::{ColId, TableId, TableInfo};
use crate::error::CozoError::LogicError;
@ -162,6 +162,8 @@ pub enum MegaTupleIt<'a> {
KeySortedWithAssocIt { main: Box<MegaTupleIt<'a>>, associates: Vec<(u32, IteratorPtr<'a>)> },
CartesianProdIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>> },
MergeJoinIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>>, left_keys: Vec<(TableId, ColId)>, right_keys: Vec<(TableId, ColId)> },
KeyedUnionIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>> },
KeyedDifferenceIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>> },
FilterIt { it: Box<MegaTupleIt<'a>>, filter: Value<'a> },
EvalIt { it: Box<MegaTupleIt<'a>>, keys: Vec<Value<'a>>, vals: Vec<Value<'a>>, prefix: u32 },
}
@ -221,7 +223,6 @@ impl<'a> MegaTupleIt<'a> {
right: Box::new(right.as_ref().iter()),
})
}
MegaTupleIt::MergeJoinIt { .. } => todo!(),
MegaTupleIt::FilterIt { it, filter } => {
Box::new(FilterIterator {
it: Box::new(it.iter()),
@ -236,6 +237,23 @@ impl<'a> MegaTupleIt<'a> {
prefix: *prefix,
})
}
MegaTupleIt::MergeJoinIt { left, right, left_keys, right_keys } => {
Box::new(MergeJoinIterator {
left: Box::new(left.iter()),
right: Box::new(right.iter()),
left_keys,
right_keys,
left_cache: None,
right_cache: None,
started: false,
})
}
MegaTupleIt::KeyedUnionIt { .. } => {
todo!()
}
MegaTupleIt::KeyedDifferenceIt { .. } => {
todo!()
}
}
}
}
@ -425,6 +443,93 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
}
}
/// Iterator state for joining two `MegaTuple` streams on a list of
/// `(table, column)` key references per side.
pub struct MergeJoinIterator<'a> {
// Left input stream; every item is a fallible `MegaTuple`.
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
// Right input stream; every item is a fallible `MegaTuple`.
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
// Column references whose values form the join key of a left tuple.
left_keys: &'a [(TableId, ColId)],
// Column references whose values form the join key of a right tuple.
right_keys: &'a [(TableId, ColId)],
// NOTE(review): constructed as `None` and not read by the `Iterator` impl
// for this type — presumably reserved for buffering a pending left tuple; confirm.
left_cache: Option<MegaTuple>,
// NOTE(review): constructed as `None` and not read by the `Iterator` impl
// for this type — presumably reserved for buffering a pending right tuple; confirm.
right_cache: Option<MegaTuple>,
// NOTE(review): constructed as `false` and not read by the `Iterator` impl
// for this type — confirm whether it is still needed.
started: bool,
}
impl<'a> Iterator for MergeJoinIterator<'a> {
type Item = Result<MegaTuple>;
fn next(&mut self) -> Option<Self::Item> {
let mut left_cache;
let mut right_cache;
let mut left_val_cache;
let mut right_val_cache;
match self.left.next() {
None => return None,
Some(Err(e)) => return Some(Err(e)),
Some(Ok(t)) => {
left_cache = t;
left_val_cache = match self.left_keys.iter().map(|(tid, cid)|
// to_static() here is to convince the borrow checker. It is really unnecessary.
// fortunately the val_cache is only for keys, which presumably isn't too big
extract_table_ref(&left_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
Err(e) => return Some(Err(e)),
Ok(t) => t
};
}
};
match self.right.next() {
None => return None,
Some(Err(e)) => return Some(Err(e)),
Some(Ok(t)) => {
right_cache = t;
right_val_cache = match self.right_keys.iter().map(|(tid, cid)|
extract_table_ref(&right_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
Err(e) => return Some(Err(e)),
Ok(t) => t
};
}
};
loop {
match left_val_cache.cmp(&right_val_cache) {
Ordering::Equal => {
left_cache.extend(right_cache);
return Some(Ok(left_cache));
}
Ordering::Less => {
// Advance the left one
match self.left.next() {
None => return None,
Some(Err(e)) => return Some(Err(e)),
Some(Ok(t)) => {
left_cache = t;
left_val_cache = match self.left_keys.iter().map(|(tid, cid)|
extract_table_ref(&left_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
Err(e) => return Some(Err(e)),
Ok(t) => t
};
}
};
}
Ordering::Greater => {
// Advance the right one
match self.right.next() {
None => return None,
Some(Err(e)) => return Some(Err(e)),
Some(Ok(t)) => {
right_cache = t;
right_val_cache = match self.right_keys.iter().map(|(tid, cid)|
extract_table_ref(&right_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
Err(e) => return Some(Err(e)),
Ok(t) => t
};
}
};
}
}
}
}
}
pub struct CartesianProdIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left_cache: MegaTuple,

@ -53,4 +53,8 @@ impl MegaTuple {
/// Returns `true` when `keys` is empty; `vals` is not consulted.
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
pub fn extend(&mut self, other: Self) {
self.keys.extend(other.keys);
self.vals.extend(other.vals);
}
}
Loading…
Cancel
Save