main
Ziyang Hu 2 years ago
parent db4b7c4851
commit b3651f8d47

@ -1,451 +1,15 @@
use crate::db::engine::Session;
use crate::db::eval::{compare_tuple_by_keys, tuple_eval};
use crate::db::table::{ColId, TableId, TableInfo};
use crate::db::table::{ColId, TableId};
use crate::error::CozoError::LogicError;
use crate::error::Result;
use crate::relation::data::{DataKind, EMPTY_DATA};
use crate::relation::table::MegaTuple;
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, SliceTuple, Tuple};
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, Tuple};
use crate::relation::value::Value;
use cozorocks::IteratorPtr;
use std::cmp::Ordering;
use std::fmt::{Debug, Formatter};
use std::{iter, mem};
pub enum IteratorSlot<'a> {
Dummy,
Reified(IteratorPtr<'a>),
}
impl<'a> Debug for IteratorSlot<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
IteratorSlot::Dummy { .. } => write!(f, "DummyIterator"),
IteratorSlot::Reified(_) => write!(f, "BaseIterator"),
}
}
}
pub enum TableRowGetterSlot<'a> {
Dummy,
Reified(TableRowGetter<'a>),
}
#[derive(Clone)]
pub struct TableRowGetter<'a> {
pub sess: &'a Session<'a>,
pub key_cache: OwnTuple,
pub in_root: bool,
}
impl<'a> TableRowGetter<'a> {
pub fn reset(&mut self) {
self.key_cache.truncate_all();
}
pub fn get_with_tuple<T: AsRef<[u8]>>(&self, t: &T) -> Result<Option<SliceTuple>> {
let res = self
.sess
.txn
.get(
self.in_root,
if self.in_root {
&self.sess.perm_cf
} else {
&self.sess.temp_cf
},
t,
)?
.map(Tuple::new);
Ok(res)
}
pub fn get_with_iter<'b, T: Iterator<Item = Value<'b>>>(
&mut self,
vals: T,
) -> Result<Option<SliceTuple>> {
for val in vals {
self.key_cache.push_value(&val);
}
let val = self.sess.txn.get(
self.in_root,
if self.in_root {
&self.sess.perm_cf
} else {
&self.sess.temp_cf
},
&self.key_cache,
)?;
Ok(val.map(Tuple::new))
}
}
impl<'a> Debug for TableRowGetterSlot<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
TableRowGetterSlot::Dummy => write!(f, "DummyRowGetter"),
TableRowGetterSlot::Reified { .. } => write!(f, "TableRowGetter"),
}
}
}
impl<'a> From<IteratorPtr<'a>> for IteratorSlot<'a> {
fn from(it: IteratorPtr<'a>) -> Self {
Self::Reified(it)
}
}
impl<'a> IteratorSlot<'a> {
pub fn try_get(&self) -> Result<&IteratorPtr<'a>> {
match self {
IteratorSlot::Dummy => Err(LogicError("Cannot iter over dummy".to_string())),
IteratorSlot::Reified(r) => Ok(r),
}
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ChainJoinKind {
NodeToFwdEdge,
NodeToBwdEdge,
FwdEdgeToNode,
BwdEdgeToNode,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum ExecPlan<'a> {
NodeItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
},
EdgeItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
},
EdgeKeyOnlyBwdItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
},
EdgeBwdItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
getter: TableRowGetterSlot<'a>,
},
ChainJoinItPlan {
left: Box<ExecPlan<'a>>,
left_info: TableInfo,
right: TableRowGetterSlot<'a>,
right_info: TableInfo,
right_binding: Option<String>,
kind: ChainJoinKind,
left_outer: bool,
},
// IndexIt {it: ..}
KeySortedWithAssocItPlan {
main: Box<ExecPlan<'a>>,
associates: Vec<(u32, IteratorSlot<'a>)>,
},
CartesianProdItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
},
MergeJoinItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
left_keys: Vec<(TableId, ColId)>,
right_keys: Vec<(TableId, ColId)>,
},
OuterMergeJoinItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
left_keys: Vec<(TableId, ColId)>,
right_keys: Vec<(TableId, ColId)>,
left_outer: bool,
right_outer: bool,
left_len: (usize, usize),
right_len: (usize, usize),
},
KeyedUnionItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
},
KeyedDifferenceItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
},
FilterItPlan {
source: Box<ExecPlan<'a>>,
filter: Value<'a>,
},
EvalItPlan {
source: Box<ExecPlan<'a>>,
keys: Vec<(String, Value<'a>)>,
vals: Vec<(String, Value<'a>)>,
},
BagsUnionIt {
bags: Vec<ExecPlan<'a>>,
},
}
impl<'a> ExecPlan<'a> {
pub fn tuple_widths(&self) -> (usize, usize) {
match self {
ExecPlan::NodeItPlan { .. } => (1, 1),
ExecPlan::EdgeItPlan { .. } => (1, 1),
ExecPlan::EdgeKeyOnlyBwdItPlan { .. } => (1, 0),
ExecPlan::KeySortedWithAssocItPlan { main, associates } => {
let (k, v) = main.tuple_widths();
(k, v + associates.len())
}
ExecPlan::CartesianProdItPlan { left, right } => {
let (l1, l2) = left.tuple_widths();
let (r1, r2) = right.tuple_widths();
(l1 + r1, l2 + r2)
}
ExecPlan::MergeJoinItPlan { left, right, .. } => {
let (l1, l2) = left.tuple_widths();
let (r1, r2) = right.tuple_widths();
(l1 + r1, l2 + r2)
}
ExecPlan::OuterMergeJoinItPlan { left, right, .. } => {
let (l1, l2) = left.tuple_widths();
let (r1, r2) = right.tuple_widths();
(l1 + r1, l2 + r2)
}
ExecPlan::KeyedUnionItPlan { left, .. } => left.tuple_widths(),
ExecPlan::KeyedDifferenceItPlan { left, .. } => left.tuple_widths(),
ExecPlan::FilterItPlan { source, .. } => source.tuple_widths(),
ExecPlan::EvalItPlan { source, .. } => source.tuple_widths(),
ExecPlan::BagsUnionIt { bags } => {
if bags.is_empty() {
(0, 0)
} else {
bags.get(0).unwrap().tuple_widths()
}
}
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan { left, .. } => {
let (l1, l2) = left.tuple_widths();
(l1 + 1, l2 + 1)
}
}
}
}
#[derive(Debug)]
pub struct OutputItPlan<'a> {
pub source: ExecPlan<'a>,
pub value: Value<'a>,
}
impl<'a> OutputItPlan<'a> {
pub fn iter(&self) -> Result<OutputIterator> {
Ok(OutputIterator {
it: self.source.iter()?,
transform: &self.value,
})
}
}
impl<'a> ExecPlan<'a> {
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>> {
match self {
ExecPlan::NodeItPlan { it, info, .. } => {
let it = it.try_get()?;
let prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
it.seek(prefix_tuple);
Ok(Box::new(NodeIterator { it, started: false }))
}
ExecPlan::EdgeItPlan { it, info, .. } => {
let it = it.try_get()?;
let mut prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
prefix_tuple.push_int(info.src_table_id.id);
it.seek(prefix_tuple);
Ok(Box::new(EdgeIterator {
it,
started: false,
src_table_id: info.src_table_id.id,
}))
}
ExecPlan::EdgeKeyOnlyBwdItPlan { it, info, .. } => {
let it = it.try_get()?;
let mut prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
prefix_tuple.push_int(info.dst_table_id.id);
it.seek(prefix_tuple);
Ok(Box::new(EdgeKeyOnlyBwdIterator {
it,
started: false,
dst_table_id: info.dst_table_id.id,
}))
}
ExecPlan::KeySortedWithAssocItPlan { main, associates } => {
let buffer = iter::repeat_with(|| None).take(associates.len()).collect();
let associates = associates
.iter()
.map(|(tid, it)| {
it.try_get().map(|it| {
let prefix_tuple = OwnTuple::with_prefix(*tid);
it.seek(prefix_tuple);
NodeIterator { it, started: false }
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(KeySortedWithAssocIterator {
main: main.iter()?,
associates,
buffer,
}))
}
ExecPlan::CartesianProdItPlan { left, right } => Ok(Box::new(CartesianProdIterator {
left: left.iter()?,
left_cache: MegaTuple::empty_tuple(),
right_source: right.as_ref(),
right: right.as_ref().iter()?,
})),
ExecPlan::FilterItPlan { source: it, filter } => Ok(Box::new(FilterIterator {
it: it.iter()?,
filter,
})),
ExecPlan::EvalItPlan {
source: it,
keys,
vals,
} => Ok(Box::new(EvalIterator {
it: it.iter()?,
keys,
vals,
})),
ExecPlan::MergeJoinItPlan {
left,
right,
left_keys,
right_keys,
} => Ok(Box::new(MergeJoinIterator {
left: left.iter()?,
right: right.iter()?,
left_keys,
right_keys,
})),
ExecPlan::OuterMergeJoinItPlan {
left,
right,
left_keys,
right_keys,
left_outer,
right_outer,
left_len,
right_len,
} => Ok(Box::new(OuterMergeJoinIterator {
left: left.iter()?,
right: right.iter()?,
left_outer: *left_outer,
right_outer: *right_outer,
left_keys,
right_keys,
left_len: *left_len,
right_len: *right_len,
left_cache: None,
right_cache: None,
pull_left: true,
pull_right: true,
})),
ExecPlan::KeyedUnionItPlan { left, right } => Ok(Box::new(KeyedUnionIterator {
left: left.iter()?,
right: right.iter()?,
})),
ExecPlan::KeyedDifferenceItPlan { left, right } => {
Ok(Box::new(KeyedDifferenceIterator {
left: left.iter()?,
right: right.iter()?,
right_cache: None,
started: false,
}))
}
ExecPlan::BagsUnionIt { bags } => {
let bags = bags.iter().map(|i| i.iter()).collect::<Result<Vec<_>>>()?;
Ok(Box::new(BagsUnionIterator { bags, current: 0 }))
}
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan {
left,
right,
left_outer,
kind,
left_info,
right_info,
..
} => match right {
TableRowGetterSlot::Dummy => {
Err(LogicError("Uninitialized chain join".to_string()))
}
TableRowGetterSlot::Reified(right) => Ok(match kind {
ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => {
let chain_kind = match kind {
ChainJoinKind::NodeToFwdEdge => NodeEdgeChainKind::Fwd,
ChainJoinKind::NodeToBwdEdge => NodeEdgeChainKind::Bwd,
_ => unreachable!(),
};
let (edge_front_table_id, left_key_len) =
if *kind == ChainJoinKind::NodeToBwdEdge {
(
right_info.dst_table_id.id as u32,
right_info.dst_key_typing.len(),
)
} else {
(
right_info.src_table_id.id as u32,
right_info.src_key_typing.len(),
)
};
let right_iter = if right.in_root {
right.sess.txn.iterator(true, &right.sess.perm_cf)
} else {
right.sess.txn.iterator(false, &right.sess.temp_cf)
};
Box::new(NodeToEdgeChainJoinIterator {
left: left.iter()?,
right_it: right_iter,
right_getter: right.clone(),
kind: chain_kind,
right_key_cache: None,
edge_front_table_id,
left_key_len,
left_cache: None,
last_right_key_cache: None,
})
}
ChainJoinKind::FwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_start_idx: left_info.src_key_typing.len() + 2,
key_end_idx: left_info.src_key_typing.len()
+ left_info.dst_key_typing.len()
+ 2,
}),
ChainJoinKind::BwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_start_idx: 1,
key_end_idx: left_info.src_key_typing.len() + 1,
}),
}),
},
}
}
}
use crate::db::plan::{ExecPlan, TableRowGetter};
// Implementation notice
// Never define `.next()` recursively for iterators below, otherwise stackoverflow is almost
@ -459,15 +23,15 @@ pub enum NodeEdgeChainKind {
}
pub struct NodeToEdgeChainJoinIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right_it: IteratorPtr<'a>,
right_getter: TableRowGetter<'a>,
kind: NodeEdgeChainKind,
left_key_len: usize,
right_key_cache: Option<OwnTuple>,
edge_front_table_id: u32,
left_cache: Option<MegaTuple>,
last_right_key_cache: Option<CowTuple>,
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right_it: IteratorPtr<'a>,
pub(crate) right_getter: TableRowGetter<'a>,
pub(crate) kind: NodeEdgeChainKind,
pub(crate) left_key_len: usize,
pub(crate) right_key_cache: Option<OwnTuple>,
pub(crate) edge_front_table_id: u32,
pub(crate) left_cache: Option<MegaTuple>,
pub(crate) last_right_key_cache: Option<CowTuple>,
}
impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
@ -582,11 +146,11 @@ impl<'a> Iterator for NodeToEdgeChainJoinIterator<'a> {
}
pub struct EdgeToNodeChainJoinIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: TableRowGetter<'a>,
left_outer: bool,
key_start_idx: usize,
key_end_idx: usize,
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right: TableRowGetter<'a>,
pub(crate) left_outer: bool,
pub(crate) key_start_idx: usize,
pub(crate) key_end_idx: usize,
}
impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
@ -628,8 +192,8 @@ impl<'a> Iterator for EdgeToNodeChainJoinIterator<'a> {
}
pub struct KeyedUnionIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
}
impl<'a> Iterator for KeyedUnionIterator<'a> {
@ -680,10 +244,10 @@ impl<'a> Iterator for KeyedUnionIterator<'a> {
}
pub struct KeyedDifferenceIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right_cache: Option<MegaTuple>,
started: bool,
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right_cache: Option<MegaTuple>,
pub(crate) started: bool,
}
impl<'a> Iterator for KeyedDifferenceIterator<'a> {
@ -749,8 +313,8 @@ impl<'a> Iterator for KeyedDifferenceIterator<'a> {
}
pub struct BagsUnionIterator<'a> {
bags: Vec<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>>,
current: usize,
pub(crate) bags: Vec<Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>>,
pub(crate) current: usize,
}
impl<'a> Iterator for BagsUnionIterator<'a> {
@ -774,8 +338,8 @@ impl<'a> Iterator for BagsUnionIterator<'a> {
}
pub struct NodeIterator<'a> {
it: &'a IteratorPtr<'a>,
started: bool,
pub(crate) it: &'a IteratorPtr<'a>,
pub(crate) started: bool,
}
impl<'a> Iterator for NodeIterator<'a> {
@ -798,9 +362,9 @@ impl<'a> Iterator for NodeIterator<'a> {
}
pub struct EdgeIterator<'a> {
it: &'a IteratorPtr<'a>,
started: bool,
src_table_id: i64,
pub(crate) it: &'a IteratorPtr<'a>,
pub(crate) started: bool,
pub(crate) src_table_id: i64,
}
impl<'a> Iterator for EdgeIterator<'a> {
@ -836,9 +400,9 @@ impl<'a> Iterator for EdgeIterator<'a> {
}
pub struct EdgeKeyOnlyBwdIterator<'a> {
it: &'a IteratorPtr<'a>,
started: bool,
dst_table_id: i64,
pub(crate) it: &'a IteratorPtr<'a>,
pub(crate) started: bool,
pub(crate) dst_table_id: i64,
}
impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
@ -873,9 +437,9 @@ impl<'a> Iterator for EdgeKeyOnlyBwdIterator<'a> {
}
pub struct KeySortedWithAssocIterator<'a> {
main: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
associates: Vec<NodeIterator<'a>>,
buffer: Vec<Option<(CowTuple, CowTuple)>>,
pub(crate) main: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) associates: Vec<NodeIterator<'a>>,
pub(crate) buffer: Vec<Option<(CowTuple, CowTuple)>>,
}
impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
@ -957,18 +521,18 @@ impl<'a> Iterator for KeySortedWithAssocIterator<'a> {
}
pub struct OuterMergeJoinIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left_outer: bool,
right_outer: bool,
left_keys: &'a [(TableId, ColId)],
right_keys: &'a [(TableId, ColId)],
left_len: (usize, usize),
right_len: (usize, usize),
left_cache: Option<MegaTuple>,
right_cache: Option<MegaTuple>,
pull_left: bool,
pull_right: bool,
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) left_outer: bool,
pub(crate) right_outer: bool,
pub(crate) left_keys: &'a [(TableId, ColId)],
pub(crate) right_keys: &'a [(TableId, ColId)],
pub(crate) left_len: (usize, usize),
pub(crate) right_len: (usize, usize),
pub(crate) left_cache: Option<MegaTuple>,
pub(crate) right_cache: Option<MegaTuple>,
pub(crate) pull_left: bool,
pub(crate) pull_right: bool,
}
impl<'a> Iterator for OuterMergeJoinIterator<'a> {
@ -1106,10 +670,10 @@ impl<'a> Iterator for OuterMergeJoinIterator<'a> {
}
pub struct MergeJoinIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left_keys: &'a [(TableId, ColId)],
right_keys: &'a [(TableId, ColId)],
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) left_keys: &'a [(TableId, ColId)],
pub(crate) right_keys: &'a [(TableId, ColId)],
}
impl<'a> Iterator for MergeJoinIterator<'a> {
@ -1167,10 +731,10 @@ impl<'a> Iterator for MergeJoinIterator<'a> {
}
pub struct CartesianProdIterator<'a> {
left: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
left_cache: MegaTuple,
right_source: &'a ExecPlan<'a>,
right: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
pub(crate) left: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) left_cache: MegaTuple,
pub(crate) right_source: &'a ExecPlan<'a>,
pub(crate) right: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
}
impl<'a> Iterator for CartesianProdIterator<'a> {
@ -1213,8 +777,8 @@ impl<'a> Iterator for CartesianProdIterator<'a> {
}
pub struct FilterIterator<'a> {
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
filter: &'a Value<'a>,
pub(crate) it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) filter: &'a Value<'a>,
}
impl<'a> Iterator for FilterIterator<'a> {
@ -1241,8 +805,8 @@ impl<'a> Iterator for FilterIterator<'a> {
}
pub struct OutputIterator<'a> {
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
transform: &'a Value<'a>,
pub(crate) it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) transform: &'a Value<'a>,
}
impl<'a> OutputIterator<'a> {
@ -1267,9 +831,9 @@ impl<'a> Iterator for OutputIterator<'a> {
}
pub struct EvalIterator<'a> {
it: Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>,
keys: &'a [(String, Value<'a>)],
vals: &'a [(String, Value<'a>)],
pub(crate) it: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) keys: &'a [(String, Value<'a>)],
pub(crate) vals: &'a [(String, Value<'a>)],
}
pub const EVAL_TEMP_PREFIX: u32 = u32::MAX - 1;

