From 86289cd628307e7d601d62606cd8abe83dc55b21 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Wed, 7 Dec 2022 19:58:57 +0800 Subject: [PATCH] documenting --- cozo-core/src/query/eval.rs | 12 +++++----- cozo-core/src/runtime/db.rs | 48 +++++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/cozo-core/src/query/eval.rs b/cozo-core/src/query/eval.rs index 49064eae..17bdc256 100644 --- a/cozo-core/src/query/eval.rs +++ b/cozo-core/src/query/eval.rs @@ -57,12 +57,6 @@ impl<'a> SessionTx<'a> { num_to_skip: Option, poison: Poison, ) -> Result<(InMemRelation, bool)> { - let ret_area = stores - .get(&MagicSymbol::Muggle { - inner: Symbol::new(PROG_ENTRY, SourceSpan(0, 0)), - }) - .ok_or(NoEntryError)? - .clone(); let mut early_return = false; for (idx, cur_prog) in strata.iter().enumerate() { debug!("stratum {}", idx); @@ -74,6 +68,12 @@ impl<'a> SessionTx<'a> { poison.clone(), )?; } + let ret_area = stores + .get(&MagicSymbol::Muggle { + inner: Symbol::new(PROG_ENTRY, SourceSpan(0, 0)), + }) + .ok_or(NoEntryError)? + .clone(); Ok((ret_area, early_return)) } /// returns true if early return is activated diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index df253298..2029556a 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -31,7 +31,7 @@ use crate::parse::sys::SysOp; use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; use crate::query::ra::{ - FilteredRA, TempStoreRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, UnificationRA, + FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, TempStoreRA, UnificationRA, }; use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId}; use crate::runtime::transact::SessionTx; @@ -530,7 +530,9 @@ impl<'s, S: Storage<'s>> Db { ("fixed", json!(null), json!(null), json!(null)) } RelAlgebra::TempStore(TempStoreRA { - storage_key, filters, .. + storage_key, + filters, + .. }) => ( "load_mem", json!(storage_key.to_string()), @@ -748,12 +750,16 @@ impl<'s, S: Storage<'s>> Db { } } } + /// This is the entry to query evaluation pub(crate) fn run_query( &self, tx: &mut SessionTx<'_>, input_program: InputProgram, ) -> Result<(NamedRows, Vec<(Vec, Vec)>)> { + // cleanups contain stored relations that should be deleted at the end of query let mut clean_ups = vec![]; + + // Some checks in case the query specifies mutation if let Some((meta, op)) = &input_program.out_opts.store_relation { if *op == RelationOp::Create { #[derive(Debug, Error, Diagnostic)] @@ -781,18 +787,22 @@ impl<'s, S: Storage<'s>> Db { existing.ensure_compatible(meta)?; } }; - // TODO + + // query compilation let (stratified_program, _store_lifetimes) = input_program.to_normalized_program(tx)?.stratify()?; let program = stratified_program.magic_sets_rewrite(tx)?; let (compiled, stores) = tx.stratified_magic_compile(&program)?; + // poison is used to terminate queries early let poison = Poison::default(); if let Some(secs) = input_program.out_opts.timeout { poison.set_timeout(secs)?; } + // give the query an ID and store it so that it can be queried and cancelled let id = self.queries_count.fetch_add(1, Ordering::AcqRel); + // time the query #[cfg(not(feature = "wasm"))] let now = SystemTime::now(); #[cfg(not(feature = "wasm"))] @@ -809,26 +819,35 @@ impl<'s, S: Storage<'s>> Db { poison: poison.clone(), }; self.running_queries.lock().unwrap().insert(id, handle); + + // RAII cleanups of running query handle let _guard = RunningQueryCleanup { id, running_queries: self.running_queries.clone(), }; + let total_num_to_take = if input_program.out_opts.sorters.is_empty() { + input_program.out_opts.num_to_take() + } else { + None + }; + + let num_to_skip = if input_program.out_opts.sorters.is_empty() { + input_program.out_opts.offset + } else { + None + }; + + // the real evaluation let (result, early_return) = tx.stratified_magic_evaluate( &compiled, &stores, - if input_program.out_opts.sorters.is_empty() { - input_program.out_opts.num_to_take() - } else { - None - }, - if input_program.out_opts.sorters.is_empty() { - input_program.out_opts.offset - } else { - None - }, + total_num_to_take, + num_to_skip, poison, )?; + + // deal with assertions if let Some(assertion) = &input_program.out_opts.assertion { match assertion { QueryAssertion::AssertNone(span) => { @@ -857,7 +876,9 @@ impl<'s, S: Storage<'s>> Db { } } } + if !input_program.out_opts.sorters.is_empty() { + // sort outputs if required let entry_head = input_program.get_entry_out_head()?; let sorted_result = tx.sort_and_collect(result, &input_program.out_opts.sorters, &entry_head)?; @@ -891,6 +912,7 @@ impl<'s, S: Storage<'s>> Db { clean_ups, )) } else { + // not sorting outputs let rows: Vec> = sorted_iter .map_ok(|tuple| -> Vec { tuple.into_iter().map(JsonValue::from).collect()