time travel functionality

main
Ziyang Hu 2 years ago
parent cd43966389
commit 7aaea2de42

@ -186,7 +186,7 @@ literal = _{ null | boolean | number | string}
table_schema = {"{" ~ table_cols ~ ("=>" ~ table_cols)? ~ "}"}
table_cols = {(table_col ~ ",")* ~ table_col?}
table_col = {ident ~ (":" ~ col_type)? ~ (("default" ~ expr) | ("=" ~ out_arg))?}
// A column type is one of the base or compound types, optionally followed by `?`.
col_type = {(any_type | bool_type | int_type | float_type | string_type | bytes_type | uuid_type | validity_type | list_type | tuple_type) ~ "?"?}
col_type_with_term = {SOI ~ col_type ~ EOI}
any_type = {"Any"}
int_type = {"Int"}
@ -195,5 +195,6 @@ string_type = {"String"}
bytes_type = {"Bytes"}
uuid_type = {"Uuid"}
bool_type = {"Bool"}
validity_type = {"Validity"}
list_type = {"[" ~ col_type ~ (";" ~ expr)? ~ "]"}
tuple_type = {"(" ~ (col_type ~ ",")* ~ col_type? ~ ")"}

@ -1164,6 +1164,7 @@ impl Aggregation {
Ok(())
}
pub(crate) fn normal_init(&mut self, args: &[DataValue]) -> Result<()> {
#[allow(clippy::box_default)]
self.normal_op.replace(match self.name {
name if name == AGGR_AND.name => Box::new(AggrAnd::default()),
name if name == AGGR_OR.name => Box::new(AggrOr::default()),

@ -25,7 +25,7 @@ use uuid::v1::Timestamp;
use crate::data::expr::Op;
use crate::data::json::JsonValue;
use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper};
use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity, ValidityTs};
macro_rules! define_op {
($name:ident, $min_arity:expr, $vararg:expr) => {
@ -112,8 +112,8 @@ define_op!(OP_GT, 2, false);
/// Evaluates `args[0] > args[1]` and wraps the outcome in `DataValue::Bool`.
///
/// # Errors
/// Propagates the error from `ensure_same_value_type` when that check rejects the operands.
///
/// # Panics
/// Panics if `args` holds fewer than two values.
pub(crate) fn op_gt(args: &[DataValue]) -> Result<DataValue> {
    ensure_same_value_type(&args[0], &args[1])?;
    Ok(DataValue::Bool(match (&args[0], &args[1]) {
        // Mixed int/float operands: widen the integer side to `f64` and compare numerically.
        (DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l > *r as f64,
        (DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 > *r,
        // Everything else relies on the ordering defined on `DataValue` itself.
        (a, b) => a > b,
    }))
}
@ -122,8 +122,8 @@ define_op!(OP_GE, 2, false);
pub(crate) fn op_ge(args: &[DataValue]) -> Result<DataValue> {
ensure_same_value_type(&args[0], &args[1])?;
Ok(DataValue::Bool(match (&args[0], &args[1]) {
(DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l as f64 >= *r as f64,
(DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 >= *r as f64,
(DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l >= *r as f64,
(DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => *l as f64 >= *r,
(a, b) => a >= b,
}))
}
@ -132,8 +132,8 @@ define_op!(OP_LT, 2, false);
pub(crate) fn op_lt(args: &[DataValue]) -> Result<DataValue> {
ensure_same_value_type(&args[0], &args[1])?;
Ok(DataValue::Bool(match (&args[0], &args[1]) {
(DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => (*l as f64) < (*r as f64),
(DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) < (*r as f64),
(DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l < (*r as f64),
(DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) < *r,
(a, b) => a < b,
}))
}
@ -142,8 +142,8 @@ define_op!(OP_LE, 2, false);
pub(crate) fn op_le(args: &[DataValue]) -> Result<DataValue> {
ensure_same_value_type(&args[0], &args[1])?;
Ok(DataValue::Bool(match (&args[0], &args[1]) {
(DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => (*l as f64) <= (*r as f64),
(DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) <= (*r as f64),
(DataValue::Num(Num::Float(l)), DataValue::Num(Num::Int(r))) => *l <= (*r as f64),
(DataValue::Num(Num::Int(l)), DataValue::Num(Num::Float(r))) => (*l as f64) <= *r,
(a, b) => a <= b,
}))
}
@ -1277,25 +1277,23 @@ pub(crate) fn op_to_bool(args: &[DataValue]) -> Result<DataValue> {
}))
}
define_op!(OP_TO_UNITY, 1, false);
/// Maps `args[0]` to the integer `0` or `1`.
///
/// `Null` and `Bot` give `0`; a boolean gives its own truth value; a number gives `1`
/// unless its float value equals zero; strings, byte strings, regex sources, lists and
/// sets give `1` when non-empty; a UUID gives `1` unless it is nil; a validity gives
/// `1` when it is an assertion.
///
/// # Panics
/// Panics if `args` is empty.
pub(crate) fn op_to_unity(args: &[DataValue]) -> Result<DataValue> {
    Ok(DataValue::from(match &args[0] {
        DataValue::Null => 0,
        DataValue::Bool(b) => i64::from(*b),
        DataValue::Num(n) => i64::from(n.get_float() != 0.),
        DataValue::Str(s) => i64::from(!s.is_empty()),
        DataValue::Bytes(b) => i64::from(!b.is_empty()),
        DataValue::Uuid(u) => i64::from(!u.0.is_nil()),
        DataValue::Regex(r) => i64::from(!r.0.as_str().is_empty()),
        DataValue::List(l) => i64::from(!l.is_empty()),
        DataValue::Set(s) => i64::from(!s.is_empty()),
        DataValue::Validity(vld) => i64::from(vld.is_assert),
        DataValue::Bot => 0,
    }))
}
define_op!(OP_TO_FLOAT, 1, false);
pub(crate) fn op_to_float(args: &[DataValue]) -> Result<DataValue> {
Ok(match &args[0] {
@ -1479,21 +1477,23 @@ pub(crate) fn op_now(_args: &[DataValue]) -> Result<DataValue> {
))
}
pub(crate) fn current_validity() -> Reverse<i64> {
pub(crate) fn current_validity() -> ValidityTs {
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
let ts_millis = {
let now = SystemTime::now();
(now.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1000.) as i64
};
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
let ts_millis = {
Date::now() as i64
};
let ts_millis = { Date::now() as i64 };
Reverse(ts_millis)
ValidityTs(Reverse(ts_millis))
}
pub(crate) const MAX_VALIDITY: Reverse<i64> = Reverse(i64::MAX);
/// Validity timestamp wrapping `i64::MAX`, the largest representable raw timestamp.
/// Because `ValidityTs` wraps a `Reverse`, this is the *smallest* `ValidityTs` in sort order.
pub(crate) const MAX_VALIDITY_TS: ValidityTs = ValidityTs(Reverse(i64::MAX));
/// Asserting validity whose timestamp wraps `i64::MIN`, i.e. the *greatest* `ValidityTs`
/// in sort order.
// NOTE(review): appears intended as a seek target that lies past every real validity
// entry sharing the same key prefix — confirm against the key encoding.
pub(crate) const TERMINAL_VALIDITY: Validity = Validity {
timestamp: ValidityTs(Reverse(i64::MIN)),
is_assert: true,
};
define_op!(OP_FORMAT_TIMESTAMP, 1, true);
pub(crate) fn op_format_timestamp(args: &[DataValue]) -> Result<DataValue> {
@ -1542,11 +1542,11 @@ pub(crate) fn op_parse_timestamp(args: &[DataValue]) -> Result<DataValue> {
))
}
pub(crate) fn str2vld(s: &str) -> Result<Reverse<i64>> {
pub(crate) fn str2vld(s: &str) -> Result<ValidityTs> {
let dt = DateTime::parse_from_rfc3339(s).map_err(|_| miette!("bad datetime: {}", s))?;
let st: SystemTime = dt.into();
let microseconds = st.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1_000_000.;
Ok(Reverse(microseconds as i64))
Ok(ValidityTs(Reverse(microseconds as i64)))
}
define_op!(OP_RAND_UUID_V1, 0, false);

@ -14,7 +14,7 @@ use std::str::FromStr;
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use regex::Regex;
use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity};
use crate::data::value::{DataValue, Num, RegexWrapper, UuidWrapper, Validity, ValidityTs};
const INIT_TAG: u8 = 0x00;
const NULL_TAG: u8 = 0x01;
@ -81,7 +81,7 @@ pub(crate) trait MemCmpEncoder: Write {
self.write_u8(INIT_TAG).unwrap()
}
DataValue::Validity(vld) => {
let ts = vld.timestamp.0;
let ts = vld.timestamp.0 .0;
let ts_u64 = order_encode_i64(ts);
let ts_flipped = !ts_u64;
self.write_u8(VLD_TAG).unwrap();
@ -288,7 +288,7 @@ impl DataValue {
let is_assert = *is_assert_byte != 0;
(
DataValue::Validity(Validity {
timestamp: Reverse(ts),
timestamp: ValidityTs(Reverse(ts)),
is_assert,
}),
rest,

@ -6,7 +6,6 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::cmp::Reverse;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Display, Formatter};
@ -22,7 +21,7 @@ use crate::data::aggr::Aggregation;
use crate::data::expr::Expr;
use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::value::DataValue;
use crate::data::value::{DataValue, ValidityTs};
use crate::fixed_rule::{FixedRule, FixedRuleHandle};
use crate::parse::SourceSpan;
use crate::runtime::relation::InputRelationHandle;
@ -325,13 +324,13 @@ pub(crate) enum FixedRuleArg {
Stored {
name: Symbol,
bindings: Vec<Symbol>,
valid_at: Option<Reverse<i64>>,
valid_at: Option<ValidityTs>,
span: SourceSpan,
},
NamedStored {
name: Symbol,
bindings: BTreeMap<SmartString<LazyCompact>, Symbol>,
valid_at: Option<Reverse<i64>>,
valid_at: Option<ValidityTs>,
span: SourceSpan,
},
}
@ -376,7 +375,7 @@ pub(crate) enum MagicFixedRuleRuleArg {
Stored {
name: Symbol,
bindings: Vec<Symbol>,
valid_at: Option<Reverse<i64>>,
valid_at: Option<ValidityTs>,
span: SourceSpan,
},
}
@ -534,7 +533,7 @@ impl InputProgram {
for (symb, aggr) in head.iter().zip(aggrs.iter()) {
if let Some((aggr, _)) = aggr {
ret.push(Symbol::new(
&format!(
format!(
"{}({})",
aggr.name
.strip_prefix("AGGR_")
@ -995,7 +994,7 @@ pub(crate) struct InputRuleApplyAtom {
pub(crate) struct InputNamedFieldRelationApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: BTreeMap<SmartString<LazyCompact>, Expr>,
pub(crate) valid_at: Option<Reverse<i64>>,
pub(crate) valid_at: Option<ValidityTs>,
pub(crate) span: SourceSpan,
}
@ -1003,7 +1002,7 @@ pub(crate) struct InputNamedFieldRelationApplyAtom {
pub(crate) struct InputRelationApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<Expr>,
pub(crate) valid_at: Option<Reverse<i64>>,
pub(crate) valid_at: Option<ValidityTs>,
pub(crate) span: SourceSpan,
}
@ -1018,7 +1017,7 @@ pub(crate) struct NormalFormRuleApplyAtom {
pub(crate) struct NormalFormRelationApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<Symbol>,
pub(crate) valid_at: Option<Reverse<i64>>,
pub(crate) valid_at: Option<ValidityTs>,
pub(crate) span: SourceSpan,
}
@ -1033,7 +1032,7 @@ pub(crate) struct MagicRuleApplyAtom {
pub(crate) struct MagicRelationApplyAtom {
pub(crate) name: Symbol,
pub(crate) args: Vec<Symbol>,
pub(crate) valid_at: Option<Reverse<i64>>,
pub(crate) valid_at: Option<ValidityTs>,
pub(crate) span: SourceSpan,
}

@ -32,6 +32,7 @@ impl Display for NullableColType {
ColType::String => f.write_str("String")?,
ColType::Bytes => f.write_str("Bytes")?,
ColType::Uuid => f.write_str("Uuid")?,
ColType::Validity => f.write_str("Validity")?,
ColType::List { eltype, len } => {
f.write_str("[")?;
write!(f, "{}", eltype)?;
@ -73,6 +74,7 @@ pub(crate) enum ColType {
len: Option<usize>,
},
Tuple(Vec<NullableColType>),
Validity
}
#[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)]
@ -212,6 +214,12 @@ impl NullableColType {
bail!(make_err())
}
}
ColType::Validity => {
    match data {
        vld @ DataValue::Validity(_) => vld,
        // Any other value cannot be stored in a `Validity` column: report the same
        // coercion failure as the neighbouring arms instead of panicking.
        _ => bail!(make_err()),
    }
}
})
}
}

@ -6,10 +6,11 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use crate::data::functions::TERMINAL_VALIDITY;
use miette::Result;
use crate::data::memcmp::MemCmpEncoder;
use crate::data::value::DataValue;
use crate::data::value::{DataValue, Validity, ValidityTs};
use crate::runtime::relation::RelationId;
pub type Tuple = Vec<DataValue>;
@ -20,7 +21,10 @@ pub(crate) trait TupleT {
fn encode_as_key(&self, prefix: RelationId) -> Vec<u8>;
}
impl<T> TupleT for T where T: AsRef<[DataValue]> {
impl<T> TupleT for T
where
T: AsRef<[DataValue]>,
{
fn encode_as_key(&self, prefix: RelationId) -> Vec<u8> {
let len = self.as_ref().len();
let mut ret = Vec::with_capacity(4 + 4 * len + 10 * len);
@ -33,7 +37,7 @@ impl<T> TupleT for T where T: AsRef<[DataValue]> {
}
}
pub(crate) fn decode_tuple_from_key(key: &[u8]) -> Tuple {
pub fn decode_tuple_from_key(key: &[u8]) -> Tuple {
let mut remaining = &key[ENCODED_KEY_MIN_LEN..];
let mut ret = vec![];
while !remaining.is_empty() {
@ -44,4 +48,36 @@ pub(crate) fn decode_tuple_from_key(key: &[u8]) -> Tuple {
ret
}
/// Decides whether an encoded tuple key belongs in the result of a query at `valid_at`.
///
/// The first element of the returned pair is `Some(tuple)` when the decoded key should be
/// part of the result set and `None` when it should not. The second element is the encoded
/// key at which the scan should resume, to be used as an inclusive lower bound.
///
/// # Panics
/// Panics if the decoded key is empty or if its last element is not a `DataValue::Validity`.
pub fn check_key_for_validity(key: &[u8], valid_at: ValidityTs) -> (Option<Tuple>, Vec<u8>) {
    let mut tuple = decode_tuple_from_key(key);
    let rel_id = RelationId::raw_decode(key);
    let (timestamp, is_assert) = match tuple.last().unwrap() {
        DataValue::Validity(vld) => (vld.timestamp, vld.is_assert),
        _ => unreachable!(),
    };

    // Work out both the verdict for this key and the validity to resume seeking from.
    let (hit, seek_validity) = if timestamp < valid_at {
        // This entry sorts before the requested point: resume at `valid_at` itself.
        (
            None,
            Validity {
                timestamp: valid_at,
                is_assert: false,
            },
        )
    } else if is_assert {
        // An assertion at or after the requested point in sort order: emit it.
        (Some(tuple.clone()), TERMINAL_VALIDITY)
    } else {
        // A non-assertion at or after the requested point: nothing to emit for this prefix.
        (None, TERMINAL_VALIDITY)
    };

    *tuple.last_mut().unwrap() = DataValue::Validity(seek_validity);
    (hit, tuple.encode_as_key(rel_id))
}
pub(crate) const ENCODED_KEY_MIN_LEN: usize = 8;

@ -84,6 +84,20 @@ impl PartialOrd for RegexWrapper {
}
}
/// Timestamp component of a validity, stored as a `Reverse<i64>` newtype.
///
/// The derived ordering is that of `Reverse`, so a larger raw integer compares as a
/// *smaller* `ValidityTs`.
// NOTE(review): the raw integer is presumably microseconds since the Unix epoch, as
// produced by the RFC 3339 parsing helper — confirm the unit with all producers.
#[derive(
Copy,
Clone,
Eq,
PartialEq,
Ord,
PartialOrd,
serde_derive::Deserialize,
serde_derive::Serialize,
Hash,
Debug,
)]
pub struct ValidityTs(pub Reverse<i64>);
#[derive(
Copy,
Clone,
@ -96,7 +110,7 @@ impl PartialOrd for RegexWrapper {
Hash,
)]
pub struct Validity {
pub(crate) timestamp: Reverse<i64>,
pub(crate) timestamp: ValidityTs,
pub(crate) is_assert: bool,
}
@ -372,7 +386,7 @@ mod tests {
DataValue::List(vec![
DataValue::Bool(false),
DataValue::Str(SmartString::from(r###"abc"你"好'啊👌"###)),
DataValue::from(f64::NEG_INFINITY)
DataValue::from(f64::NEG_INFINITY),
])
);
}

@ -39,7 +39,7 @@ impl FixedRule for BetweennessCentrality {
let undirected = payload.bool_option("undirected", Some(false))?;
let (graph, indices, _inv_indices) =
edges.to_directed_weighted_graph(undirected, false)?;
edges.as_directed_weighted_graph(undirected, false)?;
let n = graph.node_count();
if n == 0 {
@ -108,7 +108,7 @@ impl FixedRule for ClosenessCentrality {
let undirected = payload.bool_option("undirected", Some(false))?;
let (graph, indices, _inv_indices) =
edges.to_directed_weighted_graph(undirected, false)?;
edges.as_directed_weighted_graph(undirected, false)?;
let n = graph.node_count();
if n == 0 {

@ -34,7 +34,7 @@ impl FixedRule for MinimumSpanningForestKruskal {
poison: Poison,
) -> Result<()> {
let edges = payload.get_input(0)?;
let (graph, indices, _) = edges.to_directed_weighted_graph(true, true)?;
let (graph, indices, _) = edges.as_directed_weighted_graph(true, true)?;
if graph.node_count() == 0 {
return Ok(());
}

@ -34,7 +34,7 @@ impl FixedRule for LabelPropagation {
let edges = payload.get_input(0)?;
let undirected = payload.bool_option("undirected", Some(false))?;
let max_iter = payload.pos_integer_option("max_iter", Some(10))?;
let (graph, indices, _inv_indices) = edges.to_directed_weighted_graph(undirected, true)?;
let (graph, indices, _inv_indices) = edges.as_directed_weighted_graph(undirected, true)?;
let labels = label_propagation(&graph, max_iter, poison)?;
for (idx, label) in labels.into_iter().enumerate() {
let node = indices[idx].clone();

@ -39,7 +39,7 @@ impl FixedRule for CommunityDetectionLouvain {
let delta = payload.unit_interval_option("delta", Some(0.0001))? as f32;
let keep_depth = payload.non_neg_integer_option("keep_depth", None).ok();
let (graph, indices, _inv_indices) = edges.to_directed_weighted_graph(undirected, false)?;
let (graph, indices, _inv_indices) = edges.as_directed_weighted_graph(undirected, false)?;
let result = louvain(&graph, delta, max_iter, poison)?;
for (idx, node) in indices.into_iter().enumerate() {
let mut labels = vec![];

@ -38,7 +38,7 @@ impl FixedRule for PageRank {
let epsilon = payload.unit_interval_option("epsilon", Some(0.0001))? as f32;
let iterations = payload.pos_integer_option("iterations", Some(10))?;
let (graph, indices, _) = edges.to_directed_graph(undirected)?;
let (graph, indices, _) = edges.as_directed_graph(undirected)?;
let (ranks, _n_run, _) = page_rank(
&graph,

@ -35,7 +35,7 @@ impl FixedRule for MinimumSpanningTreePrim {
poison: Poison,
) -> Result<()> {
let edges = payload.get_input(0)?;
let (graph, indices, inv_indices) = edges.to_directed_weighted_graph(true, true)?;
let (graph, indices, inv_indices) = edges.as_directed_weighted_graph(true, true)?;
if graph.node_count() == 0 {
return Ok(());
}

@ -43,7 +43,7 @@ impl FixedRule for ShortestPathDijkstra {
let keep_ties = payload.bool_option("keep_ties", Some(false))?;
let (graph, indices, inv_indices) =
edges.to_directed_weighted_graph(undirected, false)?;
edges.as_directed_weighted_graph(undirected, false)?;
let mut starting_nodes = BTreeSet::new();
for tuple in starting.iter()? {

@ -48,7 +48,7 @@ impl FixedRule for StronglyConnectedComponent {
let edges = payload.get_input(0)?;
let (graph, indices, mut inv_indices) =
edges.to_directed_graph(!self.strong)?;
edges.as_directed_graph(!self.strong)?;
let tarjan = TarjanSccG::new(graph).run(poison)?;
for (grp_id, cc) in tarjan.iter().enumerate() {
@ -123,7 +123,7 @@ impl TarjanSccG {
low_map.entry(grp).or_default().push(idx as u32);
}
Ok(low_map.into_iter().map(|(_, vs)| vs).collect_vec())
Ok(low_map.into_values().collect_vec())
}
fn dfs(&mut self, at: u32) {
self.stack.push(at);

@ -31,7 +31,7 @@ impl FixedRule for TopSort {
) -> Result<()> {
let edges = payload.get_input(0)?;
let (graph, indices, _) = edges.to_directed_graph(false)?;
let (graph, indices, _) = edges.as_directed_graph(false)?;
let sorted = kahn_g(&graph, poison)?;
@ -75,7 +75,7 @@ pub(crate) fn kahn_g(graph: &DirectedCsrGraph<u32>, poison: Poison) -> Result<Ve
while !pending.is_empty() {
let removed = pending.pop().unwrap();
sorted.push(removed);
for nxt in graph.out_neighbors(removed as u32) {
for nxt in graph.out_neighbors(removed) {
in_degree[*nxt as usize] -= 1;
if in_degree[*nxt as usize] == 0 {
pending.push(*nxt);

@ -33,7 +33,7 @@ impl FixedRule for ClusteringCoefficients {
poison: Poison,
) -> Result<()> {
let edges = payload.get_input(0)?;
let (graph, indices, _) = edges.to_directed_graph(true)?;
let (graph, indices, _) = edges.as_directed_graph(true)?;
let coefficients = clustering_coefficients(&graph, poison)?;
for (idx, (cc, n_triangles, degree)) in coefficients.into_iter().enumerate() {
out.put(vec![
@ -85,7 +85,7 @@ fn clustering_coefficients(
return true;
}
}
return false;
false
})
.count()
})

@ -39,7 +39,7 @@ impl FixedRule for KShortestPathYen {
let undirected = payload.bool_option("undirected", Some(false))?;
let k = payload.pos_integer_option("k", None)?;
let (graph, indices, inv_indices) = edges.to_directed_weighted_graph(undirected, false)?;
let (graph, indices, inv_indices) = edges.as_directed_weighted_graph(undirected, false)?;
let mut starting_nodes = BTreeSet::new();
for tuple in starting.iter()? {
@ -61,7 +61,7 @@ impl FixedRule for KShortestPathYen {
for start in starting_nodes {
for goal in &termination_nodes {
for (cost, path) in
k_shortest_path_yen(k as usize, &graph, start, *goal, poison.clone())?
k_shortest_path_yen(k, &graph, start, *goal, poison.clone())?
{
let t = vec![
indices[start as usize].clone(),
@ -90,7 +90,7 @@ impl FixedRule for KShortestPathYen {
Ok((
start,
goal,
k_shortest_path_yen(k as usize, &graph, start, goal, poison.clone())?,
k_shortest_path_yen(k, &graph, start, goal, poison.clone())?,
))
},
)

@ -83,8 +83,12 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> {
}
MagicFixedRuleRuleArg::Stored { name, valid_at, .. } => {
let relation = self.tx.get_relation(name, false)?;
if let Some(valid_at) = valid_at {
Box::new(relation.skip_scan_all(self.tx, *valid_at))
} else {
Box::new(relation.scan_all(self.tx))
}
}
})
}
pub fn prefix_iter(&self, prefix: &DataValue) -> Result<TupleIter<'_>> {
@ -99,15 +103,19 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> {
MagicFixedRuleRuleArg::Stored { name, valid_at, .. } => {
let relation = self.tx.get_relation(name, false)?;
let t = vec![prefix.clone()];
if let Some(valid_at) = valid_at {
Box::new(relation.skip_scan_prefix(self.tx, &t, *valid_at))
} else {
Box::new(relation.scan_prefix(self.tx, &t))
}
}
})
}
pub fn span(&self) -> SourceSpan {
self.arg_manifest.span()
}
#[cfg(feature = "graph-algo")]
pub fn to_directed_graph(
pub fn as_directed_graph(
&self,
undirected: bool,
) -> Result<(
@ -173,7 +181,7 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> {
Ok((graph, indices, inv_indices))
}
#[cfg(feature = "graph-algo")]
pub fn to_directed_weighted_graph(
pub fn as_directed_weighted_graph(
&self,
undirected: bool,
allow_negative_weights: bool,
@ -238,8 +246,7 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> {
return None;
};
if f < 0. {
if !allow_negative_weights {
if f < 0. && !allow_negative_weights {
error = Some(
BadEdgeWeightError(
d,
@ -253,7 +260,6 @@ impl<'a, 'b> FixedRuleInputRelation<'a, 'b> {
);
return None;
}
}
f
}
None => {

@ -17,7 +17,7 @@ use crate::data::functions::OP_LIST;
use crate::data::program::WrongFixedRuleOptionError;
use crate::data::symb::Symbol;
use crate::data::value::DataValue;
use crate::fixed_rule::{FixedRule, FixedRulePayload, CannotDetermineArity};
use crate::fixed_rule::{CannotDetermineArity, FixedRule, FixedRulePayload};
use crate::parse::SourceSpan;
use crate::runtime::db::Poison;
use crate::runtime::temp_store::RegularTempStore;
@ -91,8 +91,7 @@ impl FixedRule for ReorderSort {
let mut count = 0usize;
let mut rank = 0usize;
let mut last = &DataValue::Bot;
let skip = skip as usize;
let take_plus_skip = (take as usize).saturating_add(skip);
let take_plus_skip = take.saturating_add(skip);
for val in &buffer {
let sorter = val.last().unwrap();

@ -22,7 +22,7 @@ use thiserror::Error;
use crate::data::aggr::{parse_aggr, Aggregation};
use crate::data::expr::Expr;
use crate::data::functions::{MAX_VALIDITY, str2vld};
use crate::data::functions::{MAX_VALIDITY_TS, str2vld};
use crate::data::program::{
FixedRuleApply, FixedRuleArg, InputAtom, InputInlineRule, InputInlineRulesOrFixed,
InputNamedFieldRelationApplyAtom, InputProgram, InputRelationApplyAtom, InputRuleApplyAtom,
@ -30,7 +30,7 @@ use crate::data::program::{
};
use crate::data::relation::{ColType, ColumnDef, NullableColType, StoredRelationMetadata};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::value::DataValue;
use crate::data::value::{DataValue, ValidityTs};
use crate::fixed_rule::utilities::constant::Constant;
use crate::fixed_rule::{FixedRuleHandle, FixedRuleNotFoundError};
use crate::parse::expr::build_expr;
@ -97,7 +97,7 @@ pub(crate) fn parse_query(
src: Pairs<'_>,
param_pool: &BTreeMap<String, DataValue>,
fixedrithms: &BTreeMap<String, Arc<Box<dyn FixedRule>>>,
cur_vld: Reverse<i64>,
cur_vld: ValidityTs,
) -> Result<InputProgram> {
let mut progs: BTreeMap<Symbol, InputInlineRulesOrFixed> = Default::default();
let mut out_opts: QueryOutOptions = Default::default();
@ -435,7 +435,7 @@ pub(crate) fn parse_query(
fn parse_rule(
src: Pair<'_>,
param_pool: &BTreeMap<String, DataValue>,
cur_vld: Reverse<i64>,
cur_vld: ValidityTs,
) -> Result<(Symbol, InputInlineRule)> {
let span = src.extract_span();
let mut src = src.into_inner();
@ -469,7 +469,7 @@ fn parse_rule(
fn parse_disjunction(
pair: Pair<'_>,
param_pool: &BTreeMap<String, DataValue>,
cur_vld: Reverse<i64>,
cur_vld: ValidityTs,
) -> Result<InputAtom> {
let span = pair.extract_span();
let res: Vec<_> = pair
@ -483,7 +483,7 @@ fn parse_disjunction(
})
}
fn parse_atom(src: Pair<'_>, param_pool: &BTreeMap<String, DataValue>, cur_vld: Reverse<i64>) -> Result<InputAtom> {
fn parse_atom(src: Pair<'_>, param_pool: &BTreeMap<String, DataValue>, cur_vld: ValidityTs) -> Result<InputAtom> {
Ok(match src.as_rule() {
Rule::rule_body => {
let span = src.extract_span();
@ -682,7 +682,7 @@ fn parse_fixed_rule(
src: Pair<'_>,
param_pool: &BTreeMap<String, DataValue>,
fixedrithms: &BTreeMap<String, Arc<Box<dyn FixedRule>>>,
cur_vld: Reverse<i64>,
cur_vld: ValidityTs,
) -> Result<(Symbol, FixedRuleApply)> {
let mut src = src.into_inner();
let (out_symbol, head, aggr) = parse_rule_head(src.next().unwrap(), param_pool)?;
@ -865,19 +865,19 @@ fn make_empty_const_rule(prog: &mut InputProgram, bindings: &[Symbol]) {
);
}
fn expr2vld_spec(expr: Expr, cur_vld: Reverse<i64>) -> Result<Reverse<i64>> {
fn expr2vld_spec(expr: Expr, cur_vld: ValidityTs) -> Result<ValidityTs> {
let vld_span = expr.span();
match expr.eval_to_const()? {
DataValue::Num(n) => {
let microseconds = n.get_int().ok_or_else(|| BadValiditySpecification(vld_span))?;
Ok(Reverse(microseconds))
let microseconds = n.get_int().ok_or(BadValiditySpecification(vld_span))?;
Ok(ValidityTs(Reverse(microseconds)))
}
DataValue::Str(s) => {
match &s as &str {
"now" => {
Ok(cur_vld)
}
"max" => { Ok(MAX_VALIDITY) }
"max" => { Ok(MAX_VALIDITY_TS) }
s => {
Ok(str2vld(s).map_err(|_| BadValiditySpecification(vld_span))?)
}

@ -122,6 +122,7 @@ fn parse_type_inner(pair: Pair<'_>) -> Result<ColType> {
Rule::string_type => ColType::String,
Rule::bytes_type => ColType::Bytes,
Rule::uuid_type => ColType::Uuid,
Rule::validity_type => ColType::Validity,
Rule::list_type => {
let mut inner = pair.into_inner();
let eltype = parse_nullable_type(inner.next().unwrap())?;

@ -15,7 +15,7 @@ use thiserror::Error;
use crate::data::aggr::Aggregation;
use crate::data::expr::Expr;
use crate::data::program::{
MagicFixedRuleApply, MagicAtom, MagicInlineRule, MagicRulesOrFixed, MagicSymbol,
MagicAtom, MagicFixedRuleApply, MagicInlineRule, MagicRulesOrFixed, MagicSymbol,
StratifiedMagicProgram,
};
use crate::data::symb::Symbol;
@ -238,7 +238,8 @@ impl<'a> SessionTx<'a> {
}
}
let right = RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at);
let right =
RelAlgebra::relation(right_vars, store, rel_app.span, rel_app.valid_at)?;
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.join(right, prev_joiner_vars, right_joiner_vars, rel_app.span);
}
@ -306,7 +307,12 @@ impl<'a> SessionTx<'a> {
}
}
let right = RelAlgebra::relation(right_vars, store, relation_app.span, relation_app.valid_at);
let right = RelAlgebra::relation(
right_vars,
store,
relation_app.span,
relation_app.valid_at,
)?;
debug_assert_eq!(prev_joiner_vars.len(), right_joiner_vars.len());
ret = ret.neg_join(
right,

@ -163,7 +163,7 @@ impl<'a> TarjanScc<'a> {
low_map.entry(grp).or_default().push(idx);
}
Ok(low_map.into_iter().map(|(_, vs)| vs).collect_vec())
Ok(low_map.into_values().collect_vec())
}
fn dfs(&mut self, at: usize) {
self.stack.push(at);

@ -10,19 +10,21 @@ use std::collections::BTreeSet;
use std::mem;
use itertools::Itertools;
use miette::{ensure, Result};
use miette::{bail, ensure, Result};
use smallvec::SmallVec;
use smartstring::SmartString;
use crate::data::program::{
FixedRuleArg, MagicFixedRuleApply, MagicFixedRuleRuleArg, MagicAtom, MagicInlineRule, MagicProgram,
MagicRelationApplyAtom, MagicRuleApplyAtom, MagicRulesOrFixed, MagicSymbol,
NormalFormRulesOrFixed, NormalFormAtom, NormalFormInlineRule, NormalFormProgram,
FixedRuleArg, MagicAtom, MagicFixedRuleApply, MagicFixedRuleRuleArg, MagicInlineRule,
MagicProgram, MagicRelationApplyAtom, MagicRuleApplyAtom, MagicRulesOrFixed, MagicSymbol,
NormalFormAtom, NormalFormInlineRule, NormalFormProgram, NormalFormRulesOrFixed,
StratifiedMagicProgram, StratifiedNormalFormProgram,
};
use crate::data::relation::{ColType, NullableColType};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::parse::SourceSpan;
use crate::query::logical::NamedFieldNotFound;
use crate::query::ra::InvalidTimeTravelScanning;
use crate::runtime::transact::SessionTx;
impl NormalFormProgram {
@ -332,13 +334,36 @@ impl NormalFormProgram {
name,
bindings,
span,
valid_at
} => MagicFixedRuleRuleArg::Stored {
valid_at,
} => {
if valid_at.is_some() {
let relation = tx.get_relation(name, false)?;
let last_col_type = &relation
.metadata
.keys
.last()
.unwrap()
.typing;
if *last_col_type
!= (NullableColType {
coltype: ColType::Validity,
nullable: false,
})
{
bail!(InvalidTimeTravelScanning(
name.to_string(),
*span
));
}
}
MagicFixedRuleRuleArg::Stored {
name: name.clone(),
bindings: bindings.clone(),
valid_at: *valid_at,
span: *span,
},
}
}
FixedRuleArg::NamedStored {
name,
bindings,
@ -346,6 +371,25 @@ impl NormalFormProgram {
span,
} => {
let relation = tx.get_relation(name, false)?;
if valid_at.is_some() {
let last_col_type = &relation
.metadata
.keys
.last()
.unwrap()
.typing;
if *last_col_type
!= (NullableColType {
coltype: ColType::Validity,
nullable: false,
})
{
bail!(InvalidTimeTravelScanning(
name.to_string(),
*span
));
}
}
let fields: BTreeSet<_> = relation
.metadata
.keys

@ -6,7 +6,6 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::cmp::Reverse;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::iter;
@ -14,14 +13,15 @@ use std::iter;
use either::{Left, Right};
use itertools::Itertools;
use log::{debug, error};
use miette::{Diagnostic, Result};
use miette::{bail, Diagnostic, Result};
use thiserror::Error;
use crate::data::expr::{compute_bounds, Expr};
use crate::data::program::MagicSymbol;
use crate::data::relation::{ColType, NullableColType};
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleIter};
use crate::data::value::DataValue;
use crate::data::value::{DataValue, ValidityTs};
use crate::parse::SourceSpan;
use crate::runtime::relation::RelationHandle;
use crate::runtime::temp_store::EpochStore;
@ -317,6 +317,14 @@ impl Debug for RelAlgebra {
}
}
/// Diagnostic reported when a time-travel (validity) scan is requested on a relation
/// that cannot support it.
///
/// Field `0` is the relation name shown in the message; field `1` is the source span
/// that the diagnostic labels.
#[derive(Debug, Error, Diagnostic)]
#[error("Invalid time travel on relation {0}")]
#[diagnostic(code(eval::invalid_time_travel))]
#[diagnostic(help(
"Time travel scanning requires the last key column of the relation to be of type 'Validity'"
))]
pub(crate) struct InvalidTimeTravelScanning(pub(crate) String, #[label] pub(crate) SourceSpan);
impl RelAlgebra {
pub(crate) fn fill_binding_indices(&mut self) -> Result<()> {
match self {
@ -380,25 +388,31 @@ impl RelAlgebra {
bindings: Vec<Symbol>,
storage: RelationHandle,
span: SourceSpan,
validity: Option<Reverse<i64>>,
) -> Self {
validity: Option<ValidityTs>,
) -> Result<Self> {
match validity {
None => {
Self::Stored(StoredRA {
None => Ok(Self::Stored(StoredRA {
bindings,
storage,
filters: vec![],
span,
})
}
})),
Some(vld) => {
Self::StoredWithValidity(StoredWithValidityRA {
if storage.metadata.keys.last().unwrap().typing
!= (NullableColType {
coltype: ColType::Validity,
nullable: false,
})
{
bail!(InvalidTimeTravelScanning(storage.name.to_string(), span));
};
Ok(Self::StoredWithValidity(StoredWithValidityRA {
bindings,
storage,
filters: vec![],
valid_at: vld,
span,
})
}))
}
}
}
@ -789,7 +803,7 @@ pub(crate) struct StoredWithValidityRA {
pub(crate) bindings: Vec<Symbol>,
pub(crate) storage: RelationHandle,
pub(crate) filters: Vec<Expr>,
pub(crate) valid_at: Reverse<i64>,
pub(crate) valid_at: ValidityTs,
pub(crate) span: SourceSpan,
}
@ -808,13 +822,12 @@ impl StoredWithValidityRA {
Ok(())
}
/// Scans the stored relation at `self.valid_at` and returns the resulting tuples.
///
/// The scan is delegated to `skip_scan_all` on the underlying storage. When the node
/// carries filters, the scanned tuples are additionally passed through `filter_iter`
/// with a clone of those filters.
fn iter<'a>(&'a self, tx: &'a SessionTx<'_>) -> Result<TupleIter<'a>> {
    let it = self.storage.skip_scan_all(tx, self.valid_at);
    Ok(if self.filters.is_empty() {
        Box::new(it)
    } else {
        Box::new(filter_iter(self.filters.clone(), it))
    })
}
fn prefix_join<'a>(
&'a self,
@ -822,89 +835,81 @@ impl StoredWithValidityRA {
left_iter: TupleIter<'a>,
(left_join_indices, right_join_indices): (Vec<usize>, Vec<usize>),
eliminate_indices: BTreeSet<usize>,
left_tuple_len: usize,
) -> Result<TupleIter<'a>> {
todo!()
// let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
// right_invert_indices.sort_by_key(|(_, b)| **b);
// let left_to_prefix_indices = right_invert_indices
// .into_iter()
// .map(|(a, _)| left_join_indices[a])
// .collect_vec();
//
// let key_len = self.storage.metadata.keys.len();
// if left_to_prefix_indices.len() >= key_len {
// return self.point_lookup_join(
// tx,
// left_iter,
// key_len,
// left_to_prefix_indices,
// eliminate_indices,
// left_tuple_len,
// );
// }
//
// let mut skip_range_check = false;
// // In some cases, maybe we can stop as soon as we get one result?
// let it = left_iter
// .map_ok(move |tuple| {
// let prefix = left_to_prefix_indices
// .iter()
// .map(|i| tuple[*i].clone())
// .collect_vec();
//
// if !skip_range_check && !self.filters.is_empty() {
// let other_bindings = &self.bindings[right_join_indices.len()..];
// let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) {
// Ok(b) => b,
// _ => (vec![], vec![]),
// };
// if !l_bound.iter().all(|v| *v == DataValue::Null)
// || !u_bound.iter().all(|v| *v == DataValue::Bot)
// {
// return Left(
// self.storage
// .scan_bounded_prefix(tx, &prefix, &l_bound, &u_bound)
// .map(move |res_found| -> Result<Option<Tuple>> {
// let found = res_found?;
// for p in self.filters.iter() {
// if !p.eval_pred(&found)? {
// return Ok(None);
// }
// }
// let mut ret = tuple.clone();
// ret.extend(found);
// Ok(Some(ret))
// })
// .filter_map(swap_option_result),
// );
// }
// }
// skip_range_check = true;
// Right(
// self.storage
// .scan_prefix(tx, &prefix)
// .map(move |res_found| -> Result<Option<Tuple>> {
// let found = res_found?;
// for p in self.filters.iter() {
// if !p.eval_pred(&found)? {
// return Ok(None);
// }
// }
// let mut ret = tuple.clone();
// ret.extend(found);
// Ok(Some(ret))
// })
// .filter_map(swap_option_result),
// )
// })
// .flatten_ok()
// .map(flatten_err);
// Ok(if eliminate_indices.is_empty() {
// Box::new(it)
// } else {
// Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
// })
let mut right_invert_indices = right_join_indices.iter().enumerate().collect_vec();
right_invert_indices.sort_by_key(|(_, b)| **b);
let left_to_prefix_indices = right_invert_indices
.into_iter()
.map(|(a, _)| left_join_indices[a])
.collect_vec();
let mut skip_range_check = false;
let it = left_iter
.map_ok(move |tuple| {
let prefix = left_to_prefix_indices
.iter()
.map(|i| tuple[*i].clone())
.collect_vec();
if !skip_range_check && !self.filters.is_empty() {
let other_bindings = &self.bindings[right_join_indices.len()..];
let (l_bound, u_bound) = match compute_bounds(&self.filters, other_bindings) {
Ok(b) => b,
_ => (vec![], vec![]),
};
if !l_bound.iter().all(|v| *v == DataValue::Null)
|| !u_bound.iter().all(|v| *v == DataValue::Bot)
{
return Left(
self.storage
.skip_scan_bounded_prefix(
tx,
&prefix,
&l_bound,
&u_bound,
self.valid_at,
)
.map(move |res_found| -> Result<Option<Tuple>> {
let found = res_found?;
for p in self.filters.iter() {
if !p.eval_pred(&found)? {
return Ok(None);
}
}
let mut ret = tuple.clone();
ret.extend(found);
Ok(Some(ret))
})
.filter_map(swap_option_result),
);
}
}
skip_range_check = true;
Right(
self.storage
.skip_scan_prefix(tx, &prefix, self.valid_at)
.map(move |res_found| -> Result<Option<Tuple>> {
let found = res_found?;
for p in self.filters.iter() {
if !p.eval_pred(&found)? {
return Ok(None);
}
}
let mut ret = tuple.clone();
ret.extend(found);
Ok(Some(ret))
})
.filter_map(swap_option_result),
)
})
.flatten_ok()
.map(flatten_err);
Ok(if eliminate_indices.is_empty() {
Box::new(it)
} else {
Box::new(it.map_ok(move |t| eliminate_from_tuple(t, &eliminate_indices)))
})
}
}
@ -1856,13 +1861,11 @@ impl InnerJoin {
)
.unwrap();
if join_is_prefix(&join_indices.1) {
let left_len = self.left.bindings_after_eliminate().len();
r.prefix_join(
tx,
self.left.iter(tx, delta_rule, stores)?,
join_indices,
eliminate_indices,
left_len,
)
} else {
self.materialized_join(tx, eliminate_indices, delta_rule, stores)

@ -20,7 +20,7 @@ use crate::data::memcmp::MemCmpEncoder;
use crate::data::relation::StoredRelationMetadata;
use crate::data::symb::Symbol;
use crate::data::tuple::{decode_tuple_from_key, Tuple, TupleT, ENCODED_KEY_MIN_LEN};
use crate::data::value::DataValue;
use crate::data::value::{DataValue, ValidityTs};
use crate::parse::SourceSpan;
use crate::runtime::transact::SessionTx;
@ -226,6 +226,16 @@ impl RelationHandle {
tx.store_tx.range_scan_tuple(&lower, &upper)
}
/// Skip-scans the whole relation as seen at `valid_at`.
///
/// The scanned key range starts at the encoded empty tuple under this
/// relation's id (inclusive) and ends at the encoded empty tuple under the
/// next relation id (exclusive). The actual scan is delegated to the storage
/// transaction's `range_skip_scan_tuple`.
pub(crate) fn skip_scan_all<'a>(
    &self,
    tx: &'a SessionTx<'_>,
    valid_at: ValidityTs,
) -> impl Iterator<Item = Result<Tuple>> + 'a {
    let range_start = Tuple::default().encode_as_key(self.id);
    let range_end = Tuple::default().encode_as_key(self.id.next());
    tx.store_tx
        .range_skip_scan_tuple(&range_start, &range_end, valid_at)
}
pub(crate) fn get<'a>(
&self,
tx: &'a SessionTx<'_>,
@ -254,9 +264,26 @@ impl RelationHandle {
upper.push(DataValue::Bot);
let prefix_encoded = lower.encode_as_key(self.id);
let upper_encoded = upper.encode_as_key(self.id);
// RelationIterator::new(tx, &prefix_encoded, &upper_encoded)
tx.store_tx.range_scan_tuple(&prefix_encoded, &upper_encoded)
tx.store_tx
.range_scan_tuple(&prefix_encoded, &upper_encoded)
}
/// Skip-scans the tuples whose key starts with `prefix`, as seen at `valid_at`.
///
/// `prefix` is cut down to at most the relation's number of key columns.
/// The scan starts at the encoded (truncated) prefix and stops before the
/// encoded prefix extended with `DataValue::Bot`.
pub(crate) fn skip_scan_prefix<'a>(
    &self,
    tx: &'a SessionTx<'_>,
    prefix: &Tuple,
    valid_at: ValidityTs,
) -> impl Iterator<Item = Result<Tuple>> + 'a {
    let key_arity = self.metadata.keys.len();

    let mut start_tuple = prefix.clone();
    start_tuple.truncate(key_arity);

    let mut end_tuple = start_tuple.clone();
    end_tuple.push(DataValue::Bot);

    let start_key = start_tuple.encode_as_key(self.id);
    let end_key = end_tuple.encode_as_key(self.id);
    tx.store_tx
        .range_skip_scan_tuple(&start_key, &end_key, valid_at)
}
pub(crate) fn scan_bounded_prefix<'a>(
&self,
tx: &'a SessionTx<'_>,
@ -273,6 +300,24 @@ impl RelationHandle {
let upper_encoded = upper_t.encode_as_key(self.id);
tx.store_tx.range_scan_tuple(&lower_encoded, &upper_encoded)
}
/// Skip-scans the tuples under `prefix` whose remaining key columns lie
/// between `lower` and `upper`, as seen at `valid_at`.
///
/// The start key is `prefix` followed by `lower`; the end key is `prefix`
/// followed by `upper` and a trailing `DataValue::Bot`.
pub(crate) fn skip_scan_bounded_prefix<'a>(
    &self,
    tx: &'a SessionTx<'_>,
    prefix: &Tuple,
    lower: &[DataValue],
    upper: &[DataValue],
    valid_at: ValidityTs,
) -> impl Iterator<Item = Result<Tuple>> + 'a {
    let mut start_tuple = prefix.clone();
    start_tuple.extend_from_slice(lower);
    let start_key = start_tuple.encode_as_key(self.id);

    let mut end_tuple = prefix.clone();
    end_tuple.extend_from_slice(upper);
    end_tuple.push(DataValue::Bot);
    let end_key = end_tuple.encode_as_key(self.id);

    tx.store_tx
        .range_skip_scan_tuple(&start_key, &end_key, valid_at)
}
}
/// Decode tuple from key-value pairs. Used for customizing storage
@ -280,11 +325,15 @@ impl RelationHandle {
#[inline]
pub fn decode_tuple_from_kv(key: &[u8], val: &[u8]) -> Tuple {
let mut tup = decode_tuple_from_key(key);
extend_tuple_from_v(&mut tup, val);
tup
}
pub fn extend_tuple_from_v(key: &mut Tuple, val: &[u8]) {
if !val.is_empty() {
let vals: Vec<DataValue> = rmp_serde::from_slice(&val[ENCODED_KEY_MIN_LEN..]).unwrap();
tup.extend(vals);
key.extend(vals);
}
tup
}
#[derive(Debug, Diagnostic, Error)]

@ -12,13 +12,15 @@ use std::collections::BTreeMap;
use std::default::Default;
use std::iter::Fuse;
use std::mem;
use std::ops::Bound;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use itertools::Itertools;
use miette::{bail, Result};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::data::tuple::{check_key_for_validity, Tuple};
use crate::data::value::ValidityTs;
use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v};
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
@ -191,6 +193,35 @@ impl<'s> StoreTx<'s> for MemTx<'s> {
}
}
/// In-memory skip scan over `[lower, upper)` at `valid_at`.
///
/// A reader transaction scans the stored map alone; a writer transaction
/// scans the stored map merged with its pending delta. Both underlying
/// iterators are infallible, so every item is wrapped in `Ok`.
fn range_skip_scan_tuple<'a>(
    &'a self,
    lower: &[u8],
    upper: &[u8],
    valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
    let next_bound = lower.to_vec();
    let upper = upper.to_vec();
    match self {
        MemTx::Reader(stored) => {
            let it = SkipIterator {
                inner: stored,
                upper,
                valid_at,
                next_bound,
            };
            Box::new(it.map(Ok))
        }
        MemTx::Writer(stored, delta) => {
            let it = SkipDualIterator {
                stored,
                delta,
                upper,
                valid_at,
                next_bound,
            };
            Box::new(it.map(Ok))
        }
    }
}
fn range_scan<'a>(
&'a self,
lower: &[u8],
@ -386,3 +417,102 @@ impl Iterator for CacheIter<'_> {
swap_option_result(self.next_inner())
}
}
/// Keep an eye on https://github.com/rust-lang/rust/issues/49638
///
/// Skip-scanning iterator over a single ordered in-memory map.
///
/// Each step performs a fresh range lookup starting at `next_bound`, hands the
/// first key found to `check_key_for_validity`, and uses the seek key that
/// function returns as the start of the following lookup.
struct SkipIterator<'a> {
    /// The map being scanned.
    inner: &'a BTreeMap<Vec<u8>, Vec<u8>>,
    /// Exclusive upper bound of the scan.
    upper: Vec<u8>,
    /// Validity timestamp forwarded to `check_key_for_validity`.
    valid_at: ValidityTs,
    /// Inclusive lower bound of the next range lookup.
    next_bound: Vec<u8>,
}

impl<'a> Iterator for SkipIterator<'a> {
    type Item = Tuple;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `BTreeMap::range` panics when the start bound is greater than the
            // end bound. That can happen on the very first call if the caller
            // hands us an inverted range, so treat an empty or inverted range
            // as exhausted instead of looking it up.
            if self.next_bound >= self.upper {
                return None;
            }
            let (candidate_key, candidate_val) = self
                .inner
                .range::<Vec<u8>, (Bound<&Vec<u8>>, Bound<&Vec<u8>>)>((
                    Bound::Included(&self.next_bound),
                    Bound::Excluded(&self.upper),
                ))
                .next()?;
            let (ret, nxt_bound) = check_key_for_validity(candidate_key, self.valid_at);
            // Always advance, whether or not the candidate is returned.
            self.next_bound = nxt_bound;
            if let Some(mut nk) = ret {
                extend_tuple_from_v(&mut nk, candidate_val);
                return Some(nk);
            }
        }
    }
}
/// Skip-scanning iterator over a stored map overlaid with a delta map.
///
/// Each step looks up the first key at or after `next_bound` in both maps and
/// takes the smaller one; on equal keys the delta entry wins. A delta entry
/// whose value is `None` is never returned (presumably it marks a deletion in
/// the pending write set; confirm against the writer code).
struct SkipDualIterator<'a> {
    /// The committed key-value map.
    stored: &'a BTreeMap<Vec<u8>, Vec<u8>>,
    /// The overlay map; `None` values suppress the key.
    delta: &'a BTreeMap<Vec<u8>, Option<Vec<u8>>>,
    /// Exclusive upper bound of the scan.
    upper: Vec<u8>,
    /// Validity timestamp forwarded to `check_key_for_validity`.
    valid_at: ValidityTs,
    /// Inclusive lower bound of the next range lookup.
    next_bound: Vec<u8>,
}

impl<'a> Iterator for SkipDualIterator<'a> {
    type Item = Tuple;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `BTreeMap::range` panics when the start bound is greater than the
            // end bound, so an empty or inverted range ends the iteration
            // before either map is queried.
            if self.next_bound >= self.upper {
                return None;
            }
            let stored_nxt = self
                .stored
                .range::<Vec<u8>, (Bound<&Vec<u8>>, Bound<&Vec<u8>>)>((
                    Bound::Included(&self.next_bound),
                    Bound::Excluded(&self.upper),
                ))
                .next();
            let delta_nxt = self
                .delta
                .range::<Vec<u8>, (Bound<&Vec<u8>>, Bound<&Vec<u8>>)>((
                    Bound::Included(&self.next_bound),
                    Bound::Excluded(&self.upper),
                ))
                .next();

            // Pick the candidate key; its value is `None` only for a delta
            // entry without a value.
            let (candidate_key, candidate_val) = match (stored_nxt, delta_nxt) {
                (None, None) => return None,
                (Some((stored_key, stored_val)), None) => (stored_key, Some(stored_val)),
                (None, Some((delta_key, maybe_delta_val))) => (delta_key, maybe_delta_val.as_ref()),
                (Some((stored_key, stored_val)), Some((delta_key, maybe_delta_val))) => {
                    if stored_key < delta_key {
                        (stored_key, Some(stored_val))
                    } else {
                        (delta_key, maybe_delta_val.as_ref())
                    }
                }
            };

            // The seek key is advanced for every candidate, including ones
            // that end up being skipped.
            let (ret, nxt_bound) = check_key_for_validity(candidate_key, self.valid_at);
            self.next_bound = nxt_bound;
            if let (Some(mut nk), Some(val)) = (ret, candidate_val) {
                extend_tuple_from_v(&mut nk, val);
                return Some(nk);
            }
        }
    }
}

@ -10,6 +10,7 @@ use itertools::Itertools;
use miette::Result;
use crate::data::tuple::Tuple;
use crate::data::value::ValidityTs;
use crate::decode_tuple_from_kv;
pub(crate) mod mem;
@ -93,6 +94,29 @@ pub trait StoreTx<'s> {
Box::new(it.map_ok(|(k, v)| decode_tuple_from_kv(&k, &v)))
}
/// Scan a range of keys as seen at a certain validity.
///
/// `lower` is inclusive whereas `upper` is exclusive.
/// Among tuples that differ only in their validity, which must occupy
/// the last slot of the key,
/// only the tuple whose validity is equal to or earlier than (i.e. greater by the comparator)
/// `valid_at` should be considered for returning, and it should be returned only if its
/// validity is assertive. Every other tuple should be skipped.
///
/// Ideally, implementations should take advantage of the seeking capabilities of the
/// underlying storage so that not every tuple within the `lower` and `upper` range
/// needs to be looked at.
///
/// For custom implementations, it is OK to return an iterator that always errors out,
/// in which case a database using that engine does not support time travelling.
/// You should indicate this clearly in your error message.
fn range_skip_scan_tuple<'a>(
&'a self,
lower: &[u8],
upper: &[u8],
valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a>;
/// Scan on a range and return the raw results.
/// `lower` is inclusive whereas `upper` is exclusive.
fn range_scan<'a>(

@ -14,9 +14,10 @@ use miette::{miette, IntoDiagnostic, Result, WrapErr};
use cozorocks::{DbBuilder, DbIter, RocksDb, Tx};
use crate::data::tuple::Tuple;
use crate::data::tuple::{check_key_for_validity, Tuple};
use crate::data::value::ValidityTs;
use crate::runtime::db::{BadDbInit, DbManifest};
use crate::runtime::relation::decode_tuple_from_kv;
use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v};
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
use crate::Db;
@ -195,6 +196,21 @@ impl<'s> StoreTx<'s> for RocksDbTx {
})
}
/// RocksDB skip scan over `[lower, upper)` at `valid_at`.
///
/// Builds a database iterator capped at `upper` and wraps it in a
/// `RocksDbSkipIterator` whose first seek target is `lower`.
fn range_skip_scan_tuple<'a>(
    &'a self,
    lower: &[u8],
    upper: &[u8],
    valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
    let db_iter = self.db_tx.iterator().upper_bound(upper).start();
    let skip_iter = RocksDbSkipIterator {
        inner: db_iter,
        upper_bound: upper.to_vec(),
        next_bound: lower.to_vec(),
        valid_at,
    };
    Box::new(skip_iter)
}
fn range_scan<'a>(
&'a self,
lower: &[u8],
@ -256,6 +272,47 @@ impl Iterator for RocksDbIterator {
}
}
/// Skip-scanning iterator backed by a RocksDB iterator.
///
/// Every step seeks the underlying iterator to `next_bound`, passes the key
/// found there to `check_key_for_validity`, and stores the seek key that
/// function returns as the next seek target.
pub(crate) struct RocksDbSkipIterator {
    /// The underlying database iterator.
    inner: DbIter,
    /// Exclusive upper bound of the scan.
    upper_bound: Vec<u8>,
    /// Key to seek to on the next step.
    next_bound: Vec<u8>,
    /// Validity timestamp forwarded to `check_key_for_validity`.
    valid_at: ValidityTs,
}

impl RocksDbSkipIterator {
    /// Advances to the next tuple to return, or `Ok(None)` once the iterator
    /// has no pair left or the current key has reached `upper_bound`.
    #[inline]
    fn next_inner(&mut self) -> Result<Option<Tuple>> {
        loop {
            self.inner.seek(&self.next_bound);
            let (key, val) = match self.inner.pair()? {
                Some(kv) => kv,
                None => return Ok(None),
            };
            if key >= self.upper_bound.as_slice() {
                return Ok(None);
            }
            let (maybe_tuple, seek_to) = check_key_for_validity(key, self.valid_at);
            self.next_bound = seek_to;
            if let Some(mut tuple) = maybe_tuple {
                extend_tuple_from_v(&mut tuple, val);
                return Ok(Some(tuple));
            }
        }
    }
}

impl Iterator for RocksDbSkipIterator {
    type Item = Result<Tuple>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        swap_option_result(self.next_inner())
    }
}
pub(crate) struct RocksDbIteratorRaw {
inner: DbIter,
started: bool,

@ -9,13 +9,14 @@
use std::cmp::Ordering;
use std::iter::Fuse;
use std::path::Path;
use std::thread;
use std::{iter, thread};
use itertools::Itertools;
use miette::{IntoDiagnostic, Result};
use miette::{miette, IntoDiagnostic, Result};
use sled::{Batch, Config, Db, IVec, Iter, Mode};
use crate::data::tuple::Tuple;
use crate::data::value::ValidityTs;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
@ -203,6 +204,17 @@ impl<'s> StoreTx<'s> for SledTx {
}
}
/// Time-travel scans are not supported by the Sled backend.
///
/// Returns an iterator that yields exactly one error saying so; all
/// arguments are ignored.
fn range_skip_scan_tuple<'a>(
    &'a self,
    _lower: &[u8],
    _upper: &[u8],
    _valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
    // `miette!` builds a bare report; it has to be wrapped in `Err` for the
    // item type to be `Result<Tuple>` as the signature requires.
    Box::new(iter::once(Err(miette!(
        "Sled backend does not support time travelling."
    ))))
}
fn range_scan<'a>(
&'a self,
lower: &[u8],

@ -14,9 +14,11 @@ use either::{Either, Left, Right};
use miette::{bail, miette, IntoDiagnostic, Result};
use sqlite::{State, Statement};
use crate::data::tuple::Tuple;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::data::tuple::{check_key_for_validity, Tuple};
use crate::data::value::ValidityTs;
use crate::runtime::relation::{decode_tuple_from_kv, extend_tuple_from_v};
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
/// The Sqlite storage engine
#[derive(Clone)]
@ -144,7 +146,7 @@ pub struct SqliteTx<'a> {
committed: bool,
}
const N_QUERIES: usize = 5;
const N_QUERIES: usize = 6;
const N_CACHED_QUERIES: usize = 4;
const QUERIES: [&str; N_QUERIES] = [
"select v from cozo where k = ?;",
@ -152,6 +154,7 @@ const QUERIES: [&str; N_QUERIES] = [
"delete from cozo where k = ?;",
"select 1 from cozo where k = ?;",
"select k, v from cozo where k >= ? and k < ? order by k;",
"select k, v from cozo where k >= ? and k < ? order by k limit 1;",
];
const GET_QUERY: usize = 0;
@ -159,6 +162,7 @@ const PUT_QUERY: usize = 1;
const DEL_QUERY: usize = 2;
const EXISTS_QUERY: usize = 3;
const RANGE_QUERY: usize = 4;
const SKIP_RANGE_QUERY: usize = 5;
impl Drop for SqliteTx<'_> {
fn drop(&mut self) {
@ -276,6 +280,22 @@ impl<'s> StoreTx<'s> for SqliteTx<'s> {
Box::new(TupleIter(statement))
}
/// SQLite skip scan over `[lower, upper)` at `valid_at`.
///
/// Prepares the single-row range query (`SKIP_RANGE_QUERY`) and hands it to a
/// `SkipIter`, which re-runs it with a moving lower bound.
fn range_skip_scan_tuple<'a>(
    &'a self,
    lower: &[u8],
    upper: &[u8],
    valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
    let conn = self.conn.as_ref().unwrap();
    let stmt = conn.prepare(QUERIES[SKIP_RANGE_QUERY]).unwrap();
    let skip_iter = SkipIter {
        stmt,
        valid_at,
        next_bound: lower.to_vec(),
        upper_bound: upper.to_vec(),
    };
    Box::new(skip_iter)
}
fn range_scan<'a>(
&'a self,
lower: &[u8],
@ -341,3 +361,42 @@ impl<'l> Iterator for RawIter<'l> {
}
}
}
/// Skip-scanning iterator backed by a prepared SQLite statement.
///
/// Each step resets the statement, binds `next_bound` and `upper_bound` as its
/// two parameters, and steps it once. The key of the returned row is passed to
/// `check_key_for_validity`, and the seek key that function returns becomes
/// the next `next_bound`.
struct SkipIter<'l> {
    /// The prepared range statement.
    stmt: Statement<'l>,
    /// Validity timestamp forwarded to `check_key_for_validity`.
    valid_at: ValidityTs,
    /// Value bound to the first statement parameter on the next step.
    next_bound: Vec<u8>,
    /// Value bound to the second statement parameter.
    upper_bound: Vec<u8>,
}

impl<'l> SkipIter<'l> {
    /// Advances to the next tuple to return, or `Ok(None)` when the statement
    /// yields no more rows.
    ///
    /// # Errors
    ///
    /// Any SQLite failure while resetting, binding, stepping, or reading a
    /// column is returned as an error rather than causing a panic.
    fn next_inner(&mut self) -> Result<Option<Tuple>> {
        loop {
            self.stmt.reset().into_diagnostic()?;
            self.stmt
                .bind((1, &self.next_bound as &[u8]))
                .into_diagnostic()?;
            self.stmt
                .bind((2, &self.upper_bound as &[u8]))
                .into_diagnostic()?;
            match self.stmt.next().into_diagnostic()? {
                State::Done => return Ok(None),
                State::Row => {
                    let k = self.stmt.read::<Vec<u8>, _>(0).into_diagnostic()?;
                    let (ret, nxt_bound) = check_key_for_validity(&k, self.valid_at);
                    self.next_bound = nxt_bound;
                    if let Some(mut tup) = ret {
                        // The value column is read only for rows that are returned.
                        let v = self.stmt.read::<Vec<u8>, _>(1).into_diagnostic()?;
                        extend_tuple_from_v(&mut tup, &v);
                        return Ok(Some(tup));
                    }
                }
            }
        }
    }
}

impl<'l> Iterator for SkipIter<'l> {
    type Item = Result<Tuple>;

    fn next(&mut self) -> Option<Self::Item> {
        swap_option_result(self.next_inner())
    }
}

@ -8,15 +8,16 @@
use std::ops::Bound::{Excluded, Included};
use std::sync::{Arc, Mutex};
use std::thread;
use std::{iter, thread};
use itertools::Itertools;
use lazy_static::lazy_static;
use miette::{IntoDiagnostic, Result};
use miette::{miette, IntoDiagnostic, Result};
use tikv_client::{RawClient, Transaction, TransactionClient};
use tokio::runtime::Runtime;
use crate::data::tuple::Tuple;
use crate::data::value::ValidityTs;
use crate::runtime::relation::decode_tuple_from_kv;
use crate::storage::{Storage, StoreTx};
use crate::utils::swap_option_result;
@ -160,6 +161,17 @@ impl<'s> StoreTx<'s> for TiKvTx {
Box::new(BatchScanner::new(self.tx.clone(), lower, upper))
}
/// Time-travel scans are not supported by the TiKV backend.
///
/// Returns an iterator that yields exactly one error saying so; all
/// arguments are ignored.
fn range_skip_scan_tuple<'a>(
    &'a self,
    _lower: &[u8],
    _upper: &[u8],
    _valid_at: ValidityTs,
) -> Box<dyn Iterator<Item = Result<Tuple>> + 'a> {
    // `miette!` builds a bare report; it has to be wrapped in `Err` for the
    // item type to be `Result<Tuple>` as the signature requires.
    Box::new(iter::once(Err(miette!(
        "TiKV backend does not support time travelling."
    ))))
}
fn range_scan<'a>(
&'a self,
lower: &[u8],

@ -334,7 +334,7 @@ fn build_rocksdb() {
fn try_to_find_and_link_lib(lib_name: &str) -> bool {
println!("cargo:rerun-if-env-changed={}_COMPILE", lib_name);
if let Ok(v) = env::var(&format!("{}_COMPILE", lib_name)) {
if let Ok(v) = env::var(format!("{}_COMPILE", lib_name)) {
if v.to_lowercase() == "true" || v == "1" {
return false;
}
@ -343,9 +343,9 @@ fn try_to_find_and_link_lib(lib_name: &str) -> bool {
println!("cargo:rerun-if-env-changed={}_LIB_DIR", lib_name);
println!("cargo:rerun-if-env-changed={}_STATIC", lib_name);
if let Ok(lib_dir) = env::var(&format!("{}_LIB_DIR", lib_name)) {
if let Ok(lib_dir) = env::var(format!("{}_LIB_DIR", lib_name)) {
println!("cargo:rustc-link-search=native={}", lib_dir);
let mode = match env::var_os(&format!("{}_STATIC", lib_name)) {
let mode = match env::var_os(format!("{}_STATIC", lib_name)) {
Some(_) => "static",
None => "dylib",
};

Loading…
Cancel
Save