diff --git a/Cargo.toml b/Cargo.toml index a29996d1..834e3630 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ pest = "2.2.1" pest_derive = "2.2.1" rayon = "1.5.3" nalgebra = "0.31.1" +reqwest = { version = "0.11.11", features = ["blocking"] } approx = "0.5.1" unicode-normalization = "0.1.21" thiserror = "1.0.34" diff --git a/src/algo/jlines.rs b/src/algo/jlines.rs new file mode 100644 index 00000000..63973955 --- /dev/null +++ b/src/algo/jlines.rs @@ -0,0 +1,126 @@ +use std::collections::BTreeMap; +use std::fs::File; +use std::io::BufRead; +use std::{fs, io}; + +use itertools::Itertools; +use miette::{bail, miette, Diagnostic, IntoDiagnostic, Result}; +use thiserror::Error; + +use crate::algo::AlgoImpl; +use crate::data::json::JsonValue; +use crate::data::program::{MagicAlgoApply, MagicSymbol}; +use crate::data::tuple::Tuple; +use crate::data::value::DataValue; +use crate::parse::SourceSpan; +use crate::runtime::db::Poison; +use crate::runtime::derived::DerivedRelStore; +use crate::runtime::transact::SessionTx; + +pub(crate) struct JsonReader; + +impl AlgoImpl for JsonReader { + fn run( + &mut self, + _tx: &SessionTx, + algo: &MagicAlgoApply, + _stores: &BTreeMap, + out: &DerivedRelStore, + _poison: Poison, + ) -> Result<()> { + let url = algo.string_option("url", None)?; + let json_lines = algo.bool_option("json_lines", Some(true))?; + let null_if_absent = algo.bool_option("null_if_absent", Some(false))?; + let prepend_index = algo.bool_option("prepend_index", Some(false))?; + + #[derive(Error, Diagnostic, Debug)] + #[error("fields specification must be a list of strings")] + #[diagnostic(code(eval::algo_bad_fields))] + struct BadFields(#[label] SourceSpan); + + let fields_expr = algo.expr_option("fields", None)?; + let fields_span = fields_expr.span(); + let fields: Vec<_> = match fields_expr.eval_to_const()? { + DataValue::List(l) => l + .into_iter() + .map(|d| match d { + DataValue::Str(s) => Ok(s), + _ => Err(BadFields(fields_span)), + }) + .try_collect()?, + _ => bail!(BadFields(fields_span)), + }; + let mut counter = -1i64; + let mut process_row = |row: &JsonValue| -> Result<()> { + let mut ret = if prepend_index { + counter += 1; + vec![DataValue::from(counter)] + } else { + vec![] + }; + for field in &fields { + let val = match row.get(field as &str) { + None => { + if null_if_absent { + DataValue::Null + } else { + bail!("field {} is absent from JSON line", field); + } + } + Some(v) => DataValue::from(v), + }; + ret.push(val); + } + out.put(Tuple(ret), 0); + Ok(()) + }; + match url.strip_prefix("file://") { + Some(file_path) => { + if json_lines { + let file = File::open(file_path).into_diagnostic()?; + for line in io::BufReader::new(file).lines() { + let line = line.into_diagnostic()?; + let line = line.trim(); + if !line.is_empty() { + let row = serde_json::from_str(&line).into_diagnostic()?; + process_row(&row)?; + } + } + } else { + let content = fs::read_to_string(file_path).into_diagnostic()?; + let data: JsonValue = serde_json::from_str(&content).into_diagnostic()?; + let rows = data + .as_array() + .ok_or_else(|| miette!("JSON file is not an array"))?; + for row in rows { + process_row(row)?; + } + } + } + None => { + let content = reqwest::blocking::get(&url as &str) + .into_diagnostic()? + .text() + .into_diagnostic()?; + if json_lines { + for line in content.lines() { + let line = line.trim(); + if !line.is_empty() { + let row = serde_json::from_str(&line).into_diagnostic()?; + process_row(&row)?; + } + } + } else { + let data: JsonValue = serde_json::from_str(&content).into_diagnostic()?; + let rows = data + .as_array() + .ok_or_else(|| miette!("JSON file is not an array"))?; + for row in rows { + process_row(row)?; + } + } + } + } + Ok(()) + } +} diff --git a/src/algo/mod.rs b/src/algo/mod.rs index 8cd247a7..5c2b563c 100644 --- a/src/algo/mod.rs +++ b/src/algo/mod.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use either::Either; -use miette::{bail, Diagnostic, ensure, Result}; +use miette::{bail, ensure, Diagnostic, Result}; use smartstring::{LazyCompact, SmartString}; use thiserror::Error; @@ -10,6 +10,7 @@ use crate::algo::astar::ShortestPathAStar; use crate::algo::bfs::Bfs; use crate::algo::degree_centrality::DegreeCentrality; use crate::algo::dfs::Dfs; +use crate::algo::jlines::JsonReader; use crate::algo::kruskal::MinimumSpanningForestKruskal; use crate::algo::label_propagation::LabelPropagation; use crate::algo::louvain::CommunityDetectionLouvain; @@ -38,6 +39,7 @@ pub(crate) mod astar; pub(crate) mod bfs; pub(crate) mod degree_centrality; pub(crate) mod dfs; +pub(crate) mod jlines; pub(crate) mod kruskal; pub(crate) mod label_propagation; pub(crate) mod louvain; @@ -77,8 +79,13 @@ impl AlgoHandle { &self, _args: Either<&[AlgoRuleArg], &[MagicAlgoRuleArg]>, opts: &BTreeMap, Expr>, - ) -> Option { - Some(match &self.name.name as &str { + ) -> Result { + #[derive(Debug, Error, Diagnostic)] + #[error("Cannot determine arity for algo {0} since {1}")] + #[diagnostic(code(parser::no_algo_arity))] + struct CannotDetermineArity(String, String, #[label] SourceSpan); + + Ok(match &self.name.name as &str { "ClusteringCoefficients" => 4, "DegreeCentrality" => 4, "ClosenessCentrality" => 2, @@ -97,18 +104,61 @@ impl AlgoHandle { "CommunityDetectionLouvain" => 2, "LabelPropagation" => 2, "RandomWalk" => 3, - "ReorderSort" => { - let out_opts = opts.get("out")?; + n @ "ReorderSort" => { + let out_opts = opts.get("out").ok_or_else(|| { + CannotDetermineArity( + n.to_string(), + "option 'out' not provided".to_string(), + self.name.span, + ) + })?; match out_opts { Expr::Const { val: DataValue::List(l), .. } => l.len() + 1, Expr::Apply { op, args, .. } if **op == OP_LIST => args.len() + 1, - _ => return None, + _ => bail!(CannotDetermineArity( + n.to_string(), + "invalid option 'out' given, expect a list".to_string(), + self.name.span + )), + } + } + n @ "JsonReader" => { + let with_row_num = match opts.get("prepend_index") { + None => 0, + Some(Expr::Const { + val: DataValue::Bool(true), + .. + }) => 1, + Some(Expr::Const { + val: DataValue::Bool(false), + .. + }) => 0, + _ => bail!(CannotDetermineArity( + n.to_string(), + "invalid option 'prepend_index' given, expect a boolean".to_string(), + self.name.span + )), + }; + let fields = opts.get("fields").ok_or_else(|| { + CannotDetermineArity( + n.to_string(), + "option 'fields' not provided".to_string(), + self.name.span, + ) + })?; + match fields.clone().eval_to_const()? { + DataValue::List(l) => l.len() + with_row_num, + _ => bail!(CannotDetermineArity( + n.to_string(), + "invalid option 'fields' given, expect a list".to_string(), + self.name.span + )), } } - _ => return None, + n => bail!(AlgoNotFoundError(n.to_string(), self.name.span)), }) } @@ -135,6 +185,7 @@ impl AlgoHandle { "LabelPropagation" => Box::new(LabelPropagation), "RandomWalk" => Box::new(RandomWalk), "ReorderSort" => Box::new(ReorderSort), + "JsonReader" => Box::new(JsonReader), name => bail!(AlgoNotFoundError(name.to_string(), self.name.span)), }) } @@ -148,11 +199,11 @@ struct NotAnEdgeError(#[label] SourceSpan); #[derive(Error, Diagnostic, Debug)] #[error( -"The value {0:?} at the third position in the relation cannot be interpreted as edge weights" + "The value {0:?} at the third position in the relation cannot be interpreted as edge weights" )] #[diagnostic(code(algo::invalid_edge_weight))] #[diagnostic(help( -"Edge weights must be finite numbers. Some algorithm also requires positivity." + "Edge weights must be finite numbers. Some algorithm also requires positivity." ))] struct BadEdgeWeightError(DataValue, #[label] SourceSpan); @@ -165,7 +216,7 @@ struct RuleNotFoundError(String, #[label] SourceSpan); #[error("Invalid reverse scanning of triples")] #[diagnostic(code(algo::invalid_reverse_triple_scan))] #[diagnostic(help( -"Inverse scanning of triples requires the type to be 'ref', or the value be indexed" + "Inverse scanning of triples requires the type to be 'ref', or the value be indexed" ))] struct InvalidInverseTripleUse(String, #[label] SourceSpan); @@ -173,7 +224,7 @@ struct InvalidInverseTripleUse(String, #[label] SourceSpan); #[error("Required node with key {missing:?} not found")] #[diagnostic(code(algo::node_with_key_not_found))] #[diagnostic(help( -"The relation is interpreted as a relation of nodes, but the required key is missing" + "The relation is interpreted as a relation of nodes, but the required key is missing" ))] pub(crate) struct NodeNotFoundError { pub(crate) missing: DataValue, diff --git a/src/data/program.rs b/src/data/program.rs index da3becea..af43a325 100644 --- a/src/data/program.rs +++ b/src/data/program.rs @@ -8,7 +8,7 @@ use smallvec::SmallVec; use smartstring::{LazyCompact, SmartString}; use thiserror::Error; -use crate::algo::{AlgoHandle, AlgoNotFoundError}; +use crate::algo::AlgoHandle; use crate::data::aggr::Aggregation; use crate::data::expr::Expr; use crate::data::symb::{Symbol, PROG_ENTRY}; @@ -118,7 +118,7 @@ pub(crate) struct AlgoApply { } impl AlgoApply { - pub(crate) fn arity(&self) -> Option { + pub(crate) fn arity(&self) -> Result { self.algo.arity(Left(&self.rule_args), &self.options) } } @@ -164,7 +164,7 @@ pub(crate) struct WrongAlgoOptionError { } impl MagicAlgoApply { - pub(crate) fn arity(&self) -> Option { + pub(crate) fn arity(&self) -> Result { self.algo.arity(Right(&self.rule_args), &self.options) } pub(crate) fn relation_with_min_len( @@ -484,11 +484,7 @@ impl InputProgram { if let Some(entry) = self.prog.get(&Symbol::new(PROG_ENTRY, SourceSpan(0, 0))) { return match entry { InputRulesOrAlgo::Rules { rules } => Ok(rules.last().unwrap().head.len()), - InputRulesOrAlgo::Algo { algo: algo_apply } => { - algo_apply.arity().ok_or_else(|| { - AlgoNotFoundError(algo_apply.algo.name.to_string(), algo_apply.span).into() - }) - } + InputRulesOrAlgo::Algo { algo: algo_apply } => algo_apply.arity(), }; } @@ -678,8 +674,8 @@ impl Default for MagicRulesOrAlgo { } impl MagicRulesOrAlgo { - pub(crate) fn arity(&self) -> Option { - Some(match self { + pub(crate) fn arity(&self) -> Result { + Ok(match self { MagicRulesOrAlgo::Rules { rules } => rules.first().unwrap().head.len(), MagicRulesOrAlgo::Algo { algo } => algo.arity()?, }) diff --git a/src/parse/query.rs b/src/parse/query.rs index a0af3deb..aee96d39 100644 --- a/src/parse/query.rs +++ b/src/parse/query.rs @@ -9,7 +9,7 @@ use miette::{bail, ensure, Diagnostic, LabeledSpan, Report, Result}; use smartstring::{LazyCompact, SmartString}; use thiserror::Error; -use crate::algo::{AlgoHandle, AlgoNotFoundError}; +use crate::algo::AlgoHandle; use crate::data::aggr::{parse_aggr, Aggregation}; use crate::data::expr::Expr; use crate::data::program::{ @@ -468,11 +468,16 @@ pub(crate) fn parse_query( if let Some((handle, RelationOp::Create)) = &prog.out_opts.store_relation { let mut bindings = handle.dep_bindings.clone(); bindings.extend_from_slice(&handle.key_bindings); - prog.const_rules.insert(MagicSymbol::Muggle {inner: Symbol::new(PROG_ENTRY, Default::default())}, ConstRule { - bindings, - data: vec![], - span: Default::default() - }); + prog.const_rules.insert( + MagicSymbol::Muggle { + inner: Symbol::new(PROG_ENTRY, Default::default()), + }, + ConstRule { + bindings, + data: vec![], + span: Default::default(), + }, + ); } } @@ -805,9 +810,7 @@ fn parse_algo_rule( } let algo = AlgoHandle::new(algo_name, name_pair.extract_span()); - let algo_arity = algo - .arity(Left(&rule_args), &options) - .ok_or_else(|| AlgoNotFoundError(algo.name.to_string(), algo.name.span))?; + let algo_arity = algo.arity(Left(&rule_args), &options)?; #[derive(Debug, Error, Diagnostic)] #[error("Algorithm rule head arity mismatch")] diff --git a/src/query/compile.rs b/src/query/compile.rs index 145ebeb0..17545ace 100644 --- a/src/query/compile.rs +++ b/src/query/compile.rs @@ -4,7 +4,6 @@ use itertools::Itertools; use miette::{Context, Diagnostic, ensure, Result}; use thiserror::Error; -use crate::algo::AlgoNotFoundError; use crate::data::aggr::Aggregation; use crate::data::expr::Expr; use crate::data::program::{ @@ -108,9 +107,7 @@ impl SessionTx { name.clone(), self.new_rule_store( name.clone(), - ruleset.arity().ok_or_else(|| { - AlgoNotFoundError(name.symbol().to_string(), name.symbol().span) - })?, + ruleset.arity()?, ), ); }