|
|
@ -4,7 +4,7 @@ use std::cmp::Ordering;
|
|
|
|
use pest::iterators::Pair;
|
|
|
|
use pest::iterators::Pair;
|
|
|
|
use cozorocks::{IteratorPtr};
|
|
|
|
use cozorocks::{IteratorPtr};
|
|
|
|
use crate::db::engine::Session;
|
|
|
|
use crate::db::engine::Session;
|
|
|
|
use crate::db::eval::{extract_table_ref, tuple_eval};
|
|
|
|
use crate::db::eval::{compare_tuple_by_keys, extract_table_ref, tuple_eval};
|
|
|
|
use crate::db::query::{FromEl, Selection};
|
|
|
|
use crate::db::query::{FromEl, Selection};
|
|
|
|
use crate::db::table::{ColId, TableId, TableInfo};
|
|
|
|
use crate::db::table::{ColId, TableId, TableInfo};
|
|
|
|
use crate::error::CozoError::LogicError;
|
|
|
|
use crate::error::CozoError::LogicError;
|
|
|
@ -243,9 +243,6 @@ impl<'a> MegaTupleIt<'a> {
|
|
|
|
right: Box::new(right.iter()),
|
|
|
|
right: Box::new(right.iter()),
|
|
|
|
left_keys,
|
|
|
|
left_keys,
|
|
|
|
right_keys,
|
|
|
|
right_keys,
|
|
|
|
left_cache: None,
|
|
|
|
|
|
|
|
right_cache: None,
|
|
|
|
|
|
|
|
started: false,
|
|
|
|
|
|
|
|
})
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
MegaTupleIt::KeyedUnionIt { .. } => {
|
|
|
|
MegaTupleIt::KeyedUnionIt { .. } => {
|
|
|
@ -448,49 +445,31 @@ pub struct MergeJoinIterator<'a> {
|
|
|
|
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
left_keys: &'a [(TableId, ColId)],
|
|
|
|
left_keys: &'a [(TableId, ColId)],
|
|
|
|
right_keys: &'a [(TableId, ColId)],
|
|
|
|
right_keys: &'a [(TableId, ColId)],
|
|
|
|
left_cache: Option<MegaTuple>,
|
|
|
|
|
|
|
|
right_cache: Option<MegaTuple>,
|
|
|
|
|
|
|
|
started: bool,
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
impl<'a> Iterator for MergeJoinIterator<'a> {
|
|
|
|
impl<'a> Iterator for MergeJoinIterator<'a> {
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
let mut left_cache;
|
|
|
|
let mut left_cache = match self.left.next() {
|
|
|
|
let mut right_cache;
|
|
|
|
|
|
|
|
let mut left_val_cache;
|
|
|
|
|
|
|
|
let mut right_val_cache;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
match self.left.next() {
|
|
|
|
|
|
|
|
None => return None,
|
|
|
|
None => return None,
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Ok(t)) => {
|
|
|
|
Some(Ok(t)) => t
|
|
|
|
left_cache = t;
|
|
|
|
|
|
|
|
left_val_cache = match self.left_keys.iter().map(|(tid, cid)|
|
|
|
|
|
|
|
|
// to_static() here is to convince the borrow checker. It is really unnecessary.
|
|
|
|
|
|
|
|
// fortunately the val_cache is only for keys, which presumably isn't too big
|
|
|
|
|
|
|
|
extract_table_ref(&left_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
|
|
|
|
|
|
|
|
Err(e) => return Some(Err(e)),
|
|
|
|
|
|
|
|
Ok(t) => t
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
};
|
|
|
|
};
|
|
|
|
match self.right.next() {
|
|
|
|
|
|
|
|
|
|
|
|
let mut right_cache = match self.right.next() {
|
|
|
|
None => return None,
|
|
|
|
None => return None,
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Ok(t)) => {
|
|
|
|
Some(Ok(t)) => t
|
|
|
|
right_cache = t;
|
|
|
|
|
|
|
|
right_val_cache = match self.right_keys.iter().map(|(tid, cid)|
|
|
|
|
|
|
|
|
extract_table_ref(&right_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
|
|
|
|
|
|
|
|
Err(e) => return Some(Err(e)),
|
|
|
|
|
|
|
|
Ok(t) => t
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
loop {
|
|
|
|
match left_val_cache.cmp(&right_val_cache) {
|
|
|
|
let cmp_res = match compare_tuple_by_keys((&left_cache, self.left_keys),
|
|
|
|
|
|
|
|
(&right_cache, self.right_keys)) {
|
|
|
|
|
|
|
|
Ok(r) => r,
|
|
|
|
|
|
|
|
Err(e) => return Some(Err(e))
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
match cmp_res {
|
|
|
|
Ordering::Equal => {
|
|
|
|
Ordering::Equal => {
|
|
|
|
left_cache.extend(right_cache);
|
|
|
|
left_cache.extend(right_cache);
|
|
|
|
return Some(Ok(left_cache));
|
|
|
|
return Some(Ok(left_cache));
|
|
|
@ -502,11 +481,6 @@ impl<'a> Iterator for MergeJoinIterator<'a> {
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Ok(t)) => {
|
|
|
|
Some(Ok(t)) => {
|
|
|
|
left_cache = t;
|
|
|
|
left_cache = t;
|
|
|
|
left_val_cache = match self.left_keys.iter().map(|(tid, cid)|
|
|
|
|
|
|
|
|
extract_table_ref(&left_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
|
|
|
|
|
|
|
|
Err(e) => return Some(Err(e)),
|
|
|
|
|
|
|
|
Ok(t) => t
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -517,11 +491,6 @@ impl<'a> Iterator for MergeJoinIterator<'a> {
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
Some(Ok(t)) => {
|
|
|
|
Some(Ok(t)) => {
|
|
|
|
right_cache = t;
|
|
|
|
right_cache = t;
|
|
|
|
right_val_cache = match self.right_keys.iter().map(|(tid, cid)|
|
|
|
|
|
|
|
|
extract_table_ref(&right_cache, tid, cid).map(|v| v.to_static())).collect::<Result<Vec<_>>>() {
|
|
|
|
|
|
|
|
Err(e) => return Some(Err(e)),
|
|
|
|
|
|
|
|
Ok(t) => t
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|