@ -1,19 +1,457 @@
use crate::db::engine::Session;
use crate::db::iterator::{
ChainJoinKind, ExecPlan, IteratorSlot, OutputItPlan, TableRowGetter, TableRowGetterSlot,
};
use crate::db::query::{EdgeOrNodeKind, FromEl, Selection};
use crate::db::table::{ColId, TableId, TableInfo};
use crate::error::CozoError::LogicError;
use crate::error::Result;
use crate::parser::Rule;
use crate::relation::data::DataKind;
use crate::relation::tuple::OwnTuple;
use crate::relation::tuple::{OwnTuple, SliceTuple, Tuple};
use crate::relation::value::{StaticValue, Value};
use cozorocks::IteratorPtr;
use pest::iterators::Pair;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::iter;
use crate::db::iterator::{BagsUnionIterator, CartesianProdIterator, EdgeIterator, EdgeKeyOnlyBwdIterator, EdgeToNodeChainJoinIterator, EvalIterator, FilterIterator, KeyedDifferenceIterator, KeyedUnionIterator, KeySortedWithAssocIterator, MergeJoinIterator, NodeEdgeChainKind, NodeIterator, NodeToEdgeChainJoinIterator, OuterMergeJoinIterator, OutputIterator};
use crate::relation::table::MegaTuple;
pub enum IteratorSlot<'a> {
Dummy,
Reified(IteratorPtr<'a>),
}
impl<'a> Debug for IteratorSlot<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
IteratorSlot::Dummy { .. } => write!(f, "DummyIterator"),
IteratorSlot::Reified(_) => write!(f, "BaseIterator"),
}
}
}
pub enum TableRowGetterSlot<'a> {
Dummy,
Reified(TableRowGetter<'a>),
}
#[derive(Clone)]
pub struct TableRowGetter<'a> {
pub sess: &'a Session<'a>,
pub key_cache: OwnTuple,
pub in_root: bool,
}
impl<'a> TableRowGetter<'a> {
pub fn reset(&mut self) {
self.key_cache.truncate_all();
}
pub fn get_with_tuple<T: AsRef<[u8]>>(&self, t: &T) -> Result<Option<SliceTuple>> {
let res = self
.sess
.txn
.get(
self.in_root,
if self.in_root {
&self.sess.perm_cf
} else {
&self.sess.temp_cf
},
t,
)?
.map(Tuple::new);
Ok(res)
}
pub fn get_with_iter<'b, T: Iterator<Item = Value<'b>>>(
&mut self,
vals: T,
) -> Result<Option<SliceTuple>> {
for val in vals {
self.key_cache.push_value(&val);
}
let val = self.sess.txn.get(
self.in_root,
if self.in_root {
&self.sess.perm_cf
} else {
&self.sess.temp_cf
},
&self.key_cache,
)?;
Ok(val.map(Tuple::new))
}
}
impl<'a> Debug for TableRowGetterSlot<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
TableRowGetterSlot::Dummy => write!(f, "DummyRowGetter"),
TableRowGetterSlot::Reified { .. } => write!(f, "TableRowGetter"),
}
}
}
impl<'a> From<IteratorPtr<'a>> for IteratorSlot<'a> {
fn from(it: IteratorPtr<'a>) -> Self {
Self::Reified(it)
}
}
impl<'a> IteratorSlot<'a> {
pub fn try_get(&self) -> Result<&IteratorPtr<'a>> {
match self {
IteratorSlot::Dummy => Err(LogicError("Cannot iter over dummy".to_string())),
IteratorSlot::Reified(r) => Ok(r),
}
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ChainJoinKind {
NodeToFwdEdge,
NodeToBwdEdge,
FwdEdgeToNode,
BwdEdgeToNode,
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum ExecPlan<'a> {
NodeItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
},
EdgeItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
},
EdgeKeyOnlyBwdItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
},
EdgeBwdItPlan {
it: IteratorSlot<'a>,
info: TableInfo,
binding: Option<String>,
getter: TableRowGetterSlot<'a>,
},
ChainJoinItPlan {
left: Box<ExecPlan<'a>>,
left_info: TableInfo,
right: TableRowGetterSlot<'a>,
right_info: TableInfo,
right_binding: Option<String>,
kind: ChainJoinKind,
left_outer: bool,
},
// IndexIt {it: ..}
KeySortedWithAssocItPlan {
main: Box<ExecPlan<'a>>,
associates: Vec<(u32, IteratorSlot<'a>)>,
},
CartesianProdItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
},
MergeJoinItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
left_keys: Vec<(TableId, ColId)>,
right_keys: Vec<(TableId, ColId)>,
},
OuterMergeJoinItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
left_keys: Vec<(TableId, ColId)>,
right_keys: Vec<(TableId, ColId)>,
left_outer: bool,
right_outer: bool,
left_len: (usize, usize),
right_len: (usize, usize),
},
KeyedUnionItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
},
KeyedDifferenceItPlan {
left: Box<ExecPlan<'a>>,
right: Box<ExecPlan<'a>>,
},
FilterItPlan {
source: Box<ExecPlan<'a>>,
filter: Value<'a>,
},
EvalItPlan {
source: Box<ExecPlan<'a>>,
keys: Vec<(String, Value<'a>)>,
vals: Vec<(String, Value<'a>)>,
},
BagsUnionIt {
bags: Vec<ExecPlan<'a>>,
},
}
impl<'a> ExecPlan<'a> {
pub fn tuple_widths(&self) -> (usize, usize) {
match self {
ExecPlan::NodeItPlan { .. } => (1, 1),
ExecPlan::EdgeItPlan { .. } => (1, 1),
ExecPlan::EdgeKeyOnlyBwdItPlan { .. } => (1, 0),
ExecPlan::KeySortedWithAssocItPlan { main, associates } => {
let (k, v) = main.tuple_widths();
(k, v + associates.len())
}
ExecPlan::CartesianProdItPlan { left, right } => {
let (l1, l2) = left.tuple_widths();
let (r1, r2) = right.tuple_widths();
(l1 + r1, l2 + r2)
}
ExecPlan::MergeJoinItPlan { left, right, .. } => {
let (l1, l2) = left.tuple_widths();
let (r1, r2) = right.tuple_widths();
(l1 + r1, l2 + r2)
}
ExecPlan::OuterMergeJoinItPlan { left, right, .. } => {
let (l1, l2) = left.tuple_widths();
let (r1, r2) = right.tuple_widths();
(l1 + r1, l2 + r2)
}
ExecPlan::KeyedUnionItPlan { left, .. } => left.tuple_widths(),
ExecPlan::KeyedDifferenceItPlan { left, .. } => left.tuple_widths(),
ExecPlan::FilterItPlan { source, .. } => source.tuple_widths(),
ExecPlan::EvalItPlan { source, .. } => source.tuple_widths(),
ExecPlan::BagsUnionIt { bags } => {
if bags.is_empty() {
(0, 0)
} else {
bags.get(0).unwrap().tuple_widths()
}
}
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan { left, .. } => {
let (l1, l2) = left.tuple_widths();
(l1 + 1, l2 + 1)
}
}
}
}
#[derive(Debug)]
pub struct OutputItPlan<'a> {
pub source: ExecPlan<'a>,
pub value: Value<'a>,
}
impl<'a> OutputItPlan<'a> {
pub fn iter(&self) -> Result<OutputIterator> {
Ok(OutputIterator {
it: self.source.iter()?,
transform: &self.value,
})
}
}
impl<'a> ExecPlan<'a> {
pub fn iter(&'a self) -> Result<Box<dyn Iterator<Item = Result<MegaTuple>> + 'a>> {
match self {
ExecPlan::NodeItPlan { it, info, .. } => {
let it = it.try_get()?;
let prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
it.seek(prefix_tuple);
Ok(Box::new(NodeIterator { it, started: false }))
}
ExecPlan::EdgeItPlan { it, info, .. } => {
let it = it.try_get()?;
let mut prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
prefix_tuple.push_int(info.src_table_id.id);
it.seek(prefix_tuple);
Ok(Box::new(EdgeIterator {
it,
started: false,
src_table_id: info.src_table_id.id,
}))
}
ExecPlan::EdgeKeyOnlyBwdItPlan { it, info, .. } => {
let it = it.try_get()?;
let mut prefix_tuple = OwnTuple::with_prefix(info.table_id.id as u32);
prefix_tuple.push_int(info.dst_table_id.id);
it.seek(prefix_tuple);
Ok(Box::new(EdgeKeyOnlyBwdIterator {
it,
started: false,
dst_table_id: info.dst_table_id.id,
}))
}
ExecPlan::KeySortedWithAssocItPlan { main, associates } => {
let buffer = iter::repeat_with(|| None).take(associates.len()).collect();
let associates = associates
.iter()
.map(|(tid, it)| {
it.try_get().map(|it| {
let prefix_tuple = OwnTuple::with_prefix(*tid);
it.seek(prefix_tuple);
NodeIterator { it, started: false }
})
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(KeySortedWithAssocIterator {
main: main.iter()?,
associates,
buffer,
}))
}
ExecPlan::CartesianProdItPlan { left, right } => Ok(Box::new(CartesianProdIterator {
left: left.iter()?,
left_cache: MegaTuple::empty_tuple(),
right_source: right.as_ref(),
right: right.as_ref().iter()?,
})),
ExecPlan::FilterItPlan { source: it, filter } => Ok(Box::new(FilterIterator {
it: it.iter()?,
filter,
})),
ExecPlan::EvalItPlan {
source: it,
keys,
vals,
} => Ok(Box::new(EvalIterator {
it: it.iter()?,
keys,
vals,
})),
ExecPlan::MergeJoinItPlan {
left,
right,
left_keys,
right_keys,
} => Ok(Box::new(MergeJoinIterator {
left: left.iter()?,
right: right.iter()?,
left_keys,
right_keys,
})),
ExecPlan::OuterMergeJoinItPlan {
left,
right,
left_keys,
right_keys,
left_outer,
right_outer,
left_len,
right_len,
} => Ok(Box::new(OuterMergeJoinIterator {
left: left.iter()?,
right: right.iter()?,
left_outer: *left_outer,
right_outer: *right_outer,
left_keys,
right_keys,
left_len: *left_len,
right_len: *right_len,
left_cache: None,
right_cache: None,
pull_left: true,
pull_right: true,
})),
ExecPlan::KeyedUnionItPlan { left, right } => Ok(Box::new(KeyedUnionIterator {
left: left.iter()?,
right: right.iter()?,
})),
ExecPlan::KeyedDifferenceItPlan { left, right } => {
Ok(Box::new(KeyedDifferenceIterator {
left: left.iter()?,
right: right.iter()?,
right_cache: None,
started: false,
}))
}
ExecPlan::BagsUnionIt { bags } => {
let bags = bags.iter().map(|i| i.iter()).collect::<Result<Vec<_>>>()?;
Ok(Box::new(BagsUnionIterator { bags, current: 0 }))
}
ExecPlan::EdgeBwdItPlan { .. } => {
todo!()
}
ExecPlan::ChainJoinItPlan {
left,
right,
left_outer,
kind,
left_info,
right_info,
..
} => match right {
TableRowGetterSlot::Dummy => {
Err(LogicError("Uninitialized chain join".to_string()))
}
TableRowGetterSlot::Reified(right) => Ok(match kind {
ChainJoinKind::NodeToFwdEdge | ChainJoinKind::NodeToBwdEdge => {
let chain_kind = match kind {
ChainJoinKind::NodeToFwdEdge => NodeEdgeChainKind::Fwd,
ChainJoinKind::NodeToBwdEdge => NodeEdgeChainKind::Bwd,
_ => unreachable!(),
};
let (edge_front_table_id, left_key_len) =
if *kind == ChainJoinKind::NodeToBwdEdge {
(
right_info.dst_table_id.id as u32,
right_info.dst_key_typing.len(),
)
} else {
(
right_info.src_table_id.id as u32,
right_info.src_key_typing.len(),
)
};
let right_iter = if right.in_root {
right.sess.txn.iterator(true, &right.sess.perm_cf)
} else {
right.sess.txn.iterator(false, &right.sess.temp_cf)
};
Box::new(NodeToEdgeChainJoinIterator {
left: left.iter()?,
right_it: right_iter,
right_getter: right.clone(),
kind: chain_kind,
right_key_cache: None,
edge_front_table_id,
left_key_len,
left_cache: None,
last_right_key_cache: None,
})
}
ChainJoinKind::FwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_start_idx: left_info.src_key_typing.len() + 2,
key_end_idx: left_info.src_key_typing.len()
+ left_info.dst_key_typing.len()
+ 2,
}),
ChainJoinKind::BwdEdgeToNode => Box::new(EdgeToNodeChainJoinIterator {
left: left.iter()?,
right: right.clone(),
left_outer: *left_outer,
key_start_idx: 1,
key_end_idx: left_info.src_key_typing.len() + 1,
}),
}),
},
}
}
}
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum OuterJoinType {

Loading…
Cancel
Save