|
|
|
@ -163,7 +163,7 @@ pub enum MegaTupleIt<'a> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> MegaTupleIt<'a> {
|
|
|
|
|
pub fn iter(&'a self) -> Box<dyn Iterator<Item=MegaTuple> + 'a> {
|
|
|
|
|
pub fn iter(&'a self) -> Box<dyn Iterator<Item=Result<MegaTuple>> + 'a> {
|
|
|
|
|
match self {
|
|
|
|
|
MegaTupleIt::NodeIt { it, tid } => {
|
|
|
|
|
let prefix_tuple = OwnTuple::with_prefix(*tid);
|
|
|
|
@ -228,7 +228,7 @@ pub struct NodeIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Iterator for NodeIterator<'a> {
|
|
|
|
|
type Item = MegaTuple;
|
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
|
if self.started {
|
|
|
|
|
self.it.next();
|
|
|
|
@ -236,10 +236,10 @@ impl<'a> Iterator for NodeIterator<'a> {
|
|
|
|
|
self.started = true;
|
|
|
|
|
}
|
|
|
|
|
self.it.pair().map(|(k, v)| {
|
|
|
|
|
MegaTuple {
|
|
|
|
|
Ok(MegaTuple {
|
|
|
|
|
keys: vec![Tuple::new(k).into()],
|
|
|
|
|
vals: vec![Tuple::new(v).into()],
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -250,7 +250,7 @@ pub struct EdgeIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Iterator for EdgeIterator<'a> {
|
|
|
|
|
type Item = MegaTuple;
|
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
|
if self.started {
|
|
|
|
@ -267,10 +267,10 @@ impl<'a> Iterator for EdgeIterator<'a> {
|
|
|
|
|
self.it.next()
|
|
|
|
|
} else {
|
|
|
|
|
let kt = Tuple::new(k);
|
|
|
|
|
return Some(MegaTuple {
|
|
|
|
|
return Some(Ok(MegaTuple {
|
|
|
|
|
keys: vec![kt.into()],
|
|
|
|
|
vals: vec![vt.into()],
|
|
|
|
|
});
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -284,7 +284,7 @@ pub struct EdgeKeyOnlyBwdIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
|
|
|
|
|
type Item = MegaTuple;
|
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
|
if self.started {
|
|
|
|
@ -300,10 +300,10 @@ impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
|
|
|
|
|
if !matches!(rev_k_tuple.data_kind(), Ok(DataKind::Edge)) {
|
|
|
|
|
self.it.next()
|
|
|
|
|
} else {
|
|
|
|
|
return Some(MegaTuple {
|
|
|
|
|
return Some(Ok(MegaTuple {
|
|
|
|
|
keys: vec![rev_k_tuple.into()],
|
|
|
|
|
vals: vec![],
|
|
|
|
|
});
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -312,18 +312,19 @@ impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct KeySortedWithAssocIterator<'a> {
|
|
|
|
|
main: Box<dyn Iterator<Item=MegaTuple> + 'a>,
|
|
|
|
|
main: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
|
associates: Vec<NodeIterator<'a>>,
|
|
|
|
|
buffer: Vec<Option<(CowTuple, CowTuple)>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
|
|
|
|
|
type Item = MegaTuple;
|
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
|
match self.main.next() {
|
|
|
|
|
None => None, // main exhausted, we are finished
|
|
|
|
|
Some(MegaTuple { mut keys, mut vals }) => {
|
|
|
|
|
Some(Err(e)) => return Some(Err(e)),
|
|
|
|
|
Some(Ok(MegaTuple { mut keys, mut vals })) => {
|
|
|
|
|
// extract key from main
|
|
|
|
|
let k = keys.pop().unwrap();
|
|
|
|
|
let l = self.associates.len();
|
|
|
|
@ -336,8 +337,20 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
|
|
|
|
|
// if no cache, try to get cache filled first
|
|
|
|
|
if matches!(cached, None) {
|
|
|
|
|
let assoc_data = self.associates.get_mut(i).unwrap().next()
|
|
|
|
|
.map(|mut mt| (mt.keys.pop().unwrap(), mt.vals.pop().unwrap()));
|
|
|
|
|
self.buffer[i] = assoc_data;
|
|
|
|
|
.map(|mt| {
|
|
|
|
|
mt.map(|mut mt| {
|
|
|
|
|
(mt.keys.pop().unwrap(), mt.vals.pop().unwrap())
|
|
|
|
|
})
|
|
|
|
|
});
|
|
|
|
|
match assoc_data {
|
|
|
|
|
None => {
|
|
|
|
|
self.buffer[i] = None
|
|
|
|
|
}
|
|
|
|
|
Some(Ok(data)) => {
|
|
|
|
|
self.buffer[i] = Some(data)
|
|
|
|
|
}
|
|
|
|
|
Some(Err(e)) => return Some(Err(e))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if we have cache
|
|
|
|
@ -357,8 +370,20 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
|
|
|
|
|
// target key greater than cache key, meaning that the source has holes (maybe due to filtering)
|
|
|
|
|
// get a new one into buffer
|
|
|
|
|
let assoc_data = self.associates.get_mut(i).unwrap().next()
|
|
|
|
|
.map(|mut mt| (mt.keys.pop().unwrap(), mt.vals.pop().unwrap()));
|
|
|
|
|
self.buffer[i] = assoc_data;
|
|
|
|
|
.map(|mt| {
|
|
|
|
|
mt.map(|mut mt| {
|
|
|
|
|
(mt.keys.pop().unwrap(), mt.vals.pop().unwrap())
|
|
|
|
|
})
|
|
|
|
|
});
|
|
|
|
|
match assoc_data {
|
|
|
|
|
None => {
|
|
|
|
|
self.buffer[i] = None
|
|
|
|
|
}
|
|
|
|
|
Some(Ok(data)) => {
|
|
|
|
|
self.buffer[i] = Some(data)
|
|
|
|
|
}
|
|
|
|
|
Some(Err(e)) => return Some(Err(e))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -370,30 +395,31 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
Some(v) => v
|
|
|
|
|
}));
|
|
|
|
|
Some(MegaTuple {
|
|
|
|
|
Some(Ok(MegaTuple {
|
|
|
|
|
keys: vec![k],
|
|
|
|
|
vals,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct CartesianProdIterator<'a> {
|
|
|
|
|
left: Box<dyn Iterator<Item=MegaTuple> + 'a>,
|
|
|
|
|
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
|
left_cache: MegaTuple,
|
|
|
|
|
right_source: &'a MegaTupleIt<'a>,
|
|
|
|
|
right: Box<dyn Iterator<Item=MegaTuple> + 'a>,
|
|
|
|
|
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Iterator for CartesianProdIterator<'a> {
|
|
|
|
|
type Item = MegaTuple;
|
|
|
|
|
type Item = Result<MegaTuple>;
|
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Self::Item> {
|
|
|
|
|
if self.left_cache.is_empty() {
|
|
|
|
|
self.left_cache = match self.left.next() {
|
|
|
|
|
None => return None,
|
|
|
|
|
Some(v) => v
|
|
|
|
|
Some(Ok(v)) => v,
|
|
|
|
|
Some(Err(e)) => return Some(Err(e))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let r_tpl = match self.right.next() {
|
|
|
|
@ -401,20 +427,23 @@ impl<'a> Iterator for CartesianProdIterator<'a> {
|
|
|
|
|
self.right = Box::new(self.right_source.iter());
|
|
|
|
|
self.left_cache = match self.left.next() {
|
|
|
|
|
None => return None,
|
|
|
|
|
Some(v) => v
|
|
|
|
|
Some(Ok(v)) => v,
|
|
|
|
|
Some(Err(e)) => return Some(Err(e))
|
|
|
|
|
};
|
|
|
|
|
match self.right.next() {
|
|
|
|
|
// early return in case right is empty
|
|
|
|
|
None => return None,
|
|
|
|
|
Some(r_tpl) => r_tpl
|
|
|
|
|
Some(Ok(r_tpl)) => r_tpl,
|
|
|
|
|
Some(Err(e)) => return Some(Err(e))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Some(r_tpl) => r_tpl
|
|
|
|
|
Some(Ok(r_tpl)) => r_tpl,
|
|
|
|
|
Some(Err(e)) => return Some(Err(e))
|
|
|
|
|
};
|
|
|
|
|
let mut ret = self.left_cache.clone();
|
|
|
|
|
ret.keys.extend(r_tpl.keys);
|
|
|
|
|
ret.vals.extend(r_tpl.vals);
|
|
|
|
|
Some(ret)
|
|
|
|
|
Some(Ok(ret))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -500,6 +529,7 @@ mod tests {
|
|
|
|
|
let tbl = rel_tbls.pop().unwrap();
|
|
|
|
|
let it = sess.iter_node(tbl);
|
|
|
|
|
for tuple in it.iter() {
|
|
|
|
|
let tuple = tuple.unwrap();
|
|
|
|
|
match sess.tuple_eval(&where_vals, &tuple).unwrap() {
|
|
|
|
|
Value::Bool(true) => {
|
|
|
|
|
let extracted = sess.tuple_eval(&vals, &tuple).unwrap();
|
|
|
|
@ -545,6 +575,7 @@ mod tests {
|
|
|
|
|
println!("Now cartesian product");
|
|
|
|
|
let mut n = 0;
|
|
|
|
|
for el in it.iter() {
|
|
|
|
|
let el = el.unwrap();
|
|
|
|
|
if n % 4096 == 0 {
|
|
|
|
|
println!("{}: {:?}", n, el)
|
|
|
|
|
}
|
|
|
|
|