make imperative scripts stoppable

main
Ziyang Hu 2 years ago
parent 52c4c195ee
commit f97c497d31

@ -6,41 +6,42 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::btree_map::Entry;
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::iter;
use std::path::Path;
use std::sync::{Arc, Mutex};
#[allow(unused_imports)]
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
#[allow(unused_imports)]
use std::thread;
#[allow(unused_imports)]
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[allow(unused_imports)]
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
use crossbeam::channel::{bounded, Receiver, Sender, unbounded};
use crossbeam::sync::ShardedLock;
use either::{Left, Right};
use itertools::Itertools;
use miette::Report;
#[allow(unused_imports)]
use miette::{bail, ensure, miette, Diagnostic, IntoDiagnostic, Result, WrapErr};
use miette::{bail, Diagnostic, ensure, IntoDiagnostic, miette, Result, WrapErr};
use miette::Report;
use serde_json::json;
use smartstring::{LazyCompact, SmartString};
use thiserror::Error;
use crate::{decode_tuple_from_kv, FixedRule};
use crate::data::functions::current_validity;
use crate::data::json::JsonValue;
use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::relation::ColumnDef;
use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, ValidityTs, LARGEST_UTF_CHAR};
use crate::data::value::{DataValue, LARGEST_UTF_CHAR, ValidityTs};
use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::parse::{CozoScript, parse_script, SourceSpan};
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::query::ra::{
FilteredRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, StoredWithValidityRA,
@ -51,21 +52,20 @@ use crate::runtime::callback::{
CallbackCollector, CallbackDeclaration, CallbackOp, EventCallbackRegistry,
};
use crate::runtime::relation::{
extend_tuple_from_v, AccessLevel, InsufficientAccessLevel, RelationHandle, RelationId,
AccessLevel, extend_tuple_from_v, InsufficientAccessLevel, RelationHandle, RelationId,
};
use crate::runtime::transact::SessionTx;
use crate::storage::temp::TempStorage;
use crate::storage::{Storage, StoreTx};
use crate::{decode_tuple_from_kv, FixedRule};
use crate::storage::temp::TempStorage;
struct RunningQueryHandle {
started_at: f64,
poison: Poison,
pub(crate) struct RunningQueryHandle {
pub(crate) started_at: f64,
pub(crate) poison: Poison,
}
struct RunningQueryCleanup {
id: u64,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) struct RunningQueryCleanup {
pub(crate) id: u64,
pub(crate) running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
}
impl Drop for RunningQueryCleanup {
@ -88,8 +88,8 @@ pub struct Db<S> {
pub(crate) db: S,
temp_db: TempStorage,
relation_store_id: Arc<AtomicU64>,
queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) queries_count: Arc<AtomicU64>,
pub(crate) running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) fixed_rules: Arc<ShardedLock<BTreeMap<String, Arc<Box<dyn FixedRule>>>>>,
#[cfg(not(target_arch = "wasm32"))]
callback_count: Arc<AtomicU32>,
@ -1304,16 +1304,7 @@ impl<'s, S: Storage<'s>> Db<S> {
let id = self.queries_count.fetch_add(1, Ordering::AcqRel);
// time the query
#[cfg(not(target_arch = "wasm32"))]
let now = SystemTime::now();
#[cfg(not(target_arch = "wasm32"))]
let since_the_epoch = now
.duration_since(UNIX_EPOCH)
.into_diagnostic()?
.as_secs_f64();
#[cfg(target_arch = "wasm32")]
let since_the_epoch = js_sys::Date::now();
let since_the_epoch = seconds_since_the_epoch()?;
let handle = RunningQueryHandle {
started_at: since_the_epoch,
@ -1625,3 +1616,16 @@ impl Poison {
Ok(())
}
}
pub(crate) fn seconds_since_the_epoch() -> Result<f64> {
#[cfg(not(target_arch = "wasm32"))]
let now = SystemTime::now();
#[cfg(not(target_arch = "wasm32"))]
return Ok(now
.duration_since(UNIX_EPOCH)
.into_diagnostic()?
.as_secs_f64());
#[cfg(target_arch = "wasm32")]
Ok(js_sys::Date::now())
}

@ -7,6 +7,7 @@
*/
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::Ordering;
use either::{Either, Left, Right};
use itertools::Itertools;
@ -20,7 +21,8 @@ use crate::data::symb::Symbol;
use crate::parse::{ImperativeCondition, ImperativeProgram, ImperativeStmt, SourceSpan};
use crate::runtime::callback::CallbackCollector;
use crate::runtime::transact::SessionTx;
use crate::{DataValue, Db, NamedRows, Storage, ValidityTs};
use crate::{DataValue, Db, NamedRows, Poison, Storage, ValidityTs};
use crate::runtime::db::{RunningQueryCleanup, RunningQueryHandle, seconds_since_the_epoch};
enum ControlCode {
Termination(NamedRows),
@ -75,9 +77,11 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld: ValidityTs,
callback_targets: &BTreeSet<SmartString<LazyCompact>>,
callback_collector: &mut CallbackCollector,
poison: &Poison
) -> Result<Either<NamedRows, ControlCode>> {
let mut ret = NamedRows::default();
for p in ps {
poison.check()?;
match p {
ImperativeStmt::Break { target, span, .. } => {
return Ok(Right(ControlCode::Break(target.clone(), *span)));
@ -168,6 +172,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld,
callback_targets,
callback_collector,
poison
)? {
Left(rows) => {
ret = rows;
@ -178,6 +183,8 @@ impl<'s, S: Storage<'s>> Db<S> {
ImperativeStmt::Loop { label, body, .. } => {
ret = Default::default();
loop {
poison.check()?;
match self.execute_imperative_stmts(
body,
tx,
@ -185,6 +192,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld,
callback_targets,
callback_collector,
poison
)? {
Left(_) => {}
Right(ctrl) => match ctrl {
@ -256,6 +264,21 @@ impl<'s, S: Storage<'s>> Db<S> {
} else {
self.transact()?
};
let poison = Poison::default();
let qid = self.queries_count.fetch_add(1, Ordering::AcqRel);
let since_the_epoch = seconds_since_the_epoch()?;
let q_handle = RunningQueryHandle {
started_at: since_the_epoch,
poison: poison.clone(),
};
self.running_queries.lock().unwrap().insert(qid, q_handle);
let _guard = RunningQueryCleanup {
id: qid,
running_queries: self.running_queries.clone(),
};
match self.execute_imperative_stmts(
&ps,
&mut tx,
@ -263,6 +286,7 @@ impl<'s, S: Storage<'s>> Db<S> {
cur_vld,
&callback_targets,
&mut callback_collector,
&poison
)? {
Left(res) => ret = res,
Right(ctrl) => match ctrl {

Loading…
Cancel
Save