store query results in script as ephemeral relations, as discussed in https://github.com/cozodb/cozo/issues/110

main
Ziyang Hu 1 year ago
parent 0387d07911
commit 9a634ff9d7

@ -55,6 +55,7 @@ var = @{(XID_START | "_") ~ (XID_CONTINUE | "_")*}
param = @{"$" ~ (XID_CONTINUE | "_")*} param = @{"$" ~ (XID_CONTINUE | "_")*}
ident = @{XID_START ~ ("_" | XID_CONTINUE)*} ident = @{XID_START ~ ("_" | XID_CONTINUE)*}
underscore_ident = @{("_" | XID_START) ~ ("_" | XID_CONTINUE)*} underscore_ident = @{("_" | XID_START) ~ ("_" | XID_CONTINUE)*}
definitely_underscore_ident = @{"_" ~ XID_CONTINUE+}
relation_ident = @{"*" ~ (compound_or_index_ident | underscore_ident)} relation_ident = @{"*" ~ (compound_or_index_ident | underscore_ident)}
search_index_ident = _{"~" ~ compound_or_index_ident} search_index_ident = _{"~" ~ compound_or_index_ident}
compound_ident = @{ident ~ ("." ~ ident)*} compound_ident = @{ident ~ ("." ~ ident)*}
@ -95,7 +96,7 @@ not_op = @{"not" ~ !XID_CONTINUE}
apply = {ident ~ "(" ~ apply_args ~ ")"} apply = {ident ~ "(" ~ apply_args ~ ")"}
apply_args = {(expr ~ ",")* ~ expr?} apply_args = {(expr ~ ",")* ~ expr?}
named_apply_args = {(named_apply_pair ~ ",")* ~ named_apply_pair?} named_apply_args = {(named_apply_pair ~ ",")* ~ named_apply_pair?}
named_apply_pair = {ident ~ (":" ~ expr)?} named_apply_pair = {underscore_ident ~ (":" ~ expr)?}
grouped = _{"(" ~ rule_body ~ ")"} grouped = _{"(" ~ rule_body ~ ")"}
expr = {unary_op* ~ term ~ (operation ~ unary_op* ~ term)*} expr = {unary_op* ~ term ~ (operation ~ unary_op* ~ term)*}
@ -232,9 +233,10 @@ vec_el_type = {"F32" | "F64" | "Float" | "Double" }
imperative_stmt = _{ imperative_stmt = _{
break_stmt | continue_stmt | return_stmt | debug_stmt | break_stmt | continue_stmt | return_stmt | debug_stmt |
query_script_inner | ignore_error_script | if_chain | if_not_chain | loop_block | temp_swap imperative_clause | ignore_error_script | if_chain | if_not_chain | loop_block | temp_swap
} }
imperative_condition = _{underscore_ident | query_script_inner} imperative_clause = {query_script_inner ~ ("as" ~ definitely_underscore_ident)?}
imperative_condition = _{underscore_ident | imperative_clause}
if_chain = {"%if" ~ imperative_condition if_chain = {"%if" ~ imperative_condition
~ "%then"? ~ imperative_block ~ "%then"? ~ imperative_block
~ ("%else" ~ imperative_block)? ~ "%end" } ~ ("%else" ~ imperative_block)? ~ "%end" }
@ -243,9 +245,9 @@ if_not_chain = {"%if_not" ~ imperative_condition
~ ("%else" ~ imperative_block)? ~ "%end" } ~ ("%else" ~ imperative_block)? ~ "%end" }
imperative_block = {imperative_stmt+} imperative_block = {imperative_stmt+}
break_stmt = {"%break" ~ ident?} break_stmt = {"%break" ~ ident?}
ignore_error_script = {"%ignore_error" ~ query_script_inner} ignore_error_script = {"%ignore_error" ~ imperative_clause}
continue_stmt = {"%continue" ~ ident?} continue_stmt = {"%continue" ~ ident?}
return_stmt = {"%return" ~ (ident | underscore_ident | query_script_inner)*} return_stmt = {"%return" ~ (ident | underscore_ident | imperative_clause)*}
loop_block = {("%mark" ~ ident)? ~ "%loop" ~ imperative_block ~ "%end"} loop_block = {("%mark" ~ ident)? ~ "%loop" ~ imperative_block ~ "%end"}
temp_swap = {"%swap" ~ underscore_ident ~ underscore_ident} temp_swap = {"%swap" ~ underscore_ident ~ underscore_ident}
debug_stmt = {"%debug" ~ (ident | underscore_ident)} debug_stmt = {"%debug" ~ (ident | underscore_ident)}

