migrated to new stores

main
Ziyang Hu 2 years ago
parent 9cebe63772
commit 53ada74a52

@ -25,7 +25,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct BetweennessCentrality;
@ -35,8 +35,8 @@ impl AlgoImpl for BetweennessCentrality {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -86,7 +86,7 @@ impl AlgoImpl for BetweennessCentrality {
for (i, s) in centrality.into_iter().enumerate() {
let node = indices[i].clone();
out.put(vec![node, s.into()], 0);
out.put(vec![node, s.into()]);
}
Ok(())
@ -109,8 +109,8 @@ impl AlgoImpl for ClosenessCentrality {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -137,7 +137,7 @@ impl AlgoImpl for ClosenessCentrality {
})
.collect::<Result<_>>()?;
for (idx, centrality) in res.into_iter().enumerate() {
out.put(vec![indices[idx].clone(), DataValue::from(centrality)], 0);
out.put(vec![indices[idx].clone(), DataValue::from(centrality)]);
poison.check()?;
}
Ok(())

@ -22,7 +22,7 @@ use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct ShortestPathAStar;
@ -32,8 +32,8 @@ impl AlgoImpl for ShortestPathAStar {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 3, tx, stores)?;
@ -60,15 +60,12 @@ impl AlgoImpl for ShortestPathAStar {
stores,
poison.clone(),
)?;
out.put(
vec![
start[0].clone(),
goal[0].clone(),
DataValue::from(cost),
DataValue::List(path),
],
0,
);
out.put(vec![
start[0].clone(),
goal[0].clone(),
DataValue::from(cost),
DataValue::List(path),
]);
}
}
@ -92,7 +89,7 @@ fn astar<'a>(
nodes: &'a MagicAlgoRuleArg,
heuristic: &Expr,
tx: &'a SessionTx<'_>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
poison: Poison,
) -> Result<(f64, Vec<DataValue>)> {
let start_node = &starting[0];

@ -18,7 +18,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct Bfs;
@ -28,8 +28,8 @@ impl AlgoImpl for Bfs {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;
@ -103,7 +103,7 @@ impl AlgoImpl for Bfs {
route.push(starting.clone());
route.reverse();
let tuple = vec![starting, ending, DataValue::List(route)];
out.put(tuple, 0);
out.put(tuple);
}
Ok(())
}

@ -19,7 +19,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct Constant;
@ -29,15 +29,15 @@ impl AlgoImpl for Constant {
&mut self,
_tx: &SessionTx<'_>,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
_stores: &BTreeMap<MagicSymbol, EpochStore>,
out: &mut NormalTempStore,
_poison: Poison,
) -> Result<()> {
let data = algo.expr_option("data", None).unwrap();
let data = data.get_const().unwrap().get_list().unwrap();
for row in data {
let tuple = row.get_list().unwrap().into();
out.put(tuple, 0)
out.put(tuple)
}
Ok(())
}

@ -25,7 +25,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::{parse_type, SourceSpan};
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct CsvReader;
@ -35,8 +35,8 @@ impl AlgoImpl for CsvReader {
&mut self,
_tx: &SessionTx<'_>,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
_stores: &BTreeMap<MagicSymbol, EpochStore>,
out: &mut NormalTempStore,
_poison: Poison,
) -> Result<()> {
let delimiter = algo.string_option("delimiter", Some(","))?;
@ -148,7 +148,7 @@ impl AlgoImpl for CsvReader {
}
}
}
out.put(out_tuple, 0);
out.put(out_tuple);
Ok(())
};

@ -18,7 +18,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct DegreeCentrality;
@ -28,8 +28,8 @@ impl AlgoImpl for DegreeCentrality {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let it = algo
@ -66,8 +66,7 @@ impl AlgoImpl for DegreeCentrality {
DataValue::from(out_d as i64),
DataValue::from(in_d as i64),
];
out.put(tuple, 0);
poison.check()?;
out.put(tuple);
}
Ok(())
}

