From bd4f5bef120ef7a1e9a4b7dd74198ebfeb99512a Mon Sep 17 00:00:00 2001 From: Ziyang Hu Date: Mon, 16 Jan 2023 22:08:04 +0800 Subject: [PATCH] multi-transactions for NodeJS --- cozo-lib-nodejs/example.js | 15 ++++ cozo-lib-nodejs/index.js | 31 ++++++++ cozo-lib-nodejs/src/lib.rs | 129 +++++++++++++++++++++++++++++++++- cozo-lib-nodejs/test_build.sh | 3 +- 4 files changed, 175 insertions(+), 3 deletions(-) diff --git a/cozo-lib-nodejs/example.js b/cozo-lib-nodejs/example.js index 2ecd3846..abbd3eef 100644 --- a/cozo-lib-nodejs/example.js +++ b/cozo-lib-nodejs/example.js @@ -41,6 +41,21 @@ const {CozoDb} = require('.'); } console.log((await db.exportRelations(['test']))['test']['rows']) + + const tx = db.multi_transact(true); + await tx.run(':create a {a}'); + await tx.run('?[a] <- [[1]] :put a {a}'); + try { + await tx.run(':create a {a}') + } catch (e) { + } + await tx.run('?[a] <- [[2]] :put a {a}') + await tx.run('?[a] <- [[3]] :put a {a}') + tx.commit() + + const res = await db.run('?[a] := *a[a]'); + console.log(res); + db.unregister_callback(cb_id) db.unregister_named_rule('Pipipy') })() diff --git a/cozo-lib-nodejs/index.js b/cozo-lib-nodejs/index.js index deeaeed3..6a3c22df 100644 --- a/cozo-lib-nodejs/index.js +++ b/cozo-lib-nodejs/index.js @@ -11,6 +11,33 @@ const path = require('path'); const binding_path = binary.find(path.resolve(path.join(__dirname, './package.json'))); const native = require(binding_path); + +class CozoTx { + constructor(id) { + this.tx_id = id; + } + + run(script, params) { + return new Promise((resolve, reject) => { + params = params || {}; + native.query_tx(this.tx_id, script, params, (err, result) => { + if (err) { + reject(JSON.parse(err)) + } else { + resolve(result) + } + }) + }) + } + abort() { + return native.abort_tx(this.tx_id) + } + + commit() { + return native.commit_tx(this.tx_id) + } +} + class CozoDb { constructor(engine, path, options) { this.db_id = native.open_db(engine || 'mem', path || 'data.db', JSON.stringify(options || {})) @@ -20,6 +47,10 @@ class CozoDb { native.close_db(this.db_id) } + multi_transact(write) { + return new CozoTx(native.multi_transact(this.db_id, !!write)) + } + run(script, params) { return new Promise((resolve, reject) => { params = params || {}; diff --git a/cozo-lib-nodejs/src/lib.rs b/cozo-lib-nodejs/src/lib.rs index 74f4f75b..80f34114 100644 --- a/cozo-lib-nodejs/src/lib.rs +++ b/cozo-lib-nodejs/src/lib.rs @@ -203,10 +203,12 @@ fn params2js<'a>( #[derive(Default)] struct Handles { - current: AtomicU32, + nxt_db_id: AtomicU32, dbs: Mutex>, cb_idx: AtomicU32, current_cbs: Mutex>>>, + nxt_tx_id: AtomicU32, + txs: Mutex>>, } lazy_static! { @@ -219,7 +221,7 @@ fn open_db(mut cx: FunctionContext) -> JsResult { let options = cx.argument::(2)?.value(&mut cx); match DbInstance::new(&engine, path, &options) { Ok(db) => { - let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); + let id = HANDLES.nxt_db_id.fetch_add(1, Ordering::AcqRel); let mut dbs = HANDLES.dbs.lock().unwrap(); dbs.insert(id, db); Ok(cx.number(id)) @@ -260,6 +262,79 @@ macro_rules! get_db { }}; } +macro_rules! get_tx { + ($cx:expr) => {{ + let id = $cx.argument::(0)?.value(&mut $cx) as u32; + let tx = { + let tx_ref = { + let txs = HANDLES.txs.lock().unwrap(); + txs.get(&id).cloned() + }; + match tx_ref { + None => { + let s = $cx.string("transaction closed"); + $cx.throw(s)? + } + Some(tx) => tx, + } + }; + tx + }}; +} + + +macro_rules! remove_tx { + ($cx:expr) => {{ + let id = $cx.argument::(0)?.value(&mut $cx) as u32; + let tx = { + let tx_ref = { + let mut txs = HANDLES.txs.lock().unwrap(); + txs.remove(&id) + }; + match tx_ref { + None => { + let s = $cx.string("transaction closed"); + $cx.throw(s)? + } + Some(tx) => tx, + } + }; + tx + }}; +} + + +fn multi_transact(mut cx: FunctionContext) -> JsResult { + let db = get_db!(cx); + let write = cx.argument::(1)?.value(&mut cx); + let tx = db.multi_transaction(write); + let id = HANDLES.nxt_tx_id.fetch_add(1, Ordering::AcqRel); + HANDLES.txs.lock().unwrap().insert(id, Arc::new(tx)); + Ok(cx.number(id)) +} + +fn abort_tx(mut cx: FunctionContext) -> JsResult { + let tx = remove_tx!(cx); + match tx.abort() { + Ok(_) => Ok(cx.undefined()), + Err(err) => { + let msg = cx.string(err.to_string()); + cx.throw(msg) + } + } +} + +fn commit_tx(mut cx: FunctionContext) -> JsResult { + let tx = remove_tx!(cx); + match tx.commit() { + Ok(_) => Ok(cx.undefined()), + Err(err) => { + let msg = cx.string(err.to_string()); + cx.throw(msg) + } + } +} + fn query_db(mut cx: FunctionContext) -> JsResult { let db = get_db!(cx); let query = cx.argument::(1)?.value(&mut cx); @@ -295,6 +370,52 @@ fn query_db(mut cx: FunctionContext) -> JsResult { Ok(cx.undefined()) } +fn query_tx(mut cx: FunctionContext) -> JsResult { + let tx = get_tx!(cx); + let query = cx.argument::(1)?.value(&mut cx); + let params_js = cx.argument::(2)?; + let mut params = BTreeMap::new(); + js2params(&mut cx, params_js, &mut params)?; + + let callback = cx.argument::(3)?.root(&mut cx); + + let channel = cx.channel(); + match tx.sender.send(TransactionPayload::Query((query.clone(), params))) { + Ok(_) => { + thread::spawn(move || { + let result = tx.receiver.recv(); + channel.send(move |mut cx| { + let callback = callback.into_inner(&mut cx); + let this = cx.undefined(); + match result { + Ok(Ok(nr)) => { + let js_vals = named_rows2js(&mut cx, &nr)?.as_value(&mut cx); + let err = cx.undefined().as_value(&mut cx); + callback.call(&mut cx, this, vec![err, js_vals])?; + } + Ok(Err(err)) => { + let reports = format_error_as_json(err, Some(&query)).to_string(); + let err = cx.string(&reports).as_value(&mut cx); + callback.call(&mut cx, this, vec![err])?; + } + Err(err) => { + let err = cx.string(err.to_string()).as_value(&mut cx); + callback.call(&mut cx, this, vec![err])?; + } + } + Ok(()) + }); + }); + + Ok(cx.undefined()) + } + Err(err) => { + let msg = cx.string(err.to_string()); + cx.throw(msg) + } + } +} + fn backup_db(mut cx: FunctionContext) -> JsResult { let db = get_db!(cx); let path = cx.argument::(1)?.value(&mut cx); @@ -599,5 +720,9 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { respond_to_named_rule_invocation, )?; cx.export_function("unregister_named_rule", unregister_named_rule)?; + cx.export_function("abort_tx", abort_tx)?; + cx.export_function("commit_tx", commit_tx)?; + cx.export_function("multi_transact", multi_transact)?; + cx.export_function("query_tx", query_tx)?; Ok(()) } diff --git a/cozo-lib-nodejs/test_build.sh b/cozo-lib-nodejs/test_build.sh index 3236bb8d..7fd4ec78 100755 --- a/cozo-lib-nodejs/test_build.sh +++ b/cozo-lib-nodejs/test_build.sh @@ -7,4 +7,5 @@ # cargo build -p cozo-node -F compact -F storage-rocksdb -mv ../target/debug/libcozo_node.dylib native/6/cozo_node_prebuilt.node \ No newline at end of file +mv ../target/debug/libcozo_node.dylib native/6/cozo_node_prebuilt.node +node example.js \ No newline at end of file