|
|
|
@ -1,47 +1,79 @@
|
|
|
|
|
use std::cmp::Ordering;
|
|
|
|
|
use std::{iter, mem};
|
|
|
|
|
use std::fmt::{Debug, Formatter};
|
|
|
|
|
use cozorocks::IteratorPtr;
|
|
|
|
|
use crate::db::eval::{compare_tuple_by_keys, tuple_eval};
|
|
|
|
|
use crate::db::table::{ColId, TableId};
|
|
|
|
|
use crate::error::CozoError::LogicError;
|
|
|
|
|
use crate::error::Result;
|
|
|
|
|
use crate::error::{CozoError, Result};
|
|
|
|
|
use crate::relation::data::{DataKind, EMPTY_DATA};
|
|
|
|
|
use crate::relation::table::MegaTuple;
|
|
|
|
|
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, Tuple};
|
|
|
|
|
use crate::relation::value::Value;
|
|
|
|
|
|
|
|
|
|
pub enum MegaTupleIt<'a> {
|
|
|
|
|
|
|
|
|
|
pub enum IteratorSlot<'a> {
|
|
|
|
|
Dummy,
|
|
|
|
|
Reified(IteratorPtr<'a>),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> Debug for IteratorSlot<'a> {
|
|
|
|
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
match self {
|
|
|
|
|
IteratorSlot::Dummy => write!(f, "DummyIterator"),
|
|
|
|
|
IteratorSlot::Reified(_) => write!(f, "BaseIterator")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> From<IteratorPtr<'a>> for IteratorSlot<'a> {
|
|
|
|
|
fn from(it: IteratorPtr<'a>) -> Self {
|
|
|
|
|
Self::Reified(it)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> IteratorSlot<'a> {
|
|
|
|
|
pub fn try_get(&self) -> Result<&IteratorPtr<'a>> {
|
|
|
|
|
match self {
|
|
|
|
|
IteratorSlot::Dummy => Err(LogicError("Cannot iter over dummy".to_string())),
|
|
|
|
|
IteratorSlot::Reified(r) => Ok(r)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub enum ExecPlan<'a> {
|
|
|
|
|
NodeIt {
|
|
|
|
|
it: IteratorPtr<'a>,
|
|
|
|
|
tid: u32
|
|
|
|
|
it: IteratorSlot<'a>,
|
|
|
|
|
tid: u32,
|
|
|
|
|
},
|
|
|
|
|
EdgeIt {
|
|
|
|
|
it: IteratorPtr<'a>,
|
|
|
|
|
tid: u32
|
|
|
|
|
it: IteratorSlot<'a>,
|
|
|
|
|
tid: u32,
|
|
|
|
|
},
|
|
|
|
|
EdgeKeyOnlyBwdIt {
|
|
|
|
|
it: IteratorPtr<'a>,
|
|
|
|
|
tid: u32
|
|
|
|
|
it: IteratorSlot<'a>,
|
|
|
|
|
tid: u32,
|
|
|
|
|
},
|
|
|
|
|
// EdgeBwdIt { it: IteratorPtr<'a>, sess: &'a Session<'a>, tid: u32 },
|
|
|
|
|
// IndexIt {it: ..}
|
|
|
|
|
KeySortedWithAssocIt {
|
|
|
|
|
main: Box<MegaTupleIt<'a>>,
|
|
|
|
|
associates: Vec<(u32, IteratorPtr<'a>)>
|
|
|
|
|
main: Box<ExecPlan<'a>>,
|
|
|
|
|
associates: Vec<(u32, IteratorSlot<'a>)>,
|
|
|
|
|
},
|
|
|
|
|
CartesianProdIt {
|
|
|
|
|
left: Box<MegaTupleIt<'a>>,
|
|
|
|
|
right: Box<MegaTupleIt<'a>>
|
|
|
|
|
left: Box<ExecPlan<'a>>,
|
|
|
|
|
right: Box<ExecPlan<'a>>,
|
|
|
|
|
},
|
|
|
|
|
MergeJoinIt {
|
|
|
|
|
left: Box<MegaTupleIt<'a>>,
|
|
|
|
|
right: Box<MegaTupleIt<'a>>,
|
|
|
|
|
left: Box<ExecPlan<'a>>,
|
|
|
|
|
right: Box<ExecPlan<'a>>,
|
|
|
|
|
left_keys: Vec<(TableId, ColId)>,
|
|
|
|
|
right_keys: Vec<(TableId, ColId)>
|
|
|
|
|
right_keys: Vec<(TableId, ColId)>,
|
|
|
|
|
},
|
|
|
|
|
OuterMergeJoinIt {
|
|
|
|
|
left: Box<MegaTupleIt<'a>>,
|
|
|
|
|
right: Box<MegaTupleIt<'a>>,
|
|
|
|
|
left: Box<ExecPlan<'a>>,
|
|
|
|
|
right: Box<ExecPlan<'a>>,
|
|
|
|
|
left_keys: Vec<(TableId, ColId)>,
|
|
|
|
|
right_keys: Vec<(TableId, ColId)>,
|
|
|
|
|
left_outer: bool,
|
|
|
|
@ -50,61 +82,79 @@ pub enum MegaTupleIt<'a> {
|
|
|
|
|
right_len: (usize, usize),
|
|
|
|
|
},
|
|
|
|
|
KeyedUnionIt {
|
|
|
|
|
left: Box<MegaTupleIt<'a>>,
|
|
|
|
|
right: Box<MegaTupleIt<'a>>
|
|
|
|
|
left: Box<ExecPlan<'a>>,
|
|
|
|
|
right: Box<ExecPlan<'a>>,
|
|
|
|
|
},
|
|
|
|
|
KeyedDifferenceIt {
|
|
|
|
|
left: Box<MegaTupleIt<'a>>,
|
|
|
|
|
right: Box<MegaTupleIt<'a>>
|
|
|
|
|
left: Box<ExecPlan<'a>>,
|
|
|
|
|
right: Box<ExecPlan<'a>>,
|
|
|
|
|
},
|
|
|
|
|
FilterIt {
|
|
|
|
|
it: Box<MegaTupleIt<'a>>,
|
|
|
|
|
filter: Value<'a>
|
|
|
|
|
it: Box<ExecPlan<'a>>,
|
|
|
|
|
filter: Value<'a>,
|
|
|
|
|
},
|
|
|
|
|
EvalIt {
|
|
|
|
|
it: Box<MegaTupleIt<'a>>,
|
|
|
|
|
it: Box<ExecPlan<'a>>,
|
|
|
|
|
keys: Vec<Value<'a>>,
|
|
|
|
|
vals: Vec<Value<'a>>,
|
|
|
|
|
out_prefix: u32
|
|
|
|
|
out_prefix: u32,
|
|
|
|
|
},
|
|
|
|
|
BagsUnionIt {
|
|
|
|
|
bags: Vec<MegaTupleIt<'a>>
|
|
|
|
|
bags: Vec<ExecPlan<'a>>
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> MegaTupleIt<'a> {
|
|
|
|
|
pub fn iter(&'a self) -> Box<dyn Iterator<Item=Result<MegaTuple>> + 'a> {
|
|
|
|
|
pub struct OutputIt<'a> {
|
|
|
|
|
it: ExecPlan<'a>,
|
|
|
|
|
filter: Value<'a>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> OutputIt<'a> {
|
|
|
|
|
pub fn iter(&self) -> Result<OutputIterator> {
|
|
|
|
|
Ok(OutputIterator {
|
|
|
|
|
it: self.it.iter()?,
|
|
|
|
|
transform: &self.filter,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> ExecPlan<'a> {
|
|
|
|
|
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>> {
|
|
|
|
|
match self {
|
|
|
|
|
MegaTupleIt::NodeIt { it, tid } => {
|
|
|
|
|
ExecPlan::NodeIt { it, tid } => {
|
|
|
|
|
let it = it.try_get()?;
|
|
|
|
|
let prefix_tuple = OwnTuple::with_prefix(*tid);
|
|
|
|
|
it.seek(prefix_tuple);
|
|
|
|
|
|
|
|
|
|
Box::new(NodeIterator {
|
|
|
|
|
Ok(Box::new(NodeIterator {
|
|
|
|
|
it,
|
|
|
|
|
started: false,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::EdgeIt { it, tid } => {
|
|
|
|
|
ExecPlan::EdgeIt { it, tid } => {
|
|
|
|
|
let it = it.try_get()?;
|
|
|
|
|
let prefix_tuple = OwnTuple::with_prefix(*tid);
|
|
|
|
|
it.seek(prefix_tuple);
|
|
|
|
|
|
|
|
|
|
Box::new(EdgeIterator {
|
|
|
|
|
Ok(Box::new(EdgeIterator {
|
|
|
|
|
it,
|
|
|
|
|
started: false,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::EdgeKeyOnlyBwdIt { it, tid } => {
|
|
|
|
|
ExecPlan::EdgeKeyOnlyBwdIt { it, tid } => {
|
|
|
|
|
let it = it.try_get()?;
|
|
|
|
|
let prefix_tuple = OwnTuple::with_prefix(*tid);
|
|
|
|
|
it.seek(prefix_tuple);
|
|
|
|
|
|
|
|
|
|
Box::new(EdgeKeyOnlyBwdIterator {
|
|
|
|
|
Ok(Box::new(EdgeKeyOnlyBwdIterator {
|
|
|
|
|
it,
|
|
|
|
|
started: false,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::KeySortedWithAssocIt { main, associates } => {
|
|
|
|
|
ExecPlan::KeySortedWithAssocIt { main, associates } => {
|
|
|
|
|
let buffer = iter::repeat_with(|| None).take(associates.len()).collect();
|
|
|
|
|
let associates = associates.into_iter().map(|(tid, it)| {
|
|
|
|
|
it.try_get().map(|it| {
|
|
|
|
|
let prefix_tuple = OwnTuple::with_prefix(*tid);
|
|
|
|
|
it.seek(prefix_tuple);
|
|
|
|
|
|
|
|
|
@ -112,51 +162,52 @@ impl<'a> MegaTupleIt<'a> {
|
|
|
|
|
it,
|
|
|
|
|
started: false,
|
|
|
|
|
}
|
|
|
|
|
}).collect();
|
|
|
|
|
Box::new(KeySortedWithAssocIterator {
|
|
|
|
|
main: main.iter(),
|
|
|
|
|
})
|
|
|
|
|
}).collect::<Result<Vec<_>>>()?;
|
|
|
|
|
Ok(Box::new(KeySortedWithAssocIterator {
|
|
|
|
|
main: main.iter()?,
|
|
|
|
|
associates,
|
|
|
|
|
buffer,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::CartesianProdIt { left, right } => {
|
|
|
|
|
Box::new(CartesianProdIterator {
|
|
|
|
|
left: left.iter(),
|
|
|
|
|
ExecPlan::CartesianProdIt { left, right } => {
|
|
|
|
|
Ok(Box::new(CartesianProdIterator {
|
|
|
|
|
left: left.iter()?,
|
|
|
|
|
left_cache: MegaTuple::empty_tuple(),
|
|
|
|
|
right_source: right.as_ref(),
|
|
|
|
|
right: right.as_ref().iter(),
|
|
|
|
|
})
|
|
|
|
|
right: right.as_ref().iter()?,
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::FilterIt { it, filter } => {
|
|
|
|
|
Box::new(FilterIterator {
|
|
|
|
|
it: it.iter(),
|
|
|
|
|
ExecPlan::FilterIt { it, filter } => {
|
|
|
|
|
Ok(Box::new(FilterIterator {
|
|
|
|
|
it: it.iter()?,
|
|
|
|
|
filter,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::EvalIt { it, keys, vals, out_prefix: prefix } => {
|
|
|
|
|
Box::new(EvalIterator {
|
|
|
|
|
it: it.iter(),
|
|
|
|
|
ExecPlan::EvalIt { it, keys, vals, out_prefix: prefix } => {
|
|
|
|
|
Ok(Box::new(EvalIterator {
|
|
|
|
|
it: it.iter()?,
|
|
|
|
|
keys,
|
|
|
|
|
vals,
|
|
|
|
|
prefix: *prefix,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::MergeJoinIt { left, right, left_keys, right_keys } => {
|
|
|
|
|
Box::new(MergeJoinIterator {
|
|
|
|
|
left: left.iter(),
|
|
|
|
|
right: right.iter(),
|
|
|
|
|
ExecPlan::MergeJoinIt { left, right, left_keys, right_keys } => {
|
|
|
|
|
Ok(Box::new(MergeJoinIterator {
|
|
|
|
|
left: left.iter()?,
|
|
|
|
|
right: right.iter()?,
|
|
|
|
|
left_keys,
|
|
|
|
|
right_keys,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::OuterMergeJoinIt {
|
|
|
|
|
ExecPlan::OuterMergeJoinIt {
|
|
|
|
|
left, right,
|
|
|
|
|
left_keys, right_keys, left_outer, right_outer,
|
|
|
|
|
left_len, right_len
|
|
|
|
|
} => {
|
|
|
|
|
Box::new(OuterMergeJoinIterator {
|
|
|
|
|
left: left.iter(),
|
|
|
|
|
right: right.iter(),
|
|
|
|
|
Ok(Box::new(OuterMergeJoinIterator {
|
|
|
|
|
left: left.iter()?,
|
|
|
|
|
right: right.iter()?,
|
|
|
|
|
left_outer: *left_outer,
|
|
|
|
|
right_outer: *right_outer,
|
|
|
|
|
left_keys,
|
|
|
|
@ -167,28 +218,28 @@ impl<'a> MegaTupleIt<'a> {
|
|
|
|
|
right_cache: None,
|
|
|
|
|
pull_left: true,
|
|
|
|
|
pull_right: true,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::KeyedUnionIt { left, right } => {
|
|
|
|
|
Box::new(KeyedUnionIterator {
|
|
|
|
|
left: left.iter(),
|
|
|
|
|
right: right.iter(),
|
|
|
|
|
})
|
|
|
|
|
ExecPlan::KeyedUnionIt { left, right } => {
|
|
|
|
|
Ok(Box::new(KeyedUnionIterator {
|
|
|
|
|
left: left.iter()?,
|
|
|
|
|
right: right.iter()?,
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::KeyedDifferenceIt { left, right } => {
|
|
|
|
|
Box::new(KeyedDifferenceIterator {
|
|
|
|
|
left: left.iter(),
|
|
|
|
|
right: right.iter(),
|
|
|
|
|
ExecPlan::KeyedDifferenceIt { left, right } => {
|
|
|
|
|
Ok(Box::new(KeyedDifferenceIterator {
|
|
|
|
|
left: left.iter()?,
|
|
|
|
|
right: right.iter()?,
|
|
|
|
|
right_cache: None,
|
|
|
|
|
started: false,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
MegaTupleIt::BagsUnionIt { bags } => {
|
|
|
|
|
let bags = bags.iter().map(|i| i.iter()).collect();
|
|
|
|
|
Box::new(BagsUnionIterator {
|
|
|
|
|
ExecPlan::BagsUnionIt { bags } => {
|
|
|
|
|
let bags = bags.iter().map(|i| i.iter()).collect::<Result<Vec<_>>>()?;
|
|
|
|
|
Ok(Box::new(BagsUnionIterator {
|
|
|
|
|
bags,
|
|
|
|
|
current: 0,
|
|
|
|
|
})
|
|
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -726,7 +777,7 @@ impl<'a> Iterator for MergeJoinIterator<'a> {
|
|
|
|
|
pub struct CartesianProdIterator<'a> {
|
|
|
|
|
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
|
left_cache: MegaTuple,
|
|
|
|
|
right_source: &'a MegaTupleIt<'a>,
|
|
|
|
|
right_source: &'a ExecPlan<'a>,
|
|
|
|
|
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -743,7 +794,10 @@ impl<'a> Iterator for CartesianProdIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
let r_tpl = match self.right.next() {
|
|
|
|
|
None => {
|
|
|
|
|
self.right = self.right_source.iter();
|
|
|
|
|
self.right = match self.right_source.iter() {
|
|
|
|
|
Ok(it) => it,
|
|
|
|
|
Err(e) => return Some(Err(e))
|
|
|
|
|
};
|
|
|
|
|
self.left_cache = match self.left.next() {
|
|
|
|
|
None => return None,
|
|
|
|
|
Some(Ok(v)) => v,
|
|
|
|
@ -800,11 +854,11 @@ pub struct OutputIterator<'a> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<'a> OutputIterator<'a> {
|
|
|
|
|
pub fn new(it: &'a MegaTupleIt<'a>, transform: &'a Value<'a>) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
it: it.iter(),
|
|
|
|
|
pub fn new(it: &'a ExecPlan<'a>, transform: &'a Value<'a>) -> Result<Self> {
|
|
|
|
|
Ok(Self {
|
|
|
|
|
it: it.iter()?,
|
|
|
|
|
transform,
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -863,7 +917,7 @@ mod tests {
|
|
|
|
|
use crate::db::engine::Engine;
|
|
|
|
|
use crate::parser::{Parser, Rule};
|
|
|
|
|
use pest::Parser as PestParser;
|
|
|
|
|
use crate::db::iterator::{MegaTupleIt, OutputIterator};
|
|
|
|
|
use crate::db::iterator::{ExecPlan, OutputIterator};
|
|
|
|
|
use crate::db::query::FromEl;
|
|
|
|
|
use crate::relation::value::Value;
|
|
|
|
|
use crate::error::Result;
|
|
|
|
@ -878,7 +932,7 @@ mod tests {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn plan() {
|
|
|
|
|
fn plan() -> Result<()> {
|
|
|
|
|
let db_path = "_test_db_plan";
|
|
|
|
|
let engine = Engine::new(db_path.to_string(), true).unwrap();
|
|
|
|
|
{
|
|
|
|
@ -935,34 +989,34 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
let tbl = rel_tbls.pop().unwrap();
|
|
|
|
|
let it = sess.iter_node(tbl);
|
|
|
|
|
let it = MegaTupleIt::FilterIt { filter: where_vals, it: it.into() };
|
|
|
|
|
let it = OutputIterator::new(&it, &vals);
|
|
|
|
|
let it = ExecPlan::FilterIt { filter: where_vals, it: it.into() };
|
|
|
|
|
let it = OutputIterator::new(&it, &vals)?;
|
|
|
|
|
for val in it {
|
|
|
|
|
println!("{}", val.unwrap());
|
|
|
|
|
}
|
|
|
|
|
let duration = start.elapsed();
|
|
|
|
|
let duration2 = start2.elapsed();
|
|
|
|
|
println!("Time elapsed {:?} {:?}", duration, duration2);
|
|
|
|
|
let it = MegaTupleIt::KeySortedWithAssocIt {
|
|
|
|
|
let it = ExecPlan::KeySortedWithAssocIt {
|
|
|
|
|
main: Box::new(sess.iter_node(tbl)),
|
|
|
|
|
associates: vec![(tbl.id as u32, sess.raw_iterator(true)),
|
|
|
|
|
(tbl.id as u32, sess.raw_iterator(true)),
|
|
|
|
|
(tbl.id as u32, sess.raw_iterator(true))],
|
|
|
|
|
associates: vec![(tbl.id as u32, sess.raw_iterator(true).into()),
|
|
|
|
|
(tbl.id as u32, sess.raw_iterator(true).into()),
|
|
|
|
|
(tbl.id as u32, sess.raw_iterator(true).into())],
|
|
|
|
|
};
|
|
|
|
|
{
|
|
|
|
|
for el in it.iter() {
|
|
|
|
|
for el in it.iter()? {
|
|
|
|
|
println!("{:?}", el);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
println!("XXXXX");
|
|
|
|
|
{
|
|
|
|
|
for el in it.iter() {
|
|
|
|
|
for el in it.iter()? {
|
|
|
|
|
println!("{:?}", el);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let mut it = sess.iter_node(tbl);
|
|
|
|
|
for _ in 0..3 {
|
|
|
|
|
it = MegaTupleIt::CartesianProdIt {
|
|
|
|
|
it = ExecPlan::CartesianProdIt {
|
|
|
|
|
left: Box::new(it),
|
|
|
|
|
right: Box::new(sess.iter_node(tbl)),
|
|
|
|
|
}
|
|
|
|
@ -972,7 +1026,7 @@ mod tests {
|
|
|
|
|
|
|
|
|
|
println!("Now cartesian product");
|
|
|
|
|
let mut n = 0;
|
|
|
|
|
for el in it.iter() {
|
|
|
|
|
for el in it.iter()? {
|
|
|
|
|
let el = el.unwrap();
|
|
|
|
|
// if n % 4096 == 0 {
|
|
|
|
|
// println!("{}: {:?}", n, el)
|
|
|
|
@ -989,5 +1043,6 @@ mod tests {
|
|
|
|
|
}
|
|
|
|
|
drop(engine);
|
|
|
|
|
let _ = fs::remove_dir_all(db_path);
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|