@ -18,7 +18,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct Dfs;
@ -28,8 +28,8 @@ impl AlgoImpl for Dfs {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;
@ -105,7 +105,7 @@ impl AlgoImpl for Dfs {
route.push(starting.clone());
route.reverse();
let tuple = vec![starting, ending, DataValue::List(route)];
out.put(tuple, 0);
out.put(tuple);
poison.check()?;
}
Ok(())

@ -28,7 +28,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct JsonReader;
@ -38,8 +38,8 @@ impl AlgoImpl for JsonReader {
&mut self,
_tx: &SessionTx<'_>,
algo: &MagicAlgoApply,
_stores: &BTreeMap<MagicSymbol, InMemRelation>,
out: &InMemRelation,
_stores: &BTreeMap<MagicSymbol, EpochStore>,
out: &mut NormalTempStore,
_poison: Poison,
) -> Result<()> {
let url = algo.string_option("url", None)?;
@ -85,7 +85,7 @@ impl AlgoImpl for JsonReader {
};
ret.push(val);
}
out.put(ret, 0);
out.put(ret);
Ok(())
};
match url.strip_prefix("file://") {

@ -22,7 +22,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct MinimumSpanningForestKruskal;
@ -32,8 +32,8 @@ impl AlgoImpl for MinimumSpanningForestKruskal {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -44,14 +44,11 @@ impl AlgoImpl for MinimumSpanningForestKruskal {
}
let msp = kruskal(&graph, poison)?;
for (src, dst, cost) in msp {
out.put(
vec![
indices[src].clone(),
indices[dst].clone(),
DataValue::from(cost),
],
0,
);
out.put(vec![
indices[src].clone(),
indices[dst].clone(),
DataValue::from(cost),
]);
}
Ok(())

@ -20,7 +20,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct LabelPropagation;
@ -30,8 +30,8 @@ impl AlgoImpl for LabelPropagation {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -42,7 +42,7 @@ impl AlgoImpl for LabelPropagation {
let labels = label_propagation(&graph, max_iter, poison)?;
for (idx, label) in labels.into_iter().enumerate() {
let node = indices[idx].clone();
out.put(vec![DataValue::from(label as i64), node], 0);
out.put(vec![DataValue::from(label as i64), node]);
}
Ok(())
}

@ -20,7 +20,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct CommunityDetectionLouvain;
@ -30,8 +30,8 @@ impl AlgoImpl for CommunityDetectionLouvain {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -65,7 +65,7 @@ impl AlgoImpl for CommunityDetectionLouvain {
if let Some(l) = keep_depth {
labels.truncate(l);
}
out.put(vec![DataValue::List(labels), node], 0);
out.put(vec![DataValue::List(labels), node]);
}
Ok(())

@ -55,7 +55,7 @@ use crate::data::tuple::TupleIter;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
#[cfg(feature = "graph-algo")]
@ -99,8 +99,8 @@ pub(crate) trait AlgoImpl {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()>;
fn arity(
@ -250,7 +250,7 @@ impl MagicAlgoRuleArg {
undirected: bool,
allow_negative_edges: bool,
tx: &'a SessionTx<'_>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<(
Vec<Vec<(usize, f64)>>,
Vec<DataValue>,
@ -335,7 +335,7 @@ impl MagicAlgoRuleArg {
&'a self,
undirected: bool,
tx: &'a SessionTx<'_>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<(Vec<Vec<usize>>, Vec<DataValue>, BTreeMap<DataValue, usize>)> {
let mut graph: Vec<Vec<usize>> = vec![];
let mut indices: Vec<DataValue> = vec![];
@ -375,7 +375,7 @@ impl MagicAlgoRuleArg {
&'a self,
prefix: &DataValue,
tx: &'a SessionTx<'_>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
Ok(match self {
MagicAlgoRuleArg::InMem { name, .. } => {
@ -383,7 +383,7 @@ impl MagicAlgoRuleArg {
RuleNotFoundError(name.symbol().to_string(), name.symbol().span)
})?;
let t = vec![prefix.clone()];
Box::new(store.scan_prefix(&t))
Box::new(store.prefix_iter(&t).map(|t| Ok(t.into_tuple())))
}
MagicAlgoRuleArg::Stored { name, .. } => {
let relation = tx.get_relation(name, false)?;
@ -395,7 +395,7 @@ impl MagicAlgoRuleArg {
pub(crate) fn arity(
&self,
tx: &SessionTx<'_>,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
stores: &BTreeMap<MagicSymbol, EpochStore>,
) -> Result<usize> {
Ok(match self {
MagicAlgoRuleArg::InMem { name, .. } => {
@ -413,14 +413,14 @@ impl MagicAlgoRuleArg {
pub(crate) fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
Ok(match self {
MagicAlgoRuleArg::InMem { name, .. } => {
let store = stores.get(name).ok_or_else(|| {
RuleNotFoundError(name.symbol().to_string(), name.symbol().span)
})?;
Box::new(store.scan_all())
Box::new(store.all_iter().map(|t| Ok(t.into_tuple())))
}
MagicAlgoRuleArg::Stored { name, .. } => {
let relation = tx.get_relation(name, false)?;

@ -24,7 +24,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct PageRank;
@ -34,8 +34,8 @@ impl AlgoImpl for PageRank {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
_poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -64,10 +64,7 @@ impl AlgoImpl for PageRank {
);
for (idx, score) in ranks.iter().enumerate() {
out.put(
vec![indices[idx].clone(), DataValue::from(*score as f64)],
0,
);
out.put(vec![indices[idx].clone(), DataValue::from(*score as f64)]);
}
}
#[cfg(not(feature = "rayon"))]

@ -23,7 +23,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct MinimumSpanningTreePrim;
@ -33,8 +33,8 @@ impl AlgoImpl for MinimumSpanningTreePrim {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -67,14 +67,11 @@ impl AlgoImpl for MinimumSpanningTreePrim {
};
let msp = prim(&graph, starting, poison)?;
for (src, dst, cost) in msp {
out.put(
vec![
indices[src].clone(),
indices[dst].clone(),
DataValue::from(cost),
],
0,
);
out.put(vec![
indices[src].clone(),
indices[dst].clone(),
DataValue::from(cost),
]);
}
Ok(())
}

@ -21,7 +21,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct RandomWalk;
@ -31,8 +31,8 @@ impl AlgoImpl for RandomWalk {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation_with_min_len(0, 2, tx, stores)?;
@ -118,14 +118,11 @@ impl AlgoImpl for RandomWalk {
})??;
poison.check()?;
}
out.put(
vec![
DataValue::from(counter),
start_node_key.clone(),
DataValue::List(path),
],
0,
);
out.put(vec![
DataValue::from(counter),
start_node_key.clone(),
DataValue::List(path),
]);
}
}
Ok(())

@ -20,7 +20,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct ReorderSort;
@ -30,8 +30,8 @@ impl AlgoImpl for ReorderSort {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let in_rel = algo.relation(0)?;
@ -116,7 +116,7 @@ impl AlgoImpl for ReorderSort {
}
let mut out_t = vec![DataValue::from(if break_ties { count } else { rank } as i64)];
out_t.extend_from_slice(&val[0..val.len() - 1]);
out.put(out_t, 0);
out.put(out_t);
poison.check()?;
}
Ok(())

@ -26,7 +26,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct ShortestPathDijkstra;
@ -36,8 +36,8 @@ impl AlgoImpl for ShortestPathDijkstra {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -97,7 +97,7 @@ impl AlgoImpl for ShortestPathDijkstra {
DataValue::from(cost),
DataValue::List(path.into_iter().map(|u| indices[u].clone()).collect_vec()),
];
out.put(t, 0)
out.put(t)
}
}
} else {
@ -144,7 +144,7 @@ impl AlgoImpl for ShortestPathDijkstra {
DataValue::from(cost),
DataValue::List(path.into_iter().map(|u| indices[u].clone()).collect_vec()),
];
out.put(t, 0)
out.put(t)
}
}
}

@ -22,7 +22,7 @@ use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
#[cfg(feature = "graph-algo")]
@ -42,8 +42,8 @@ impl AlgoImpl for StronglyConnectedComponent {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -56,7 +56,7 @@ impl AlgoImpl for StronglyConnectedComponent {
for idx in cc {
let val = indices.get(*idx).unwrap();
let tuple = vec![val.clone(), DataValue::from(grp_id as i64)];
out.put(tuple, 0);
out.put(tuple);
}
}
@ -69,7 +69,7 @@ impl AlgoImpl for StronglyConnectedComponent {
if !inv_indices.contains_key(&node) {
inv_indices.insert(node.clone(), usize::MAX);
let tuple = vec![node, DataValue::from(counter)];
out.put(tuple, 0);
out.put(tuple);
counter += 1;
}
}

@ -18,7 +18,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct TopSort;
@ -28,8 +28,8 @@ impl AlgoImpl for TopSort {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -41,7 +41,7 @@ impl AlgoImpl for TopSort {
for (idx, val_id) in sorted.iter().enumerate() {
let val = indices.get(*val_id).unwrap();
let tuple = vec![DataValue::from(idx as i64), val.clone()];
out.put(tuple, 0);
out.put(tuple);
}
Ok(())

@ -20,7 +20,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct ClusteringCoefficients;
@ -30,8 +30,8 @@ impl AlgoImpl for ClusteringCoefficients {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -40,15 +40,12 @@ impl AlgoImpl for ClusteringCoefficients {
graph.into_iter().map(|e| e.into_iter().collect()).collect();
let coefficients = clustering_coefficients(&graph, poison)?;
for (idx, (cc, n_triangles, degree)) in coefficients.into_iter().enumerate() {
out.put(
vec![
indices[idx].clone(),
DataValue::from(cc),
DataValue::from(n_triangles as i64),
DataValue::from(degree as i64),
],
0,
);
out.put(vec![
indices[idx].clone(),
DataValue::from(cc),
DataValue::from(n_triangles as i64),
DataValue::from(degree as i64),
]);
}
Ok(())

@ -22,7 +22,7 @@ use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct KShortestPathYen;
@ -32,8 +32,8 @@ impl AlgoImpl for KShortestPathYen {
&mut self,
tx: &'a SessionTx<'_>,
algo: &'a MagicAlgoApply,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>,
out: &'a InMemRelation,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
out: &'a mut NormalTempStore,
poison: Poison,
) -> Result<()> {
let edges = algo.relation(0)?;
@ -75,7 +75,7 @@ impl AlgoImpl for KShortestPathYen {
path.into_iter().map(|u| indices[u].clone()).collect_vec(),
),
];
out.put(t, 0)
out.put(t)
}
}
}
@ -106,7 +106,7 @@ impl AlgoImpl for KShortestPathYen {
DataValue::from(cost),
DataValue::List(path.into_iter().map(|u| indices[u].clone()).collect_vec()),
];
out.put(t, 0)
out.put(t)
}
}
}

@ -1206,7 +1206,7 @@ impl Aggregation {
name if name == AGGR_INTERSECTION.name => Box::new(MeetAggrIntersection),
name if name == AGGR_SHORTEST.name => Box::new(MeetAggrShortest),
name if name == AGGR_MIN_COST.name => Box::new(MeetAggrMinCost),
_ => unreachable!(),
name => unreachable!("{}", name),
});
Ok(())
}

@ -49,7 +49,6 @@ fn ensure_same_value_type(a: &DataValue, b: &DataValue) -> Result<()> {
| (Regex(_), Regex(_))
| (List(_), List(_))
| (Set(_), Set(_))
| (Guard, Guard)
| (Bot, Bot)
) {
bail!(
@ -1272,7 +1271,6 @@ pub(crate) fn op_to_bool(args: &[DataValue]) -> Result<DataValue> {
DataValue::Regex(r) => !r.0.as_str().is_empty(),
DataValue::List(l) => !l.is_empty(),
DataValue::Set(s) => !s.is_empty(),
DataValue::Guard => false,
DataValue::Bot => false,
}))
}
@ -1290,7 +1288,6 @@ pub(crate) fn op_to_unity(args: &[DataValue]) -> Result<DataValue> {
DataValue::Regex(r) => if r.0.as_str().is_empty() {0 } else { 1},
DataValue::List(l) => if l.is_empty() {0} else {1},
DataValue::Set(s) => if s.is_empty() {0} else {1},
DataValue::Guard => 0,
DataValue::Bot => 0,
}))
}

@ -89,7 +89,6 @@ impl From<DataValue> for JsonValue {
JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect())
}
DataValue::Bot => panic!("found bottom"),
DataValue::Guard => panic!("found guard"),
DataValue::Set(l) => {
JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect())
}

@ -26,7 +26,6 @@ const UUID_TAG: u8 = 0x08;
const REGEX_TAG: u8 = 0x09;
const LIST_TAG: u8 = 0x0A;
const SET_TAG: u8 = 0x0B;
const GUARD_TAG: u8 = 0xFE;
const BOT_TAG: u8 = 0xFF;
const IS_FLOAT: u8 = 0b00010000;
@ -79,7 +78,6 @@ pub(crate) trait MemCmpEncoder: Write {
}
self.write_u8(INIT_TAG).unwrap()
}
DataValue::Guard => self.write_u8(GUARD_TAG).unwrap(),
DataValue::Bot => self.write_u8(BOT_TAG).unwrap(),
}
}
@ -271,7 +269,6 @@ impl DataValue {
}
(DataValue::Set(collected), &remaining[1..])
}
GUARD_TAG => (DataValue::Guard, remaining),
BOT_TAG => (DataValue::Bot, remaining),
_ => unreachable!("{:?}", bs),
}

@ -22,8 +22,8 @@ use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::relation::InputRelationHandle;
use crate::runtime::temp_store::EpochStore;
use crate::runtime::transact::SessionTx;
#[derive(Debug, Clone, Eq, PartialEq)]
@ -277,7 +277,7 @@ impl MagicAlgoApply {
idx: usize,
len: usize,
tx: &SessionTx<'_>,
stores: &BTreeMap<MagicSymbol, InMemRelation>,
stores: &BTreeMap<MagicSymbol, EpochStore>,
) -> Result<&MagicAlgoRuleArg> {
#[derive(Error, Diagnostic, Debug)]
#[error("Input relation to algorithm has insufficient arity")]

@ -98,7 +98,6 @@ pub enum DataValue {
Regex(RegexWrapper),
List(Vec<DataValue>),
Set(BTreeSet<DataValue>),
Guard,
Bot,
}
@ -245,9 +244,6 @@ impl Display for DataValue {
}
DataValue::List(ls) => f.debug_list().entries(ls).finish(),
DataValue::Set(s) => f.debug_list().entries(s).finish(),
DataValue::Guard => {
write!(f, "null")
}
DataValue::Bot => write!(f, "null"),
}
}

