diff --git a/cozo-core/src/lib.rs b/cozo-core/src/lib.rs index 0b4e065a..7a238ba5 100644 --- a/cozo-core/src/lib.rs +++ b/cozo-core/src/lib.rs @@ -64,6 +64,7 @@ pub use storage::tikv::{new_cozo_tikv, TiKvStorage}; pub use storage::{Storage, StoreTx}; use crate::data::json::JsonValue; +#[cfg(not(target_arch = "wasm32"))] use crate::runtime::db::CallbackOp; pub(crate) mod data; @@ -367,6 +368,7 @@ impl DbInstance { self.import_from_backup(&json_payload.path, &json_payload.relations) } + #[cfg(not(target_arch = "wasm32"))] pub fn register_callback(&self, callback: CB, dependent: &str) -> Result where CB: Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync + 'static { diff --git a/cozo-core/src/query/eval.rs b/cozo-core/src/query/eval.rs index 515f5423..746f6546 100644 --- a/cozo-core/src/query/eval.rs +++ b/cozo-core/src/query/eval.rs @@ -43,6 +43,7 @@ impl QueryLimiter { false } } + #[allow(dead_code)] pub(crate) fn is_stopped(&self) -> bool { if let Some(limit) = self.total { self.counter.load(Ordering::Acquire) >= limit diff --git a/cozo-core/src/runtime/db.rs b/cozo-core/src/runtime/db.rs index ea1ab94b..7f34c2d0 100644 --- a/cozo-core/src/runtime/db.rs +++ b/cozo-core/src/runtime/db.rs @@ -12,6 +12,7 @@ use std::default::Default; use std::fmt::{Debug, Formatter}; use std::iter; use std::path::Path; +#[allow(unused_imports)] use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; #[allow(unused_imports)] @@ -19,6 +20,7 @@ use std::thread; #[allow(unused_imports)] use std::time::{Duration, SystemTime, UNIX_EPOCH}; +#[cfg(not(target_arch = "wasm32"))] use crossbeam::channel::{unbounded, Sender}; use crossbeam::sync::ShardedLock; use either::{Either, Left, Right}; @@ -84,6 +86,7 @@ pub enum CallbackOp { Rm, } +#[cfg(not(target_arch = "wasm32"))] pub struct CallbackDeclaration { dependent: SmartString, callback: Box, @@ -101,12 +104,16 @@ pub struct Db { queries_count: Arc, running_queries: Arc>>, pub(crate) algorithms: Arc>>>, + #[cfg(not(target_arch = "wasm32"))] callback_count: Arc, + #[cfg(not(target_arch = "wasm32"))] callback_sender: Sender<(SmartString, CallbackOp, NamedRows, NamedRows)>, + #[cfg(not(target_arch = "wasm32"))] event_callbacks: Arc>, relation_locks: Arc, Arc>>>>, } +#[cfg(not(target_arch = "wasm32"))] type EventCallbackRegistry = ( BTreeMap, BTreeMap, BTreeSet>, @@ -161,6 +168,7 @@ impl<'s, S: Storage<'s>> Db { /// You must call [`initialize`](Self::initialize) immediately after creation. /// Due to lifetime restrictions we are not able to call that for you automatically. pub fn new(storage: S) -> Result { + #[cfg(not(target_arch = "wasm32"))] let (sender, receiver) = unbounded(); let ret = Self { db: storage, @@ -169,25 +177,31 @@ impl<'s, S: Storage<'s>> Db { queries_count: Arc::new(Default::default()), running_queries: Arc::new(Mutex::new(Default::default())), algorithms: DEFAULT_FIXED_RULES.clone(), + #[cfg(not(target_arch = "wasm32"))] callback_count: Arc::new(Default::default()), + #[cfg(not(target_arch = "wasm32"))] callback_sender: sender, // callback_receiver: Arc::new(receiver), + #[cfg(not(target_arch = "wasm32"))] event_callbacks: Arc::new(Default::default()), relation_locks: Arc::new(Default::default()), }; - let callbacks = ret.event_callbacks.clone(); - thread::spawn(move || { - while let Ok((table, op, new, old)) = receiver.recv() { - let (cbs, cb_dir) = &*callbacks.read().unwrap(); - if let Some(cb_ids) = cb_dir.get(&table) { - for cb_id in cb_ids { - if let Some(cb) = cbs.get(cb_id) { - (cb.callback)(op, new.clone(), old.clone()) + #[cfg(not(target_arch = "wasm32"))] + { + let callbacks = ret.event_callbacks.clone(); + thread::spawn(move || { + while let Ok((table, op, new, old)) = receiver.recv() { + let (cbs, cb_dir) = &*callbacks.read().unwrap(); + if let Some(cb_ids) = cb_dir.get(&table) { + for cb_id in cb_ids { + if let Some(cb) = cbs.get(cb_id) { + (cb.callback)(op, new.clone(), old.clone()) + } } } } - } - }); + }); + } Ok(ret) } @@ -502,8 +516,8 @@ impl<'s, S: Storage<'s>> Db { /// Register callbacks to run when changes to relations are committed. /// The returned ID can be used to unregister the callbacks. - #[allow(dead_code)] - pub(crate) fn register_callback(&self, callback: CB, dependent: &str) -> Result + #[cfg(not(target_arch = "wasm32"))] + pub fn register_callback(&self, callback: CB, dependent: &str) -> Result where CB: Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync + 'static, { @@ -525,8 +539,8 @@ impl<'s, S: Storage<'s>> Db { } /// Unregister callbacks to run when changes to relations are committed. - #[allow(dead_code)] - pub(crate) fn unregister_callback(&self, id: u32) -> bool { + #[cfg(not(target_arch = "wasm32"))] + pub fn unregister_callback(&self, id: u32) -> bool { let mut guard = self.event_callbacks.write().unwrap(); let ret = guard.0.remove(&id); if let Some(cb) = &ret { @@ -655,15 +669,25 @@ impl<'s, S: Storage<'s>> Db { } Ok(q_res) } + fn current_callback_targets(&self) -> BTreeSet> { - self.event_callbacks - .read() - .unwrap() - .1 - .keys() - .cloned() - .collect() + #[cfg(not(target_arch = "wasm32"))] + { + self.event_callbacks + .read() + .unwrap() + .1 + .keys() + .cloned() + .collect() + } + + #[cfg(target_arch = "wasm32")] + { + Default::default() + } } + #[cfg(not(target_arch = "wasm32"))] fn send_callbacks(&'s self, collector: CallbackCollector) { for (k, vals) in collector { for (op, new, old) in vals { @@ -720,6 +744,7 @@ impl<'s, S: Storage<'s>> Db { assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx"); } } + #[cfg(not(target_arch = "wasm32"))] if !callback_collector.is_empty() { self.send_callbacks(callback_collector) } @@ -783,6 +808,7 @@ impl<'s, S: Storage<'s>> Db { assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx"); } } + #[cfg(not(target_arch = "wasm32"))] if !callback_collector.is_empty() { self.send_callbacks(callback_collector) }