algo application and degree centrality

main
Ziyang Hu 2 years ago
parent 97874e6ad5
commit 9e94b29f8b

@ -12,7 +12,7 @@
* [ ] single-source shortest path
* [ ] minimum spanning tree
* [ ] random walking
* [ ] degree centrality
* [x] degree centrality
* [ ] closeness centrality
* [ ] betweenness centrality
* [ ] pagerank

@ -0,0 +1,67 @@
use std::collections::BTreeMap;
use anyhow::ensure;
use crate::algo::AlgoImpl;
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoRuleArg, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::runtime::derived::DerivedRelStore;
use crate::runtime::transact::SessionTx;
/// Degree-centrality algorithm: for every node appearing in an edge relation,
/// reports its total degree, out-degree and in-degree.
pub(crate) struct DegreeCentrality;
impl AlgoImpl for DegreeCentrality {
    /// Symbol under which this algorithm is identified.
    fn name(&self) -> Symbol {
        Symbol::from("degree_centrality")
    }

    /// Width of each emitted tuple: node, total degree, out-degree, in-degree.
    fn arity(&self) -> usize {
        4
    }

    /// Scans the single input relation as a list of directed edges and writes
    /// one `(node, total, out, in)` tuple per distinct node into `out`.
    ///
    /// Only the first two columns of each input tuple are read (source, then
    /// target); any further columns are ignored. Every scanned row counts
    /// once, and a row whose source equals its target contributes two to that
    /// node's total degree (one outgoing, one incoming).
    ///
    /// # Errors
    ///
    /// Fails when `rels` does not contain exactly one relation, when opening
    /// or reading the relation fails, or when a row has fewer than two columns.
    fn run(
        &self,
        tx: &mut SessionTx,
        rels: &[MagicAlgoRuleArg],
        _opts: &BTreeMap<Symbol, Expr>,
        stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
        out: &DerivedRelStore,
    ) -> anyhow::Result<()> {
        ensure!(
            rels.len() == 1,
            "'degree_centrality' requires a single input relation, got {}",
            rels.len()
        );

        // node -> (total degree, out-degree, in-degree)
        let mut degrees: BTreeMap<DataValue, (usize, usize, usize)> = BTreeMap::new();

        let edges = rels[0].iter(tx, stores)?;
        for edge in edges {
            let edge = edge?;
            ensure!(
                tuple_has_endpoints(edge.0.len()),
                "'degree_centrality' requires input relation to be a tuple of two elements"
            );

            // The source endpoint gains one outgoing edge.
            let src_counts = degrees.entry(edge.0[0].clone()).or_default();
            src_counts.0 += 1;
            src_counts.1 += 1;

            // The target endpoint gains one incoming edge.
            let dst_counts = degrees.entry(edge.0[1].clone()).or_default();
            dst_counts.0 += 1;
            dst_counts.2 += 1;
        }

        // Emit the rows in the map's key order.
        for (node, (total, outgoing, incoming)) in degrees {
            let row = Tuple(vec![
                node,
                DataValue::from(total as i64),
                DataValue::from(outgoing as i64),
                DataValue::from(incoming as i64),
            ]);
            out.put(row, 0);
        }

        Ok(())
    }
}

/// Returns `true` when a row with `width` columns holds both edge endpoints.
fn tuple_has_endpoints(width: usize) -> bool {
    width >= 2
}

@ -1,14 +1,17 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use crate::algo::degree_centrality::DegreeCentrality;
use crate::data::expr::Expr;
use crate::data::program::MagicAlgoRuleArg;
use crate::data::program::{MagicAlgoRuleArg, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::tuple::TupleIter;
use crate::runtime::derived::DerivedRelStore;
use crate::runtime::transact::SessionTx;
mod degree_centrality;
pub(crate) mod page_rank;
pub(crate) trait AlgoImpl {
@ -17,14 +20,41 @@ pub(crate) trait AlgoImpl {
fn run(
&self,
tx: &mut SessionTx,
rels: Vec<MagicAlgoRuleArg>,
rels: &[MagicAlgoRuleArg],
opts: &BTreeMap<Symbol, Expr>,
) -> DerivedRelStore;
stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
out: &DerivedRelStore,
) -> Result<()>;
}
pub(crate) fn get_algo(name: &str) -> Result<Arc<dyn AlgoImpl>> {
match name {
Ok(match name {
"degree_centrality" => Arc::new(DegreeCentrality),
"page_rank" => todo!(),
name => bail!("algorithm '{}' not found", name),
})
}
impl MagicAlgoRuleArg {
    /// Opens an iterator over the tuples of the relation this argument names.
    ///
    /// * `InMem` arguments are looked up in `stores` and scanned in full.
    /// * `Stored` arguments are resolved with `tx.get_view_rel` and scanned in full.
    /// * `Triple` arguments are not implemented yet and yield an error.
    ///
    /// # Errors
    ///
    /// Returns an error when an in-memory rule is absent from `stores`, when
    /// resolving or scanning a stored view fails, or when the argument is a
    /// `Triple`.
    pub(crate) fn iter<'a>(
        &'a self,
        tx: &'a SessionTx,
        stores: &'a BTreeMap<MagicSymbol, DerivedRelStore>,
    ) -> Result<TupleIter<'a>> {
        Ok(match self {
            MagicAlgoRuleArg::InMem(s) => {
                let store = stores
                    .get(s)
                    .ok_or_else(|| anyhow!("rule not found: {:?}", s))?;
                Box::new(store.scan_all())
            }
            MagicAlgoRuleArg::Stored(s) => {
                let view_rel = tx.get_view_rel(s)?;
                Box::new(view_rel.scan_all()?)
            }
            MagicAlgoRuleArg::Triple(attr, dir) => {
                // This arm is reachable from a query, so report the missing
                // feature as an ordinary error rather than panicking.
                return Err(anyhow!(
                    "triple relations are not yet supported as algorithm inputs: {:?} ({:?})",
                    attr,
                    dir
                ));
            }
        })
    }
}