@ -7,7 +7,7 @@
*/
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::mem;
use itertools::Itertools;
@ -22,7 +22,7 @@ use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::query::compile::{AggrKind, CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::runtime::db::Poison;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::{EpochStore, MeetAggrStore, NormalTempStore};
use crate::runtime::transact::SessionTx;
pub(crate) struct QueryLimiter {
@ -56,33 +56,35 @@ impl<'a> SessionTx<'a> {
total_num_to_take: Option<usize>,
num_to_skip: Option<usize>,
poison: Poison,
) -> Result<(InMemRelation, bool)> {
let mut stores = BTreeMap::new();
) -> Result<(EpochStore, bool)> {
let mut stores: BTreeMap<MagicSymbol, EpochStore> = BTreeMap::new();
let mut early_return = false;
let entry_symbol = MagicSymbol::Muggle {
inner: Symbol::new(PROG_ENTRY, SourceSpan(0, 0)),
};
for (stratum, cur_prog) in strata.iter().enumerate() {
if stratum > 0 {
// remove stores that have outlived their usefulness!
stores = stores
.into_iter()
.filter(|(name, _)| {
if *name == entry_symbol {
return true;
}
match store_lifetimes.get(name) {
None => false,
Some(n) => *n >= stratum,
}
.filter(|(name, _)| match store_lifetimes.get(name) {
None => false,
Some(n) => *n >= stratum,
})
.collect()
.collect();
trace!("{:?}", stores);
}
for (rule_name, rule_set) in cur_prog {
stores.insert(rule_name.clone(), self.new_rule_store(rule_set.arity()));
let store = match rule_set.aggr_kind() {
AggrKind::None | AggrKind::Normal => EpochStore::new_normal(rule_set.arity()),
AggrKind::Meet => {
let rs = match rule_set {
CompiledRuleSet::Rules(rs) => rs,
_ => unreachable!(),
};
EpochStore::new_meet(&rs[0].aggr)?
}
};
stores.insert(rule_name.clone(), store);
}
debug!("stratum {}", stratum);
early_return = self.semi_naive_magic_evaluate(
cur_prog,
&mut stores,
@ -91,6 +93,9 @@ impl<'a> SessionTx<'a> {
poison.clone(),
)?;
}
let entry_symbol = MagicSymbol::Muggle {
inner: Symbol::new(PROG_ENTRY, SourceSpan(0, 0)),
};
let ret_area = stores.remove(&entry_symbol).ok_or(NoEntryError)?;
Ok((ret_area, early_return))
}
@ -98,7 +103,7 @@ impl<'a> SessionTx<'a> {
fn semi_naive_magic_evaluate(
&self,
prog: &CompiledProgram,
stores: &mut BTreeMap<MagicSymbol, InMemRelation>,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
total_num_to_take: Option<usize>,
num_to_skip: Option<usize>,
poison: Poison,
@ -117,61 +122,65 @@ impl<'a> SessionTx<'a> {
debug!("epoch {}", epoch);
if epoch == 0 {
for (k, compiled_ruleset) in prog.iter().rev() {
match compiled_ruleset {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() {
AggrKind::None => {
used_limiter = self.initial_rule_normal_eval(
let res = self.initial_rule_non_aggr_eval(
k,
ruleset,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
)?;
used_limiter = res.0 || used_limiter;
res.1.wrap()
}
AggrKind::Normal => {
used_limiter = self.initial_rule_aggr_eval(
let res = self.initial_rule_aggr_eval(
k,
ruleset,
stores,
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
)?;
used_limiter = res.0 || used_limiter;
res.1.wrap()
}
AggrKind::Meet => {
self.initial_rule_meet_eval(
let new = self.initial_rule_meet_eval(
k,
ruleset,
stores,
&mut changed,
poison.clone(),
)?;
new.wrap()
}
},
CompiledRuleSet::Algo(algo_apply) => {
let mut algo_impl = algo_apply.algo.get_impl()?;
let out = stores.get(k).unwrap();
algo_impl.run(self, algo_apply, stores, out, poison.clone())?;
let mut out = NormalTempStore::default();
algo_impl.run(self, algo_apply, stores, &mut out, poison.clone())?;
out.wrap()
}
}
};
let old_store = stores.get_mut(k).unwrap();
old_store.merge(new_store)?;
}
} else {
for store in stores.values() {
store.ensure_mem_db_for_epoch(epoch);
}
mem::swap(&mut changed, &mut prev_changed);
for (_k, v) in changed.iter_mut() {
*v = false;
}
for (k, compiled_ruleset) in prog.iter().rev() {
match compiled_ruleset {
let new_store = match compiled_ruleset {
CompiledRuleSet::Rules(ruleset) => {
match compiled_ruleset.aggr_kind() {
AggrKind::None => {
used_limiter = self.incremental_rule_normal_eval(
let res = self.incremental_rule_non_aggr_eval(
k,
ruleset,
epoch,
@ -180,29 +189,35 @@ impl<'a> SessionTx<'a> {
&mut changed,
&mut limiter,
poison.clone(),
)? || used_limiter;
)?;
used_limiter = res.0 || used_limiter;
res.1.wrap()
}
AggrKind::Meet => {
self.incremental_rule_meet_eval(
let new = self.incremental_rule_meet_eval(
k,
ruleset,
epoch,
stores,
&prev_changed,
&mut changed,
poison.clone(),
)?;
new.wrap()
}
AggrKind::Normal => {
// not doing anything
NormalTempStore::default().wrap()
}
}
}
CompiledRuleSet::Algo(_) => {
// no need to do anything, algos are only calculated once
NormalTempStore::default().wrap()
}
}
};
let old_store = stores.get_mut(k).unwrap();
old_store.merge(new_store)?;
}
}
if changed.values().all(|rule_changed| !*rule_changed) {
@ -212,52 +227,54 @@ impl<'a> SessionTx<'a> {
Ok(used_limiter)
}
/// returns true is early return is activated
fn initial_rule_normal_eval(
fn initial_rule_non_aggr_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, InMemRelation>,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<bool> {
let store = stores.get(rule_symb).unwrap();
let use_delta = BTreeSet::default();
) -> Result<(bool, NormalTempStore)> {
let mut out_store = NormalTempStore::default();
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? {
for item_res in rule.relation.iter(self, None, stores)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
if should_check_limit {
if !store.exists(&item, 0) {
store.put_with_skip(item, limiter.should_skip_next());
if !out_store.exists(&item) {
if limiter.should_skip_next() {
out_store.put_with_skip(item);
} else {
out_store.put(item);
}
if limiter.incr_and_should_stop() {
trace!("early stopping due to result count limit exceeded");
return Ok(true);
return Ok((true, out_store));
}
}
} else {
store.put(item, 0);
out_store.put(item);
}
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
Ok(should_check_limit)
Ok((should_check_limit, out_store))
}
fn initial_rule_meet_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, InMemRelation>,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
poison: Poison,
) -> Result<()> {
let store = stores.get(rule_symb).unwrap();
let use_delta = BTreeSet::default();
) -> Result<MeetAggrStore> {
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
for (rule_n, rule) in ruleset.iter().enumerate() {
debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n);
@ -265,16 +282,16 @@ impl<'a> SessionTx<'a> {
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
}
for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? {
for item_res in rule.relation.iter(self, None, stores)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
store.aggr_meet_put(&item, &mut aggr, 0)?;
out_store.meet_put(item)?;
*changed.get_mut(rule_symb).unwrap() = true;
}
poison.check()?;
}
if store.is_empty() && ruleset[0].aggr.iter().all(|a| a.is_some()) {
if out_store.is_empty() && ruleset[0].aggr.iter().all(|a| a.is_some()) {
let mut aggr = ruleset[0].aggr.clone();
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
@ -287,21 +304,20 @@ impl<'a> SessionTx<'a> {
Ok(op.init_val())
})
.try_collect()?;
store.aggr_meet_put(&value, &mut aggr, 0)?;
out_store.meet_put(value)?;
}
Ok(())
Ok(out_store)
}
fn initial_rule_aggr_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
stores: &mut BTreeMap<MagicSymbol, InMemRelation>,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<bool> {
let store = stores.get(rule_symb).unwrap();
let use_delta = BTreeSet::default();
) -> Result<(bool, NormalTempStore)> {
let mut out_store = NormalTempStore::default();
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
let mut aggr_work: BTreeMap<Vec<DataValue>, Vec<Aggregation>> = BTreeMap::new();
@ -310,6 +326,7 @@ impl<'a> SessionTx<'a> {
"Calculation for normal aggr rule {:?}.{}",
rule_symb, rule_n
);
trace!("{:?}", rule);
let keys_indices = rule
.aggr
@ -331,7 +348,7 @@ impl<'a> SessionTx<'a> {
})
.collect_vec();
for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? {
for item_res in rule.relation.iter(self, None, stores)? {
let item = item_res?;
trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0);
@ -390,7 +407,7 @@ impl<'a> SessionTx<'a> {
op.get()
})
.try_collect()?;
store.put(empty_result, 0);
out_store.put(empty_result);
}
for (keys, aggrs) in aggr_work {
@ -406,31 +423,36 @@ impl<'a> SessionTx<'a> {
.try_collect()?;
let tuple = tuple_data;
if should_check_limit {
if !store.exists(&tuple, 0) {
store.put_with_skip(tuple, limiter.should_skip_next());
if !out_store.exists(&tuple) {
if limiter.should_skip_next() {
out_store.put_with_skip(tuple);
} else {
out_store.put(tuple);
}
if limiter.incr_and_should_stop() {
return Ok(true);
return Ok((true, out_store));
}
}
// else, do nothing
} else {
store.put(tuple, 0);
out_store.put(tuple);
}
}
Ok(should_check_limit)
Ok((should_check_limit, out_store))
}
fn incremental_rule_normal_eval(
fn incremental_rule_non_aggr_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
epoch: u32,
stores: &mut BTreeMap<MagicSymbol, InMemRelation>,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
prev_changed: &BTreeMap<&MagicSymbol, bool>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
limiter: &mut QueryLimiter,
poison: Poison,
) -> Result<bool> {
let store = stores.get(rule_symb).unwrap();
) -> Result<(bool, NormalTempStore)> {
let prev_store = stores.get(rule_symb).unwrap();
let mut out_store = NormalTempStore::default();
let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry();
for (rule_n, rule) in ruleset.iter().enumerate() {
let mut should_do_calculation = false;
@ -446,12 +468,7 @@ impl<'a> SessionTx<'a> {
continue;
}
let mut aggr = rule.aggr.clone();
for (aggr, args) in aggr.iter_mut().flatten() {
aggr.meet_init(args)?;
}
for (delta_key, delta_store) in stores.iter() {
for (delta_key, _) in stores.iter() {
if !rule.contained_rules.contains(delta_key) {
continue;
}
@ -459,11 +476,10 @@ impl<'a> SessionTx<'a> {
"with delta {:?} for rule {:?}.{}",
delta_key, rule_symb, rule_n
);
let use_delta = BTreeSet::from([delta_store.id]);
for item_res in rule.relation.iter(self, Some(epoch), &use_delta, stores)? {
for item_res in rule.relation.iter(self, Some(delta_key), stores)? {
let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel
if store.exists(&item, 0) {
if prev_store.exists(&item) {
trace!(
"item for {:?}.{}: {:?} at {}, rederived",
rule_symb,
@ -480,30 +496,33 @@ impl<'a> SessionTx<'a> {
epoch
);
*changed.get_mut(rule_symb).unwrap() = true;
store.put(item.clone(), epoch);
store.put_with_skip(item, limiter.should_skip_next());
if limiter.should_skip_next() {
out_store.put_with_skip(item);
} else {
out_store.put(item);
}
if should_check_limit && limiter.incr_and_should_stop() {
trace!("early stopping due to result count limit exceeded");
return Ok(true);
return Ok((true, out_store));
}
}
}
poison.check()?;
}
}
Ok(should_check_limit)
Ok((should_check_limit, out_store))
}
fn incremental_rule_meet_eval(
&self,
rule_symb: &MagicSymbol,
ruleset: &[CompiledRule],
epoch: u32,
stores: &mut BTreeMap<MagicSymbol, InMemRelation>,
stores: &mut BTreeMap<MagicSymbol, EpochStore>,
prev_changed: &BTreeMap<&MagicSymbol, bool>,
changed: &mut BTreeMap<&MagicSymbol, bool>,
poison: Poison,
) -> Result<()> {
let store = stores.get(rule_symb).unwrap();
) -> Result<MeetAggrStore> {
// let store = stores.get(rule_symb).unwrap();
let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?;
for (rule_n, rule) in ruleset.iter().enumerate() {
let mut should_do_calculation = false;
for d_rule in &rule.contained_rules {
@ -523,7 +542,7 @@ impl<'a> SessionTx<'a> {
aggr.meet_init(args)?;
}
for (delta_key, delta_store) in stores.iter() {
for (delta_key, _) in stores.iter() {
if !rule.contained_rules.contains(delta_key) {
continue;
}
@ -531,11 +550,10 @@ impl<'a> SessionTx<'a> {
"with delta {:?} for rule {:?}.{}",
delta_key, rule_symb, rule_n
);
let use_delta = BTreeSet::from([delta_store.id]);
for item_res in rule.relation.iter(self, Some(epoch), &use_delta, stores)? {
for item_res in rule.relation.iter(self, Some(delta_key), stores)? {
let item = item_res?;
// improvement: the clauses can actually be evaluated in parallel
let aggr_changed = store.aggr_meet_put(&item, &mut aggr, epoch)?;
let aggr_changed = out_store.meet_put(item)?;
if aggr_changed {
*changed.get_mut(rule_symb).unwrap() = true;
}
@ -543,6 +561,6 @@ impl<'a> SessionTx<'a> {
poison.check()?;
}
}
Ok(())
Ok(out_store)
}
}

@ -25,8 +25,8 @@ use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue;
use crate::parse::SourceSpan;
use crate::runtime::in_mem::{InMemRelation, StoredRelationId};
use crate::runtime::relation::RelationHandle;
use crate::runtime::temp_store::EpochStore;
use crate::runtime::transact::SessionTx;
use crate::utils::swap_option_result;
@ -113,9 +113,8 @@ impl UnificationRA {
fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let mut bindings = self.parent.bindings_after_eliminate();
bindings.push(self.binding.clone());
@ -123,7 +122,7 @@ impl UnificationRA {
Ok(if self.is_multi {
let it = self
.parent
.iter(tx, epoch, use_delta, stores)?
.iter(tx, delta_rule, stores)?
.map_ok(move |tuple| -> Result<Vec<Tuple>> {
let result_list = self.expr.eval(&tuple)?;
let result_list = result_list.get_list().ok_or_else(|| {
@ -151,7 +150,7 @@ impl UnificationRA {
} else {
Box::new(
self.parent
.iter(tx, epoch, use_delta, stores)?
.iter(tx, delta_rule, stores)?
.map_ok(move |tuple| -> Result<Tuple> {
let result = self.expr.eval(&tuple)?;
let mut ret = tuple;
@ -204,15 +203,14 @@ impl FilteredRA {
fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let bindings = self.parent.bindings_after_eliminate();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
Ok(Box::new(
self.parent
.iter(tx, epoch, use_delta, stores)?
.iter(tx, delta_rule, stores)?
.filter_map(move |tuple| match tuple {
Ok(t) => {
for p in self.pred.iter() {
@ -260,13 +258,13 @@ impl Debug for RelAlgebra {
}
}
RelAlgebra::TempStore(r) => f
.debug_tuple("Derived")
.debug_tuple("TempStore")
.field(&bindings)
.field(&r.storage_key)
.field(&r.filters)
.finish(),
RelAlgebra::Stored(r) => f
.debug_tuple("Derived")
.debug_tuple("Stored")
.field(&bindings)
.field(&r.storage.name)
.field(&r.filters)
@ -356,7 +354,11 @@ impl RelAlgebra {
pub(crate) fn cartesian_join(self, right: RelAlgebra, span: SourceSpan) -> Self {
self.join(right, vec![], vec![], span)
}
pub(crate) fn derived(bindings: Vec<Symbol>, storage_key: MagicSymbol, span: SourceSpan) -> Self {
pub(crate) fn derived(
bindings: Vec<Symbol>,
storage_key: MagicSymbol,
span: SourceSpan,
) -> Self {
Self::TempStore(TempStoreRA {
bindings,
storage_key,
@ -556,9 +558,8 @@ impl ReorderRA {
fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let old_order = self.relation.bindings_after_eliminate();
let old_order_indices: BTreeMap<_, _> = old_order
@ -575,16 +576,18 @@ impl ReorderRA {
.expect("program logic error: reorder indices mismatch")
})
.collect_vec();
Ok(Box::new(self.relation.iter(tx, epoch, use_delta, stores)?.map_ok(
move |tuple| {
let old = tuple;
let new = reorder_indices
.iter()
.map(|i| old[*i].clone())
.collect_vec();
new
},
)))
Ok(Box::new(
self.relation
.iter(tx, delta_rule, stores)?
.map_ok(move |tuple| {
let old = tuple;
let new = reorder_indices
.iter()
.map(|i| old[*i].clone())
.collect_vec();
new
}),
))
}
}
@ -1069,26 +1072,20 @@ impl TempStoreRA {
fn iter<'a>(
&'a self,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let storage = stores.get(&self.storage_key).unwrap();
if epoch == Some(0) && use_delta.contains(&storage.id) {
return Ok(Box::new(iter::empty()));
}
let scan_epoch = match epoch {
None => 0,
Some(ep) => {
if use_delta.contains(&storage.id) {
ep - 1
} else {
0
}
}
let scan_epoch = match delta_rule {
None => false,
Some(name) => *name == self.storage_key,
};
let it = if scan_epoch {
Left(storage.delta_all_iter().map(|t| Ok(t.into_tuple())))
} else {
Right(storage.all_iter().map(|t| Ok(t.into_tuple())))
};
let it = storage.scan_all_for_epoch(scan_epoch);
Ok(if self.filters.is_empty() {
Box::new(it)
} else {
@ -1100,7 +1097,7 @@ impl TempStoreRA {
left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let storage = stores.get(&self.storage_key).unwrap();
debug_assert!(!right_join_indices.is_empty());
@ -1122,12 +1119,11 @@ impl TempStoreRA {
.map(|i| tuple[*i].clone())
.collect_vec();
'outer: for found in storage.scan_prefix(&prefix) {
let found = found?;
'outer: for found in storage.prefix_iter(&prefix) {
for (left_idx, right_idx) in
left_join_indices.iter().zip(right_join_indices.iter())
{
if tuple[*left_idx] != found[*right_idx] {
if tuple[*left_idx] != *found.get(*right_idx) {
continue 'outer;
}
}
@ -1155,11 +1151,10 @@ impl TempStoreRA {
))
} else {
let mut right_join_vals = BTreeSet::new();
for tuple in storage.scan_all() {
let tuple = tuple?;
for tuple in storage.all_iter() {
let to_join: Box<[DataValue]> = right_join_indices
.iter()
.map(|i| tuple[*i].clone())
.map(|i| tuple.get(*i).clone())
.collect();
right_join_vals.insert(to_join);
}
@ -1200,29 +1195,20 @@ impl TempStoreRA {
left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let storage = stores.get(&self.storage_key).unwrap();
if epoch == Some(0) && use_delta.contains(&storage.id) {
return Ok(Box::new(iter::empty()));
}
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b);
let left_to_prefix_indices = right_invert_indices
.into_iter()
.map(|(a, _)| left_join_indices[a])
.collect_vec();
let scan_epoch = match epoch {
None => 0,
Some(ep) => {
if use_delta.contains(&storage.id) {
ep - 1
} else {
0
}
}
let scan_epoch = match delta_rule {
None => false,
Some(name) => *name == self.storage_key,
};
let mut skip_range_check = false;
let it = left_iter
@ -1241,13 +1227,23 @@ impl TempStoreRA {
if !l_bound.iter().all(|v| *v == DataValue::Null)
|| !u_bound.iter().all(|v| *v == DataValue::Bot)
{
let mut lower_bound = prefix.clone();
lower_bound.extend(l_bound);
let mut upper_bound = prefix.clone();
upper_bound.extend(u_bound);
let it = if scan_epoch {
Left(storage.delta_range_iter(&lower_bound, &upper_bound, true))
} else {
Right(storage.range_iter(&lower_bound, &upper_bound, true))
};
return Left(
storage
.scan_bounded_prefix_for_epoch(
&prefix, &l_bound, &u_bound, scan_epoch,
)
.map(move |res_found| -> Result<Option<Tuple>> {
let found = res_found?;
it.map(move |res_found| -> Result<Option<Tuple>> {
if self.filters.is_empty() {
let mut ret = tuple.clone();
ret.extend(res_found.into_iter().cloned());
Ok(Some(ret))
} else {
let found = res_found.into_tuple();
for p in self.filters.iter() {
if !p.eval_pred(&found)? {
return Ok(None);
@ -1256,17 +1252,28 @@ impl TempStoreRA {
let mut ret = tuple.clone();
ret.extend(found);
Ok(Some(ret))
})
.filter_map(swap_option_result),
}
})
.filter_map(swap_option_result),
);
}
}
skip_range_check = true;
let it = if scan_epoch {
Left(storage.delta_prefix_iter(&prefix))
} else {
Right(storage.prefix_iter(&prefix))
};
Right(
storage
.scan_prefix_for_epoch(&prefix, scan_epoch)
.map(move |res_found| -> Result<Option<Tuple>> {
let found = res_found?;
it.map(move |res_found| -> Result<Option<Tuple>> {
if self.filters.is_empty() {
let mut ret = tuple.clone();
ret.extend(res_found.into_iter().cloned());
Ok(Some(ret))
} else {
let found = res_found.into_tuple();
for p in self.filters.iter() {
if !p.eval_pred(&found)? {
return Ok(None);
@ -1275,8 +1282,9 @@ impl TempStoreRA {
let mut ret = tuple.clone();
ret.extend(found);
Ok(Some(ret))
})
.filter_map(swap_option_result),
}
})
.filter_map(swap_option_result),
)
})
.flatten_ok()
@ -1395,19 +1403,18 @@ impl RelAlgebra {
pub(crate) fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
match self {
RelAlgebra::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(t.clone())))),
RelAlgebra::TempStore(r) => r.iter(epoch, use_delta, stores),
RelAlgebra::TempStore(r) => r.iter(delta_rule, stores),
RelAlgebra::Stored(v) => v.iter(tx),
RelAlgebra::Join(j) => j.iter(tx, epoch, use_delta, stores),
RelAlgebra::Reorder(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::Filter(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::NegJoin(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::Unification(r) => r.iter(tx, epoch, use_delta, stores),
RelAlgebra::Join(j) => j.iter(tx, delta_rule, stores),
RelAlgebra::Reorder(r) => r.iter(tx, delta_rule, stores),
RelAlgebra::Filter(r) => r.iter(tx, delta_rule, stores),
RelAlgebra::NegJoin(r) => r.iter(tx, delta_rule, stores),
RelAlgebra::Unification(r) => r.iter(tx, delta_rule, stores),
}
}
}
@ -1474,9 +1481,8 @@ impl NegJoin {
pub(crate) fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let bindings = self.left.bindings_after_eliminate();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -1490,10 +1496,10 @@ impl NegJoin {
)
.unwrap();
r.neg_join(
self.left.iter(tx, epoch, use_delta, stores)?,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
stores
stores,
)
}
RelAlgebra::Stored(v) => {
@ -1506,7 +1512,7 @@ impl NegJoin {
.unwrap();
v.neg_join(
tx,
self.left.iter(tx, epoch, use_delta, stores)?,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
)
@ -1604,9 +1610,8 @@ impl InnerJoin {
pub(crate) fn iter<'a>(
&'a self,
tx: &'a SessionTx<'_>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let bindings = self.bindings();
let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate);
@ -1620,7 +1625,7 @@ impl InnerJoin {
)
.unwrap();
f.join(
self.left.iter(tx, epoch, use_delta, stores)?,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
)
@ -1635,15 +1640,14 @@ impl InnerJoin {
.unwrap();
if join_is_prefix(&join_indices.1) {
r.prefix_join(
self.left.iter(tx, epoch, use_delta, stores)?,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
epoch,
use_delta,
stores
delta_rule,
stores,
)
} else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores)
self.materialized_join(tx, eliminate_indices, delta_rule, stores)
}
}
RelAlgebra::Stored(r) => {
@ -1658,17 +1662,17 @@ impl InnerJoin {
let left_len = self.left.bindings_after_eliminate().len();
r.prefix_join(
tx,
self.left.iter(tx, epoch, use_delta, stores)?,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
left_len,
)
} else {
self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores)
self.materialized_join(tx, eliminate_indices, delta_rule, stores)
}
}
RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => {
self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores)
self.materialized_join(tx, eliminate_indices, delta_rule, stores)
}
RelAlgebra::Reorder(_) => {
panic!("joining on reordered")
@ -1682,9 +1686,8 @@ impl InnerJoin {
&'a self,
tx: &'a SessionTx<'_>,
eliminate_indices: BTreeSet<usize>,
epoch: Option<u32>,
use_delta: &BTreeSet<StoredRelationId>,
stores: &'a BTreeMap<MagicSymbol, InMemRelation>
delta_rule: Option<&MagicSymbol>,
stores: &'a BTreeMap<MagicSymbol, EpochStore>,
) -> Result<TupleIter<'a>> {
let right_bindings = self.right.bindings_after_eliminate();
let (left_join_indices, right_join_indices) = self
@ -1692,7 +1695,7 @@ impl InnerJoin {
.join_indices(&self.left.bindings_after_eliminate(), &right_bindings)
.unwrap();
let mut left_iter = self.left.iter(tx, epoch, use_delta, stores)?;
let mut left_iter = self.left.iter(tx, delta_rule, stores)?;
let left_cache = match left_iter.next() {
None => return Ok(Box::new(iter::empty())),
Some(Err(err)) => return Err(err),
@ -1717,7 +1720,7 @@ impl InnerJoin {
self.mat_right_cache.borrow().clone()
} else {
let mut cache = BTreeSet::new();
for item in self.right.iter(tx, epoch, use_delta, stores)? {
for item in self.right.iter(tx, delta_rule, stores)? {
match item {
Ok(tuple) => {
let stored_tuple = right_store_indices

@ -15,13 +15,13 @@ use miette::Result;
use crate::data::program::SortDir;
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::runtime::in_mem::InMemRelation;
use crate::runtime::temp_store::EpochStore;
use crate::runtime::transact::SessionTx;
impl<'a> SessionTx<'a> {
pub(crate) fn sort_and_collect(
&mut self,
original: InMemRelation,
original: EpochStore,
sorters: &[(Symbol, SortDir)],
head: &[Symbol],
) -> Result<Vec<Tuple>> {
@ -31,7 +31,7 @@ impl<'a> SessionTx<'a> {
.map(|(k, dir)| (head_indices[k], *dir))
.collect_vec();
let mut all_data: Vec<_> = original.scan_all().try_collect()?;
let mut all_data: Vec<_> = original.all_iter().map(|v| v.into_tuple()).collect_vec();
all_data.sort_by(|a, b| {
for (idx, dir) in &idx_sorters {
match a[*idx].cmp(&b[*idx]) {

@ -36,7 +36,7 @@ impl<'a> SessionTx<'a> {
pub(crate) fn execute_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Result<Tuple>>,
res_iter: impl Iterator<Item = Tuple>,
op: RelationOp,
meta: &InputRelationHandle,
headers: &[Symbol],
@ -110,7 +110,6 @@ impl<'a> SessionTx<'a> {
let mut old_tuples: Vec<DataValue> = vec![];
for tuple in res_iter {
let tuple = tuple?;
let extracted = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple))
@ -193,8 +192,6 @@ impl<'a> SessionTx<'a> {
key_extractors.extend(val_extractors);
for tuple in res_iter {
let tuple = tuple?;
let extracted = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple))
@ -242,7 +239,6 @@ impl<'a> SessionTx<'a> {
)?;
for tuple in res_iter {
let tuple = tuple?;
let extracted = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple))
@ -286,8 +282,6 @@ impl<'a> SessionTx<'a> {
key_extractors.extend(val_extractors);
for tuple in res_iter {
let tuple = tuple?;
let extracted = key_extractors
.iter()
.map(|ex| ex.extract_data(&tuple))

@ -407,7 +407,6 @@ impl<'s, S: Storage<'s>> Db<S> {
fn transact(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
tx: Box::new(self.db.transact(false)?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
Ok(ret)
@ -415,7 +414,6 @@ impl<'s, S: Storage<'s>> Db<S> {
fn transact_write(&'s self) -> Result<SessionTx<'_>> {
let ret = SessionTx {
tx: Box::new(self.db.transact(true)?),
mem_store_id: Default::default(),
relation_store_id: self.relation_store_id.clone(),
};
Ok(ret)
@ -839,7 +837,7 @@ impl<'s, S: Storage<'s>> Db<S> {
};
// the real evaluation
let (result, early_return) = tx.stratified_magic_evaluate(
let (result_store, early_return) = tx.stratified_magic_evaluate(
&compiled,
store_lifetimes,
total_num_to_take,
@ -851,22 +849,18 @@ impl<'s, S: Storage<'s>> Db<S> {
if let Some(assertion) = &input_program.out_opts.assertion {
match assertion {
QueryAssertion::AssertNone(span) => {
if let Some(tuple) = result.scan_all().next() {
let tuple = tuple?;
if let Some(tuple) = result_store.all_iter().next() {
#[derive(Debug, Error, Diagnostic)]
#[error(
"The query is asserted to return no result, but a tuple {0:?} is found"
)]
#[diagnostic(code(eval::assert_none_failure))]
struct AssertNoneFailure(Tuple, #[label] SourceSpan);
bail!(AssertNoneFailure(tuple, *span))
bail!(AssertNoneFailure(tuple.into_tuple(), *span))
}
}
QueryAssertion::AssertSome(span) => {
if let Some(tuple) = result.scan_all().next() {
let _ = tuple?;
} else {
if result_store.all_iter().next().is_none() {
#[derive(Debug, Error, Diagnostic)]
#[error("The query is asserted to return some results, but returned none")]
#[diagnostic(code(eval::assert_some_failure))]
@ -881,7 +875,7 @@ impl<'s, S: Storage<'s>> Db<S> {
// sort outputs if required
let entry_head = input_program.get_entry_out_head()?;
let sorted_result =
tx.sort_and_collect(result, &input_program.out_opts.sorters, &entry_head)?;
tx.sort_and_collect(result_store, &input_program.out_opts.sorters, &entry_head)?;
let sorted_iter = if let Some(offset) = input_program.out_opts.offset {
Left(sorted_result.into_iter().skip(offset))
} else {
@ -892,7 +886,6 @@ impl<'s, S: Storage<'s>> Db<S> {
} else {
Right(sorted_iter)
};
let sorted_iter = sorted_iter.map(Ok);
if let Some((meta, relation_op)) = &input_program.out_opts.store_relation {
let to_clear = tx
.execute_relation(
@ -914,10 +907,10 @@ impl<'s, S: Storage<'s>> Db<S> {
} else {
// not sorting outputs
let rows: Vec<Vec<JsonValue>> = sorted_iter
.map_ok(|tuple| -> Vec<JsonValue> {
.map(|tuple| -> Vec<JsonValue> {
tuple.into_iter().map(JsonValue::from).collect()
})
.try_collect()?;
.collect_vec();
let headers: Vec<String> = match input_program.get_entry_out_head() {
Ok(headers) => headers.into_iter().map(|v| v.name.to_string()).collect(),
Err(_) => match rows.get(0) {
@ -929,15 +922,23 @@ impl<'s, S: Storage<'s>> Db<S> {
}
} else {
let scan = if early_return {
Right(Left(result.scan_early_returned()))
Right(Left(
result_store.early_returned_iter().map(|t| t.into_tuple()),
))
} else if input_program.out_opts.limit.is_some()
|| input_program.out_opts.offset.is_some()
{
let limit = input_program.out_opts.limit.unwrap_or(usize::MAX);
let offset = input_program.out_opts.offset.unwrap_or(0);
Right(Right(result.scan_all().skip(offset).take(limit)))
Right(Right(
result_store
.all_iter()
.skip(offset)
.take(limit)
.map(|t| t.into_tuple()),
))
} else {
Left(result.scan_all())
Left(result_store.all_iter().map(|t| t.into_tuple()))
};
if let Some((meta, relation_op)) = &input_program.out_opts.store_relation {
@ -960,10 +961,10 @@ impl<'s, S: Storage<'s>> Db<S> {
))
} else {
let rows: Vec<Vec<JsonValue>> = scan
.map_ok(|tuple| -> Vec<JsonValue> {
.map(|tuple| -> Vec<JsonValue> {
tuple.into_iter().map(JsonValue::from).collect()
})
.try_collect()?;
.collect_vec();
let headers: Vec<String> = match input_program.get_entry_out_head() {
Ok(headers) => headers.into_iter().map(|v| v.name.to_string()).collect(),
@ -1108,6 +1109,7 @@ impl Poison {
#[cfg(test)]
mod tests {
use itertools::Itertools;
use log::debug;
use serde_json::json;
use crate::new_cozo_mem;
@ -1181,4 +1183,38 @@ mod tests {
.rows;
assert_eq!(res, vec![vec![json!(null), json!(0)]]);
}
#[test]
fn test_layers() {
let _ = env_logger::builder().is_test(true).try_init();
let db = new_cozo_mem().unwrap();
let res = db.run_script(r#"
y[a] := a in [1,2,3]
x[sum(a)] := y[a]
x[sum(a)] := a in [4,5,6]
?[sum(a)] := x[a]
"#, Default::default()).unwrap().rows;
assert_eq!(res[0][0], json!(21.))
}
#[test]
fn test_conditions() {
let _ = env_logger::builder().is_test(true).try_init();
let db = new_cozo_mem().unwrap();
db.run_script(r#"
{
?[code] <- [['a'],['b'],['c']]
:create airport {code}
}
{
?[fr, to, dist] <- [['a', 'b', 1.1], ['a', 'c', 0.5], ['b', 'c', 9.1]]
:create route {fr, to => dist}
}
"#, Default::default()).unwrap();
debug!("real test begins");
let res = db.run_script(r#"
r[code, dist] := *airport{code}, *route{fr: code, dist};
?[dist] := r['a', dist], dist > 0.5, dist <= 1.1;
"#, Default::default()).unwrap().rows;
dbg!(res);
}
}

@ -8,6 +8,5 @@
pub(crate) mod db;
pub(crate) mod transact;
pub(crate) mod in_mem;
pub(crate) mod relation;
// pub(crate) mod temp_store;
pub(crate) mod temp_store;

@ -8,24 +8,20 @@
use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::Bound::Included;
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
use std::ops::Bound::Excluded;
use either::{Left, Right};
use itertools::Itertools;
use miette::{Diagnostic, Result};
use thiserror::Error;
use miette::Result;
use crate::data::aggr::Aggregation;
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::DataValue;
type Store = BTreeMap<Tuple, Tuple>;
#[derive(Default)]
#[derive(Default, Debug)]
pub(crate) struct NormalTempStore {
inner: BTreeMap<Tuple, bool>,
}
@ -36,11 +32,8 @@ impl NormalTempStore {
pub(crate) fn wrap(self) -> TempStore {
TempStore::Normal(self)
}
pub(crate) fn exists(&self, old: Option<&EpochStore>, key: &Tuple) -> bool {
// if let Some(old_store) = old {
// if old_store.
// }
todo!()
pub(crate) fn exists(&self, key: &Tuple) -> bool {
self.inner.contains_key(key)
}
fn range_iter(
@ -68,16 +61,16 @@ impl NormalTempStore {
self.inner.insert(tuple, true);
}
fn merge(&mut self, mut other: Self) {
if self.inner.len() < other.inner.len() {
if self.inner.is_empty() {
mem::swap(&mut self.inner, &mut other.inner);
}
if other.inner.is_empty() {
return;
}
// must do it in this order! cannot swap!
self.inner.extend(other.inner)
}
}
#[derive(Debug)]
pub(crate) struct MeetAggrStore {
inner: BTreeMap<Tuple, Tuple>,
aggregations: Vec<(Aggregation, Vec<DataValue>)>,
@ -90,14 +83,12 @@ impl MeetAggrStore {
pub(crate) fn wrap(self) -> TempStore {
TempStore::MeetAggr(self)
}
pub(crate) fn exists(&self, old: Option<&EpochStore>, key: &Tuple) -> bool {
// if let Some(old_store) = old {
// if old_store.
// }
todo!()
pub(crate) fn exists(&self, key: &Tuple) -> bool {
let truncated = &key[0..self.grouping_len];
self.inner.contains_key(truncated)
}
pub(crate) fn is_empty(&self, old: Option<&EpochStore>) -> bool {
todo!()
pub(crate) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub(crate) fn new(aggrs: Vec<Option<(Aggregation, Vec<DataValue>)>>) -> Result<Self> {
let total_key_len = aggrs.len();
@ -113,12 +104,10 @@ impl MeetAggrStore {
})
}
fn merge(&mut self, mut other: Self) -> Result<()> {
// can switch the order because we are dealing with meet aggregations
if self.inner.len() < other.inner.len() {
mem::swap(&mut self.inner, &mut other.inner);
}
if other.inner.is_empty() {
return Ok(());
}
for (k, v) in other.inner {
match self.inner.entry(k) {
Entry::Vacant(ent) => {
@ -195,6 +184,7 @@ impl MeetAggrStore {
}
}
#[derive(Debug)]
pub(crate) enum TempStore {
Normal(NormalTempStore),
MeetAggr(MeetAggrStore),
@ -205,6 +195,12 @@ impl TempStore {
// pub(crate) fn new() -> Self {
// Self::Normal(NormalTempStore::default())
// }
fn exists(&self, key: &Tuple) -> bool {
match self {
TempStore::Normal(n) => n.exists(key),
TempStore::MeetAggr(m) => m.exists(key),
}
}
fn merge(&mut self, other: Self) -> Result<()> {
match (self, other) {
(TempStore::Normal(s), TempStore::Normal(o)) => {
@ -215,12 +211,12 @@ impl TempStore {
_ => unreachable!(),
}
}
fn is_empty(&self) -> bool {
match self {
TempStore::Normal(n) => n.inner.is_empty(),
TempStore::MeetAggr(m) => m.inner.is_empty(),
}
}
// fn is_empty(&self) -> bool {
// match self {
// TempStore::Normal(n) => n.inner.is_empty(),
// TempStore::MeetAggr(m) => m.inner.is_empty(),
// }
// }
fn range_iter(
&self,
lower: &Tuple,
@ -234,12 +230,31 @@ impl TempStore {
}
}
#[derive(Debug)]
pub(crate) struct EpochStore {
prev: TempStore,
delta: TempStore,
pub(crate) arity: usize,
}
impl EpochStore {
pub(crate) fn exists(&self, key: &Tuple) -> bool {
self.prev.exists(key) || self.delta.exists(key)
}
pub(crate) fn new_normal(arity: usize) -> Self {
Self {
prev: TempStore::Normal(NormalTempStore::default()),
delta: TempStore::Normal(NormalTempStore::default()),
arity,
}
}
pub(crate) fn new_meet(aggrs: &[Option<(Aggregation, Vec<DataValue>)>]) -> Result<Self> {
Ok(Self {
prev: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?),
delta: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?),
arity: aggrs.len(),
})
}
pub(crate) fn merge(&mut self, mut new: TempStore) -> Result<()> {
mem::swap(&mut new, &mut self.delta);
self.prev.merge(new)
@ -267,7 +282,10 @@ impl EpochStore {
upper.push(DataValue::Bot);
self.range_iter(prefix, &upper, true)
}
pub(crate) fn delta_prefix_iter(&self, prefix: &Tuple) -> impl Iterator<Item = TupleInIter<'_>> {
pub(crate) fn delta_prefix_iter(
&self,
prefix: &Tuple,
) -> impl Iterator<Item = TupleInIter<'_>> {
let mut upper = prefix.to_vec();
upper.push(DataValue::Bot);
self.delta_range_iter(prefix, &upper, true)
@ -278,6 +296,9 @@ impl EpochStore {
pub(crate) fn delta_all_iter(&self) -> impl Iterator<Item = TupleInIter<'_>> {
self.delta_prefix_iter(&vec![])
}
pub(crate) fn early_returned_iter(&self) -> impl Iterator<Item = TupleInIter<'_>> {
self.all_iter().filter(|t| !t.should_skip())
}
}
#[derive(Copy, Clone)]
@ -289,7 +310,7 @@ impl<'a> TupleInIter<'a> {
.get(idx)
.unwrap_or_else(|| self.1.get(idx - self.0.len()).unwrap())
}
pub(crate) fn should_skip(&self) -> bool {
fn should_skip(&self) -> bool {
self.2
}
pub(crate) fn into_tuple(self) -> Tuple {

@ -6,32 +6,22 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use miette::Result;
use crate::data::tuple::TupleT;
use crate::data::value::DataValue;
use crate::runtime::in_mem::{InMemRelation, StoredRelationId};
use crate::runtime::relation::RelationId;
use crate::storage::StoreTx;
pub struct SessionTx<'a> {
pub(crate) tx: Box<dyn StoreTx<'a> + 'a>,
pub(crate) relation_store_id: Arc<AtomicU64>,
pub(crate) mem_store_id: Arc<AtomicU32>,
}
impl<'a> SessionTx<'a> {
pub(crate) fn new_rule_store(&self, arity: usize) -> InMemRelation {
let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel);
let old_count = old_count & 0x00ff_ffffu32;
let ret = InMemRelation::new(StoredRelationId(old_count), arity);
ret.ensure_mem_db_for_epoch(0);
ret
}
pub(crate) fn load_last_relation_store_id(&self) -> Result<RelationId> {
let tuple = vec![DataValue::Null];
let t_encoded = tuple.encode_as_key(RelationId::SYSTEM);

@ -208,7 +208,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> {
it_builder: |tbl| tbl.range(lower.to_vec()..upper.to_vec()).unwrap(),
}
.build();
todo!()
panic!()
// match tbl.range(lower.to_vec()..upper.to_vec()) {
// Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))),
// Err(err) => Box::new(iter::once(Err(miette!(err)))),
@ -226,7 +226,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> {
it_builder: |tbl| tbl.range(lower.to_vec()..upper.to_vec()).unwrap(),
}
.build();
todo!()
panic!()
// let tbl = &*inner.tbl_ptr.unwrap();
// match tbl.range(lower.to_vec()..upper.to_vec()) {
// Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))),
@ -244,7 +244,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> {
where
's: 'a,
{
todo!()
panic!()
// match self {
// ReTx::Read(inner) => unsafe {
// let tbl = &*inner.tbl_ptr.unwrap();
@ -254,7 +254,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> {
// }
// },
// ReTx::Write(inner) => unsafe {
// todo!()
// panic!()
// // let tbl = &*inner.tbl_ptr.unwrap();
// // match tbl.range(lower.to_vec()..upper.to_vec()) {
// // Ok(it) => Box::new(it.map(|(k, v)| Ok((k.to_vec(), v.to_vec())))),

Loading…
Cancel
Save