outer merge join

main
Ziyang Hu 2 years ago
parent 797a8ebac5
commit 612a6caeb2

@ -19,6 +19,16 @@ pub enum MegaTupleIt<'a> {
KeySortedWithAssocIt { main: Box<MegaTupleIt<'a>>, associates: Vec<(u32, IteratorPtr<'a>)> },
CartesianProdIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>> },
MergeJoinIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>>, left_keys: Vec<(TableId, ColId)>, right_keys: Vec<(TableId, ColId)> },
OuterMergeJoinIt {
left: Box<MegaTupleIt<'a>>,
right: Box<MegaTupleIt<'a>>,
left_keys: Vec<(TableId, ColId)>,
right_keys: Vec<(TableId, ColId)>,
left_outer: bool,
right_outer: bool,
left_len: (usize, usize),
right_len: (usize, usize),
},
KeyedUnionIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>> },
KeyedDifferenceIt { left: Box<MegaTupleIt<'a>>, right: Box<MegaTupleIt<'a>> },
FilterIt { it: Box<MegaTupleIt<'a>>, filter: Value<'a> },
@ -103,6 +113,26 @@ impl<'a> MegaTupleIt<'a> {
right_keys,
})
}
MegaTupleIt::OuterMergeJoinIt {
left, right,
left_keys, right_keys, left_outer, right_outer,
left_len, right_len
} => {
Box::new(OuterMergeJoinIterator {
left: left.iter(),
right: right.iter(),
left_outer: *left_outer,
right_outer: *right_outer,
left_keys,
right_keys,
left_len: *left_len,
right_len: *right_len,
left_cache: None,
right_cache: None,
pull_left: true,
pull_right: true,
})
}
MegaTupleIt::KeyedUnionIt { left, right } => {
Box::new(KeyedUnionIterator {
left: left.iter(),
@ -114,7 +144,7 @@ impl<'a> MegaTupleIt<'a> {
left: left.iter(),
right: right.iter(),
right_cache: None,
started: false
started: false,
})
}
MegaTupleIt::BagsUnionIt { bags } => {
@ -212,7 +242,7 @@ impl<'a> Iterator for KeyedDifferenceIterator<'a> {
let right = match &self.right_cache {
None => {
// right is exhausted, so all left ones can be returned
return Some(Ok(left_cache))
return Some(Ok(left_cache));
}
Some(r) => r
};
@ -459,6 +489,145 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
}
}
/// Iterator state for a merge join over two inputs that can additionally keep
/// unmatched rows from the left side, the right side, or both.
///
/// Rows from both inputs are compared with `compare_tuple_by_keys`; the
/// iterator only ever moves forward on each input.
/// NOTE(review): this presumably requires both inputs to be sorted by their
/// join keys — confirm against the callers that build the plan.
pub struct OuterMergeJoinIterator<'a> {
/// Left input; each item is a row or an error.
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
/// Right input; each item is a row or an error.
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
/// When true, left rows without a matching right row are still emitted,
/// padded with empty tuples in place of the right row.
left_outer: bool,
/// When true, right rows without a matching left row are still emitted,
/// padded with empty tuples in place of the left row.
right_outer: bool,
/// Key columns of the left row handed to `compare_tuple_by_keys`.
left_keys: &'a [(TableId, ColId)],
/// Key columns of the right row handed to `compare_tuple_by_keys`.
right_keys: &'a [(TableId, ColId)],
/// `(number of key tuples, number of value tuples)` used to build the
/// empty padding that stands in for a missing left row.
left_len: (usize, usize),
/// `(number of key tuples, number of value tuples)` used to build the
/// empty padding that stands in for a missing right row.
right_len: (usize, usize),
/// Current left row that has been fetched but not yet emitted or skipped;
/// `None` once the left input has been exhausted.
left_cache: Option<MegaTuple>,
/// Current right row that has been fetched but not yet emitted or skipped;
/// `None` once the right input has been exhausted.
right_cache: Option<MegaTuple>,
/// Set when the cached left row was consumed, so the next call to `next`
/// must fetch a fresh one before comparing.
pull_left: bool,
/// Set when the cached right row was consumed, so the next call to `next`
/// must fetch a fresh one before comparing.
pull_right: bool,
}
/// Streams the joined rows of the two inputs.
///
/// Each call yields one of:
/// * a left row extended with the right row whose keys compare equal,
/// * a left row extended with empty padding (only when `left_outer` is set),
/// * empty padding extended with a right row (only when `right_outer` is set),
/// * an error propagated from either input or from the key comparison.
///
/// On equal keys both inputs are advanced, so every row takes part in at most
/// one match.
/// NOTE(review): this presumably relies on join keys being unique per side and
/// on both inputs being sorted by those keys — confirm with the planner.
impl<'a> Iterator for OuterMergeJoinIterator<'a> {
    type Item = Result<MegaTuple>;

