sorting mat plan

main
Ziyang Hu 2 years ago
parent 8d41002711
commit 9e0b739589

@ -305,7 +305,7 @@ impl<'s> Session<'s> {
self.define_data(&name, tuple, in_root) self.define_data(&name, tuple, in_root)
} }
fn get_next_storage_id(&self, in_root: bool) -> Result<i64> { pub fn get_next_storage_id(&self, in_root: bool) -> Result<i64> {
let mut key_entry = Tuple::with_null_prefix(); let mut key_entry = Tuple::with_null_prefix();
key_entry.push_null(); key_entry.push_null();
let db_res = if in_root { let db_res = if in_root {

@ -5,16 +5,48 @@ use crate::error::Result;
use crate::relation::data::{DataKind, EMPTY_DATA}; use crate::relation::data::{DataKind, EMPTY_DATA};
use crate::relation::table::MegaTuple; use crate::relation::table::MegaTuple;
use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, Tuple}; use crate::relation::tuple::{CowSlice, CowTuple, OwnTuple, Tuple};
use crate::relation::value::Value; use crate::relation::value::{Value};
use cozorocks::IteratorPtr; use cozorocks::IteratorPtr;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::{iter, mem}; use std::{iter, mem};
use crate::db::engine::Session;
use crate::db::plan::{ExecPlan, TableRowGetter}; use crate::db::plan::{ExecPlan, TableRowGetter};
// Implementation notice // Implementation notice
// Never define `.next()` recursively for iterators below, otherwise stackoverflow is almost // Never define `.next()` recursively for iterators below, otherwise stackoverflow is almost
// guaranteed (but may not show for test data) // guaranteed (but may not show for test data)
pub struct SortingMaterialization<'a> {
pub(crate) source: Box<dyn Iterator<Item=Result<MegaTuple>> + 'a>,
pub(crate) ordering: &'a [(bool, Value<'a>)],
pub(crate) sess: &'a Session<'a>,
pub(crate) sorted: bool
}
impl <'a> SortingMaterialization<'a> {
fn sort(&mut self) {
// todo!()
self.sorted = true;
}
}
impl<'a> Drop for SortingMaterialization<'a> {
fn drop(&mut self) {
// todo!()
}
}
impl <'a> Iterator for SortingMaterialization<'a> {
type Item = Result<MegaTuple>;
fn next(&mut self) -> Option<Self::Item> {
if !self.sorted {
self.sort();
}
todo!()
}
}
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum NodeEdgeChainKind { pub enum NodeEdgeChainKind {
Fwd, Fwd,
@ -1147,8 +1179,9 @@ mod tests {
let start = Instant::now(); let start = Instant::now();
let s = r##"from (j:Job)<-[hj:HasJob]-(e:Employee) let s = r##"from (j:Job)<-[hj:HasJob]-(e:Employee)
where j.id == 16 // where j.id == 16
select { eid: e.id, jid: j.id, fname: e.first_name, salary: hj.salary, job: j.title } select { eid: e.id, jid: j.id, fname: e.first_name, salary: hj.salary, job: j.title }
ordered [j.id: desc, e.id]
limit 2 offset 1"##; limit 2 offset 1"##;
let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap(); let parsed = Parser::parse(Rule::relational_query, s)?.next().unwrap();

@ -13,10 +13,28 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::iter; use std::iter;
use crate::db::iterator::{BagsUnionIterator, CartesianProdIterator, EdgeIterator, EdgeKeyOnlyBwdIterator, EdgeToNodeChainJoinIterator, EvalIterator, FilterIterator, KeyedDifferenceIterator, KeyedUnionIterator, KeySortedWithAssocIterator, LimiterIterator, MergeJoinIterator, NodeEdgeChainKind, NodeIterator, NodeToEdgeChainJoinIterator, OuterMergeJoinIterator, OutputIterator}; use crate::db::iterator::{BagsUnionIterator, CartesianProdIterator, EdgeIterator, EdgeKeyOnlyBwdIterator, EdgeToNodeChainJoinIterator, EvalIterator, FilterIterator, KeyedDifferenceIterator, KeyedUnionIterator, KeySortedWithAssocIterator, LimiterIterator, MergeJoinIterator, NodeEdgeChainKind, NodeIterator, NodeToEdgeChainJoinIterator, OuterMergeJoinIterator, OutputIterator, SortingMaterialization};
use crate::relation::table::MegaTuple; use crate::relation::table::MegaTuple;
pub enum SessionSlot<'a> {
Dummy,
Reified(&'a Session<'a>)
}
impl <'a> Debug for SessionSlot<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SessionSlot::Dummy => {
write!(f, "DummySession")
}
SessionSlot::Reified(_) => {
write!(f, "Session")
}
}
}
}
pub enum IteratorSlot<'a> { pub enum IteratorSlot<'a> {
Dummy, Dummy,
Reified(IteratorPtr<'a>), Reified(IteratorPtr<'a>),
@ -192,14 +210,19 @@ pub enum ExecPlan<'a> {
keys: Vec<(String, Value<'a>)>, keys: Vec<(String, Value<'a>)>,
vals: Vec<(String, Value<'a>)>, vals: Vec<(String, Value<'a>)>,
}, },
BagsUnionIt { BagsUnionItPlan {
bags: Vec<ExecPlan<'a>>, bags: Vec<ExecPlan<'a>>,
}, },
LimiterIt { LimiterItPlan {
source: Box<ExecPlan<'a>>, source: Box<ExecPlan<'a>>,
offset: usize, offset: usize,
limit: usize, limit: usize,
}, },
SortingMatPlan {
source: Box<ExecPlan<'a>>,
ordering: Vec<(bool, StaticValue)>,
sess: SessionSlot<'a>
},
} }
impl<'a> ExecPlan<'a> { impl<'a> ExecPlan<'a> {
@ -231,7 +254,7 @@ impl<'a> ExecPlan<'a> {
ExecPlan::KeyedDifferenceItPlan { left, .. } => left.tuple_widths(), ExecPlan::KeyedDifferenceItPlan { left, .. } => left.tuple_widths(),
ExecPlan::FilterItPlan { source, .. } => source.tuple_widths(), ExecPlan::FilterItPlan { source, .. } => source.tuple_widths(),
ExecPlan::EvalItPlan { source, .. } => source.tuple_widths(), ExecPlan::EvalItPlan { source, .. } => source.tuple_widths(),
ExecPlan::BagsUnionIt { bags } => { ExecPlan::BagsUnionItPlan { bags } => {
if bags.is_empty() { if bags.is_empty() {
(0, 0) (0, 0)
} else { } else {
@ -245,7 +268,10 @@ impl<'a> ExecPlan<'a> {
let (l1, l2) = left.tuple_widths(); let (l1, l2) = left.tuple_widths();
(l1 + 1, l2 + 1 + right_associates.len()) (l1 + 1, l2 + 1 + right_associates.len())
} }
ExecPlan::LimiterIt { source, .. } => { ExecPlan::LimiterItPlan { source, .. } => {
source.tuple_widths()
}
ExecPlan::SortingMatPlan { source, .. } => {
source.tuple_widths() source.tuple_widths()
} }
} }
@ -385,7 +411,7 @@ impl<'a> ExecPlan<'a> {
started: false, started: false,
})) }))
} }
ExecPlan::BagsUnionIt { bags } => { ExecPlan::BagsUnionItPlan { bags } => {
let bags = bags.iter().map(|i| i.iter()).collect::<Result<Vec<_>>>()?; let bags = bags.iter().map(|i| i.iter()).collect::<Result<Vec<_>>>()?;
Ok(Box::new(BagsUnionIterator { bags, current: 0 })) Ok(Box::new(BagsUnionIterator { bags, current: 0 }))
} }
@ -458,7 +484,7 @@ impl<'a> ExecPlan<'a> {
}), }),
}), }),
}, },
ExecPlan::LimiterIt { source, limit, offset } => { ExecPlan::LimiterItPlan { source, limit, offset } => {
Ok(Box::new(LimiterIterator { Ok(Box::new(LimiterIterator {
source: source.iter()?, source: source.iter()?,
limit: *limit, limit: *limit,
@ -466,6 +492,21 @@ impl<'a> ExecPlan<'a> {
current: 0, current: 0,
})) }))
} }
ExecPlan::SortingMatPlan { source, ordering , sess } => {
match sess {
SessionSlot::Dummy => {
Err(LogicError("Uninitialized session data".to_string()))
}
SessionSlot::Reified(sess) => {
Ok(Box::new(SortingMaterialization {
source: source.iter()?,
ordering,
sess,
sorted: false
}))
}
}
}
} }
} }
} }
@ -656,7 +697,7 @@ impl<'a> Session<'a> {
}; };
(plan, amap) (plan, amap)
} }
ExecPlan::BagsUnionIt { .. } => todo!(), ExecPlan::BagsUnionItPlan { .. } => todo!(),
ExecPlan::ChainJoinItPlan { ExecPlan::ChainJoinItPlan {
left, left,
left_info, left_info,
@ -716,14 +757,27 @@ impl<'a> Session<'a> {
}; };
(plan, l_map) (plan, l_map)
} }
ExecPlan::LimiterIt { source, limit, offset } => { ExecPlan::LimiterItPlan { source, limit, offset } => {
let (source, amap) = self.do_reify_intermediate_plan(*source)?; let (source, amap) = self.do_reify_intermediate_plan(*source)?;
(ExecPlan::LimiterIt { (ExecPlan::LimiterItPlan {
source: source.into(), source: source.into(),
limit, limit,
offset, offset,
}, amap) }, amap)
} }
ExecPlan::SortingMatPlan { source, ordering, .. } => {
let (source, amap) = self.do_reify_intermediate_plan(*source)?;
let ordering = ordering.into_iter().map(|(is_asc, val)| -> Result<(bool, StaticValue)> {
let (_, val) = self.partial_eval(val, &Default::default(), &amap)?;
Ok((is_asc, val))
}).collect::<Result<Vec<_>>>()?;
let temp_table_id = self.get_next_storage_id(false)?;
(ExecPlan::SortingMatPlan {
source: source.into(),
ordering,
sess: SessionSlot::Reified(self)
}, amap)
}
}; };
Ok(res) Ok(res)
} }
@ -949,10 +1003,17 @@ impl<'a> Session<'a> {
mut plan: ExecPlan<'b>, mut plan: ExecPlan<'b>,
select_data: Selection, select_data: Selection,
) -> Result<ExecPlan<'b>> { ) -> Result<ExecPlan<'b>> {
if !select_data.ordering.is_empty() {
plan = ExecPlan::SortingMatPlan {
source: plan.into(),
ordering: select_data.ordering,
sess: SessionSlot::Dummy
};
}
if select_data.limit.is_some() || select_data.offset.is_some() { if select_data.limit.is_some() || select_data.offset.is_some() {
let limit = select_data.limit.unwrap_or(0) as usize; let limit = select_data.limit.unwrap_or(0) as usize;
let offset = select_data.offset.unwrap_or(0) as usize; let offset = select_data.offset.unwrap_or(0) as usize;
plan = ExecPlan::LimiterIt { plan = ExecPlan::LimiterItPlan {
source: plan.into(), source: plan.into(),
offset, offset,
limit, limit,

@ -290,7 +290,7 @@ impl<'a> Session<'a> {
for p in p.into_inner() { for p in p.into_inner() {
ordering.push(( ordering.push((
p.as_rule() == Rule::order_asc, p.as_rule() == Rule::order_asc,
parse_string(p.into_inner().next().unwrap())?, Value::from_pair(p.into_inner().next().unwrap())?.to_static(),
)) ))
} }
} }
@ -325,6 +325,8 @@ impl<'a> Session<'a> {
} }
} }
println!("ordering {:?}", ordering);
Ok(Selection { Ok(Selection {
scoped, scoped,
keys, keys,
@ -341,7 +343,7 @@ pub struct Selection {
pub scoped: Option<String>, pub scoped: Option<String>,
pub keys: Vec<(String, StaticValue)>, pub keys: Vec<(String, StaticValue)>,
pub vals: Vec<(String, StaticValue)>, pub vals: Vec<(String, StaticValue)>,
pub ordering: Vec<(bool, String)>, pub ordering: Vec<(bool, StaticValue)>,
pub limit: Option<i64>, pub limit: Option<i64>,
pub offset: Option<i64>, pub offset: Option<i64>,
} }

@ -191,9 +191,9 @@ where_pattern = { "where" ~ (expr ~ ",")* ~ expr }
select_pattern = { "select" ~ (select_dict | scoped_dict) ~ ( (order_pattern ~ offset_pattern?) | (offset_pattern ~ order_pattern?) )? } select_pattern = { "select" ~ (select_dict | scoped_dict) ~ ( (order_pattern ~ offset_pattern?) | (offset_pattern ~ order_pattern?) )? }
order_pattern = { "ordered" ~ "[" ~ order_el ~ ("," ~ order_el)* ~ "]" } order_pattern = { "ordered" ~ "[" ~ order_el ~ ("," ~ order_el)* ~ "]" }
order_el = _{order_asc | order_dsc} order_el = _{order_dsc | order_asc}
order_asc = {expr ~ (":" ~ "asc")?}
order_dsc = {expr ~ (":" ~ "desc")} order_dsc = {expr ~ (":" ~ "desc")}
order_asc = {expr ~ (":" ~ "asc")?}
offset_pattern = { limit_clause? ~ offset_clause? } offset_pattern = { limit_clause? ~ offset_clause? }
limit_clause = { "limit" ~ pos_int } limit_clause = { "limit" ~ pos_int }
offset_clause = { "offset" ~ pos_int } offset_clause = { "offset" ~ pos_int }

Loading…
Cancel
Save