validity inputs

main
Ziyang Hu 2 years ago
parent 7aaea2de42
commit 115e955265

@ -48,11 +48,13 @@ wasm = ["uuid/js", "dep:js-sys"]
## Enables the [Sled](https://github.com/spacejam/sled) backend. ## Enables the [Sled](https://github.com/spacejam/sled) backend.
## Sled is slower than Sqlite for the usual workload of Cozo, can use quite a lot of disk space, ## Sled is slower than Sqlite for the usual workload of Cozo, can use quite a lot of disk space,
## and may not be stable enough. In general you should use RocksDB instead. ## and may not be stable enough. In general you should use RocksDB instead.
## The Sled engine does not support time travel.
storage-sled = ["dep:sled"] storage-sled = ["dep:sled"]
## Enables the [TiKV](https://tikv.org/) client backend. ## Enables the [TiKV](https://tikv.org/) client backend.
## The only reason that you may want to use this is that your data does not fit in a single machine. ## The only reason that you may want to use this is that your data does not fit in a single machine.
## This engine is orders of magnitude slower than every other engine for graph traversals, due to the ## This engine is orders of magnitude slower than every other engine for graph traversals, due to the
## significant network overhead. Simple point-lookup queries are fine, though. ## significant network overhead. Simple point-lookup queries are fine, though.
## The TiKV engine does not support time travel.
storage-tikv = ["dep:tikv-client", "dep:tokio"] storage-tikv = ["dep:tikv-client", "dep:tokio"]
#! # Recommendation for features to enable #! # Recommendation for features to enable

