fix nested join iter

main
Ziyang Hu 2 years ago
parent c7b22e6d6f
commit db4b7c4851

@ -22,11 +22,13 @@ pub fn extract_table_ref<'a>(
} else {
&tuples.vals
};
let target = targets
.get(tid.id as usize)
.ok_or_else(|| {
LogicError(format!("Tuple ref out of bound: wanted {:?} for {}", tid, targets.len()))
})?;
let target = targets.get(tid.id as usize).ok_or_else(|| {
LogicError(format!(
"Tuple ref out of bound: wanted {:?} for {}",
tid,
targets.len()
))
})?;
if matches!(target.data_kind(), Ok(DataKind::Empty)) {
Ok(Value::Null)
} else {

@ -42,7 +42,7 @@ impl<'a> TableRowGetter<'a> {
pub fn reset(&mut self) {
self.key_cache.truncate_all();
}
pub fn get_with_tuple<'b, T: AsRef<[u8]>>(&self, t: &'b T) -> Result<Option<SliceTuple>> {
pub fn get_with_tuple<T: AsRef<[u8]>>(&self, t: &T) -> Result<Option<SliceTuple>> {
let res = self
.sess
.txn
@ -58,7 +58,7 @@ impl<'a> TableRowGetter<'a> {
.map(Tuple::new);
Ok(res)
}
pub fn get_with_iter<'b, T: Iterator<Item=Value<'b>>>(
pub fn get_with_iter<'b, T: Iterator<Item = Value<'b>>>(
&mut self,
vals: T,
) -> Result<Option<SliceTuple>> {
@ -102,7 +102,6 @@ impl<'a> IteratorSlot<'a> {
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ChainJoinKind {
NodeToFwdEdge,
@ -111,6 +110,7 @@ pub enum ChainJoinKind {
BwdEdgeToNode,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum ExecPlan<'a> {
NodeItPlan {
@ -253,7 +253,7 @@ impl<'a> OutputItPlan<'a> {
}
impl<'a> ExecPlan<'a> {
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>> {
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>> {
match self {
ExecPlan::NodeItPlan { it, info, .. } => {
let it = it.try_get()?;
@ -418,9 +418,11 @@ impl<'a> ExecPlan<'a> {
right_it: right_iter,
right_getter: right.clone(),
kind: chain_kind,
right_key_cache: right.key_cache.clone(),
right_key_cache: None,
edge_front_table_id,
left_key_len,
left_cache: None,
last_right_key_cache: None,
})
}
ChainJoinKind::FwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
@ -457,13 +459,15 @@ pub enum NodeEdgeChainKind {
}
pub struct NodeToEdgeChainJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right_it: IteratorPtr<'a>,
right_getter: TableRowGetter<'a>,
kind: NodeEdgeChainKind,
left_key_len: usize,
right_key_cache: OwnTuple,
right_key_cache: Option<OwnTuple>,
edge_front_table_id: u32,
left_cache: Option<MegaTuple>,
last_right_key_cache: Option<CowTuple>,
}
impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
@ -471,27 +475,73 @@ impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
fn next(&mut self) -> Option<Self::Item> {
'outer: loop {
match self.left.next() {
if self.left_cache.is_none() {
self.left_cache = match self.left.next() {
None => return None,
Some(Ok(v)) => Some(v),
Some(Err(e)) => return Some(Err(e)),
};
}
match &self.left_cache {
None => return None,
Some(Err(e)) => return Some(Err(e)),
Some(Ok(mut left_tuple)) => {
let left_key = left_tuple.keys.last().unwrap();
self.right_key_cache.truncate_all();
self.right_key_cache
.push_int(self.edge_front_table_id as i64);
for v in left_key.iter() {
self.right_key_cache.push_value(&v);
Some(left_tuple) => {
if self.right_key_cache.is_none() {
let left_key = left_tuple.keys.last().unwrap();
let mut right_key_cache = self.right_getter.key_cache.clone();
right_key_cache.truncate_all();
right_key_cache.push_int(self.edge_front_table_id as i64);
for v in left_key.iter() {
right_key_cache.push_value(&v);
}
self.right_key_cache = Some(right_key_cache);
}
self.right_it.seek(&self.right_key_cache);
'inner: while let Some((r_key, r_val)) = unsafe { self.right_it.pair() } {
let right_key_cache = match &self.right_key_cache {
Some(v) => v,
_ => unreachable!(),
};
let mut started = false;
match &self.last_right_key_cache {
None => {
self.right_it.seek(right_key_cache);
}
Some(v) => {
started = true;
self.right_it.seek(v);
}
}
self.last_right_key_cache = None;
let mut is_first_loop = true;
'inner: while let Some((r_key, r_val)) = {
if !started {
started = true;
} else {
self.right_it.next();
}
unsafe { self.right_it.pair() }
} {
let r_key = Tuple::new(r_key);
if !r_key.starts_with(&self.right_key_cache) {
if !r_key.starts_with(right_key_cache) {
self.right_key_cache = None;
if is_first_loop {
self.left_cache = None;
}
// left join return here
// if self {
//
// }
continue 'outer;
} else {
is_first_loop = false;
let is_edge_forward = r_key.get_bool(self.left_key_len + 1).unwrap();
match self.kind {
NodeEdgeChainKind::Fwd => {
if is_edge_forward {
self.last_right_key_cache = Some(r_key.clone().into());
let mut left_tuple = left_tuple.clone();
left_tuple.keys.push(r_key.into());
left_tuple.vals.push(Tuple::new(r_val).into());
return Some(Ok(left_tuple));
@ -505,6 +555,8 @@ impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
match self.right_getter.get_with_tuple(&real_r_key) {
Ok(None) => unreachable!(),
Ok(Some(v)) => {
self.last_right_key_cache = Some(r_key.into());
let mut left_tuple = left_tuple.clone();
left_tuple.keys.push(real_r_key.into());
left_tuple.vals.push(v.into());
return Some(Ok(left_tuple));
@ -521,6 +573,8 @@ impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
}
}
}
// iterator goes out of the table
return None;
}
}
}
@ -528,7 +582,7 @@ impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
}
pub struct EdgeToNodeChainJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: TableRowGetter<'a>,
left_outer: bool,
key_start_idx: usize,
@ -545,9 +599,8 @@ impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
Some(Err(e)) => return Some(Err(e)),
Some(Ok(mut left_tuple)) => {
self.right.reset();
let key_iter = (self.key_start_idx..self.key_end_idx).map(|i| {
left_tuple.keys.last().unwrap().get(i).unwrap()
});
let key_iter = (self.key_start_idx..self.key_end_idx)
.map(|i| left_tuple.keys.last().unwrap().get(i).unwrap());
match self.right.get_with_iter(key_iter) {
Ok(v) => {
match v {
@ -575,8 +628,8 @@ impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
}
pub struct KeyedUnionIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
}
impl<'a> Iterator for KeyedUnionIterator<'a> {
@ -627,8 +680,8 @@ impl<'a> Iterator for KeyedUnionIterator<'a> {
}
pub struct KeyedDifferenceIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right_cache: Option<MegaTuple>,
started: bool,
}
@ -696,7 +749,7 @@ impl<'a> Iterator for KeyedDifferenceIterator<'a> {
}
pub struct BagsUnionIterator<'a> {
bags: Vec<Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>>,
bags: Vec<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>>,
current: usize,
}
@ -820,7 +873,7 @@ impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
}
pub struct KeySortedWithAssocIterator<'a> {
main: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
main: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
associates: Vec<NodeIterator<'a>>,
buffer: Vec<Option<(CowTuple, CowTuple)>>,
}
@ -904,8 +957,8 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
}
pub struct OuterMergeJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left_outer: bool,
right_outer: bool,
left_keys: &'a [(TableId, ColId)],
@ -1053,8 +1106,8 @@ impl<'a> Iterator for OuterMergeJoinIterator<'a> {
}
pub struct MergeJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left_keys: &'a [(TableId, ColId)],
right_keys: &'a [(TableId, ColId)],
}
@ -1114,10 +1167,10 @@ impl<'a> Iterator for MergeJoinIterator<'a> {
}
pub struct CartesianProdIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left_cache: MegaTuple,
right_source: &'a ExecPlan<'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
}
impl<'a> Iterator for CartesianProdIterator<'a> {
@ -1160,7 +1213,7 @@ impl<'a> Iterator for CartesianProdIterator<'a> {
}
pub struct FilterIterator<'a> {
it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
filter: &'a Value<'a>,
}
@ -1188,7 +1241,7 @@ impl<'a> Iterator for FilterIterator<'a> {
}
pub struct OutputIterator<'a> {
it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
transform: &'a Value<'a>,
}
@ -1214,7 +1267,7 @@ impl<'a> Iterator for OutputIterator<'a> {
}
pub struct EvalIterator<'a> {
it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
keys: &'a [(String, Value<'a>)],
vals: &'a [(String, Value<'a>)],
}
@ -1479,6 +1532,21 @@ mod tests {
}
let duration = start.elapsed();
println!("Time elapsed {:?}", duration);
let start = Instant::now();
let s = r##"from (j:Job)<-[hj:HasJob]-(e:Employee)
where j.id == 16
select { eid: e.id, jid: j.id, fname: e.first_name, salary: hj.salary, job: j.title }"##;
let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap();
let plan = sess.query_to_plan(parsed)?;
let plan = sess.reify_output_plan(plan)?;
for val in plan.iter()? {
println!("{}", val?)
}
let duration = start.elapsed();
println!("Time elapsed {:?}", duration);
}
drop(engine);
let _ = fs::remove_dir_all(db_path);

@ -199,10 +199,14 @@ impl<'a> Session<'a> {
None => Default::default(),
Some(binding) => match kind {
ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => {
convert_to_relative_accessor_map(self.edge_accessor_map(binding, &right_info))
convert_to_relative_accessor_map(
self.edge_accessor_map(binding, &right_info),
)
}
ChainJoinKind::FwdEdgeToNode | ChainJoinKind::BwdEdgeToNode => {
convert_to_relative_accessor_map(self.node_accessor_map(binding, &right_info))
convert_to_relative_accessor_map(
self.node_accessor_map(binding, &right_info),
)
}
},
};

@ -77,7 +77,7 @@ impl<'a> Session<'a> {
.map(|p| match p.as_rule() {
Rule::node_pattern => self.parse_node_pattern(p),
Rule::edge_pattern => {
let right_join;
// let right_join;
let mut pairs = p.into_inner();
let nxt = pairs.next().unwrap();
if nxt.as_rule() == Rule::outer_join_marker {
@ -87,7 +87,7 @@ impl<'a> Session<'a> {
"Right outer join not supported here".to_string(),
));
} else {
right_join = false;
// right_join = false;
}
let mut edge = match nxt.as_rule() {
Rule::fwd_edge_pattern => self.parse_edge_pattern(nxt, true)?,
@ -95,7 +95,7 @@ impl<'a> Session<'a> {
_ => unreachable!(),
};
edge.left_outer_marker = pairs.next().is_some();
edge.right_outer_marker = right_join;
edge.right_outer_marker = false; // right_join;
Ok(edge)
}
_ => unreachable!(),

Loading…
Cancel
Save