|
|
@ -18,8 +18,10 @@ use crate::parser::{Pair, Pairs, Rule};
|
|
|
|
use crate::runtime::options::{default_read_options, default_write_options};
|
|
|
|
use crate::runtime::options::{default_read_options, default_write_options};
|
|
|
|
use anyhow::Result;
|
|
|
|
use anyhow::Result;
|
|
|
|
use cozorocks::RowIterator;
|
|
|
|
use cozorocks::RowIterator;
|
|
|
|
|
|
|
|
use extsort::Sortable;
|
|
|
|
use std::cmp::{Ordering, Reverse};
|
|
|
|
use std::cmp::{Ordering, Reverse};
|
|
|
|
use std::collections::{BTreeMap, BTreeSet};
|
|
|
|
use std::collections::{BTreeMap, BTreeSet};
|
|
|
|
|
|
|
|
use std::io::{Read, Write};
|
|
|
|
use std::mem;
|
|
|
|
use std::mem;
|
|
|
|
|
|
|
|
|
|
|
|
pub(crate) const NAME_WALK: &str = "Walk";
|
|
|
|
pub(crate) const NAME_WALK: &str = "Walk";
|
|
|
@ -112,7 +114,7 @@ impl<'a> WalkOp<'a> {
|
|
|
|
},
|
|
|
|
},
|
|
|
|
)
|
|
|
|
)
|
|
|
|
.collect::<Result<Vec<_>>>()?;
|
|
|
|
.collect::<Result<Vec<_>>>()?;
|
|
|
|
it = Box::new(in_mem_sort(it, sort_exprs)?)
|
|
|
|
it = Box::new(maybe_in_mem_sort(it, sort_exprs)?)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
WalkElOp::Filter(expr) => {
|
|
|
|
WalkElOp::Filter(expr) => {
|
|
|
|
let expr = expr.clone().partial_eval(&first_binding_ctx)?;
|
|
|
|
let expr = expr.clone().partial_eval(&first_binding_ctx)?;
|
|
|
@ -400,6 +402,7 @@ impl<'a> WalkOp<'a> {
|
|
|
|
#[derive(Debug)]
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub(crate) struct StartingEl {
|
|
|
|
pub(crate) struct StartingEl {
|
|
|
|
node_info: NodeInfo,
|
|
|
|
node_info: NodeInfo,
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
assocs: Vec<AssocInfo>,
|
|
|
|
assocs: Vec<AssocInfo>,
|
|
|
|
binding: String,
|
|
|
|
binding: String,
|
|
|
|
pivot: bool,
|
|
|
|
pivot: bool,
|
|
|
@ -409,9 +412,11 @@ pub(crate) struct StartingEl {
|
|
|
|
#[derive(Debug)]
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub(crate) struct HoppingEls {
|
|
|
|
pub(crate) struct HoppingEls {
|
|
|
|
node_info: NodeInfo,
|
|
|
|
node_info: NodeInfo,
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
node_assocs: Vec<AssocInfo>,
|
|
|
|
node_assocs: Vec<AssocInfo>,
|
|
|
|
node_binding: String,
|
|
|
|
node_binding: String,
|
|
|
|
edge_info: EdgeInfo,
|
|
|
|
edge_info: EdgeInfo,
|
|
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
|
edge_assocs: Vec<AssocInfo>,
|
|
|
|
edge_assocs: Vec<AssocInfo>,
|
|
|
|
edge_binding: String,
|
|
|
|
edge_binding: String,
|
|
|
|
direction: ChainPartEdgeDir,
|
|
|
|
direction: ChainPartEdgeDir,
|
|
|
@ -435,8 +440,6 @@ pub(crate) enum WalkError {
|
|
|
|
UnboundFilter,
|
|
|
|
UnboundFilter,
|
|
|
|
#[error("No/multiple collectors")]
|
|
|
|
#[error("No/multiple collectors")]
|
|
|
|
CollectorNumberMismatch,
|
|
|
|
CollectorNumberMismatch,
|
|
|
|
#[error("Starting el cannot have sorters")]
|
|
|
|
|
|
|
|
SorterOnStart,
|
|
|
|
|
|
|
|
#[error("Unsupported operation {0} on walk element")]
|
|
|
|
#[error("Unsupported operation {0} on walk element")]
|
|
|
|
UnsupportedWalkOp(String),
|
|
|
|
UnsupportedWalkOp(String),
|
|
|
|
#[error("Wrong argument to walk op")]
|
|
|
|
#[error("Wrong argument to walk op")]
|
|
|
@ -909,7 +912,7 @@ fn clustered_in_mem_sort(
|
|
|
|
Ok(it)
|
|
|
|
Ok(it)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn in_mem_sort(
|
|
|
|
fn maybe_in_mem_sort(
|
|
|
|
it: Box<dyn Iterator<Item = Result<TupleSet>>>,
|
|
|
|
it: Box<dyn Iterator<Item = Result<TupleSet>>>,
|
|
|
|
sort_exprs: Vec<(Expr, SortDirection)>,
|
|
|
|
sort_exprs: Vec<(Expr, SortDirection)>,
|
|
|
|
) -> Result<impl Iterator<Item = Result<TupleSet>>> {
|
|
|
|
) -> Result<impl Iterator<Item = Result<TupleSet>>> {
|
|
|
@ -919,30 +922,138 @@ fn in_mem_sort(
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
let mut collected: Vec<(Vec<Value>, TupleSet)> = it
|
|
|
|
let collected_iter = it.map(move |v| match v {
|
|
|
|
.map(|v| match v {
|
|
|
|
Err(err) => {
|
|
|
|
Err(e) => Err(e),
|
|
|
|
dbg!(err);
|
|
|
|
Ok(v) => {
|
|
|
|
TupleSetSortEl {
|
|
|
|
match sort_exprs
|
|
|
|
is_ok: false,
|
|
|
|
.iter()
|
|
|
|
keys: vec![],
|
|
|
|
.map(|(ex, dir)| -> Result<Value> {
|
|
|
|
tset: Default::default(),
|
|
|
|
let mut res = ex.row_eval(&v)?.into_static();
|
|
|
|
|
|
|
|
if *dir == SortDirection::Dsc {
|
|
|
|
|
|
|
|
res = Value::DescVal(Reverse(res.into()))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(res)
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
.collect::<Result<Vec<_>>>()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
|
|
|
Ok(sort_vals) => Ok((sort_vals, v.into_owned())),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
.collect::<Result<Vec<_>>>()?;
|
|
|
|
Ok(v) => {
|
|
|
|
|
|
|
|
match sort_exprs
|
|
|
|
|
|
|
|
.iter()
|
|
|
|
|
|
|
|
.map(|(ex, dir)| -> Result<Value> {
|
|
|
|
|
|
|
|
let mut res = ex.row_eval(&v)?.into_static();
|
|
|
|
|
|
|
|
if *dir == SortDirection::Dsc {
|
|
|
|
|
|
|
|
res = Value::DescVal(Reverse(res.into()))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(res)
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
.collect::<Result<Vec<_>>>()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
Err(_) => TupleSetSortEl {
|
|
|
|
|
|
|
|
is_ok: false,
|
|
|
|
|
|
|
|
keys: vec![],
|
|
|
|
|
|
|
|
tset: Default::default(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
Ok(sort_vals) => TupleSetSortEl {
|
|
|
|
|
|
|
|
is_ok: true,
|
|
|
|
|
|
|
|
keys: sort_vals,
|
|
|
|
|
|
|
|
tset: v.into_owned(),
|
|
|
|
|
|
|
|
},
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
let sorter = extsort::ExternalSorter::new();
|
|
|
|
|
|
|
|
let ret = sorter.sort(collected_iter).unwrap();
|
|
|
|
|
|
|
|
Ok(ret.map(|st| {
|
|
|
|
|
|
|
|
if st.is_ok {
|
|
|
|
|
|
|
|
Ok(st.tset)
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
Err(AlgebraParseError::Sorting.into())
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[derive(Eq, PartialEq)]
|
|
|
|
|
|
|
|
struct TupleSetSortEl<'a> {
|
|
|
|
|
|
|
|
is_ok: bool,
|
|
|
|
|
|
|
|
keys: Vec<Value<'a>>,
|
|
|
|
|
|
|
|
tset: TupleSet,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl<'a> PartialOrd for TupleSetSortEl<'a> {
|
|
|
|
|
|
|
|
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
|
|
|
|
|
|
|
Some(self.cmp(other))
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
collected.sort_by(sort_value_comparator);
|
|
|
|
impl<'a> Ord for TupleSetSortEl<'a> {
|
|
|
|
Ok(collected.into_iter().map(|(_, v)| Ok(v)))
|
|
|
|
fn cmp(&self, other: &Self) -> Ordering {
|
|
|
|
|
|
|
|
match self.is_ok.cmp(&other.is_ok) {
|
|
|
|
|
|
|
|
Ordering::Equal => self.keys.cmp(&other.keys),
|
|
|
|
|
|
|
|
v => v,
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl<'a> Sortable for TupleSetSortEl<'a> {
|
|
|
|
|
|
|
|
fn encode<W: Write>(&self, writer: &mut W) {
|
|
|
|
|
|
|
|
let mut out = OwnTuple::with_null_prefix();
|
|
|
|
|
|
|
|
if !self.is_ok {
|
|
|
|
|
|
|
|
out.push_null();
|
|
|
|
|
|
|
|
writer.write_all(out.as_ref()).unwrap();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
out.push_int(self.keys.len() as i64);
|
|
|
|
|
|
|
|
for val in &self.keys {
|
|
|
|
|
|
|
|
out.push_value(val);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
out.push_int(self.tset.keys.len() as i64);
|
|
|
|
|
|
|
|
out.push_int(self.tset.vals.len() as i64);
|
|
|
|
|
|
|
|
for k in &self.tset.keys {
|
|
|
|
|
|
|
|
out.push_bytes(k.as_ref());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for v in &self.tset.vals {
|
|
|
|
|
|
|
|
out.push_bytes(v.as_ref());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
writer.write_all(out.as_ref()).unwrap()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fn decode<R: Read>(reader: &mut R) -> Option<Self> {
|
|
|
|
|
|
|
|
let mut buf = vec![];
|
|
|
|
|
|
|
|
reader.read_to_end(&mut buf).ok()?;
|
|
|
|
|
|
|
|
let source = OwnTuple::new(buf);
|
|
|
|
|
|
|
|
let key_len = match source.get_int(0) {
|
|
|
|
|
|
|
|
Ok(len) => len as usize,
|
|
|
|
|
|
|
|
Err(err) => {
|
|
|
|
|
|
|
|
dbg!(err);
|
|
|
|
|
|
|
|
return Some(Self {
|
|
|
|
|
|
|
|
is_ok: false,
|
|
|
|
|
|
|
|
keys: vec![],
|
|
|
|
|
|
|
|
tset: Default::default(),
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut keys = Vec::with_capacity(key_len);
|
|
|
|
|
|
|
|
for i in 1..(key_len + 1) {
|
|
|
|
|
|
|
|
keys.push(source.get(i).ok()?.into_static());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
let k_len = source.get_int(key_len + 1).ok()? as usize;
|
|
|
|
|
|
|
|
let v_len = source.get_int(key_len + 2).ok()? as usize;
|
|
|
|
|
|
|
|
let mut tset = TupleSet {
|
|
|
|
|
|
|
|
keys: Vec::with_capacity(k_len),
|
|
|
|
|
|
|
|
vals: Vec::with_capacity(v_len),
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
for i in 3 + key_len..3 + key_len + k_len {
|
|
|
|
|
|
|
|
let d = source.get(i).ok()?;
|
|
|
|
|
|
|
|
let d = d.get_bytes()?;
|
|
|
|
|
|
|
|
tset.keys.push(OwnTuple::new(d.to_vec()).into());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
for i in 3 + key_len + k_len..3 + key_len + k_len + v_len {
|
|
|
|
|
|
|
|
let d = source.get(i).ok()?;
|
|
|
|
|
|
|
|
let d = d.get_bytes()?;
|
|
|
|
|
|
|
|
tset.vals.push(OwnTuple::new(d.to_vec()).into());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
Some(Self {
|
|
|
|
|
|
|
|
is_ok: true,
|
|
|
|
|
|
|
|
keys,
|
|
|
|
|
|
|
|
tset,
|
|
|
|
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
fn sort_value_comparator(a: &(Vec<Value>, TupleSet), b: &(Vec<Value>, TupleSet)) -> Ordering {
|
|
|
|
fn sort_value_comparator(a: &(Vec<Value>, TupleSet), b: &(Vec<Value>, TupleSet)) -> Ordering {
|
|
|
@ -1064,7 +1175,7 @@ pub(crate) fn parse_walk_conditions_and_collectors(
|
|
|
|
ctx: &TempDbContext,
|
|
|
|
ctx: &TempDbContext,
|
|
|
|
args: Pairs,
|
|
|
|
args: Pairs,
|
|
|
|
starting_el: &mut StartingEl,
|
|
|
|
starting_el: &mut StartingEl,
|
|
|
|
hops: &mut Vec<HoppingEls>,
|
|
|
|
hops: &mut [HoppingEls],
|
|
|
|
binding_map: &BindingMap,
|
|
|
|
binding_map: &BindingMap,
|
|
|
|
) -> Result<(String, BTreeMap<String, Expr>)> {
|
|
|
|
) -> Result<(String, BTreeMap<String, Expr>)> {
|
|
|
|
let mut collectors = vec![];
|
|
|
|
let mut collectors = vec![];
|
|
|
@ -1139,13 +1250,13 @@ pub(crate) fn parse_walk_conditions_and_collectors(
|
|
|
|
map: binding_map,
|
|
|
|
map: binding_map,
|
|
|
|
parent: ctx,
|
|
|
|
parent: ctx,
|
|
|
|
};
|
|
|
|
};
|
|
|
|
let extraction_map = match collector.clone().partial_eval(&binding_ctx)? {
|
|
|
|
let extraction_map = match collector.partial_eval(&binding_ctx)? {
|
|
|
|
Expr::Dict(d) => d,
|
|
|
|
Expr::Dict(d) => d,
|
|
|
|
Expr::Const(Value::Dict(d)) => d
|
|
|
|
Expr::Const(Value::Dict(d)) => d
|
|
|
|
.into_iter()
|
|
|
|
.into_iter()
|
|
|
|
.map(|(k, v)| (k.to_string(), Expr::Const(v.clone())))
|
|
|
|
.map(|(k, v)| (k.to_string(), Expr::Const(v.clone())))
|
|
|
|
.collect(),
|
|
|
|
.collect(),
|
|
|
|
ex => return Err(SelectOpError::NeedsDict.into()),
|
|
|
|
_ex => return Err(SelectOpError::NeedsDict.into()),
|
|
|
|
};
|
|
|
|
};
|
|
|
|
Ok((bindings.pop().unwrap(), extraction_map))
|
|
|
|
Ok((bindings.pop().unwrap(), extraction_map))
|
|
|
|
}
|
|
|
|
}
|
|
|
|