@ -77,14 +77,14 @@ pub(crate) enum TripleDir {
pub(crate) enum AlgoRuleArg {
InMem(Symbol),
Stored(Symbol),
Triple(Symbol, TripleDir),
Triple(Attribute, TripleDir),
}
#[derive(Debug, Clone)]
pub(crate) enum MagicAlgoRuleArg {
InMem(MagicSymbol),
Stored(Symbol),
Triple(Symbol, TripleDir),
Triple(Attribute, TripleDir),
}
#[derive(Debug, Clone)]

@ -158,16 +158,17 @@ impl SessionTx {
.ok_or_else(|| anyhow!("'view' must be a string, got {}", view_name))?;
relations.push(AlgoRuleArg::Stored(Symbol::from(view_name)));
} else if let Some(triple_name) = rel_def.get("triple") {
let triple_name = triple_name.as_str().ok_or_else(|| {
anyhow!("'triple' must be a string, got {}", triple_name)
})?;
let attr = self.parse_triple_atom_attr(triple_name)?;
// let triple_name = triple_name.as_str().ok_or_else(|| {
// anyhow!("'triple' must be a string, got {}", triple_name)
// })?;
let dir = match rel_def.get("backward") {
None => TripleDir::Fwd,
Some(JsonValue::Bool(true)) => TripleDir::Bwd,
Some(JsonValue::Bool(false)) => TripleDir::Fwd,
d => bail!("'backward' must be a boolean, got {}", d.unwrap()),
};
relations.push(AlgoRuleArg::Triple(Symbol::from(triple_name), dir));
relations.push(AlgoRuleArg::Triple(attr, dir));
}
}
if let Some(opts) = algo_rule.get("options") {
@ -439,9 +440,7 @@ impl SessionTx {
Ok(InputProgram { prog: ret })
}
}
InputRulesOrAlgo::Algo(_) => {
Ok(InputProgram { prog: ret })
}
InputRulesOrAlgo::Algo(_) => Ok(InputProgram { prog: ret }),
},
}
}

@ -91,7 +91,7 @@ impl SessionTx {
)?;
}
CompiledRuleSet::Algo(algo_apply) => {
self.algo_application_eval(k, algo_apply, stores, &mut limiter)?;
self.algo_application_eval(k, algo_apply, stores)?;
}
}
}
@ -136,9 +136,12 @@ impl SessionTx {
rule_symb: &MagicSymbol,
algo_apply: &MagicAlgoApply,
stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
limiter: &mut QueryLimiter,
) -> Result<()> {
todo!()
let algo_impl = &algo_apply.algo;
let out = stores
.get(rule_symb)
.ok_or_else(|| anyhow!("cannot find algo store {:?}", rule_symb))?;
algo_impl.run(self, &algo_apply.rule_args, &algo_apply.options, stores, out)
}
fn initial_rule_eval(
&mut self,

@ -79,6 +79,50 @@ fn air_routes() -> Result<()> {
println!("views: {}", db.list_views()?);
let deg_centrality_time = Instant::now();
let res = db.run_script(
r#"
deg_centrality <- degree_centrality!(:flies_to);
?[?total, ?out, ?in] := deg_centrality[?node, ?total, ?out, ?in];
:order -?total;
:limit 10;
"#,
)?;
dbg!(deg_centrality_time.elapsed());
assert_eq!(
res,
serde_json::Value::from_str(
r#"[
[614,307,307],[587,293,294],[566,282,284],[541,270,271],[527,264,263],[502,251,251],
[497,248,249],[494,247,247],[484,242,242],[465,232,233]]"#
)?
);
let deg_centrality_ad_hoc_time = Instant::now();
let res = db.run_script(
r#"
flies_to[?a, ?b] := [?r route.src ?ac], [?r route.dst ?bc],
[?ac airport.iata ?a], [?bc airport.iata ?b];
deg_centrality <- degree_centrality!(flies_to);
?[?node, ?total, ?out, ?in] := deg_centrality[?node, ?total, ?out, ?in];
:order -?total;
:limit 10;
"#,
)?;
dbg!(deg_centrality_ad_hoc_time.elapsed());
assert_eq!(
res,
serde_json::Value::from_str(
r#"[
["FRA",614,307,307],["IST",614,307,307],["CDG",587,293,294],["AMS",566,282,284],
["MUC",541,270,271],["ORD",527,264,263],["DFW",502,251,251],["PEK",497,248,249],
["DXB",494,247,247],["ATL",484,242,242]
]"#
)?
);
let starts_with_time = Instant::now();
let res = db.run_script(
r#"

Loading…
Cancel
Save