multi-transactions for NodeJS

main
Ziyang Hu 2 years ago
parent f14140f786
commit bd4f5bef12

@ -41,6 +41,21 @@ const {CozoDb} = require('.');
} }
console.log((await db.exportRelations(['test']))['test']['rows']) console.log((await db.exportRelations(['test']))['test']['rows'])
const tx = db.multi_transact(true);
await tx.run(':create a {a}');
await tx.run('?[a] <- [[1]] :put a {a}');
try {
await tx.run(':create a {a}')
} catch (e) {
}
await tx.run('?[a] <- [[2]] :put a {a}')
await tx.run('?[a] <- [[3]] :put a {a}')
tx.commit()
const res = await db.run('?[a] := *a[a]');
console.log(res);
db.unregister_callback(cb_id) db.unregister_callback(cb_id)
db.unregister_named_rule('Pipipy') db.unregister_named_rule('Pipipy')
})() })()

@ -11,6 +11,33 @@ const path = require('path');
const binding_path = binary.find(path.resolve(path.join(__dirname, './package.json'))); const binding_path = binary.find(path.resolve(path.join(__dirname, './package.json')));
const native = require(binding_path); const native = require(binding_path);
class CozoTx {
constructor(id) {
this.tx_id = id;
}
run(script, params) {
return new Promise((resolve, reject) => {
params = params || {};
native.query_tx(this.tx_id, script, params, (err, result) => {
if (err) {
reject(JSON.parse(err))
} else {
resolve(result)
}
})
})
}
abort() {
return native.abort_tx(this.tx_id)
}
commit() {
return native.commit_tx(this.tx_id)
}
}
class CozoDb { class CozoDb {
constructor(engine, path, options) { constructor(engine, path, options) {
this.db_id = native.open_db(engine || 'mem', path || 'data.db', JSON.stringify(options || {})) this.db_id = native.open_db(engine || 'mem', path || 'data.db', JSON.stringify(options || {}))
@ -20,6 +47,10 @@ class CozoDb {
native.close_db(this.db_id) native.close_db(this.db_id)
} }
multi_transact(write) {
return new CozoTx(native.multi_transact(this.db_id, !!write))
}
run(script, params) { run(script, params) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
params = params || {}; params = params || {};

@ -203,10 +203,12 @@ fn params2js<'a>(
#[derive(Default)] #[derive(Default)]
struct Handles { struct Handles {
current: AtomicU32, nxt_db_id: AtomicU32,
dbs: Mutex<BTreeMap<u32, DbInstance>>, dbs: Mutex<BTreeMap<u32, DbInstance>>,
cb_idx: AtomicU32, cb_idx: AtomicU32,
current_cbs: Mutex<BTreeMap<u32, Sender<Result<NamedRows>>>>, current_cbs: Mutex<BTreeMap<u32, Sender<Result<NamedRows>>>>,
nxt_tx_id: AtomicU32,
txs: Mutex<BTreeMap<u32, Arc<MultiTransaction>>>,
} }
lazy_static! { lazy_static! {
@ -219,7 +221,7 @@ fn open_db(mut cx: FunctionContext) -> JsResult<JsNumber> {
let options = cx.argument::<JsString>(2)?.value(&mut cx); let options = cx.argument::<JsString>(2)?.value(&mut cx);
match DbInstance::new(&engine, path, &options) { match DbInstance::new(&engine, path, &options) {
Ok(db) => { Ok(db) => {
let id = HANDLES.current.fetch_add(1, Ordering::AcqRel); let id = HANDLES.nxt_db_id.fetch_add(1, Ordering::AcqRel);
let mut dbs = HANDLES.dbs.lock().unwrap(); let mut dbs = HANDLES.dbs.lock().unwrap();
dbs.insert(id, db); dbs.insert(id, db);
Ok(cx.number(id)) Ok(cx.number(id))
@ -260,6 +262,79 @@ macro_rules! get_db {
}}; }};
} }
macro_rules! get_tx {
($cx:expr) => {{
let id = $cx.argument::<JsNumber>(0)?.value(&mut $cx) as u32;
let tx = {
let tx_ref = {
let txs = HANDLES.txs.lock().unwrap();
txs.get(&id).cloned()
};
match tx_ref {
None => {
let s = $cx.string("transaction closed");
$cx.throw(s)?
}
Some(tx) => tx,
}
};
tx
}};
}
macro_rules! remove_tx {
($cx:expr) => {{
let id = $cx.argument::<JsNumber>(0)?.value(&mut $cx) as u32;
let tx = {
let tx_ref = {
let mut txs = HANDLES.txs.lock().unwrap();
txs.remove(&id)
};
match tx_ref {
None => {
let s = $cx.string("transaction closed");
$cx.throw(s)?
}
Some(tx) => tx,
}
};
tx
}};
}
fn multi_transact(mut cx: FunctionContext) -> JsResult<JsNumber> {
let db = get_db!(cx);
let write = cx.argument::<JsBoolean>(1)?.value(&mut cx);
let tx = db.multi_transaction(write);
let id = HANDLES.nxt_tx_id.fetch_add(1, Ordering::AcqRel);
HANDLES.txs.lock().unwrap().insert(id, Arc::new(tx));
Ok(cx.number(id))
}
fn abort_tx(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let tx = remove_tx!(cx);
match tx.abort() {
Ok(_) => Ok(cx.undefined()),
Err(err) => {
let msg = cx.string(err.to_string());
cx.throw(msg)
}
}
}
fn commit_tx(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let tx = remove_tx!(cx);
match tx.commit() {
Ok(_) => Ok(cx.undefined()),
Err(err) => {
let msg = cx.string(err.to_string());
cx.throw(msg)
}
}
}
fn query_db(mut cx: FunctionContext) -> JsResult<JsUndefined> { fn query_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let db = get_db!(cx); let db = get_db!(cx);
let query = cx.argument::<JsString>(1)?.value(&mut cx); let query = cx.argument::<JsString>(1)?.value(&mut cx);
@ -295,6 +370,52 @@ fn query_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
Ok(cx.undefined()) Ok(cx.undefined())
} }
fn query_tx(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let tx = get_tx!(cx);
let query = cx.argument::<JsString>(1)?.value(&mut cx);
let params_js = cx.argument::<JsObject>(2)?;
let mut params = BTreeMap::new();
js2params(&mut cx, params_js, &mut params)?;
let callback = cx.argument::<JsFunction>(3)?.root(&mut cx);
let channel = cx.channel();
match tx.sender.send(TransactionPayload::Query((query.clone(), params))) {
Ok(_) => {
thread::spawn(move || {
let result = tx.receiver.recv();
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
let this = cx.undefined();
match result {
Ok(Ok(nr)) => {
let js_vals = named_rows2js(&mut cx, &nr)?.as_value(&mut cx);
let err = cx.undefined().as_value(&mut cx);
callback.call(&mut cx, this, vec![err, js_vals])?;
}
Ok(Err(err)) => {
let reports = format_error_as_json(err, Some(&query)).to_string();
let err = cx.string(&reports).as_value(&mut cx);
callback.call(&mut cx, this, vec![err])?;
}
Err(err) => {
let err = cx.string(err.to_string()).as_value(&mut cx);
callback.call(&mut cx, this, vec![err])?;
}
}
Ok(())
});
});
Ok(cx.undefined())
}
Err(err) => {
let msg = cx.string(err.to_string());
cx.throw(msg)
}
}
}
fn backup_db(mut cx: FunctionContext) -> JsResult<JsUndefined> { fn backup_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let db = get_db!(cx); let db = get_db!(cx);
let path = cx.argument::<JsString>(1)?.value(&mut cx); let path = cx.argument::<JsString>(1)?.value(&mut cx);
@ -599,5 +720,9 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
respond_to_named_rule_invocation, respond_to_named_rule_invocation,
)?; )?;
cx.export_function("unregister_named_rule", unregister_named_rule)?; cx.export_function("unregister_named_rule", unregister_named_rule)?;
cx.export_function("abort_tx", abort_tx)?;
cx.export_function("commit_tx", commit_tx)?;
cx.export_function("multi_transact", multi_transact)?;
cx.export_function("query_tx", query_tx)?;
Ok(()) Ok(())
} }

@ -7,4 +7,5 @@
# #
cargo build -p cozo-node -F compact -F storage-rocksdb cargo build -p cozo-node -F compact -F storage-rocksdb
mv ../target/debug/libcozo_node.dylib native/6/cozo_node_prebuilt.node mv ../target/debug/libcozo_node.dylib native/6/cozo_node_prebuilt.node
node example.js
Loading…
Cancel
Save