Ziyang Hu 1 year ago
parent db29a65c68
commit bf325b76f5

@ -127,13 +127,14 @@ object_pair = {expr ~ ":" ~ expr}
list = { "[" ~ (expr ~ ",")* ~ expr? ~ "]" }
grouping = { "(" ~ expr ~ ")" }
option = _{(limit_option|offset_option|sort_option|relation_option|timeout_option|sleep_option|
option = _{(limit_option|offset_option|sort_option|relation_option|timeout_option|sleep_option|returning_option|
assert_none_option|assert_some_option|disable_magic_rewrite_option) ~ ";"?}
out_arg = @{var ~ ("(" ~ var ~ ")")?}
disable_magic_rewrite_option = {":disable_magic_rewrite" ~ expr}
limit_option = {":limit" ~ expr}
offset_option = {":offset" ~ expr}
sort_option = {(":sort" | ":order") ~ (sort_arg ~ ",")* ~ sort_arg }
returning_option = {":returning"}
relation_option = {relation_op ~ (compound_ident | underscore_ident) ~ table_schema?}
relation_op = _{relation_create | relation_replace | relation_insert | relation_put | relation_update | relation_rm | relation_delete | relation_ensure_not | relation_ensure }
relation_create = {":create"}

@ -40,6 +40,12 @@ pub(crate) enum QueryAssertion {
AssertSome(SourceSpan),
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) enum ReturnMutation {
NotReturning,
Returning,
}
#[derive(Clone, PartialEq, Default)]
pub(crate) struct QueryOutOptions {
pub(crate) limit: Option<usize>,
@ -47,7 +53,7 @@ pub(crate) struct QueryOutOptions {
pub(crate) timeout: Option<f64>,
pub(crate) sleep: Option<f64>,
pub(crate) sorters: Vec<(Symbol, SortDir)>,
pub(crate) store_relation: Option<(InputRelationHandle, RelationOp)>,
pub(crate) store_relation: Option<(InputRelationHandle, RelationOp, ReturnMutation)>,
pub(crate) assertion: Option<QueryAssertion>,
}
@ -84,8 +90,12 @@ impl Display for QueryOutOptions {
..
},
op,
return_mutation,
)) = &self.store_relation
{
if *return_mutation == ReturnMutation::Returning {
write!(f, ":returning\n")?;
}
match op {
RelationOp::Create => {
write!(f, ":create ")?;
@ -546,7 +556,7 @@ pub(crate) struct NoEntryError;
impl InputProgram {
pub(crate) fn needs_write_lock(&self) -> Option<SmartString<LazyCompact>> {
if let Some((h, _)) = &self.out_opts.store_relation {
if let Some((h, _, _)) = &self.out_opts.store_relation {
if !h.name.name.starts_with('_') {
Some(h.name.name.clone())
} else {

@ -22,11 +22,7 @@ use thiserror::Error;
use crate::data::aggr::{parse_aggr, Aggregation};
use crate::data::expr::Expr;
use crate::data::functions::{str2vld, MAX_VALIDITY_TS};
use crate::data::program::{
FixedRuleApply, FixedRuleArg, InputAtom, InputInlineRule, InputInlineRulesOrFixed,
InputNamedFieldRelationApplyAtom, InputProgram, InputRelationApplyAtom, InputRuleApplyAtom,
QueryAssertion, QueryOutOptions, RelationOp, SearchInput, SortDir, Unification,
};
use crate::data::program::{FixedRuleApply, FixedRuleArg, InputAtom, InputInlineRule, InputInlineRulesOrFixed, InputNamedFieldRelationApplyAtom, InputProgram, InputRelationApplyAtom, InputRuleApplyAtom, QueryAssertion, QueryOutOptions, RelationOp, ReturnMutation, SearchInput, SortDir, Unification};
use crate::data::relation::{ColType, ColumnDef, NullableColType, StoredRelationMetadata};
use crate::data::symb::{Symbol, PROG_ENTRY};
use crate::data::value::{DataValue, ValidityTs};
@ -87,7 +83,7 @@ impl Diagnostic for MultipleRuleDefinitionError {
fn code<'a>(&'a self) -> Option<Box<dyn Display + 'a>> {
Some(Box::new("parser::mult_rule_def"))
}
fn labels(&self) -> Option<Box<dyn Iterator<Item = LabeledSpan> + '_>> {
fn labels(&self) -> Option<Box<dyn Iterator<Item=LabeledSpan> + '_>> {
Some(Box::new(
self.1.iter().map(|s| LabeledSpan::new_with_span(None, s)),
))
@ -113,6 +109,7 @@ pub(crate) fn parse_query(
let mut disable_magic_rewrite = false;
let mut stored_relation = None;
let mut returning_mutation = ReturnMutation::NotReturning;
for pair in src {
match pair.as_rule() {
@ -310,6 +307,9 @@ pub(crate) fn parse_query(
out_opts.sorters.push((Symbol::new(var, span), dir));
}
}
Rule::returning_option => {
returning_mutation = ReturnMutation::Returning;
}
Rule::relation_option => {
let span = pair.extract_span();
let mut args = pair.into_inner();
@ -395,6 +395,7 @@ pub(crate) fn parse_query(
..
},
RelationOp::Create,
_
)) = &prog.out_opts.store_relation
{
let mut bindings = key_bindings.clone();
@ -435,13 +436,13 @@ pub(crate) fn parse_query(
dep_bindings: vec![],
span,
};
prog.out_opts.store_relation = Some((handle, op))
prog.out_opts.store_relation = Some((handle, op, returning_mutation))
}
Some(Right(r)) => prog.out_opts.store_relation = Some(r),
Some(Right((h, o))) => prog.out_opts.store_relation = Some((h, o, returning_mutation)),
}
if prog.prog.is_empty() {
if let Some((handle, RelationOp::Create)) = &prog.out_opts.store_relation {
if let Some((handle, RelationOp::Create, _)) = &prog.out_opts.store_relation {
let mut bindings = handle.dep_bindings.clone();
bindings.extend_from_slice(&handle.key_bindings);
make_empty_const_rule(&mut prog, &bindings);

@ -44,7 +44,7 @@ impl<'a> SessionTx<'a> {
pub(crate) fn execute_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
res_iter: impl Iterator<Item=Tuple>,
op: RelationOp,
meta: &InputRelationHandle,
headers: &[Symbol],
@ -52,6 +52,7 @@ impl<'a> SessionTx<'a> {
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
force_collect: &str,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let mut to_clear = vec![];
let mut replaced_old_triggers = None;
@ -141,17 +142,18 @@ impl<'a> SessionTx<'a> {
callback_collector,
propagate_triggers,
&mut to_clear,
&mut relation_store,
&relation_store,
metadata,
key_bindings,
op == RelationOp::Delete,
force_collect,
*span,
)?,
RelationOp::Ensure => self.ensure_in_relation(
res_iter,
headers,
cur_vld,
&mut relation_store,
&relation_store,
metadata,
key_bindings,
*span,
@ -160,7 +162,7 @@ impl<'a> SessionTx<'a> {
res_iter,
headers,
cur_vld,
&mut relation_store,
&relation_store,
metadata,
key_bindings,
*span,
@ -174,9 +176,10 @@ impl<'a> SessionTx<'a> {
callback_collector,
propagate_triggers,
&mut to_clear,
&mut relation_store,
&relation_store,
metadata,
key_bindings,
force_collect,
*span,
)?,
RelationOp::Create | RelationOp::Replace | RelationOp::Put | RelationOp::Insert => self
@ -189,11 +192,12 @@ impl<'a> SessionTx<'a> {
callback_collector,
propagate_triggers,
&mut to_clear,
&mut relation_store,
&relation_store,
metadata,
key_bindings,
dep_bindings,
op == RelationOp::Insert,
force_collect,
*span,
)?,
};
@ -204,21 +208,22 @@ impl<'a> SessionTx<'a> {
fn put_into_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
res_iter: impl Iterator<Item=Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &mut RelationHandle,
relation_store: &RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
dep_bindings: &[Symbol],
is_insert: bool,
force_collect: &str,
span: SourceSpan,
) -> Result<()> {
let is_callback_target = callback_targets.contains(&relation_store.name);
let is_callback_target = callback_targets.contains(&relation_store.name) || force_collect == &relation_store.name;
if relation_store.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
@ -235,9 +240,9 @@ impl<'a> SessionTx<'a> {
headers,
)?;
let need_to_collect = !relation_store.is_temp
let need_to_collect = !force_collect.is_empty() || (!relation_store.is_temp
&& (is_callback_target
|| (propagate_triggers && !relation_store.put_triggers.is_empty()));
|| (propagate_triggers && !relation_store.put_triggers.is_empty())));
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let has_fts_indices = !relation_store.fts_indices.is_empty();
@ -517,19 +522,20 @@ impl<'a> SessionTx<'a> {
fn update_in_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
res_iter: impl Iterator<Item=Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &mut RelationHandle,
relation_store: &RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
force_collect: &str,
span: SourceSpan,
) -> Result<()> {
let is_callback_target = callback_targets.contains(&relation_store.name);
let is_callback_target = callback_targets.contains(&relation_store.name) || force_collect == &relation_store.name;
if relation_store.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
@ -546,9 +552,9 @@ impl<'a> SessionTx<'a> {
headers,
)?;
let need_to_collect = !relation_store.is_temp
let need_to_collect = !force_collect.is_empty() || (!relation_store.is_temp
&& (is_callback_target
|| (propagate_triggers && !relation_store.put_triggers.is_empty()));
|| (propagate_triggers && !relation_store.put_triggers.is_empty())));
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let has_fts_indices = !relation_store.fts_indices.is_empty();
@ -791,10 +797,10 @@ impl<'a> SessionTx<'a> {
fn ensure_not_in_relation(
&mut self,
res_iter: impl Iterator<Item = Tuple>,
res_iter: impl Iterator<Item=Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
relation_store: &mut RelationHandle,
relation_store: &RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
span: SourceSpan,
@ -838,10 +844,10 @@ impl<'a> SessionTx<'a> {
fn ensure_in_relation(
&mut self,
res_iter: impl Iterator<Item = Tuple>,
res_iter: impl Iterator<Item=Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
relation_store: &mut RelationHandle,
relation_store: &RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
span: SourceSpan,
@ -908,20 +914,21 @@ impl<'a> SessionTx<'a> {
fn remove_from_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
res_iter: impl Iterator<Item = Tuple>,
res_iter: impl Iterator<Item=Tuple>,
headers: &[Symbol],
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
propagate_triggers: bool,
to_clear: &mut Vec<(Vec<u8>, Vec<u8>)>,
relation_store: &mut RelationHandle,
relation_store: &RelationHandle,
metadata: &StoredRelationMetadata,
key_bindings: &[Symbol],
check_exists: bool,
force_collect: &str,
span: SourceSpan,
) -> Result<()> {
let is_callback_target = callback_targets.contains(&relation_store.name);
let is_callback_target = callback_targets.contains(&relation_store.name) || force_collect == relation_store.name;
if relation_store.access_level < AccessLevel::Protected {
bail!(InsufficientAccessLevel(
@ -937,9 +944,9 @@ impl<'a> SessionTx<'a> {
headers,
)?;
let need_to_collect = !relation_store.is_temp
let need_to_collect = !force_collect.is_empty() || (!relation_store.is_temp
&& (is_callback_target
|| (propagate_triggers && !relation_store.rm_triggers.is_empty()));
|| (propagate_triggers && !relation_store.rm_triggers.is_empty())));
let has_indices = !relation_store.indices.is_empty();
let has_hnsw_indices = !relation_store.hnsw_indices.is_empty();
let has_fts_indices = !relation_store.fts_indices.is_empty();

@ -34,7 +34,7 @@ use thiserror::Error;
use crate::data::functions::current_validity;
use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::program::{InputProgram, QueryAssertion, RelationOp, ReturnMutation};
use crate::data::relation::ColumnDef;
use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
@ -51,9 +51,7 @@ use crate::query::ra::{
use crate::runtime::callback::{
CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry,
};
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId,
};
use crate::runtime::relation::{extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId, InputRelationHandle};
use crate::runtime::transact::SessionTx;
use crate::storage::temp::TempStorage;
use crate::storage::{Storage, StoreTx};
@ -379,7 +377,7 @@ impl<'s, S: Storage<'s>> Db<S> {
pub fn export_relations<I, T>(&'s self, relations: I) -> Result<BTreeMap<String, NamedRows>>
where
T: AsRef<str>,
I: Iterator<Item = T>,
I: Iterator<Item=T>,
{
let tx = self.transact()?;
let mut ret: BTreeMap<String, NamedRows> = BTreeMap::new();
@ -759,7 +757,7 @@ impl<'s, S: Storage<'s>> Db<S> {
ret.is_some()
}
pub(crate) fn obtain_relation_locks<'a, T: Iterator<Item = &'a SmartString<LazyCompact>>>(
pub(crate) fn obtain_relation_locks<'a, T: Iterator<Item=&'a SmartString<LazyCompact>>>(
&'s self,
rels: T,
) -> Vec<Arc<ShardedLock<()>>> {
@ -1353,7 +1351,7 @@ impl<'s, S: Storage<'s>> Db<S> {
let mut clean_ups = vec![];
// Some checks in case the query specifies mutation
if let Some((meta, op)) = &input_program.out_opts.store_relation {
if let Some((meta, op, _)) = &input_program.out_opts.store_relation {
if *op == RelationOp::Create {
#[derive(Debug, Error, Diagnostic)]
#[error("Stored relation {0} conflicts with an existing one")]
@ -1475,7 +1473,7 @@ impl<'s, S: Storage<'s>> Db<S> {
} else {
Right(sorted_iter)
};
if let Some((meta, relation_op)) = &out_opts.store_relation {
if let Some((meta, relation_op, returning)) = &out_opts.store_relation {
let to_clear = tx
.execute_relation(
self,
@ -1487,16 +1485,16 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_targets,
callback_collector,
top_level,
if *returning == ReturnMutation::Returning {
&meta.name.name
} else {
""
},
)
.wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
clean_ups.extend(to_clear);
Ok((
NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
),
clean_ups,
))
let returned_rows = Self::get_returning_rows(callback_collector, meta, returning);
Ok((returned_rows, clean_ups))
} else {
// not sorting outputs
let rows: Vec<Tuple> = sorted_iter.collect_vec();
@ -1530,7 +1528,7 @@ impl<'s, S: Storage<'s>> Db<S> {
Left(result_store.all_iter().map(|t| t.into_tuple()))
};
if let Some((meta, relation_op)) = &out_opts.store_relation {
if let Some((meta, relation_op, returning)) = &out_opts.store_relation {
let to_clear = tx
.execute_relation(
self,
@ -1542,16 +1540,17 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_targets,
callback_collector,
top_level,
if *returning == ReturnMutation::Returning {
&meta.name.name
} else {
""
},
)
.wrap_err_with(|| format!("when executing against relation '{}'", meta.name))?;
clean_ups.extend(to_clear);
Ok((
NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
),
clean_ups,
))
let returned_rows = Self::get_returning_rows(callback_collector, meta, returning);
Ok((returned_rows, clean_ups))
} else {
let rows: Vec<Tuple> = scan.collect_vec();
@ -1568,6 +1567,59 @@ impl<'s, S: Storage<'s>> Db<S> {
}
}
}
fn get_returning_rows(callback_collector: &mut CallbackCollector, meta: &InputRelationHandle, returning: &ReturnMutation) -> NamedRows {
let returned_rows = {
match returning {
ReturnMutation::NotReturning => {
NamedRows::new(
vec![STATUS_STR.to_string()],
vec![vec![DataValue::from(OK_STR)]],
)
}
ReturnMutation::Returning => {
let target_len = meta.metadata.keys.len() + meta.metadata.non_keys.len();
let mut returned_rows = Vec::new();
if let Some(collected) = callback_collector.get(&meta.name.name) {
for (kind, insertions, deletions) in collected {
let (pos_key, neg_key) = match kind {
CallbackOp::Put => { ("inserted", "replaced") }
CallbackOp::Rm => { ("requested", "deleted") }
};
for row in &insertions.rows {
let mut v = Vec::with_capacity(target_len + 1);
v.push(DataValue::from(pos_key));
v.extend_from_slice(&row[..target_len]);
while v.len() <= target_len {
v.push(DataValue::Null);
}
returned_rows.push(v);
}
for row in &deletions.rows {
let mut v = Vec::with_capacity(target_len + 1);
v.push(DataValue::from(neg_key));
v.extend_from_slice(&row[..target_len]);
while v.len() <= target_len {
v.push(DataValue::Null);
}
returned_rows.push(v);
}
}
}
let mut header = vec!["_kind".to_string()];
header.extend(meta.metadata.keys
.iter()
.chain(meta.metadata.non_keys.iter())
.map(|s| s.name.to_string()));
NamedRows::new(
header,
returned_rows,
)
}
}
};
returned_rows
}
pub(crate) fn list_running(&self) -> Result<NamedRows> {
let rows = self
.running_queries

@ -1218,6 +1218,44 @@ fn deletion() {
.run_script(r"?[x] <- [[1]] :delete a {x}", Default::default()).unwrap();
}
#[test]
fn returning() {
let db = DbInstance::new("mem", "", "").unwrap();
db.run_script(":create a {x => y}", Default::default())
.unwrap();
let res = db
.run_script(
r"?[x, y] <- [[1, 2]] :insert a {x => y} ",
Default::default(),
)
.unwrap();
for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row);
}
let res = db
.run_script(
r"?[x, y] <- [[1, 3], [2, 4]] :returning :put a {x => y} ",
Default::default(),
)
.unwrap();
println!("{:?}", res.headers);
for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row);
}
let res = db
.run_script(
r"?[x] <- [[1], [4]] :returning :rm a {x} ",
Default::default(),
)
.unwrap();
println!("{:?}", res.headers);
for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row);
}
}
#[test]
fn parser_corner_case() {
let db = DbInstance::new("mem", "", "").unwrap();

Loading…
Cancel
Save