From b201e79029054395d4a2debd33f5bb7e58018344 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Tue, 10 May 2022 16:31:00 +0800 Subject: [PATCH] merge join --- src/db/eval.rs | 36 +++++++++----- src/db/plan.rs | 109 +++++++++++++++++++++++++++++++++++++++++- src/relation/table.rs | 4 ++ 3 files changed, 136 insertions(+), 13 deletions(-) diff --git a/src/db/eval.rs b/src/db/eval.rs index 32bcfb1b..a39e6d08 100644 --- a/src/db/eval.rs +++ b/src/db/eval.rs @@ -4,7 +4,7 @@ use std::collections::{BTreeMap, BTreeSet}; use crate::db::cnf_transform::{cnf_transform, extract_tables}; use crate::db::engine::{Session}; use crate::db::plan::AccessorMap; -use crate::db::table::TableId; +use crate::db::table::{ColId, TableId}; use crate::relation::value::{Value}; use crate::error::{CozoError, Result}; use crate::error::CozoError::{InvalidArgument, LogicError}; @@ -13,6 +13,19 @@ use crate::relation::table::MegaTuple; use crate::relation::value; +pub fn extract_table_ref<'a>(tuples: &'a MegaTuple, tid: &TableId, cid: &ColId) -> Result> { + let targets = if cid.is_key { &tuples.keys } else { &tuples.vals }; + let target = targets.get(tid.id as usize).ok_or_else(|| { + LogicError("Tuple ref out of bound".to_string()) + })?; + if matches!(target.data_kind(), Ok(DataKind::Empty)) { + Ok(Value::Null) + } else { + target.get(cid.id as usize) + .ok_or_else(|| LogicError("Tuple ref out of bound".to_string())) + } +} + pub fn tuple_eval<'a>(value: &'a Value<'a>, tuples: &'a MegaTuple) -> Result> { let res: Value = match value { v @ (Value::Null | @@ -36,16 +49,17 @@ pub fn tuple_eval<'a>(value: &'a Value<'a>, tuples: &'a MegaTuple) -> Result { - let targets = if cid.is_key { &tuples.keys } else { &tuples.vals }; - let target = targets.get(tid.id as usize).ok_or_else(|| { - LogicError("Tuple ref out of bound".to_string()) - })?; - if matches!(target.data_kind(), Ok(DataKind::Empty)) { - Value::Null - } else { - target.get(cid.id as usize) - .ok_or_else(|| LogicError("Tuple ref out of bound".to_string()))? - } + extract_table_ref(tuples, tid, cid)? + // let targets = if cid.is_key { &tuples.keys } else { &tuples.vals }; + // let target = targets.get(tid.id as usize).ok_or_else(|| { + // LogicError("Tuple ref out of bound".to_string()) + // })?; + // if matches!(target.data_kind(), Ok(DataKind::Empty)) { + // Value::Null + // } else { + // target.get(cid.id as usize) + // .ok_or_else(|| LogicError("Tuple ref out of bound".to_string()))? + // } } Value::Apply(op, args) => { match op.as_ref() { diff --git a/src/db/plan.rs b/src/db/plan.rs index 35ac90d3..d7905826 100644 --- a/src/db/plan.rs +++ b/src/db/plan.rs @@ -4,7 +4,7 @@ use std::cmp::Ordering; use pest::iterators::Pair; use cozorocks::{IteratorPtr}; use crate::db::engine::Session; -use crate::db::eval::tuple_eval; +use crate::db::eval::{extract_table_ref, tuple_eval}; use crate::db::query::{FromEl, Selection}; use crate::db::table::{ColId, TableId, TableInfo}; use crate::error::CozoError::LogicError; @@ -162,6 +162,8 @@ pub enum MegaTupleIt<'a> { KeySortedWithAssocIt { main: Box>, associates: Vec<(u32, IteratorPtr<'a>)> }, CartesianProdIt { left: Box>, right: Box> }, MergeJoinIt { left: Box>, right: Box>, left_keys: Vec<(TableId, ColId)>, right_keys: Vec<(TableId, ColId)> }, + KeyedUnionIt { left: Box>, right: Box> }, + KeyedDifferenceIt { left: Box>, right: Box> }, FilterIt { it: Box>, filter: Value<'a> }, EvalIt { it: Box>, keys: Vec>, vals: Vec>, prefix: u32 }, } @@ -221,7 +223,6 @@ impl<'a> MegaTupleIt<'a> { right: Box::new(right.as_ref().iter()), }) } - MegaTupleIt::MergeJoinIt { .. } => todo!(), MegaTupleIt::FilterIt { it, filter } => { Box::new(FilterIterator { it: Box::new(it.iter()), @@ -236,6 +237,23 @@ impl<'a> MegaTupleIt<'a> { prefix: *prefix, }) } + MegaTupleIt::MergeJoinIt { left, right, left_keys, right_keys } => { + Box::new(MergeJoinIterator { + left: Box::new(left.iter()), + right: Box::new(right.iter()), + left_keys, + right_keys, + left_cache: None, + right_cache: None, + started: false, + }) + } + MegaTupleIt::KeyedUnionIt { .. } => { + todo!() + } + MegaTupleIt::KeyedDifferenceIt { .. } => { + todo!() + } } } } @@ -425,6 +443,93 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> { } } +pub struct MergeJoinIterator<'a> { + left: Box> + 'a>, + right: Box> + 'a>, + left_keys: &'a [(TableId, ColId)], + right_keys: &'a [(TableId, ColId)], + left_cache: Option, + right_cache: Option, + started: bool, +} + +impl<'a> Iterator for MergeJoinIterator<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + let mut left_cache; + let mut right_cache; + let mut left_val_cache; + let mut right_val_cache; + + match self.left.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(t)) => { + left_cache = t; + left_val_cache = match self.left_keys.iter().map(|(tid, cid)| + // to_static() here is to convince the borrow checker. It is really unnecessary. + // fortunately the val_cache is only for keys, which presumably isn't too big + extract_table_ref(&left_cache, tid, cid).map(|v| v.to_static())).collect::>>() { + Err(e) => return Some(Err(e)), + Ok(t) => t + }; + } + }; + match self.right.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(t)) => { + right_cache = t; + right_val_cache = match self.right_keys.iter().map(|(tid, cid)| + extract_table_ref(&right_cache, tid, cid).map(|v| v.to_static())).collect::>>() { + Err(e) => return Some(Err(e)), + Ok(t) => t + }; + } + }; + + loop { + match left_val_cache.cmp(&right_val_cache) { + Ordering::Equal => { + left_cache.extend(right_cache); + return Some(Ok(left_cache)); + } + Ordering::Less => { + // Advance the left one + match self.left.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(t)) => { + left_cache = t; + left_val_cache = match self.left_keys.iter().map(|(tid, cid)| + extract_table_ref(&left_cache, tid, cid).map(|v| v.to_static())).collect::>>() { + Err(e) => return Some(Err(e)), + Ok(t) => t + }; + } + }; + } + Ordering::Greater => { + // Advance the right one + match self.right.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(t)) => { + right_cache = t; + right_val_cache = match self.right_keys.iter().map(|(tid, cid)| + extract_table_ref(&right_cache, tid, cid).map(|v| v.to_static())).collect::>>() { + Err(e) => return Some(Err(e)), + Ok(t) => t + }; + } + }; + } + } + } + } +} + pub struct CartesianProdIterator<'a> { left: Box> + 'a>, left_cache: MegaTuple, diff --git a/src/relation/table.rs b/src/relation/table.rs index 4f3d0457..a4ade769 100644 --- a/src/relation/table.rs +++ b/src/relation/table.rs @@ -53,4 +53,8 @@ impl MegaTuple { pub fn is_empty(&self) -> bool { self.keys.is_empty() } + pub fn extend(&mut self, other: Self) { + self.keys.extend(other.keys); + self.vals.extend(other.vals); + } } \ No newline at end of file