fix callbacks not running

main
Ziyang Hu 2 years ago
parent 798facf94d
commit 172280a3a7

@ -64,6 +64,7 @@ pub use storage::tikv::{new_cozo_tikv, TiKvStorage};
pub use storage::{Storage, StoreTx};
use crate::data::json::JsonValue;
use crate::runtime::db::CallbackOp;
pub(crate) mod data;
pub(crate) mod fixed_rule;
@ -366,6 +367,21 @@ impl DbInstance {
self.import_from_backup(&json_payload.path, &json_payload.relations)
}
pub fn register_callback<CB>(&self, callback: CB, dependent: &str) -> Result<u32>
where
CB: Fn(CallbackOp, NamedRows, NamedRows) + Send + Sync + 'static {
match self {
DbInstance::Mem(db) => db.register_callback(callback, dependent),
#[cfg(feature = "storage-sqlite")]
DbInstance::Sqlite(db) => db.register_callback(callback, dependent),
#[cfg(feature = "storage-rocksdb")]
DbInstance::RocksDb(db) => db.register_callback(callback, dependent),
#[cfg(feature = "storage-sled")]
DbInstance::Sled(db) => db.register_callback(callback, dependent),
#[cfg(feature = "storage-tikv")]
DbInstance::TiKv(db) => db.register_callback(callback, dependent),
}
}
}
/// Convert error raised by the database into friendly JSON format

@ -444,7 +444,7 @@ impl<'a> SessionTx<'a> {
.map(|k| k.name.to_string())
.collect_vec();
target_collector.push((
CallbackOp::Rm,
CallbackOp::Put,
NamedRows {
headers: headers.clone(),
rows: new_tuples

@ -647,9 +647,9 @@ impl<'s, S: Storage<'s>> Db<S> {
param_pool: &BTreeMap<String, DataValue>,
cur_vld: ValidityTs,
) -> Result<NamedRows> {
let mut callback_collector = BTreeMap::new();
match parse_script(payload, param_pool, &self.algorithms, cur_vld)? {
CozoScript::Single(p) => {
let mut callback_collector = BTreeMap::new();
let is_write = p.needs_write_tx();
let callback_targets = if is_write {
self.current_callback_targets()
@ -681,6 +681,9 @@ impl<'s, S: Storage<'s>> Db<S> {
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
if !callback_collector.is_empty() {
self.send_callbacks(callback_collector)
}
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
@ -688,6 +691,7 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(res)
}
CozoScript::Imperative(ps) => {
let mut callback_collector = BTreeMap::new();
let is_write = ps.iter().any(|p| p.needs_write_tx());
let callback_targets = if is_write {
self.current_callback_targets()

@ -30,6 +30,13 @@ lazy_static! {
let db = DbInstance::new(&db_kind, path, Default::default()).unwrap();
dbg!(creation.elapsed());
db.register_callback(
|op, new, old| {
println!("callback for {op:?}, new: {}, old: {}", new.rows.len(), old.rows.len());
},
"airport",
).unwrap();
let init = Instant::now();
db.run_script(r##"
res[idx, label, typ, code, icao, desc, region, runways, longest, elev, country, city, lat, lon] <~

Loading…
Cancel
Save