disable callbacks for WASM

main
Ziyang Hu 2 years ago
parent a6a9419a35
commit e7fd785584

@ -64,6 +64,7 @@ pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
pub use storage::{Storage, StoreTx};
use crate::data::json::JsonValue;
#[cfg(not(target_arch = "wasm32"))]
use crate::runtime::db::CallbackOp;
pub(crate) mod data;
@ -367,6 +368,7 @@ impl DbInstance {
self.import_from_backup(&json_payload.path, &json_payload.relations)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn register_callback<CB>(&self, callback: CB, dependent: &str) -> Result<u32>
where
CB: Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync + 'static {

@ -43,6 +43,7 @@ impl QueryLimiter {
false
}
}
#[allow(dead_code)]
pub(crate) fn is_stopped(&self) -> bool {
if let Some(limit) = self.total {
self.counter.load(Ordering::Acquire) >= limit

@ -12,6 +12,7 @@ use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::iter;
use std::path::Path;
#[allow(unused_imports)]
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
#[allow(unused_imports)]
@ -19,6 +20,7 @@ use std::thread;
#[allow(unused_imports)]
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(not(target_arch = "wasm32"))]
use crossbeam::channel::{unbounded, Sender};
use crossbeam::sync::ShardedLock;
use either::{Either, Left, Right};
@ -84,6 +86,7 @@ pub enum CallbackOp {
Rm,
}
#[cfg(not(target_arch = "wasm32"))]
pub struct CallbackDeclaration {
dependent: SmartString<LazyCompact>,
callback: Box<dyn Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync>,
@ -101,12 +104,16 @@ pub struct Db<S> {
queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) algorithms: Arc<BTreeMap<String, Arc<Box<dyn FixedRule>>>>,
#[cfg(not(target_arch = "wasm32"))]
callback_count: Arc<AtomicU32>,
#[cfg(not(target_arch = "wasm32"))]
callback_sender: Sender<(SmartString<LazyCompact>, CallbackOp, NamedRows, NamedRows)>,
#[cfg(not(target_arch = "wasm32"))]
event_callbacks: Arc<ShardedLock<EventCallbackRegistry>>,
relation_locks: Arc<ShardedLock<BTreeMap<SmartString<LazyCompact>, Arc<ShardedLock<()>>>>>,
}
#[cfg(not(target_arch = "wasm32"))]
type EventCallbackRegistry = (
BTreeMap<u32, CallbackDeclaration>,
BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
@ -161,6 +168,7 @@ impl<'s, S: Storage<'s>> Db<S> {
/// You must call [`initialize`](Self::initialize) immediately after creation.
/// Due to lifetime restrictions we are not able to call that for you automatically.
pub fn new(storage: S) -> Result<Self> {
#[cfg(not(target_arch = "wasm32"))]
let (sender, receiver) = unbounded();
let ret = Self {
db: storage,
@ -169,25 +177,31 @@ impl<'s, S: Storage<'s>> Db<S> {
queries_count: Arc::new(Default::default()),
running_queries: Arc::new(Mutex::new(Default::default())),
algorithms: DEFAULT_FIXED_RULES.clone(),
#[cfg(not(target_arch = "wasm32"))]
callback_count: Arc::new(Default::default()),
#[cfg(not(target_arch = "wasm32"))]
callback_sender: sender,
// callback_receiver: Arc::new(receiver),
#[cfg(not(target_arch = "wasm32"))]
event_callbacks: Arc::new(Default::default()),
relation_locks: Arc::new(Default::default()),
};
let callbacks = ret.event_callbacks.clone();
thread::spawn(move || {
while let Ok((table, op, new, old)) = receiver.recv() {
let (cbs, cb_dir) = &*callbacks.read().unwrap();
if let Some(cb_ids) = cb_dir.get(&table) {
for cb_id in cb_ids {
if let Some(cb) = cbs.get(cb_id) {
(cb.callback)(op, new.clone(), old.clone())
#[cfg(not(target_arch = "wasm32"))]
{
let callbacks = ret.event_callbacks.clone();
thread::spawn(move || {
while let Ok((table, op, new, old)) = receiver.recv() {
let (cbs, cb_dir) = &*callbacks.read().unwrap();
if let Some(cb_ids) = cb_dir.get(&table) {
for cb_id in cb_ids {
if let Some(cb) = cbs.get(cb_id) {
(cb.callback)(op, new.clone(), old.clone())
}
}
}
}
}
});
});
}
Ok(ret)
}
@ -502,8 +516,8 @@ impl<'s, S: Storage<'s>> Db<S> {
/// Register callbacks to run when changes to relations are committed.
/// The returned ID can be used to unregister the callbacks.
#[allow(dead_code)]
pub(crate) fn register_callback<CB>(&self, callback: CB, dependent: &str) -> Result<u32>
#[cfg(not(target_arch = "wasm32"))]
pub fn register_callback<CB>(&self, callback: CB, dependent: &str) -> Result<u32>
where
CB: Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync + 'static,
{
@ -525,8 +539,8 @@ impl<'s, S: Storage<'s>> Db<S> {
}
/// Unregister callbacks to run when changes to relations are committed.
#[allow(dead_code)]
pub(crate) fn unregister_callback(&self, id: u32) -> bool {
#[cfg(not(target_arch = "wasm32"))]
pub fn unregister_callback(&self, id: u32) -> bool {
let mut guard = self.event_callbacks.write().unwrap();
let ret = guard.0.remove(&id);
if let Some(cb) = &ret {
@ -655,15 +669,25 @@ impl<'s, S: Storage<'s>> Db<S> {
}
Ok(q_res)
}
fn current_callback_targets(&self) -> BTreeSet<SmartString<LazyCompact>> {
self.event_callbacks
.read()
.unwrap()
.1
.keys()
.cloned()
.collect()
#[cfg(not(target_arch = "wasm32"))]
{
self.event_callbacks
.read()
.unwrap()
.1
.keys()
.cloned()
.collect()
}
#[cfg(target_arch = "wasm32")]
{
Default::default()
}
}
#[cfg(not(target_arch = "wasm32"))]
fn send_callbacks(&'s self, collector: CallbackCollector) {
for (k, vals) in collector {
for (op, new, old) in vals {
@ -720,6 +744,7 @@ impl<'s, S: Storage<'s>> Db<S> {
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
#[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}
@ -783,6 +808,7 @@ impl<'s, S: Storage<'s>> Db<S> {
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
#[cfg(not(target_arch = "wasm32"))]
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}

Loading…
Cancel
Save