@ -17,7 +17,9 @@ use smartstring::SmartString;
use thiserror::Error; use thiserror::Error;
use crate::parse::query::parse_query; use crate::parse::query::parse_query;
use crate::parse::{ExtractSpan, ImperativeProgram, ImperativeStmt, Pair, Rule, SourceSpan}; use crate::parse::{
ExtractSpan, ImperativeProgram, ImperativeStmt, ImperativeStmtClause, Pair, Rule, SourceSpan,
};
use crate::{DataValue, FixedRule, ValidityTs}; use crate::{DataValue, FixedRule, ValidityTs};
pub(crate) fn parse_imperative_block( pub(crate) fn parse_imperative_block(
@ -86,8 +88,15 @@ fn parse_imperative_stmt(
rets.push(Right(rel)); rets.push(Right(rel));
} }
Rule::query_script_inner => { Rule::query_script_inner => {
let prog = parse_query(p.into_inner(), param_pool, fixed_rules, cur_vld)?; let mut src = p.into_inner();
rets.push(Left(prog)) let prog = parse_query(
src.next().unwrap().into_inner(),
param_pool,
fixed_rules,
cur_vld,
)?;
let store_as = src.next().map(|p| SmartString::from(p.as_str().trim()));
rets.push(Left(ImperativeStmtClause { prog, store_as }))
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -100,12 +109,17 @@ fn parse_imperative_stmt(
let condition = inner.next().unwrap(); let condition = inner.next().unwrap();
let cond = match condition.as_rule() { let cond = match condition.as_rule() {
Rule::underscore_ident => Left(SmartString::from(condition.as_str())), Rule::underscore_ident => Left(SmartString::from(condition.as_str())),
Rule::query_script_inner => Right(parse_query( Rule::query_script_inner => {
condition.into_inner(), let mut src = condition.into_inner();
let prog = parse_query(
src.next().unwrap().into_inner(),
param_pool, param_pool,
fixed_rules, fixed_rules,
cur_vld, cur_vld,
)?), )?;
let store_as = src.next().map(|p| SmartString::from(p.as_str().trim()));
Right(ImperativeStmtClause { prog, store_as })
}
_ => unreachable!(), _ => unreachable!(),
}; };
let body = inner let body = inner
@ -161,14 +175,32 @@ fn parse_imperative_stmt(
temp: SmartString::from(name), temp: SmartString::from(name),
} }
} }
Rule::query_script_inner => { Rule::imperative_clause => {
let prog = parse_query(pair.into_inner(), param_pool, fixed_rules, cur_vld)?; let mut src = pair.into_inner();
ImperativeStmt::Program { prog } let prog = parse_query(
src.next().unwrap().into_inner(),
param_pool,
fixed_rules,
cur_vld,
)?;
let store_as = src.next().map(|p| SmartString::from(p.as_str().trim()));
ImperativeStmt::Program {
prog: ImperativeStmtClause { prog, store_as },
}
} }
Rule::ignore_error_script => { Rule::ignore_error_script => {
let pair = pair.into_inner().next().unwrap(); let pair = pair.into_inner().next().unwrap();
let prog = parse_query(pair.into_inner(), param_pool, fixed_rules, cur_vld)?; let mut src = pair.into_inner();
ImperativeStmt::IgnoreErrorProgram { prog } let prog = parse_query(
src.next().unwrap().into_inner(),
param_pool,
fixed_rules,
cur_vld,
)?;
let store_as = src.next().map(|p| SmartString::from(p.as_str().trim()));
ImperativeStmt::IgnoreErrorProgram {
prog: ImperativeStmtClause { prog, store_as },
}
} }
r => unreachable!("{r:?}"), r => unreachable!("{r:?}"),
}) })

