diff --git a/src/db/eval.rs b/src/db/eval.rs index f95479af..5b967e75 100644 --- a/src/db/eval.rs +++ b/src/db/eval.rs @@ -24,7 +24,9 @@ pub fn extract_table_ref<'a>( }; let target = targets .get(tid.id as usize) - .ok_or_else(|| LogicError("Tuple ref out of bound".to_string()))?; + .ok_or_else(|| { + LogicError(format!("Tuple ref out of bound: wanted {:?} for {}", tid, targets.len())) + })?; if matches!(target.data_kind(), Ok(DataKind::Empty)) { Ok(Value::Null) } else { diff --git a/src/db/iterator.rs b/src/db/iterator.rs index e5a894e1..694e6553 100644 --- a/src/db/iterator.rs +++ b/src/db/iterator.rs @@ -11,7 +11,6 @@ use cozorocks::IteratorPtr; use std::cmp::Ordering; use std::fmt::{Debug, Formatter}; use std::{iter, mem}; -use std::ops::Range; pub enum IteratorSlot<'a> { Dummy, @@ -43,14 +42,38 @@ impl<'a> TableRowGetter<'a> { pub fn reset(&mut self) { self.key_cache.truncate_all(); } - pub fn get<'b, T: Iterator>>(&mut self, vals: T) -> Result> { + pub fn get_with_tuple<'b, T: AsRef<[u8]>>(&self, t: &'b T) -> Result> { + let res = self + .sess + .txn + .get( + self.in_root, + if self.in_root { + &self.sess.perm_cf + } else { + &self.sess.temp_cf + }, + t, + )? + .map(Tuple::new); + Ok(res) + } + pub fn get_with_iter<'b, T: Iterator>>( + &mut self, + vals: T, + ) -> Result> { for val in vals { self.key_cache.push_value(&val); } let val = self.sess.txn.get( self.in_root, - if self.in_root { &self.sess.perm_cf } else { &self.sess.temp_cf }, - &self.key_cache)?; + if self.in_root { + &self.sess.perm_cf + } else { + &self.sess.temp_cf + }, + &self.key_cache, + )?; Ok(val.map(Tuple::new)) } } @@ -79,6 +102,7 @@ impl<'a> IteratorSlot<'a> { } } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum ChainJoinKind { NodeToFwdEdge, @@ -112,6 +136,7 @@ pub enum ExecPlan<'a> { }, ChainJoinItPlan { left: Box>, + left_info: TableInfo, right: TableRowGetterSlot<'a>, right_info: TableInfo, right_binding: Option, @@ -204,11 +229,9 @@ impl<'a> ExecPlan<'a> { ExecPlan::EdgeBwdItPlan { .. } => { todo!() } - ExecPlan::ChainJoinItPlan { - left, .. - } => { + ExecPlan::ChainJoinItPlan { left, .. } => { let (l1, l2) = left.tuple_widths(); - (l1, l2 + 1) + (l1 + 1, l2 + 1) } } } @@ -354,44 +377,70 @@ impl<'a> ExecPlan<'a> { ExecPlan::EdgeBwdItPlan { .. } => { todo!() } - ExecPlan::ChainJoinItPlan { left, right, left_outer, kind, right_info, .. } => { - match right { - TableRowGetterSlot::Dummy => Err(LogicError("Uninitialized chain join".to_string())), - TableRowGetterSlot::Reified(right) => { - Ok(match kind { - ChainJoinKind::NodeToFwdEdge => Box::new(NodeToFwdEdgeChainJoinIterator { - left: left.iter()?, - right: right.clone(), - }), - ChainJoinKind::NodeToBwdEdge => todo!(), - ChainJoinKind::FwdEdgeToNode => { - let key_indices = Range { - start: 1, - end: right_info.src_key_typing.len() + 1, - }; - Box::new(EdgeToNodeChainJoinIterator { - left: left.iter()?, - right: right.clone(), - left_outer: *left_outer, - key_indices, - }) - } - ChainJoinKind::BwdEdgeToNode => { - let key_indices = Range { - start: right_info.src_key_typing.len() + 2, - end: right_info.src_key_typing.len() + right_info.dst_key_typing.len() + 2, - }; - Box::new(EdgeToNodeChainJoinIterator { - left: left.iter()?, - right: right.clone(), - left_outer: *left_outer, - key_indices, - }) - } + ExecPlan::ChainJoinItPlan { + left, + right, + left_outer, + kind, + left_info, + right_info, + .. + } => match right { + TableRowGetterSlot::Dummy => { + Err(LogicError("Uninitialized chain join".to_string())) + } + TableRowGetterSlot::Reified(right) => Ok(match kind { + ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => { + let chain_kind = match kind { + ChainJoinKind::NodeToFwdEdge => NodeEdgeChainKind::Fwd, + ChainJoinKind::NodeToBwdEdge => NodeEdgeChainKind::Bwd, + _ => unreachable!(), + }; + let (edge_front_table_id, left_key_len) = + if *kind == ChainJoinKind::NodeToBwdEdge { + ( + right_info.dst_table_id.id as u32, + right_info.dst_key_typing.len(), + ) + } else { + ( + right_info.src_table_id.id as u32, + right_info.src_key_typing.len(), + ) + }; + let right_iter = if right.in_root { + right.sess.txn.iterator(true, &right.sess.perm_cf) + } else { + right.sess.txn.iterator(false, &right.sess.temp_cf) + }; + Box::new(NodeToEdgeChainJoinIterator { + left: left.iter()?, + right_it: right_iter, + right_getter: right.clone(), + kind: chain_kind, + right_key_cache: right.key_cache.clone(), + edge_front_table_id, + left_key_len, }) } - } - } + ChainJoinKind::FwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator { + left: left.iter()?, + right: right.clone(), + left_outer: *left_outer, + key_start_idx: left_info.src_key_typing.len() + 2, + key_end_idx: left_info.src_key_typing.len() + + left_info.dst_key_typing.len() + + 2, + }), + ChainJoinKind::BwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator { + left: left.iter()?, + right: right.clone(), + left_outer: *left_outer, + key_start_idx: 1, + key_end_idx: left_info.src_key_typing.len() + 1, + }), + }), + }, } } } @@ -400,25 +449,90 @@ impl<'a> ExecPlan<'a> { // Never define `.next()` recursively for iterators below, otherwise stackoverflow is almost // guaranteed (but may not show for test data) -pub struct NodeToFwdEdgeChainJoinIterator<'a> { +#[derive(Copy, Clone, Debug)] +pub enum NodeEdgeChainKind { + Fwd, + Bwd, + Bidi, +} + +pub struct NodeToEdgeChainJoinIterator<'a> { left: Box> + 'a>, - right: TableRowGetter<'a>, + right_it: IteratorPtr<'a>, + right_getter: TableRowGetter<'a>, + kind: NodeEdgeChainKind, + left_key_len: usize, + right_key_cache: OwnTuple, + edge_front_table_id: u32, } -impl<'a> Iterator for NodeToFwdEdgeChainJoinIterator<'a> { +impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> { type Item = Result; fn next(&mut self) -> Option { - todo!() + 'outer: loop { + match self.left.next() { + None => return None, + Some(Err(e)) => return Some(Err(e)), + Some(Ok(mut left_tuple)) => { + let left_key = left_tuple.keys.last().unwrap(); + self.right_key_cache.truncate_all(); + self.right_key_cache + .push_int(self.edge_front_table_id as i64); + for v in left_key.iter() { + self.right_key_cache.push_value(&v); + } + self.right_it.seek(&self.right_key_cache); + 'inner: while let Some((r_key, r_val)) = unsafe { self.right_it.pair() } { + let r_key = Tuple::new(r_key); + if !r_key.starts_with(&self.right_key_cache) { + continue 'outer; + } else { + let is_edge_forward = r_key.get_bool(self.left_key_len + 1).unwrap(); + match self.kind { + NodeEdgeChainKind::Fwd => { + if is_edge_forward { + left_tuple.keys.push(r_key.into()); + left_tuple.vals.push(Tuple::new(r_val).into()); + return Some(Ok(left_tuple)); + } else { + continue 'inner; + } + } + NodeEdgeChainKind::Bwd => { + if !is_edge_forward { + let real_r_key = Tuple::new(r_val); + match self.right_getter.get_with_tuple(&real_r_key) { + Ok(None) => unreachable!(), + Ok(Some(v)) => { + left_tuple.keys.push(real_r_key.into()); + left_tuple.vals.push(v.into()); + return Some(Ok(left_tuple)); + } + Err(e) => return Some(Err(e)), + } + } else { + continue 'inner; + } + } + NodeEdgeChainKind::Bidi => { + todo!() + } + } + } + } + } + } + } } } - pub struct EdgeToNodeChainJoinIterator<'a> { left: Box> + 'a>, right: TableRowGetter<'a>, left_outer: bool, - key_indices: Range, + key_start_idx: usize, + key_end_idx: usize, } impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> { @@ -431,26 +545,28 @@ impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> { Some(Err(e)) => return Some(Err(e)), Some(Ok(mut left_tuple)) => { self.right.reset(); - let key_iter = self.key_indices.clone().map(|i| + let key_iter = (self.key_start_idx..self.key_end_idx).map(|i| { left_tuple.keys.last().unwrap().get(i).unwrap() - ); - match self.right.get(key_iter) { + }); + match self.right.get_with_iter(key_iter) { Ok(v) => { match v { None => { if self.left_outer { + left_tuple.keys.push(OwnTuple::empty_tuple().into()); left_tuple.vals.push(OwnTuple::empty_tuple().into()); return Some(Ok(left_tuple)); } // else fall through, go to the next iteration } Some(right_val) => { + left_tuple.keys.push(self.right.key_cache.clone().into()); left_tuple.vals.push(right_val.into()); return Some(Ok(left_tuple)); } } } - Err(e) => return Some(Err(e)) + Err(e) => return Some(Err(e)), } } }; @@ -1341,9 +1457,7 @@ mod tests { let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap(); let plan = sess.query_to_plan(parsed)?; - println!("{:?}", plan); let plan = sess.reify_output_plan(plan)?; - println!("{:?}", plan); for val in plan.iter()? { println!("{}", val?) } @@ -1351,18 +1465,20 @@ mod tests { let duration = start.elapsed(); println!("Time elapsed {:?}", duration); + let start = Instant::now(); + let s = r##"from (e:Employee)-[hj:HasJob]->(j:Job) - where e.id == 110 - select { fname: e.first_name, salary: hj.salary, job: j.title }"##; + where j.id == 16 + select { eid: e.id, jid: j.id, fname: e.first_name, salary: hj.salary, job: j.title }"##; let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap(); let plan = sess.query_to_plan(parsed)?; - println!("{:?}", plan); let plan = sess.reify_output_plan(plan)?; - println!("{:?}", plan); for val in plan.iter()? { println!("{}", val?) } + let duration = start.elapsed(); + println!("Time elapsed {:?}", duration); } drop(engine); let _ = fs::remove_dir_all(db_path); diff --git a/src/db/plan.rs b/src/db/plan.rs index 3aa4f0d5..9125ae76 100644 --- a/src/db/plan.rs +++ b/src/db/plan.rs @@ -1,6 +1,8 @@ use crate::db::engine::Session; -use crate::db::iterator::{ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter, TableRowGetterSlot}; -use crate::db::query::{EdgeOrNodeEl, EdgeOrNodeKind, FromEl, Selection}; +use crate::db::iterator::{ + ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter, TableRowGetterSlot, +}; +use crate::db::query::{EdgeOrNodeKind, FromEl, Selection}; use crate::db::table::{ColId, TableId, TableInfo}; use crate::error::CozoError::LogicError; use crate::error::Result; @@ -67,10 +69,10 @@ fn shift_accessor_map(amap: AccessorMap, (keyshift, valshift): (usize, usize)) - tid.in_root, tid.id + if cid.is_key { - keyshift as i64 - } else { - valshift as i64 - }, + keyshift as i64 + } else { + valshift as i64 + }, ), cid, ), @@ -185,27 +187,29 @@ impl<'a> Session<'a> { ExecPlan::BagsUnionIt { .. } => todo!(), ExecPlan::ChainJoinItPlan { left, - right, + left_info, right_info, kind, left_outer, right_binding, + .. } => { let (l_plan, l_map) = self.do_reify_intermediate_plan(*left)?; let r_map = match &right_binding { None => Default::default(), Some(binding) => match kind { ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => { - self.edge_accessor_map(binding, &right_info) + convert_to_relative_accessor_map(self.edge_accessor_map(binding, &right_info)) } ChainJoinKind::FwdEdgeToNode | ChainJoinKind::BwdEdgeToNode => { - self.node_accessor_map(binding, &right_info) + convert_to_relative_accessor_map(self.node_accessor_map(binding, &right_info)) } }, }; let r_map = shift_accessor_map(r_map, l_plan.tuple_widths()); let plan = ExecPlan::ChainJoinItPlan { left: l_plan.into(), + left_info, right: TableRowGetterSlot::Reified(TableRowGetter { sess: self, key_cache: OwnTuple::with_prefix(right_info.table_id.id as u32), @@ -281,6 +285,7 @@ impl<'a> Session<'a> { .ok_or_else(|| LogicError("Empty chain not allowed".to_string()))?; let mut prev_kind = nxt.kind; let mut prev_left_outer = nxt.left_outer_marker; + let mut last_info = nxt.info.clone(); let mut plan = match prev_kind { EdgeOrNodeKind::Node => ExecPlan::NodeItPlan { it: IteratorSlot::Dummy, @@ -302,8 +307,9 @@ impl<'a> Session<'a> { for el in it { plan = ExecPlan::ChainJoinItPlan { left: plan.into(), + left_info: last_info, right: TableRowGetterSlot::Dummy, - right_info: el.info, + right_info: el.info.clone(), kind: match (prev_kind, el.kind) { (EdgeOrNodeKind::Node, EdgeOrNodeKind::FwdEdge) => { ChainJoinKind::NodeToFwdEdge @@ -325,8 +331,9 @@ impl<'a> Session<'a> { prev_kind = el.kind; prev_left_outer = el.left_outer_marker; + last_info = el.info; } - println!("{:#?}", plan); + // println!("{:#?}", plan); Ok(plan) } }; diff --git a/src/db/query.rs b/src/db/query.rs index d02f7e30..5d69361f 100644 --- a/src/db/query.rs +++ b/src/db/query.rs @@ -1,5 +1,6 @@ use crate::db::engine::Session; use crate::db::table::TableInfo; +use crate::error::CozoError::LogicError; use crate::error::{CozoError, Result}; use crate::parser::text_identifier::{build_name_in_def, parse_string}; use crate::parser::Rule; @@ -8,7 +9,6 @@ use crate::relation::value; use crate::relation::value::{StaticValue, Value}; use pest::iterators::Pair; use std::collections::BTreeMap; -use crate::error::CozoError::LogicError; #[derive(Debug, Eq, PartialEq, Clone)] pub enum FromEl { @@ -79,11 +79,13 @@ impl<'a> Session<'a> { Rule::edge_pattern => { let right_join; let mut pairs = p.into_inner(); - let mut nxt = pairs.next().unwrap(); + let nxt = pairs.next().unwrap(); if nxt.as_rule() == Rule::outer_join_marker { // right_join = true; // nxt = pairs.next().unwrap(); - return Err(LogicError("Right outer join not supported here".to_string())) + return Err(LogicError( + "Right outer join not supported here".to_string(), + )); } else { right_join = false; }