edge to node join

main
Ziyang Hu 2 years ago
parent aa56126edc
commit a0b001f94a

@ -5,12 +5,13 @@ use crate::error::CozoError::LogicError;
use crate::error::Result;
use crate::relation::data::{DataKind, EMPTY_DATA};
use crate::relation::table::MegaTuple;
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, Tuple};
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, SliceTuple, Tuple};
use crate::relation::value::Value;
use cozorocks::IteratorPtr;
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::{iter, mem};
use std::ops::Range;
pub enum IteratorSlot<'a> {
Dummy,
@ -26,20 +27,39 @@ impl<'a> Debug for IteratorSlot<'a> {
}
}
pub enum TableRowGetter<'a> {
pub enum TableRowGetterSlot<'a> {
Dummy,
Reified {
sess: &'a Session<'a>,
key_cache: OwnTuple,
in_root: bool,
},
Reified(TableRowGetter<'a>),
}
/// Keyed row lookup bound to a session.
///
/// `get` appends the supplied values to `key_cache` and reads the resulting key through
/// `sess.txn`; `reset` truncates `key_cache` so the buffer can be reused for the next lookup.
#[derive(Clone)]
pub struct TableRowGetter<'a> {
/// Session whose transaction (`txn`) and column families (`perm_cf` / `temp_cf`) are read.
pub sess: &'a Session<'a>,
/// Reusable key buffer that `get` pushes lookup values onto.
pub key_cache: OwnTuple,
/// Forwarded to `txn.get`; also selects `perm_cf` when true and `temp_cf` when false.
pub in_root: bool,
}
impl<'a> Debug for TableRowGetter<'a> {
impl<'a> TableRowGetter<'a> {
    /// Truncates the key buffer via `OwnTuple::truncate_all` so a new lookup key can be built.
    pub fn reset(&mut self) {
        self.key_cache.truncate_all();
    }

