|
|
|
@ -15,14 +15,14 @@ use miette::{bail, Diagnostic, Result, WrapErr};
|
|
|
|
|
use smartstring::SmartString;
|
|
|
|
|
use thiserror::Error;
|
|
|
|
|
|
|
|
|
|
use crate::fixed_rule::FixedRuleHandle;
|
|
|
|
|
use crate::data::expr::Expr;
|
|
|
|
|
use crate::data::program::{FixedRuleApply, InputInlineRulesOrFixed, InputProgram, RelationOp};
|
|
|
|
|
use crate::data::relation::{ColumnDef, NullableColType};
|
|
|
|
|
use crate::data::symb::Symbol;
|
|
|
|
|
use crate::data::tuple::{Tuple, ENCODED_KEY_MIN_LEN};
|
|
|
|
|
use crate::data::value::DataValue;
|
|
|
|
|
use crate::data::value::{DataValue, ValidityTs};
|
|
|
|
|
use crate::fixed_rule::utilities::constant::Constant;
|
|
|
|
|
use crate::fixed_rule::FixedRuleHandle;
|
|
|
|
|
use crate::parse::parse_script;
|
|
|
|
|
use crate::runtime::relation::{AccessLevel, InputRelationHandle, InsufficientAccessLevel};
|
|
|
|
|
use crate::runtime::transact::SessionTx;
|
|
|
|
@ -42,6 +42,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
op: RelationOp,
|
|
|
|
|
meta: &InputRelationHandle,
|
|
|
|
|
headers: &[Symbol],
|
|
|
|
|
cur_vld: ValidityTs,
|
|
|
|
|
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
|
|
|
|
|
let mut to_clear = vec![];
|
|
|
|
|
let mut replaced_old_triggers = None;
|
|
|
|
@ -58,10 +59,11 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
replaced_old_triggers = Some((old_handle.put_triggers, old_handle.rm_triggers))
|
|
|
|
|
}
|
|
|
|
|
for trigger in &old_handle.replace_triggers {
|
|
|
|
|
let program = parse_script(trigger, &Default::default(), &db.algorithms)?
|
|
|
|
|
.get_single_program()?;
|
|
|
|
|
let program =
|
|
|
|
|
parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
|
|
|
|
|
.get_single_program()?;
|
|
|
|
|
|
|
|
|
|
let (_, cleanups) = db.run_query(self, program).map_err(|err| {
|
|
|
|
|
let (_, cleanups) = db.run_query(self, program, cur_vld).map_err(|err| {
|
|
|
|
|
if err.source_code().is_some() {
|
|
|
|
|
err
|
|
|
|
|
} else {
|
|
|
|
@ -114,7 +116,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
for tuple in res_iter {
|
|
|
|
|
let extracted = key_extractors
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple))
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple, cur_vld))
|
|
|
|
|
.try_collect()?;
|
|
|
|
|
let key = relation_store.encode_key_for_store(&extracted, *span)?;
|
|
|
|
|
if has_triggers {
|
|
|
|
@ -138,7 +140,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
if has_triggers && !new_tuples.is_empty() {
|
|
|
|
|
for trigger in &relation_store.rm_triggers {
|
|
|
|
|
let mut program =
|
|
|
|
|
parse_script(trigger, &Default::default(), &db.algorithms)?
|
|
|
|
|
parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
|
|
|
|
|
.get_single_program()?;
|
|
|
|
|
|
|
|
|
|
let mut bindings = relation_store
|
|
|
|
@ -159,13 +161,14 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
|
|
|
|
|
make_const_rule(&mut program, "_old", bindings, old_tuples.clone());
|
|
|
|
|
|
|
|
|
|
let (_, cleanups) = db.run_query(self, program).map_err(|err| {
|
|
|
|
|
if err.source_code().is_some() {
|
|
|
|
|
err
|
|
|
|
|
} else {
|
|
|
|
|
err.with_source_code(trigger.to_string())
|
|
|
|
|
}
|
|
|
|
|
})?;
|
|
|
|
|
let (_, cleanups) =
|
|
|
|
|
db.run_query(self, program, cur_vld).map_err(|err| {
|
|
|
|
|
if err.source_code().is_some() {
|
|
|
|
|
err
|
|
|
|
|
} else {
|
|
|
|
|
err.with_source_code(trigger.to_string())
|
|
|
|
|
}
|
|
|
|
|
})?;
|
|
|
|
|
to_clear.extend(cleanups);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -197,7 +200,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
for tuple in res_iter {
|
|
|
|
|
let extracted = key_extractors
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple))
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple, cur_vld))
|
|
|
|
|
.try_collect()?;
|
|
|
|
|
|
|
|
|
|
let key = relation_store.encode_key_for_store(&extracted, *span)?;
|
|
|
|
@ -244,7 +247,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
for tuple in res_iter {
|
|
|
|
|
let extracted = key_extractors
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple))
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple, cur_vld))
|
|
|
|
|
.try_collect()?;
|
|
|
|
|
let key = relation_store.encode_key_for_store(&extracted, *span)?;
|
|
|
|
|
if self.store_tx.exists(&key, true)? {
|
|
|
|
@ -287,7 +290,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
for tuple in res_iter {
|
|
|
|
|
let extracted = key_extractors
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple))
|
|
|
|
|
.map(|ex| ex.extract_data(&tuple, cur_vld))
|
|
|
|
|
.try_collect()?;
|
|
|
|
|
|
|
|
|
|
let key = relation_store.encode_key_for_store(&extracted, *span)?;
|
|
|
|
@ -314,7 +317,7 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
if has_triggers && !new_tuples.is_empty() {
|
|
|
|
|
for trigger in &relation_store.put_triggers {
|
|
|
|
|
let mut program =
|
|
|
|
|
parse_script(trigger, &Default::default(), &db.algorithms)?
|
|
|
|
|
parse_script(trigger, &Default::default(), &db.algorithms, cur_vld)?
|
|
|
|
|
.get_single_program()?;
|
|
|
|
|
|
|
|
|
|
let mut bindings = relation_store
|
|
|
|
@ -333,13 +336,14 @@ impl<'a> SessionTx<'a> {
|
|
|
|
|
make_const_rule(&mut program, "_new", bindings.clone(), new_tuples.clone());
|
|
|
|
|
make_const_rule(&mut program, "_old", bindings, old_tuples.clone());
|
|
|
|
|
|
|
|
|
|
let (_, cleanups) = db.run_query(self, program).map_err(|err| {
|
|
|
|
|
if err.source_code().is_some() {
|
|
|
|
|
err
|
|
|
|
|
} else {
|
|
|
|
|
err.with_source_code(trigger.to_string())
|
|
|
|
|
}
|
|
|
|
|
})?;
|
|
|
|
|
let (_, cleanups) =
|
|
|
|
|
db.run_query(self, program, cur_vld).map_err(|err| {
|
|
|
|
|
if err.source_code().is_some() {
|
|
|
|
|
err
|
|
|
|
|
} else {
|
|
|
|
|
err.with_source_code(trigger.to_string())
|
|
|
|
|
}
|
|
|
|
|
})?;
|
|
|
|
|
to_clear.extend(cleanups);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -364,13 +368,13 @@ enum DataExtractor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl DataExtractor {
|
|
|
|
|
fn extract_data(&self, tuple: &Tuple) -> Result<DataValue> {
|
|
|
|
|
fn extract_data(&self, tuple: &Tuple, cur_vld: ValidityTs) -> Result<DataValue> {
|
|
|
|
|
Ok(match self {
|
|
|
|
|
DataExtractor::DefaultExtractor(expr, typ) => typ
|
|
|
|
|
.coerce(expr.clone().eval_to_const()?)
|
|
|
|
|
.coerce(expr.clone().eval_to_const()?, cur_vld)
|
|
|
|
|
.wrap_err_with(|| format!("when processing tuple {:?}", tuple))?,
|
|
|
|
|
DataExtractor::IndexExtractor(i, typ) => typ
|
|
|
|
|
.coerce(tuple[*i].clone())
|
|
|
|
|
.coerce(tuple[*i].clone(), cur_vld)
|
|
|
|
|
.wrap_err_with(|| format!("when processing tuple {:?}", tuple))?,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|