|
|
|
@ -31,7 +31,7 @@ use crate::parse::sys::SysOp;
|
|
|
|
|
use crate::parse::{parse_script, CozoScript, SourceSpan};
|
|
|
|
|
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
|
|
|
|
|
use crate::query::ra::{
|
|
|
|
|
FilteredRA, TempStoreRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, UnificationRA,
|
|
|
|
|
FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, TempStoreRA, UnificationRA,
|
|
|
|
|
};
|
|
|
|
|
use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId};
|
|
|
|
|
use crate::runtime::transact::SessionTx;
|
|
|
|
@ -530,7 +530,9 @@ impl<'s, S: Storage<'s>> Db<S> {
|
|
|
|
|
("fixed", json!(null), json!(null), json!(null))
|
|
|
|
|
}
|
|
|
|
|
RelAlgebra::TempStore(TempStoreRA {
|
|
|
|
|
storage_key, filters, ..
|
|
|
|
|
storage_key,
|
|
|
|
|
filters,
|
|
|
|
|
..
|
|
|
|
|
}) => (
|
|
|
|
|
"load_mem",
|
|
|
|
|
json!(storage_key.to_string()),
|
|
|
|
@ -748,12 +750,16 @@ impl<'s, S: Storage<'s>> Db<S> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
/// This is the entry to query evaluation
|
|
|
|
|
pub(crate) fn run_query(
|
|
|
|
|
&self,
|
|
|
|
|
tx: &mut SessionTx<'_>,
|
|
|
|
|
input_program: InputProgram,
|
|
|
|
|
) -> Result<(NamedRows, Vec<(Vec<u8>, Vec<u8>)>)> {
|
|
|
|
|
// cleanups contain stored relations that should be deleted at the end of query
|
|
|
|
|
let mut clean_ups = vec![];
|
|
|
|
|
|
|
|
|
|
// Some checks in case the query specifies mutation
|
|
|
|
|
if let Some((meta, op)) = &input_program.out_opts.store_relation {
|
|
|
|
|
if *op == RelationOp::Create {
|
|
|
|
|
#[derive(Debug, Error, Diagnostic)]
|
|
|
|
@ -781,18 +787,22 @@ impl<'s, S: Storage<'s>> Db<S> {
|
|
|
|
|
existing.ensure_compatible(meta)?;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
// TODO
|
|
|
|
|
|
|
|
|
|
// query compilation
|
|
|
|
|
let (stratified_program, _store_lifetimes) =
|
|
|
|
|
input_program.to_normalized_program(tx)?.stratify()?;
|
|
|
|
|
let program = stratified_program.magic_sets_rewrite(tx)?;
|
|
|
|
|
let (compiled, stores) = tx.stratified_magic_compile(&program)?;
|
|
|
|
|
|
|
|
|
|
// poison is used to terminate queries early
|
|
|
|
|
let poison = Poison::default();
|
|
|
|
|
if let Some(secs) = input_program.out_opts.timeout {
|
|
|
|
|
poison.set_timeout(secs)?;
|
|
|
|
|
}
|
|
|
|
|
// give the query an ID and store it so that it can be queried and cancelled
|
|
|
|
|
let id = self.queries_count.fetch_add(1, Ordering::AcqRel);
|
|
|
|
|
|
|
|
|
|
// time the query
|
|
|
|
|
#[cfg(not(feature = "wasm"))]
|
|
|
|
|
let now = SystemTime::now();
|
|
|
|
|
#[cfg(not(feature = "wasm"))]
|
|
|
|
@ -809,26 +819,35 @@ impl<'s, S: Storage<'s>> Db<S> {
|
|
|
|
|
poison: poison.clone(),
|
|
|
|
|
};
|
|
|
|
|
self.running_queries.lock().unwrap().insert(id, handle);
|
|
|
|
|
|
|
|
|
|
// RAII cleanups of running query handle
|
|
|
|
|
let _guard = RunningQueryCleanup {
|
|
|
|
|
id,
|
|
|
|
|
running_queries: self.running_queries.clone(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let (result, early_return) = tx.stratified_magic_evaluate(
|
|
|
|
|
&compiled,
|
|
|
|
|
&stores,
|
|
|
|
|
if input_program.out_opts.sorters.is_empty() {
|
|
|
|
|
let total_num_to_take = if input_program.out_opts.sorters.is_empty() {
|
|
|
|
|
input_program.out_opts.num_to_take()
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
},
|
|
|
|
|
if input_program.out_opts.sorters.is_empty() {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let num_to_skip = if input_program.out_opts.sorters.is_empty() {
|
|
|
|
|
input_program.out_opts.offset
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// the real evaluation
|
|
|
|
|
let (result, early_return) = tx.stratified_magic_evaluate(
|
|
|
|
|
&compiled,
|
|
|
|
|
&stores,
|
|
|
|
|
total_num_to_take,
|
|
|
|
|
num_to_skip,
|
|
|
|
|
poison,
|
|
|
|
|
)?;
|
|
|
|
|
|
|
|
|
|
// deal with assertions
|
|
|
|
|
if let Some(assertion) = &input_program.out_opts.assertion {
|
|
|
|
|
match assertion {
|
|
|
|
|
QueryAssertion::AssertNone(span) => {
|
|
|
|
@ -857,7 +876,9 @@ impl<'s, S: Storage<'s>> Db<S> {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !input_program.out_opts.sorters.is_empty() {
|
|
|
|
|
// sort outputs if required
|
|
|
|
|
let entry_head = input_program.get_entry_out_head()?;
|
|
|
|
|
let sorted_result =
|
|
|
|
|
tx.sort_and_collect(result, &input_program.out_opts.sorters, &entry_head)?;
|
|
|
|
@ -891,6 +912,7 @@ impl<'s, S: Storage<'s>> Db<S> {
|
|
|
|
|
clean_ups,
|
|
|
|
|
))
|
|
|
|
|
} else {
|
|
|
|
|
// not sorting outputs
|
|
|
|
|
let rows: Vec<Vec<JsonValue>> = sorted_iter
|
|
|
|
|
.map_ok(|tuple| -> Vec<JsonValue> {
|
|
|
|
|
tuple.into_iter().map(JsonValue::from).collect()
|
|
|
|
|