From 7aaea2de426da868cc709d5b5e92961c89ce15f1 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 22 Dec 2022 17:33:39 +0800 Subject: [PATCH] time travel functionality --- cozo-core/src/cozoscript.pest | 3 +- cozo-core/src/data/aggr.rs | 1 + cozo-core/src/data/functions.rs | 74 ++--- cozo-core/src/data/memcmp.rs | 6 +- cozo-core/src/data/program.rs | 19 +- cozo-core/src/data/relation.rs | 8 + cozo-core/src/data/tuple.rs | 44 ++- cozo-core/src/data/value.rs | 18 +- .../algos/all_pairs_shortest_path.rs | 4 +- cozo-core/src/fixed_rule/algos/kruskal.rs | 2 +- .../src/fixed_rule/algos/label_propagation.rs | 2 +- cozo-core/src/fixed_rule/algos/louvain.rs | 2 +- cozo-core/src/fixed_rule/algos/pagerank.rs | 2 +- cozo-core/src/fixed_rule/algos/prim.rs | 2 +- .../algos/shortest_path_dijkstra.rs | 2 +- .../algos/strongly_connected_components.rs | 4 +- cozo-core/src/fixed_rule/algos/top_sort.rs | 4 +- cozo-core/src/fixed_rule/algos/triangles.rs | 4 +- cozo-core/src/fixed_rule/algos/yen.rs | 6 +- cozo-core/src/fixed_rule/mod.rs | 44 +-- .../src/fixed_rule/utilities/reorder_sort.rs | 5 +- cozo-core/src/parse/query.rs | 22 +- cozo-core/src/parse/schema.rs | 1 + cozo-core/src/query/compile.rs | 14 +- cozo-core/src/query/graph.rs | 2 +- cozo-core/src/query/magic.rs | 66 ++++- cozo-core/src/query/ra.rs | 275 +++++++++--------- cozo-core/src/runtime/relation.rs | 59 +++- cozo-core/src/storage/mem.rs | 134 ++++++++- cozo-core/src/storage/mod.rs | 24 ++ cozo-core/src/storage/rocks.rs | 61 +++- cozo-core/src/storage/sled.rs | 16 +- cozo-core/src/storage/sqlite.rs | 65 ++++- cozo-core/src/storage/tikv.rs | 16 +- cozorocks/build.rs | 6 +- 35 files changed, 739 insertions(+), 278 deletions(-) diff --git a/cozo-core/src/cozoscript.pest b/cozo-core/src/cozoscript.pest index b2da94a1..8285a70e 100644 --- a/cozo-core/src/cozoscript.pest +++ b/cozo-core/src/cozoscript.pest @@ -186,7 +186,7 @@ literal = _{ null | boolean | number | string} table_schema = {"{" ~ table_cols ~ ("=>" ~ table_cols)? ~ "}"} table_cols = {(table_col ~ ",")* ~ table_col?} table_col = {ident ~ (":" ~ col_type)? ~ (("default" ~ expr) | ("=" ~ out_arg))?} -col_type = {(any_type | bool_type | int_type | float_type | string_type | bytes_type | uuid_type | list_type | tuple_type) ~ "?"?} +col_type = {(any_type | bool_type | int_type | float_type | string_type | bytes_type | uuid_type | validity_type | list_type | tuple_type) ~ "?"?} col_type_with_term = {SOI ~ col_type ~ EOI} any_type = {"Any"} int_type = {"Int"} @@ -195,5 +195,6 @@ string_type = {"String"} bytes_type = {"Bytes"} uuid_type = {"Uuid"} bool_type = {"Bool"} +validity_type = {"Validity"} list_type = {"[" ~ col_type ~ (";" ~ expr)? ~ "]"} tuple_type = {"(" ~ (col_type ~ ",")* ~ col_type? ~ ")"} diff --git a/cozo-core/src/data/aggr.rs b/cozo-core/src/data/aggr.rs index b12cc603..fe821ed6 100644 --- a/cozo-core/src/data/aggr.rs +++ b/cozo-core/src/data/aggr.rs @@ -1164,6 +1164,7 @@ impl Aggregation { Ok(()) } pub(crate) fn normal_init(&mut self, args: &[DataValue]) -> Result<()> { + #[allow(clippy::box_default)] self.normal_op.replace(match self.name { name if name == AGGR_AND.name => Box::new(AggrAnd::default()), name if name == AGGR_OR.name => Box::new(AggrOr::default()), diff --git a/cozo-core/src/data/functions.rs b/cozo-core/src/data/functions.rs index 65195147..5cbd19ab 100644 --- a/cozo-core/src/data/functions.rs +++ b/cozo-core/src/data/functions.rs @@ -25,7 +25,7 @@ use uuid::v1::Timestamp; use crate::data::expr::Op; use crate::data::json::JsonValue; -use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper}; +use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity, ValidityTs}; macro_rules! define_op { ($name:ident, $min_arity:expr, $vararg:expr) => { @@ -112,8 +112,8 @@ define_op!(OP_GT, 2, false); pub(crate) fn op_gt(args: &[DataValue]) -> Result { ensure_same_value_type(&args[0], &args[1])?; Ok(DataValue::Bool(match (&args[0], &args[1]) { - (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l as f64 > *r as f64, - (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 > *r as f64, + (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l > *r as f64, + (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 > *r, (a, b) => a > b, })) } @@ -122,8 +122,8 @@ define_op!(OP_GE, 2, false); pub(crate) fn op_ge(args: &[DataValue]) -> Result { ensure_same_value_type(&args[0], &args[1])?; Ok(DataValue::Bool(match (&args[0], &args[1]) { - (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l as f64 >= *r as f64, - (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 >= *r as f64, + (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l >= *r as f64, + (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 >= *r, (a, b) => a >= b, })) } @@ -132,8 +132,8 @@ define_op!(OP_LT, 2, false); pub(crate) fn op_lt(args: &[DataValue]) -> Result { ensure_same_value_type(&args[0], &args[1])?; Ok(DataValue::Bool(match (&args[0], &args[1]) { - (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => (*l as f64) < (*r as f64), - (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) < (*r as f64), + (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l < (*r as f64), + (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) < *r, (a, b) => a < b, })) } @@ -142,8 +142,8 @@ define_op!(OP_LE, 2, false); pub(crate) fn op_le(args: &[DataValue]) -> Result { ensure_same_value_type(&args[0], &args[1])?; Ok(DataValue::Bool(match (&args[0], &args[1]) { - (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => (*l as f64) <= (*r as f64), - (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) <= (*r as f64), + (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l <= (*r as f64), + (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) <= *r, (a, b) => a <= b, })) } @@ -1031,9 +1031,9 @@ pub(crate) fn op_haversine(args: &[DataValue]) -> Result { let lon2 = args[3].get_float().ok_or_else(miette)?; let ret = 2. * f64::asin(f64::sqrt( - f64::sin((lat1 - lat2) / 2.).powi(2) - + f64::cos(lat1) * f64::cos(lat2) * f64::sin((lon1 - lon2) / 2.).powi(2), - )); + f64::sin((lat1 - lat2) / 2.).powi(2) + + f64::cos(lat1) * f64::cos(lat2) * f64::sin((lon1 - lon2) / 2.).powi(2), + )); Ok(DataValue::from(ret)) } @@ -1046,9 +1046,9 @@ pub(crate) fn op_haversine_deg_input(args: &[DataValue]) -> Result { let lon2 = args[3].get_float().ok_or_else(miette)? * f64::PI() / 180.; let ret = 2. * f64::asin(f64::sqrt( - f64::sin((lat1 - lat2) / 2.).powi(2) - + f64::cos(lat1) * f64::cos(lat2) * f64::sin((lon1 - lon2) / 2.).powi(2), - )); + f64::sin((lat1 - lat2) / 2.).powi(2) + + f64::cos(lat1) * f64::cos(lat2) * f64::sin((lon1 - lon2) / 2.).powi(2), + )); Ok(DataValue::from(ret)) } @@ -1277,25 +1277,23 @@ pub(crate) fn op_to_bool(args: &[DataValue]) -> Result { })) } - define_op!(OP_TO_UNITY, 1, false); pub(crate) fn op_to_unity(args: &[DataValue]) -> Result { Ok(DataValue::from(match &args[0] { DataValue::Null => 0, DataValue::Bool(b) => *b as i64, DataValue::Num(n) => (n.get_float() != 0.) as i64, - DataValue::Str(s) => if s.is_empty() { 0 } else { 1 }, - DataValue::Bytes(b) => if b.is_empty() { 0 } else { 1 }, - DataValue::Uuid(u) => if u.0.is_nil() { 0 } else { 1 }, - DataValue::Regex(r) => if r.0.as_str().is_empty() { 0 } else { 1 }, - DataValue::List(l) => if l.is_empty() { 0 } else { 1 }, - DataValue::Set(s) => if s.is_empty() { 0 } else { 1 }, - DataValue::Validity(vld) => if vld.is_assert { 1 } else { 0 }, + DataValue::Str(s) => i64::from(!s.is_empty()), + DataValue::Bytes(b) => i64::from(!b.is_empty()), + DataValue::Uuid(u) => i64::from(!u.0.is_nil()), + DataValue::Regex(r) => i64::from(!r.0.as_str().is_empty()), + DataValue::List(l) => i64::from(!l.is_empty()), + DataValue::Set(s) => i64::from(!s.is_empty()), + DataValue::Validity(vld) => i64::from(vld.is_assert), DataValue::Bot => 0, })) } - define_op!(OP_TO_FLOAT, 1, false); pub(crate) fn op_to_float(args: &[DataValue]) -> Result { Ok(match &args[0] { @@ -1479,31 +1477,33 @@ pub(crate) fn op_now(_args: &[DataValue]) -> Result { )) } -pub(crate) fn current_validity() -> Reverse { +pub(crate) fn current_validity() -> ValidityTs { #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - let ts_millis = { + let ts_millis = { let now = SystemTime::now(); (now.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1000.) as i64 }; #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - let ts_millis = { - Date::now() as i64 - }; + let ts_millis = { Date::now() as i64 }; - Reverse(ts_millis) + ValidityTs(Reverse(ts_millis)) } -pub(crate) const MAX_VALIDITY: Reverse = Reverse(i64::MAX); +pub(crate) const MAX_VALIDITY_TS: ValidityTs = ValidityTs(Reverse(i64::MAX)); +pub(crate) const TERMINAL_VALIDITY: Validity = Validity { + timestamp: ValidityTs(Reverse(i64::MIN)), + is_assert: true, +}; define_op!(OP_FORMAT_TIMESTAMP, 1, true); pub(crate) fn op_format_timestamp(args: &[DataValue]) -> Result { #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - let dt = Utc + let dt = Utc .timestamp_millis_opt(Date::now() as i64) .latest() .ok_or_else(|| miette!("bad time input"))?; #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - let dt = { + let dt = { let f = args[0] .get_float() .ok_or_else(|| miette!("'format_timestamp' expects a number"))?; @@ -1542,11 +1542,11 @@ pub(crate) fn op_parse_timestamp(args: &[DataValue]) -> Result { )) } -pub(crate) fn str2vld(s: &str) -> Result> { +pub(crate) fn str2vld(s: &str) -> Result { let dt = DateTime::parse_from_rfc3339(s).map_err(|_| miette!("bad datetime: {}", s))?; let st: SystemTime = dt.into(); let microseconds = st.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1_000_000.; - Ok(Reverse(microseconds as i64)) + Ok(ValidityTs(Reverse(microseconds as i64))) } define_op!(OP_RAND_UUID_V1, 0, false); @@ -1554,14 +1554,14 @@ pub(crate) fn op_rand_uuid_v1(_args: &[DataValue]) -> Result { let mut rng = rand::thread_rng(); let uuid_ctx = uuid::v1::Context::new(rng.gen()); #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - let ts = { + let ts = { let since_epoch: f64 = Date::now(); let seconds = since_epoch.floor(); let fractional = (since_epoch - seconds) * 1.0e9; Timestamp::from_unix(uuid_ctx, seconds as u64, fractional as u32) }; #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - let ts = { + let ts = { let now = SystemTime::now(); let since_epoch = now.duration_since(UNIX_EPOCH).unwrap(); Timestamp::from_unix(uuid_ctx, since_epoch.as_secs(), since_epoch.subsec_nanos()) diff --git a/cozo-core/src/data/memcmp.rs b/cozo-core/src/data/memcmp.rs index d3d8c0a4..2cc648ef 100644 --- a/cozo-core/src/data/memcmp.rs +++ b/cozo-core/src/data/memcmp.rs @@ -14,7 +14,7 @@ use std::str::FromStr; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use regex::Regex; -use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity}; +use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity, ValidityTs}; const INIT_TAG: u8 = 0x00; const NULL_TAG: u8 = 0x01; @@ -81,7 +81,7 @@ pub(crate) trait MemCmpEncoder: Write { self.write_u8(INIT_TAG).unwrap() } DataValue::Validity(vld) => { - let ts = vld.timestamp.0; + let ts = vld.timestamp.0 .0; let ts_u64 = order_encode_i64(ts); let ts_flipped = !ts_u64; self.write_u8(VLD_TAG).unwrap(); @@ -288,7 +288,7 @@ impl DataValue { let is_assert = *is_assert_byte != 0; ( DataValue::Validity(Validity { - timestamp: Reverse(ts), + timestamp: ValidityTs(Reverse(ts)), is_assert, }), rest, diff --git a/cozo-core/src/data/program.rs b/cozo-core/src/data/program.rs index 98e2448a..78777f3c 100644 --- a/cozo-core/src/data/program.rs +++ b/cozo-core/src/data/program.rs @@ -6,7 +6,6 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::cmp::Reverse; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Display, Formatter}; @@ -22,7 +21,7 @@ use crate::data::aggr::Aggregation; use crate::data::expr::Expr; use crate::data::relation::StoredRelationMetadata; use crate::data::symb::{Symbol, PROG_ENTRY}; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, ValidityTs}; use crate::fixed_rule::{FixedRule, FixedRuleHandle}; use crate::parse::SourceSpan; use crate::runtime::relation::InputRelationHandle; @@ -325,13 +324,13 @@ pub(crate) enum FixedRuleArg { Stored { name: Symbol, bindings: Vec, - valid_at: Option>, + valid_at: Option, span: SourceSpan, }, NamedStored { name: Symbol, bindings: BTreeMap, Symbol>, - valid_at: Option>, + valid_at: Option, span: SourceSpan, }, } @@ -376,7 +375,7 @@ pub(crate) enum MagicFixedRuleRuleArg { Stored { name: Symbol, bindings: Vec, - valid_at: Option>, + valid_at: Option, span: SourceSpan, }, } @@ -534,7 +533,7 @@ impl InputProgram { for (symb, aggr) in head.iter().zip(aggrs.iter()) { if let Some((aggr, _)) = aggr { ret.push(Symbol::new( - &format!( + format!( "{}({})", aggr.name .strip_prefix("AGGR_") @@ -995,7 +994,7 @@ pub(crate) struct InputRuleApplyAtom { pub(crate) struct InputNamedFieldRelationApplyAtom { pub(crate) name: Symbol, pub(crate) args: BTreeMap, Expr>, - pub(crate) valid_at: Option>, + pub(crate) valid_at: Option, pub(crate) span: SourceSpan, } @@ -1003,7 +1002,7 @@ pub(crate) struct InputNamedFieldRelationApplyAtom { pub(crate) struct InputRelationApplyAtom { pub(crate) name: Symbol, pub(crate) args: Vec, - pub(crate) valid_at: Option>, + pub(crate) valid_at: Option, pub(crate) span: SourceSpan, } @@ -1018,7 +1017,7 @@ pub(crate) struct NormalFormRuleApplyAtom { pub(crate) struct NormalFormRelationApplyAtom { pub(crate) name: Symbol, pub(crate) args: Vec, - pub(crate) valid_at: Option>, + pub(crate) valid_at: Option, pub(crate) span: SourceSpan, } @@ -1033,7 +1032,7 @@ pub(crate) struct MagicRuleApplyAtom { pub(crate) struct MagicRelationApplyAtom { pub(crate) name: Symbol, pub(crate) args: Vec, - pub(crate) valid_at: Option>, + pub(crate) valid_at: Option, pub(crate) span: SourceSpan, } diff --git a/cozo-core/src/data/relation.rs b/cozo-core/src/data/relation.rs index e957e722..d63e8447 100644 --- a/cozo-core/src/data/relation.rs +++ b/cozo-core/src/data/relation.rs @@ -32,6 +32,7 @@ impl Display for NullableColType { ColType::String => f.write_str("String")?, ColType::Bytes => f.write_str("Bytes")?, ColType::Uuid => f.write_str("Uuid")?, + ColType::Validity => f.write_str("Validity")?, ColType::List { eltype, len } => { f.write_str("[")?; write!(f, "{}", eltype)?; @@ -73,6 +74,7 @@ pub(crate) enum ColType { len: Option, }, Tuple(Vec), + Validity } #[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)] @@ -212,6 +214,12 @@ impl NullableColType { bail!(make_err()) } } + ColType::Validity => { + match data { + vld@DataValue::Validity(_) => vld, + _ => todo!() + } + } }) } } diff --git a/cozo-core/src/data/tuple.rs b/cozo-core/src/data/tuple.rs index 60bba24b..09cfea82 100644 --- a/cozo-core/src/data/tuple.rs +++ b/cozo-core/src/data/tuple.rs @@ -6,10 +6,11 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ +use crate::data::functions::TERMINAL_VALIDITY; use miette::Result; use crate::data::memcmp::MemCmpEncoder; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, Validity, ValidityTs}; use crate::runtime::relation::RelationId; pub type Tuple = Vec; @@ -20,7 +21,10 @@ pub(crate) trait TupleT { fn encode_as_key(&self, prefix: RelationId) -> Vec; } -impl TupleT for T where T: AsRef<[DataValue]> { +impl TupleT for T +where + T: AsRef<[DataValue]>, +{ fn encode_as_key(&self, prefix: RelationId) -> Vec { let len = self.as_ref().len(); let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len); @@ -33,7 +37,7 @@ impl TupleT for T where T: AsRef<[DataValue]> { } } -pub(crate) fn decode_tuple_from_key(key: &[u8]) -> Tuple { +pub fn decode_tuple_from_key(key: &[u8]) -> Tuple { let mut remaining = &key[ENCODED_KEY_MIN_LEN..]; let mut ret = vec![]; while !remaining.is_empty() { @@ -44,4 +48,36 @@ pub(crate) fn decode_tuple_from_key(key: &[u8]) -> Tuple { ret } -pub(crate) const ENCODED_KEY_MIN_LEN: usize = 8; \ No newline at end of file +/// Check if the tuple key passed in should be a valid return for a validity query. +/// +/// Returns two elements, the first element contains `Some(tuple)` if the key should be included +/// in the return set and `None` otherwise, +/// the second element gives the next binary key for the seek to be used as an inclusive +/// lower bound. +pub fn check_key_for_validity(key: &[u8], valid_at: ValidityTs) -> (Option, Vec) { + let mut decoded = decode_tuple_from_key(key); + let rel_id = RelationId::raw_decode(key); + let vld = match decoded.last().unwrap() { + DataValue::Validity(vld) => vld, + _ => unreachable!(), + }; + if vld.timestamp < valid_at { + *decoded.last_mut().unwrap() = DataValue::Validity(Validity { + timestamp: valid_at, + is_assert: false, + }); + let nxt_seek = decoded.encode_as_key(rel_id); + (None, nxt_seek) + } else if !vld.is_assert { + *decoded.last_mut().unwrap() = DataValue::Validity(TERMINAL_VALIDITY); + let nxt_seek = decoded.encode_as_key(rel_id); + (None, nxt_seek) + } else { + let ret = decoded.clone(); + *decoded.last_mut().unwrap() = DataValue::Validity(TERMINAL_VALIDITY); + let nxt_seek = decoded.encode_as_key(rel_id); + (Some(ret), nxt_seek) + } +} + +pub(crate) const ENCODED_KEY_MIN_LEN: usize = 8; diff --git a/cozo-core/src/data/value.rs b/cozo-core/src/data/value.rs index 61a07225..91eb17c7 100644 --- a/cozo-core/src/data/value.rs +++ b/cozo-core/src/data/value.rs @@ -84,6 +84,20 @@ impl PartialOrd for RegexWrapper { } } +#[derive( + Copy, + Clone, + Eq, + PartialEq, + Ord, + PartialOrd, + serde_derive::Deserialize, + serde_derive::Serialize, + Hash, + Debug, +)] +pub struct ValidityTs(pub Reverse); + #[derive( Copy, Clone, @@ -96,7 +110,7 @@ impl PartialOrd for RegexWrapper { Hash, )] pub struct Validity { - pub(crate) timestamp: Reverse, + pub(crate) timestamp: ValidityTs, pub(crate) is_assert: bool, } @@ -372,7 +386,7 @@ mod tests { DataValue::List(vec![ DataValue::Bool(false), DataValue::Str(SmartString::from(r###"abc"你"好'啊👌"###)), - DataValue::from(f64::NEG_INFINITY) + DataValue::from(f64::NEG_INFINITY), ]) ); } diff --git a/cozo-core/src/fixed_rule/algos/all_pairs_shortest_path.rs b/cozo-core/src/fixed_rule/algos/all_pairs_shortest_path.rs index 13f137cd..2e4fa31f 100644 --- a/cozo-core/src/fixed_rule/algos/all_pairs_shortest_path.rs +++ b/cozo-core/src/fixed_rule/algos/all_pairs_shortest_path.rs @@ -39,7 +39,7 @@ impl FixedRule for BetweennessCentrality { let undirected = payload.bool_option("undirected", Some(false))?; let (graph, indices, _inv_indices) = - edges.to_directed_weighted_graph(undirected, false)?; + edges.as_directed_weighted_graph(undirected, false)?; let n = graph.node_count(); if n == 0 { @@ -108,7 +108,7 @@ impl FixedRule for ClosenessCentrality { let undirected = payload.bool_option("undirected", Some(false))?; let (graph, indices, _inv_indices) = - edges.to_directed_weighted_graph(undirected, false)?; + edges.as_directed_weighted_graph(undirected, false)?; let n = graph.node_count(); if n == 0 { diff --git a/cozo-core/src/fixed_rule/algos/kruskal.rs b/cozo-core/src/fixed_rule/algos/kruskal.rs index db76fc97..cf8108a7 100644 --- a/cozo-core/src/fixed_rule/algos/kruskal.rs +++ b/cozo-core/src/fixed_rule/algos/kruskal.rs @@ -34,7 +34,7 @@ impl FixedRule for MinimumSpanningForestKruskal { poison: Poison, ) -> Result<()> { let edges = payload.get_input(0)?; - let (graph, indices, _) = edges.to_directed_weighted_graph(true, true)?; + let (graph, indices, _) = edges.as_directed_weighted_graph(true, true)?; if graph.node_count() == 0 { return Ok(()); } diff --git a/cozo-core/src/fixed_rule/algos/label_propagation.rs b/cozo-core/src/fixed_rule/algos/label_propagation.rs index cc0e4072..23378829 100644 --- a/cozo-core/src/fixed_rule/algos/label_propagation.rs +++ b/cozo-core/src/fixed_rule/algos/label_propagation.rs @@ -34,7 +34,7 @@ impl FixedRule for LabelPropagation { let edges = payload.get_input(0)?; let undirected = payload.bool_option("undirected", Some(false))?; let max_iter = payload.pos_integer_option("max_iter", Some(10))?; - let (graph, indices, _inv_indices) = edges.to_directed_weighted_graph(undirected, true)?; + let (graph, indices, _inv_indices) = edges.as_directed_weighted_graph(undirected, true)?; let labels = label_propagation(&graph, max_iter, poison)?; for (idx, label) in labels.into_iter().enumerate() { let node = indices[idx].clone(); diff --git a/cozo-core/src/fixed_rule/algos/louvain.rs b/cozo-core/src/fixed_rule/algos/louvain.rs index 0d59e5b1..99d717f8 100644 --- a/cozo-core/src/fixed_rule/algos/louvain.rs +++ b/cozo-core/src/fixed_rule/algos/louvain.rs @@ -39,7 +39,7 @@ impl FixedRule for CommunityDetectionLouvain { let delta = payload.unit_interval_option("delta", Some(0.0001))? as f32; let keep_depth = payload.non_neg_integer_option("keep_depth", None).ok(); - let (graph, indices, _inv_indices) = edges.to_directed_weighted_graph(undirected, false)?; + let (graph, indices, _inv_indices) = edges.as_directed_weighted_graph(undirected, false)?; let result = louvain(&graph, delta, max_iter, poison)?; for (idx, node) in indices.into_iter().enumerate() { let mut labels = vec![]; diff --git a/cozo-core/src/fixed_rule/algos/pagerank.rs b/cozo-core/src/fixed_rule/algos/pagerank.rs index 0d3c1272..62ac4c98 100644 --- a/cozo-core/src/fixed_rule/algos/pagerank.rs +++ b/cozo-core/src/fixed_rule/algos/pagerank.rs @@ -38,7 +38,7 @@ impl FixedRule for PageRank { let epsilon = payload.unit_interval_option("epsilon", Some(0.0001))? as f32; let iterations = payload.pos_integer_option("iterations", Some(10))?; - let (graph, indices, _) = edges.to_directed_graph(undirected)?; + let (graph, indices, _) = edges.as_directed_graph(undirected)?; let (ranks, _n_run, _) = page_rank( &graph, diff --git a/cozo-core/src/fixed_rule/algos/prim.rs b/cozo-core/src/fixed_rule/algos/prim.rs index 6489fa68..1c86e4bf 100644 --- a/cozo-core/src/fixed_rule/algos/prim.rs +++ b/cozo-core/src/fixed_rule/algos/prim.rs @@ -35,7 +35,7 @@ impl FixedRule for MinimumSpanningTreePrim { poison: Poison, ) -> Result<()> { let edges = payload.get_input(0)?; - let (graph, indices, inv_indices) = edges.to_directed_weighted_graph(true, true)?; + let (graph, indices, inv_indices) = edges.as_directed_weighted_graph(true, true)?; if graph.node_count() == 0 { return Ok(()); } diff --git a/cozo-core/src/fixed_rule/algos/shortest_path_dijkstra.rs b/cozo-core/src/fixed_rule/algos/shortest_path_dijkstra.rs index 81d86494..c20d30e9 100644 --- a/cozo-core/src/fixed_rule/algos/shortest_path_dijkstra.rs +++ b/cozo-core/src/fixed_rule/algos/shortest_path_dijkstra.rs @@ -43,7 +43,7 @@ impl FixedRule for ShortestPathDijkstra { let keep_ties = payload.bool_option("keep_ties", Some(false))?; let (graph, indices, inv_indices) = - edges.to_directed_weighted_graph(undirected, false)?; + edges.as_directed_weighted_graph(undirected, false)?; let mut starting_nodes = BTreeSet::new(); for tuple in starting.iter()? { diff --git a/cozo-core/src/fixed_rule/algos/strongly_connected_components.rs b/cozo-core/src/fixed_rule/algos/strongly_connected_components.rs index ca2e5c4b..6a0196cb 100644 --- a/cozo-core/src/fixed_rule/algos/strongly_connected_components.rs +++ b/cozo-core/src/fixed_rule/algos/strongly_connected_components.rs @@ -48,7 +48,7 @@ impl FixedRule for StronglyConnectedComponent { let edges = payload.get_input(0)?; let (graph, indices, mut inv_indices) = - edges.to_directed_graph(!self.strong)?; + edges.as_directed_graph(!self.strong)?; let tarjan = TarjanSccG::new(graph).run(poison)?; for (grp_id, cc) in tarjan.iter().enumerate() { @@ -123,7 +123,7 @@ impl TarjanSccG { low_map.entry(grp).or_default().push(idx as u32); } - Ok(low_map.into_iter().map(|(_, vs)| vs).collect_vec()) + Ok(low_map.into_values().collect_vec()) } fn dfs(&mut self, at: u32) { self.stack.push(at); diff --git a/cozo-core/src/fixed_rule/algos/top_sort.rs b/cozo-core/src/fixed_rule/algos/top_sort.rs index ea6e6c31..7f749fa0 100644 --- a/cozo-core/src/fixed_rule/algos/top_sort.rs +++ b/cozo-core/src/fixed_rule/algos/top_sort.rs @@ -31,7 +31,7 @@ impl FixedRule for TopSort { ) -> Result<()> { let edges = payload.get_input(0)?; - let (graph, indices, _) = edges.to_directed_graph(false)?; + let (graph, indices, _) = edges.as_directed_graph(false)?; let sorted = kahn_g(&graph, poison)?; @@ -75,7 +75,7 @@ pub(crate) fn kahn_g(graph: &DirectedCsrGraph, poison: Poison) -> Result Result<()> { let edges = payload.get_input(0)?; - let (graph, indices, _) = edges.to_directed_graph(true)?; + let (graph, indices, _) = edges.as_directed_graph(true)?; let coefficients = clustering_coefficients(&graph, poison)?; for (idx, (cc, n_triangles, degree)) in coefficients.into_iter().enumerate() { out.put(vec![ @@ -85,7 +85,7 @@ fn clustering_coefficients( return true; } } - return false; + false }) .count() }) diff --git a/cozo-core/src/fixed_rule/algos/yen.rs b/cozo-core/src/fixed_rule/algos/yen.rs index e860b331..f8cdeea5 100644 --- a/cozo-core/src/fixed_rule/algos/yen.rs +++ b/cozo-core/src/fixed_rule/algos/yen.rs @@ -39,7 +39,7 @@ impl FixedRule for KShortestPathYen { let undirected = payload.bool_option("undirected", Some(false))?; let k = payload.pos_integer_option("k", None)?; - let (graph, indices, inv_indices) = edges.to_directed_weighted_graph(undirected, false)?; + let (graph, indices, inv_indices) = edges.as_directed_weighted_graph(undirected, false)?; let mut starting_nodes = BTreeSet::new(); for tuple in starting.iter()? { @@ -61,7 +61,7 @@ impl FixedRule for KShortestPathYen { for start in starting_nodes { for goal in &termination_nodes { for (cost, path) in - k_shortest_path_yen(k as usize, &graph, start, *goal, poison.clone())? + k_shortest_path_yen(k, &graph, start, *goal, poison.clone())? { let t = vec![ indices[start as usize].clone(), @@ -90,7 +90,7 @@ impl FixedRule for KShortestPathYen { Ok(( start, goal, - k_shortest_path_yen(k as usize, &graph, start, goal, poison.clone())?, + k_shortest_path_yen(k, &graph, start, goal, poison.clone())?, )) }, ) diff --git a/cozo-core/src/fixed_rule/mod.rs b/cozo-core/src/fixed_rule/mod.rs index 0487de7c..0efafa8c 100644 --- a/cozo-core/src/fixed_rule/mod.rs +++ b/cozo-core/src/fixed_rule/mod.rs @@ -83,7 +83,11 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> { } MagicFixedRuleRuleArg::Stored { name, valid_at, .. } => { let relation = self.tx.get_relation(name, false)?; - Box::new(relation.scan_all(self.tx)) + if let Some(valid_at) = valid_at { + Box::new(relation.skip_scan_all(self.tx, *valid_at)) + } else { + Box::new(relation.scan_all(self.tx)) + } } }) } @@ -99,7 +103,11 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> { MagicFixedRuleRuleArg::Stored { name, valid_at, .. } => { let relation = self.tx.get_relation(name, false)?; let t = vec![prefix.clone()]; - Box::new(relation.scan_prefix(self.tx, &t)) + if let Some(valid_at) = valid_at { + Box::new(relation.skip_scan_prefix(self.tx, &t, *valid_at)) + } else { + Box::new(relation.scan_prefix(self.tx, &t)) + } } }) } @@ -107,7 +115,7 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> { self.arg_manifest.span() } #[cfg(feature = "graph-algo")] - pub fn to_directed_graph( + pub fn as_directed_graph( &self, undirected: bool, ) -> Result<( @@ -173,7 +181,7 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> { Ok((graph, indices, inv_indices)) } #[cfg(feature = "graph-algo")] - pub fn to_directed_weighted_graph( + pub fn as_directed_weighted_graph( &self, undirected: bool, allow_negative_weights: bool, @@ -238,21 +246,19 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> { return None; }; - if f < 0. { - if !allow_negative_weights { - error = Some( - BadEdgeWeightError( - d, - self.arg_manifest - .bindings() - .get(2) - .map(|s| s.span) - .unwrap_or_else(|| self.span()), - ) - .into(), - ); - return None; - } + if f < 0. && !allow_negative_weights { + error = Some( + BadEdgeWeightError( + d, + self.arg_manifest + .bindings() + .get(2) + .map(|s| s.span) + .unwrap_or_else(|| self.span()), + ) + .into(), + ); + return None; } f } diff --git a/cozo-core/src/fixed_rule/utilities/reorder_sort.rs b/cozo-core/src/fixed_rule/utilities/reorder_sort.rs index 6b920520..514615d8 100644 --- a/cozo-core/src/fixed_rule/utilities/reorder_sort.rs +++ b/cozo-core/src/fixed_rule/utilities/reorder_sort.rs @@ -17,7 +17,7 @@ use crate::data::functions::OP_LIST; use crate::data::program::WrongFixedRuleOptionError; use crate::data::symb::Symbol; use crate::data::value::DataValue; -use crate::fixed_rule::{FixedRule, FixedRulePayload, CannotDetermineArity}; +use crate::fixed_rule::{CannotDetermineArity, FixedRule, FixedRulePayload}; use crate::parse::SourceSpan; use crate::runtime::db::Poison; use crate::runtime::temp_store::RegularTempStore; @@ -91,8 +91,7 @@ impl FixedRule for ReorderSort { let mut count = 0usize; let mut rank = 0usize; let mut last = &DataValue::Bot; - let skip = skip as usize; - let take_plus_skip = (take as usize).saturating_add(skip); + let take_plus_skip = take.saturating_add(skip); for val in &buffer { let sorter = val.last().unwrap(); diff --git a/cozo-core/src/parse/query.rs b/cozo-core/src/parse/query.rs index 8f208bed..e9ea1976 100644 --- a/cozo-core/src/parse/query.rs +++ b/cozo-core/src/parse/query.rs @@ -22,7 +22,7 @@ use thiserror::Error; use crate::data::aggr::{parse_aggr, Aggregation}; use crate::data::expr::Expr; -use crate::data::functions::{MAX_VALIDITY, str2vld}; +use crate::data::functions::{MAX_VALIDITY_TS, str2vld}; use crate::data::program::{ FixedRuleApply, FixedRuleArg, InputAtom, InputInlineRule, InputInlineRulesOrFixed, InputNamedFieldRelationApplyAtom, InputProgram, InputRelationApplyAtom, InputRuleApplyAtom, @@ -30,7 +30,7 @@ use crate::data::program::{ }; use crate::data::relation::{ColType, ColumnDef, NullableColType, StoredRelationMetadata}; use crate::data::symb::{Symbol, PROG_ENTRY}; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, ValidityTs}; use crate::fixed_rule::utilities::constant::Constant; use crate::fixed_rule::{FixedRuleHandle, FixedRuleNotFoundError}; use crate::parse::expr::build_expr; @@ -97,7 +97,7 @@ pub(crate) fn parse_query( src: Pairs<'_>, param_pool: &BTreeMap, fixedrithms: &BTreeMap>>, - cur_vld: Reverse, + cur_vld: ValidityTs, ) -> Result { let mut progs: BTreeMap = Default::default(); let mut out_opts: QueryOutOptions = Default::default(); @@ -435,7 +435,7 @@ pub(crate) fn parse_query( fn parse_rule( src: Pair<'_>, param_pool: &BTreeMap, - cur_vld: Reverse, + cur_vld: ValidityTs, ) -> Result<(Symbol, InputInlineRule)> { let span = src.extract_span(); let mut src = src.into_inner(); @@ -469,7 +469,7 @@ fn parse_rule( fn parse_disjunction( pair: Pair<'_>, param_pool: &BTreeMap, - cur_vld: Reverse, + cur_vld: ValidityTs, ) -> Result { let span = pair.extract_span(); let res: Vec<_> = pair @@ -483,7 +483,7 @@ fn parse_disjunction( }) } -fn parse_atom(src: Pair<'_>, param_pool: &BTreeMap, cur_vld: Reverse) -> Result { +fn parse_atom(src: Pair<'_>, param_pool: &BTreeMap, cur_vld: ValidityTs) -> Result { Ok(match src.as_rule() { Rule::rule_body => { let span = src.extract_span(); @@ -682,7 +682,7 @@ fn parse_fixed_rule( src: Pair<'_>, param_pool: &BTreeMap, fixedrithms: &BTreeMap>>, - cur_vld: Reverse, + cur_vld: ValidityTs, ) -> Result<(Symbol, FixedRuleApply)> { let mut src = src.into_inner(); let (out_symbol, head, aggr) = parse_rule_head(src.next().unwrap(), param_pool)?; @@ -865,19 +865,19 @@ fn make_empty_const_rule(prog: &mut InputProgram, bindings: &[Symbol]) { ); } -fn expr2vld_spec(expr: Expr, cur_vld: Reverse) -> Result> { +fn expr2vld_spec(expr: Expr, cur_vld: ValidityTs) -> Result { let vld_span = expr.span(); match expr.eval_to_const()? { DataValue::Num(n) => { - let microseconds = n.get_int().ok_or_else(|| BadValiditySpecification(vld_span))?; - Ok(Reverse(microseconds)) + let microseconds = n.get_int().ok_or(BadValiditySpecification(vld_span))?; + Ok(ValidityTs(Reverse(microseconds))) } DataValue::Str(s) => { match &s as &str { "now" => { Ok(cur_vld) } - "max" => { Ok(MAX_VALIDITY) } + "max" => { Ok(MAX_VALIDITY_TS) } s => { Ok(str2vld(s).map_err(|_| BadValiditySpecification(vld_span))?) } diff --git a/cozo-core/src/parse/schema.rs b/cozo-core/src/parse/schema.rs index 3285d884..3453e270 100644 --- a/cozo-core/src/parse/schema.rs +++ b/cozo-core/src/parse/schema.rs @@ -122,6 +122,7 @@ fn parse_type_inner(pair: Pair<'_>) -> Result { Rule::string_type => ColType::String, Rule::bytes_type => ColType::Bytes, Rule::uuid_type => ColType::Uuid, + Rule::validity_type => ColType::Validity, Rule::list_type => { let mut inner = pair.into_inner(); let eltype = parse_nullable_type(inner.next().unwrap())?; diff --git a/cozo-core/src/query/compile.rs b/cozo-core/src/query/compile.rs index 0b94b3ee..62550dac 100644 --- a/cozo-core/src/query/compile.rs +++ b/cozo-core/src/query/compile.rs @@ -15,7 +15,7 @@ use thiserror::Error; use crate::data::aggr::Aggregation; use crate::data::expr::Expr; use crate::data::program::{ - MagicFixedRuleApply, MagicAtom, MagicInlineRule, MagicRulesOrFixed, MagicSymbol, + MagicAtom, MagicFixedRuleApply, MagicInlineRule, MagicRulesOrFixed, MagicSymbol, StratifiedMagicProgram, }; use crate::data::symb::Symbol; @@ -238,7 +238,8 @@ impl<'a> SessionTx<'a> { } } - let right = RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at); + let right = + RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at)?; debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len()); ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span); } @@ -306,7 +307,12 @@ impl<'a> SessionTx<'a> { } } - let right = RelAlgebra::relation(right_vars, store, relation_app.span, relation_app.valid_at); + let right = RelAlgebra::relation( + right_vars, + store, + relation_app.span, + relation_app.valid_at, + )?; debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len()); ret = ret.neg_join( right, @@ -366,7 +372,7 @@ impl<'a> SessionTx<'a> { #[error("Symbol '{0}' in rule head is unbound")] #[diagnostic(code(eval::unbound_symb_in_head))] #[diagnostic(help( - "Note that symbols occurring only in negated positions are not considered bound" + "Note that symbols occurring only in negated positions are not considered bound" ))] struct UnboundSymbolInRuleHead(String, #[label] SourceSpan); diff --git a/cozo-core/src/query/graph.rs b/cozo-core/src/query/graph.rs index 07796ed2..2dfd93df 100644 --- a/cozo-core/src/query/graph.rs +++ b/cozo-core/src/query/graph.rs @@ -163,7 +163,7 @@ impl<'a> TarjanScc<'a> { low_map.entry(grp).or_default().push(idx); } - Ok(low_map.into_iter().map(|(_, vs)| vs).collect_vec()) + Ok(low_map.into_values().collect_vec()) } fn dfs(&mut self, at: usize) { self.stack.push(at); diff --git a/cozo-core/src/query/magic.rs b/cozo-core/src/query/magic.rs index 7560197b..359a5004 100644 --- a/cozo-core/src/query/magic.rs +++ b/cozo-core/src/query/magic.rs @@ -10,19 +10,21 @@ use std::collections::BTreeSet; use std::mem; use itertools::Itertools; -use miette::{ensure, Result}; +use miette::{bail, ensure, Result}; use smallvec::SmallVec; use smartstring::SmartString; use crate::data::program::{ - FixedRuleArg, MagicFixedRuleApply, MagicFixedRuleRuleArg, MagicAtom, MagicInlineRule, MagicProgram, - MagicRelationApplyAtom, MagicRuleApplyAtom, MagicRulesOrFixed, MagicSymbol, - NormalFormRulesOrFixed, NormalFormAtom, NormalFormInlineRule, NormalFormProgram, + FixedRuleArg, MagicAtom, MagicFixedRuleApply, MagicFixedRuleRuleArg, MagicInlineRule, + MagicProgram, MagicRelationApplyAtom, MagicRuleApplyAtom, MagicRulesOrFixed, MagicSymbol, + NormalFormAtom, NormalFormInlineRule, NormalFormProgram, NormalFormRulesOrFixed, StratifiedMagicProgram, StratifiedNormalFormProgram, }; +use crate::data::relation::{ColType, NullableColType}; use crate::data::symb::{Symbol, PROG_ENTRY}; use crate::parse::SourceSpan; use crate::query::logical::NamedFieldNotFound; +use crate::query::ra::InvalidTimeTravelScanning; use crate::runtime::transact::SessionTx; impl NormalFormProgram { @@ -332,13 +334,36 @@ impl NormalFormProgram { name, bindings, span, - valid_at - } => MagicFixedRuleRuleArg::Stored { - name: name.clone(), - bindings: bindings.clone(), - valid_at: *valid_at, - span: *span, - }, + valid_at, + } => { + if valid_at.is_some() { + let relation = tx.get_relation(name, false)?; + let last_col_type = &relation + .metadata + .keys + .last() + .unwrap() + .typing; + if *last_col_type + != (NullableColType { + coltype: ColType::Validity, + nullable: false, + }) + { + bail!(InvalidTimeTravelScanning( + name.to_string(), + *span + )); + } + } + + MagicFixedRuleRuleArg::Stored { + name: name.clone(), + bindings: bindings.clone(), + valid_at: *valid_at, + span: *span, + } + } FixedRuleArg::NamedStored { name, bindings, @@ -346,6 +371,25 @@ impl NormalFormProgram { span, } => { let relation = tx.get_relation(name, false)?; + if valid_at.is_some() { + let last_col_type = &relation + .metadata + .keys + .last() + .unwrap() + .typing; + if *last_col_type + != (NullableColType { + coltype: ColType::Validity, + nullable: false, + }) + { + bail!(InvalidTimeTravelScanning( + name.to_string(), + *span + )); + } + } let fields: BTreeSet<_> = relation .metadata .keys diff --git a/cozo-core/src/query/ra.rs b/cozo-core/src/query/ra.rs index a28c87f7..3e39a793 100644 --- a/cozo-core/src/query/ra.rs +++ b/cozo-core/src/query/ra.rs @@ -6,7 +6,6 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::cmp::Reverse; use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Formatter}; use std::iter; @@ -14,14 +13,15 @@ use std::iter; use either::{Left, Right}; use itertools::Itertools; use log::{debug, error}; -use miette::{Diagnostic, Result}; +use miette::{bail, Diagnostic, Result}; use thiserror::Error; use crate::data::expr::{compute_bounds, Expr}; use crate::data::program::MagicSymbol; +use crate::data::relation::{ColType, NullableColType}; use crate::data::symb::Symbol; use crate::data::tuple::{Tuple, TupleIter}; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, ValidityTs}; use crate::parse::SourceSpan; use crate::runtime::relation::RelationHandle; use crate::runtime::temp_store::EpochStore; @@ -317,6 +317,14 @@ impl Debug for RelAlgebra { } } +#[derive(Debug, Error, Diagnostic)] +#[error("Invalid time travel on relation {0}")] +#[diagnostic(code(eval::invalid_time_travel))] +#[diagnostic(help( + "Time travel scanning requires the last key column of the relation to be of type 'Validity'" +))] +pub(crate) struct InvalidTimeTravelScanning(pub(crate) String, #[label] pub(crate) SourceSpan); + impl RelAlgebra { pub(crate) fn fill_binding_indices(&mut self) -> Result<()> { match self { @@ -380,25 +388,31 @@ impl RelAlgebra { bindings: Vec, storage: RelationHandle, span: SourceSpan, - validity: Option>, - ) -> Self { + validity: Option, + ) -> Result { match validity { - None => { - Self::Stored(StoredRA { - bindings, - storage, - filters: vec![], - span, - }) - } + None => Ok(Self::Stored(StoredRA { + bindings, + storage, + filters: vec![], + span, + })), Some(vld) => { - Self::StoredWithValidity(StoredWithValidityRA { + if storage.metadata.keys.last().unwrap().typing + != (NullableColType { + coltype: ColType::Validity, + nullable: false, + }) + { + bail!(InvalidTimeTravelScanning(storage.name.to_string(), span)); + }; + Ok(Self::StoredWithValidity(StoredWithValidityRA { bindings, storage, filters: vec![], valid_at: vld, span, - }) + })) } } } @@ -423,11 +437,11 @@ impl RelAlgebra { }) } RelAlgebra::Filter(FilteredRA { - parent, - mut pred, - to_eliminate, - span, - }) => { + parent, + mut pred, + to_eliminate, + span, + }) => { pred.push(filter); RelAlgebra::Filter(FilteredRA { parent, @@ -437,11 +451,11 @@ impl RelAlgebra { }) } RelAlgebra::TempStore(TempStoreRA { - bindings, - storage_key, - mut filters, - span, - }) => { + bindings, + storage_key, + mut filters, + span, + }) => { filters.push(filter); RelAlgebra::TempStore(TempStoreRA { bindings, @@ -451,11 +465,11 @@ impl RelAlgebra { }) } RelAlgebra::Stored(StoredRA { - bindings, - storage, - mut filters, - span, - }) => { + bindings, + storage, + mut filters, + span, + }) => { filters.push(filter); RelAlgebra::Stored(StoredRA { bindings, @@ -465,12 +479,12 @@ impl RelAlgebra { }) } RelAlgebra::StoredWithValidity(StoredWithValidityRA { - bindings, - storage, - mut filters, - span, - valid_at, - }) => { + bindings, + storage, + mut filters, + span, + valid_at, + }) => { filters.push(filter); RelAlgebra::StoredWithValidity(StoredWithValidityRA { bindings, @@ -744,8 +758,8 @@ fn invert_option_err(v: Result>) -> Option> { fn filter_iter( filters: Vec, - it: impl Iterator>, -) -> impl Iterator> { + it: impl Iterator>, +) -> impl Iterator> { it.filter_map_ok(move |t| -> Option> { for p in filters.iter() { match p.eval_pred(&t) { @@ -759,7 +773,7 @@ fn filter_iter( } Some(Ok(t)) }) - .map(flatten_err) + .map(flatten_err) } fn get_eliminate_indices(bindings: &[Symbol], eliminate: &BTreeSet) -> BTreeSet { @@ -789,7 +803,7 @@ pub(crate) struct StoredWithValidityRA { pub(crate) bindings: Vec, pub(crate) storage: RelationHandle, pub(crate) filters: Vec, - pub(crate) valid_at: Reverse, + pub(crate) valid_at: ValidityTs, pub(crate) span: SourceSpan, } @@ -808,13 +822,12 @@ impl StoredWithValidityRA { Ok(()) } fn iter<'a>(&'a self, tx: &'a SessionTx<'_>) -> Result> { - todo!() - // let it = self.storage.scan_all(tx); - // Ok(if self.filters.is_empty() { - // Box::new(it) - // } else { - // Box::new(filter_iter(self.filters.clone(), it)) - // }) + let it = self.storage.skip_scan_all(tx, self.valid_at); + Ok(if self.filters.is_empty() { + Box::new(it) + } else { + Box::new(filter_iter(self.filters.clone(), it)) + }) } fn prefix_join<'a>( &'a self, @@ -822,89 +835,81 @@ impl StoredWithValidityRA { left_iter: TupleIter<'a>, (left_join_indices, right_join_indices): (Vec, Vec), eliminate_indices: BTreeSet, - left_tuple_len: usize, ) -> Result> { - todo!() - // let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); - // right_invert_indices.sort_by_key(|(_, b)| **b); - // let left_to_prefix_indices = right_invert_indices - // .into_iter() - // .map(|(a, _)| left_join_indices[a]) - // .collect_vec(); - // - // let key_len = self.storage.metadata.keys.len(); - // if left_to_prefix_indices.len() >= key_len { - // return self.point_lookup_join( - // tx, - // left_iter, - // key_len, - // left_to_prefix_indices, - // eliminate_indices, - // left_tuple_len, - // ); - // } - // - // let mut skip_range_check = false; - // // In some cases, maybe we can stop as soon as we get one result? - // let it = left_iter - // .map_ok(move |tuple| { - // let prefix = left_to_prefix_indices - // .iter() - // .map(|i| tuple[*i].clone()) - // .collect_vec(); - // - // if !skip_range_check && !self.filters.is_empty() { - // let other_bindings = &self.bindings[right_join_indices.len()..]; - // let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) { - // Ok(b) => b, - // _ => (vec![], vec![]), - // }; - // if !l_bound.iter().all(|v| *v == DataValue::Null) - // || !u_bound.iter().all(|v| *v == DataValue::Bot) - // { - // return Left( - // self.storage - // .scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound) - // .map(move |res_found| -> Result> { - // let found = res_found?; - // for p in self.filters.iter() { - // if !p.eval_pred(&found)? { - // return Ok(None); - // } - // } - // let mut ret = tuple.clone(); - // ret.extend(found); - // Ok(Some(ret)) - // }) - // .filter_map(swap_option_result), - // ); - // } - // } - // skip_range_check = true; - // Right( - // self.storage - // .scan_prefix(tx, &prefix) - // .map(move |res_found| -> Result> { - // let found = res_found?; - // for p in self.filters.iter() { - // if !p.eval_pred(&found)? { - // return Ok(None); - // } - // } - // let mut ret = tuple.clone(); - // ret.extend(found); - // Ok(Some(ret)) - // }) - // .filter_map(swap_option_result), - // ) - // }) - // .flatten_ok() - // .map(flatten_err); - // Ok(if eliminate_indices.is_empty() { - // Box::new(it) - // } else { - // Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices))) - // }) + let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec(); + right_invert_indices.sort_by_key(|(_, b)| **b); + let left_to_prefix_indices = right_invert_indices + .into_iter() + .map(|(a, _)| left_join_indices[a]) + .collect_vec(); + + let mut skip_range_check = false; + + let it = left_iter + .map_ok(move |tuple| { + let prefix = left_to_prefix_indices + .iter() + .map(|i| tuple[*i].clone()) + .collect_vec(); + + if !skip_range_check && !self.filters.is_empty() { + let other_bindings = &self.bindings[right_join_indices.len()..]; + let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) { + Ok(b) => b, + _ => (vec![], vec![]), + }; + if !l_bound.iter().all(|v| *v == DataValue::Null) + || !u_bound.iter().all(|v| *v == DataValue::Bot) + { + return Left( + self.storage + .skip_scan_bounded_prefix( + tx, + &prefix, + &l_bound, + &u_bound, + self.valid_at, + ) + .map(move |res_found| -> Result> { + let found = res_found?; + for p in self.filters.iter() { + if !p.eval_pred(&found)? { + return Ok(None); + } + } + let mut ret = tuple.clone(); + ret.extend(found); + Ok(Some(ret)) + }) + .filter_map(swap_option_result), + ); + } + } + skip_range_check = true; + Right( + self.storage + .skip_scan_prefix(tx, &prefix, self.valid_at) + .map(move |res_found| -> Result> { + let found = res_found?; + for p in self.filters.iter() { + if !p.eval_pred(&found)? { + return Ok(None); + } + } + let mut ret = tuple.clone(); + ret.extend(found); + Ok(Some(ret)) + }) + .filter_map(swap_option_result), + ) + }) + .flatten_ok() + .map(flatten_err); + Ok(if eliminate_indices.is_empty() { + Box::new(it) + } else { + Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices))) + }) } } @@ -1118,7 +1123,7 @@ impl StoredRA { 'outer: for found in self.storage.scan_prefix(tx, &prefix) { let found = found?; for (left_idx, right_idx) in - left_join_indices.iter().zip(right_join_indices.iter()) + left_join_indices.iter().zip(right_join_indices.iter()) { if tuple[*left_idx] != found[*right_idx] { continue 'outer; @@ -1281,7 +1286,7 @@ impl TempStoreRA { 'outer: for found in storage.prefix_iter(&prefix) { for (left_idx, right_idx) in - left_join_indices.iter().zip(right_join_indices.iter()) + left_join_indices.iter().zip(right_join_indices.iter()) { if tuple[*left_idx] != *found.get(*right_idx) { continue 'outer; @@ -1414,7 +1419,7 @@ impl TempStoreRA { Ok(Some(ret)) } }) - .filter_map(swap_option_result), + .filter_map(swap_option_result), ); } } @@ -1444,7 +1449,7 @@ impl TempStoreRA { Ok(Some(ret)) } }) - .filter_map(swap_option_result), + .filter_map(swap_option_result), ) }) .flatten_ok() @@ -1467,7 +1472,7 @@ impl Debug for Joiner { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let left_bindings = BindingFormatter(self.left_keys.clone()); let right_bindings = BindingFormatter(self.right_keys.clone()); - write!(f, "{:?}<->{:?}", left_bindings, right_bindings, ) + write!(f, "{:?}<->{:?}", left_bindings, right_bindings,) } } @@ -1856,13 +1861,11 @@ impl InnerJoin { ) .unwrap(); if join_is_prefix(&join_indices.1) { - let left_len = self.left.bindings_after_eliminate().len(); r.prefix_join( tx, self.left.iter(tx, delta_rule, stores)?, join_indices, eliminate_indices, - left_len, ) } else { self.materialized_join(tx, eliminate_indices, delta_rule, stores) diff --git a/cozo-core/src/runtime/relation.rs b/cozo-core/src/runtime/relation.rs index eaee26b0..9d49624b 100644 --- a/cozo-core/src/runtime/relation.rs +++ b/cozo-core/src/runtime/relation.rs @@ -20,7 +20,7 @@ use crate::data::memcmp::MemCmpEncoder; use crate::data::relation::StoredRelationMetadata; use crate::data::symb::Symbol; use crate::data::tuple::{decode_tuple_from_key, Tuple, TupleT, ENCODED_KEY_MIN_LEN}; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, ValidityTs}; use crate::parse::SourceSpan; use crate::runtime::transact::SessionTx; @@ -226,6 +226,16 @@ impl RelationHandle { tx.store_tx.range_scan_tuple(&lower, &upper) } + pub(crate) fn skip_scan_all<'a>( + &self, + tx: &'a SessionTx<'_>, + valid_at: ValidityTs, + ) -> impl Iterator> + 'a { + let lower = Tuple::default().encode_as_key(self.id); + let upper = Tuple::default().encode_as_key(self.id.next()); + tx.store_tx.range_skip_scan_tuple(&lower, &upper, valid_at) + } + pub(crate) fn get<'a>( &self, tx: &'a SessionTx<'_>, @@ -254,9 +264,26 @@ impl RelationHandle { upper.push(DataValue::Bot); let prefix_encoded = lower.encode_as_key(self.id); let upper_encoded = upper.encode_as_key(self.id); - // RelationIterator::new(tx, &prefix_encoded, &upper_encoded) - tx.store_tx.range_scan_tuple(&prefix_encoded, &upper_encoded) + tx.store_tx + .range_scan_tuple(&prefix_encoded, &upper_encoded) + } + + pub(crate) fn skip_scan_prefix<'a>( + &self, + tx: &'a SessionTx<'_>, + prefix: &Tuple, + valid_at: ValidityTs, + ) -> impl Iterator> + 'a { + let mut lower = prefix.clone(); + lower.truncate(self.metadata.keys.len()); + let mut upper = lower.clone(); + upper.push(DataValue::Bot); + let prefix_encoded = lower.encode_as_key(self.id); + let upper_encoded = upper.encode_as_key(self.id); + tx.store_tx + .range_skip_scan_tuple(&prefix_encoded, &upper_encoded, valid_at) } + pub(crate) fn scan_bounded_prefix<'a>( &self, tx: &'a SessionTx<'_>, @@ -273,6 +300,24 @@ impl RelationHandle { let upper_encoded = upper_t.encode_as_key(self.id); tx.store_tx.range_scan_tuple(&lower_encoded, &upper_encoded) } + pub(crate) fn skip_scan_bounded_prefix<'a>( + &self, + tx: &'a SessionTx<'_>, + prefix: &Tuple, + lower: &[DataValue], + upper: &[DataValue], + valid_at: ValidityTs, + ) -> impl Iterator> + 'a { + let mut lower_t = prefix.clone(); + lower_t.extend_from_slice(lower); + let mut upper_t = prefix.clone(); + upper_t.extend_from_slice(upper); + upper_t.push(DataValue::Bot); + let lower_encoded = lower_t.encode_as_key(self.id); + let upper_encoded = upper_t.encode_as_key(self.id); + tx.store_tx + .range_skip_scan_tuple(&lower_encoded, &upper_encoded, valid_at) + } } /// Decode tuple from key-value pairs. Used for customizing storage @@ -280,11 +325,15 @@ impl RelationHandle { #[inline] pub fn decode_tuple_from_kv(key: &[u8], val: &[u8]) -> Tuple { let mut tup = decode_tuple_from_key(key); + extend_tuple_from_v(&mut tup, val); + tup +} + +pub fn extend_tuple_from_v(key: &mut Tuple, val: &[u8]) { if !val.is_empty() { let vals: Vec = rmp_serde::from_slice(&val[ENCODED_KEY_MIN_LEN..]).unwrap(); - tup.extend(vals); + key.extend(vals); } - tup } #[derive(Debug, Diagnostic, Error)] diff --git a/cozo-core/src/storage/mem.rs b/cozo-core/src/storage/mem.rs index 8b158ce5..6af0fa0d 100644 --- a/cozo-core/src/storage/mem.rs +++ b/cozo-core/src/storage/mem.rs @@ -12,13 +12,15 @@ use std::collections::BTreeMap; use std::default::Default; use std::iter::Fuse; use std::mem; +use std::ops::Bound; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use itertools::Itertools; use miette::{bail, Result}; -use crate::data::tuple::Tuple; -use crate::runtime::relation::decode_tuple_from_kv; +use crate::data::tuple::{check_key_for_validity, Tuple}; +use crate::data::value::ValidityTs; +use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v}; use crate::storage::{Storage, StoreTx}; use crate::utils::swap_option_result; @@ -191,6 +193,35 @@ impl<'s> StoreTx<'s> for MemTx<'s> { } } + fn range_skip_scan_tuple<'a>( + &'a self, + lower: &[u8], + upper: &[u8], + valid_at: ValidityTs, + ) -> Box> + 'a> { + match self { + MemTx::Reader(stored) => Box::new( + SkipIterator { + inner: stored, + upper: upper.to_vec(), + valid_at, + next_bound: lower.to_vec(), + } + .map(Ok), + ), + MemTx::Writer(stored, delta) => Box::new( + SkipDualIterator { + stored, + delta, + upper: upper.to_vec(), + valid_at, + next_bound: lower.to_vec(), + } + .map(Ok), + ), + } + } + fn range_scan<'a>( &'a self, lower: &[u8], @@ -386,3 +417,102 @@ impl Iterator for CacheIter<'_> { swap_option_result(self.next_inner()) } } + +/// Keep an eye on https://github.com/rust-lang/rust/issues/49638 +struct SkipIterator<'a> { + inner: &'a BTreeMap, Vec>, + upper: Vec, + valid_at: ValidityTs, + next_bound: Vec, +} + +impl<'a> Iterator for SkipIterator<'a> { + type Item = Tuple; + + fn next(&mut self) -> Option { + loop { + let nxt = self + .inner + .range::, (Bound<&Vec>, Bound<&Vec>)>(( + Bound::Included(&self.next_bound), + Bound::Excluded(&self.upper), + )) + .next(); + match nxt { + None => return None, + Some((candidate_key, candidate_val)) => { + let (ret, nxt_bound) = check_key_for_validity(candidate_key, self.valid_at); + self.next_bound = nxt_bound; + if let Some(mut nk) = ret { + extend_tuple_from_v(&mut nk, candidate_val); + return Some(nk); + } + } + } + } + } +} + +struct SkipDualIterator<'a> { + stored: &'a BTreeMap, Vec>, + delta: &'a BTreeMap, Option>>, + upper: Vec, + valid_at: ValidityTs, + next_bound: Vec, +} + +impl<'a> Iterator for SkipDualIterator<'a> { + type Item = Tuple; + + fn next(&mut self) -> Option { + loop { + let stored_nxt = self + .stored + .range::, (Bound<&Vec>, Bound<&Vec>)>(( + Bound::Included(&self.next_bound), + Bound::Excluded(&self.upper), + )) + .next(); + let delta_nxt = self + .delta + .range::, (Bound<&Vec>, Bound<&Vec>)>(( + Bound::Included(&self.next_bound), + Bound::Excluded(&self.upper), + )) + .next(); + let (candidate_key, candidate_val) = match (stored_nxt, delta_nxt) { + (None, None) => return None, + (None, Some((delta_key, maybe_delta_val))) => match maybe_delta_val { + None => { + let (_, nxt_seek) = check_key_for_validity(delta_key, self.valid_at); + self.next_bound = nxt_seek; + continue; + } + Some(delta_val) => (delta_key, delta_val), + }, + (Some((stored_key, stored_val)), None) => (stored_key, stored_val), + (Some((stored_key, stored_val)), Some((delta_key, maybe_delta_val))) => { + if stored_key < delta_key { + (stored_key, stored_val) + } else { + match maybe_delta_val { + None => { + let (_, nxt_seek) = + check_key_for_validity(delta_key, self.valid_at); + self.next_bound = nxt_seek; + continue; + } + Some(delta_val) => (delta_key, delta_val), + } + } + } + }; + let (ret, nxt_bound) = check_key_for_validity(candidate_key, self.valid_at); + self.next_bound = nxt_bound; + if let Some(mut nk) = ret { + extend_tuple_from_v(&mut nk, candidate_val); + return Some(nk); + } + } + } +} diff --git a/cozo-core/src/storage/mod.rs b/cozo-core/src/storage/mod.rs index ebc2ec07..0fcf3fa2 100644 --- a/cozo-core/src/storage/mod.rs +++ b/cozo-core/src/storage/mod.rs @@ -10,6 +10,7 @@ use itertools::Itertools; use miette::Result; use crate::data::tuple::Tuple; +use crate::data::value::ValidityTs; use crate::decode_tuple_from_kv; pub(crate) mod mem; @@ -93,6 +94,29 @@ pub trait StoreTx<'s> { Box::new(it.map_ok(|(k, v)| decode_tuple_from_kv(&k, &v))) } + /// Scan on a range with a certain validity. + /// + /// `lower` is inclusive whereas `upper` is exclusive. + /// For tuples that differ only with respect to their validity, which must be at + /// the last slot of the key, + /// only the tuple that has validity equal to or earlier than (i.e. greater by the comparator) + /// `valid_at` should be considered for returning, and only those with an assertive validity + /// should be returned. Every other tuple should be skipped. + /// + /// Ideally, implementations should take advantage of seeking capabilities of the + /// underlying storage so that not every tuple within the `lower` and `upper` range + /// need to be looked at. + /// + /// For custom implementations, it is OK to return an iterator that always error out, + /// in which case the database with the engine does not support time travelling. + /// You should indicate this clearly in your error message. + fn range_skip_scan_tuple<'a>( + &'a self, + lower: &[u8], + upper: &[u8], + valid_at: ValidityTs, + ) -> Box> + 'a>; + /// Scan on a range and return the raw results. /// `lower` is inclusive whereas `upper` is exclusive. fn range_scan<'a>( diff --git a/cozo-core/src/storage/rocks.rs b/cozo-core/src/storage/rocks.rs index aff437a4..d5414600 100644 --- a/cozo-core/src/storage/rocks.rs +++ b/cozo-core/src/storage/rocks.rs @@ -14,9 +14,10 @@ use miette::{miette, IntoDiagnostic, Result, WrapErr}; use cozorocks::{DbBuilder, DbIter, RocksDb, Tx}; -use crate::data::tuple::Tuple; +use crate::data::tuple::{check_key_for_validity, Tuple}; +use crate::data::value::ValidityTs; use crate::runtime::db::{BadDbInit, DbManifest}; -use crate::runtime::relation::decode_tuple_from_kv; +use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v}; use crate::storage::{Storage, StoreTx}; use crate::utils::swap_option_result; use crate::Db; @@ -195,6 +196,21 @@ impl<'s> StoreTx<'s> for RocksDbTx { }) } + fn range_skip_scan_tuple<'a>( + &'a self, + lower: &[u8], + upper: &[u8], + valid_at: ValidityTs, + ) -> Box> + 'a> { + let inner = self.db_tx.iterator().upper_bound(upper).start(); + Box::new(RocksDbSkipIterator { + inner, + upper_bound: upper.to_vec(), + next_bound: lower.to_owned(), + valid_at + }) + } + fn range_scan<'a>( &'a self, lower: &[u8], @@ -256,6 +272,47 @@ impl Iterator for RocksDbIterator { } } + +pub(crate) struct RocksDbSkipIterator { + inner: DbIter, + upper_bound: Vec, + next_bound: Vec, + valid_at: ValidityTs +} + +impl RocksDbSkipIterator { + #[inline] + fn next_inner(&mut self) -> Result> { + loop { + self.inner.seek(&self.next_bound); + match self.inner.pair()? { + None => return Ok(None), + Some((k_slice, v_slice)) => { + if self.upper_bound.as_slice() <= k_slice { + return Ok(None); + } + + let (ret, nxt_bound) = check_key_for_validity(k_slice, self.valid_at); + self.next_bound = nxt_bound; + if let Some(mut tup) = ret { + extend_tuple_from_v(&mut tup, v_slice); + return Ok(Some(tup)); + } + } + } + } + } +} + +impl Iterator for RocksDbSkipIterator { + type Item = Result; + #[inline] + fn next(&mut self) -> Option { + swap_option_result(self.next_inner()) + } +} + + pub(crate) struct RocksDbIteratorRaw { inner: DbIter, started: bool, diff --git a/cozo-core/src/storage/sled.rs b/cozo-core/src/storage/sled.rs index d32b4b04..e3c7f68e 100644 --- a/cozo-core/src/storage/sled.rs +++ b/cozo-core/src/storage/sled.rs @@ -9,13 +9,14 @@ use std::cmp::Ordering; use std::iter::Fuse; use std::path::Path; -use std::thread; +use std::{iter, thread}; use itertools::Itertools; -use miette::{IntoDiagnostic, Result}; +use miette::{miette, IntoDiagnostic, Result}; use sled::{Batch, Config, Db, IVec, Iter, Mode}; use crate::data::tuple::Tuple; +use crate::data::value::ValidityTs; use crate::runtime::relation::decode_tuple_from_kv; use crate::storage::{Storage, StoreTx}; use crate::utils::swap_option_result; @@ -203,6 +204,17 @@ impl<'s> StoreTx<'s> for SledTx { } } + fn range_skip_scan_tuple<'a>( + &'a self, + _lower: &[u8], + _upper: &[u8], + _valid_at: ValidityTs, + ) -> Box> + 'a> { + Box::new(iter::once(miette!( + "Sled backend does not support time travelling." + ))) + } + fn range_scan<'a>( &'a self, lower: &[u8], diff --git a/cozo-core/src/storage/sqlite.rs b/cozo-core/src/storage/sqlite.rs index 882fcfde..83d779b8 100644 --- a/cozo-core/src/storage/sqlite.rs +++ b/cozo-core/src/storage/sqlite.rs @@ -14,9 +14,11 @@ use either::{Either, Left, Right}; use miette::{bail, miette, IntoDiagnostic, Result}; use sqlite::{State, Statement}; -use crate::data::tuple::Tuple; -use crate::runtime::relation::decode_tuple_from_kv; +use crate::data::tuple::{check_key_for_validity, Tuple}; +use crate::data::value::ValidityTs; +use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v}; use crate::storage::{Storage, StoreTx}; +use crate::utils::swap_option_result; /// The Sqlite storage engine #[derive(Clone)] @@ -144,7 +146,7 @@ pub struct SqliteTx<'a> { committed: bool, } -const N_QUERIES: usize = 5; +const N_QUERIES: usize = 6; const N_CACHED_QUERIES: usize = 4; const QUERIES: [&str; N_QUERIES] = [ "select v from cozo where k = ?;", @@ -152,6 +154,7 @@ const QUERIES: [&str; N_QUERIES] = [ "delete from cozo where k = ?;", "select 1 from cozo where k = ?;", "select k, v from cozo where k >= ? and k < ? order by k;", + "select k, v from cozo where k >= ? and k < ? order by k limit 1;", ]; const GET_QUERY: usize = 0; @@ -159,6 +162,7 @@ const PUT_QUERY: usize = 1; const DEL_QUERY: usize = 2; const EXISTS_QUERY: usize = 3; const RANGE_QUERY: usize = 4; +const SKIP_RANGE_QUERY: usize = 5; impl Drop for SqliteTx<'_> { fn drop(&mut self) { @@ -276,6 +280,22 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> { Box::new(TupleIter(statement)) } + fn range_skip_scan_tuple<'a>( + &'a self, + lower: &[u8], + upper: &[u8], + valid_at: ValidityTs, + ) -> Box> + 'a> { + let query = QUERIES[SKIP_RANGE_QUERY]; + let statement = self.conn.as_ref().unwrap().prepare(query).unwrap(); + Box::new(SkipIter { + stmt: statement, + valid_at, + next_bound: lower.to_vec(), + upper_bound: upper.to_vec(), + }) + } + fn range_scan<'a>( &'a self, lower: &[u8], @@ -341,3 +361,42 @@ impl<'l> Iterator for RawIter<'l> { } } } + +struct SkipIter<'l> { + stmt: Statement<'l>, + valid_at: ValidityTs, + next_bound: Vec, + upper_bound: Vec, +} + +impl<'l> SkipIter<'l> { + fn next_inner(&mut self) -> Result> { + loop { + self.stmt.reset().into_diagnostic()?; + self.stmt.bind((1, &self.next_bound as &[u8])).unwrap(); + self.stmt.bind((2, &self.upper_bound as &[u8])).unwrap(); + + match self.stmt.next().into_diagnostic()? { + State::Done => return Ok(None), + State::Row => { + let k = self.stmt.read::, _>(0).unwrap(); + let (ret, nxt_bound) = check_key_for_validity(&k, self.valid_at); + self.next_bound = nxt_bound; + if let Some(mut tup) = ret { + let v = self.stmt.read::, _>(1).unwrap(); + extend_tuple_from_v(&mut tup, &v); + return Ok(Some(tup)) + } + } + } + } + } +} + +impl<'l> Iterator for SkipIter<'l> { + type Item = Result; + + fn next(&mut self) -> Option { + swap_option_result(self.next_inner()) + } +} diff --git a/cozo-core/src/storage/tikv.rs b/cozo-core/src/storage/tikv.rs index af854807..d4bc7d4b 100644 --- a/cozo-core/src/storage/tikv.rs +++ b/cozo-core/src/storage/tikv.rs @@ -8,15 +8,16 @@ use std::ops::Bound::{Excluded, Included}; use std::sync::{Arc, Mutex}; -use std::thread; +use std::{iter, thread}; use itertools::Itertools; use lazy_static::lazy_static; -use miette::{IntoDiagnostic, Result}; +use miette::{miette, IntoDiagnostic, Result}; use tikv_client::{RawClient, Transaction, TransactionClient}; use tokio::runtime::Runtime; use crate::data::tuple::Tuple; +use crate::data::value::ValidityTs; use crate::runtime::relation::decode_tuple_from_kv; use crate::storage::{Storage, StoreTx}; use crate::utils::swap_option_result; @@ -160,6 +161,17 @@ impl<'s> StoreTx<'s> for TiKvTx { Box::new(BatchScanner::new(self.tx.clone(), lower, upper)) } + fn range_skip_scan_tuple<'a>( + &'a self, + _lower: &[u8], + _upper: &[u8], + _valid_at: ValidityTs, + ) -> Box> + 'a> { + Box::new(iter::once(miette!( + "TiKV backend does not support time travelling." + ))) + } + fn range_scan<'a>( &'a self, lower: &[u8], diff --git a/cozorocks/build.rs b/cozorocks/build.rs index 6515ddf1..99f04871 100644 --- a/cozorocks/build.rs +++ b/cozorocks/build.rs @@ -334,7 +334,7 @@ fn build_rocksdb() { fn try_to_find_and_link_lib(lib_name: &str) -> bool { println!("cargo:rerun-if-env-changed={}_COMPILE", lib_name); - if let Ok(v) = env::var(&format!("{}_COMPILE", lib_name)) { + if let Ok(v) = env::var(format!("{}_COMPILE", lib_name)) { if v.to_lowercase() == "true" || v == "1" { return false; } @@ -343,9 +343,9 @@ fn try_to_find_and_link_lib(lib_name: &str) -> bool { println!("cargo:rerun-if-env-changed={}_LIB_DIR", lib_name); println!("cargo:rerun-if-env-changed={}_STATIC", lib_name); - if let Ok(lib_dir) = env::var(&format!("{}_LIB_DIR", lib_name)) { + if let Ok(lib_dir) = env::var(format!("{}_LIB_DIR", lib_name)) { println!("cargo:rustc-link-search=native={}", lib_dir); - let mode = match env::var_os(&format!("{}_STATIC", lib_name)) { + let mode = match env::var_os(format!("{}_STATIC", lib_name)) { Some(_) => "static", None => "dylib", };