parsing algo application

main
Ziyang Hu 2 years ago
parent df429e7451
commit 0962af3762

@ -2,4 +2,23 @@
* [ ] more complete functions and aggregations
* [ ] more complete tx tests
* [ ] graph algorithms
* [ ] graph algorithms
* [ ] bfs
* [ ] dfs
* [ ] shortest path
* [ ] A*
* [ ] Yen's k-shortest
* [ ] all-pairs shortest path
* [ ] single-source shortest path
* [ ] minimum spanning tree
* [ ] random walking
* [ ] degree centrality
* [ ] closeness centrality
* [ ] betweenness centrality
* [ ] pagerank
* [ ] triangle counting
* [ ] strongly connected components
* [ ] connected components
* [ ] label propagation
* [ ] louvain modularity
* [ ] direct loading of data

@ -0,0 +1 @@
pub(crate) mod page_rank;

@ -1,4 +1,4 @@
script = _{SOI ~ (option | rule | const_rule)+ ~ EOI}
script = _{SOI ~ (option | rule | const_rule | algo_rule)+ ~ EOI}
schema_script = _{SOI ~ schema_clause+ ~ EOI}
tx_script = _{SOI ~ tx_clause+ ~ EOI}
@ -11,15 +11,22 @@ prog_entry = {"?"}
// Variable: "?" followed by any number of XID_CONTINUE characters or "_".
var = @{"?" ~ (XID_CONTINUE | "_")*}
// Parameter: "$" followed by any number of XID_CONTINUE characters or "_".
param = @{"$" ~ (XID_CONTINUE | "_")*}
// Identifier: one XID_START character, then any number of "_" or XID_CONTINUE characters.
ident = @{XID_START ~ ("_" | XID_CONTINUE)*}
// Algorithm name: same shape as `ident`, but it must end with "!".
algo_ident = @{XID_START ~ ("_" | XID_CONTINUE)* ~ "!"}
// View name: ":" followed by an `ident` (atomic rule).
view_ident = @{":" ~ ident}
// One `ident`, optionally followed by "." and a second `ident`.
compound_ident = {ident ~ ("." ~ ident)?}
// Rule definition: head, optional "@" expression, ":=", body, terminating ";".
rule = {rule_head ~ ("@" ~ expr)? ~ ":=" ~ rule_body ~ ";"}
// Constant rule: `name <- expr;`.
const_rule = {ident ~ "<-" ~ expr ~ ";" }
// Algorithm application: `name <- algo!(arg, arg, ...);`.
// The argument list may be empty and a trailing comma after the last argument is accepted.
algo_rule = {ident ~ "<-" ~ algo_ident ~ "(" ~ (algo_arg ~ ",")* ~ algo_arg? ~ ")" ~ ";" }
// Rule head: the entry marker or a name, then a bracketed, comma-separated list of
// head arguments (possibly empty, trailing comma accepted).
rule_head = {(prog_entry | ident) ~ "[" ~ (head_arg ~ ",")* ~ head_arg? ~ "]"}
// A head argument is either an aggregation or a plain variable; aggregation is tried first.
head_arg = {aggr_arg | var}
// Aggregation: `name(var)` optionally followed by further comma-separated expressions.
aggr_arg = {ident ~ "(" ~ var ~ ("," ~ expr)* ~ ")"}
// One argument of an algorithm application: either an option pair or a relation.
//
// `algo_opt_pair` MUST be tried before `algo_rel`. Both can start with an `ident`,
// and PEG choice is ordered without backtracking into an alternative that already
// succeeded: with `algo_rel` first, the input `k = 1` is consumed as the relation
// `k`, after which the enclosing `algo_rule` fails on "=" and the option pair is
// never attempted. With the pair first, a bare relation name simply fails the
// pair at "=" and falls through to `algo_rel`.
//
// The rule is silent (`_`), so the parse tree contains `algo_rel` /
// `algo_opt_pair` pairs directly.
algo_arg = _{algo_opt_pair | algo_rel}
// Named option: `name = expr`.
algo_opt_pair = {ident ~ "=" ~ expr}
// Relation argument: a rule name, a `:view` name, or a bracketed triple reference.
algo_rel = {ident | view_ident | algo_triple_rel}
// Triple reference: `[name]`, optionally preceded by the reverse marker.
algo_triple_rel = { rev_triple_marker? ~ "[" ~ ident ~ "]"}
// Marker requesting the reversed direction of a triple reference.
rev_triple_marker = {"<"}
rule_body = {(disjunction ~ ",")* ~ disjunction?}
rule_apply = {ident ~ "[" ~ apply_args ~ "]"}

