From f97c497d3188ba0d305b8fee0fed798217c3b025 Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Wed, 25 Jan 2023 17:21:37 +0800 Subject: [PATCH] make imperative scripts stoppable --- cozo-core/src/runtime/db.rs | 60 +++++++++++++++-------------- cozo-core/src/runtime/imperative.rs | 26 ++++++++++++- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index 4838e3a8..9b9946eb 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -6,41 +6,42 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ -use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; +use std::collections::btree_map::Entry; use std::default::Default; use std::fmt::{Debug, Formatter}; use std::iter; use std::path::Path; +use std::sync::{Arc, Mutex}; #[allow(unused_imports)] use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; #[allow(unused_imports)] use std::thread; #[allow(unused_imports)] use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[allow(unused_imports)] -use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; +use crossbeam::channel::{bounded, Receiver, Sender, unbounded}; use crossbeam::sync::ShardedLock; use either::{Left, Right}; use itertools::Itertools; -use miette::Report; #[allow(unused_imports)] -use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr}; +use miette::{bail, Diagnostic, ensure, IntoDiagnostic, miette, Result, WrapErr}; +use miette::Report; use serde_json::json; use smartstring::{LazyCompact, SmartString}; use thiserror::Error; +use crate::{decode_tuple_from_kv, FixedRule}; use crate::data::functions::current_validity; use crate::data::json::JsonValue; use crate::data::program::{InputProgram, QueryAssertion, RelationOp}; use crate::data::relation::ColumnDef; use crate::data::tuple::{Tuple, TupleT}; -use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR}; +use crate::data::value::{DataValue, LARGEST_UTF_CHAR, ValidityTs}; use crate::fixed_rule::DEFAULT_FIXED_RULES; +use crate::parse::{CozoScript, parse_script, SourceSpan}; use crate::parse::sys::SysOp; -use crate::parse::{parse_script, CozoScript, SourceSpan}; use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet}; use crate::query::ra::{ FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA, @@ -51,21 +52,20 @@ use crate::runtime::callback::{ CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry, }; use crate::runtime::relation::{ - extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId, + AccessLevel, extend_tuple_from_v, InsufficientAccessLevel, RelationHandle, RelationId, }; use crate::runtime::transact::SessionTx; -use crate::storage::temp::TempStorage; use crate::storage::{Storage, StoreTx}; -use crate::{decode_tuple_from_kv, FixedRule}; +use crate::storage::temp::TempStorage; -struct RunningQueryHandle { - started_at: f64, - poison: Poison, +pub(crate) struct RunningQueryHandle { + pub(crate) started_at: f64, + pub(crate) poison: Poison, } -struct RunningQueryCleanup { - id: u64, - running_queries: Arc>>, +pub(crate) struct RunningQueryCleanup { + pub(crate) id: u64, + pub(crate) running_queries: Arc>>, } impl Drop for RunningQueryCleanup { @@ -88,8 +88,8 @@ pub struct Db { pub(crate) db: S, temp_db: TempStorage, relation_store_id: Arc, - queries_count: Arc, - running_queries: Arc>>, + pub(crate) queries_count: Arc, + pub(crate) running_queries: Arc>>, pub(crate) fixed_rules: Arc>>>>, #[cfg(not(target_arch = "wasm32"))] callback_count: Arc, @@ -1304,16 +1304,7 @@ impl<'s, S: Storage<'s>> Db { let id = self.queries_count.fetch_add(1, Ordering::AcqRel); // time the query - #[cfg(not(target_arch = "wasm32"))] - let now = SystemTime::now(); - #[cfg(not(target_arch = "wasm32"))] - let since_the_epoch = now - .duration_since(UNIX_EPOCH) - .into_diagnostic()? - .as_secs_f64(); - - #[cfg(target_arch = "wasm32")] - let since_the_epoch = js_sys::Date::now(); + let since_the_epoch = seconds_since_the_epoch()?; let handle = RunningQueryHandle { started_at: since_the_epoch, @@ -1625,3 +1616,16 @@ impl Poison { Ok(()) } } + +pub(crate) fn seconds_since_the_epoch() -> Result { + #[cfg(not(target_arch = "wasm32"))] + let now = SystemTime::now(); + #[cfg(not(target_arch = "wasm32"))] + return Ok(now + .duration_since(UNIX_EPOCH) + .into_diagnostic()? + .as_secs_f64()); + + #[cfg(target_arch = "wasm32")] + Ok(js_sys::Date::now()) +} \ No newline at end of file diff --git a/cozo-core/src/runtime/imperative.rs b/cozo-core/src/runtime/imperative.rs index 38f0cdc0..aa534504 100644 --- a/cozo-core/src/runtime/imperative.rs +++ b/cozo-core/src/runtime/imperative.rs @@ -7,6 +7,7 @@ */ use std::collections::{BTreeMap, BTreeSet}; +use std::sync::atomic::Ordering; use either::{Either, Left, Right}; use itertools::Itertools; @@ -20,7 +21,8 @@ use crate::data::symb::Symbol; use crate::parse::{ImperativeCondition, ImperativeProgram, ImperativeStmt, SourceSpan}; use crate::runtime::callback::CallbackCollector; use crate::runtime::transact::SessionTx; -use crate::{DataValue, Db, NamedRows, Storage, ValidityTs}; +use crate::{DataValue, Db, NamedRows, Poison, Storage, ValidityTs}; +use crate::runtime::db::{RunningQueryCleanup, RunningQueryHandle, seconds_since_the_epoch}; enum ControlCode { Termination(NamedRows), @@ -75,9 +77,11 @@ impl<'s, S: Storage<'s>> Db { cur_vld: ValidityTs, callback_targets: &BTreeSet>, callback_collector: &mut CallbackCollector, + poison: &Poison ) -> Result> { let mut ret = NamedRows::default(); for p in ps { + poison.check()?; match p { ImperativeStmt::Break { target, span, .. } => { return Ok(Right(ControlCode::Break(target.clone(), *span))); @@ -168,6 +172,7 @@ impl<'s, S: Storage<'s>> Db { cur_vld, callback_targets, callback_collector, + poison )? { Left(rows) => { ret = rows; @@ -178,6 +183,8 @@ impl<'s, S: Storage<'s>> Db { ImperativeStmt::Loop { label, body, .. } => { ret = Default::default(); loop { + poison.check()?; + match self.execute_imperative_stmts( body, tx, @@ -185,6 +192,7 @@ impl<'s, S: Storage<'s>> Db { cur_vld, callback_targets, callback_collector, + poison )? { Left(_) => {} Right(ctrl) => match ctrl { @@ -256,6 +264,21 @@ impl<'s, S: Storage<'s>> Db { } else { self.transact()? }; + + let poison = Poison::default(); + let qid = self.queries_count.fetch_add(1, Ordering::AcqRel); + let since_the_epoch = seconds_since_the_epoch()?; + + let q_handle = RunningQueryHandle { + started_at: since_the_epoch, + poison: poison.clone(), + }; + self.running_queries.lock().unwrap().insert(qid, q_handle); + let _guard = RunningQueryCleanup { + id: qid, + running_queries: self.running_queries.clone(), + }; + match self.execute_imperative_stmts( &ps, &mut tx, @@ -263,6 +286,7 @@ impl<'s, S: Storage<'s>> Db { cur_vld, &callback_targets, &mut callback_collector, + &poison )? { Left(res) => ret = res, Right(ctrl) => match ctrl {