json reader

main
Ziyang Hu 2 years ago
parent 53bbb0be60
commit 1bfa3bd5f2

@ -37,6 +37,7 @@ pest = "2.2.1"
pest_derive = "2.2.1" pest_derive = "2.2.1"
rayon = "1.5.3" rayon = "1.5.3"
nalgebra = "0.31.1" nalgebra = "0.31.1"
reqwest = { version = "0.11.11", features = ["blocking"] }
approx = "0.5.1" approx = "0.5.1"
unicode-normalization = "0.1.21" unicode-normalization = "0.1.21"
thiserror = "1.0.34" thiserror = "1.0.34"

@ -0,0 +1,126 @@
use std::collections::BTreeMap;
use std::fs::File;
use std::io::BufRead;
use std::{fs, io};
use itertools::Itertools;
use miette::{bail, miette, Diagnostic, IntoDiagnostic, Result};
use thiserror::Error;
use crate::algo::AlgoImpl;
use crate::data::json::JsonValue;
use crate::data::program::{MagicAlgoApply, MagicSymbol};
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::derived::DerivedRelStore;
use crate::runtime::transact::SessionTx;
/// Stateless handle for the JSON-reading algorithm; all behaviour lives in its
/// `AlgoImpl` implementation.
pub(crate) struct JsonReader;
impl AlgoImpl for JsonReader {
fn run(
&mut self,
_tx: &SessionTx,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, DerivedRelStore>,
out: &DerivedRelStore,
_poison: Poison,
) -> Result<()> {
let url = algo.string_option("url", None)?;
let json_lines = algo.bool_option("json_lines", Some(true))?;
let null_if_absent = algo.bool_option("null_if_absent", Some(false))?;
let prepend_index = algo.bool_option("prepend_index", Some(false))?;
#[derive(Error, Diagnostic, Debug)]
#[error("fields specification must be a list of strings")]
#[diagnostic(code(eval::algo_bad_fields))]
struct BadFields(#[label] SourceSpan);
let fields_expr = algo.expr_option("fields", None)?;
let fields_span = fields_expr.span();
let fields: Vec<_> = match fields_expr.eval_to_const()? {
DataValue::List(l) => l
.into_iter()
.map(|d| match d {
DataValue::Str(s) => Ok(s),
_ => Err(BadFields(fields_span)),
})
.try_collect()?,
_ => bail!(BadFields(fields_span)),
};
let mut counter = -1i64;
let mut process_row = |row: &JsonValue| -> Result<()> {
let mut ret = if prepend_index {
counter += 1;
vec![DataValue::from(counter)]
} else {
vec![]
};
for field in &fields {
let val = match row.get(field as &str) {
None => {
if null_if_absent {
DataValue::Null
} else {
bail!("field {} is absent from JSON line", field);
}
}
Some(v) => DataValue::from(v),
};
ret.push(val);
}
out.put(Tuple(ret), 0);
Ok(())
};
match url.strip_prefix("file://") {
Some(file_path) => {
if json_lines {
let file = File::open(file_path).into_diagnostic()?;
for line in io::BufReader::new(file).lines() {
let line = line.into_diagnostic()?;
let line = line.trim();
if !line.is_empty() {
let row = serde_json::from_str(&line).into_diagnostic()?;
process_row(&row)?;
}
}
} else {
let content = fs::read_to_string(file_path).into_diagnostic()?;
let data: JsonValue = serde_json::from_str(&content).into_diagnostic()?;
let rows = data
.as_array()
.ok_or_else(|| miette!("JSON file is not an array"))?;
for row in rows {
process_row(row)?;
}
}
}
None => {
let content = reqwest::blocking::get(&url as &str)
.into_diagnostic()?
.text()
.into_diagnostic()?;
if json_lines {
for line in content.lines() {
let line = line.trim();
if !line.is_empty() {
let row = serde_json::from_str(&line).into_diagnostic()?;
process_row(&row)?;
}
}
} else {
let data: JsonValue = serde_json::from_str(&content).into_diagnostic()?;
let rows = data
.as_array()
.ok_or_else(|| miette!("JSON file is not an array"))?;
for row in rows {
process_row(row)?;
}
}
}
}
Ok(())
}
}

@ -1,7 +1,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use either::Either; use either::Either;
use miette::{bail, Diagnostic, ensure, Result}; use miette::{bail, ensure, Diagnostic, Result};
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use thiserror::Error; use thiserror::Error;
@ -10,6 +10,7 @@ use crate::algo::astar::ShortestPathAStar;
use crate::algo::bfs::Bfs; use crate::algo::bfs::Bfs;
use crate::algo::degree_centrality::DegreeCentrality; use crate::algo::degree_centrality::DegreeCentrality;
use crate::algo::dfs::Dfs; use crate::algo::dfs::Dfs;
use crate::algo::jlines::JsonReader;
use crate::algo::kruskal::MinimumSpanningForestKruskal; use crate::algo::kruskal::MinimumSpanningForestKruskal;
use crate::algo::label_propagation::LabelPropagation; use crate::algo::label_propagation::LabelPropagation;
use crate::algo::louvain::CommunityDetectionLouvain; use crate::algo::louvain::CommunityDetectionLouvain;
@ -38,6 +39,7 @@ pub(crate) mod astar;
pub(crate) mod bfs; pub(crate) mod bfs;
pub(crate) mod degree_centrality; pub(crate) mod degree_centrality;
pub(crate) mod dfs; pub(crate) mod dfs;
pub(crate) mod jlines;
pub(crate) mod kruskal; pub(crate) mod kruskal;
pub(crate) mod label_propagation; pub(crate) mod label_propagation;
pub(crate) mod louvain; pub(crate) mod louvain;
@ -77,8 +79,13 @@ impl AlgoHandle {
&self, &self,
_args: Either<&[AlgoRuleArg], &[MagicAlgoRuleArg]>, _args: Either<&[AlgoRuleArg], &[MagicAlgoRuleArg]>,
opts: &BTreeMap<SmartString<LazyCompact>, Expr>, opts: &BTreeMap<SmartString<LazyCompact>, Expr>,
) -> Option<usize> { ) -> Result<usize> {
Some(match &self.name.name as &str { #[derive(Debug, Error, Diagnostic)]
#[error("Cannot determine arity for algo {0} since {1}")]
#[diagnostic(code(parser::no_algo_arity))]
struct CannotDetermineArity(String, String, #[label] SourceSpan);
Ok(match &self.name.name as &str {
"ClusteringCoefficients" => 4, "ClusteringCoefficients" => 4,
"DegreeCentrality" => 4, "DegreeCentrality" => 4,
"ClosenessCentrality" => 2, "ClosenessCentrality" => 2,
@ -97,18 +104,61 @@ impl AlgoHandle {
"CommunityDetectionLouvain" => 2, "CommunityDetectionLouvain" => 2,
"LabelPropagation" => 2, "LabelPropagation" => 2,
"RandomWalk" => 3, "RandomWalk" => 3,
"ReorderSort" => { n @ "ReorderSort" => {
let out_opts = opts.get("out")?; let out_opts = opts.get("out").ok_or_else(|| {
CannotDetermineArity(
n.to_string(),
"option 'out' not provided".to_string(),
self.name.span,
)
})?;
match out_opts { match out_opts {
Expr::Const { Expr::Const {
val: DataValue::List(l), val: DataValue::List(l),
.. ..
} => l.len() + 1, } => l.len() + 1,
Expr::Apply { op, args, .. } if **op == OP_LIST => args.len() + 1, Expr::Apply { op, args, .. } if **op == OP_LIST => args.len() + 1,
_ => return None, _ => bail!(CannotDetermineArity(
n.to_string(),
"invalid option 'out' given, expect a list".to_string(),
self.name.span
)),
}
}
n @ "JsonReader" => {
let with_row_num = match opts.get("prepend_index") {
None => 0,
Some(Expr::Const {
val: DataValue::Bool(true),
..
}) => 1,
Some(Expr::Const {
val: DataValue::Bool(false),
..
}) => 0,
_ => bail!(CannotDetermineArity(
n.to_string(),
"invalid option 'prepend_index' given, expect a boolean".to_string(),
self.name.span
)),
};
let fields = opts.get("fields").ok_or_else(|| {
CannotDetermineArity(
n.to_string(),
"option 'fields' not provided".to_string(),
self.name.span,
)
})?;
match fields.clone().eval_to_const()? {
DataValue::List(l) => l.len() + with_row_num,
_ => bail!(CannotDetermineArity(
n.to_string(),
"invalid option 'fields' given, expect a list".to_string(),
self.name.span
)),
} }
} }
_ => return None, n => bail!(AlgoNotFoundError(n.to_string(), self.name.span)),
}) })
} }
@ -135,6 +185,7 @@ impl AlgoHandle {
"LabelPropagation" => Box::new(LabelPropagation), "LabelPropagation" => Box::new(LabelPropagation),
"RandomWalk" => Box::new(RandomWalk), "RandomWalk" => Box::new(RandomWalk),
"ReorderSort" => Box::new(ReorderSort), "ReorderSort" => Box::new(ReorderSort),
"JsonReader" => Box::new(JsonReader),
name => bail!(AlgoNotFoundError(name.to_string(), self.name.span)), name => bail!(AlgoNotFoundError(name.to_string(), self.name.span)),
}) })
} }
@ -148,11 +199,11 @@ struct NotAnEdgeError(#[label] SourceSpan);
#[derive(Error, Diagnostic, Debug)] #[derive(Error, Diagnostic, Debug)]
#[error( #[error(
"The value {0:?} at the third position in the relation cannot be interpreted as edge weights" "The value {0:?} at the third position in the relation cannot be interpreted as edge weights"
)] )]
#[diagnostic(code(algo::invalid_edge_weight))] #[diagnostic(code(algo::invalid_edge_weight))]
#[diagnostic(help( #[diagnostic(help(
"Edge weights must be finite numbers. Some algorithm also requires positivity." "Edge weights must be finite numbers. Some algorithm also requires positivity."
))] ))]
struct BadEdgeWeightError(DataValue, #[label] SourceSpan); struct BadEdgeWeightError(DataValue, #[label] SourceSpan);
@ -165,7 +216,7 @@ struct RuleNotFoundError(String, #[label] SourceSpan);
#[error("Invalid reverse scanning of triples")] #[error("Invalid reverse scanning of triples")]
#[diagnostic(code(algo::invalid_reverse_triple_scan))] #[diagnostic(code(algo::invalid_reverse_triple_scan))]
#[diagnostic(help( #[diagnostic(help(
"Inverse scanning of triples requires the type to be 'ref', or the value be indexed" "Inverse scanning of triples requires the type to be 'ref', or the value be indexed"
))] ))]
struct InvalidInverseTripleUse(String, #[label] SourceSpan); struct InvalidInverseTripleUse(String, #[label] SourceSpan);
@ -173,7 +224,7 @@ struct InvalidInverseTripleUse(String, #[label] SourceSpan);
#[error("Required node with key {missing:?} not found")] #[error("Required node with key {missing:?} not found")]
#[diagnostic(code(algo::node_with_key_not_found))] #[diagnostic(code(algo::node_with_key_not_found))]
#[diagnostic(help( #[diagnostic(help(
"The relation is interpreted as a relation of nodes, but the required key is missing" "The relation is interpreted as a relation of nodes, but the required key is missing"
))] ))]
pub(crate) struct NodeNotFoundError { pub(crate) struct NodeNotFoundError {
pub(crate) missing: DataValue, pub(crate) missing: DataValue,

@ -8,7 +8,7 @@ use smallvec::SmallVec;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use thiserror::Error; use thiserror::Error;
use crate::algo::{AlgoHandle, AlgoNotFoundError}; use crate::algo::AlgoHandle;
use crate::data::aggr::Aggregation; use crate::data::aggr::Aggregation;
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::symb::{Symbol, PROG_ENTRY}; use crate::data::symb::{Symbol, PROG_ENTRY};
@ -118,7 +118,7 @@ pub(crate) struct AlgoApply {
} }
impl AlgoApply { impl AlgoApply {
pub(crate) fn arity(&self) -> Option<usize> { pub(crate) fn arity(&self) -> Result<usize> {
self.algo.arity(Left(&self.rule_args), &self.options) self.algo.arity(Left(&self.rule_args), &self.options)
} }
} }
@ -164,7 +164,7 @@ pub(crate) struct WrongAlgoOptionError {
} }
impl MagicAlgoApply { impl MagicAlgoApply {
pub(crate) fn arity(&self) -> Option<usize> { pub(crate) fn arity(&self) -> Result<usize> {
self.algo.arity(Right(&self.rule_args), &self.options) self.algo.arity(Right(&self.rule_args), &self.options)
} }
pub(crate) fn relation_with_min_len( pub(crate) fn relation_with_min_len(
@ -484,11 +484,7 @@ impl InputProgram {
if let Some(entry) = self.prog.get(&Symbol::new(PROG_ENTRY, SourceSpan(0, 0))) { if let Some(entry) = self.prog.get(&Symbol::new(PROG_ENTRY, SourceSpan(0, 0))) {
return match entry { return match entry {
InputRulesOrAlgo::Rules { rules } => Ok(rules.last().unwrap().head.len()), InputRulesOrAlgo::Rules { rules } => Ok(rules.last().unwrap().head.len()),
InputRulesOrAlgo::Algo { algo: algo_apply } => { InputRulesOrAlgo::Algo { algo: algo_apply } => algo_apply.arity(),
algo_apply.arity().ok_or_else(|| {
AlgoNotFoundError(algo_apply.algo.name.to_string(), algo_apply.span).into()
})
}
}; };
} }
@ -678,8 +674,8 @@ impl Default for MagicRulesOrAlgo {
} }
impl MagicRulesOrAlgo { impl MagicRulesOrAlgo {
pub(crate) fn arity(&self) -> Option<usize> { pub(crate) fn arity(&self) -> Result<usize> {
Some(match self { Ok(match self {
MagicRulesOrAlgo::Rules { rules } => rules.first().unwrap().head.len(), MagicRulesOrAlgo::Rules { rules } => rules.first().unwrap().head.len(),
MagicRulesOrAlgo::Algo { algo } => algo.arity()?, MagicRulesOrAlgo::Algo { algo } => algo.arity()?,
}) })

@ -9,7 +9,7 @@ use miette::{bail, ensure, Diagnostic, LabeledSpan, Report, Result};
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use thiserror::Error; use thiserror::Error;
use crate::algo::{AlgoHandle, AlgoNotFoundError}; use crate::algo::AlgoHandle;
use crate::data::aggr::{parse_aggr, Aggregation}; use crate::data::aggr::{parse_aggr, Aggregation};
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::program::{ use crate::data::program::{
@ -468,11 +468,16 @@ pub(crate) fn parse_query(
if let Some((handle, RelationOp::Create)) = &prog.out_opts.store_relation { if let Some((handle, RelationOp::Create)) = &prog.out_opts.store_relation {
let mut bindings = handle.dep_bindings.clone(); let mut bindings = handle.dep_bindings.clone();
bindings.extend_from_slice(&handle.key_bindings); bindings.extend_from_slice(&handle.key_bindings);
prog.const_rules.insert(MagicSymbol::Muggle {inner: Symbol::new(PROG_ENTRY, Default::default())}, ConstRule { prog.const_rules.insert(
bindings, MagicSymbol::Muggle {
data: vec![], inner: Symbol::new(PROG_ENTRY, Default::default()),
span: Default::default() },
}); ConstRule {
bindings,
data: vec![],
span: Default::default(),
},
);
} }
} }
@ -805,9 +810,7 @@ fn parse_algo_rule(
} }
let algo = AlgoHandle::new(algo_name, name_pair.extract_span()); let algo = AlgoHandle::new(algo_name, name_pair.extract_span());
let algo_arity = algo let algo_arity = algo.arity(Left(&rule_args), &options)?;
.arity(Left(&rule_args), &options)
.ok_or_else(|| AlgoNotFoundError(algo.name.to_string(), algo.name.span))?;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, Diagnostic)]
#[error("Algorithm rule head arity mismatch")] #[error("Algorithm rule head arity mismatch")]

@ -4,7 +4,6 @@ use itertools::Itertools;
use miette::{Context, Diagnostic, ensure, Result}; use miette::{Context, Diagnostic, ensure, Result};
use thiserror::Error; use thiserror::Error;
use crate::algo::AlgoNotFoundError;
use crate::data::aggr::Aggregation; use crate::data::aggr::Aggregation;
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::program::{ use crate::data::program::{
@ -108,9 +107,7 @@ impl SessionTx {
name.clone(), name.clone(),
self.new_rule_store( self.new_rule_store(
name.clone(), name.clone(),
ruleset.arity().ok_or_else(|| { ruleset.arity()?,
AlgoNotFoundError(name.symbol().to_string(), name.symbol().span)
})?,
), ),
); );
} }

Loading…
Cancel
Save