yielding relations for the next block

main
Ziyang Hu 2 years ago
parent 46d89a24f2
commit 36b5b804e5

@ -10,6 +10,7 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
#[allow(unused_imports)]
@ -25,13 +26,18 @@ use serde_json::json;
use smartstring::SmartString;
use thiserror::Error;
use crate::data::expr::Expr;
use crate::data::functions::current_validity;
use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, MagicSymbol, QueryAssertion, RelationOp};
use crate::data::program::{
FixedRuleApply, InputInlineRulesOrFixed, InputProgram, QueryAssertion, RelationOp,
};
use crate::data::relation::ColumnDef;
use crate::data::symb::Symbol;
use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::fixed_rule::utilities::Constant;
use crate::fixed_rule::{FixedRuleHandle, DEFAULT_FIXED_RULES};
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
@ -510,10 +516,21 @@ impl<'s, S: Storage<'s>> Db<S> {
self.transact()?
};
for p in ps {
let mut propagate_results = BTreeMap::new();
let prog_n = ps.len();
for (i, mut p) in ps.into_iter().enumerate() {
#[allow(unused_variables)]
let sleep_opt = p.out_opts.sleep;
let prop = p.out_opts.yield_const.clone();
propagate_previous_results(&mut p, &propagate_results)?;
let (q_res, q_cleanups) = self.run_query(&mut tx, p, cur_vld)?;
if let Some(to_yield) = prop {
if i != prog_n - 1 {
propagate_results.insert(to_yield, q_res.clone());
}
}
res = q_res;
cleanups.extend(q_cleanups);
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
@ -1194,6 +1211,57 @@ impl Poison {
}
}
fn propagate_previous_results(
p: &mut InputProgram,
prev_results: &BTreeMap<Symbol, NamedRows>,
) -> Result<()> {
// OPTIMIZATION: insert only if needed
for (k, v) in prev_results {
let replaced = p.prog.insert(
k.clone(),
InputInlineRulesOrFixed::Fixed {
fixed: FixedRuleApply {
fixed_handle: FixedRuleHandle {
name: Symbol::new("Constant", Default::default()),
},
rule_args: vec![],
options: Rc::new(BTreeMap::from([(
SmartString::from("data"),
Expr::Const {
val: DataValue::List(
v.rows
.iter()
.map(|row| {
DataValue::List(
row.iter().map(DataValue::from).collect_vec(),
)
})
.collect_vec(),
),
span: Default::default(),
},
)])),
head: vec![],
arity: v.headers.len(),
span: Default::default(),
fixed_impl: Arc::new(Box::new(Constant)),
},
},
);
if let Some(replaced_rel) = replaced {
#[derive(Debug, Diagnostic, Error)]
#[error("Name conflict with previous yield: '{0}'")]
#[diagnostic(code(db::name_confilict_with_yield))]
pub(crate) struct ConflictWithPrevYield(String, #[label] SourceSpan);
bail!(ConflictWithPrevYield(
k.to_string(),
replaced_rel.first_span()
))
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
@ -1479,6 +1547,23 @@ grandparent[gcld, gp] := parent[gcld, p], parent[p, gp]
"#,
Default::default(),
)
.unwrap();
.unwrap()
.rows;
assert_eq!(json!(res), json!([[1, 2, 3]]));
let res = db.run_script(
r#"
{
?[] <- [[1,2,3]]
:yield nxt
}
{
nxt[] <- [[2, 3, 5]]
?[a,b,c] := nxt[a, b, c]
}
"#,
Default::default(),
);
assert!(res.is_err());
}
}

Loading…
Cancel
Save