AlgoImpl now has a much cleaner API

main
Ziyang Hu 2 years ago
parent 5bc0219b07
commit 6beec31ada

@ -18,32 +18,28 @@ use rayon::prelude::*;
use smartstring::{LazyCompact, SmartString};
use crate::algo::shortest_path_dijkstra::dijkstra_keep_ties;
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct BetweennessCentrality;
impl AlgoImpl for BetweennessCentrality {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let undirected = algo.bool_option("undirected", Some(false))?;
let edges = payload.get_input(0)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let (graph, indices, _inv_indices, _) =
edges.convert_edge_to_weighted_graph(undirected, false, tx, stores)?;
edges.convert_edge_to_weighted_graph(undirected, false)?;
let n = graph.len();
if n == 0 {
@ -105,19 +101,17 @@ impl AlgoImpl for BetweennessCentrality {
pub(crate) struct ClosenessCentrality;
impl AlgoImpl for ClosenessCentrality {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let undirected = algo.bool_option("undirected", Some(false))?;
let edges = payload.get_input(0)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let (graph, indices, _inv_indices, _) =
edges.convert_edge_to_weighted_graph(undirected, false, tx, stores)?;
edges.convert_edge_to_weighted_graph(undirected, false)?;
let n = graph.len();
if n == 0 {

@ -14,52 +14,39 @@ use ordered_float::OrderedFloat;
use priority_queue::PriorityQueue;
use smartstring::{LazyCompact, SmartString};
use crate::algo::{AlgoImpl, BadExprValueError, NodeNotFoundError};
use crate::algo::{AlgoImpl, AlgoInputRelation, AlgoPayload, BadExprValueError, NodeNotFoundError};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicAlgoRuleArg, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct ShortestPathAStar;
impl AlgoImpl for ShortestPathAStar {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 3, tx, stores)?;
let nodes = algo.relation(1)?;
let starting = algo.relation(2)?;
let goals = algo.relation(3)?;
let mut heuristic = algo.expr_option("heuristic", None)?;
let edges = payload.get_input(0)?.ensure_min_len(3)?;
let nodes = payload.get_input(1)?;
let starting = payload.get_input(2)?;
let goals = payload.get_input(3)?;
let mut heuristic = payload.expr_option("heuristic", None)?;
let mut binding_map = nodes.get_binding_map(0);
let goal_binding_map = goals.get_binding_map(nodes.arity(tx, stores)?);
let goal_binding_map = goals.get_binding_map(nodes.arity()?);
binding_map.extend(goal_binding_map);
heuristic.fill_binding_indices(&binding_map)?;
for start in starting.iter(tx, stores)? {
for start in starting.iter()? {
let start = start?;
for goal in goals.iter(tx, stores)? {
for goal in goals.iter()? {
let goal = goal?;
let (cost, path) = astar(
&start,
&goal,
edges,
nodes,
&heuristic,
tx,
stores,
poison.clone(),
)?;
let (cost, path) = astar(&start, &goal, edges, nodes, &heuristic, poison.clone())?;
out.put(vec![
start[0].clone(),
goal[0].clone(),
@ -82,14 +69,12 @@ impl AlgoImpl for ShortestPathAStar {
}
}
fn astar<'a>(
fn astar(
starting: &Tuple,
goal: &Tuple,
edges: &'a MagicAlgoRuleArg,
nodes: &'a MagicAlgoRuleArg,
edges: AlgoInputRelation<'_, '_>,
nodes: AlgoInputRelation<'_, '_>,
heuristic: &Expr,
tx: &'a SessionTx<'_>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
poison: Poison,
) -> Result<(f64, Vec<DataValue>)> {
let start_node = &starting[0];
@ -136,7 +121,7 @@ fn astar<'a>(
return Ok((cost, ret));
}
for edge in edges.prefix_iter(&node, tx, stores)? {
for edge in edges.prefix_iter(&node)? {
let edge = edge?;
let edge_dst = &edge[1];
let edge_cost = edge[2].get_float().ok_or_else(|| {
@ -162,13 +147,14 @@ fn astar<'a>(
back_trace.insert(edge_dst.clone(), node.clone());
g_score.insert(edge_dst.clone(), tentative_cost_to_dst);
let edge_dst_tuple = nodes
.prefix_iter(edge_dst, tx, stores)?
.next()
.ok_or_else(|| NodeNotFoundError {
missing: edge_dst.clone(),
span: nodes.span(),
})??;
let edge_dst_tuple =
nodes
.prefix_iter(edge_dst)?
.next()
.ok_or_else(|| NodeNotFoundError {
missing: edge_dst.clone(),
span: nodes.span(),
})??;
let heuristic_cost = eval_heuristic(&edge_dst_tuple)?;
sub_priority += 1;

@ -8,35 +8,31 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use miette::{Result};
use miette::Result;
use smartstring::{LazyCompact, SmartString};
use crate::algo::{AlgoImpl, NodeNotFoundError};
use crate::algo::{AlgoImpl, AlgoPayload, NodeNotFoundError};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct Bfs;
impl AlgoImpl for Bfs {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;
let nodes = algo.relation(1)?;
let starting_nodes = algo.relation(2).unwrap_or(nodes);
let limit = algo.pos_integer_option("limit", Some(1))?;
let mut condition = algo.expr_option("condition", None)?;
let edges = payload.get_input(0)?.ensure_min_len(2)?;
let nodes = payload.get_input(1)?;
let starting_nodes = payload.get_input(2).unwrap_or(nodes);
let limit = payload.pos_integer_option("limit", Some(1))?;
let mut condition = payload.expr_option("condition", None)?;
let binding_map = nodes.get_binding_map(0);
condition.fill_binding_indices(&binding_map)?;
let binding_indices = condition.binding_indices();
@ -46,7 +42,7 @@ impl AlgoImpl for Bfs {
let mut backtrace: BTreeMap<DataValue, DataValue> = Default::default();
let mut found: Vec<(DataValue, DataValue)> = vec![];
'outer: for node_tuple in starting_nodes.iter(tx, stores)? {
'outer: for node_tuple in starting_nodes.iter()? {
let node_tuple = node_tuple?;
let starting_node = &node_tuple[0];
if visited.contains(starting_node) {
@ -58,7 +54,7 @@ impl AlgoImpl for Bfs {
queue.push_front(starting_node.clone());
while let Some(candidate) = queue.pop_back() {
for edge in edges.prefix_iter(&candidate, tx, stores)? {
for edge in edges.prefix_iter(&candidate)? {
let edge = edge?;
let to_node = &edge[1];
if visited.contains(to_node) {
@ -72,7 +68,7 @@ impl AlgoImpl for Bfs {
vec![to_node.clone()]
} else {
nodes
.prefix_iter(to_node, tx, stores)?
.prefix_iter(to_node)?
.next()
.ok_or_else(|| NodeNotFoundError {
missing: candidate.clone(),

@ -12,28 +12,25 @@ use miette::{bail, ensure, Diagnostic, Result};
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol, WrongAlgoOptionError};
use crate::data::program::WrongAlgoOptionError;
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct Constant;
impl AlgoImpl for Constant {
fn run(
&mut self,
_tx: &SessionTx<'_>,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, EpochStore>,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
_poison: Poison,
) -> Result<()> {
let data = algo.expr_option("data", None).unwrap();
let data = payload.expr_option("data", None).unwrap();
let data = data.get_const().unwrap().get_list().unwrap();
for row in data {
let tuple = row.get_list().unwrap().into();
@ -74,7 +71,7 @@ impl AlgoImpl for Constant {
})
}
fn process_options(
fn init_options(
&self,
options: &mut BTreeMap<SmartString<LazyCompact>, Expr>,
span: SourceSpan,

@ -12,48 +12,43 @@ use csv::StringRecord;
use miette::{bail, ensure, IntoDiagnostic, Result};
use smartstring::{LazyCompact, SmartString};
use crate::algo::{AlgoImpl, CannotDetermineArity};
#[cfg(feature = "requests")]
use crate::algo::jlines::get_file_content_from_url;
use crate::algo::{AlgoImpl, AlgoPayload, CannotDetermineArity};
use crate::data::expr::Expr;
use crate::data::functions::{op_to_float, op_to_uuid};
use crate::data::program::{
AlgoOptionNotFoundError, MagicAlgoApply, MagicSymbol, WrongAlgoOptionError,
};
use crate::data::program::{AlgoOptionNotFoundError, WrongAlgoOptionError};
use crate::data::relation::{ColType, NullableColType};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::{parse_type, SourceSpan};
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct CsvReader;
impl AlgoImpl for CsvReader {
fn run(
&mut self,
_tx: &SessionTx<'_>,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, EpochStore>,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
_poison: Poison,
) -> Result<()> {
let delimiter = algo.string_option("delimiter", Some(","))?;
let delimiter = payload.string_option("delimiter", Some(","))?;
let delimiter = delimiter.as_bytes();
ensure!(
delimiter.len() == 1,
WrongAlgoOptionError {
name: "delimiter".to_string(),
span: algo.span,
span: payload.span(),
algo_name: "CsvReader".to_string(),
help: "'delimiter' must be a single-byte string".to_string()
}
);
let delimiter = delimiter[0];
let prepend_index = algo.bool_option("prepend_index", Some(false))?;
let has_headers = algo.bool_option("has_headers", Some(true))?;
let types_opts = algo.expr_option("types", None)?.eval_to_const()?;
let prepend_index = payload.bool_option("prepend_index", Some(false))?;
let has_headers = payload.bool_option("has_headers", Some(true))?;
let types_opts = payload.expr_option("types", None)?.eval_to_const()?;
let typing = NullableColType {
coltype: ColType::List {
eltype: Box::new(NullableColType {
@ -70,7 +65,7 @@ impl AlgoImpl for CsvReader {
let type_str = type_str.get_string().unwrap();
let typ = parse_type(type_str).map_err(|e| WrongAlgoOptionError {
name: "types".to_string(),
span: algo.span,
span: payload.span(),
algo_name: "CsvReader".to_string(),
help: e.to_string(),
})?;
@ -152,7 +147,7 @@ impl AlgoImpl for CsvReader {
Ok(())
};
let url = algo.string_option("url", None)?;
let url = payload.string_option("url", None)?;
match url.strip_prefix("file://") {
Some(file_path) => {
let mut rdr = rdr_builder.from_path(file_path).into_diagnostic()?;

@ -11,30 +11,24 @@ use std::collections::BTreeMap;
use miette::Result;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct DegreeCentrality;
impl AlgoImpl for DegreeCentrality {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let it = algo
.relation_with_min_len(0, 2, tx, stores)?
.iter(tx, stores)?;
let it = payload.get_input(0)?.ensure_min_len(2)?.iter()?;
let mut counter: BTreeMap<DataValue, (usize, usize, usize)> = BTreeMap::new();
for tuple in it {
let tuple = tuple?;
@ -49,8 +43,8 @@ impl AlgoImpl for DegreeCentrality {
*to_in += 1;
poison.check()?;
}
if let Ok(nodes) = algo.relation(1) {
for tuple in nodes.iter(tx, stores)? {
if let Ok(nodes) = payload.get_input(1) {
for tuple in nodes.iter()? {
let tuple = tuple?;
let id = &tuple[0];
if !counter.contains_key(id) {

@ -11,32 +11,28 @@ use std::collections::{BTreeMap, BTreeSet};
use miette::Result;
use smartstring::{LazyCompact, SmartString};
use crate::algo::{AlgoImpl, NodeNotFoundError};
use crate::algo::{AlgoImpl, AlgoPayload, NodeNotFoundError};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct Dfs;
impl AlgoImpl for Dfs {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;
let nodes = algo.relation(1)?;
let starting_nodes = algo.relation(2).unwrap_or(nodes);
let limit = algo.pos_integer_option("limit", Some(1))?;
let mut condition = algo.expr_option("condition", None)?;
let edges = payload.get_input(0)?.ensure_min_len(2)?;
let nodes = payload.get_input(1)?;
let starting_nodes = payload.get_input(2).unwrap_or(nodes);
let limit = payload.pos_integer_option("limit", Some(1))?;
let mut condition = payload.expr_option("condition", None)?;
let binding_map = nodes.get_binding_map(0);
condition.fill_binding_indices(&binding_map)?;
let binding_indices = condition.binding_indices();
@ -46,7 +42,7 @@ impl AlgoImpl for Dfs {
let mut backtrace: BTreeMap<DataValue, DataValue> = Default::default();
let mut found: Vec<(DataValue, DataValue)> = vec![];
'outer: for node_tuple in starting_nodes.iter(tx, stores)? {
'outer: for node_tuple in starting_nodes.iter()? {
let node_tuple = node_tuple?;
let starting_node = &node_tuple[0];
if visited.contains(starting_node) {
@ -65,7 +61,7 @@ impl AlgoImpl for Dfs {
vec![candidate.clone()]
} else {
nodes
.prefix_iter(&candidate, tx, stores)?
.prefix_iter(&candidate)?
.next()
.ok_or_else(|| NodeNotFoundError {
missing: candidate.clone(),
@ -82,7 +78,7 @@ impl AlgoImpl for Dfs {
visited.insert(candidate.clone());
for edge in edges.prefix_iter(&candidate, tx, stores)? {
for edge in edges.prefix_iter(&candidate)? {
let edge = edge?;
let to_node = &edge[1];
if visited.contains(to_node) {

@ -20,39 +20,35 @@ use minreq::Response;
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::algo::{AlgoImpl, CannotDetermineArity};
use crate::algo::{AlgoImpl, AlgoPayload, CannotDetermineArity};
use crate::data::expr::Expr;
use crate::data::json::JsonValue;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct JsonReader;
impl AlgoImpl for JsonReader {
fn run(
&mut self,
_tx: &SessionTx<'_>,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, EpochStore>,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
_poison: Poison,
) -> Result<()> {
let url = algo.string_option("url", None)?;
let json_lines = algo.bool_option("json_lines", Some(true))?;
let null_if_absent = algo.bool_option("null_if_absent", Some(false))?;
let prepend_index = algo.bool_option("prepend_index", Some(false))?;
let url = payload.string_option("url", None)?;
let json_lines = payload.bool_option("json_lines", Some(true))?;
let null_if_absent = payload.bool_option("null_if_absent", Some(false))?;
let prepend_index = payload.bool_option("prepend_index", Some(false))?;
#[derive(Error, Diagnostic, Debug)]
#[error("fields specification must be a list of strings")]
#[diagnostic(code(eval::algo_bad_fields))]
struct BadFields(#[label] SourceSpan);
let fields_expr = algo.expr_option("fields", None)?;
let fields_expr = payload.expr_option("fields", None)?;
let fields_span = fields_expr.span();
let fields: Vec<_> = match fields_expr.eval_to_const()? {
DataValue::List(l) => l
@ -96,13 +92,15 @@ impl AlgoImpl for JsonReader {
let line = line.into_diagnostic()?;
let line = line.trim();
if !line.is_empty() {
let row: BTreeMap<String, JsonValue> = serde_json::from_str(line).into_diagnostic()?;
let row: BTreeMap<String, JsonValue> =
serde_json::from_str(line).into_diagnostic()?;
process_row(&row)?;
}
}
} else {
let content = fs::read_to_string(file_path).into_diagnostic()?;
let rows: Vec<BTreeMap<String, JsonValue>> = serde_json::from_str(&content).into_diagnostic()?;
let rows: Vec<BTreeMap<String, JsonValue>> =
serde_json::from_str(&content).into_diagnostic()?;
for row in &rows {
process_row(row)?;
}
@ -122,7 +120,8 @@ impl AlgoImpl for JsonReader {
}
}
} else {
let rows: Vec<BTreeMap<String, JsonValue>> = serde_json::from_str(content).into_diagnostic()?;
let rows: Vec<BTreeMap<String, JsonValue>> =
serde_json::from_str(content).into_diagnostic()?;
for row in &rows {
process_row(row)?;
}

@ -15,30 +15,25 @@ use ordered_float::OrderedFloat;
use priority_queue::PriorityQueue;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct MinimumSpanningForestKruskal;
impl AlgoImpl for MinimumSpanningForestKruskal {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let (graph, indices, _, _) =
edges.convert_edge_to_weighted_graph(true, true, tx, stores)?;
let edges = payload.get_input(0)?;
let (graph, indices, _, _) = edges.convert_edge_to_weighted_graph(true, true)?;
if graph.is_empty() {
return Ok(());
}

@ -13,32 +13,28 @@ use miette::Result;
use rand::prelude::*;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct LabelPropagation;
impl AlgoImpl for LabelPropagation {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let undirected = algo.bool_option("undirected", Some(false))?;
let max_iter = algo.pos_integer_option("max_iter", Some(10))?;
let edges = payload.get_input(0)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let max_iter = payload.pos_integer_option("max_iter", Some(10))?;
let (graph, indices, _inv_indices, _) =
edges.convert_edge_to_weighted_graph(undirected, true, tx, stores)?;
edges.convert_edge_to_weighted_graph(undirected, true)?;
let labels = label_propagation(&graph, max_iter, poison)?;
for (idx, label) in labels.into_iter().enumerate() {
let node = indices[idx].clone();

@ -13,35 +13,31 @@ use log::debug;
use miette::Result;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct CommunityDetectionLouvain;
impl AlgoImpl for CommunityDetectionLouvain {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let undirected = algo.bool_option("undirected", Some(false))?;
let max_iter = algo.pos_integer_option("max_iter", Some(10))?;
let delta = algo.unit_interval_option("delta", Some(0.0001))?;
let keep_depth = algo.non_neg_integer_option("keep_depth", None).ok();
let edges = payload.get_input(0)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let max_iter = payload.pos_integer_option("max_iter", Some(10))?;
let delta = payload.unit_interval_option("delta", Some(0.0001))?;
let keep_depth = payload.non_neg_integer_option("keep_depth", None).ok();
let (graph, indices, _inv_indices, _) =
edges.convert_edge_to_weighted_graph(undirected, false, tx, stores)?;
edges.convert_edge_to_weighted_graph(undirected, false)?;
let graph = graph
.into_iter()
.map(|edges| -> BTreeMap<usize, f64> {

@ -49,7 +49,9 @@ use crate::algo::triangles::ClusteringCoefficients;
#[cfg(feature = "graph-algo")]
use crate::algo::yen::KShortestPathYen;
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicAlgoRuleArg, MagicSymbol};
use crate::data::program::{
AlgoOptionNotFoundError, MagicAlgoApply, MagicAlgoRuleArg, MagicSymbol, WrongAlgoOptionError,
};
use crate::data::symb::Symbol;
use crate::data::tuple::TupleIter;
use crate::data::value::DataValue;
@ -94,34 +96,307 @@ pub(crate) mod triangles;
#[cfg(feature = "graph-algo")]
pub(crate) mod yen;
pub struct AlgoPayload<'a> {
manifest: MagicAlgoApply,
params: Vec<&'a EpochStore>,
tx: &'a SessionTx<'a>
pub struct AlgoPayload<'a, 'b> {
pub(crate) manifest: &'a MagicAlgoApply,
pub(crate) stores: &'a BTreeMap<MagicSymbol, EpochStore>,
pub(crate) tx: &'a SessionTx<'b>,
}
pub(crate) trait AlgoImpl {
fn run<'a>(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
poison: Poison,
) -> Result<()>;
fn arity(
#[derive(Copy, Clone)]
pub struct AlgoInputRelation<'a, 'b> {
arg_manifest: &'a MagicAlgoRuleArg,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
tx: &'a SessionTx<'b>,
}
impl<'a, 'b> AlgoInputRelation<'a, 'b> {
pub fn arity(&self) -> Result<usize> {
self.arg_manifest.arity(self.tx, self.stores)
}
pub fn ensure_min_len(self, len: usize) -> Result<Self> {
#[derive(Error, Diagnostic, Debug)]
#[error("Input relation to algorithm has insufficient arity")]
#[diagnostic(help("Arity should be at least {0} but is {1}"))]
#[diagnostic(code(algo::input_relation_bad_arity))]
struct InputRelationArityError(usize, usize, #[label] SourceSpan);
let arity = self.arg_manifest.arity(self.tx, self.stores)?;
ensure!(
arity >= len,
InputRelationArityError(len, arity, self.arg_manifest.span())
);
Ok(self)
}
pub fn get_binding_map(&self, offset: usize) -> BTreeMap<Symbol, usize> {
self.arg_manifest.get_binding_map(offset)
}
pub fn iter(&self) -> Result<TupleIter<'a>> {
self.arg_manifest.iter(self.tx, self.stores)
}
pub fn prefix_iter(&self, prefix: &DataValue) -> Result<TupleIter<'_>> {
self.arg_manifest.prefix_iter(prefix, self.tx, self.stores)
}
pub fn span(&self) -> SourceSpan {
self.arg_manifest.span()
}
fn convert_edge_to_graph(
&self,
_options: &BTreeMap<SmartString<LazyCompact>, Expr>,
_rule_head: &[Symbol],
_span: SourceSpan,
) -> Result<usize>;
fn process_options(
undirected: bool,
) -> Result<(Vec<Vec<usize>>, Vec<DataValue>, BTreeMap<DataValue, usize>)> {
self.arg_manifest
.convert_edge_to_graph(undirected, self.tx, self.stores)
}
pub fn convert_edge_to_weighted_graph(
&self,
undirected: bool,
allow_negative_edges: bool,
) -> Result<(
Vec<Vec<(usize, f64)>>,
Vec<DataValue>,
BTreeMap<DataValue, usize>,
bool,
)> {
self.arg_manifest.convert_edge_to_weighted_graph(
undirected,
allow_negative_edges,
self.tx,
self.stores,
)
}
}
impl<'a, 'b> AlgoPayload<'a, 'b> {
pub fn get_input(&self, idx: usize) -> Result<AlgoInputRelation<'a, 'b>> {
let arg_manifest = self.manifest.relation(idx)?;
Ok(AlgoInputRelation {
arg_manifest,
stores: self.stores,
tx: self.tx,
})
}
pub fn name(&self) -> &str {
&self.manifest.algo.name
}
pub fn span(&self) -> SourceSpan {
self.manifest.span
}
pub fn expr_option(&self, name: &str, default: Option<Expr>) -> Result<Expr> {
match self.manifest.options.get(name) {
Some(ex) => Ok(ex.clone()),
None => match default {
Some(ex) => Ok(ex),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
}
}
pub fn string_option(
&self,
name: &str,
default: Option<&str>,
) -> Result<SmartString<LazyCompact>> {
match self.manifest.options.get(name) {
Some(ex) => match ex.clone().eval_to_const()? {
DataValue::Str(s) => Ok(s),
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: ex.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a string is required".to_string(),
}
.into()),
},
None => match default {
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
Some(s) => Ok(SmartString::from(s)),
},
}
}
pub fn pos_integer_option(&self, name: &str, default: Option<usize>) -> Result<usize> {
match self.manifest.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Num(n)) => match n.get_int() {
Some(i) => {
ensure!(
i > 0,
WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a positive integer is required".to_string(),
}
);
Ok(i as usize)
}
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span(),
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a positive integer is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
}
}
pub fn non_neg_integer_option(&self, name: &str, default: Option<usize>) -> Result<usize> {
match self.manifest.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Num(n)) => match n.get_int() {
Some(i) => {
ensure!(
i >= 0,
WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a non-negative integer is required".to_string(),
}
);
Ok(i as usize)
}
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a non-negative integer is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
}
}
pub fn unit_interval_option(&self, name: &str, default: Option<f64>) -> Result<f64> {
match self.manifest.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Num(n)) => {
let f = n.get_float();
ensure!(
(0. ..=1.).contains(&f),
WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a number between 0. and 1. is required".to_string(),
}
);
Ok(f)
}
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a number between 0. and 1. is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
}
}
pub(crate) fn bool_option(&self, name: &str, default: Option<bool>) -> Result<bool> {
match self.manifest.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Bool(b)) => Ok(b),
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.manifest.algo.name.to_string(),
help: "a boolean value is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.manifest.span,
algo_name: self.manifest.algo.name.to_string(),
}
.into()),
},
}
}
}
/// Trait for an implementation of an algorithm or a utility
pub trait AlgoImpl {
/// Called to initialize the options given.
/// Will always be called once, before anything else.
/// You can mutate the options if you need to.
/// The default implementation does nothing.
fn init_options(
&self,
_options: &mut BTreeMap<SmartString<LazyCompact>, Expr>,
_span: SourceSpan,
) -> Result<()> {
Ok(())
}
/// You must return the row width of the returned relation and it must be accurate.
/// This function may be called multiple times.
fn arity(
&self,
options: &BTreeMap<SmartString<LazyCompact>, Expr>,
rule_head: &[Symbol],
span: SourceSpan,
) -> Result<usize>;
/// You should implement the logic of your algorithm/utility in this function.
/// The outputs are written to `out`. You should check `poison` periodically
/// for user-initiated termination.
fn run(
&mut self,
payload: AlgoPayload<'_, '_>,
out: &'_ mut RegularTempStore,
poison: Poison,
) -> Result<()>;
}
#[derive(Debug, Error, Diagnostic)]
@ -336,7 +611,6 @@ impl MagicAlgoRuleArg {
}
Ok((graph, indices, inv_indices, has_neg_edge))
}
#[allow(dead_code)]
pub(crate) fn convert_edge_to_graph<'a>(
&'a self,
undirected: bool,
@ -376,7 +650,6 @@ impl MagicAlgoRuleArg {
}
Ok((graph, indices, inv_indices))
}
#[allow(dead_code)]
pub(crate) fn prefix_iter<'a>(
&'a self,
prefix: &DataValue,

@ -17,34 +17,31 @@ use miette::Result;
use nalgebra::{Dynamic, OMatrix, U1};
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct PageRank;
impl AlgoImpl for PageRank {
fn run<'a>(
#[allow(unused_variables)]
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
_poison: Poison,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let undirected = algo.bool_option("undirected", Some(false))?;
let theta = algo.unit_interval_option("theta", Some(0.85))? as f32;
let epsilon = algo.unit_interval_option("epsilon", Some(0.0001))? as f32;
let iterations = algo.pos_integer_option("iterations", Some(10))?;
let edges = payload.get_input(0)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let theta = payload.unit_interval_option("theta", Some(0.85))? as f32;
let epsilon = payload.unit_interval_option("epsilon", Some(0.0001))? as f32;
let iterations = payload.pos_integer_option("iterations", Some(10))?;
let (graph, indices, _) = edges.convert_edge_to_graph(undirected, tx, stores)?;
let (graph, indices, _) = edges.convert_edge_to_graph(undirected)?;
#[cfg(feature = "rayon")]
{

@ -16,37 +16,32 @@ use priority_queue::PriorityQueue;
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct MinimumSpanningTreePrim;
impl AlgoImpl for MinimumSpanningTreePrim {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let (graph, indices, inv_indices, _) =
edges.convert_edge_to_weighted_graph(true, true, tx, stores)?;
let edges = payload.get_input(0)?;
let (graph, indices, inv_indices, _) = edges.convert_edge_to_weighted_graph(true, true)?;
if graph.is_empty() {
return Ok(());
}
let starting = match algo.relation(1) {
let starting = match payload.get_input(1) {
Err(_) => 0,
Ok(rel) => {
let tuple = rel.iter(tx, stores)?.next().ok_or_else(|| {
let tuple = rel.iter()?.next().ok_or_else(|| {
#[derive(Debug, Error, Diagnostic)]
#[error("The provided starting nodes relation is empty")]
#[diagnostic(code(algo::empty_starting))]

@ -14,37 +14,33 @@ use rand::distributions::WeightedIndex;
use rand::prelude::*;
use smartstring::{LazyCompact, SmartString};
use crate::algo::{AlgoImpl, BadExprValueError, NodeNotFoundError};
use crate::algo::{AlgoImpl, AlgoPayload, BadExprValueError, NodeNotFoundError};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct RandomWalk;
impl AlgoImpl for RandomWalk {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;
let nodes = algo.relation(1)?;
let starting = algo.relation(2)?;
let iterations = algo.pos_integer_option("iterations", Some(1))?;
let steps = algo.pos_integer_option("steps", None)?;
let edges = payload.get_input(0)?.ensure_min_len(2)?;
let nodes = payload.get_input(1)?;
let starting = payload.get_input(2)?;
let iterations = payload.pos_integer_option("iterations", Some(1))?;
let steps = payload.pos_integer_option("steps", None)?;
let mut maybe_weight = algo.expr_option("weight", None).ok();
let mut maybe_weight = payload.expr_option("weight", None).ok();
if let Some(weight) = &mut maybe_weight {
let mut nodes_binding = nodes.get_binding_map(0);
let nodes_arity = nodes.arity(tx, stores)?;
let nodes_arity = nodes.arity()?;
let edges_binding = edges.get_binding_map(nodes_arity);
nodes_binding.extend(edges_binding);
weight.fill_binding_indices(&nodes_binding)?;
@ -52,24 +48,24 @@ impl AlgoImpl for RandomWalk {
let mut counter = 0i64;
let mut rng = thread_rng();
for start_node in starting.iter(tx, stores)? {
for start_node in starting.iter()? {
let start_node = start_node?;
let start_node_key = &start_node[0];
let starting_tuple = nodes
.prefix_iter(start_node_key, tx, stores)?
.next()
.ok_or_else(|| NodeNotFoundError {
missing: start_node_key.clone(),
span: starting.span(),
})??;
let starting_tuple =
nodes
.prefix_iter(start_node_key)?
.next()
.ok_or_else(|| NodeNotFoundError {
missing: start_node_key.clone(),
span: starting.span(),
})??;
for _ in 0..iterations {
counter += 1;
let mut current_tuple = starting_tuple.clone();
let mut path = vec![start_node_key.clone()];
for _ in 0..steps {
let cur_node_key = &current_tuple[0];
let candidate_steps: Vec<_> =
edges.prefix_iter(cur_node_key, tx, stores)?.try_collect()?;
let candidate_steps: Vec<_> = edges.prefix_iter(cur_node_key)?.try_collect()?;
if candidate_steps.is_empty() {
break;
}
@ -109,13 +105,12 @@ impl AlgoImpl for RandomWalk {
};
let next_node = &next_step[1];
path.push(next_node.clone());
current_tuple = nodes
.prefix_iter(next_node, tx, stores)?
.next()
.ok_or_else(|| NodeNotFoundError {
current_tuple = nodes.prefix_iter(next_node)?.next().ok_or_else(|| {
NodeNotFoundError {
missing: next_node.clone(),
span: nodes.span(),
})??;
}
})??;
poison.check()?;
}
out.put(vec![

@ -12,31 +12,28 @@ use itertools::Itertools;
use miette::{bail, Result};
use smartstring::{LazyCompact, SmartString};
use crate::algo::{AlgoImpl, CannotDetermineArity};
use crate::algo::{AlgoImpl, AlgoPayload, CannotDetermineArity};
use crate::data::expr::Expr;
use crate::data::functions::OP_LIST;
use crate::data::program::{MagicAlgoApply, MagicSymbol, WrongAlgoOptionError};
use crate::data::program::WrongAlgoOptionError;
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct ReorderSort;
impl AlgoImpl for ReorderSort {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let in_rel = algo.relation(0)?;
let in_rel = payload.get_input(0)?;
let mut out_list = match algo.expr_option("out", None)? {
let mut out_list = match payload.expr_option("out", None)? {
Expr::Const {
val: DataValue::List(l),
span,
@ -51,24 +48,24 @@ impl AlgoImpl for ReorderSort {
_ => {
bail!(WrongAlgoOptionError {
name: "out".to_string(),
span: algo.span,
algo_name: algo.algo.name.to_string(),
span: payload.span(),
algo_name: payload.name().to_string(),
help: "This option must evaluate to a list".to_string()
})
}
};
let mut sort_by = algo.expr_option(
let mut sort_by = payload.expr_option(
"sort_by",
Some(Expr::Const {
val: DataValue::Null,
span: SourceSpan(0, 0),
}),
)?;
let sort_descending = algo.bool_option("descending", Some(false))?;
let break_ties = algo.bool_option("break_ties", Some(false))?;
let skip = algo.non_neg_integer_option("skip", Some(0))?;
let take = algo.non_neg_integer_option("take", Some(0))?;
let sort_descending = payload.bool_option("descending", Some(false))?;
let break_ties = payload.bool_option("break_ties", Some(false))?;
let skip = payload.non_neg_integer_option("skip", Some(0))?;
let take = payload.non_neg_integer_option("take", Some(0))?;
let binding_map = in_rel.get_binding_map(0);
sort_by.fill_binding_indices(&binding_map)?;
@ -77,7 +74,7 @@ impl AlgoImpl for ReorderSort {
}
let mut buffer = vec![];
for tuple in in_rel.iter(tx, stores)? {
for tuple in in_rel.iter()? {
let tuple = tuple?;
let sorter = sort_by.eval(&tuple)?;
let mut s_tuple: Vec<_> = out_list.iter().map(|ex| ex.eval(&tuple)).try_collect()?;

@ -19,38 +19,34 @@ use rayon::prelude::*;
use smallvec::{smallvec, SmallVec};
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct ShortestPathDijkstra;
impl AlgoImpl for ShortestPathDijkstra {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let starting = algo.relation(1)?;
let termination = algo.relation(2);
let undirected = algo.bool_option("undirected", Some(false))?;
let keep_ties = algo.bool_option("keep_ties", Some(false))?;
let edges = payload.get_input(0)?;
let starting = payload.get_input(1)?;
let termination = payload.get_input(2);
let undirected = payload.bool_option("undirected", Some(false))?;
let keep_ties = payload.bool_option("keep_ties", Some(false))?;
let (graph, indices, inv_indices, _) =
edges.convert_edge_to_weighted_graph(undirected, false, tx, stores)?;
edges.convert_edge_to_weighted_graph(undirected, false)?;
let mut starting_nodes = BTreeSet::new();
for tuple in starting.iter(tx, stores)? {
for tuple in starting.iter()? {
let tuple = tuple?;
let node = &tuple[0];
if let Some(idx) = inv_indices.get(node) {
@ -61,7 +57,7 @@ impl AlgoImpl for ShortestPathDijkstra {
Err(_) => None,
Ok(t) => {
let mut tn = BTreeSet::new();
for tuple in t.iter(tx, stores)? {
for tuple in t.iter()? {
let tuple = tuple?;
let node = &tuple[0];
if let Some(idx) = inv_indices.get(node) {

@ -14,7 +14,7 @@ use itertools::Itertools;
use miette::Result;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
@ -38,18 +38,16 @@ impl StronglyConnectedComponent {
#[cfg(feature = "graph-algo")]
impl AlgoImpl for StronglyConnectedComponent {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let edges = payload.get_input(0)?;
let (graph, indices, mut inv_indices) =
edges.convert_edge_to_graph(!self.strong, tx, stores)?;
edges.convert_edge_to_graph(!self.strong)?;
let tarjan = TarjanScc::new(&graph).run(poison)?;
for (grp_id, cc) in tarjan.iter().enumerate() {
@ -62,8 +60,8 @@ impl AlgoImpl for StronglyConnectedComponent {
let mut counter = tarjan.len() as i64;
if let Ok(nodes) = algo.relation(1) {
for tuple in nodes.iter(tx, stores)? {
if let Ok(nodes) = payload.get_input(1) {
for tuple in nodes.iter()? {
let tuple = tuple?;
let node = tuple.into_iter().next().unwrap();
if !inv_indices.contains_key(&node) {

@ -11,30 +11,26 @@ use std::collections::BTreeMap;
use miette::Result;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct TopSort;
impl AlgoImpl for TopSort {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let edges = payload.get_input(0)?;
let (graph, indices, _) = edges.convert_edge_to_graph(false, tx, stores)?;
let (graph, indices, _) = edges.convert_edge_to_graph(false)?;
let sorted = kahn(&graph, poison)?;

@ -13,29 +13,25 @@ use miette::Result;
use rayon::prelude::*;
use smartstring::{LazyCompact, SmartString};
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct ClusteringCoefficients;
impl AlgoImpl for ClusteringCoefficients {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let (graph, indices, _) = edges.convert_edge_to_graph(true, tx, stores)?;
let edges = payload.get_input(0)?;
let (graph, indices, _) = edges.convert_edge_to_graph(true)?;
let graph: Vec<BTreeSet<usize>> =
graph.into_iter().map(|e| e.into_iter().collect()).collect();
let coefficients = clustering_coefficients(&graph, poison)?;

@ -15,38 +15,34 @@ use rayon::prelude::*;
use smartstring::{LazyCompact, SmartString};
use crate::algo::shortest_path_dijkstra::dijkstra;
use crate::algo::AlgoImpl;
use crate::algo::{AlgoImpl, AlgoPayload};
use crate::data::expr::Expr;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::{EpochStore, RegularTempStore};
use crate::runtime::transact::SessionTx;
use crate::runtime::temp_store::RegularTempStore;
pub(crate) struct KShortestPathYen;
impl AlgoImpl for KShortestPathYen {
fn run<'a>(
fn run(
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut RegularTempStore,
payload: AlgoPayload<'_, '_>,
out: &mut RegularTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
let starting = algo.relation(1)?;
let termination = algo.relation(2)?;
let undirected = algo.bool_option("undirected", Some(false))?;
let k = algo.pos_integer_option("k", None)?;
let edges = payload.get_input(0)?;
let starting = payload.get_input(1)?;
let termination = payload.get_input(2)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let k = payload.pos_integer_option("k", None)?;
let (graph, indices, inv_indices, _) =
edges.convert_edge_to_weighted_graph(undirected, false, tx, stores)?;
edges.convert_edge_to_weighted_graph(undirected, false)?;
let mut starting_nodes = BTreeSet::new();
for tuple in starting.iter(tx, stores)? {
for tuple in starting.iter()? {
let tuple = tuple?;
let node = &tuple[0];
if let Some(idx) = inv_indices.get(node) {
@ -54,7 +50,7 @@ impl AlgoImpl for KShortestPathYen {
}
}
let mut termination_nodes = BTreeSet::new();
for tuple in termination.iter(tx, stores)? {
for tuple in termination.iter()? {
let tuple = tuple?;
let node = &tuple[0];
if let Some(idx) = inv_indices.get(node) {

@ -24,7 +24,7 @@ use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
use crate::parse::SourceSpan;
#[derive(Clone, PartialEq, Eq, serde_derive::Serialize, serde_derive::Deserialize)]
pub(crate) enum Expr {
pub enum Expr {
Binding {
var: Symbol,
tuple_pos: Option<usize>,
@ -542,7 +542,7 @@ impl Default for ValueRange {
}
#[derive(Clone)]
pub(crate) struct Op {
pub struct Op {
pub(crate) name: &'static str,
pub(crate) min_arity: usize,
pub(crate) vararg: bool,

@ -313,195 +313,6 @@ impl MagicAlgoApply {
algo_name: self.algo.name.to_string(),
})?)
}
pub(crate) fn expr_option(&self, name: &str, default: Option<Expr>) -> Result<Expr> {
match self.options.get(name) {
Some(ex) => Ok(ex.clone()),
None => match default {
Some(ex) => Ok(ex),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
}
}
pub(crate) fn string_option(
&self,
name: &str,
default: Option<&str>,
) -> Result<SmartString<LazyCompact>> {
match self.options.get(name) {
Some(ex) => match ex.clone().eval_to_const()? {
DataValue::Str(s) => Ok(s),
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: ex.span(),
algo_name: self.algo.name.to_string(),
help: "a string is required".to_string(),
}
.into()),
},
None => match default {
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
Some(s) => Ok(SmartString::from(s)),
},
}
}
#[allow(dead_code)]
pub(crate) fn pos_integer_option(&self, name: &str, default: Option<usize>) -> Result<usize> {
match self.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Num(n)) => match n.get_int() {
Some(i) => {
ensure!(
i > 0,
WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a positive integer is required".to_string(),
}
);
Ok(i as usize)
}
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a positive integer is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
}
}
pub(crate) fn non_neg_integer_option(
&self,
name: &str,
default: Option<usize>,
) -> Result<usize> {
match self.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Num(n)) => match n.get_int() {
Some(i) => {
ensure!(
i >= 0,
WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a non-negative integer is required".to_string(),
}
);
Ok(i as usize)
}
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a non-negative integer is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
}
}
#[allow(dead_code)]
pub(crate) fn unit_interval_option(&self, name: &str, default: Option<f64>) -> Result<f64> {
match self.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Num(n)) => {
let f = n.get_float();
ensure!(
(0. ..=1.).contains(&f),
WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a number between 0. and 1. is required".to_string(),
}
);
Ok(f)
}
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a number between 0. and 1. is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
}
}
pub(crate) fn bool_option(&self, name: &str, default: Option<bool>) -> Result<bool> {
match self.options.get(name) {
Some(v) => match v.clone().eval_to_const() {
Ok(DataValue::Bool(b)) => Ok(b),
_ => Err(WrongAlgoOptionError {
name: name.to_string(),
span: v.span(),
algo_name: self.algo.name.to_string(),
help: "a boolean value is required".to_string(),
}
.into()),
},
None => match default {
Some(v) => Ok(v),
None => Err(AlgoOptionNotFoundError {
name: name.to_string(),
span: self.span,
algo_name: self.algo.name.to_string(),
}
.into()),
},
}
}
}
impl Debug for MagicAlgoApply {

@ -19,7 +19,7 @@ use thiserror::Error;
use crate::parse::SourceSpan;
#[derive(Clone, Deserialize, Serialize)]
pub(crate) struct Symbol {
pub struct Symbol {
pub(crate) name: SmartString<LazyCompact>,
#[serde(skip)]
pub(crate) span: SourceSpan,

@ -62,6 +62,7 @@ pub use storage::sqlite::{new_cozo_sqlite, SqliteStorage};
pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
pub use storage::{Storage, StoreTx};
pub use runtime::temp_store::RegularTempStore;
pub use algo::AlgoImpl;
use crate::data::json::JsonValue;

@ -59,7 +59,7 @@ impl CozoScript {
#[derive(
Eq, PartialEq, Debug, serde_derive::Serialize, serde_derive::Deserialize, Copy, Clone, Default,
)]
pub(crate) struct SourceSpan(pub(crate) usize, pub(crate) usize);
pub struct SourceSpan(pub(crate) usize, pub(crate) usize);
impl SourceSpan {
pub(crate) fn merge(self, other: Self) -> Self {

@ -199,7 +199,7 @@ pub(crate) fn parse_query(
name: Symbol::new("Constant", span),
};
let algo_impl = handle.get_impl()?;
algo_impl.process_options(&mut options, span)?;
algo_impl.init_options(&mut options, span)?;
let arity = algo_impl.arity(&options, &head, span)?;
ensure!(arity != 0, EmptyRowForConstRule(span));
@ -745,7 +745,7 @@ fn parse_algo_rule(
let algo = AlgoHandle::new(algo_name, name_pair.extract_span());
let algo_impl = algo.get_impl()?;
algo_impl.process_options(&mut options, args_list_span)?;
algo_impl.init_options(&mut options, args_list_span)?;
let arity = algo_impl.arity(&options, &head, name_pair.extract_span())?;
ensure!(

@ -12,6 +12,7 @@ use std::collections::BTreeMap;
use itertools::Itertools;
use log::{debug, trace};
use miette::Result;
use crate::algo::AlgoPayload;
use crate::data::aggr::Aggregation;
use crate::data::program::{MagicSymbol, NoEntryError};
@ -154,7 +155,12 @@ impl<'a> SessionTx<'a> {
CompiledRuleSet::Algo(algo_apply) => {
let mut algo_impl = algo_apply.algo.get_impl()?;
let mut out = RegularTempStore::default();
algo_impl.run(self, algo_apply, stores, &mut out, poison.clone())?;
let payload = AlgoPayload {
manifest: algo_apply,
stores,
tx: self,
};
algo_impl.run(payload, &mut out, poison.clone())?;
out.wrap()
}
};

@ -1079,7 +1079,7 @@ impl<'s, S: Storage<'s>> Db<S> {
}
#[derive(Clone, Default)]
pub(crate) struct Poison(pub(crate) Arc<AtomicBool>);
pub struct Poison(pub(crate) Arc<AtomicBool>);
impl Poison {
#[inline(always)]

Loading…
Cancel
Save