@ -1479,14 +1479,14 @@ pub(crate) fn op_now(_args: &[DataValue]) -> Result<DataValue> {
pub(crate) fn current_validity() -> ValidityTs { pub(crate) fn current_validity() -> ValidityTs {
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
let ts_millis = { let ts_micros = {
let now = SystemTime::now(); let now = SystemTime::now();
(now.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1000.) as i64 now.duration_since(UNIX_EPOCH).unwrap().as_micros() as i64
}; };
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))] #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
let ts_millis = { Date::now() as i64 }; let ts_micros = { Date::now() * 1000 as i64 };
ValidityTs(Reverse(ts_millis)) ValidityTs(Reverse(ts_micros))
} }
pub(crate) const MAX_VALIDITY_TS: ValidityTs = ValidityTs(Reverse(i64::MAX)); pub(crate) const MAX_VALIDITY_TS: ValidityTs = ValidityTs(Reverse(i64::MAX));
@ -1497,20 +1497,19 @@ pub(crate) const TERMINAL_VALIDITY: Validity = Validity {
define_op!(OP_FORMAT_TIMESTAMP, 1, true); define_op!(OP_FORMAT_TIMESTAMP, 1, true);
pub(crate) fn op_format_timestamp(args: &[DataValue]) -> Result<DataValue> { pub(crate) fn op_format_timestamp(args: &[DataValue]) -> Result<DataValue> {
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
let dt = Utc
.timestamp_millis_opt(Date::now() as i64)
.latest()
.ok_or_else(|| miette!("bad time input"))?;
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
let dt = { let dt = {
let f = args[0] let millis = match &args[0] {
.get_float() DataValue::Validity(vld) => vld.timestamp.0 .0 / 1000,
.ok_or_else(|| miette!("'format_timestamp' expects a number"))?; v => {
let millis = (f * 1000.) as i64; let f = v
.get_float()
.ok_or_else(|| miette!("'format_timestamp' expects a number"))?;
(f * 1000.) as i64
}
};
Utc.timestamp_millis_opt(millis) Utc.timestamp_millis_opt(millis)
.latest() .latest()
.ok_or_else(|| miette!("bad time: {}", f))? .ok_or_else(|| miette!("bad time: {}", &args[0]))?
}; };
match args.get(1) { match args.get(1) {
Some(tz_v) => { Some(tz_v) => {
@ -1545,7 +1544,7 @@ pub(crate) fn op_parse_timestamp(args: &[DataValue]) -> Result<DataValue> {
pub(crate) fn str2vld(s: &str) -> Result<ValidityTs> { pub(crate) fn str2vld(s: &str) -> Result<ValidityTs> {
let dt = DateTime::parse_from_rfc3339(s).map_err(|_| miette!("bad datetime: {}", s))?; let dt = DateTime::parse_from_rfc3339(s).map_err(|_| miette!("bad datetime: {}", s))?;
let st: SystemTime = dt.into(); let st: SystemTime = dt.into();
let microseconds = st.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1_000_000.; let microseconds = st.duration_since(UNIX_EPOCH).unwrap().as_micros();
Ok(ValidityTs(Reverse(microseconds as i64))) Ok(ValidityTs(Reverse(microseconds as i64)))
} }

@ -6,7 +6,10 @@
* You can obtain one at https://mozilla.org/MPL/2.0/. * You can obtain one at https://mozilla.org/MPL/2.0/.
*/ */
use chrono::DateTime;
use std::cmp::Reverse;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::time::{SystemTime, UNIX_EPOCH};
use itertools::Itertools; use itertools::Itertools;
use miette::{bail, ensure, Diagnostic, Result}; use miette::{bail, ensure, Diagnostic, Result};
@ -14,7 +17,7 @@ use smartstring::{LazyCompact, SmartString};
use thiserror::Error; use thiserror::Error;
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::value::{DataValue, UuidWrapper}; use crate::data::value::{DataValue, UuidWrapper, Validity, ValidityTs};
#[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)] #[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)]
pub(crate) struct NullableColType { pub(crate) struct NullableColType {
@ -74,7 +77,7 @@ pub(crate) enum ColType {
len: Option<usize>, len: Option<usize>,
}, },
Tuple(Vec<NullableColType>), Tuple(Vec<NullableColType>),
Validity Validity,
} }
#[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)] #[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)]
@ -140,7 +143,7 @@ impl StoredRelationMetadata {
} }
impl NullableColType { impl NullableColType {
pub(crate) fn coerce(&self, data: DataValue) -> Result<DataValue> { pub(crate) fn coerce(&self, data: DataValue, cur_vld: ValidityTs) -> Result<DataValue> {
if matches!(data, DataValue::Null) { if matches!(data, DataValue::Null) {
return if self.nullable { return if self.nullable {
Ok(data) Ok(data)
@ -196,7 +199,11 @@ impl NullableColType {
if let Some(expected) = len { if let Some(expected) = len {
ensure!(*expected == l.len(), BadListLength(self.clone(), l.len())) ensure!(*expected == l.len(), BadListLength(self.clone(), l.len()))
} }
DataValue::List(l.into_iter().map(|el| eltype.coerce(el)).try_collect()?) DataValue::List(
l.into_iter()
.map(|el| eltype.coerce(el, cur_vld))
.try_collect()?,
)
} else { } else {
bail!(make_err()) bail!(make_err())
} }
@ -207,7 +214,7 @@ impl NullableColType {
DataValue::List( DataValue::List(
l.into_iter() l.into_iter()
.zip(typ.iter()) .zip(typ.iter())
.map(|(el, t)| t.coerce(el)) .map(|(el, t)| t.coerce(el, cur_vld))
.try_collect()?, .try_collect()?,
) )
} else { } else {
@ -215,9 +222,60 @@ impl NullableColType {
} }
} }
ColType::Validity => { ColType::Validity => {
#[derive(Debug, Error, Diagnostic)]
#[error("{0} cannot be coerced into validity")]
#[diagnostic(code(eval::invalid_validity))]
struct InvalidValidity(DataValue);
match data { match data {
vld@DataValue::Validity(_) => vld, vld @ DataValue::Validity(_) => vld,
_ => todo!() DataValue::Str(s) => match &s as &str {
"ASSERT" => DataValue::Validity(Validity {
timestamp: cur_vld,
is_assert: true,
}),
"RETRACT" => DataValue::Validity(Validity {
timestamp: cur_vld,
is_assert: false,
}),
s => {
let (is_assert, ts_str) = match s.strip_prefix('~') {
None => (true, s),
Some(remaining) => (false, remaining),
};
let dt = DateTime::parse_from_rfc3339(ts_str)
.map_err(|_| InvalidValidity(DataValue::Str(s.into())))?;
let st: SystemTime = dt.into();
let microseconds =
st.duration_since(UNIX_EPOCH).unwrap().as_micros() as i64;
if microseconds == i64::MAX || microseconds == i64::MIN {
bail!(InvalidValidity(DataValue::Str(s.into())))
}
DataValue::Validity(Validity {
timestamp: ValidityTs(Reverse(microseconds)),
is_assert,
})
}
},
DataValue::List(l) => {
if l.len() == 2 {
let o_ts = l[0].get_int();
let o_is_assert = l[1].get_bool();
if let (Some(ts), Some(is_assert)) = (o_ts, o_is_assert) {
if ts == i64::MAX || ts == i64::MIN {
bail!(InvalidValidity(DataValue::List(l)))
}
return Ok(DataValue::Validity(Validity {
timestamp: ValidityTs(Reverse(ts)),
is_assert,
}));
}
}
bail!(InvalidValidity(DataValue::List(l)))
}
v => bail!(InvalidValidity(v)),
} }
} }
}) })

@ -7,4 +7,5 @@
*/ */
mod functions; mod functions;
mod aggrs; mod aggrs;
mod validity;

@ -0,0 +1,205 @@
use crate::DbInstance;
use serde_json::json;
use std::env;
#[test]
fn test_validity() {
let path = "_test_validity";
let _ = std::fs::remove_file(path);
let _ = std::fs::remove_dir_all(path);
let db_kind = env::var("COZO_TEST_DB_ENGINE").unwrap_or("mem".to_string());
println!("Using {} engine", db_kind);
let db = DbInstance::new(&db_kind, path, Default::default()).unwrap();
db.run_script(":create vld {a, v: Validity => d}", Default::default())
.unwrap();
assert!(db
.run_script(
r#"
?[a, v, d] <- [[1, [9223372036854775807, true], null]]
:put vld {a, v => d}
"#,
Default::default(),
)
.is_err());
assert!(db
.run_script(
r#"
?[a, v, d] <- [[1, [-9223372036854775808, true], null]]
:put vld {a, v => d}
"#,
Default::default(),
)
.is_err());
db.run_script(
r#"
?[a, v, d] <- [[1, [0, true], 0]]
:put vld {a, v => d}
"#,
Default::default(),
)
.unwrap();
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d @ "NOW"}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 1);
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 1);
db.run_script(
r#"
?[a, v, d] <- [[1, [1, false], 1]]
:put vld {a, v => d}
"#,
Default::default(),
)
.unwrap();
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d @ "NOW"}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 0);
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 2);
db.run_script(
r#"
?[a, v, d] <- [[1, "ASSERT", 2]]
:put vld {a, v => d}
"#,
Default::default(),
)
.unwrap();
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d @ "NOW"}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 1);
assert_eq!(res[0][2].as_i64().unwrap(), 2);
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 3);
db.run_script(
r#"
?[a, v, d] <- [[1, "RETRACT", 3]]
:put vld {a, v => d}
"#,
Default::default(),
)
.unwrap();
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d @ "NOW"}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 0);
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 4);
db.run_script(
r#"
?[a, v, d] <- [[1, [9223372036854775806, true], null]]
:put vld {a, v => d}
"#,
Default::default(),
)
.unwrap();
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d @ "NOW"}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 0);
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d @ "END"}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 1);
assert_eq!(res[0][2], json!(null));
let res = db
.run_script(
r#"
?[a, v, d] := *vld{a, v, d}
"#,
Default::default(),
)
.unwrap()
.rows;
assert_eq!(res.len(), 5);
println!("{}", json!(res));
}

