diff --git a/Cargo.lock b/Cargo.lock index 5853fb3b..260da48c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -607,7 +607,9 @@ name = "cozo-node" version = "0.5.0" dependencies = [ "cozo", + "crossbeam", "lazy_static", + "miette", "neon", ] diff --git a/cozo-lib-nodejs/Cargo.toml b/cozo-lib-nodejs/Cargo.toml index afb789ce..017593a4 100644 --- a/cozo-lib-nodejs/Cargo.toml +++ b/cozo-lib-nodejs/Cargo.toml @@ -42,6 +42,8 @@ io-uring = ["cozo/io-uring"] [dependencies] cozo = { version = "0.5.0", path = "../cozo-core", default-features = false } lazy_static = "1.4.0" +crossbeam = "0.8.2" +miette = "5.5.0" [dependencies.neon] version = "0.10" diff --git a/cozo-lib-nodejs/build_linux.sh b/cozo-lib-nodejs/build_linux.sh deleted file mode 100755 index 30f7183d..00000000 --- a/cozo-lib-nodejs/build_linux.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -rm -fr native -mkdir -p native/6 -cross build --target=x86_64-unknown-linux-gnu --release -mv target/x86_64-unknown-linux-gnu/release/libcozo_node.so native/6/index.node -npm run package diff --git a/cozo-lib-nodejs/build_mac.sh b/cozo-lib-nodejs/build_mac.sh deleted file mode 100755 index 63a0b4c9..00000000 --- a/cozo-lib-nodejs/build_mac.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env bash - -rm -fr native -mkdir -p native/6 -cargo build --release -mv target/release/libcozo_node.dylib native/6/index.node -yarn package diff --git a/cozo-lib-nodejs/build_win.ps1 b/cozo-lib-nodejs/build_win.ps1 deleted file mode 100644 index 223109dd..00000000 --- a/cozo-lib-nodejs/build_win.ps1 +++ /dev/null @@ -1,9 +0,0 @@ -$ErrorActionPreference="Stop" - -if (test-path native/6/index.node) { - Remove-Item native/6/index.node -} - -cargo build --release -cp target/release/cozo_node.dll native/6/index.node -yarn package diff --git a/cozo-lib-nodejs/example.js b/cozo-lib-nodejs/example.js index d12d1511..e021117c 100644 --- a/cozo-lib-nodejs/example.js +++ b/cozo-lib-nodejs/example.js @@ -6,14 +6,47 @@ * You can obtain one at https://mozilla.org/MPL/2.0/. */ +const {Buffer} = require('node:buffer') const {CozoDb} = require('.'); (async () => { const db = new CozoDb() try { - const result = await db.run('?[a] <- [["hello"], ["world"]]'); + const result = await db.run('?[a] <- [["hello"], ["world"], [$b]]', {b: Buffer.alloc(8, 255)}); console.log(result.rows) } catch (e) { console.error(e) } -})() \ No newline at end of file + const cb_id = db.register_callback('test', (op, new_rows, old_rows) => { + console.log(`${op} ${JSON.stringify(new_rows)} ${JSON.stringify(old_rows)}`) + }) + + await db.run(`?[a] <- [[1],[2],[3]] :create test {a}`); + + db.register_named_rule('Pipipy', 1, async (inputs, options) => { + console.log(`rule inputs: ${JSON.stringify(inputs)} ${JSON.stringify(options)}`) + await sleep(1000); + return inputs[0].map((row) => [row[0] * options.mul]) + }) + + try { + let r = await db.run(` + rel[] <- [[1],[2]] + + ?[a] <~ Pipipy(rel[], mul: 3) + `); + console.log(r); + } catch (e) { + console.error(e.display); + } + db.unregister_callback(cb_id) + db.unregister_named_rule('Pipipy') +})() + +function sleep(ms) { + return new Promise((resolve, reject) => { + setTimeout(() => { + resolve() + }, ms); + }) +} \ No newline at end of file diff --git a/cozo-lib-nodejs/index.js b/cozo-lib-nodejs/index.js index 9feb8a89..4eb97d77 100644 --- a/cozo-lib-nodejs/index.js +++ b/cozo-lib-nodejs/index.js @@ -22,13 +22,12 @@ class CozoDb { run(script, params) { return new Promise((resolve, reject) => { - const params_str = JSON.stringify(params || {}) - native.query_db(this.db_id, script, params_str, (result_str) => { - const result = JSON.parse(result_str); - if (result.ok) { - resolve(result) + params = params || {}; + native.query_db(this.db_id, script, params, (err, result) => { + if (err) { + reject(JSON.parse(err)) } else { - reject(result) + resolve(result) } }) }) @@ -100,6 +99,36 @@ class CozoDb { }) }) } + + register_callback(relation, cb, capacity = -1) { + return native.register_callback(this.db_id, relation, cb, capacity) + } + + unregister_callback(cb_id) { + return native.unregister_callback(this.db_id, cb_id) + } + + register_named_rule(name, arity, cb) { + return native.register_named_rule(this.db_id, name, arity, async (ret_id, inputs, options) => { + let ret = undefined; + try { + ret = await cb(inputs, options); + } catch (e) { + console.error(e); + native.respond_to_named_rule_invocation(ret_id, '' + e); + return; + } + try { + native.respond_to_named_rule_invocation(ret_id, ret); + } catch (e) { + console.error(e); + } + }) + } + + unregister_named_rule(name) { + return native.unregister_named_rule(this.db_id, name) + } } module.exports = {CozoDb: CozoDb} diff --git a/cozo-lib-nodejs/src/lib.rs b/cozo-lib-nodejs/src/lib.rs index ddc2e81d..b2ed97fb 100644 --- a/cozo-lib-nodejs/src/lib.rs +++ b/cozo-lib-nodejs/src/lib.rs @@ -8,16 +8,185 @@ use std::collections::BTreeMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; +use std::thread; +use crossbeam::channel::Sender; use lazy_static::lazy_static; +use miette::{miette, Result}; use neon::prelude::*; +use neon::types::buffer::TypedArray; use cozo::*; +fn rows2js<'a>(cx: &mut impl Context<'a>, rows: &[Vec]) -> JsResult<'a, JsArray> { + let coll = cx.empty_array(); + for (j, row) in rows.iter().enumerate() { + let cur = cx.empty_array(); + for (i, el) in row.iter().enumerate() { + let el = value2js(cx, el)?; + cur.set(cx, i as u32, el)?; + } + coll.set(cx, j as u32, cur)?; + } + Ok(coll) +} + +fn named_rows2js<'a>(cx: &mut impl Context<'a>, nr: &NamedRows) -> JsResult<'a, JsObject> { + let ret = cx.empty_object(); + if let Some(rows) = &nr.next { + let converted = named_rows2js(cx, rows)?; + ret.set(cx, "next", converted)?; + }; + let headers = cx.empty_array(); + for (i, header) in nr.headers.iter().enumerate() { + let converted = cx.string(header); + headers.set(cx, i as u32, converted)?; + } + ret.set(cx, "headers", headers)?; + let rows = rows2js(cx, &nr.rows)?; + ret.set(cx, "rows", rows)?; + Ok(ret) +} + +fn js2value<'a>( + cx: &mut impl Context<'a>, + val: Handle<'a, JsValue>, + coll: &mut DataValue, +) -> JsResult<'a, JsUndefined> { + if let Ok(_) = val.downcast::(cx) { + *coll = DataValue::Null; + } else if let Ok(n) = val.downcast::(cx) { + let n = n.value(cx); + *coll = DataValue::from(n); + } else if let Ok(b) = val.downcast::(cx) { + let b = b.value(cx); + *coll = DataValue::from(b); + } else if let Ok(_) = val.downcast::(cx) { + *coll = DataValue::Null; + } else if let Ok(s) = val.downcast::(cx) { + let s = s.value(cx); + *coll = DataValue::Str(s.into()); + } else if let Ok(l) = val.downcast::(cx) { + let n = l.len(cx); + let mut ret = Vec::with_capacity(n as usize); + for i in 0..n { + let v: Handle = l.get(cx, i)?; + let mut target = DataValue::Bot; + js2value(cx, v, &mut target)?; + ret.push(target); + } + *coll = DataValue::List(ret); + } else if let Ok(b) = val.downcast::(cx) { + let d = b.as_slice(cx); + *coll = DataValue::Bytes(d.to_vec()); + } else { + let err = cx.string("Javascript value cannot be converted."); + return cx.throw(err); + } + Ok(cx.undefined()) +} + +fn value2js<'a>(cx: &mut impl Context<'a>, val: &DataValue) -> JsResult<'a, JsValue> { + Ok(match val { + DataValue::Null => cx.null().as_value(cx), + DataValue::Bool(b) => cx.boolean(*b).as_value(cx), + DataValue::Num(n) => match n { + Num::Int(i) => cx.number(*i as f64).as_value(cx), + Num::Float(f) => cx.number(*f).as_value(cx), + }, + DataValue::Str(s) => cx.string(s).as_value(cx), + DataValue::Bytes(b) => { + let b = b.clone(); + JsBuffer::external(cx, b).as_value(cx) + } + DataValue::Uuid(uuid) => cx.string(uuid.0.to_string()).as_value(cx), + DataValue::Regex(rx) => cx.string(rx.0.to_string()).as_value(cx), + DataValue::List(l) => { + let target_l = cx.empty_array(); + for (i, el) in l.iter().enumerate() { + let el = value2js(cx, el)?; + target_l.set(cx, i as u32, el)?; + } + target_l.as_value(cx) + } + DataValue::Set(l) => { + let target_l = cx.empty_array(); + for (i, el) in l.iter().enumerate() { + let el = value2js(cx, el)?; + target_l.set(cx, i as u32, el)?; + } + target_l.as_value(cx) + } + DataValue::Validity(vld) => { + let target_l = cx.empty_array(); + let ts = cx.number(vld.timestamp.0 .0 as f64); + target_l.set(cx, 0, ts)?; + let a = cx.boolean(vld.is_assert.0); + target_l.set(cx, 1, a)?; + target_l.as_value(cx) + } + DataValue::Bot => cx.undefined().as_value(cx), + }) +} + +fn js2params<'a>( + cx: &mut impl Context<'a>, + js_params: Handle<'a, JsObject>, + collector: &mut BTreeMap, +) -> JsResult<'a, JsUndefined> { + let keys = js_params.get_own_property_names(cx)?; + let n_keys = keys.len(cx); + for i in 0..n_keys { + let key: Handle = keys.get(cx, i)?; + let key_str = key.value(cx); + let val: Handle = js_params.get(cx, key)?; + let mut value = DataValue::Bot; + js2value(cx, val, &mut value)?; + collector.insert(key_str, value); + } + Ok(cx.undefined()) +} + +fn js2rows<'a>( + cx: &mut impl Context<'a>, + rows: Handle<'a, JsArray>, + collector: &mut Vec>, +) -> JsResult<'a, JsUndefined> { + let n_rows = rows.len(cx); + collector.reserve(n_rows as usize); + for i in 0..n_rows { + let row = rows.get::(cx, i)?; + let n_cols = row.len(cx); + let mut ret_row = Vec::with_capacity(n_cols as usize); + for j in 0..n_cols { + let col = row.get::(cx, j)?; + let mut val = DataValue::Bot; + js2value(cx, col, &mut val)?; + ret_row.push(val); + } + collector.push(ret_row); + } + Ok(cx.undefined()) +} + +fn params2js<'a>( + cx: &mut impl Context<'a>, + params: &BTreeMap, +) -> JsResult<'a, JsObject> { + let obj = cx.empty_object(); + for (k, v) in params { + let val = value2js(cx, v)?; + obj.set(cx, k as &str, val)?; + } + Ok(obj) +} + #[derive(Default)] struct Handles { current: AtomicU32, dbs: Mutex>>, + cb_idx: AtomicU32, + current_cbs: Mutex>>>, } lazy_static! { @@ -51,37 +220,54 @@ fn close_db(mut cx: FunctionContext) -> JsResult { Ok(cx.boolean(db.is_some())) } -fn query_db(mut cx: FunctionContext) -> JsResult { - let id = cx.argument::(0)?.value(&mut cx) as u32; - let db = { - let db_ref = { - let dbs = HANDLES.dbs.lock().unwrap(); - dbs.get(&id).cloned() - }; - match db_ref { - None => { - let s = cx.string("database already closed"); - cx.throw(s)? +macro_rules! get_db { + ($cx:expr) => {{ + let id = $cx.argument::(0)?.value(&mut $cx) as u32; + let db = { + let db_ref = { + let dbs = HANDLES.dbs.lock().unwrap(); + dbs.get(&id).cloned() + }; + match db_ref { + None => { + let s = $cx.string("database already closed"); + $cx.throw(s)? + } + Some(db) => db, } - Some(db) => db, - } - }; + }; + db + }}; +} +fn query_db(mut cx: FunctionContext) -> JsResult { + let db = get_db!(cx); let query = cx.argument::(1)?.value(&mut cx); - let params = cx.argument::(2)?.value(&mut cx); + let params_js = cx.argument::(2)?; + let mut params = BTreeMap::new(); + js2params(&mut cx, params_js, &mut params)?; let callback = cx.argument::(3)?.root(&mut cx); let channel = cx.channel(); - std::thread::spawn(move || { - let result = db.run_script_str(&query, ¶ms); + thread::spawn(move || { + let result = db.run_script(&query, params); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); let this = cx.undefined(); - let json_str = cx.string(result); - callback.call(&mut cx, this, vec![json_str.upcast()])?; - + match result { + Ok(nr) => { + let js_vals = named_rows2js(&mut cx, &nr)?.as_value(&mut cx); + let err = cx.undefined().as_value(&mut cx); + callback.call(&mut cx, this, vec![err, js_vals])?; + } + Err(err) => { + let reports = format_error_as_json(err, Some(&query)).to_string(); + let err = cx.string(&reports).as_value(&mut cx); + callback.call(&mut cx, this, vec![err])?; + } + } Ok(()) }); }); @@ -90,28 +276,12 @@ fn query_db(mut cx: FunctionContext) -> JsResult { } fn backup_db(mut cx: FunctionContext) -> JsResult { - let id = cx.argument::(0)?.value(&mut cx) as u32; - let db = { - let db_ref = { - let dbs = HANDLES.dbs.lock().unwrap(); - dbs.get(&id).cloned() - }; - match db_ref { - None => { - let s = cx.string("database already closed"); - cx.throw(s)? - } - Some(db) => db, - } - }; - + let db = get_db!(cx); let path = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let channel = cx.channel(); - std::thread::spawn(move || { + thread::spawn(move || { let result = db.backup_db_str(&path); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -127,28 +297,12 @@ fn backup_db(mut cx: FunctionContext) -> JsResult { } fn restore_db(mut cx: FunctionContext) -> JsResult { - let id = cx.argument::(0)?.value(&mut cx) as u32; - let db = { - let db_ref = { - let dbs = HANDLES.dbs.lock().unwrap(); - dbs.get(&id).cloned() - }; - match db_ref { - None => { - let s = cx.string("database already closed"); - cx.throw(s)? - } - Some(db) => db, - } - }; - + let db = get_db!(cx); let path = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let channel = cx.channel(); - std::thread::spawn(move || { + thread::spawn(move || { let result = db.restore_backup_str(&path); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -164,28 +318,12 @@ fn restore_db(mut cx: FunctionContext) -> JsResult { } fn export_relations(mut cx: FunctionContext) -> JsResult { - let id = cx.argument::(0)?.value(&mut cx) as u32; - let db = { - let db_ref = { - let dbs = HANDLES.dbs.lock().unwrap(); - dbs.get(&id).cloned() - }; - match db_ref { - None => { - let s = cx.string("database already closed"); - cx.throw(s)? - } - Some(db) => db, - } - }; - + let db = get_db!(cx); let rels = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let channel = cx.channel(); - std::thread::spawn(move || { + thread::spawn(move || { let result = db.export_relations_str(&rels); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -201,28 +339,12 @@ fn export_relations(mut cx: FunctionContext) -> JsResult { } fn import_relations(mut cx: FunctionContext) -> JsResult { - let id = cx.argument::(0)?.value(&mut cx) as u32; - let db = { - let db_ref = { - let dbs = HANDLES.dbs.lock().unwrap(); - dbs.get(&id).cloned() - }; - match db_ref { - None => { - let s = cx.string("database already closed"); - cx.throw(s)? - } - Some(db) => db, - } - }; - + let db = get_db!(cx); let data = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let channel = cx.channel(); - std::thread::spawn(move || { + thread::spawn(move || { let result = db.import_relations_str(&data); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -238,28 +360,12 @@ fn import_relations(mut cx: FunctionContext) -> JsResult { } fn import_from_backup(mut cx: FunctionContext) -> JsResult { - let id = cx.argument::(0)?.value(&mut cx) as u32; - let db = { - let db_ref = { - let dbs = HANDLES.dbs.lock().unwrap(); - dbs.get(&id).cloned() - }; - match db_ref { - None => { - let s = cx.string("database already closed"); - cx.throw(s)? - } - Some(db) => db, - } - }; - + let db = get_db!(cx); let data = cx.argument::(1)?.value(&mut cx); - let callback = cx.argument::(2)?.root(&mut cx); - let channel = cx.channel(); - std::thread::spawn(move || { + thread::spawn(move || { let result = db.import_from_backup_str(&data); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -274,6 +380,131 @@ fn import_from_backup(mut cx: FunctionContext) -> JsResult { Ok(cx.undefined()) } +fn register_callback(mut cx: FunctionContext) -> JsResult { + let db = get_db!(cx); + let name = cx.argument::(1)?.value(&mut cx); + let capacity = cx.argument::(3)?.value(&mut cx); + let capacity = if capacity < 0. { + None + } else { + Some(capacity as usize) + }; + let callback = Arc::new(cx.argument::(2)?.root(&mut cx)); + let channel = cx.channel(); + + let (rid, recv) = db.register_callback(&name, capacity); + thread::spawn(move || { + for (op, new, old) in recv { + let cb = callback.clone(); + channel.send(move |mut cx| { + let callback = cb.to_inner(&mut cx); + let op = cx.string(op.as_str()).as_value(&mut cx); + let new = rows2js(&mut cx, &new.rows)?.as_value(&mut cx); + let old = rows2js(&mut cx, &old.rows)?.as_value(&mut cx); + let this = cx.undefined(); + + callback.call(&mut cx, this, vec![op, new, old])?; + Ok(()) + }); + } + }); + Ok(cx.number(rid)) +} + +fn unregister_callback(mut cx: FunctionContext) -> JsResult { + let db = get_db!(cx); + let id = cx.argument::(1)?.value(&mut cx) as u32; + let removed = db.unregister_callback(id); + Ok(cx.boolean(removed)) +} + +fn register_named_rule(mut cx: FunctionContext) -> JsResult { + let db = get_db!(cx); + let name = cx.argument::(1)?.value(&mut cx); + let arity = cx.argument::(2)?.value(&mut cx) as usize; + let callback = Arc::new(cx.argument::(3)?.root(&mut cx)); + let channel = cx.channel(); + let (rule_impl, recv) = SimpleFixedRule::rule_with_channel(arity); + if let Err(err) = db.register_fixed_rule(name, rule_impl) { + let msg = cx.string(err.to_string()); + return cx.throw(msg); + } + thread::spawn(move || { + for (inputs, options, sender) in recv { + let id = HANDLES.cb_idx.fetch_add(1, Ordering::AcqRel); + { + HANDLES.current_cbs.lock().unwrap().insert(id, sender); + } + let cb = callback.clone(); + channel.send(move |mut cx| { + let callback = cb.to_inner(&mut cx); + let inputs_js = cx.empty_array(); + for (i, input) in inputs.into_iter().enumerate() { + let input_js = rows2js(&mut cx, &input.rows)?; + inputs_js.set(&mut cx, i as u32, input_js)?; + } + let inputs_js = inputs_js.as_value(&mut cx); + let options_js = params2js(&mut cx, &options)?.as_value(&mut cx); + let this = cx.undefined(); + let ret_id = cx.number(id).as_value(&mut cx); + callback.call(&mut cx, this, vec![ret_id, inputs_js, options_js])?; + + Ok(()) + }); + } + }); + + Ok(cx.undefined()) +} + +fn respond_to_named_rule_invocation(mut cx: FunctionContext) -> JsResult { + let ret_id = cx.argument::(0)?.value(&mut cx) as u32; + let sender = { + match HANDLES.current_cbs.lock().unwrap().remove(&ret_id) { + None => { + let msg = cx.string("fixed rule invocation sender should only be used once"); + return cx.throw(msg); + } + Some(s) => s, + } + }; + + let send_err = |err| { + let _ = sender.send(Err(miette!("Javascript fixed rule failed"))); + err + }; + + let payload = cx.argument::(1)?; + if let Ok(msg) = payload.downcast::(&mut cx) { + let _ = sender.send(Err(miette!(msg.value(&mut cx)))); + return Ok(cx.undefined()) + } + + let data = payload.downcast_or_throw(&mut cx).map_err(send_err)?; + let mut rows = vec![]; + js2rows(&mut cx, data, &mut rows).map_err(send_err)?; + let nr = NamedRows::new(vec![], rows); + if let Err(err) = sender.send(Ok(nr)) { + let msg = err.to_string(); + let msg = cx.string(msg); + return cx.throw(msg); + } + Ok(cx.undefined()) +} + +fn unregister_named_rule(mut cx: FunctionContext) -> JsResult { + let db = get_db!(cx); + let name = cx.argument::(1)?.value(&mut cx); + let removed = match db.unregister_fixed_rule(&name) { + Ok(b) => b, + Err(msg) => { + let msg = cx.string(msg.to_string()); + return cx.throw(msg); + } + }; + Ok(cx.boolean(removed)) +} + #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("open_db", open_db)?; @@ -284,5 +515,13 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("export_relations", export_relations)?; cx.export_function("import_relations", import_relations)?; cx.export_function("import_from_backup", import_from_backup)?; + cx.export_function("register_callback", register_callback)?; + cx.export_function("unregister_callback", unregister_callback)?; + cx.export_function("register_named_rule", register_named_rule)?; + cx.export_function( + "respond_to_named_rule_invocation", + respond_to_named_rule_invocation, + )?; + cx.export_function("unregister_named_rule", unregister_named_rule)?; Ok(()) } diff --git a/cozo-lib-nodejs/test_build.sh b/cozo-lib-nodejs/test_build.sh new file mode 100755 index 00000000..3236bb8d --- /dev/null +++ b/cozo-lib-nodejs/test_build.sh @@ -0,0 +1,10 @@ +# +# Copyright 2022, The Cozo Project Authors. +# +# This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +# If a copy of the MPL was not distributed with this file, +# You can obtain one at https://mozilla.org/MPL/2.0/. +# + +cargo build -p cozo-node -F compact -F storage-rocksdb +mv ../target/debug/libcozo_node.dylib native/6/cozo_node_prebuilt.node \ No newline at end of file