rework walk builder

main
Ziyang Hu 2 years ago
parent eb34d3e49b
commit 5e5a6b9888

@ -1,5 +1,5 @@
use crate::algebra::op::{
build_binding_map_from_info, parse_chain, unique_prefix_nested_loop, ChainPart,
build_binding_map_from_info, parse_chain, unique_prefix_nested_loop, ChainEl, ChainPart,
ChainPartEdgeDir, FilterError, InterpretContext, JoinType, NestLoopLeftPrefixIter,
RelationalAlgebra, SelectOpError, SortDirection, NAME_SKIP, NAME_SORT, NAME_TAKE, NAME_WHERE,
};
@ -59,213 +59,31 @@ impl<'a> WalkOp<'a> {
return Err(WalkError::Chain.into());
}
// check no dup binding
check_chain_dup_binding(&chain)?;
let mut bindings: BTreeSet<&str> = BTreeSet::new();
for el in &chain {
if !bindings.insert(&el.binding) {
return Err(AlgebraParseError::DuplicateBinding(el.binding.to_string()).into());
}
}
// check the chain connects, and get the table infos
let mut chain = chain.into_iter();
let first_el = chain.next().unwrap();
let (first_info, first_assocs) =
get_chain_el_info(ctx, &first_el.target, &first_el.assocs)?;
let bmap_inner = build_binding_map_from_info(ctx, &first_info, &first_assocs, true)?;
let mut binding_maps = vec![BindingMap {
inner_map: BTreeMap::from([(first_el.binding.clone(), bmap_inner)]),
key_size: 1,
val_size: 1 + first_el.assocs.len(),
}];
let mut starting_el = StartingEl {
node_info: first_info.into_node()?,
assocs: first_assocs,
binding: first_el.binding,
pivot: false,
ops: vec![],
};
let mut last_node_tid = starting_el.node_info.tid;
let mut hops = vec![];
loop {
match chain.next() {
None => break,
Some(el) => {
let (edge_info, edge_assocs) = get_chain_el_info(ctx, &el.target, &el.assocs)?;
let bmap_inner =
build_binding_map_from_info(ctx, &edge_info, &edge_assocs, true)?;
let e_bmap = BindingMap {
inner_map: BTreeMap::from([(el.binding.clone(), bmap_inner)]),
key_size: 1,
val_size: 1 + el.assocs.len(),
};
let edge_info = edge_info.into_edge()?;
let edge_binding = el.binding;
let direction = match el.part {
ChainPart::Edge { dir, join } => {
if join != JoinType::Inner {
return Err(WalkError::OuterJoin.into());
}
dir
}
_ => unreachable!(),
};
let el = chain.next().unwrap();
let (node_info, node_assocs) = get_chain_el_info(ctx, &el.target, &el.assocs)?;
let (mut starting_el, mut hops, binding_maps) = resolve_walk_chain(ctx, chain)?;
let bmap_inner =
build_binding_map_from_info(ctx, &node_info, &node_assocs, true)?;
let n_bmap = BindingMap {
inner_map: BTreeMap::from([(el.binding.clone(), bmap_inner)]),
key_size: 1,
val_size: 1 + el.assocs.len(),
};
binding_maps.push(merge_binding_maps(
[binding_maps.last().unwrap().clone(), e_bmap, n_bmap].into_iter(),
));
let node_info = node_info.into_node()?;
let node_binding = el.binding;
match direction {
ChainPartEdgeDir::Fwd => {
if edge_info.src_id != last_node_tid
|| edge_info.dst_id != node_info.tid
{
return Err(WalkError::Disconnect.into());
}
}
ChainPartEdgeDir::Bwd => {
if edge_info.dst_id != last_node_tid
|| edge_info.src_id != node_info.tid
{
return Err(WalkError::Disconnect.into());
}
}
}
last_node_tid = node_info.tid;
let hop = HoppingEls {
node_info,
node_assocs,
node_binding,
edge_info,
edge_assocs,
edge_binding,
direction,
pivot: false,
ops: vec![],
};
hops.push(hop);
}
}
}
let mut collectors = vec![];
let mut bindings = vec![];
let mut pivots = vec![];
for arg in args {
let arg = arg.into_inner().next().unwrap();
match arg.as_rule() {
Rule::walk_cond => {
let (binding, ops) = parse_walk_cond(arg)?;
let mut found = false;
if binding == starting_el.binding {
found = true;
starting_el.ops.extend(ops);
pivots.push(TableInfo::Node(starting_el.node_info.clone()));
} else {
for hop in hops.iter_mut() {
if hop.node_binding == binding || hop.edge_binding == binding {
found = true;
hop.ops.extend(ops);
break;
}
}
}
if !found {
return Err(WalkError::UnboundFilter.into());
}
}
Rule::scoped_dict => {
let (binding, keys, vals) = parse_scoped_dict(arg)?;
if !keys.is_empty() {
return Err(WalkError::Keyed.into());
}
let mut found = false;
if binding == starting_el.binding {
found = true;
starting_el.pivot = true;
} else {
for hop in hops.iter_mut() {
if hop.node_binding == binding || hop.edge_binding == binding {
if hop.node_binding == binding {
pivots.push(TableInfo::Node(hop.node_info.clone()));
} else {
pivots.push(TableInfo::Edge(hop.edge_info.clone()));
}
hop.pivot = true;
found = true;
break;
}
}
}
if !found {
return Err(WalkError::UnboundCollection.into());
} else {
collectors.push(vals);
bindings.push(binding);
}
}
_ => unreachable!(),
}
}
if collectors.len() != 1 {
return Err(WalkError::CollectorNumberMismatch.into());
}
let collector = collectors.pop().unwrap();
let source_map = binding_maps.last().unwrap();
let binding_ctx = BindingMapEvalContext {
map: source_map,
parent: ctx,
};
let extraction_map = match collector.clone().partial_eval(&binding_ctx)? {
Expr::Dict(d) => d,
Expr::Const(Value::Dict(d)) => d
.into_iter()
.map(|(k, v)| (k.to_string(), Expr::Const(v.clone())))
.collect(),
ex => return Err(SelectOpError::NeedsDict(ex).into()),
};
let (binding, extraction_map) = parse_walk_conditions_and_collectors(
ctx,
args,
&mut starting_el,
&mut hops,
binding_maps.last().unwrap(),
)?;
Ok(Self {
ctx,
starting: starting_el,
hops,
extraction_map,
binding: bindings.pop().unwrap(),
binding,
binding_maps,
})
}
}
#[derive(Debug)]
struct StartingEl {
pub(crate) struct StartingEl {
node_info: NodeInfo,
assocs: Vec<AssocInfo>,
binding: String,
@ -274,7 +92,7 @@ struct StartingEl {
}
#[derive(Debug)]
struct HoppingEls {
pub(crate) struct HoppingEls {
node_info: NodeInfo,
node_assocs: Vec<AssocInfo>,
node_binding: String,
@ -1096,3 +914,204 @@ fn in_mem_sort(
fn sort_value_comparator(a: &(Vec<Value>, TupleSet), b: &(Vec<Value>, TupleSet)) -> Ordering {
a.0.cmp(&b.0)
}
pub(crate) fn check_chain_dup_binding(chain: &[ChainEl]) -> Result<()> {
let mut bindings: BTreeSet<&str> = BTreeSet::new();
for el in chain {
if !bindings.insert(&el.binding) {
return Err(AlgebraParseError::DuplicateBinding(el.binding.to_string()).into());
}
}
Ok(())
}
pub(crate) fn resolve_walk_chain(
ctx: &TempDbContext,
chain: Vec<ChainEl>,
) -> Result<(StartingEl, Vec<HoppingEls>, Vec<BindingMap>)> {
let mut chain = chain.into_iter();
let first_el = chain.next().unwrap();
let (first_info, first_assocs) = get_chain_el_info(ctx, &first_el.target, &first_el.assocs)?;
let bmap_inner = build_binding_map_from_info(ctx, &first_info, &first_assocs, true)?;
let mut binding_maps = vec![BindingMap {
inner_map: BTreeMap::from([(first_el.binding.clone(), bmap_inner)]),
key_size: 1,
val_size: 1 + first_el.assocs.len(),
}];
let mut starting_el = StartingEl {
node_info: first_info.into_node()?,
assocs: first_assocs,
binding: first_el.binding,
pivot: false,
ops: vec![],
};
let mut last_node_tid = starting_el.node_info.tid;
let mut hops = vec![];
loop {
match chain.next() {
None => break,
Some(el) => {
let (edge_info, edge_assocs) = get_chain_el_info(ctx, &el.target, &el.assocs)?;
let bmap_inner = build_binding_map_from_info(ctx, &edge_info, &edge_assocs, true)?;
let e_bmap = BindingMap {
inner_map: BTreeMap::from([(el.binding.clone(), bmap_inner)]),
key_size: 1,
val_size: 1 + el.assocs.len(),
};
let edge_info = edge_info.into_edge()?;
let edge_binding = el.binding;
let direction = match el.part {
ChainPart::Edge { dir, join } => {
if join != JoinType::Inner {
return Err(WalkError::OuterJoin.into());
}
dir
}
_ => unreachable!(),
};
let el = chain.next().unwrap();
let (node_info, node_assocs) = get_chain_el_info(ctx, &el.target, &el.assocs)?;
let bmap_inner = build_binding_map_from_info(ctx, &node_info, &node_assocs, true)?;
let n_bmap = BindingMap {
inner_map: BTreeMap::from([(el.binding.clone(), bmap_inner)]),
key_size: 1,
val_size: 1 + el.assocs.len(),
};
binding_maps.push(merge_binding_maps(
[binding_maps.last().unwrap().clone(), e_bmap, n_bmap].into_iter(),
));
let node_info = node_info.into_node()?;
let node_binding = el.binding;
match direction {
ChainPartEdgeDir::Fwd => {
if edge_info.src_id != last_node_tid || edge_info.dst_id != node_info.tid {
return Err(WalkError::Disconnect.into());
}
}
ChainPartEdgeDir::Bwd => {
if edge_info.dst_id != last_node_tid || edge_info.src_id != node_info.tid {
return Err(WalkError::Disconnect.into());
}
}
}
last_node_tid = node_info.tid;
let hop = HoppingEls {
node_info,
node_assocs,
node_binding,
edge_info,
edge_assocs,
edge_binding,
direction,
pivot: false,
ops: vec![],
};
hops.push(hop);
}
}
}
Ok((starting_el, hops, binding_maps))
}
pub(crate) fn parse_walk_conditions_and_collectors(
ctx: &TempDbContext,
args: Pairs,
starting_el: &mut StartingEl,
hops: &mut Vec<HoppingEls>,
binding_map: &BindingMap,
) -> Result<(String, BTreeMap<String, Expr>)> {
let mut collectors = vec![];
let mut bindings = vec![];
let mut pivots = vec![];
for arg in args {
let arg = arg.into_inner().next().unwrap();
match arg.as_rule() {
Rule::walk_cond => {
let (binding, ops) = parse_walk_cond(arg)?;
let mut found = false;
if binding == starting_el.binding {
found = true;
starting_el.ops.extend(ops);
pivots.push(TableInfo::Node(starting_el.node_info.clone()));
} else {
for hop in hops.iter_mut() {
if hop.node_binding == binding || hop.edge_binding == binding {
found = true;
hop.ops.extend(ops);
break;
}
}
}
if !found {
return Err(WalkError::UnboundFilter.into());
}
}
Rule::scoped_dict => {
let (binding, keys, vals) = parse_scoped_dict(arg)?;
if !keys.is_empty() {
return Err(WalkError::Keyed.into());
}
let mut found = false;
if binding == starting_el.binding {
found = true;
starting_el.pivot = true;
} else {
for hop in hops.iter_mut() {
if hop.node_binding == binding || hop.edge_binding == binding {
if hop.node_binding == binding {
pivots.push(TableInfo::Node(hop.node_info.clone()));
} else {
pivots.push(TableInfo::Edge(hop.edge_info.clone()));
}
hop.pivot = true;
found = true;
break;
}
}
}
if !found {
return Err(WalkError::UnboundCollection.into());
} else {
collectors.push(vals);
bindings.push(binding);
}
}
_ => unreachable!(),
}
}
if collectors.len() != 1 {
return Err(WalkError::CollectorNumberMismatch.into());
}
let collector = collectors.pop().unwrap();
// let source_map = binding_maps.last().unwrap();
let binding_ctx = BindingMapEvalContext {
map: binding_map,
parent: ctx,
};
let extraction_map = match collector.clone().partial_eval(&binding_ctx)? {
Expr::Dict(d) => d,
Expr::Const(Value::Dict(d)) => d
.into_iter()
.map(|(k, v)| (k.to_string(), Expr::Const(v.clone())))
.collect(),
ex => return Err(SelectOpError::NeedsDict(ex).into()),
};
Ok((bindings.pop().unwrap(), extraction_map))
}

Loading…
Cancel
Save