@ -47,6 +47,12 @@ pub(crate) enum CozoScript {
Sys(SysOp), Sys(SysOp),
} }
#[derive(Debug)]
pub(crate) struct ImperativeStmtClause {
pub(crate) prog: InputProgram,
pub(crate) store_as: Option<SmartString<LazyCompact>>,
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum ImperativeStmt { pub(crate) enum ImperativeStmt {
Break { Break {
@ -58,13 +64,13 @@ pub(crate) enum ImperativeStmt {
span: SourceSpan, span: SourceSpan,
}, },
Return { Return {
returns: Vec<Either<InputProgram, SmartString<LazyCompact>>>, returns: Vec<Either<ImperativeStmtClause, SmartString<LazyCompact>>>,
}, },
Program { Program {
prog: InputProgram, prog: ImperativeStmtClause,
}, },
IgnoreErrorProgram { IgnoreErrorProgram {
prog: InputProgram, prog: ImperativeStmtClause,
}, },
If { If {
condition: ImperativeCondition, condition: ImperativeCondition,
@ -86,7 +92,7 @@ pub(crate) enum ImperativeStmt {
}, },
} }
pub(crate) type ImperativeCondition = Either<SmartString<LazyCompact>, InputProgram>; pub(crate) type ImperativeCondition = Either<SmartString<LazyCompact>, ImperativeStmtClause>;
pub(crate) type ImperativeProgram = Vec<ImperativeStmt>; pub(crate) type ImperativeProgram = Vec<ImperativeStmt>;
@ -95,14 +101,14 @@ impl ImperativeStmt {
match self { match self {
ImperativeStmt::Program { prog, .. } ImperativeStmt::Program { prog, .. }
| ImperativeStmt::IgnoreErrorProgram { prog, .. } => { | ImperativeStmt::IgnoreErrorProgram { prog, .. } => {
if let Some(name) = prog.needs_write_lock() { if let Some(name) = prog.prog.needs_write_lock() {
collector.insert(name); collector.insert(name);
} }
} }
ImperativeStmt::Return { returns, .. } => { ImperativeStmt::Return { returns, .. } => {
for ret in returns { for ret in returns {
if let Left(prog) = ret { if let Left(prog) = ret {
if let Some(name) = prog.needs_write_lock() { if let Some(name) = prog.prog.needs_write_lock() {
collector.insert(name); collector.insert(name);
} }
} }
@ -115,7 +121,7 @@ impl ImperativeStmt {
.. ..
} => { } => {
if let ImperativeCondition::Right(prog) = condition { if let ImperativeCondition::Right(prog) = condition {
if let Some(name) = prog.needs_write_lock() { if let Some(name) = prog.prog.needs_write_lock() {
collector.insert(name); collector.insert(name);
} }
} }

@ -15,10 +15,13 @@ use miette::{bail, Diagnostic, Report, Result};
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
use thiserror::Error; use thiserror::Error;
use crate::data::program::RelationOp;
use crate::data::relation::{ColType, ColumnDef, NullableColType, StoredRelationMetadata};
use crate::data::symb::Symbol; use crate::data::symb::Symbol;
use crate::parse::{ImperativeCondition, ImperativeProgram, ImperativeStmt, SourceSpan}; use crate::parse::{ImperativeCondition, ImperativeProgram, ImperativeStmt, SourceSpan};
use crate::runtime::callback::CallbackCollector; use crate::runtime::callback::CallbackCollector;
use crate::runtime::db::{seconds_since_the_epoch, RunningQueryCleanup, RunningQueryHandle}; use crate::runtime::db::{seconds_since_the_epoch, RunningQueryCleanup, RunningQueryHandle};
use crate::runtime::relation::InputRelationHandle;
use crate::runtime::transact::SessionTx; use crate::runtime::transact::SessionTx;
use crate::{DataValue, Db, NamedRows, Poison, Storage, ValidityTs}; use crate::{DataValue, Db, NamedRows, Poison, Storage, ValidityTs};
@ -44,7 +47,7 @@ impl<'s, S: Storage<'s>> Db<S> {
relation.as_named_rows(tx)? relation.as_named_rows(tx)?
} }
Right(p) => self.execute_single_program( Right(p) => self.execute_single_program(
p.clone(), p.prog.clone(),
tx, tx,
cleanups, cleanups,
cur_vld, cur_vld,
@ -52,6 +55,11 @@ impl<'s, S: Storage<'s>> Db<S> {
callback_collector, callback_collector,
)?, )?,
}; };
if let Right(pg) = &p {
if let Some(store_as) = &pg.store_as {
tx.script_store_as_relation(self, store_as, &res, cur_vld)?;
}
}
Ok(!res.rows.is_empty()) Ok(!res.rows.is_empty())
} }
@ -83,7 +91,7 @@ impl<'s, S: Storage<'s>> Db<S> {
for nxt in returns.iter().rev() { for nxt in returns.iter().rev() {
let mut nr = match nxt { let mut nr = match nxt {
Left(prog) => self.execute_single_program( Left(prog) => self.execute_single_program(
prog.clone(), prog.prog.clone(),
tx, tx,
cleanups, cleanups,
cur_vld, cur_vld,
@ -95,6 +103,11 @@ impl<'s, S: Storage<'s>> Db<S> {
relation.as_named_rows(tx)? relation.as_named_rows(tx)?
} }
}; };
if let Left(pg) = nxt {
if let Some(store_as) = &pg.store_as {
tx.script_store_as_relation(self, store_as, &nr, cur_vld)?;
}
}
nr.next = current; nr.next = current;
current = Some(Box::new(nr)) current = Some(Box::new(nr))
} }
@ -107,24 +120,32 @@ impl<'s, S: Storage<'s>> Db<S> {
} }
ImperativeStmt::Program { prog, .. } => { ImperativeStmt::Program { prog, .. } => {
ret = self.execute_single_program( ret = self.execute_single_program(
prog.clone(), prog.prog.clone(),
tx, tx,
cleanups, cleanups,
cur_vld, cur_vld,
callback_targets, callback_targets,
callback_collector, callback_collector,
)?; )?;
if let Some(store_as) = &prog.store_as {
tx.script_store_as_relation(self, store_as, &ret, cur_vld)?;
}
} }
ImperativeStmt::IgnoreErrorProgram { prog, .. } => { ImperativeStmt::IgnoreErrorProgram { prog, .. } => {
match self.execute_single_program( match self.execute_single_program(
prog.clone(), prog.prog.clone(),
tx, tx,
cleanups, cleanups,
cur_vld, cur_vld,
callback_targets, callback_targets,
callback_collector, callback_collector,
) { ) {
Ok(res) => ret = res, Ok(res) => {
if let Some(store_as) = &prog.store_as {
tx.script_store_as_relation(self, store_as, &res, cur_vld)?;
}
ret = res
}
Err(_) => { Err(_) => {
ret = NamedRows::new( ret = NamedRows::new(
vec!["status".to_string()], vec!["status".to_string()],
@ -303,3 +324,53 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(ret) Ok(ret)
} }
} }
impl SessionTx<'_> {
fn script_store_as_relation<'s, S: Storage<'s>>(
&mut self,
db: &Db<S>,
name: &str,
rels: &NamedRows,
cur_vld: ValidityTs,
) -> Result<()> {
let meta = InputRelationHandle {
name: Symbol::new(name, Default::default()),
metadata: StoredRelationMetadata {
keys: rels
.headers
.iter()
.map(|s| ColumnDef {
name: s.into(),
typing: NullableColType {
coltype: ColType::Any,
nullable: true,
},
default_gen: None,
})
.collect_vec(),
non_keys: vec![],
},
key_bindings: rels
.headers
.iter()
.map(|s| Symbol::new(s.clone(), Default::default()))
.collect_vec(),
dep_bindings: vec![],
span: Default::default(),
};
let headers = meta.key_bindings.clone();
self.execute_relation(
db,
rels.rows.iter().cloned(),
RelationOp::Replace,
&meta,
&headers,
cur_vld,
&Default::default(),
&mut Default::default(),
true,
"",
)?;
Ok(())
}
}

@ -1214,8 +1214,8 @@ fn deletion() {
Default::default(), Default::default(),
) )
.is_ok()); .is_ok());
db db.run_script(r"?[x] <- [[1]] :delete a {x}", Default::default())
.run_script(r"?[x] <- [[1]] :delete a {x}", Default::default()).unwrap(); .unwrap();
} }
#[test] #[test]
@ -1240,7 +1240,10 @@ fn returning() {
Default::default(), Default::default(),
) )
.unwrap(); .unwrap();
assert_eq!(res.into_json()["rows"], json!([["inserted", 1, 3], ["inserted", 2, 4], ["replaced", 1, 2]])); assert_eq!(
res.into_json()["rows"],
json!([["inserted", 1, 3], ["inserted", 2, 4], ["replaced", 1, 2]])
);
// println!("{:?}", res.headers); // println!("{:?}", res.headers);
// for row in res.into_json()["rows"].as_array().unwrap() { // for row in res.into_json()["rows"].as_array().unwrap() {
// println!("{}", row); // println!("{}", row);
@ -1256,8 +1259,18 @@ fn returning() {
// for row in res.into_json()["rows"].as_array().unwrap() { // for row in res.into_json()["rows"].as_array().unwrap() {
// println!("{}", row); // println!("{}", row);
// } // }
assert_eq!(res.into_json()["rows"], json!([["requested", 1, null], ["requested", 4, null], ["deleted", 1, 3]])); assert_eq!(
db.run_script(r":create todo{id:Uuid default rand_uuid_v1() => label: String, done: Bool}", Default::default()) res.into_json()["rows"],
json!([
["requested", 1, null],
["requested", 4, null],
["deleted", 1, 3]
])
);
db.run_script(
r":create todo{id:Uuid default rand_uuid_v1() => label: String, done: Bool}",
Default::default(),
)
.unwrap(); .unwrap();
let res = db let res = db
.run_script( .run_script(
@ -1295,3 +1308,32 @@ fn parser_corner_case() {
) )
.unwrap(); .unwrap();
} }
#[test]
fn as_store_in_imperative_script() {
let db = DbInstance::new("mem", "", "").unwrap();
let res = db
.run_script(
r#"
{ ?[x, y, z] <- [[1, 2, 3], [4, 5, 6]] } as _store
{ ?[x, y, z] := *_store{x, y, z} }
"#,
Default::default(),
)
.unwrap();
assert_eq!(res.into_json()["rows"], json!([[1, 2, 3], [4, 5, 6]]));
let res = db.run_script(r#"
{
?[y] <- [[1], [2], [3]]
:create a {x default rand_uuid_v1() => y}
:returning
} as _last
{
?[x] := *_last{_kind: 'inserted', x}
}
"#, Default::default()).unwrap();
assert_eq!(3, res.rows.len());
for row in res.into_json()["rows"].as_array().unwrap() {
println!("{}", row);
}
}

Loading…
Cancel
Save