    /// Appends every value from `vals` to the key buffer, then reads that key from the
    /// session's transaction.
    ///
    /// The column family is `perm_cf` when `in_root` is set and `temp_cf` otherwise.
    /// The buffer is not cleared here; callers are expected to call `reset` between lookups.
    ///
    /// Returns `Ok(Some(tuple))` wrapping the stored value when the key exists, `Ok(None)`
    /// when it does not, and propagates any error reported by the transaction read.
    pub fn get<'b, T: Iterator<Item = Value<'b>>>(
        &mut self,
        vals: T,
    ) -> Result<Option<SliceTuple>> {
        vals.for_each(|key_part| {
            self.key_cache.push_value(&key_part);
        });
        let column_family = if self.in_root {
            &self.sess.perm_cf
        } else {
            &self.sess.temp_cf
        };
        let fetched = self
            .sess
            .txn
            .get(self.in_root, column_family, &self.key_cache)?;
        Ok(fetched.map(Tuple::new))
    }
}
impl<'a> Debug for TableRowGetterSlot<'a> {
/// Writes a fixed label for each variant; the fields of a reified getter are not printed.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
// NOTE(review): the two `TableRowGetter::` arms are the pre-rename lines shown by this
// diff view; the `TableRowGetterSlot::` arms below them are their replacements.
TableRowGetter::Dummy => write!(f, "DummyRowGetter"),
TableRowGetter::Reified { .. } => write!(f, "TableRowGetter"),
TableRowGetterSlot::Dummy => write!(f, "DummyRowGetter"),
TableRowGetterSlot::Reified { .. } => write!(f, "TableRowGetter"),
}
}
}
@ -88,16 +108,15 @@ pub enum ExecPlan<'a> {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
getter: TableRowGetter<'a>,
getter: TableRowGetterSlot<'a>,
},
ChainJoinItPlan {
left: Box<ExecPlan<'a>>,
right: TableRowGetter<'a>,
right: TableRowGetterSlot<'a>,
right_info: TableInfo,
right_binding: Option<String>,
kind: ChainJoinKind,
left_outer: bool,
right_outer: bool,
},
// IndexIt {it: ..}
KeySortedWithAssocItPlan {
@ -186,10 +205,10 @@ impl<'a> ExecPlan<'a> {
todo!()
}
ExecPlan::ChainJoinItPlan {
left, right_info, ..
left, ..
} => {
let (l1, l2) = left.tuple_widths();
(l1 + 1, l2 + 1)
(l1, l2 + 1)
}
}
}
@ -211,7 +230,7 @@ impl<'a> OutputItPlan<'a> {
}
impl<'a> ExecPlan<'a> {
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>> {
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>> {
match self {
ExecPlan::NodeItPlan { it, info, .. } => {
let it = it.try_get()?;
@ -335,16 +354,113 @@ impl<'a> ExecPlan<'a> {
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan { .. } => {
todo!()
ExecPlan::ChainJoinItPlan { left, right, left_outer, kind, right_info, .. } => {
match right {
TableRowGetterSlot::Dummy => Err(LogicError("Uninitialized chain join".to_string())),
TableRowGetterSlot::Reified(right) => {
Ok(match kind {
ChainJoinKind::NodeToFwdEdge => Box::new(NodeToFwdEdgeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
}),
ChainJoinKind::NodeToBwdEdge => todo!(),
ChainJoinKind::FwdEdgeToNode => {
let key_indices = Range {
start: 1,
end: right_info.src_key_typing.len() + 1,
};
Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_indices,
})
}
ChainJoinKind::BwdEdgeToNode => {
let key_indices = Range {
start: right_info.src_key_typing.len() + 2,
end: right_info.src_key_typing.len() + right_info.dst_key_typing.len() + 2,
};
Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_indices,
})
}
})
}
}
}
}
}
}
// Implementation note:
// Never define `.next()` recursively for the iterators below; otherwise a stack overflow is
// almost guaranteed (though it may not show up with test data).
/// Iterator built for `ChainJoinKind::NodeToFwdEdge` chain joins.
///
/// The join logic is not implemented yet: `next` is a `todo!()` stub.
pub struct NodeToFwdEdgeChainJoinIterator<'a> {
/// Upstream iterator producing the left-hand rows.
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
/// Row getter for the right-hand table (a clone of the plan's reified getter).
right: TableRowGetter<'a>,
}
impl<'a> Iterator for NodeToFwdEdgeChainJoinIterator<'a> {
type Item = Result<MegaTuple>;
/// Not yet implemented; calling this panics via `todo!()`.
fn next(&mut self) -> Option<Self::Item> {
todo!()
}
}
/// Iterator built for the `FwdEdgeToNode` and `BwdEdgeToNode` chain joins.
///
/// For every row from `left`, the values at `key_indices` of the row's last key tuple are
/// used to look up one row through `right`; the found value is appended to the row's `vals`.
pub struct EdgeToNodeChainJoinIterator<'a> {
    /// Upstream iterator producing the left-hand rows.
    left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
    /// Row getter used for the right-hand lookup; reset before every lookup.
    right: TableRowGetter<'a>,
    /// When true, left rows without a match are still emitted, with an empty tuple appended.
    left_outer: bool,
    /// Positions within the last key tuple of a left row that form the lookup key.
    key_indices: Range<usize>,
}

impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
    type Item = Result<MegaTuple>;

    /// Produces the next joined row.
    ///
    /// Written as a loop rather than recursively so that long runs of unmatched rows do not
    /// grow the stack. Errors from `left` or from the lookup are yielded as `Some(Err(_))`.
    ///
    /// Panics if a left row has no key tuple, or if an index in `key_indices` is missing from
    /// its last key tuple.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `?` ends the iteration once the left side is exhausted.
            let mut row = match self.left.next()? {
                Ok(row) => row,
                Err(err) => return Some(Err(err)),
            };

            self.right.reset();
            let lookup_key = self
                .key_indices
                .clone()
                .map(|idx| row.keys.last().unwrap().get(idx).unwrap());
            let matched = match self.right.get(lookup_key) {
                Ok(found) => found,
                Err(err) => return Some(Err(err)),
            };

            if let Some(right_row) = matched {
                row.vals.push(right_row.into());
                return Some(Ok(row));
            }
            if self.left_outer {
                row.vals.push(OwnTuple::empty_tuple().into());
                return Some(Ok(row));
            }
            // Inner join with no match: drop this row and try the next one.
        }
    }
}
/// Iterator state holding two boxed input iterators of `Result<MegaTuple>`, `left` and `right`.
///
/// NOTE(review): each field is listed twice because this view interleaves the before/after
/// lines of a whitespace-only change (`Item = ` vs `Item=`).
pub struct KeyedUnionIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
}
impl<'a> Iterator for KeyedUnionIterator<'a> {
@ -395,8 +511,8 @@ impl<'a> Iterator for KeyedUnionIterator<'a> {
}
/// Iterator state holding two boxed input iterators of `Result<MegaTuple>` plus bookkeeping.
///
/// NOTE(review): `left`/`right` are listed twice because this view interleaves the
/// before/after lines of a whitespace-only change (`Item = ` vs `Item=`).
pub struct KeyedDifferenceIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
// Presumably a tuple already pulled from `right` and held for later comparison — confirm
// against the `next` implementation (not visible here).
right_cache: Option<MegaTuple>,
// Presumably marks whether iteration has begun — confirm against `next`.
started: bool,
}
@ -464,7 +580,7 @@ impl<'a> Iterator for KeyedDifferenceIterator<'a> {
}
/// Iterator state over a list of boxed input iterators of `Result<MegaTuple>`.
///
/// NOTE(review): `bags` is listed twice because this view interleaves the before/after
/// lines of a whitespace-only change (`Item = ` vs `Item=`).
pub struct BagsUnionIterator<'a> {
bags: Vec<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>>,
bags: Vec<Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>>,
// Index into `bags` of the iterator currently being read; `next` advances it when that
// iterator is exhausted.
current: usize,
}
@ -472,17 +588,18 @@ impl<'a> Iterator for BagsUnionIterator<'a> {
type Item = Result<MegaTuple>;
fn next(&mut self) -> Option<Self::Item> {
let cur_it = self.bags.get_mut(self.current).unwrap();
match cur_it.next() {
None => {
if self.current == self.bags.len() - 1 {
None
} else {
self.current += 1;
self.next()
loop {
let cur_it = self.bags.get_mut(self.current).unwrap();
match cur_it.next() {
None => {
if self.current == self.bags.len() - 1 {
return None;
} else {
self.current += 1;
}
}
v => return v,
}
v => v,
}
}
}
@ -587,7 +704,7 @@ impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
}
/// Iterator state combining a main boxed iterator of `Result<MegaTuple>` with a list of
/// `NodeIterator` associates.
///
/// NOTE(review): `main` is listed twice because this view interleaves the before/after
/// lines of a whitespace-only change (`Item = ` vs `Item=`).
pub struct KeySortedWithAssocIterator<'a> {
main: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
main: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
associates: Vec<NodeIterator<'a>>,
// Presumably one optional (key, value) slot per associate — confirm against the `next`
// implementation (not visible here).
buffer: Vec<Option<(CowTuple, CowTuple)>>,
}
@ -671,8 +788,8 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
}
pub struct OuterMergeJoinIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left_outer: bool,
right_outer: bool,
left_keys: &'a [(TableId, ColId)],
@ -820,8 +937,8 @@ impl<'a> Iterator for OuterMergeJoinIterator<'a> {
}
/// Iterator state holding two boxed input iterators of `Result<MegaTuple>` and, for each
/// side, a borrowed slice of `(TableId, ColId)` key columns.
///
/// NOTE(review): `left`/`right` are listed twice because this view interleaves the
/// before/after lines of a whitespace-only change (`Item = ` vs `Item=`).
pub struct MergeJoinIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left_keys: &'a [(TableId, ColId)],
right_keys: &'a [(TableId, ColId)],
}
@ -881,10 +998,10 @@ impl<'a> Iterator for MergeJoinIterator<'a> {
}
/// Iterator state holding a left input iterator, a cached left `MegaTuple`, a right input
/// iterator, and a reference to the `ExecPlan` for the right side.
///
/// NOTE(review): `left`/`right` are listed twice because this view interleaves the
/// before/after lines of a whitespace-only change (`Item = ` vs `Item=`).
pub struct CartesianProdIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
left_cache: MegaTuple,
// Presumably used to rebuild `right` once it is exhausted — confirm against the `next`
// implementation (not visible here).
right_source: &'a ExecPlan<'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
}
impl<'a> Iterator for CartesianProdIterator<'a> {
@ -927,7 +1044,7 @@ impl<'a> Iterator for CartesianProdIterator<'a> {
}
/// Iterator state pairing a boxed input iterator of `Result<MegaTuple>` with a borrowed
/// `Value` named `filter`.
///
/// NOTE(review): `it` is listed twice because this view interleaves the before/after lines
/// of a whitespace-only change (`Item = ` vs `Item=`).
pub struct FilterIterator<'a> {
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
filter: &'a Value<'a>,
}
@ -955,7 +1072,7 @@ impl<'a> Iterator for FilterIterator<'a> {
}
/// Iterator state pairing a boxed input iterator of `Result<MegaTuple>` with a borrowed
/// `Value` named `transform`.
///
/// NOTE(review): `it` is listed twice because this view interleaves the before/after lines
/// of a whitespace-only change (`Item = ` vs `Item=`).
pub struct OutputIterator<'a> {
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
transform: &'a Value<'a>,
}
@ -981,7 +1098,7 @@ impl<'a> Iterator for OutputIterator<'a> {
}
/// Iterator state pairing a boxed input iterator of `Result<MegaTuple>` with two borrowed
/// slices of `(String, Value)` pairs, `keys` and `vals`.
///
/// NOTE(review): `it` is listed twice because this view interleaves the before/after lines
/// of a whitespace-only change (`Item = ` vs `Item=`).
pub struct EvalIterator<'a> {
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
keys: &'a [(String, Value<'a>)],
vals: &'a [(String, Value<'a>)],
}

