From 0cc050b8be99129be9276dc180b4c3cf40695bee Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Thu, 22 Dec 2022 20:46:36 +0800 Subject: [PATCH] validity inputs --- cozo-core/src/data/functions.rs | 31 ++++++----- cozo-core/src/data/relation.rs | 65 ++++++++++++++++++++--- cozo-core/src/fixed_rule/utilities/csv.rs | 4 +- cozo-core/src/parse/mod.rs | 7 ++- cozo-core/src/parse/sys.rs | 5 +- cozo-core/src/query/stored.rs | 60 +++++++++++---------- cozo-core/src/runtime/db.rs | 39 ++++++++++---- 7 files changed, 141 insertions(+), 70 deletions(-) diff --git a/cozo-core/src/data/functions.rs b/cozo-core/src/data/functions.rs index 5cbd19ab..46720c0b 100644 --- a/cozo-core/src/data/functions.rs +++ b/cozo-core/src/data/functions.rs @@ -1479,14 +1479,14 @@ pub(crate) fn op_now(_args: &[DataValue]) -> Result { pub(crate) fn current_validity() -> ValidityTs { #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] - let ts_millis = { + let ts_micros = { let now = SystemTime::now(); - (now.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1000.) as i64 + now.duration_since(UNIX_EPOCH).unwrap().as_micros() as i64 }; #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - let ts_millis = { Date::now() as i64 }; + let ts_micros = { Date::now() * 1000 as i64 }; - ValidityTs(Reverse(ts_millis)) + ValidityTs(Reverse(ts_micros)) } pub(crate) const MAX_VALIDITY_TS: ValidityTs = ValidityTs(Reverse(i64::MAX)); @@ -1497,20 +1497,19 @@ pub(crate) const TERMINAL_VALIDITY: Validity = Validity { define_op!(OP_FORMAT_TIMESTAMP, 1, true); pub(crate) fn op_format_timestamp(args: &[DataValue]) -> Result { - #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] - let dt = Utc - .timestamp_millis_opt(Date::now() as i64) - .latest() - .ok_or_else(|| miette!("bad time input"))?; - #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] let dt = { - let f = args[0] - .get_float() - .ok_or_else(|| miette!("'format_timestamp' expects a number"))?; - let millis = (f * 1000.) as i64; + let millis = match &args[0] { + DataValue::Validity(vld) => vld.timestamp.0 .0 / 1000, + v => { + let f = v + .get_float() + .ok_or_else(|| miette!("'format_timestamp' expects a number"))?; + (f * 1000.) as i64 + } + }; Utc.timestamp_millis_opt(millis) .latest() - .ok_or_else(|| miette!("bad time: {}", f))? + .ok_or_else(|| miette!("bad time: {}", &args[0]))? }; match args.get(1) { Some(tz_v) => { @@ -1545,7 +1544,7 @@ pub(crate) fn op_parse_timestamp(args: &[DataValue]) -> Result { pub(crate) fn str2vld(s: &str) -> Result { let dt = DateTime::parse_from_rfc3339(s).map_err(|_| miette!("bad datetime: {}", s))?; let st: SystemTime = dt.into(); - let microseconds = st.duration_since(UNIX_EPOCH).unwrap().as_secs_f64() * 1_000_000.; + let microseconds = st.duration_since(UNIX_EPOCH).unwrap().as_micros(); Ok(ValidityTs(Reverse(microseconds as i64))) } diff --git a/cozo-core/src/data/relation.rs b/cozo-core/src/data/relation.rs index d63e8447..151a4520 100644 --- a/cozo-core/src/data/relation.rs +++ b/cozo-core/src/data/relation.rs @@ -6,7 +6,10 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ +use chrono::DateTime; +use std::cmp::Reverse; use std::fmt::{Display, Formatter}; +use std::time::{SystemTime, UNIX_EPOCH}; use itertools::Itertools; use miette::{bail, ensure, Diagnostic, Result}; @@ -14,7 +17,7 @@ use smartstring::{LazyCompact, SmartString}; use thiserror::Error; use crate::data::expr::Expr; -use crate::data::value::{DataValue, UuidWrapper}; +use crate::data::value::{DataValue, UuidWrapper, Validity, ValidityTs}; #[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)] pub(crate) struct NullableColType { @@ -74,7 +77,7 @@ pub(crate) enum ColType { len: Option, }, Tuple(Vec), - Validity + Validity, } #[derive(Debug, Clone, Eq, PartialEq, serde_derive::Deserialize, serde_derive::Serialize)] @@ -140,7 +143,7 @@ impl StoredRelationMetadata { } impl NullableColType { - pub(crate) fn coerce(&self, data: DataValue) -> Result { + pub(crate) fn coerce(&self, data: DataValue, cur_vld: ValidityTs) -> Result { if matches!(data, DataValue::Null) { return if self.nullable { Ok(data) @@ -196,7 +199,11 @@ impl NullableColType { if let Some(expected) = len { ensure!(*expected == l.len(), BadListLength(self.clone(), l.len())) } - DataValue::List(l.into_iter().map(|el| eltype.coerce(el)).try_collect()?) + DataValue::List( + l.into_iter() + .map(|el| eltype.coerce(el, cur_vld)) + .try_collect()?, + ) } else { bail!(make_err()) } @@ -207,7 +214,7 @@ impl NullableColType { DataValue::List( l.into_iter() .zip(typ.iter()) - .map(|(el, t)| t.coerce(el)) + .map(|(el, t)| t.coerce(el, cur_vld)) .try_collect()?, ) } else { @@ -215,9 +222,53 @@ impl NullableColType { } } ColType::Validity => { + #[derive(Debug, Error, Diagnostic)] + #[error("{0} cannot be coerced into validity")] + #[diagnostic(code(eval::invalid_validity))] + struct InvalidValidity(DataValue); + match data { - vld@DataValue::Validity(_) => vld, - _ => todo!() + vld @ DataValue::Validity(_) => vld, + DataValue::Str(s) => match &s as &str { + "ASSERT" => DataValue::Validity(Validity { + timestamp: cur_vld, + is_assert: true, + }), + "RETRACT" => DataValue::Validity(Validity { + timestamp: cur_vld, + is_assert: false, + }), + s => { + let (is_assert, ts_str) = match s.strip_prefix('~') { + None => (true, s), + Some(remaining) => (false, remaining), + }; + let dt = DateTime::parse_from_rfc3339(ts_str) + .map_err(|_| InvalidValidity(DataValue::Str(s.into())))?; + let st: SystemTime = dt.into(); + let microseconds = + st.duration_since(UNIX_EPOCH).unwrap().as_micros() as i64; + + DataValue::Validity(Validity { + timestamp: ValidityTs(Reverse(microseconds)), + is_assert, + }) + } + }, + DataValue::List(l) => { + if l.len() == 2 { + let o_ts = l[0].get_int(); + let o_is_assert = l[1].get_bool(); + if let (Some(ts), Some(is_assert)) = (o_ts, o_is_assert) { + return Ok(DataValue::Validity(Validity { + timestamp: ValidityTs(Reverse(ts)), + is_assert, + })); + } + } + bail!(InvalidValidity(DataValue::List(l))) + } + v => bail!(InvalidValidity(v)), } } }) diff --git a/cozo-core/src/fixed_rule/utilities/csv.rs b/cozo-core/src/fixed_rule/utilities/csv.rs index a07e85e2..6282a310 100644 --- a/cozo-core/src/fixed_rule/utilities/csv.rs +++ b/cozo-core/src/fixed_rule/utilities/csv.rs @@ -16,7 +16,7 @@ use smartstring::{LazyCompact, SmartString}; use crate::fixed_rule::utilities::jlines::get_file_content_from_url; use crate::fixed_rule::{FixedRule, FixedRulePayload, CannotDetermineArity}; use crate::data::expr::Expr; -use crate::data::functions::{op_to_float, op_to_uuid}; +use crate::data::functions::{op_to_float, op_to_uuid, TERMINAL_VALIDITY}; use crate::data::program::{FixedRuleOptionNotFoundError, WrongFixedRuleOptionError}; use crate::data::relation::{ColType, NullableColType}; use crate::data::symb::Symbol; @@ -59,7 +59,7 @@ impl FixedRule for CsvReader { }, nullable: false, }; - let types_opts = typing.coerce(types_opts)?; + let types_opts = typing.coerce(types_opts, TERMINAL_VALIDITY.timestamp)?; let mut types = vec![]; for type_str in types_opts.get_list().unwrap() { let type_str = type_str.get_string().unwrap(); diff --git a/cozo-core/src/parse/mod.rs b/cozo-core/src/parse/mod.rs index 12358907..12883c04 100644 --- a/cozo-core/src/parse/mod.rs +++ b/cozo-core/src/parse/mod.rs @@ -14,11 +14,10 @@ use miette::{bail, ensure, Diagnostic, IntoDiagnostic, Result}; use pest::error::InputLocation; use pest::Parser; use thiserror::Error; -use crate::data::functions::current_validity; use crate::data::program::InputProgram; use crate::data::relation::NullableColType; -use crate::data::value::{DataValue}; +use crate::data::value::{DataValue, ValidityTs}; use crate::parse::query::parse_query; use crate::parse::schema::parse_nullable_type; use crate::parse::sys::{parse_sys, SysOp}; @@ -108,6 +107,7 @@ pub(crate) fn parse_script( src: &str, param_pool: &BTreeMap, algorithms: &BTreeMap>>, + cur_vld: ValidityTs ) -> Result { let parsed = CozoScriptParser::parse(Rule::script, src) .map_err(|err| { @@ -119,7 +119,6 @@ pub(crate) fn parse_script( })? .next() .unwrap(); - let cur_vld = current_validity(); Ok(match parsed.as_rule() { Rule::query_script => { let q = parse_query(parsed.into_inner(), param_pool, algorithms, cur_vld)?; @@ -135,7 +134,7 @@ pub(crate) fn parse_script( CozoScript::Multi(qs) } Rule::sys_script => { - CozoScript::Sys(parse_sys(parsed.into_inner(), param_pool, algorithms)?) + CozoScript::Sys(parse_sys(parsed.into_inner(), param_pool, algorithms, cur_vld)?) } _ => unreachable!(), }) diff --git a/cozo-core/src/parse/sys.rs b/cozo-core/src/parse/sys.rs index a713de36..4855c7b1 100644 --- a/cozo-core/src/parse/sys.rs +++ b/cozo-core/src/parse/sys.rs @@ -12,11 +12,10 @@ use std::sync::Arc; use itertools::Itertools; use miette::{Diagnostic, Result}; use thiserror::Error; -use crate::data::functions::{current_validity}; use crate::data::program::InputProgram; use crate::data::symb::Symbol; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, ValidityTs}; use crate::parse::query::parse_query; use crate::parse::{ExtractSpan, Pairs, Rule, SourceSpan}; use crate::runtime::relation::AccessLevel; @@ -45,8 +44,8 @@ pub(crate) fn parse_sys( mut src: Pairs<'_>, param_pool: &BTreeMap, algorithms: &BTreeMap>>, + cur_vld: ValidityTs ) -> Result { - let cur_vld = current_validity(); let inner = src.next().unwrap(); Ok(match inner.as_rule() { Rule::compact_op => SysOp::Compact, diff --git a/cozo-core/src/query/stored.rs b/cozo-core/src/query/stored.rs index 84f930be..1f523e1d 100644 --- a/cozo-core/src/query/stored.rs +++ b/cozo-core/src/query/stored.rs @@ -15,14 +15,14 @@ use miette::{bail, Diagnostic, Result, WrapErr}; use smartstring::SmartString; use thiserror::Error; -use crate::fixed_rule::FixedRuleHandle; use crate::data::expr::Expr; use crate::data::program::{FixedRuleApply, InputInlineRulesOrFixed, InputProgram, RelationOp}; use crate::data::relation::{ColumnDef, NullableColType}; use crate::data::symb::Symbol; use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN}; -use crate::data::value::DataValue; +use crate::data::value::{DataValue, ValidityTs}; use crate::fixed_rule::utilities::constant::Constant; +use crate::fixed_rule::FixedRuleHandle; use crate::parse::parse_script; use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel}; use crate::runtime::transact::SessionTx; @@ -42,6 +42,7 @@ impl<'a> SessionTx<'a> { op: RelationOp, meta: &InputRelationHandle, headers: &[Symbol], + cur_vld: ValidityTs, ) -> Result, Vec)>> { let mut to_clear = vec![]; let mut replaced_old_triggers = None; @@ -58,10 +59,11 @@ impl<'a> SessionTx<'a> { replaced_old_triggers = Some((old_handle.put_triggers, old_handle.rm_triggers)) } for trigger in &old_handle.replace_triggers { - let program = parse_script(trigger, &Default::default(), &db.algorithms)? - .get_single_program()?; + let program = + parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? + .get_single_program()?; - let (_, cleanups) = db.run_query(self, program).map_err(|err| { + let (_, cleanups) = db.run_query(self, program, cur_vld).map_err(|err| { if err.source_code().is_some() { err } else { @@ -114,7 +116,7 @@ impl<'a> SessionTx<'a> { for tuple in res_iter { let extracted = key_extractors .iter() - .map(|ex| ex.extract_data(&tuple)) + .map(|ex| ex.extract_data(&tuple, cur_vld)) .try_collect()?; let key = relation_store.encode_key_for_store(&extracted, *span)?; if has_triggers { @@ -138,7 +140,7 @@ impl<'a> SessionTx<'a> { if has_triggers && !new_tuples.is_empty() { for trigger in &relation_store.rm_triggers { let mut program = - parse_script(trigger, &Default::default(), &db.algorithms)? + parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? .get_single_program()?; let mut bindings = relation_store @@ -159,13 +161,14 @@ impl<'a> SessionTx<'a> { make_const_rule(&mut program, "_old", bindings, old_tuples.clone()); - let (_, cleanups) = db.run_query(self, program).map_err(|err| { - if err.source_code().is_some() { - err - } else { - err.with_source_code(trigger.to_string()) - } - })?; + let (_, cleanups) = + db.run_query(self, program, cur_vld).map_err(|err| { + if err.source_code().is_some() { + err + } else { + err.with_source_code(trigger.to_string()) + } + })?; to_clear.extend(cleanups); } } @@ -197,7 +200,7 @@ impl<'a> SessionTx<'a> { for tuple in res_iter { let extracted = key_extractors .iter() - .map(|ex| ex.extract_data(&tuple)) + .map(|ex| ex.extract_data(&tuple, cur_vld)) .try_collect()?; let key = relation_store.encode_key_for_store(&extracted, *span)?; @@ -244,7 +247,7 @@ impl<'a> SessionTx<'a> { for tuple in res_iter { let extracted = key_extractors .iter() - .map(|ex| ex.extract_data(&tuple)) + .map(|ex| ex.extract_data(&tuple, cur_vld)) .try_collect()?; let key = relation_store.encode_key_for_store(&extracted, *span)?; if self.store_tx.exists(&key, true)? { @@ -287,7 +290,7 @@ impl<'a> SessionTx<'a> { for tuple in res_iter { let extracted = key_extractors .iter() - .map(|ex| ex.extract_data(&tuple)) + .map(|ex| ex.extract_data(&tuple, cur_vld)) .try_collect()?; let key = relation_store.encode_key_for_store(&extracted, *span)?; @@ -314,7 +317,7 @@ impl<'a> SessionTx<'a> { if has_triggers && !new_tuples.is_empty() { for trigger in &relation_store.put_triggers { let mut program = - parse_script(trigger, &Default::default(), &db.algorithms)? + parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)? .get_single_program()?; let mut bindings = relation_store @@ -333,13 +336,14 @@ impl<'a> SessionTx<'a> { make_const_rule(&mut program, "_new", bindings.clone(), new_tuples.clone()); make_const_rule(&mut program, "_old", bindings, old_tuples.clone()); - let (_, cleanups) = db.run_query(self, program).map_err(|err| { - if err.source_code().is_some() { - err - } else { - err.with_source_code(trigger.to_string()) - } - })?; + let (_, cleanups) = + db.run_query(self, program, cur_vld).map_err(|err| { + if err.source_code().is_some() { + err + } else { + err.with_source_code(trigger.to_string()) + } + })?; to_clear.extend(cleanups); } } @@ -364,13 +368,13 @@ enum DataExtractor { } impl DataExtractor { - fn extract_data(&self, tuple: &Tuple) -> Result { + fn extract_data(&self, tuple: &Tuple, cur_vld: ValidityTs) -> Result { Ok(match self { DataExtractor::DefaultExtractor(expr, typ) => typ - .coerce(expr.clone().eval_to_const()?) + .coerce(expr.clone().eval_to_const()?, cur_vld) .wrap_err_with(|| format!("when processing tuple {:?}", tuple))?, DataExtractor::IndexExtractor(i, typ) => typ - .coerce(tuple[*i].clone()) + .coerce(tuple[*i].clone(), cur_vld) .wrap_err_with(|| format!("when processing tuple {:?}", tuple))?, }) } diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index b8018181..a4529a08 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -25,16 +25,20 @@ use serde_json::json; use smartstring::SmartString; use thiserror::Error; +use crate::data::functions::current_validity; use crate::data::json::JsonValue; use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; use crate::data::relation::ColumnDef; use crate::data::tuple::{Tuple, TupleT}; -use crate::data::value::{DataValue, LARGEST_UTF_CHAR}; +use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR}; use crate::fixed_rule::DEFAULT_FIXED_RULES; use crate::parse::sys::SysOp; use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; -use crate::query::ra::{FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA, TempStoreRA, UnificationRA}; +use crate::query::ra::{ + FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA, + TempStoreRA, UnificationRA, +}; use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId}; use crate::runtime::transact::SessionTx; use crate::storage::{Storage, StoreTx}; @@ -147,11 +151,12 @@ impl<'s, S: Storage<'s>> Db { payload: &str, params: BTreeMap, ) -> Result { + let cur_vld = current_validity(); let params = params .into_iter() .map(|(k, v)| (k, DataValue::from(v))) .collect(); - self.do_run_script(payload, ¶ms) + self.do_run_script(payload, ¶ms, cur_vld) } /// Export relations to JSON data. /// @@ -218,6 +223,8 @@ impl<'s, S: Storage<'s>> Db { #[diagnostic(code(import::bad_data))] struct BadDataForRelation(String, JsonValue); + let cur_vld = current_validity(); + let mut tx = self.transact_write()?; for (relation_op, in_data) in data { @@ -292,7 +299,7 @@ impl<'s, S: Storage<'s>> Db { let v = row .get(*i) .ok_or_else(|| miette!("row too short: {:?}", row))?; - col.typing.coerce(DataValue::from(v)) + col.typing.coerce(DataValue::from(v), cur_vld) }) .try_collect()?; let k_store = handle.encode_key_for_store(&keys, Default::default())?; @@ -305,7 +312,7 @@ impl<'s, S: Storage<'s>> Db { let v = row .get(*i) .ok_or_else(|| miette!("row too short: {:?}", row))?; - col.typing.coerce(DataValue::from(v)) + col.typing.coerce(DataValue::from(v), cur_vld) }) .try_collect()?; let v_store = handle.encode_val_only_for_store(&vals, Default::default())?; @@ -486,8 +493,9 @@ impl<'s, S: Storage<'s>> Db { &'s self, payload: &str, param_pool: &BTreeMap, + cur_vld: ValidityTs, ) -> Result { - match parse_script(payload, param_pool, &self.algorithms)? { + match parse_script(payload, param_pool, &self.algorithms, cur_vld)? { CozoScript::Multi(ps) => { let is_write = ps.iter().any(|p| p.out_opts.store_relation.is_some()); let mut cleanups = vec![]; @@ -505,7 +513,7 @@ impl<'s, S: Storage<'s>> Db { for p in ps { #[allow(unused_variables)] let sleep_opt = p.out_opts.sleep; - let (q_res, q_cleanups) = self.run_query(&mut tx, p)?; + let (q_res, q_cleanups) = self.run_query(&mut tx, p, cur_vld)?; res = q_res; cleanups.extend(q_cleanups); #[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))] @@ -611,8 +619,10 @@ impl<'s, S: Storage<'s>> Db { json!(filters.iter().map(|f| f.to_string()).collect_vec()), ), RelAlgebra::StoredWithValidity(StoredWithValidityRA { - storage, filters, .. - }) => ( + storage, + filters, + .. + }) => ( "load_stored_with_validity", json!(format!(":{}", storage.name)), json!(null), @@ -827,6 +837,7 @@ impl<'s, S: Storage<'s>> Db { &self, tx: &mut SessionTx<'_>, input_program: InputProgram, + cur_vld: ValidityTs, ) -> Result<(NamedRows, Vec<(Vec, Vec)>)> { // cleanups contain stored relations that should be deleted at the end of query let mut clean_ups = vec![]; @@ -968,6 +979,7 @@ impl<'s, S: Storage<'s>> Db { *relation_op, meta, &entry_head_or_default, + cur_vld, ) .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?; clean_ups.extend(to_clear); @@ -1017,7 +1029,14 @@ impl<'s, S: Storage<'s>> Db { if let Some((meta, relation_op)) = &out_opts.store_relation { let to_clear = tx - .execute_relation(self, scan, *relation_op, meta, &entry_head_or_default) + .execute_relation( + self, + scan, + *relation_op, + meta, + &entry_head_or_default, + cur_vld, + ) .wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?; clean_ups.extend(to_clear); Ok((