From 53ada74a529c52bb8cb0a5015742a2360c7ae8da Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 8 Dec 2022 00:25:56 +0800 Subject: [PATCH] migrated to new stores --- cozo-core/src/algo/all_pairs_shortest_path.rs | 14 +- cozo-core/src/algo/astar.rs | 23 +- cozo-core/src/algo/bfs.rs | 8 +- cozo-core/src/algo/constant.rs | 8 +- cozo-core/src/algo/csv.rs | 8 +- cozo-core/src/algo/degree_centrality.rs | 9 +- cozo-core/src/algo/dfs.rs | 8 +- cozo-core/src/algo/jlines.rs | 8 +- cozo-core/src/algo/kruskal.rs | 19 +- cozo-core/src/algo/label_propagation.rs | 8 +- cozo-core/src/algo/louvain.rs | 8 +- cozo-core/src/algo/mod.rs | 20 +- cozo-core/src/algo/pagerank.rs | 11 +- cozo-core/src/algo/prim.rs | 19 +- cozo-core/src/algo/random_walk.rs | 19 +- cozo-core/src/algo/reorder_sort.rs | 8 +- cozo-core/src/algo/shortest_path_dijkstra.rs | 10 +- .../src/algo/strongly_connected_components.rs | 10 +- cozo-core/src/algo/top_sort.rs | 8 +- cozo-core/src/algo/triangles.rs | 21 +- cozo-core/src/algo/yen.rs | 10 +- cozo-core/src/data/aggr.rs | 2 +- cozo-core/src/data/functions.rs | 3 - cozo-core/src/data/json.rs | 1 - cozo-core/src/data/memcmp.rs | 3 - cozo-core/src/data/program.rs | 4 +- cozo-core/src/data/value.rs | 4 - cozo-core/src/query/eval.rs | 208 ++++++++-------- cozo-core/src/query/ra.rs | 223 +++++++++--------- cozo-core/src/query/sort.rs | 6 +- cozo-core/src/query/stored.rs | 8 +- cozo-core/src/runtime/db.rs | 74 ++++-- cozo-core/src/runtime/mod.rs | 3 +- cozo-core/src/runtime/temp_store.rs | 87 ++++--- cozo-core/src/runtime/transact.rs | 12 +- cozo-core/src/storage/re.rs | 8 +- 36 files changed, 467 insertions(+), 436 deletions(-) diff --git a/cozo-core/src/algo/all_pairs_shortest_path.rs b/cozo-core/src/algo/all_pairs_shortest_path.rs index 691044ba..1e215064 100644 --- a/cozo-core/src/algo/all_pairs_shortest_path.rs +++ b/cozo-core/src/algo/all_pairs_shortest_path.rs @@ -25,7 +25,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct BetweennessCentrality; @@ -35,8 +35,8 @@ impl AlgoImpl for BetweennessCentrality { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -86,7 +86,7 @@ impl AlgoImpl for BetweennessCentrality { for (i, s) in centrality.into_iter().enumerate() { let node = indices[i].clone(); - out.put(vec![node, s.into()], 0); + out.put(vec![node, s.into()]); } Ok(()) @@ -109,8 +109,8 @@ impl AlgoImpl for ClosenessCentrality { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -137,7 +137,7 @@ impl AlgoImpl for ClosenessCentrality { }) .collect::>()?; for (idx, centrality) in res.into_iter().enumerate() { - out.put(vec![indices[idx].clone(), DataValue::from(centrality)], 0); + out.put(vec![indices[idx].clone(), DataValue::from(centrality)]); poison.check()?; } Ok(()) diff --git a/cozo-core/src/algo/astar.rs b/cozo-core/src/algo/astar.rs index a0c3154d..b64dbf8b 100644 --- a/cozo-core/src/algo/astar.rs +++ b/cozo-core/src/algo/astar.rs @@ -22,7 +22,7 @@ use crate::data::tuple::Tuple; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct ShortestPathAStar; @@ -32,8 +32,8 @@ impl AlgoImpl for ShortestPathAStar { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 3, tx, stores)?; @@ -60,15 +60,12 @@ impl AlgoImpl for ShortestPathAStar { stores, poison.clone(), )?; - out.put( - vec![ - start[0].clone(), - goal[0].clone(), - DataValue::from(cost), - DataValue::List(path), - ], - 0, - ); + out.put(vec![ + start[0].clone(), + goal[0].clone(), + DataValue::from(cost), + DataValue::List(path), + ]); } } @@ -92,7 +89,7 @@ fn astar<'a>( nodes: &'a MagicAlgoRuleArg, heuristic: &Expr, tx: &'a SessionTx<'_>, - stores: &'a BTreeMap, + stores: &'a BTreeMap, poison: Poison, ) -> Result<(f64, Vec)> { let start_node = &starting[0]; diff --git a/cozo-core/src/algo/bfs.rs b/cozo-core/src/algo/bfs.rs index fc760fe3..34000d17 100644 --- a/cozo-core/src/algo/bfs.rs +++ b/cozo-core/src/algo/bfs.rs @@ -18,7 +18,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct Bfs; @@ -28,8 +28,8 @@ impl AlgoImpl for Bfs { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 2, tx, stores)?; @@ -103,7 +103,7 @@ impl AlgoImpl for Bfs { route.push(starting.clone()); route.reverse(); let tuple = vec![starting, ending, DataValue::List(route)]; - out.put(tuple, 0); + out.put(tuple); } Ok(()) } diff --git a/cozo-core/src/algo/constant.rs b/cozo-core/src/algo/constant.rs index e5386e42..6197d90e 100644 --- a/cozo-core/src/algo/constant.rs +++ b/cozo-core/src/algo/constant.rs @@ -19,7 +19,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct Constant; @@ -29,15 +29,15 @@ impl AlgoImpl for Constant { &mut self, _tx: &SessionTx<'_>, algo: &MagicAlgoApply, - _stores: &BTreeMap, - out: &InMemRelation, + _stores: &BTreeMap, + out: &mut NormalTempStore, _poison: Poison, ) -> Result<()> { let data = algo.expr_option("data", None).unwrap(); let data = data.get_const().unwrap().get_list().unwrap(); for row in data { let tuple = row.get_list().unwrap().into(); - out.put(tuple, 0) + out.put(tuple) } Ok(()) } diff --git a/cozo-core/src/algo/csv.rs b/cozo-core/src/algo/csv.rs index 2ab12d4e..b23c938f 100644 --- a/cozo-core/src/algo/csv.rs +++ b/cozo-core/src/algo/csv.rs @@ -25,7 +25,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::{parse_type, SourceSpan}; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct CsvReader; @@ -35,8 +35,8 @@ impl AlgoImpl for CsvReader { &mut self, _tx: &SessionTx<'_>, algo: &MagicAlgoApply, - _stores: &BTreeMap, - out: &InMemRelation, + _stores: &BTreeMap, + out: &mut NormalTempStore, _poison: Poison, ) -> Result<()> { let delimiter = algo.string_option("delimiter", Some(","))?; @@ -148,7 +148,7 @@ impl AlgoImpl for CsvReader { } } } - out.put(out_tuple, 0); + out.put(out_tuple); Ok(()) }; diff --git a/cozo-core/src/algo/degree_centrality.rs b/cozo-core/src/algo/degree_centrality.rs index e67e2312..f4a88452 100644 --- a/cozo-core/src/algo/degree_centrality.rs +++ b/cozo-core/src/algo/degree_centrality.rs @@ -18,7 +18,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct DegreeCentrality; @@ -28,8 +28,8 @@ impl AlgoImpl for DegreeCentrality { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let it = algo @@ -66,8 +66,7 @@ impl AlgoImpl for DegreeCentrality { DataValue::from(out_d as i64), DataValue::from(in_d as i64), ]; - out.put(tuple, 0); - poison.check()?; + out.put(tuple); } Ok(()) } diff --git a/cozo-core/src/algo/dfs.rs b/cozo-core/src/algo/dfs.rs index 2612d57f..ec7d43ab 100644 --- a/cozo-core/src/algo/dfs.rs +++ b/cozo-core/src/algo/dfs.rs @@ -18,7 +18,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct Dfs; @@ -28,8 +28,8 @@ impl AlgoImpl for Dfs { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 2, tx, stores)?; @@ -105,7 +105,7 @@ impl AlgoImpl for Dfs { route.push(starting.clone()); route.reverse(); let tuple = vec![starting, ending, DataValue::List(route)]; - out.put(tuple, 0); + out.put(tuple); poison.check()?; } Ok(()) diff --git a/cozo-core/src/algo/jlines.rs b/cozo-core/src/algo/jlines.rs index 089ebde5..14cec7e2 100644 --- a/cozo-core/src/algo/jlines.rs +++ b/cozo-core/src/algo/jlines.rs @@ -28,7 +28,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct JsonReader; @@ -38,8 +38,8 @@ impl AlgoImpl for JsonReader { &mut self, _tx: &SessionTx<'_>, algo: &MagicAlgoApply, - _stores: &BTreeMap, - out: &InMemRelation, + _stores: &BTreeMap, + out: &mut NormalTempStore, _poison: Poison, ) -> Result<()> { let url = algo.string_option("url", None)?; @@ -85,7 +85,7 @@ impl AlgoImpl for JsonReader { }; ret.push(val); } - out.put(ret, 0); + out.put(ret); Ok(()) }; match url.strip_prefix("file://") { diff --git a/cozo-core/src/algo/kruskal.rs b/cozo-core/src/algo/kruskal.rs index c1e2c113..0dcdc0eb 100644 --- a/cozo-core/src/algo/kruskal.rs +++ b/cozo-core/src/algo/kruskal.rs @@ -22,7 +22,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct MinimumSpanningForestKruskal; @@ -32,8 +32,8 @@ impl AlgoImpl for MinimumSpanningForestKruskal { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -44,14 +44,11 @@ impl AlgoImpl for MinimumSpanningForestKruskal { } let msp = kruskal(&graph, poison)?; for (src, dst, cost) in msp { - out.put( - vec![ - indices[src].clone(), - indices[dst].clone(), - DataValue::from(cost), - ], - 0, - ); + out.put(vec![ + indices[src].clone(), + indices[dst].clone(), + DataValue::from(cost), + ]); } Ok(()) diff --git a/cozo-core/src/algo/label_propagation.rs b/cozo-core/src/algo/label_propagation.rs index 9ab91f83..d006d823 100644 --- a/cozo-core/src/algo/label_propagation.rs +++ b/cozo-core/src/algo/label_propagation.rs @@ -20,7 +20,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct LabelPropagation; @@ -30,8 +30,8 @@ impl AlgoImpl for LabelPropagation { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -42,7 +42,7 @@ impl AlgoImpl for LabelPropagation { let labels = label_propagation(&graph, max_iter, poison)?; for (idx, label) in labels.into_iter().enumerate() { let node = indices[idx].clone(); - out.put(vec![DataValue::from(label as i64), node], 0); + out.put(vec![DataValue::from(label as i64), node]); } Ok(()) } diff --git a/cozo-core/src/algo/louvain.rs b/cozo-core/src/algo/louvain.rs index eaf04de8..42bc9a69 100644 --- a/cozo-core/src/algo/louvain.rs +++ b/cozo-core/src/algo/louvain.rs @@ -20,7 +20,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct CommunityDetectionLouvain; @@ -30,8 +30,8 @@ impl AlgoImpl for CommunityDetectionLouvain { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -65,7 +65,7 @@ impl AlgoImpl for CommunityDetectionLouvain { if let Some(l) = keep_depth { labels.truncate(l); } - out.put(vec![DataValue::List(labels), node], 0); + out.put(vec![DataValue::List(labels), node]); } Ok(()) diff --git a/cozo-core/src/algo/mod.rs b/cozo-core/src/algo/mod.rs index 14ee3718..e1212575 100644 --- a/cozo-core/src/algo/mod.rs +++ b/cozo-core/src/algo/mod.rs @@ -55,7 +55,7 @@ use crate::data::tuple::TupleIter; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; #[cfg(feature = "graph-algo")] @@ -99,8 +99,8 @@ pub(crate) trait AlgoImpl { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()>; fn arity( @@ -250,7 +250,7 @@ impl MagicAlgoRuleArg { undirected: bool, allow_negative_edges: bool, tx: &'a SessionTx<'_>, - stores: &'a BTreeMap, + stores: &'a BTreeMap, ) -> Result<( Vec>, Vec, @@ -335,7 +335,7 @@ impl MagicAlgoRuleArg { &'a self, undirected: bool, tx: &'a SessionTx<'_>, - stores: &'a BTreeMap, + stores: &'a BTreeMap, ) -> Result<(Vec>, Vec, BTreeMap)> { let mut graph: Vec> = vec![]; let mut indices: Vec = vec![]; @@ -375,7 +375,7 @@ impl MagicAlgoRuleArg { &'a self, prefix: &DataValue, tx: &'a SessionTx<'_>, - stores: &'a BTreeMap, + stores: &'a BTreeMap, ) -> Result> { Ok(match self { MagicAlgoRuleArg::InMem { name, .. } => { @@ -383,7 +383,7 @@ impl MagicAlgoRuleArg { RuleNotFoundError(name.symbol().to_string(), name.symbol().span) })?; let t = vec![prefix.clone()]; - Box::new(store.scan_prefix(&t)) + Box::new(store.prefix_iter(&t).map(|t| Ok(t.into_tuple()))) } MagicAlgoRuleArg::Stored { name, .. } => { let relation = tx.get_relation(name, false)?; @@ -395,7 +395,7 @@ impl MagicAlgoRuleArg { pub(crate) fn arity( &self, tx: &SessionTx<'_>, - stores: &BTreeMap, + stores: &BTreeMap, ) -> Result { Ok(match self { MagicAlgoRuleArg::InMem { name, .. } => { @@ -413,14 +413,14 @@ impl MagicAlgoRuleArg { pub(crate) fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - stores: &'a BTreeMap, + stores: &'a BTreeMap, ) -> Result> { Ok(match self { MagicAlgoRuleArg::InMem { name, .. } => { let store = stores.get(name).ok_or_else(|| { RuleNotFoundError(name.symbol().to_string(), name.symbol().span) })?; - Box::new(store.scan_all()) + Box::new(store.all_iter().map(|t| Ok(t.into_tuple()))) } MagicAlgoRuleArg::Stored { name, .. } => { let relation = tx.get_relation(name, false)?; diff --git a/cozo-core/src/algo/pagerank.rs b/cozo-core/src/algo/pagerank.rs index 04cfc32d..7d0b2cfd 100644 --- a/cozo-core/src/algo/pagerank.rs +++ b/cozo-core/src/algo/pagerank.rs @@ -24,7 +24,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct PageRank; @@ -34,8 +34,8 @@ impl AlgoImpl for PageRank { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, _poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -64,10 +64,7 @@ impl AlgoImpl for PageRank { ); for (idx, score) in ranks.iter().enumerate() { - out.put( - vec![indices[idx].clone(), DataValue::from(*score as f64)], - 0, - ); + out.put(vec![indices[idx].clone(), DataValue::from(*score as f64)]); } } #[cfg(not(feature = "rayon"))] diff --git a/cozo-core/src/algo/prim.rs b/cozo-core/src/algo/prim.rs index ef402b53..ab66e6ce 100644 --- a/cozo-core/src/algo/prim.rs +++ b/cozo-core/src/algo/prim.rs @@ -23,7 +23,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct MinimumSpanningTreePrim; @@ -33,8 +33,8 @@ impl AlgoImpl for MinimumSpanningTreePrim { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -67,14 +67,11 @@ impl AlgoImpl for MinimumSpanningTreePrim { }; let msp = prim(&graph, starting, poison)?; for (src, dst, cost) in msp { - out.put( - vec![ - indices[src].clone(), - indices[dst].clone(), - DataValue::from(cost), - ], - 0, - ); + out.put(vec![ + indices[src].clone(), + indices[dst].clone(), + DataValue::from(cost), + ]); } Ok(()) } diff --git a/cozo-core/src/algo/random_walk.rs b/cozo-core/src/algo/random_walk.rs index b9e54c30..ccf7c55e 100644 --- a/cozo-core/src/algo/random_walk.rs +++ b/cozo-core/src/algo/random_walk.rs @@ -21,7 +21,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct RandomWalk; @@ -31,8 +31,8 @@ impl AlgoImpl for RandomWalk { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation_with_min_len(0, 2, tx, stores)?; @@ -118,14 +118,11 @@ impl AlgoImpl for RandomWalk { })??; poison.check()?; } - out.put( - vec![ - DataValue::from(counter), - start_node_key.clone(), - DataValue::List(path), - ], - 0, - ); + out.put(vec![ + DataValue::from(counter), + start_node_key.clone(), + DataValue::List(path), + ]); } } Ok(()) diff --git a/cozo-core/src/algo/reorder_sort.rs b/cozo-core/src/algo/reorder_sort.rs index 8eed220a..5ff9ab36 100644 --- a/cozo-core/src/algo/reorder_sort.rs +++ b/cozo-core/src/algo/reorder_sort.rs @@ -20,7 +20,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct ReorderSort; @@ -30,8 +30,8 @@ impl AlgoImpl for ReorderSort { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let in_rel = algo.relation(0)?; @@ -116,7 +116,7 @@ impl AlgoImpl for ReorderSort { } let mut out_t = vec![DataValue::from(if break_ties { count } else { rank } as i64)]; out_t.extend_from_slice(&val[0..val.len() - 1]); - out.put(out_t, 0); + out.put(out_t); poison.check()?; } Ok(()) diff --git a/cozo-core/src/algo/shortest_path_dijkstra.rs b/cozo-core/src/algo/shortest_path_dijkstra.rs index 65640b2e..3659e839 100644 --- a/cozo-core/src/algo/shortest_path_dijkstra.rs +++ b/cozo-core/src/algo/shortest_path_dijkstra.rs @@ -26,7 +26,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct ShortestPathDijkstra; @@ -36,8 +36,8 @@ impl AlgoImpl for ShortestPathDijkstra { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -97,7 +97,7 @@ impl AlgoImpl for ShortestPathDijkstra { DataValue::from(cost), DataValue::List(path.into_iter().map(|u| indices[u].clone()).collect_vec()), ]; - out.put(t, 0) + out.put(t) } } } else { @@ -144,7 +144,7 @@ impl AlgoImpl for ShortestPathDijkstra { DataValue::from(cost), DataValue::List(path.into_iter().map(|u| indices[u].clone()).collect_vec()), ]; - out.put(t, 0) + out.put(t) } } } diff --git a/cozo-core/src/algo/strongly_connected_components.rs b/cozo-core/src/algo/strongly_connected_components.rs index bdb053e6..c143c2d4 100644 --- a/cozo-core/src/algo/strongly_connected_components.rs +++ b/cozo-core/src/algo/strongly_connected_components.rs @@ -22,7 +22,7 @@ use crate::data::tuple::Tuple; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; #[cfg(feature = "graph-algo")] @@ -42,8 +42,8 @@ impl AlgoImpl for StronglyConnectedComponent { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -56,7 +56,7 @@ impl AlgoImpl for StronglyConnectedComponent { for idx in cc { let val = indices.get(*idx).unwrap(); let tuple = vec![val.clone(), DataValue::from(grp_id as i64)]; - out.put(tuple, 0); + out.put(tuple); } } @@ -69,7 +69,7 @@ impl AlgoImpl for StronglyConnectedComponent { if !inv_indices.contains_key(&node) { inv_indices.insert(node.clone(), usize::MAX); let tuple = vec![node, DataValue::from(counter)]; - out.put(tuple, 0); + out.put(tuple); counter += 1; } } diff --git a/cozo-core/src/algo/top_sort.rs b/cozo-core/src/algo/top_sort.rs index 1fc10caa..abc7e3fc 100644 --- a/cozo-core/src/algo/top_sort.rs +++ b/cozo-core/src/algo/top_sort.rs @@ -18,7 +18,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct TopSort; @@ -28,8 +28,8 @@ impl AlgoImpl for TopSort { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -41,7 +41,7 @@ impl AlgoImpl for TopSort { for (idx, val_id) in sorted.iter().enumerate() { let val = indices.get(*val_id).unwrap(); let tuple = vec![DataValue::from(idx as i64), val.clone()]; - out.put(tuple, 0); + out.put(tuple); } Ok(()) diff --git a/cozo-core/src/algo/triangles.rs b/cozo-core/src/algo/triangles.rs index 10b7c48d..2ad73205 100644 --- a/cozo-core/src/algo/triangles.rs +++ b/cozo-core/src/algo/triangles.rs @@ -20,7 +20,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct ClusteringCoefficients; @@ -30,8 +30,8 @@ impl AlgoImpl for ClusteringCoefficients { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -40,15 +40,12 @@ impl AlgoImpl for ClusteringCoefficients { graph.into_iter().map(|e| e.into_iter().collect()).collect(); let coefficients = clustering_coefficients(&graph, poison)?; for (idx, (cc, n_triangles, degree)) in coefficients.into_iter().enumerate() { - out.put( - vec![ - indices[idx].clone(), - DataValue::from(cc), - DataValue::from(n_triangles as i64), - DataValue::from(degree as i64), - ], - 0, - ); + out.put(vec![ + indices[idx].clone(), + DataValue::from(cc), + DataValue::from(n_triangles as i64), + DataValue::from(degree as i64), + ]); } Ok(()) diff --git a/cozo-core/src/algo/yen.rs b/cozo-core/src/algo/yen.rs index 63fd1716..714d5a3c 100644 --- a/cozo-core/src/algo/yen.rs +++ b/cozo-core/src/algo/yen.rs @@ -22,7 +22,7 @@ use crate::data::symb::Symbol; use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct KShortestPathYen; @@ -32,8 +32,8 @@ impl AlgoImpl for KShortestPathYen { &mut self, tx: &'a SessionTx<'_>, algo: &'a MagicAlgoApply, - stores: &'a BTreeMap, - out: &'a InMemRelation, + stores: &'a BTreeMap, + out: &'a mut NormalTempStore, poison: Poison, ) -> Result<()> { let edges = algo.relation(0)?; @@ -75,7 +75,7 @@ impl AlgoImpl for KShortestPathYen { path.into_iter().map(|u| indices[u].clone()).collect_vec(), ), ]; - out.put(t, 0) + out.put(t) } } } @@ -106,7 +106,7 @@ impl AlgoImpl for KShortestPathYen { DataValue::from(cost), DataValue::List(path.into_iter().map(|u| indices[u].clone()).collect_vec()), ]; - out.put(t, 0) + out.put(t) } } } diff --git a/cozo-core/src/data/aggr.rs b/cozo-core/src/data/aggr.rs index 7b355b48..d1ecb3ff 100644 --- a/cozo-core/src/data/aggr.rs +++ b/cozo-core/src/data/aggr.rs @@ -1206,7 +1206,7 @@ impl Aggregation { name if name == AGGR_INTERSECTION.name => Box::new(MeetAggrIntersection), name if name == AGGR_SHORTEST.name => Box::new(MeetAggrShortest), name if name == AGGR_MIN_COST.name => Box::new(MeetAggrMinCost), - _ => unreachable!(), + name => unreachable!("{}", name), }); Ok(()) } diff --git a/cozo-core/src/data/functions.rs b/cozo-core/src/data/functions.rs index aecacd53..a200a2b8 100644 --- a/cozo-core/src/data/functions.rs +++ b/cozo-core/src/data/functions.rs @@ -49,7 +49,6 @@ fn ensure_same_value_type(a: &DataValue, b: &DataValue) -> Result<()> { | (Regex(_), Regex(_)) | (List(_), List(_)) | (Set(_), Set(_)) - | (Guard, Guard) | (Bot, Bot) ) { bail!( @@ -1272,7 +1271,6 @@ pub(crate) fn op_to_bool(args: &[DataValue]) -> Result { DataValue::Regex(r) => !r.0.as_str().is_empty(), DataValue::List(l) => !l.is_empty(), DataValue::Set(s) => !s.is_empty(), - DataValue::Guard => false, DataValue::Bot => false, })) } @@ -1290,7 +1288,6 @@ pub(crate) fn op_to_unity(args: &[DataValue]) -> Result { DataValue::Regex(r) => if r.0.as_str().is_empty() {0 } else { 1}, DataValue::List(l) => if l.is_empty() {0} else {1}, DataValue::Set(s) => if s.is_empty() {0} else {1}, - DataValue::Guard => 0, DataValue::Bot => 0, })) } diff --git a/cozo-core/src/data/json.rs b/cozo-core/src/data/json.rs index d355930d..e16e265d 100644 --- a/cozo-core/src/data/json.rs +++ b/cozo-core/src/data/json.rs @@ -89,7 +89,6 @@ impl From for JsonValue { JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect()) } DataValue::Bot => panic!("found bottom"), - DataValue::Guard => panic!("found guard"), DataValue::Set(l) => { JsonValue::Array(l.iter().map(|v| JsonValue::from(v.clone())).collect()) } diff --git a/cozo-core/src/data/memcmp.rs b/cozo-core/src/data/memcmp.rs index 87ee73bf..5e195a0a 100644 --- a/cozo-core/src/data/memcmp.rs +++ b/cozo-core/src/data/memcmp.rs @@ -26,7 +26,6 @@ const UUID_TAG: u8 = 0x08; const REGEX_TAG: u8 = 0x09; const LIST_TAG: u8 = 0x0A; const SET_TAG: u8 = 0x0B; -const GUARD_TAG: u8 = 0xFE; const BOT_TAG: u8 = 0xFF; const IS_FLOAT: u8 = 0b00010000; @@ -79,7 +78,6 @@ pub(crate) trait MemCmpEncoder: Write { } self.write_u8(INIT_TAG).unwrap() } - DataValue::Guard => self.write_u8(GUARD_TAG).unwrap(), DataValue::Bot => self.write_u8(BOT_TAG).unwrap(), } } @@ -271,7 +269,6 @@ impl DataValue { } (DataValue::Set(collected), &remaining[1..]) } - GUARD_TAG => (DataValue::Guard, remaining), BOT_TAG => (DataValue::Bot, remaining), _ => unreachable!("{:?}", bs), } diff --git a/cozo-core/src/data/program.rs b/cozo-core/src/data/program.rs index 4afdbef8..72889842 100644 --- a/cozo-core/src/data/program.rs +++ b/cozo-core/src/data/program.rs @@ -22,8 +22,8 @@ use crate::data::relation::StoredRelationMetadata; use crate::data::symb::{Symbol, PROG_ENTRY}; use crate::data::value::DataValue; use crate::parse::SourceSpan; -use crate::runtime::in_mem::InMemRelation; use crate::runtime::relation::InputRelationHandle; +use crate::runtime::temp_store::EpochStore; use crate::runtime::transact::SessionTx; #[derive(Debug, Clone, Eq, PartialEq)] @@ -277,7 +277,7 @@ impl MagicAlgoApply { idx: usize, len: usize, tx: &SessionTx<'_>, - stores: &BTreeMap, + stores: &BTreeMap, ) -> Result<&MagicAlgoRuleArg> { #[derive(Error, Diagnostic, Debug)] #[error("Input relation to algorithm has insufficient arity")] diff --git a/cozo-core/src/data/value.rs b/cozo-core/src/data/value.rs index 0b91ccd9..2c3305c1 100644 --- a/cozo-core/src/data/value.rs +++ b/cozo-core/src/data/value.rs @@ -98,7 +98,6 @@ pub enum DataValue { Regex(RegexWrapper), List(Vec), Set(BTreeSet), - Guard, Bot, } @@ -245,9 +244,6 @@ impl Display for DataValue { } DataValue::List(ls) => f.debug_list().entries(ls).finish(), DataValue::Set(s) => f.debug_list().entries(s).finish(), - DataValue::Guard => { - write!(f, "null") - } DataValue::Bot => write!(f, "null"), } } diff --git a/cozo-core/src/query/eval.rs b/cozo-core/src/query/eval.rs index 83dbd4ed..2faf2aee 100644 --- a/cozo-core/src/query/eval.rs +++ b/cozo-core/src/query/eval.rs @@ -7,7 +7,7 @@ */ use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::mem; use itertools::Itertools; @@ -22,7 +22,7 @@ use crate::data::value::DataValue; use crate::parse::SourceSpan; use crate::query::compile::{AggrKind, CompiledProgram, CompiledRule, CompiledRuleSet}; use crate::runtime::db::Poison; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::{EpochStore, MeetAggrStore, NormalTempStore}; use crate::runtime::transact::SessionTx; pub(crate) struct QueryLimiter { @@ -56,33 +56,35 @@ impl<'a> SessionTx<'a> { total_num_to_take: Option, num_to_skip: Option, poison: Poison, - ) -> Result<(InMemRelation, bool)> { - let mut stores = BTreeMap::new(); + ) -> Result<(EpochStore, bool)> { + let mut stores: BTreeMap = BTreeMap::new(); let mut early_return = false; - let entry_symbol = MagicSymbol::Muggle { - inner: Symbol::new(PROG_ENTRY, SourceSpan(0, 0)), - }; for (stratum, cur_prog) in strata.iter().enumerate() { if stratum > 0 { // remove stores that have outlived their usefulness! stores = stores .into_iter() - .filter(|(name, _)| { - if *name == entry_symbol { - return true; - } - match store_lifetimes.get(name) { - None => false, - Some(n) => *n >= stratum, - } + .filter(|(name, _)| match store_lifetimes.get(name) { + None => false, + Some(n) => *n >= stratum, }) - .collect() + .collect(); + trace!("{:?}", stores); } for (rule_name, rule_set) in cur_prog { - stores.insert(rule_name.clone(), self.new_rule_store(rule_set.arity())); + let store = match rule_set.aggr_kind() { + AggrKind::None | AggrKind::Normal => EpochStore::new_normal(rule_set.arity()), + AggrKind::Meet => { + let rs = match rule_set { + CompiledRuleSet::Rules(rs) => rs, + _ => unreachable!(), + }; + EpochStore::new_meet(&rs[0].aggr)? + } + }; + stores.insert(rule_name.clone(), store); } debug!("stratum {}", stratum); - early_return = self.semi_naive_magic_evaluate( cur_prog, &mut stores, @@ -91,6 +93,9 @@ impl<'a> SessionTx<'a> { poison.clone(), )?; } + let entry_symbol = MagicSymbol::Muggle { + inner: Symbol::new(PROG_ENTRY, SourceSpan(0, 0)), + }; let ret_area = stores.remove(&entry_symbol).ok_or(NoEntryError)?; Ok((ret_area, early_return)) } @@ -98,7 +103,7 @@ impl<'a> SessionTx<'a> { fn semi_naive_magic_evaluate( &self, prog: &CompiledProgram, - stores: &mut BTreeMap, + stores: &mut BTreeMap, total_num_to_take: Option, num_to_skip: Option, poison: Poison, @@ -117,61 +122,65 @@ impl<'a> SessionTx<'a> { debug!("epoch {}", epoch); if epoch == 0 { for (k, compiled_ruleset) in prog.iter().rev() { - match compiled_ruleset { + let new_store = match compiled_ruleset { CompiledRuleSet::Rules(ruleset) => match compiled_ruleset.aggr_kind() { AggrKind::None => { - used_limiter = self.initial_rule_normal_eval( + let res = self.initial_rule_non_aggr_eval( k, ruleset, stores, &mut changed, &mut limiter, poison.clone(), - )? || used_limiter; + )?; + used_limiter = res.0 || used_limiter; + res.1.wrap() } AggrKind::Normal => { - used_limiter = self.initial_rule_aggr_eval( + let res = self.initial_rule_aggr_eval( k, ruleset, stores, &mut changed, &mut limiter, poison.clone(), - )? || used_limiter; + )?; + used_limiter = res.0 || used_limiter; + res.1.wrap() } AggrKind::Meet => { - self.initial_rule_meet_eval( + let new = self.initial_rule_meet_eval( k, ruleset, stores, &mut changed, poison.clone(), )?; + new.wrap() } }, CompiledRuleSet::Algo(algo_apply) => { let mut algo_impl = algo_apply.algo.get_impl()?; - let out = stores.get(k).unwrap(); - algo_impl.run(self, algo_apply, stores, out, poison.clone())?; + let mut out = NormalTempStore::default(); + algo_impl.run(self, algo_apply, stores, &mut out, poison.clone())?; + out.wrap() } - } + }; + let old_store = stores.get_mut(k).unwrap(); + old_store.merge(new_store)?; } } else { - for store in stores.values() { - store.ensure_mem_db_for_epoch(epoch); - } - mem::swap(&mut changed, &mut prev_changed); for (_k, v) in changed.iter_mut() { *v = false; } for (k, compiled_ruleset) in prog.iter().rev() { - match compiled_ruleset { + let new_store = match compiled_ruleset { CompiledRuleSet::Rules(ruleset) => { match compiled_ruleset.aggr_kind() { AggrKind::None => { - used_limiter = self.incremental_rule_normal_eval( + let res = self.incremental_rule_non_aggr_eval( k, ruleset, epoch, @@ -180,29 +189,35 @@ impl<'a> SessionTx<'a> { &mut changed, &mut limiter, poison.clone(), - )? || used_limiter; + )?; + used_limiter = res.0 || used_limiter; + res.1.wrap() } AggrKind::Meet => { - self.incremental_rule_meet_eval( + let new = self.incremental_rule_meet_eval( k, ruleset, - epoch, stores, &prev_changed, &mut changed, poison.clone(), )?; + new.wrap() } AggrKind::Normal => { // not doing anything + NormalTempStore::default().wrap() } } } CompiledRuleSet::Algo(_) => { // no need to do anything, algos are only calculated once + NormalTempStore::default().wrap() } - } + }; + let old_store = stores.get_mut(k).unwrap(); + old_store.merge(new_store)?; } } if changed.values().all(|rule_changed| !*rule_changed) { @@ -212,52 +227,54 @@ impl<'a> SessionTx<'a> { Ok(used_limiter) } /// returns true is early return is activated - fn initial_rule_normal_eval( + fn initial_rule_non_aggr_eval( &self, rule_symb: &MagicSymbol, ruleset: &[CompiledRule], - stores: &mut BTreeMap, + stores: &mut BTreeMap, changed: &mut BTreeMap<&MagicSymbol, bool>, limiter: &mut QueryLimiter, poison: Poison, - ) -> Result { - let store = stores.get(rule_symb).unwrap(); - let use_delta = BTreeSet::default(); + ) -> Result<(bool, NormalTempStore)> { + let mut out_store = NormalTempStore::default(); let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry(); for (rule_n, rule) in ruleset.iter().enumerate() { debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n); - for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? { + for item_res in rule.relation.iter(self, None, stores)? { let item = item_res?; trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0); if should_check_limit { - if !store.exists(&item, 0) { - store.put_with_skip(item, limiter.should_skip_next()); + if !out_store.exists(&item) { + if limiter.should_skip_next() { + out_store.put_with_skip(item); + } else { + out_store.put(item); + } if limiter.incr_and_should_stop() { trace!("early stopping due to result count limit exceeded"); - return Ok(true); + return Ok((true, out_store)); } } } else { - store.put(item, 0); + out_store.put(item); } *changed.get_mut(rule_symb).unwrap() = true; } poison.check()?; } - Ok(should_check_limit) + Ok((should_check_limit, out_store)) } fn initial_rule_meet_eval( &self, rule_symb: &MagicSymbol, ruleset: &[CompiledRule], - stores: &mut BTreeMap, + stores: &mut BTreeMap, changed: &mut BTreeMap<&MagicSymbol, bool>, poison: Poison, - ) -> Result<()> { - let store = stores.get(rule_symb).unwrap(); - let use_delta = BTreeSet::default(); + ) -> Result { + let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?; for (rule_n, rule) in ruleset.iter().enumerate() { debug!("initial calculation for rule {:?}.{}", rule_symb, rule_n); @@ -265,16 +282,16 @@ impl<'a> SessionTx<'a> { for (aggr, args) in aggr.iter_mut().flatten() { aggr.meet_init(args)?; } - for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? { + for item_res in rule.relation.iter(self, None, stores)? { let item = item_res?; trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0); - store.aggr_meet_put(&item, &mut aggr, 0)?; + out_store.meet_put(item)?; *changed.get_mut(rule_symb).unwrap() = true; } poison.check()?; } - if store.is_empty() && ruleset[0].aggr.iter().all(|a| a.is_some()) { + if out_store.is_empty() && ruleset[0].aggr.iter().all(|a| a.is_some()) { let mut aggr = ruleset[0].aggr.clone(); for (aggr, args) in aggr.iter_mut().flatten() { aggr.meet_init(args)?; @@ -287,21 +304,20 @@ impl<'a> SessionTx<'a> { Ok(op.init_val()) }) .try_collect()?; - store.aggr_meet_put(&value, &mut aggr, 0)?; + out_store.meet_put(value)?; } - Ok(()) + Ok(out_store) } fn initial_rule_aggr_eval( &self, rule_symb: &MagicSymbol, ruleset: &[CompiledRule], - stores: &mut BTreeMap, + stores: &mut BTreeMap, changed: &mut BTreeMap<&MagicSymbol, bool>, limiter: &mut QueryLimiter, poison: Poison, - ) -> Result { - let store = stores.get(rule_symb).unwrap(); - let use_delta = BTreeSet::default(); + ) -> Result<(bool, NormalTempStore)> { + let mut out_store = NormalTempStore::default(); let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry(); let mut aggr_work: BTreeMap, Vec> = BTreeMap::new(); @@ -310,6 +326,7 @@ impl<'a> SessionTx<'a> { "Calculation for normal aggr rule {:?}.{}", rule_symb, rule_n ); + trace!("{:?}", rule); let keys_indices = rule .aggr @@ -331,7 +348,7 @@ impl<'a> SessionTx<'a> { }) .collect_vec(); - for item_res in rule.relation.iter(self, Some(0), &use_delta, stores)? { + for item_res in rule.relation.iter(self, None, stores)? { let item = item_res?; trace!("item for {:?}.{}: {:?} at {}", rule_symb, rule_n, item, 0); @@ -390,7 +407,7 @@ impl<'a> SessionTx<'a> { op.get() }) .try_collect()?; - store.put(empty_result, 0); + out_store.put(empty_result); } for (keys, aggrs) in aggr_work { @@ -406,31 +423,36 @@ impl<'a> SessionTx<'a> { .try_collect()?; let tuple = tuple_data; if should_check_limit { - if !store.exists(&tuple, 0) { - store.put_with_skip(tuple, limiter.should_skip_next()); + if !out_store.exists(&tuple) { + if limiter.should_skip_next() { + out_store.put_with_skip(tuple); + } else { + out_store.put(tuple); + } if limiter.incr_and_should_stop() { - return Ok(true); + return Ok((true, out_store)); } } // else, do nothing } else { - store.put(tuple, 0); + out_store.put(tuple); } } - Ok(should_check_limit) + Ok((should_check_limit, out_store)) } - fn incremental_rule_normal_eval( + fn incremental_rule_non_aggr_eval( &self, rule_symb: &MagicSymbol, ruleset: &[CompiledRule], epoch: u32, - stores: &mut BTreeMap, + stores: &mut BTreeMap, prev_changed: &BTreeMap<&MagicSymbol, bool>, changed: &mut BTreeMap<&MagicSymbol, bool>, limiter: &mut QueryLimiter, poison: Poison, - ) -> Result { - let store = stores.get(rule_symb).unwrap(); + ) -> Result<(bool, NormalTempStore)> { + let prev_store = stores.get(rule_symb).unwrap(); + let mut out_store = NormalTempStore::default(); let should_check_limit = limiter.total.is_some() && rule_symb.is_prog_entry(); for (rule_n, rule) in ruleset.iter().enumerate() { let mut should_do_calculation = false; @@ -446,12 +468,7 @@ impl<'a> SessionTx<'a> { continue; } - let mut aggr = rule.aggr.clone(); - for (aggr, args) in aggr.iter_mut().flatten() { - aggr.meet_init(args)?; - } - - for (delta_key, delta_store) in stores.iter() { + for (delta_key, _) in stores.iter() { if !rule.contained_rules.contains(delta_key) { continue; } @@ -459,11 +476,10 @@ impl<'a> SessionTx<'a> { "with delta {:?} for rule {:?}.{}", delta_key, rule_symb, rule_n ); - let use_delta = BTreeSet::from([delta_store.id]); - for item_res in rule.relation.iter(self, Some(epoch), &use_delta, stores)? { + for item_res in rule.relation.iter(self, Some(delta_key), stores)? { let item = item_res?; // improvement: the clauses can actually be evaluated in parallel - if store.exists(&item, 0) { + if prev_store.exists(&item) { trace!( "item for {:?}.{}: {:?} at {}, rederived", rule_symb, @@ -480,30 +496,33 @@ impl<'a> SessionTx<'a> { epoch ); *changed.get_mut(rule_symb).unwrap() = true; - store.put(item.clone(), epoch); - store.put_with_skip(item, limiter.should_skip_next()); + if limiter.should_skip_next() { + out_store.put_with_skip(item); + } else { + out_store.put(item); + } if should_check_limit && limiter.incr_and_should_stop() { trace!("early stopping due to result count limit exceeded"); - return Ok(true); + return Ok((true, out_store)); } } } poison.check()?; } } - Ok(should_check_limit) + Ok((should_check_limit, out_store)) } fn incremental_rule_meet_eval( &self, rule_symb: &MagicSymbol, ruleset: &[CompiledRule], - epoch: u32, - stores: &mut BTreeMap, + stores: &mut BTreeMap, prev_changed: &BTreeMap<&MagicSymbol, bool>, changed: &mut BTreeMap<&MagicSymbol, bool>, poison: Poison, - ) -> Result<()> { - let store = stores.get(rule_symb).unwrap(); + ) -> Result { + // let store = stores.get(rule_symb).unwrap(); + let mut out_store = MeetAggrStore::new(ruleset[0].aggr.clone())?; for (rule_n, rule) in ruleset.iter().enumerate() { let mut should_do_calculation = false; for d_rule in &rule.contained_rules { @@ -523,7 +542,7 @@ impl<'a> SessionTx<'a> { aggr.meet_init(args)?; } - for (delta_key, delta_store) in stores.iter() { + for (delta_key, _) in stores.iter() { if !rule.contained_rules.contains(delta_key) { continue; } @@ -531,11 +550,10 @@ impl<'a> SessionTx<'a> { "with delta {:?} for rule {:?}.{}", delta_key, rule_symb, rule_n ); - let use_delta = BTreeSet::from([delta_store.id]); - for item_res in rule.relation.iter(self, Some(epoch), &use_delta, stores)? { + for item_res in rule.relation.iter(self, Some(delta_key), stores)? { let item = item_res?; // improvement: the clauses can actually be evaluated in parallel - let aggr_changed = store.aggr_meet_put(&item, &mut aggr, epoch)?; + let aggr_changed = out_store.meet_put(item)?; if aggr_changed { *changed.get_mut(rule_symb).unwrap() = true; } @@ -543,6 +561,6 @@ impl<'a> SessionTx<'a> { poison.check()?; } } - Ok(()) + Ok(out_store) } } diff --git a/cozo-core/src/query/ra.rs b/cozo-core/src/query/ra.rs index de0b86f3..b85f67b7 100644 --- a/cozo-core/src/query/ra.rs +++ b/cozo-core/src/query/ra.rs @@ -25,8 +25,8 @@ use crate::data::symb::Symbol; use crate::data::tuple::{Tuple, TupleIter}; use crate::data::value::DataValue; use crate::parse::SourceSpan; -use crate::runtime::in_mem::{InMemRelation, StoredRelationId}; use crate::runtime::relation::RelationHandle; +use crate::runtime::temp_store::EpochStore; use crate::runtime::transact::SessionTx; use crate::utils::swap_option_result; @@ -113,9 +113,8 @@ impl UnificationRA { fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let mut bindings = self.parent.bindings_after_eliminate(); bindings.push(self.binding.clone()); @@ -123,7 +122,7 @@ impl UnificationRA { Ok(if self.is_multi { let it = self .parent - .iter(tx, epoch, use_delta, stores)? + .iter(tx, delta_rule, stores)? .map_ok(move |tuple| -> Result> { let result_list = self.expr.eval(&tuple)?; let result_list = result_list.get_list().ok_or_else(|| { @@ -151,7 +150,7 @@ impl UnificationRA { } else { Box::new( self.parent - .iter(tx, epoch, use_delta, stores)? + .iter(tx, delta_rule, stores)? .map_ok(move |tuple| -> Result { let result = self.expr.eval(&tuple)?; let mut ret = tuple; @@ -204,15 +203,14 @@ impl FilteredRA { fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let bindings = self.parent.bindings_after_eliminate(); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); Ok(Box::new( self.parent - .iter(tx, epoch, use_delta, stores)? + .iter(tx, delta_rule, stores)? .filter_map(move |tuple| match tuple { Ok(t) => { for p in self.pred.iter() { @@ -260,13 +258,13 @@ impl Debug for RelAlgebra { } } RelAlgebra::TempStore(r) => f - .debug_tuple("Derived") + .debug_tuple("TempStore") .field(&bindings) .field(&r.storage_key) .field(&r.filters) .finish(), RelAlgebra::Stored(r) => f - .debug_tuple("Derived") + .debug_tuple("Stored") .field(&bindings) .field(&r.storage.name) .field(&r.filters) @@ -356,7 +354,11 @@ impl RelAlgebra { pub(crate) fn cartesian_join(self, right: RelAlgebra, span: SourceSpan) -> Self { self.join(right, vec![], vec![], span) } - pub(crate) fn derived(bindings: Vec, storage_key: MagicSymbol, span: SourceSpan) -> Self { + pub(crate) fn derived( + bindings: Vec, + storage_key: MagicSymbol, + span: SourceSpan, + ) -> Self { Self::TempStore(TempStoreRA { bindings, storage_key, @@ -556,9 +558,8 @@ impl ReorderRA { fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let old_order = self.relation.bindings_after_eliminate(); let old_order_indices: BTreeMap<_, _> = old_order @@ -575,16 +576,18 @@ impl ReorderRA { .expect("program logic error: reorder indices mismatch") }) .collect_vec(); - Ok(Box::new(self.relation.iter(tx, epoch, use_delta, stores)?.map_ok( - move |tuple| { - let old = tuple; - let new = reorder_indices - .iter() - .map(|i| old[*i].clone()) - .collect_vec(); - new - }, - ))) + Ok(Box::new( + self.relation + .iter(tx, delta_rule, stores)? + .map_ok(move |tuple| { + let old = tuple; + let new = reorder_indices + .iter() + .map(|i| old[*i].clone()) + .collect_vec(); + new + }), + )) } } @@ -1069,26 +1072,20 @@ impl TempStoreRA { fn iter<'a>( &'a self, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let storage = stores.get(&self.storage_key).unwrap(); - if epoch == Some(0) && use_delta.contains(&storage.id) { - return Ok(Box::new(iter::empty())); - } - let scan_epoch = match epoch { - None => 0, - Some(ep) => { - if use_delta.contains(&storage.id) { - ep - 1 - } else { - 0 - } - } + let scan_epoch = match delta_rule { + None => false, + Some(name) => *name == self.storage_key, + }; + let it = if scan_epoch { + Left(storage.delta_all_iter().map(|t| Ok(t.into_tuple()))) + } else { + Right(storage.all_iter().map(|t| Ok(t.into_tuple()))) }; - let it = storage.scan_all_for_epoch(scan_epoch); Ok(if self.filters.is_empty() { Box::new(it) } else { @@ -1100,7 +1097,7 @@ impl TempStoreRA { left_iter: TupleIter<'a>, (left_join_indices, right_join_indices): (Vec, Vec), eliminate_indices: BTreeSet, - stores: &'a BTreeMap + stores: &'a BTreeMap, ) -> Result> { let storage = stores.get(&self.storage_key).unwrap(); debug_assert!(!right_join_indices.is_empty()); @@ -1122,12 +1119,11 @@ impl TempStoreRA { .map(|i| tuple[*i].clone()) .collect_vec(); - 'outer: for found in storage.scan_prefix(&prefix) { - let found = found?; + 'outer: for found in storage.prefix_iter(&prefix) { for (left_idx, right_idx) in left_join_indices.iter().zip(right_join_indices.iter()) { - if tuple[*left_idx] != found[*right_idx] { + if tuple[*left_idx] != *found.get(*right_idx) { continue 'outer; } } @@ -1155,11 +1151,10 @@ impl TempStoreRA { )) } else { let mut right_join_vals = BTreeSet::new(); - for tuple in storage.scan_all() { - let tuple = tuple?; + for tuple in storage.all_iter() { let to_join: Box<[DataValue]> = right_join_indices .iter() - .map(|i| tuple[*i].clone()) + .map(|i| tuple.get(*i).clone()) .collect(); right_join_vals.insert(to_join); } @@ -1200,29 +1195,20 @@ impl TempStoreRA { left_iter: TupleIter<'a>, (left_join_indices, right_join_indices): (Vec, Vec), eliminate_indices: BTreeSet, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let storage = stores.get(&self.storage_key).unwrap(); - if epoch == Some(0) && use_delta.contains(&storage.id) { - return Ok(Box::new(iter::empty())); - } + let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); right_invert_indices.sort_by_key(|(_, b)| **b); let left_to_prefix_indices = right_invert_indices .into_iter() .map(|(a, _)| left_join_indices[a]) .collect_vec(); - let scan_epoch = match epoch { - None => 0, - Some(ep) => { - if use_delta.contains(&storage.id) { - ep - 1 - } else { - 0 - } - } + let scan_epoch = match delta_rule { + None => false, + Some(name) => *name == self.storage_key, }; let mut skip_range_check = false; let it = left_iter @@ -1241,13 +1227,23 @@ impl TempStoreRA { if !l_bound.iter().all(|v| *v == DataValue::Null) || !u_bound.iter().all(|v| *v == DataValue::Bot) { + let mut lower_bound = prefix.clone(); + lower_bound.extend(l_bound); + let mut upper_bound = prefix.clone(); + upper_bound.extend(u_bound); + let it = if scan_epoch { + Left(storage.delta_range_iter(&lower_bound, &upper_bound, true)) + } else { + Right(storage.range_iter(&lower_bound, &upper_bound, true)) + }; return Left( - storage - .scan_bounded_prefix_for_epoch( - &prefix, &l_bound, &u_bound, scan_epoch, - ) - .map(move |res_found| -> Result> { - let found = res_found?; + it.map(move |res_found| -> Result> { + if self.filters.is_empty() { + let mut ret = tuple.clone(); + ret.extend(res_found.into_iter().cloned()); + Ok(Some(ret)) + } else { + let found = res_found.into_tuple(); for p in self.filters.iter() { if !p.eval_pred(&found)? { return Ok(None); @@ -1256,17 +1252,28 @@ impl TempStoreRA { let mut ret = tuple.clone(); ret.extend(found); Ok(Some(ret)) - }) - .filter_map(swap_option_result), + } + }) + .filter_map(swap_option_result), ); } } skip_range_check = true; + + let it = if scan_epoch { + Left(storage.delta_prefix_iter(&prefix)) + } else { + Right(storage.prefix_iter(&prefix)) + }; + Right( - storage - .scan_prefix_for_epoch(&prefix, scan_epoch) - .map(move |res_found| -> Result> { - let found = res_found?; + it.map(move |res_found| -> Result> { + if self.filters.is_empty() { + let mut ret = tuple.clone(); + ret.extend(res_found.into_iter().cloned()); + Ok(Some(ret)) + } else { + let found = res_found.into_tuple(); for p in self.filters.iter() { if !p.eval_pred(&found)? { return Ok(None); @@ -1275,8 +1282,9 @@ impl TempStoreRA { let mut ret = tuple.clone(); ret.extend(found); Ok(Some(ret)) - }) - .filter_map(swap_option_result), + } + }) + .filter_map(swap_option_result), ) }) .flatten_ok() @@ -1395,19 +1403,18 @@ impl RelAlgebra { pub(crate) fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { match self { RelAlgebra::Fixed(f) => Ok(Box::new(f.data.iter().map(|t| Ok(t.clone())))), - RelAlgebra::TempStore(r) => r.iter(epoch, use_delta, stores), + RelAlgebra::TempStore(r) => r.iter(delta_rule, stores), RelAlgebra::Stored(v) => v.iter(tx), - RelAlgebra::Join(j) => j.iter(tx, epoch, use_delta, stores), - RelAlgebra::Reorder(r) => r.iter(tx, epoch, use_delta, stores), - RelAlgebra::Filter(r) => r.iter(tx, epoch, use_delta, stores), - RelAlgebra::NegJoin(r) => r.iter(tx, epoch, use_delta, stores), - RelAlgebra::Unification(r) => r.iter(tx, epoch, use_delta, stores), + RelAlgebra::Join(j) => j.iter(tx, delta_rule, stores), + RelAlgebra::Reorder(r) => r.iter(tx, delta_rule, stores), + RelAlgebra::Filter(r) => r.iter(tx, delta_rule, stores), + RelAlgebra::NegJoin(r) => r.iter(tx, delta_rule, stores), + RelAlgebra::Unification(r) => r.iter(tx, delta_rule, stores), } } } @@ -1474,9 +1481,8 @@ impl NegJoin { pub(crate) fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let bindings = self.left.bindings_after_eliminate(); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); @@ -1490,10 +1496,10 @@ impl NegJoin { ) .unwrap(); r.neg_join( - self.left.iter(tx, epoch, use_delta, stores)?, + self.left.iter(tx, delta_rule, stores)?, join_indices, eliminate_indices, - stores + stores, ) } RelAlgebra::Stored(v) => { @@ -1506,7 +1512,7 @@ impl NegJoin { .unwrap(); v.neg_join( tx, - self.left.iter(tx, epoch, use_delta, stores)?, + self.left.iter(tx, delta_rule, stores)?, join_indices, eliminate_indices, ) @@ -1604,9 +1610,8 @@ impl InnerJoin { pub(crate) fn iter<'a>( &'a self, tx: &'a SessionTx<'_>, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let bindings = self.bindings(); let eliminate_indices = get_eliminate_indices(&bindings, &self.to_eliminate); @@ -1620,7 +1625,7 @@ impl InnerJoin { ) .unwrap(); f.join( - self.left.iter(tx, epoch, use_delta, stores)?, + self.left.iter(tx, delta_rule, stores)?, join_indices, eliminate_indices, ) @@ -1635,15 +1640,14 @@ impl InnerJoin { .unwrap(); if join_is_prefix(&join_indices.1) { r.prefix_join( - self.left.iter(tx, epoch, use_delta, stores)?, + self.left.iter(tx, delta_rule, stores)?, join_indices, eliminate_indices, - epoch, - use_delta, - stores + delta_rule, + stores, ) } else { - self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores) + self.materialized_join(tx, eliminate_indices, delta_rule, stores) } } RelAlgebra::Stored(r) => { @@ -1658,17 +1662,17 @@ impl InnerJoin { let left_len = self.left.bindings_after_eliminate().len(); r.prefix_join( tx, - self.left.iter(tx, epoch, use_delta, stores)?, + self.left.iter(tx, delta_rule, stores)?, join_indices, eliminate_indices, left_len, ) } else { - self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores) + self.materialized_join(tx, eliminate_indices, delta_rule, stores) } } RelAlgebra::Join(_) | RelAlgebra::Filter(_) | RelAlgebra::Unification(_) => { - self.materialized_join(tx, eliminate_indices, epoch, use_delta, stores) + self.materialized_join(tx, eliminate_indices, delta_rule, stores) } RelAlgebra::Reorder(_) => { panic!("joining on reordered") @@ -1682,9 +1686,8 @@ impl InnerJoin { &'a self, tx: &'a SessionTx<'_>, eliminate_indices: BTreeSet, - epoch: Option, - use_delta: &BTreeSet, - stores: &'a BTreeMap + delta_rule: Option<&MagicSymbol>, + stores: &'a BTreeMap, ) -> Result> { let right_bindings = self.right.bindings_after_eliminate(); let (left_join_indices, right_join_indices) = self @@ -1692,7 +1695,7 @@ impl InnerJoin { .join_indices(&self.left.bindings_after_eliminate(), &right_bindings) .unwrap(); - let mut left_iter = self.left.iter(tx, epoch, use_delta, stores)?; + let mut left_iter = self.left.iter(tx, delta_rule, stores)?; let left_cache = match left_iter.next() { None => return Ok(Box::new(iter::empty())), Some(Err(err)) => return Err(err), @@ -1717,7 +1720,7 @@ impl InnerJoin { self.mat_right_cache.borrow().clone() } else { let mut cache = BTreeSet::new(); - for item in self.right.iter(tx, epoch, use_delta, stores)? { + for item in self.right.iter(tx, delta_rule, stores)? { match item { Ok(tuple) => { let stored_tuple = right_store_indices diff --git a/cozo-core/src/query/sort.rs b/cozo-core/src/query/sort.rs index 60b1743d..589a119e 100644 --- a/cozo-core/src/query/sort.rs +++ b/cozo-core/src/query/sort.rs @@ -15,13 +15,13 @@ use miette::Result; use crate::data::program::SortDir; use crate::data::symb::Symbol; use crate::data::tuple::Tuple; -use crate::runtime::in_mem::InMemRelation; +use crate::runtime::temp_store::EpochStore; use crate::runtime::transact::SessionTx; impl<'a> SessionTx<'a> { pub(crate) fn sort_and_collect( &mut self, - original: InMemRelation, + original: EpochStore, sorters: &[(Symbol, SortDir)], head: &[Symbol], ) -> Result> { @@ -31,7 +31,7 @@ impl<'a> SessionTx<'a> { .map(|(k, dir)| (head_indices[k], *dir)) .collect_vec(); - let mut all_data: Vec<_> = original.scan_all().try_collect()?; + let mut all_data: Vec<_> = original.all_iter().map(|v| v.into_tuple()).collect_vec(); all_data.sort_by(|a, b| { for (idx, dir) in &idx_sorters { match a[*idx].cmp(&b[*idx]) { diff --git a/cozo-core/src/query/stored.rs b/cozo-core/src/query/stored.rs index 543811f2..ab1ae2a1 100644 --- a/cozo-core/src/query/stored.rs +++ b/cozo-core/src/query/stored.rs @@ -36,7 +36,7 @@ impl<'a> SessionTx<'a> { pub(crate) fn execute_relation<'s, S: Storage<'s>>( &mut self, db: &Db, - res_iter: impl Iterator>, + res_iter: impl Iterator, op: RelationOp, meta: &InputRelationHandle, headers: &[Symbol], @@ -110,7 +110,6 @@ impl<'a> SessionTx<'a> { let mut old_tuples: Vec = vec![]; for tuple in res_iter { - let tuple = tuple?; let extracted = key_extractors .iter() .map(|ex| ex.extract_data(&tuple)) @@ -193,8 +192,6 @@ impl<'a> SessionTx<'a> { key_extractors.extend(val_extractors); for tuple in res_iter { - let tuple = tuple?; - let extracted = key_extractors .iter() .map(|ex| ex.extract_data(&tuple)) @@ -242,7 +239,6 @@ impl<'a> SessionTx<'a> { )?; for tuple in res_iter { - let tuple = tuple?; let extracted = key_extractors .iter() .map(|ex| ex.extract_data(&tuple)) @@ -286,8 +282,6 @@ impl<'a> SessionTx<'a> { key_extractors.extend(val_extractors); for tuple in res_iter { - let tuple = tuple?; - let extracted = key_extractors .iter() .map(|ex| ex.extract_data(&tuple)) diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 7bcc4b2f..ac3499cb 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -407,7 +407,6 @@ impl<'s, S: Storage<'s>> Db { fn transact(&'s self) -> Result> { let ret = SessionTx { tx: Box::new(self.db.transact(false)?), - mem_store_id: Default::default(), relation_store_id: self.relation_store_id.clone(), }; Ok(ret) @@ -415,7 +414,6 @@ impl<'s, S: Storage<'s>> Db { fn transact_write(&'s self) -> Result> { let ret = SessionTx { tx: Box::new(self.db.transact(true)?), - mem_store_id: Default::default(), relation_store_id: self.relation_store_id.clone(), }; Ok(ret) @@ -839,7 +837,7 @@ impl<'s, S: Storage<'s>> Db { }; // the real evaluation - let (result, early_return) = tx.stratified_magic_evaluate( + let (result_store, early_return) = tx.stratified_magic_evaluate( &compiled, store_lifetimes, total_num_to_take, @@ -851,22 +849,18 @@ impl<'s, S: Storage<'s>> Db { if let Some(assertion) = &input_program.out_opts.assertion { match assertion { QueryAssertion::AssertNone(span) => { - if let Some(tuple) = result.scan_all().next() { - let tuple = tuple?; - + if let Some(tuple) = result_store.all_iter().next() { #[derive(Debug, Error, Diagnostic)] #[error( "The query is asserted to return no result, but a tuple {0:?} is found" )] #[diagnostic(code(eval::assert_none_failure))] struct AssertNoneFailure(Tuple, #[label] SourceSpan); - bail!(AssertNoneFailure(tuple, *span)) + bail!(AssertNoneFailure(tuple.into_tuple(), *span)) } } QueryAssertion::AssertSome(span) => { - if let Some(tuple) = result.scan_all().next() { - let _ = tuple?; - } else { + if result_store.all_iter().next().is_none() { #[derive(Debug, Error, Diagnostic)] #[error("The query is asserted to return some results, but returned none")] #[diagnostic(code(eval::assert_some_failure))] @@ -881,7 +875,7 @@ impl<'s, S: Storage<'s>> Db { // sort outputs if required let entry_head = input_program.get_entry_out_head()?; let sorted_result = - tx.sort_and_collect(result, &input_program.out_opts.sorters, &entry_head)?; + tx.sort_and_collect(result_store, &input_program.out_opts.sorters, &entry_head)?; let sorted_iter = if let Some(offset) = input_program.out_opts.offset { Left(sorted_result.into_iter().skip(offset)) } else { @@ -892,7 +886,6 @@ impl<'s, S: Storage<'s>> Db { } else { Right(sorted_iter) }; - let sorted_iter = sorted_iter.map(Ok); if let Some((meta, relation_op)) = &input_program.out_opts.store_relation { let to_clear = tx .execute_relation( @@ -914,10 +907,10 @@ impl<'s, S: Storage<'s>> Db { } else { // not sorting outputs let rows: Vec> = sorted_iter - .map_ok(|tuple| -> Vec { + .map(|tuple| -> Vec { tuple.into_iter().map(JsonValue::from).collect() }) - .try_collect()?; + .collect_vec(); let headers: Vec = match input_program.get_entry_out_head() { Ok(headers) => headers.into_iter().map(|v| v.name.to_string()).collect(), Err(_) => match rows.get(0) { @@ -929,15 +922,23 @@ impl<'s, S: Storage<'s>> Db { } } else { let scan = if early_return { - Right(Left(result.scan_early_returned())) + Right(Left( + result_store.early_returned_iter().map(|t| t.into_tuple()), + )) } else if input_program.out_opts.limit.is_some() || input_program.out_opts.offset.is_some() { let limit = input_program.out_opts.limit.unwrap_or(usize::MAX); let offset = input_program.out_opts.offset.unwrap_or(0); - Right(Right(result.scan_all().skip(offset).take(limit))) + Right(Right( + result_store + .all_iter() + .skip(offset) + .take(limit) + .map(|t| t.into_tuple()), + )) } else { - Left(result.scan_all()) + Left(result_store.all_iter().map(|t| t.into_tuple())) }; if let Some((meta, relation_op)) = &input_program.out_opts.store_relation { @@ -960,10 +961,10 @@ impl<'s, S: Storage<'s>> Db { )) } else { let rows: Vec> = scan - .map_ok(|tuple| -> Vec { + .map(|tuple| -> Vec { tuple.into_iter().map(JsonValue::from).collect() }) - .try_collect()?; + .collect_vec(); let headers: Vec = match input_program.get_entry_out_head() { Ok(headers) => headers.into_iter().map(|v| v.name.to_string()).collect(), @@ -1108,6 +1109,7 @@ impl Poison { #[cfg(test)] mod tests { use itertools::Itertools; + use log::debug; use serde_json::json; use crate::new_cozo_mem; @@ -1181,4 +1183,38 @@ mod tests { .rows; assert_eq!(res, vec![vec![json!(null), json!(0)]]); } + #[test] + fn test_layers() { + let _ = env_logger::builder().is_test(true).try_init(); + + let db = new_cozo_mem().unwrap(); + let res = db.run_script(r#" + y[a] := a in [1,2,3] + x[sum(a)] := y[a] + x[sum(a)] := a in [4,5,6] + ?[sum(a)] := x[a] + "#, Default::default()).unwrap().rows; + assert_eq!(res[0][0], json!(21.)) + } + #[test] + fn test_conditions() { + let _ = env_logger::builder().is_test(true).try_init(); + let db = new_cozo_mem().unwrap(); + db.run_script(r#" + { + ?[code] <- [['a'],['b'],['c']] + :create airport {code} + } + { + ?[fr, to, dist] <- [['a', 'b', 1.1], ['a', 'c', 0.5], ['b', 'c', 9.1]] + :create route {fr, to => dist} + } + "#, Default::default()).unwrap(); + debug!("real test begins"); + let res = db.run_script(r#" + r[code, dist] := *airport{code}, *route{fr: code, dist}; + ?[dist] := r['a', dist], dist > 0.5, dist <= 1.1; + "#, Default::default()).unwrap().rows; + dbg!(res); + } } diff --git a/cozo-core/src/runtime/mod.rs b/cozo-core/src/runtime/mod.rs index 482e1a3f..7721cf3d 100644 --- a/cozo-core/src/runtime/mod.rs +++ b/cozo-core/src/runtime/mod.rs @@ -8,6 +8,5 @@ pub(crate) mod db; pub(crate) mod transact; -pub(crate) mod in_mem; pub(crate) mod relation; -// pub(crate) mod temp_store; +pub(crate) mod temp_store; diff --git a/cozo-core/src/runtime/temp_store.rs b/cozo-core/src/runtime/temp_store.rs index 898052ce..808cc0d5 100644 --- a/cozo-core/src/runtime/temp_store.rs +++ b/cozo-core/src/runtime/temp_store.rs @@ -8,24 +8,20 @@ use std::cmp::Ordering; use std::collections::btree_map::Entry; +use std::collections::BTreeMap; use std::collections::Bound::Included; -use std::collections::{BTreeMap, BTreeSet}; use std::mem; use std::ops::Bound::Excluded; use either::{Left, Right}; use itertools::Itertools; -use miette::{Diagnostic, Result}; -use thiserror::Error; +use miette::Result; use crate::data::aggr::Aggregation; -use crate::data::symb::Symbol; use crate::data::tuple::Tuple; use crate::data::value::DataValue; -type Store = BTreeMap; - -#[derive(Default)] +#[derive(Default, Debug)] pub(crate) struct NormalTempStore { inner: BTreeMap, } @@ -36,11 +32,8 @@ impl NormalTempStore { pub(crate) fn wrap(self) -> TempStore { TempStore::Normal(self) } - pub(crate) fn exists(&self, old: Option<&EpochStore>, key: &Tuple) -> bool { - // if let Some(old_store) = old { - // if old_store. - // } - todo!() + pub(crate) fn exists(&self, key: &Tuple) -> bool { + self.inner.contains_key(key) } fn range_iter( @@ -68,16 +61,16 @@ impl NormalTempStore { self.inner.insert(tuple, true); } fn merge(&mut self, mut other: Self) { - if self.inner.len() < other.inner.len() { + if self.inner.is_empty() { mem::swap(&mut self.inner, &mut other.inner); - } - if other.inner.is_empty() { return; } + // must do it in this order! cannot swap! self.inner.extend(other.inner) } } +#[derive(Debug)] pub(crate) struct MeetAggrStore { inner: BTreeMap, aggregations: Vec<(Aggregation, Vec)>, @@ -90,14 +83,12 @@ impl MeetAggrStore { pub(crate) fn wrap(self) -> TempStore { TempStore::MeetAggr(self) } - pub(crate) fn exists(&self, old: Option<&EpochStore>, key: &Tuple) -> bool { - // if let Some(old_store) = old { - // if old_store. - // } - todo!() + pub(crate) fn exists(&self, key: &Tuple) -> bool { + let truncated = &key[0..self.grouping_len]; + self.inner.contains_key(truncated) } - pub(crate) fn is_empty(&self, old: Option<&EpochStore>) -> bool { - todo!() + pub(crate) fn is_empty(&self) -> bool { + self.inner.is_empty() } pub(crate) fn new(aggrs: Vec)>>) -> Result { let total_key_len = aggrs.len(); @@ -113,12 +104,10 @@ impl MeetAggrStore { }) } fn merge(&mut self, mut other: Self) -> Result<()> { + // can switch the order because we are dealing with meet aggregations if self.inner.len() < other.inner.len() { mem::swap(&mut self.inner, &mut other.inner); } - if other.inner.is_empty() { - return Ok(()); - } for (k, v) in other.inner { match self.inner.entry(k) { Entry::Vacant(ent) => { @@ -195,6 +184,7 @@ impl MeetAggrStore { } } +#[derive(Debug)] pub(crate) enum TempStore { Normal(NormalTempStore), MeetAggr(MeetAggrStore), @@ -205,6 +195,12 @@ impl TempStore { // pub(crate) fn new() -> Self { // Self::Normal(NormalTempStore::default()) // } + fn exists(&self, key: &Tuple) -> bool { + match self { + TempStore::Normal(n) => n.exists(key), + TempStore::MeetAggr(m) => m.exists(key), + } + } fn merge(&mut self, other: Self) -> Result<()> { match (self, other) { (TempStore::Normal(s), TempStore::Normal(o)) => { @@ -215,12 +211,12 @@ impl TempStore { _ => unreachable!(), } } - fn is_empty(&self) -> bool { - match self { - TempStore::Normal(n) => n.inner.is_empty(), - TempStore::MeetAggr(m) => m.inner.is_empty(), - } - } + // fn is_empty(&self) -> bool { + // match self { + // TempStore::Normal(n) => n.inner.is_empty(), + // TempStore::MeetAggr(m) => m.inner.is_empty(), + // } + // } fn range_iter( &self, lower: &Tuple, @@ -234,12 +230,31 @@ impl TempStore { } } +#[derive(Debug)] pub(crate) struct EpochStore { prev: TempStore, delta: TempStore, + pub(crate) arity: usize, } impl EpochStore { + pub(crate) fn exists(&self, key: &Tuple) -> bool { + self.prev.exists(key) || self.delta.exists(key) + } + pub(crate) fn new_normal(arity: usize) -> Self { + Self { + prev: TempStore::Normal(NormalTempStore::default()), + delta: TempStore::Normal(NormalTempStore::default()), + arity, + } + } + pub(crate) fn new_meet(aggrs: &[Option<(Aggregation, Vec)>]) -> Result { + Ok(Self { + prev: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?), + delta: TempStore::MeetAggr(MeetAggrStore::new(aggrs.to_vec())?), + arity: aggrs.len(), + }) + } pub(crate) fn merge(&mut self, mut new: TempStore) -> Result<()> { mem::swap(&mut new, &mut self.delta); self.prev.merge(new) @@ -267,7 +282,10 @@ impl EpochStore { upper.push(DataValue::Bot); self.range_iter(prefix, &upper, true) } - pub(crate) fn delta_prefix_iter(&self, prefix: &Tuple) -> impl Iterator> { + pub(crate) fn delta_prefix_iter( + &self, + prefix: &Tuple, + ) -> impl Iterator> { let mut upper = prefix.to_vec(); upper.push(DataValue::Bot); self.delta_range_iter(prefix, &upper, true) @@ -278,6 +296,9 @@ impl EpochStore { pub(crate) fn delta_all_iter(&self) -> impl Iterator> { self.delta_prefix_iter(&vec![]) } + pub(crate) fn early_returned_iter(&self) -> impl Iterator> { + self.all_iter().filter(|t| !t.should_skip()) + } } #[derive(Copy, Clone)] @@ -289,7 +310,7 @@ impl<'a> TupleInIter<'a> { .get(idx) .unwrap_or_else(|| self.1.get(idx - self.0.len()).unwrap()) } - pub(crate) fn should_skip(&self) -> bool { + fn should_skip(&self) -> bool { self.2 } pub(crate) fn into_tuple(self) -> Tuple { diff --git a/cozo-core/src/runtime/transact.rs b/cozo-core/src/runtime/transact.rs index 7bbfb31d..30a5e299 100644 --- a/cozo-core/src/runtime/transact.rs +++ b/cozo-core/src/runtime/transact.rs @@ -6,32 +6,22 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::AtomicU64; use std::sync::Arc; use miette::Result; use crate::data::tuple::TupleT; use crate::data::value::DataValue; -use crate::runtime::in_mem::{InMemRelation, StoredRelationId}; use crate::runtime::relation::RelationId; use crate::storage::StoreTx; pub struct SessionTx<'a> { pub(crate) tx: Box + 'a>, pub(crate) relation_store_id: Arc, - pub(crate) mem_store_id: Arc, } impl<'a> SessionTx<'a> { - pub(crate) fn new_rule_store(&self, arity: usize) -> InMemRelation { - let old_count = self.mem_store_id.fetch_add(1, Ordering::AcqRel); - let old_count = old_count & 0x00ff_ffffu32; - let ret = InMemRelation::new(StoredRelationId(old_count), arity); - ret.ensure_mem_db_for_epoch(0); - ret - } - pub(crate) fn load_last_relation_store_id(&self) -> Result { let tuple = vec![DataValue::Null]; let t_encoded = tuple.encode_as_key(RelationId::SYSTEM); diff --git a/cozo-core/src/storage/re.rs b/cozo-core/src/storage/re.rs index 5c2368b6..22bc0e28 100644 --- a/cozo-core/src/storage/re.rs +++ b/cozo-core/src/storage/re.rs @@ -208,7 +208,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> { it_builder: |tbl| tbl.range(lower.to_vec()..upper.to_vec()).unwrap(), } .build(); - todo!() + panic!() // match tbl.range(lower.to_vec()..upper.to_vec()) { // Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))), // Err(err) => Box::new(iter::once(Err(miette!(err)))), @@ -226,7 +226,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> { it_builder: |tbl| tbl.range(lower.to_vec()..upper.to_vec()).unwrap(), } .build(); - todo!() + panic!() // let tbl = &*inner.tbl_ptr.unwrap(); // match tbl.range(lower.to_vec()..upper.to_vec()) { // Ok(it) => Box::new(it.map(|(k, v)| Ok(decode_tuple_from_kv(k, v)))), @@ -244,7 +244,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> { where 's: 'a, { - todo!() + panic!() // match self { // ReTx::Read(inner) => unsafe { // let tbl = &*inner.tbl_ptr.unwrap(); @@ -254,7 +254,7 @@ impl<'s> StoreTx<'s> for ReTx<'s> { // } // }, // ReTx::Write(inner) => unsafe { - // todo!() + // panic!() // // let tbl = &*inner.tbl_ptr.unwrap(); // // match tbl.range(lower.to_vec()..upper.to_vec()) { // // Ok(it) => Box::new(it.map(|(k, v)| Ok((k.to_vec(), v.to_vec())))),