@ -1191,7 +1191,7 @@ pub(crate) fn get_op(name: &str) -> Option<&'static Op> {
"last" => &OP_LAST,
"chunks" => &OP_CHUNKS,
"chunks_exact" => &OP_CHUNKS_EXACT,
"windows" => &OP_WINDOW,
"windows" => &OP_WINDOWS,
_ => return None,
})
}

@ -2,14 +2,15 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use anyhow::Result;
use anyhow::{anyhow, ensure, Result};
use itertools::Itertools;
use smallvec::SmallVec;
use crate::data::aggr::Aggregation;
use crate::data::attr::Attribute;
use crate::data::expr::Expr;
use crate::data::id::{EntityId, Validity};
use crate::data::symb::Symbol;
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::value::DataValue;
#[derive(Default)]
@ -24,60 +25,150 @@ impl TempSymbGen {
}
}
/// What a single name in an `InputProgram` is bound to: either a set of
/// ordinary rules or one algorithm application.
#[derive(Debug, Clone)]
pub(crate) enum RulesOrAlgo {
/// The rules defined under this name.
Rules(Vec<InputRule>),
/// An algorithm application whose output is bound to this name.
Algo(AlgoApply),
}
impl RulesOrAlgo {
pub(crate) fn get_rules_mut(&mut self) -> Option<&mut Vec<InputRule>> {
match self {
RulesOrAlgo::Rules(r) => Some(r),
_ => None
}
}
pub(crate) fn get_rules(&self) -> Option<&[InputRule]> {
match self {
RulesOrAlgo::Rules(r) => Some(r),
_ => None,
}
}
}
/// One application of a named algorithm to a list of relations, with options.
#[derive(Debug, Clone)]
pub(crate) struct AlgoApply {
// Name of the algorithm being applied.
pub(crate) algo_name: Symbol,
// Relation arguments passed to the algorithm.
pub(crate) rule_args: Vec<AlgoRuleArg>,
// Named options, keyed by option name, each given as an expression.
pub(crate) options: BTreeMap<Symbol, Expr>,
}
/// Direction in which a triple relation argument is used by an algorithm.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum TripleDir {
/// Forward direction.
Fwd,
/// Backward (reversed) direction.
Bwd,
}
/// A relation passed as an argument to an algorithm application.
#[derive(Debug, Clone)]
pub(crate) enum AlgoRuleArg {
/// A relation referred to by rule name.
// NOTE(review): "in-memory" is inferred from the variant name only — confirm.
InMem(Symbol),
/// A relation referred to by view name.
// NOTE(review): "stored" is inferred from the variant name only — confirm.
Stored(Symbol),
/// A triple relation referred to by name, together with the direction to use.
Triple(Symbol, TripleDir),
}
#[derive(Debug, Clone)]
pub(crate) struct InputProgram {
pub(crate) prog: BTreeMap<Symbol, Vec<InputRule>>,
pub(crate) prog: BTreeMap<Symbol, RulesOrAlgo>,
}
impl InputProgram {
/// Checks that the program has an entry point and that every rule defined
/// for it has the same head.
///
/// # Errors
/// Fails when no entry point is present, or when the entry rules do not all
/// share an equal head.
///
/// # Panics
/// Not yet implemented for an entry point that is an algorithm application.
pub(crate) fn validate_entry(&self) -> Result<()> {
    let entry = self
        .prog
        .get(&PROG_ENTRY)
        .ok_or_else(|| anyhow!("program entry point not found"))?;
    match entry {
        RulesOrAlgo::Rules(entry_rules) => {
            let heads_agree = entry_rules.iter().map(|rule| &rule.head).all_equal();
            ensure!(heads_agree, "program entry point must have equal bindings");
            Ok(())
        }
        RulesOrAlgo::Algo(_) => todo!(),
    }
}
/// Returns the number of head bindings of the program entry point, taken
/// from the first rule defined for it.
///
/// # Errors
/// Fails when no entry point is present.
///
/// # Panics
/// Panics if the entry point has an empty rule list, and is not yet
/// implemented for an entry point that is an algorithm application.
pub(crate) fn get_entry_arity(&self) -> Result<usize> {
    let entry = self
        .prog
        .get(&PROG_ENTRY)
        .ok_or_else(|| anyhow!("program entry point not found"))?;
    let arity = match entry {
        RulesOrAlgo::Rules(entry_rules) => entry_rules[0].head.len(),
        RulesOrAlgo::Algo(_algo) => todo!(),
    };
    Ok(arity)
}
/// Returns the head bindings of the program entry point, taken from the
/// last rule defined for it.
///
/// Note that `get_entry_arity` reads the first rule instead; the two only
/// describe the same head when all entry rules have equal heads, which is
/// what `validate_entry` checks.
///
/// # Errors
/// Fails when no entry point is present.
///
/// # Panics
/// Panics if the entry point has an empty rule list, and is not yet
/// implemented for an entry point that is an algorithm application.
pub(crate) fn get_entry_head(&self) -> Result<&[Symbol]> {
    let entry = self
        .prog
        .get(&PROG_ENTRY)
        .ok_or_else(|| anyhow!("program entry point not found"))?;
    match entry {
        RulesOrAlgo::Rules(entry_rules) => {
            let final_rule = entry_rules.last().unwrap();
            Ok(&final_rule.head)
        }
        RulesOrAlgo::Algo(_) => todo!(),
    }
}
pub(crate) fn to_normalized_program(self) -> Result<NormalFormProgram> {
let mut prog: BTreeMap<_, _> = Default::default();
for (k, rules) in self.prog {
let mut collected_rules = vec![];
for rule in rules {
let mut counter = -1;
let mut gen_symb = || {
counter += 1;
Symbol::from(&format!("***{}", counter) as &str)
};
let normalized_body =
InputAtom::Conjunction(rule.body).disjunctive_normal_form()?;
let mut new_head = Vec::with_capacity(rule.head.len());
let mut seen: BTreeMap<&Symbol, Vec<Symbol>> = BTreeMap::default();
for symb in rule.head.iter() {
match seen.entry(symb) {
Entry::Vacant(e) => {
e.insert(vec![]);
new_head.push(symb.clone());
for (k, rules_or_algo) in self.prog {
match rules_or_algo {
RulesOrAlgo::Rules(rules) => {
let mut collected_rules = vec![];
for rule in rules {
let mut counter = -1;
let mut gen_symb = || {
counter += 1;
Symbol::from(&format!("***{}", counter) as &str)
};
let normalized_body =
InputAtom::Conjunction(rule.body).disjunctive_normal_form()?;
let mut new_head = Vec::with_capacity(rule.head.len());
let mut seen: BTreeMap<&Symbol, Vec<Symbol>> = BTreeMap::default();
for symb in rule.head.iter() {
match seen.entry(symb) {
Entry::Vacant(e) => {
e.insert(vec![]);
new_head.push(symb.clone());
}
Entry::Occupied(mut e) => {
let new_symb = gen_symb();
e.get_mut().push(new_symb.clone());
new_head.push(new_symb);
}
}
}
Entry::Occupied(mut e) => {
let new_symb = gen_symb();
e.get_mut().push(new_symb.clone());
new_head.push(new_symb);
for conj in normalized_body.0 {
let mut body = conj.0;
for (old_symb, new_symbs) in seen.iter() {
for new_symb in new_symbs.iter() {
body.push(NormalFormAtom::Unification(Unification {
binding: new_symb.clone(),
expr: Expr::Binding((*old_symb).clone(), None),
one_many_unif: false,
}))
}
}
let normalized_rule = NormalFormRule {
head: new_head.clone(),
aggr: rule.aggr.clone(),
body,
vld: rule.vld,
};
collected_rules.push(normalized_rule.convert_to_well_ordered_rule()?);
}
}
prog.insert(k, collected_rules);
}
for conj in normalized_body.0 {
let mut body = conj.0;
for (old_symb, new_symbs) in seen.iter() {
for new_symb in new_symbs.iter() {
body.push(NormalFormAtom::Unification(Unification {
binding: new_symb.clone(),
expr: Expr::Binding((*old_symb).clone(), None),
one_many_unif: false
}))
}
}
let normalized_rule = NormalFormRule {
head: new_head.clone(),
aggr: rule.aggr.clone(),
body,
vld: rule.vld,
};
collected_rules.push(normalized_rule.convert_to_well_ordered_rule()?);
RulesOrAlgo::Algo(algo_apply) => {
todo!()
}
}
prog.insert(k, collected_rules);
}
Ok(NormalFormProgram { prog })
}
@ -342,7 +433,7 @@ pub(crate) enum InputTerm<T> {
pub(crate) struct Unification {
pub(crate) binding: Symbol,
pub(crate) expr: Expr,
pub(crate) one_many_unif: bool
pub(crate) one_many_unif: bool,
}
impl Unification {

@ -19,3 +19,4 @@ pub(crate) mod query;
pub(crate) mod runtime;
pub(crate) mod transact;
pub(crate) mod utils;
pub(crate) mod algo;

@ -22,9 +22,14 @@ fn parsed_to_json(src: Pairs<'_>) -> Result<JsonValue> {
let mut ret_map = Map::default();
let mut rules = vec![];
let mut const_rules = Map::default();
let mut algo_applies = vec![];
for pair in src {
match pair.as_rule() {
Rule::rule => rules.push(parse_rule(pair)?),
Rule::algo_rule => {
let apply = parse_algo_rule(pair)?;
algo_applies.push(apply);
},
Rule::const_rule => {
let mut src = pair.into_inner();
let name = src.next().unwrap().as_str();
@ -82,6 +87,7 @@ fn parsed_to_json(src: Pairs<'_>) -> Result<JsonValue> {
}
ret_map.insert("const_rules".to_string(), json!(const_rules));
ret_map.insert("q".to_string(), json!(rules));
ret_map.insert("algo_rules".to_string(), json!(algo_applies));
Ok(json!(ret_map))
}
@ -194,6 +200,53 @@ fn str2usize(src: &str) -> Result<usize> {
Ok(usize::from_str(&src.replace('_', ""))?)
}
/// Converts one parsed `algo_rule` pair into its JSON form.
///
/// The returned object has four keys:
/// * `algo_out`  – the name the algorithm output is bound to;
/// * `algo_name` – the algorithm identifier with its trailing `!` removed;
/// * `relations` – one object per relation argument, in source order:
///   `{"rule": name}`, `{"view": name}` (leading `:` removed) or
///   `{"triple": name, "backward": bool}`;
/// * `options`   – option name mapped to the value produced by `build_expr`.
///
/// # Errors
/// Propagates any error returned by `build_expr` for an option value.
///
/// # Panics
/// Panics if the pair does not have the shape produced by the `algo_rule`
/// grammar rule (missing children, missing `!` / `:` markers, or an
/// unexpected child rule).
fn parse_algo_rule(src: Pair<'_>) -> Result<JsonValue> {
    let mut parts = src.into_inner();
    let out_symbol = parts.next().unwrap().as_str();
    let algo_name = parts.next().unwrap().as_str().strip_suffix('!').unwrap();

    let mut relations = vec![];
    let mut options = Map::default();

    // Everything after the two names is an argument: a relation or an option pair.
    for arg in parts {
        match arg.as_rule() {
            Rule::algo_rel => {
                let rel = arg.into_inner().next().unwrap();
                let rel_json = match rel.as_rule() {
                    Rule::ident => json!({"rule": rel.as_str()}),
                    Rule::view_ident => {
                        json!({"view": rel.as_str().strip_prefix(':').unwrap()})
                    }
                    Rule::algo_triple_rel => {
                        let mut pieces = rel.into_inner();
                        let first = pieces.next().unwrap();
                        // An optional reverse marker precedes the triple name.
                        let (name_pair, backward) =
                            if matches!(first.as_rule(), Rule::rev_triple_marker) {
                                (pieces.next().unwrap(), true)
                            } else {
                                (first, false)
                            };
                        json!({"triple": name_pair.as_str(), "backward": backward})
                    }
                    _ => unreachable!(),
                };
                relations.push(rel_json);
            }
            Rule::algo_opt_pair => {
                let mut pair_parts = arg.into_inner();
                let opt_name = pair_parts.next().unwrap().as_str();
                let opt_expr = pair_parts.next().unwrap();
                let opt_value = build_expr(opt_expr)?;
                options.insert(opt_name.to_string(), opt_value);
            }
            _ => unreachable!(),
        }
    }

    Ok(json!({
        "algo_out": out_symbol,
        "algo_name": algo_name,
        "relations": relations,
        "options": options
    }))
}
fn parse_rule(src: Pair<'_>) -> Result<JsonValue> {
let mut src = src.into_inner();
let head = src.next().unwrap();

@ -2,6 +2,7 @@ use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use anyhow::{anyhow, bail, ensure, Result};
use either::Either;
use itertools::Itertools;
use serde_json::{json, Map};
@ -11,8 +12,9 @@ use crate::data::expr::{get_op, Expr, OP_LIST};
use crate::data::id::{EntityId, Validity};
use crate::data::json::JsonValue;
use crate::data::program::{
InputAtom, InputAttrTripleAtom, InputProgram, InputRule, InputRuleApplyAtom, InputTerm,
InputViewApplyAtom, MagicSymbol, Unification,
AlgoApply, AlgoRuleArg, InputAtom, InputAttrTripleAtom, InputProgram, InputRule,
InputRuleApplyAtom, InputTerm, InputViewApplyAtom, MagicSymbol, RulesOrAlgo, TripleDir,
Unification,
};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::tuple::Tuple;
@ -98,19 +100,13 @@ impl SessionTx {
.as_array()
.ok_or_else(|| anyhow!("expect field 'q' to be an array in query {}", payload))?;
ensure!(!rules_payload.is_empty(), "no rules in {}", payload);
let input_prog = if rules_payload.first().unwrap().is_array() {
let mut input_prog = if rules_payload.first().unwrap().is_array() {
let q = json!([{"rule": "?", "args": rules_payload}]);
self.parse_input_rule_sets(&q, vld, &params_pool)?
} else {
self.parse_input_rule_sets(q, vld, &params_pool)?
};
let entry_bindings = &input_prog
.prog
.get(&PROG_ENTRY)
.ok_or_else(|| anyhow!("program has no entry point"))?
.first()
.unwrap()
.head;
let entry_bindings = input_prog.get_entry_head()?;
let out_spec = payload
.get("out")
.map(|spec| self.parse_query_out_spec(spec, entry_bindings));
@ -125,6 +121,74 @@ impl SessionTx {
.map(|v| v as usize)
.ok_or_else(|| anyhow!("'offset' must be a positive number"))
}))?;
if let Some(algo_rules) = payload.get("algo_rules") {
for algo_rule in algo_rules.as_array().ok_or_else(|| anyhow!("'algo_rules' must be an array"))? {
let out_symbol = algo_rule
.get("algo_out")
.ok_or_else(|| anyhow!("algo rule requires field 'algo_out': {}", algo_rule))?
.as_str()
.ok_or_else(|| anyhow!("'algo_out' mut be a string: {}", algo_rule))?;
let name_symbol = algo_rule
.get("algo_name")
.ok_or_else(|| anyhow!("algo rule requires field 'algo_name': {}", algo_rule))?
.as_str()
.ok_or_else(|| anyhow!("'algo_name' mut be a string: {}", algo_rule))?;
let mut relations = vec![];
let mut options = BTreeMap::default();
for rel_def in algo_rule
.get("relations")
.ok_or_else(|| anyhow!("'relations' field required in algo rule"))?
.as_array()
.ok_or_else(|| anyhow!("'relations' field must be an array"))?
{
if let Some(rule_name) = rel_def.get("rule") {
let rule_name = rule_name
.as_str()
.ok_or_else(|| anyhow!("'rule' must be a string, got {}", rule_name))?;
relations.push(AlgoRuleArg::InMem(Symbol::from(rule_name)));
} else if let Some(view_name) = rel_def.get("view") {
let view_name = view_name
.as_str()
.ok_or_else(|| anyhow!("'view' must be a string, got {}", view_name))?;
relations.push(AlgoRuleArg::Stored(Symbol::from(view_name)));
} else if let Some(triple_name) = rel_def.get("triple") {
let triple_name = triple_name
.as_str()
.ok_or_else(|| anyhow!("'triple' must be a string, got {}", triple_name))?;
let dir = match rel_def.get("backward") {
None => TripleDir::Fwd,
Some(JsonValue::Bool(true)) => TripleDir::Bwd,
Some(JsonValue::Bool(false)) => TripleDir::Fwd,
d => bail!("'backward' must be a boolean, got {}", d.unwrap()),
};
relations.push(AlgoRuleArg::Triple(Symbol::from(triple_name), dir));
}
}
if let Some(opts) = algo_rule.get("options") {
let opts = opts
.as_object()
.ok_or_else(|| anyhow!("'options' is required to be a map, got {}", opts))?;
for (k, v) in opts.iter() {
let expr = Self::parse_expr_arg(v, params_pool)?;
options.insert(Symbol::from(k as &str), expr);
}
}
let out_name = Symbol::from(out_symbol);
out_name.validate_not_reserved()?;
match input_prog.prog.entry(out_name) {
Entry::Vacant(v) => {
v.insert(RulesOrAlgo::Algo(AlgoApply {
algo_name: Symbol::from(name_symbol),
rule_args: relations,
options,
}));
}
Entry::Occupied(_) => {
bail!("algo rule application conflict: {}", out_symbol)
}
};
}
}
let const_rules = if let Some(rules) = payload.get("const_rules") {
rules
.as_object()
@ -187,15 +251,8 @@ impl SessionTx {
})
.try_collect()?;
if !sorters.is_empty() {
let entry = input_prog
.prog
.get(&PROG_ENTRY)
.ok_or_else(|| anyhow!("program entry point not found"))?;
ensure!(
entry.iter().map(|e| &e.head).all_equal(),
"program entry point must have equal bindings"
);
let entry_head = &entry[0].head;
input_prog.validate_entry()?;
let entry_head = input_prog.get_entry_head()?;
if sorters
.iter()
.map(|(k, _v)| k)
@ -235,19 +292,18 @@ impl SessionTx {
} else {
bail!("cannot parse view options: {}", view_payload);
};
let name = name.as_str().ok_or_else(|| anyhow!("view name must be a string"))?;
let name = name
.as_str()
.ok_or_else(|| anyhow!("view name must be a string"))?;
let name = Symbol::from(name);
ensure!(!name.is_reserved(), "view name {} is reserved", name);
let entry = input_prog
.prog
.get(&PROG_ENTRY)
.ok_or_else(|| anyhow!("program entry point not found"))?;
let entry_arity = input_prog.get_entry_arity()?;
(
ViewRelMetadata {
name,
id: ViewRelId::SYSTEM,
arity: entry[0].head.len(),
arity: entry_arity,
kind: ViewRelKind::Manual,
},
op,
@ -353,9 +409,9 @@ impl SessionTx {
}
}
}
let ret: BTreeMap<Symbol, Vec<InputRule>> = collected
let ret: BTreeMap<Symbol, RulesOrAlgo> = collected
.into_iter()
.map(|(name, rules)| -> Result<(Symbol, Vec<InputRule>)> {
.map(|(name, rules)| -> Result<(Symbol, RulesOrAlgo)> {
let mut arities = rules.iter().map(|r| r.head.len());
let arity = arities.next().unwrap();
for other in arities {
@ -363,19 +419,24 @@ impl SessionTx {
bail!("arity mismatch for rules under the name of {}", name);
}
}
Ok((name, rules))
Ok((name, RulesOrAlgo::Rules(rules)))
})
.try_collect()?;
match ret.get(&PROG_ENTRY as &Symbol) {
None => bail!("no entry defined for datalog program"),
Some(ruleset) => {
if !ruleset.iter().map(|r| &r.head).all_equal() {
bail!("all heads for the entry query must be identical");
} else {
Ok(InputProgram { prog: ret })
Some(ruleset) => match ruleset {
RulesOrAlgo::Rules(ruleset) => {
if !ruleset.iter().map(|r| &r.head).all_equal() {
bail!("all heads for the entry query must be identical");
} else {
Ok(InputProgram { prog: ret })
}
}
}
RulesOrAlgo::Algo(_) => {
todo!()
}
},
}
}
fn parse_input_predicate_atom(

@ -17,7 +17,7 @@ use crate::data::encode::{
};
use crate::data::id::{AttrId, EntityId, TxId, Validity};
use crate::data::json::JsonValue;
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::symb::Symbol;
use crate::data::triple::StoreOp;
use crate::data::tuple::{rusty_scratch_cmp, Tuple, SCRATCH_DB_KEY_PREFIX_LEN};
use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
@ -322,7 +322,7 @@ impl Db {
)
}
};
let entry_head = &input_program.prog.get(&PROG_ENTRY).unwrap()[0].head.clone();
let entry_head = input_program.get_entry_head()?.to_vec();
let program = input_program
.to_normalized_program()?
.stratify()?
@ -339,7 +339,7 @@ impl Db {
},
)?;
if !out_opts.sorters.is_empty() {
let sorted_result = tx.sort_and_collect(result, &out_opts.sorters, entry_head)?;
let sorted_result = tx.sort_and_collect(result, &out_opts.sorters, &entry_head)?;
let sorted_iter = if let Some(offset) = out_opts.offset {
Left(sorted_result.scan_sorted().skip(offset))
} else {

Loading…
Cancel
Save