    fn next(&mut self) -> Option<Self::Item> {
        // Refill whichever side was consumed by the previous call. The flag is
        // only cleared after a successful fetch, so a failed fetch is retried
        // on the following call.
        if self.pull_left {
            self.left_cache = match self.left.next() {
                None => None,
                Some(Err(e)) => return Some(Err(e)),
                Some(Ok(t)) => Some(t),
            };
            self.pull_left = false;
        }
        if self.pull_right {
            self.right_cache = match self.right.next() {
                None => None,
                Some(Err(e)) => return Some(Err(e)),
                Some(Ok(t)) => Some(t),
            };
            self.pull_right = false;
        }

        // Copy the lengths out so the padding builder does not keep a borrow
        // of `self` alive while the caches and flags are being mutated below.
        let (left_len, right_len) = (self.left_len, self.right_len);
        let make_empty_tuple = move |is_left: bool| -> MegaTuple {
            let lengths = if is_left { left_len } else { right_len };
            let keys = iter::repeat_with(|| OwnTuple::empty_tuple().into()).take(lengths.0).collect();
            let vals = iter::repeat_with(|| OwnTuple::empty_tuple().into()).take(lengths.1).collect();
            MegaTuple { keys, vals }
        };

        loop {
            let left_cache = match &self.left_cache {
                None => {
                    // Left is exhausted: only unmatched right rows can remain,
                    // and they are emitted only for a right outer join.
                    if !self.right_outer {
                        return None;
                    }
                    return match self.right_cache.take() {
                        None => None,
                        Some(right) => {
                            self.pull_right = true;
                            let mut tuple = make_empty_tuple(true);
                            tuple.extend(right);
                            Some(Ok(tuple))
                        }
                    };
                }
                Some(t) => t,
            };
            let right_cache = match &self.right_cache {
                None => {
                    // Right is exhausted while a left row is still pending:
                    // emit it padded on the right for a left outer join.
                    if !self.left_outer {
                        return None;
                    }
                    return match self.left_cache.take() {
                        None => None,
                        Some(mut left) => {
                            self.pull_left = true;
                            left.extend(make_empty_tuple(false));
                            Some(Ok(left))
                        }
                    };
                }
                Some(t) => t,
            };

            let cmp_res = match compare_tuple_by_keys((&left_cache, self.left_keys),
                                                      (&right_cache, self.right_keys)) {
                Ok(r) => r,
                Err(e) => return Some(Err(e)),
            };

            match cmp_res {
                Ordering::Equal => {
                    // Both rows are present: emit the combined row and advance
                    // both sides on the next call.
                    self.pull_left = true;
                    self.pull_right = true;
                    let mut left = self.left_cache.take().unwrap();
                    let right = self.right_cache.take().unwrap();
                    left.extend(right);
                    return Some(Ok(left));
                }
                Ordering::Less => {
                    // The left row has no partner on the right.
                    if self.left_outer {
                        self.pull_left = true;
                        let right = make_empty_tuple(false);
                        let mut left = self.left_cache.take().unwrap();
                        left.extend(right);
                        return Some(Ok(left));
                    } else {
                        // Skip the unmatched left row. If the left input ends
                        // here, loop again instead of stopping so that the
                        // pending right rows are still emitted when
                        // `right_outer` is set.
                        match self.left.next() {
                            None => self.left_cache = None,
                            Some(Err(e)) => return Some(Err(e)),
                            Some(Ok(t)) => self.left_cache = Some(t),
                        }
                    }
                }
                Ordering::Greater => {
                    // The right row has no partner on the left.
                    if self.right_outer {
                        self.pull_right = true;
                        let mut left = make_empty_tuple(true);
                        let right = self.right_cache.take().unwrap();
                        left.extend(right);
                        return Some(Ok(left));
                    } else {
                        // Skip the unmatched right row. If the right input
                        // ends here, loop again instead of stopping so that
                        // the pending left rows are still emitted when
                        // `left_outer` is set.
                        match self.right.next() {
                            None => self.right_cache = None,
                            Some(Err(e)) => return Some(Err(e)),
                            Some(Ok(t)) => self.right_cache = Some(t),
                        }
                    }
                }
            }
        }
    }
}
pub struct MergeJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,

@ -456,6 +456,10 @@ impl<'a, T: AsRef<[u8]>> Iterator for TupleIter<'a, T> {
}
impl OwnTuple {
#[inline]
pub fn empty_tuple() -> OwnTuple {
OwnTuple::with_data_prefix(DataKind::Empty)
}
#[inline]
pub fn with_null_prefix() -> Self {
Tuple::with_prefix(0)

Loading…
Cancel
Save