tx callbacks

main
Ziyang Hu 2 years ago
parent 82dd566717
commit 5c7bff171c

@ -6,11 +6,12 @@
* You can obtain one at https://mozilla.org/MPL/2.0/.
*/
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
#[allow(unused_imports)]
use std::thread;
#[allow(unused_imports)]
@ -29,6 +30,7 @@ use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::relation::ColumnDef;
use crate::data::tuple::{Tuple, TupleT};
use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
use crate::fixed_rule::DEFAULT_FIXED_RULES;
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
@ -39,7 +41,6 @@ use crate::runtime::relation::{AccessLevel, InsufficientAccessLevel, RelationHan
use crate::runtime::transact::SessionTx;
use crate::storage::{Storage, StoreTx};
use crate::{decode_tuple_from_kv, FixedRule};
use crate::fixed_rule::DEFAULT_FIXED_RULES;
struct RunningQueryHandle {
started_at: f64,
@ -65,6 +66,15 @@ pub struct DbManifest {
pub storage_version: u64,
}
#[allow(dead_code)]
#[derive(Copy, Clone, Debug)]
pub enum CallbackOp {
Put,
Rm,
}
pub type TxCallback = Box<dyn FnMut(CallbackOp, Tuple) + Send + Sync>;
/// The database object of Cozo.
#[derive(Clone)]
pub struct Db<S> {
@ -73,6 +83,8 @@ pub struct Db<S> {
queries_count: Arc<AtomicU64>,
running_queries: Arc<Mutex<BTreeMap<u64, RunningQueryHandle>>>,
pub(crate) algorithms: Arc<BTreeMap<String, Arc<Box<dyn FixedRule>>>>,
callback_count: Arc<AtomicU64>,
event_callbacks: Arc<RwLock<BTreeMap<String, BTreeMap<u64, TxCallback>>>>,
}
impl<S> Debug for Db<S> {
@ -119,6 +131,8 @@ impl<'s, S: Storage<'s>> Db<S> {
queries_count: Arc::new(Default::default()),
running_queries: Arc::new(Mutex::new(Default::default())),
algorithms: DEFAULT_FIXED_RULES.clone(),
callback_count: Arc::new(Default::default()),
event_callbacks: Arc::new(Default::default()),
};
Ok(ret)
}
@ -395,6 +409,49 @@ impl<'s, S: Storage<'s>> Db<S> {
dst_tx.commit_tx()
}
}
/// Register a custom fixed rule implementation
pub fn register_fixed_rule(
&mut self,
name: String,
rule_impl: Box<dyn FixedRule>,
) -> Result<()> {
let inner = Arc::make_mut(&mut self.algorithms);
match inner.entry(name) {
Entry::Vacant(ent) => {
ent.insert(Arc::new(rule_impl));
Ok(())
}
Entry::Occupied(ent) => {
bail!("A fixed rule with the name {} is already loaded", ent.key())
}
}
}
/// Register callbacks to run when changes to relations are committed.
/// The returned ID can be used to unregister the callbacks.
/// It is OK to register callbacks for relations that do not exist (yet).
/// TODO: not yet implemented
#[allow(dead_code)]
pub(crate) fn register_callback(&self, relation: String, cb: TxCallback) -> u64 {
let id = self.callback_count.fetch_add(1, Ordering::AcqRel);
let mut guard = self.event_callbacks.write().unwrap();
let entries = guard.entry(relation).or_default();
entries.insert(id, cb);
id
}
/// Unregister callbacks to run when changes to relations are committed.
#[allow(dead_code)]
pub(crate) fn unregister_callback(&self, relation: String, id: u64) -> bool {
let mut guard = self.event_callbacks.write().unwrap();
match guard.entry(relation) {
Entry::Vacant(_) => false,
Entry::Occupied(mut ent) => {
let entries = ent.get_mut();
entries.remove(&id).is_some()
}
}
}
fn compact_relation(&'s self) -> Result<()> {
let l = Tuple::default().encode_as_key(RelationId(0));

Loading…
Cancel
Save