@ -16,9 +16,9 @@ use std::rc::Rc;
/// # key layouts
///
/// * Node
/// * `[table_id, keys]`
/// * `table_id, [keys]`
/// * Edge
/// * `[table_id, fst_table_id, fst_keys, is_forward, snd_keys, other_keys]` twice, the backward has no data
/// * `table_id, [fst_table_id, fst_keys, is_forward, snd_keys, other_keys]` twice, the backward has no data
/// * Associate
/// * Same as the main one
///

@ -1,5 +1,5 @@
use crate::db::engine::Session;
use crate::db::iterator::{ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter};
use crate::db::iterator::{ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter, TableRowGetterSlot};
use crate::db::query::{EdgeOrNodeEl, EdgeOrNodeKind, FromEl, Selection};
use crate::db::table::{ColId, TableId, TableInfo};
use crate::error::CozoError::LogicError;
@ -67,10 +67,10 @@ fn shift_accessor_map(amap: AccessorMap, (keyshift, valshift): (usize, usize)) -
tid.in_root,
tid.id
+ if cid.is_key {
keyshift as i64
} else {
valshift as i64
},
keyshift as i64
} else {
valshift as i64
},
),
cid,
),
@ -189,7 +189,6 @@ impl<'a> Session<'a> {
right_info,
kind,
left_outer,
right_outer,
right_binding,
} => {
let (l_plan, l_map) = self.do_reify_intermediate_plan(*left)?;
@ -207,16 +206,15 @@ impl<'a> Session<'a> {
let r_map = shift_accessor_map(r_map, l_plan.tuple_widths());
let plan = ExecPlan::ChainJoinItPlan {
left: l_plan.into(),
right: TableRowGetter::Reified {
right: TableRowGetterSlot::Reified(TableRowGetter {
sess: self,
key_cache: OwnTuple::with_prefix(right_info.table_id.id as u32),
in_root: right_info.table_id.in_root,
},
}),
right_info,
right_binding,
kind,
left_outer,
right_outer,
};
(plan, merge_accessor_map(l_map, r_map))
}
@ -298,13 +296,13 @@ impl<'a> Session<'a> {
it: IteratorSlot::Dummy,
info: nxt.info,
binding: nxt.binding,
getter: TableRowGetter::Dummy,
getter: TableRowGetterSlot::Dummy,
},
};
for el in it {
plan = ExecPlan::ChainJoinItPlan {
left: plan.into(),
right: TableRowGetter::Dummy,
right: TableRowGetterSlot::Dummy,
right_info: el.info,
kind: match (prev_kind, el.kind) {
(EdgeOrNodeKind::Node, EdgeOrNodeKind::FwdEdge) => {
@ -322,7 +320,6 @@ impl<'a> Session<'a> {
_ => unreachable!(),
},
left_outer: prev_left_outer,
right_outer: el.right_outer_marker,
right_binding: el.binding,
};

@ -8,6 +8,7 @@ use crate::relation::value;
use crate::relation::value::{StaticValue, Value};
use pest::iterators::Pair;
use std::collections::BTreeMap;
use crate::error::CozoError::LogicError;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum FromEl {
@ -80,8 +81,9 @@ impl<'a> Session<'a> {
let mut pairs = p.into_inner();
let mut nxt = pairs.next().unwrap();
if nxt.as_rule() == Rule::outer_join_marker {
right_join = true;
nxt = pairs.next().unwrap();
// right_join = true;
// nxt = pairs.next().unwrap();
return Err(LogicError("Right outer join not supported here".to_string()))
} else {
right_join = false;
}

Loading…
Cancel
Save