nested loop join complete

main
Ziyang Hu 2 years ago
parent a0b001f94a
commit c7b22e6d6f

@ -24,7 +24,9 @@ pub fn extract_table_ref<'a>(
};
let target = targets
.get(tid.id as usize)
.ok_or_else(|| LogicError("Tuple ref out of bound".to_string()))?;
.ok_or_else(|| {
LogicError(format!("Tuple ref out of bound: wanted {:?} for {}", tid, targets.len()))
})?;
if matches!(target.data_kind(), Ok(DataKind::Empty)) {
Ok(Value::Null)
} else {

@ -11,7 +11,6 @@ use cozorocks::IteratorPtr;
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::{iter, mem};
use std::ops::Range;
pub enum IteratorSlot<'a> {
Dummy,
@ -43,14 +42,38 @@ impl<'a> TableRowGetter<'a> {
/// Clears the cached lookup key by calling `truncate_all` on `key_cache`.
// NOTE(review): presumably `truncate_all` keeps the table-id prefix the cache was built with,
// so the cache can be refilled for the next lookup — confirm against `OwnTuple::truncate_all`.
pub fn reset(&mut self) {
self.key_cache.truncate_all();
}
pub fn get<'b, T: Iterator<Item=Value<'b>>>(&mut self, vals: T) -> Result<Option<SliceTuple>> {
/// Fetches the row stored under the already-encoded key `t`.
///
/// The read goes through the session transaction and targets the permanent column family
/// when `in_root` is set, the temporary one otherwise. Returns `Ok(None)` when the
/// transaction reports no entry for the key; storage errors are propagated with `?`.
pub fn get_with_tuple<'b, T: AsRef<[u8]>>(&self, t: &'b T) -> Result<Option<SliceTuple>> {
    // Pick the column family first so the lookup call itself stays on one line.
    let cf = if self.in_root {
        &self.sess.perm_cf
    } else {
        &self.sess.temp_cf
    };
    let fetched = self.sess.txn.get(self.in_root, cf, t)?;
    Ok(fetched.map(Tuple::new))
}
pub fn get_with_iter<'b, T: Iterator<Item=Value<'b>>>(
&mut self,
vals: T,
) -> Result<Option<SliceTuple>> {
for val in vals {
self.key_cache.push_value(&val);
}
let val = self.sess.txn.get(
self.in_root,
if self.in_root { &self.sess.perm_cf } else { &self.sess.temp_cf },
&self.key_cache)?;
if self.in_root {
&self.sess.perm_cf
} else {
&self.sess.temp_cf
},
&self.key_cache,
)?;
Ok(val.map(Tuple::new))
}
}
@ -79,6 +102,7 @@ impl<'a> IteratorSlot<'a> {
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ChainJoinKind {
NodeToFwdEdge,
@ -112,6 +136,7 @@ pub enum ExecPlan<'a> {
},
ChainJoinItPlan {
left: Box<ExecPlan<'a>>,
left_info: TableInfo,
right: TableRowGetterSlot<'a>,
right_info: TableInfo,
right_binding: Option<String>,
@ -204,11 +229,9 @@ impl<'a> ExecPlan<'a> {
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan {
left, ..
} => {
ExecPlan::ChainJoinItPlan { left, .. } => {
let (l1, l2) = left.tuple_widths();
(l1, l2 + 1)
(l1 + 1, l2 + 1)
}
}
}
@ -354,44 +377,70 @@ impl<'a> ExecPlan<'a> {
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan { left, right, left_outer, kind, right_info, .. } => {
match right {
TableRowGetterSlot::Dummy => Err(LogicError("Uninitialized chain join".to_string())),
TableRowGetterSlot::Reified(right) => {
Ok(match kind {
ChainJoinKind::NodeToFwdEdge => Box::new(NodeToFwdEdgeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
}),
ChainJoinKind::NodeToBwdEdge => todo!(),
ChainJoinKind::FwdEdgeToNode => {
let key_indices = Range {
start: 1,
end: right_info.src_key_typing.len() + 1,
};
Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_indices,
})
}
ChainJoinKind::BwdEdgeToNode => {
let key_indices = Range {
start: right_info.src_key_typing.len() + 2,
end: right_info.src_key_typing.len() + right_info.dst_key_typing.len() + 2,
};
Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_indices,
})
}
ExecPlan::ChainJoinItPlan {
left,
right,
left_outer,
kind,
left_info,
right_info,
..
} => match right {
TableRowGetterSlot::Dummy => {
Err(LogicError("Uninitialized chain join".to_string()))
}
TableRowGetterSlot::Reified(right) => Ok(match kind {
ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => {
let chain_kind = match kind {
ChainJoinKind::NodeToFwdEdge => NodeEdgeChainKind::Fwd,
ChainJoinKind::NodeToBwdEdge => NodeEdgeChainKind::Bwd,
_ => unreachable!(),
};
let (edge_front_table_id, left_key_len) =
if *kind == ChainJoinKind::NodeToBwdEdge {
(
right_info.dst_table_id.id as u32,
right_info.dst_key_typing.len(),
)
} else {
(
right_info.src_table_id.id as u32,
right_info.src_key_typing.len(),
)
};
let right_iter = if right.in_root {
right.sess.txn.iterator(true, &right.sess.perm_cf)
} else {
right.sess.txn.iterator(false, &right.sess.temp_cf)
};
Box::new(NodeToEdgeChainJoinIterator {
left: left.iter()?,
right_it: right_iter,
right_getter: right.clone(),
kind: chain_kind,
right_key_cache: right.key_cache.clone(),
edge_front_table_id,
left_key_len,
})
}
}
}
ChainJoinKind::FwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_start_idx: left_info.src_key_typing.len() + 2,
key_end_idx: left_info.src_key_typing.len()
+ left_info.dst_key_typing.len()
+ 2,
}),
ChainJoinKind::BwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_start_idx: 1,
key_end_idx: left_info.src_key_typing.len() + 1,
}),
}),
},
}
}
}
@ -400,25 +449,90 @@ impl<'a> ExecPlan<'a> {
// Never define `.next()` recursively for the iterators below; otherwise a stack overflow is
// almost guaranteed (though it may not show up with test data).
pub struct NodeToFwdEdgeChainJoinIterator<'a> {
/// Edge direction filter used by `NodeToEdgeChainJoinIterator` when joining a node row to
/// the edge entries found under that node's key.
#[derive(Copy, Clone, Debug)]
pub enum NodeEdgeChainKind {
/// Accept only edge entries whose direction flag reads as forward.
Fwd,
/// Accept only edge entries whose direction flag reads as not forward.
Bwd,
/// Not handled yet: the join iterator reaches `todo!()` for this variant.
Bidi,
}
pub struct NodeToEdgeChainJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: TableRowGetter<'a>,
right_it: IteratorPtr<'a>,
right_getter: TableRowGetter<'a>,
kind: NodeEdgeChainKind,
left_key_len: usize,
right_key_cache: OwnTuple,
edge_front_table_id: u32,
}
impl<'a> Iterator for NodeToFwdEdgeChainJoinIterator<'a> {
/// Node-to-edge nested loop join.
///
/// Each call to `next` pulls rows from `left` until one of them has a matching edge entry,
/// then returns that row with the edge's key and value appended. Left rows without a
/// matching edge are dropped (there is no left-outer handling here). Errors from `left`
/// or from the edge lookup are forwarded as `Some(Err(..))`; `None` is returned once
/// `left` is exhausted.
impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
type Item = Result<MegaTuple>;
fn next(&mut self) -> Option<Self::Item> {
// NOTE(review): this `todo!()` looks like the body of the previous stub left over from the
// diff view; with it in place the loop below is never reached — confirm and drop it.
todo!()
'outer: loop {
match self.left.next() {
None => return None,
Some(Err(e)) => return Some(Err(e)),
Some(Ok(mut left_tuple)) => {
// The join key is the last key tuple of the left row; panics if the row has no keys.
let left_key = left_tuple.keys.last().unwrap();
// Rebuild the seek prefix: the edge-front table id followed by every value of the
// left key.
self.right_key_cache.truncate_all();
self.right_key_cache
.push_int(self.edge_front_table_id as i64);
for v in left_key.iter() {
self.right_key_cache.push_value(&v);
}
self.right_it.seek(&self.right_key_cache);
// NOTE(review): the safety contract of `pair()` is not visible here — document it.
// NOTE(review): nothing in this loop visibly advances `right_it`; unless `pair()` moves
// the cursor itself, `continue 'inner` would re-read the same entry forever — confirm.
// When `pair()` yields `None` the loop ends and the outer loop moves to the next left row.
'inner: while let Some((r_key, r_val)) = unsafe { self.right_it.pair() } {
let r_key = Tuple::new(r_key);
if !r_key.starts_with(&self.right_key_cache) {
// Cursor has left the range of entries sharing the prefix: give up on this left row.
continue 'outer;
} else {
// The bool stored at position `left_key_len + 1` of the edge key is used as the
// direction flag; panics if that position cannot be read as a bool.
let is_edge_forward = r_key.get_bool(self.left_key_len + 1).unwrap();
match self.kind {
NodeEdgeChainKind::Fwd => {
if is_edge_forward {
// Forward entry: append the entry's own key and value to the row.
// NOTE(review): returning here means the following call starts with a new left
// row, so as written at most one edge is emitted per left row — confirm intended.
left_tuple.keys.push(r_key.into());
left_tuple.vals.push(Tuple::new(r_val).into());
return Some(Ok(left_tuple));
} else {
continue 'inner;
}
}
NodeEdgeChainKind::Bwd => {
if !is_edge_forward {
// For a non-forward entry the stored value is used as the key of the actual edge
// row, which is then fetched through `right_getter`.
let real_r_key = Tuple::new(r_val);
match self.right_getter.get_with_tuple(&real_r_key) {
// Panics if the referenced edge row is missing.
Ok(None) => unreachable!(),
Ok(Some(v)) => {
left_tuple.keys.push(real_r_key.into());
left_tuple.vals.push(v.into());
return Some(Ok(left_tuple));
}
Err(e) => return Some(Err(e)),
}
} else {
continue 'inner;
}
}
NodeEdgeChainKind::Bidi => {
// Bidirectional joins are not implemented.
todo!()
}
}
}
}
}
}
}
}
}
pub struct EdgeToNodeChainJoinIterator<'a> {
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: TableRowGetter<'a>,
left_outer: bool,
key_indices: Range<usize>,
key_start_idx: usize,
key_end_idx: usize,
}
impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
@ -431,26 +545,28 @@ impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
Some(Err(e)) => return Some(Err(e)),
Some(Ok(mut left_tuple)) => {
self.right.reset();
let key_iter = self.key_indices.clone().map(|i|
let key_iter = (self.key_start_idx..self.key_end_idx).map(|i| {
left_tuple.keys.last().unwrap().get(i).unwrap()
);
match self.right.get(key_iter) {
});
match self.right.get_with_iter(key_iter) {
Ok(v) => {
match v {
None => {
if self.left_outer {
left_tuple.keys.push(OwnTuple::empty_tuple().into());
left_tuple.vals.push(OwnTuple::empty_tuple().into());
return Some(Ok(left_tuple));
}
// else fall through, go to the next iteration
}
Some(right_val) => {
left_tuple.keys.push(self.right.key_cache.clone().into());
left_tuple.vals.push(right_val.into());
return Some(Ok(left_tuple));
}
}
}
Err(e) => return Some(Err(e))
Err(e) => return Some(Err(e)),
}
}
};
@ -1341,9 +1457,7 @@ mod tests {
let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap();
let plan = sess.query_to_plan(parsed)?;
println!("{:?}", plan);
let plan = sess.reify_output_plan(plan)?;
println!("{:?}", plan);
for val in plan.iter()? {
println!("{}", val?)
}
@ -1351,18 +1465,20 @@ mod tests {
let duration = start.elapsed();
println!("Time elapsed {:?}", duration);
let start = Instant::now();
let s = r##"from (e:Employee)-[hj:HasJob]->(j:Job)
where e.id == 110
select { fname: e.first_name, salary: hj.salary, job: j.title }"##;
where j.id == 16
select { eid: e.id, jid: j.id, fname: e.first_name, salary: hj.salary, job: j.title }"##;
let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap();
let plan = sess.query_to_plan(parsed)?;
println!("{:?}", plan);
let plan = sess.reify_output_plan(plan)?;
println!("{:?}", plan);
for val in plan.iter()? {
println!("{}", val?)
}
let duration = start.elapsed();
println!("Time elapsed {:?}", duration);
}
drop(engine);
let _ = fs::remove_dir_all(db_path);

@ -1,6 +1,8 @@
use crate::db::engine::Session;
use crate::db::iterator::{ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter, TableRowGetterSlot};
use crate::db::query::{EdgeOrNodeEl, EdgeOrNodeKind, FromEl, Selection};
use crate::db::iterator::{
ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter, TableRowGetterSlot,
};
use crate::db::query::{EdgeOrNodeKind, FromEl, Selection};
use crate::db::table::{ColId, TableId, TableInfo};
use crate::error::CozoError::LogicError;
use crate::error::Result;
@ -67,10 +69,10 @@ fn shift_accessor_map(amap: AccessorMap, (keyshift, valshift): (usize, usize)) -
tid.in_root,
tid.id
+ if cid.is_key {
keyshift as i64
} else {
valshift as i64
},
keyshift as i64
} else {
valshift as i64
},
),
cid,
),
@ -185,27 +187,29 @@ impl<'a> Session<'a> {
ExecPlan::BagsUnionIt { .. } => todo!(),
ExecPlan::ChainJoinItPlan {
left,
right,
left_info,
right_info,
kind,
left_outer,
right_binding,
..
} => {
let (l_plan, l_map) = self.do_reify_intermediate_plan(*left)?;
let r_map = match &right_binding {
None => Default::default(),
Some(binding) => match kind {
ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => {
self.edge_accessor_map(binding, &right_info)
convert_to_relative_accessor_map(self.edge_accessor_map(binding, &right_info))
}
ChainJoinKind::FwdEdgeToNode | ChainJoinKind::BwdEdgeToNode => {
self.node_accessor_map(binding, &right_info)
convert_to_relative_accessor_map(self.node_accessor_map(binding, &right_info))
}
},
};
let r_map = shift_accessor_map(r_map, l_plan.tuple_widths());
let plan = ExecPlan::ChainJoinItPlan {
left: l_plan.into(),
left_info,
right: TableRowGetterSlot::Reified(TableRowGetter {
sess: self,
key_cache: OwnTuple::with_prefix(right_info.table_id.id as u32),
@ -281,6 +285,7 @@ impl<'a> Session<'a> {
.ok_or_else(|| LogicError("Empty chain not allowed".to_string()))?;
let mut prev_kind = nxt.kind;
let mut prev_left_outer = nxt.left_outer_marker;
let mut last_info = nxt.info.clone();
let mut plan = match prev_kind {
EdgeOrNodeKind::Node => ExecPlan::NodeItPlan {
it: IteratorSlot::Dummy,
@ -302,8 +307,9 @@ impl<'a> Session<'a> {
for el in it {
plan = ExecPlan::ChainJoinItPlan {
left: plan.into(),
left_info: last_info,
right: TableRowGetterSlot::Dummy,
right_info: el.info,
right_info: el.info.clone(),
kind: match (prev_kind, el.kind) {
(EdgeOrNodeKind::Node, EdgeOrNodeKind::FwdEdge) => {
ChainJoinKind::NodeToFwdEdge
@ -325,8 +331,9 @@ impl<'a> Session<'a> {
prev_kind = el.kind;
prev_left_outer = el.left_outer_marker;
last_info = el.info;
}
println!("{:#?}", plan);
// println!("{:#?}", plan);
Ok(plan)
}
};

@ -1,5 +1,6 @@
use crate::db::engine::Session;
use crate::db::table::TableInfo;
use crate::error::CozoError::LogicError;
use crate::error::{CozoError, Result};
use crate::parser::text_identifier::{build_name_in_def, parse_string};
use crate::parser::Rule;
@ -8,7 +9,6 @@ use crate::relation::value;
use crate::relation::value::{StaticValue, Value};
use pest::iterators::Pair;
use std::collections::BTreeMap;
use crate::error::CozoError::LogicError;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum FromEl {
@ -79,11 +79,13 @@ impl<'a> Session<'a> {
Rule::edge_pattern => {
let right_join;
let mut pairs = p.into_inner();
let mut nxt = pairs.next().unwrap();
let nxt = pairs.next().unwrap();
if nxt.as_rule() == Rule::outer_join_marker {
// right_join = true;
// nxt = pairs.next().unwrap();
return Err(LogicError("Right outer join not supported here".to_string()))
return Err(LogicError(
"Right outer join not supported here".to_string(),
));
} else {
right_join = false;
}

Loading…
Cancel
Save