@ -16,7 +16,7 @@ use smartstring::{LazyCompact, SmartString};
use crate::fixed_rule::utilities::jlines::get_file_content_from_url; use crate::fixed_rule::utilities::jlines::get_file_content_from_url;
use crate::fixed_rule::{FixedRule, FixedRulePayload, CannotDetermineArity}; use crate::fixed_rule::{FixedRule, FixedRulePayload, CannotDetermineArity};
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::functions::{op_to_float, op_to_uuid}; use crate::data::functions::{op_to_float, op_to_uuid, TERMINAL_VALIDITY};
use crate::data::program::{FixedRuleOptionNotFoundError, WrongFixedRuleOptionError}; use crate::data::program::{FixedRuleOptionNotFoundError, WrongFixedRuleOptionError};
use crate::data::relation::{ColType, NullableColType}; use crate::data::relation::{ColType, NullableColType};
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
@ -59,7 +59,7 @@ impl FixedRule for CsvReader {
}, },
nullable: false, nullable: false,
}; };
let types_opts = typing.coerce(types_opts)?; let types_opts = typing.coerce(types_opts, TERMINAL_VALIDITY.timestamp)?;
let mut types = vec![]; let mut types = vec![];
for type_str in types_opts.get_list().unwrap() { for type_str in types_opts.get_list().unwrap() {
let type_str = type_str.get_string().unwrap(); let type_str = type_str.get_string().unwrap();

@ -14,11 +14,10 @@ use miette::{bail, ensure, Diagnostic, IntoDiagnostic, Result};
use pest::error::InputLocation; use pest::error::InputLocation;
use pest::Parser; use pest::Parser;
use thiserror::Error; use thiserror::Error;
use crate::data::functions::current_validity;
use crate::data::program::InputProgram; use crate::data::program::InputProgram;
use crate::data::relation::NullableColType; use crate::data::relation::NullableColType;
use crate::data::value::{DataValue}; use crate::data::value::{DataValue, ValidityTs};
use crate::parse::query::parse_query; use crate::parse::query::parse_query;
use crate::parse::schema::parse_nullable_type; use crate::parse::schema::parse_nullable_type;
use crate::parse::sys::{parse_sys, SysOp}; use crate::parse::sys::{parse_sys, SysOp};
@ -108,6 +107,7 @@ pub(crate) fn parse_script(
src: &str, src: &str,
param_pool: &BTreeMap<String, DataValue>, param_pool: &BTreeMap<String, DataValue>,
algorithms: &BTreeMap<String, Arc<Box<dyn FixedRule>>>, algorithms: &BTreeMap<String, Arc<Box<dyn FixedRule>>>,
cur_vld: ValidityTs
) -> Result<CozoScript> { ) -> Result<CozoScript> {
let parsed = CozoScriptParser::parse(Rule::script, src) let parsed = CozoScriptParser::parse(Rule::script, src)
.map_err(|err| { .map_err(|err| {
@ -119,7 +119,6 @@ pub(crate) fn parse_script(
})? })?
.next() .next()
.unwrap(); .unwrap();
let cur_vld = current_validity();
Ok(match parsed.as_rule() { Ok(match parsed.as_rule() {
Rule::query_script => { Rule::query_script => {
let q = parse_query(parsed.into_inner(), param_pool, algorithms, cur_vld)?; let q = parse_query(parsed.into_inner(), param_pool, algorithms, cur_vld)?;
@ -135,7 +134,7 @@ pub(crate) fn parse_script(
CozoScript::Multi(qs) CozoScript::Multi(qs)
} }
Rule::sys_script => { Rule::sys_script => {
CozoScript::Sys(parse_sys(parsed.into_inner(), param_pool, algorithms)?) CozoScript::Sys(parse_sys(parsed.into_inner(), param_pool, algorithms, cur_vld)?)
} }
_ => unreachable!(), _ => unreachable!(),
}) })

@ -22,7 +22,7 @@ use thiserror::Error;
use crate::data::aggr::{parse_aggr, Aggregation}; use crate::data::aggr::{parse_aggr, Aggregation};
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::functions::{MAX_VALIDITY_TS, str2vld}; use crate::data::functions::{str2vld, MAX_VALIDITY_TS};
use crate::data::program::{ use crate::data::program::{
FixedRuleApply, FixedRuleArg, InputAtom, InputInlineRule, InputInlineRulesOrFixed, FixedRuleApply, FixedRuleArg, InputAtom, InputInlineRule, InputInlineRulesOrFixed,
InputNamedFieldRelationApplyAtom, InputProgram, InputRelationApplyAtom, InputRuleApplyAtom, InputNamedFieldRelationApplyAtom, InputProgram, InputRelationApplyAtom, InputRuleApplyAtom,
@ -78,7 +78,7 @@ impl Diagnostic for MultipleRuleDefinitionError {
fn code<'a>(&'a self) -> Option<Box<dyn Display + 'a>> { fn code<'a>(&'a self) -> Option<Box<dyn Display + 'a>> {
Some(Box::new("parser::mult_rule_def")) Some(Box::new("parser::mult_rule_def"))
} }
fn labels(&self) -> Option<Box<dyn Iterator<Item=LabeledSpan> + '_>> { fn labels(&self) -> Option<Box<dyn Iterator<Item = LabeledSpan> + '_>> {
Some(Box::new( Some(Box::new(
self.1.iter().map(|s| LabeledSpan::new_with_span(None, s)), self.1.iter().map(|s| LabeledSpan::new_with_span(None, s)),
)) ))
@ -354,13 +354,13 @@ pub(crate) fn parse_query(
if prog.prog.is_empty() { if prog.prog.is_empty() {
if let Some(( if let Some((
InputRelationHandle { InputRelationHandle {
key_bindings, key_bindings,
dep_bindings, dep_bindings,
.. ..
}, },
RelationOp::Create, RelationOp::Create,
)) = &prog.out_opts.store_relation )) = &prog.out_opts.store_relation
{ {
let mut bindings = key_bindings.clone(); let mut bindings = key_bindings.clone();
bindings.extend_from_slice(dep_bindings); bindings.extend_from_slice(dep_bindings);
@ -483,7 +483,11 @@ fn parse_disjunction(
}) })
} }
fn parse_atom(src: Pair<'_>, param_pool: &BTreeMap<String, DataValue>, cur_vld: ValidityTs) -> Result<InputAtom> { fn parse_atom(
src: Pair<'_>,
param_pool: &BTreeMap<String, DataValue>,
cur_vld: ValidityTs,
) -> Result<InputAtom> {
Ok(match src.as_rule() { Ok(match src.as_rule() {
Rule::rule_body => { Rule::rule_body => {
let span = src.extract_span(); let span = src.extract_span();
@ -612,7 +616,12 @@ fn parse_atom(src: Pair<'_>, param_pool: &BTreeMap<String, DataValue>, cur_vld:
} }
}; };
InputAtom::NamedFieldRelation { InputAtom::NamedFieldRelation {
inner: InputNamedFieldRelationApplyAtom { name, args, span, valid_at }, inner: InputNamedFieldRelationApplyAtom {
name,
args,
span,
valid_at,
},
} }
} }
rule => unreachable!("{:?}", rule), rule => unreachable!("{:?}", rule),
@ -872,19 +881,13 @@ fn expr2vld_spec(expr: Expr, cur_vld: ValidityTs) -> Result<ValidityTs> {
let microseconds = n.get_int().ok_or(BadValiditySpecification(vld_span))?; let microseconds = n.get_int().ok_or(BadValiditySpecification(vld_span))?;
Ok(ValidityTs(Reverse(microseconds))) Ok(ValidityTs(Reverse(microseconds)))
} }
DataValue::Str(s) => { DataValue::Str(s) => match &s as &str {
match &s as &str { "NOW" => Ok(cur_vld),
"now" => { "END" => Ok(MAX_VALIDITY_TS),
Ok(cur_vld) s => Ok(str2vld(s).map_err(|_| BadValiditySpecification(vld_span))?),
} },
"max" => { Ok(MAX_VALIDITY_TS) }
s => {
Ok(str2vld(s).map_err(|_| BadValiditySpecification(vld_span))?)
}
}
}
_ => { _ => {
bail!(BadValiditySpecification(vld_span)) bail!(BadValiditySpecification(vld_span))
} }
} }
} }

@ -12,11 +12,10 @@ use std::sync::Arc;
use itertools::Itertools; use itertools::Itertools;
use miette::{Diagnostic, Result}; use miette::{Diagnostic, Result};
use thiserror::Error; use thiserror::Error;
use crate::data::functions::{current_validity};
use crate::data::program::InputProgram; use crate::data::program::InputProgram;
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::value::DataValue; use crate::data::value::{DataValue, ValidityTs};
use crate::parse::query::parse_query; use crate::parse::query::parse_query;
use crate::parse::{ExtractSpan, Pairs, Rule, SourceSpan}; use crate::parse::{ExtractSpan, Pairs, Rule, SourceSpan};
use crate::runtime::relation::AccessLevel; use crate::runtime::relation::AccessLevel;
@ -45,8 +44,8 @@ pub(crate) fn parse_sys(
mut src: Pairs<'_>, mut src: Pairs<'_>,
param_pool: &BTreeMap<String, DataValue>, param_pool: &BTreeMap<String, DataValue>,
algorithms: &BTreeMap<String, Arc<Box<dyn FixedRule>>>, algorithms: &BTreeMap<String, Arc<Box<dyn FixedRule>>>,
cur_vld: ValidityTs
) -> Result<SysOp> { ) -> Result<SysOp> {
let cur_vld = current_validity();
let inner = src.next().unwrap(); let inner = src.next().unwrap();
Ok(match inner.as_rule() { Ok(match inner.as_rule() {
Rule::compact_op => SysOp::Compact, Rule::compact_op => SysOp::Compact,

@ -15,14 +15,14 @@ use miette::{bail, Diagnostic, Result, WrapErr};
use smartstring::SmartString; use smartstring::SmartString;
use thiserror::Error; use thiserror::Error;
use crate::fixed_rule::FixedRuleHandle;
use crate::data::expr::Expr; use crate::data::expr::Expr;
use crate::data::program::{FixedRuleApply, InputInlineRulesOrFixed, InputProgram, RelationOp}; use crate::data::program::{FixedRuleApply, InputInlineRulesOrFixed, InputProgram, RelationOp};
use crate::data::relation::{ColumnDef, NullableColType}; use crate::data::relation::{ColumnDef, NullableColType};
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN}; use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
use crate::data::value::DataValue; use crate::data::value::{DataValue, ValidityTs};
use crate::fixed_rule::utilities::constant::Constant; use crate::fixed_rule::utilities::constant::Constant;
use crate::fixed_rule::FixedRuleHandle;
use crate::parse::parse_script; use crate::parse::parse_script;
use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel}; use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
@ -42,6 +42,7 @@ impl<'a> SessionTx<'a> {
op: RelationOp, op: RelationOp,
meta: &InputRelationHandle, meta: &InputRelationHandle,
headers: &[Symbol], headers: &[Symbol],
cur_vld: ValidityTs,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> { ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let mut to_clear = vec![]; let mut to_clear = vec![];
let mut replaced_old_triggers = None; let mut replaced_old_triggers = None;
@ -58,10 +59,11 @@ impl<'a> SessionTx<'a> {
replaced_old_triggers = Some((old_handle.put_triggers, old_handle.rm_triggers)) replaced_old_triggers = Some((old_handle.put_triggers, old_handle.rm_triggers))
} }
for trigger in &old_handle.replace_triggers { for trigger in &old_handle.replace_triggers {
let program = parse_script(trigger, &Default::default(), &db.algorithms)? let program =
.get_single_program()?; parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
.get_single_program()?;
let (_, cleanups) = db.run_query(self, program).map_err(|err| { let (_, cleanups) = db.run_query(self, program, cur_vld).map_err(|err| {
if err.source_code().is_some() { if err.source_code().is_some() {
err err
} else { } else {
@ -114,7 +116,7 @@ impl<'a> SessionTx<'a> {
for tuple in res_iter { for tuple in res_iter {
let extracted = key_extractors let extracted = key_extractors
.iter() .iter()
.map(|ex| ex.extract_data(&tuple)) .map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?; .try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?; let key = relation_store.encode_key_for_store(&extracted, *span)?;
if has_triggers { if has_triggers {
@ -138,7 +140,7 @@ impl<'a> SessionTx<'a> {
if has_triggers && !new_tuples.is_empty() { if has_triggers && !new_tuples.is_empty() {
for trigger in &relation_store.rm_triggers { for trigger in &relation_store.rm_triggers {
let mut program = let mut program =
parse_script(trigger, &Default::default(), &db.algorithms)? parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
.get_single_program()?; .get_single_program()?;
let mut bindings = relation_store let mut bindings = relation_store
@ -159,13 +161,14 @@ impl<'a> SessionTx<'a> {
make_const_rule(&mut program, "_old", bindings, old_tuples.clone()); make_const_rule(&mut program, "_old", bindings, old_tuples.clone());
let (_, cleanups) = db.run_query(self, program).map_err(|err| { let (_, cleanups) =
if err.source_code().is_some() { db.run_query(self, program, cur_vld).map_err(|err| {
err if err.source_code().is_some() {
} else { err
err.with_source_code(trigger.to_string()) } else {
} err.with_source_code(trigger.to_string())
})?; }
})?;
to_clear.extend(cleanups); to_clear.extend(cleanups);
} }
} }
@ -197,7 +200,7 @@ impl<'a> SessionTx<'a> {
for tuple in res_iter { for tuple in res_iter {
let extracted = key_extractors let extracted = key_extractors
.iter() .iter()
.map(|ex| ex.extract_data(&tuple)) .map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?; .try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?; let key = relation_store.encode_key_for_store(&extracted, *span)?;
@ -244,7 +247,7 @@ impl<'a> SessionTx<'a> {
for tuple in res_iter { for tuple in res_iter {
let extracted = key_extractors let extracted = key_extractors
.iter() .iter()
.map(|ex| ex.extract_data(&tuple)) .map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?; .try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?; let key = relation_store.encode_key_for_store(&extracted, *span)?;
if self.store_tx.exists(&key, true)? { if self.store_tx.exists(&key, true)? {
@ -287,7 +290,7 @@ impl<'a> SessionTx<'a> {
for tuple in res_iter { for tuple in res_iter {
let extracted = key_extractors let extracted = key_extractors
.iter() .iter()
.map(|ex| ex.extract_data(&tuple)) .map(|ex| ex.extract_data(&tuple, cur_vld))
.try_collect()?; .try_collect()?;
let key = relation_store.encode_key_for_store(&extracted, *span)?; let key = relation_store.encode_key_for_store(&extracted, *span)?;
@ -314,7 +317,7 @@ impl<'a> SessionTx<'a> {
if has_triggers && !new_tuples.is_empty() { if has_triggers && !new_tuples.is_empty() {
for trigger in &relation_store.put_triggers { for trigger in &relation_store.put_triggers {
let mut program = let mut program =
parse_script(trigger, &Default::default(), &db.algorithms)? parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
.get_single_program()?; .get_single_program()?;
let mut bindings = relation_store let mut bindings = relation_store
@ -333,13 +336,14 @@ impl<'a> SessionTx<'a> {
make_const_rule(&mut program, "_new", bindings.clone(), new_tuples.clone()); make_const_rule(&mut program, "_new", bindings.clone(), new_tuples.clone());
make_const_rule(&mut program, "_old", bindings, old_tuples.clone()); make_const_rule(&mut program, "_old", bindings, old_tuples.clone());
let (_, cleanups) = db.run_query(self, program).map_err(|err| { let (_, cleanups) =
if err.source_code().is_some() { db.run_query(self, program, cur_vld).map_err(|err| {
err if err.source_code().is_some() {
} else { err
err.with_source_code(trigger.to_string()) } else {
} err.with_source_code(trigger.to_string())
})?; }
})?;
to_clear.extend(cleanups); to_clear.extend(cleanups);
} }
} }
@ -364,13 +368,13 @@ enum DataExtractor {
} }
impl DataExtractor { impl DataExtractor {
fn extract_data(&self, tuple: &Tuple) -> Result<DataValue> { fn extract_data(&self, tuple: &Tuple, cur_vld: ValidityTs) -> Result<DataValue> {
Ok(match self { Ok(match self {
DataExtractor::DefaultExtractor(expr, typ) => typ DataExtractor::DefaultExtractor(expr, typ) => typ
.coerce(expr.clone().eval_to_const()?) .coerce(expr.clone().eval_to_const()?, cur_vld)
.wrap_err_with(|| format!("when processing tuple {:?}", tuple))?, .wrap_err_with(|| format!("when processing tuple {:?}", tuple))?,
DataExtractor::IndexExtractor(i, typ) => typ DataExtractor::IndexExtractor(i, typ) => typ
.coerce(tuple[*i].clone()) .coerce(tuple[*i].clone(), cur_vld)
.wrap_err_with(|| format!("when processing tuple {:?}", tuple))?, .wrap_err_with(|| format!("when processing tuple {:?}", tuple))?,
}) })
} }

@ -25,16 +25,20 @@ use serde_json::json;
use smartstring::SmartString; use smartstring::SmartString;
use thiserror::Error; use thiserror::Error;
use crate::data::functions::current_validity;
use crate::data::json::JsonValue; use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::relation::ColumnDef; use crate::data::relation::ColumnDef;
use crate::data::tuple::{Tuple, TupleT}; use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, LARGEST_UTF_CHAR}; use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
use crate::fixed_rule::DEFAULT_FIXED_RULES; use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::parse::sys::SysOp; use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::query::ra::{FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA, TempStoreRA, UnificationRA}; use crate::query::ra::{
FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA,
TempStoreRA, UnificationRA,
};
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId}; use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::storage::{Storage, StoreTx}; use crate::storage::{Storage, StoreTx};
@ -147,11 +151,12 @@ impl<'s, S: Storage<'s>> Db<S> {
payload: &str, payload: &str,
params: BTreeMap<String, JsonValue>, params: BTreeMap<String, JsonValue>,
) -> Result<NamedRows> { ) -> Result<NamedRows> {
let cur_vld = current_validity();
let params = params let params = params
.into_iter() .into_iter()
.map(|(k, v)| (k, DataValue::from(v))) .map(|(k, v)| (k, DataValue::from(v)))
.collect(); .collect();
self.do_run_script(payload, &params) self.do_run_script(payload, &params, cur_vld)
} }
/// Export relations to JSON data. /// Export relations to JSON data.
/// ///
@ -218,6 +223,8 @@ impl<'s, S: Storage<'s>> Db<S> {
#[diagnostic(code(import::bad_data))] #[diagnostic(code(import::bad_data))]
struct BadDataForRelation(String, JsonValue); struct BadDataForRelation(String, JsonValue);
let cur_vld = current_validity();
let mut tx = self.transact_write()?; let mut tx = self.transact_write()?;
for (relation_op, in_data) in data { for (relation_op, in_data) in data {
@ -292,7 +299,7 @@ impl<'s, S: Storage<'s>> Db<S> {
let v = row let v = row
.get(*i) .get(*i)
.ok_or_else(|| miette!("row too short: {:?}", row))?; .ok_or_else(|| miette!("row too short: {:?}", row))?;
col.typing.coerce(DataValue::from(v)) col.typing.coerce(DataValue::from(v), cur_vld)
}) })
.try_collect()?; .try_collect()?;
let k_store = handle.encode_key_for_store(&keys, Default::default())?; let k_store = handle.encode_key_for_store(&keys, Default::default())?;
@ -305,7 +312,7 @@ impl<'s, S: Storage<'s>> Db<S> {
let v = row let v = row
.get(*i) .get(*i)
.ok_or_else(|| miette!("row too short: {:?}", row))?; .ok_or_else(|| miette!("row too short: {:?}", row))?;
col.typing.coerce(DataValue::from(v)) col.typing.coerce(DataValue::from(v), cur_vld)
}) })
.try_collect()?; .try_collect()?;
let v_store = handle.encode_val_only_for_store(&vals, Default::default())?; let v_store = handle.encode_val_only_for_store(&vals, Default::default())?;
@ -486,8 +493,9 @@ impl<'s, S: Storage<'s>> Db<S> {
&'s self, &'s self,
payload: &str, payload: &str,
param_pool: &BTreeMap<String, DataValue>, param_pool: &BTreeMap<String, DataValue>,
cur_vld: ValidityTs,
) -> Result<NamedRows> { ) -> Result<NamedRows> {
match parse_script(payload, param_pool, &self.algorithms)? { match parse_script(payload, param_pool, &self.algorithms, cur_vld)? {
CozoScript::Multi(ps) => { CozoScript::Multi(ps) => {
let is_write = ps.iter().any(|p| p.out_opts.store_relation.is_some()); let is_write = ps.iter().any(|p| p.out_opts.store_relation.is_some());
let mut cleanups = vec![]; let mut cleanups = vec![];
@ -505,7 +513,7 @@ impl<'s, S: Storage<'s>> Db<S> {
for p in ps { for p in ps {
#[allow(unused_variables)] #[allow(unused_variables)]
let sleep_opt = p.out_opts.sleep; let sleep_opt = p.out_opts.sleep;
let (q_res, q_cleanups) = self.run_query(&mut tx, p)?; let (q_res, q_cleanups) = self.run_query(&mut tx, p, cur_vld)?;
res = q_res; res = q_res;
cleanups.extend(q_cleanups); cleanups.extend(q_cleanups);
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
@ -611,8 +619,10 @@ impl<'s, S: Storage<'s>> Db<S> {
json!(filters.iter().map(|f| f.to_string()).collect_vec()), json!(filters.iter().map(|f| f.to_string()).collect_vec()),
), ),
RelAlgebra::StoredWithValidity(StoredWithValidityRA { RelAlgebra::StoredWithValidity(StoredWithValidityRA {
storage, filters, .. storage,
}) => ( filters,
..
}) => (
"load_stored_with_validity", "load_stored_with_validity",
json!(format!(":{}", storage.name)), json!(format!(":{}", storage.name)),
json!(null), json!(null),
@ -827,6 +837,7 @@ impl<'s, S: Storage<'s>> Db<S> {
&self, &self,
tx: &mut SessionTx<'_>, tx: &mut SessionTx<'_>,
input_program: InputProgram, input_program: InputProgram,
cur_vld: ValidityTs,
) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> { ) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> {
// cleanups contain stored relations that should be deleted at the end of query // cleanups contain stored relations that should be deleted at the end of query
let mut clean_ups = vec![]; let mut clean_ups = vec![];
@ -968,6 +979,7 @@ impl<'s, S: Storage<'s>> Db<S> {
*relation_op, *relation_op,
meta, meta,
&entry_head_or_default, &entry_head_or_default,
cur_vld,
) )
.wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?; .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
clean_ups.extend(to_clear); clean_ups.extend(to_clear);
@ -1017,7 +1029,14 @@ impl<'s, S: Storage<'s>> Db<S> {
if let Some((meta, relation_op)) = &out_opts.store_relation { if let Some((meta, relation_op)) = &out_opts.store_relation {
let to_clear = tx let to_clear = tx
.execute_relation(self, scan, *relation_op, meta, &entry_head_or_default) .execute_relation(
self,
scan,
*relation_op,
meta,
&entry_head_or_default,
cur_vld,
)
.wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?; .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
clean_ups.extend(to_clear); clean_ups.extend(to_clear);
Ok(( Ok((

Loading…
Cancel
Save