diff --git a/Cargo.lock b/Cargo.lock index 34521bde..27c2b014 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -772,6 +772,7 @@ dependencies = [ "minreq", "prettytable", "rand 0.8.5", + "rayon", "rustyline", "serde", "serde_derive", @@ -800,6 +801,7 @@ dependencies = [ "lazy_static", "miette", "neon", + "rayon", "serde_json", ] @@ -837,6 +839,7 @@ dependencies = [ "cozo", "miette", "pyo3", + "rayon", "serde_json", ] diff --git a/cozo-bin/Cargo.toml b/cozo-bin/Cargo.toml index 64fc45cd..62f451a0 100644 --- a/cozo-bin/Cargo.toml +++ b/cozo-bin/Cargo.toml @@ -64,4 +64,5 @@ async-stream = "0.3.3" futures = "0.3.25" crossbeam = "0.8.2" eventsource-client = "0.12.0" -tower-http = { version = "0.5.0", features = ["full"] } \ No newline at end of file +tower-http = { version = "0.5.0", features = ["full"] } +rayon = "1.8.0" \ No newline at end of file diff --git a/cozo-bin/src/server.rs b/cozo-bin/src/server.rs index fa1895b7..65299335 100644 --- a/cozo-bin/src/server.rs +++ b/cozo-bin/src/server.rs @@ -10,11 +10,8 @@ use std::collections::BTreeMap; use std::convert::Infallible; use std::net::{Ipv6Addr, SocketAddr}; use std::str::FromStr; -// use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicU32, Ordering}; -use std::thread; -// use std::thread; use axum::body::Body; use axum::extract::{DefaultBodyLimit, Path, Query, State}; @@ -583,7 +580,7 @@ async fn register_rule( let rule_senders = st.rule_senders.clone(); let rule_counter = st.rule_counter.clone(); - thread::spawn(move || { + rayon::spawn(move || { for (inputs, options, sender) in task_receiver { let id = rule_counter.fetch_add(1, Ordering::AcqRel); let inputs: serde_json::Value = diff --git a/cozo-core/src/lib.rs b/cozo-core/src/lib.rs index a55712eb..7131d1a3 100644 --- a/cozo-core/src/lib.rs +++ b/cozo-core/src/lib.rs @@ -33,7 +33,6 @@ use std::collections::BTreeMap; use std::path::Path; -use std::thread; #[allow(unused_imports)] use std::time::Instant; @@ -506,7 +505,7 @@ impl DbInstance { let (app2db_send, app2db_recv) = bounded(1); let (db2app_send, db2app_recv) = bounded(1); let db = self.clone(); - thread::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send)); + rayon::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send)); MultiTransaction { sender: app2db_send, receiver: db2app_recv, diff --git a/cozo-lib-nodejs/Cargo.toml b/cozo-lib-nodejs/Cargo.toml index 1bcfeb68..95d368b4 100644 --- a/cozo-lib-nodejs/Cargo.toml +++ b/cozo-lib-nodejs/Cargo.toml @@ -45,6 +45,7 @@ lazy_static = "1.4.0" crossbeam = "0.8.2" miette = "5.5.0" serde_json = "1.0.96" +rayon = "1.8.0" [dependencies.neon] version = "0.10" diff --git a/cozo-lib-nodejs/src/lib.rs b/cozo-lib-nodejs/src/lib.rs index 4efcbd88..18ed4a64 100644 --- a/cozo-lib-nodejs/src/lib.rs +++ b/cozo-lib-nodejs/src/lib.rs @@ -8,7 +8,6 @@ use std::collections::BTreeMap; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; -use std::thread; use crossbeam::channel::Sender; use lazy_static::lazy_static; @@ -410,7 +409,7 @@ fn query_db(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); - thread::spawn(move || { + rayon::spawn(move || { let result = db.run_script( &query, params, @@ -457,7 +456,7 @@ fn query_tx(mut cx: FunctionContext) -> JsResult { .send(TransactionPayload::Query((query.clone(), params))) { Ok(_) => { - thread::spawn(move || { + rayon::spawn(move || { let result = tx.receiver.recv(); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -497,7 +496,7 @@ fn backup_db(mut cx: FunctionContext) -> JsResult { let callback = cx.argument::(2)?.root(&mut cx); let channel = cx.channel(); - thread::spawn(move || { + rayon::spawn(move || { let result = db.backup_db(&path); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -523,7 +522,7 @@ fn restore_db(mut cx: FunctionContext) -> JsResult { let callback = cx.argument::(2)?.root(&mut cx); let channel = cx.channel(); - thread::spawn(move || { + rayon::spawn(move || { let result = db.restore_backup(&path); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -553,7 +552,7 @@ fn export_relations(mut cx: FunctionContext) -> JsResult { let callback = cx.argument::(2)?.root(&mut cx); let channel = cx.channel(); - thread::spawn(move || { + rayon::spawn(move || { let result = db.export_relations(relations.iter()); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -599,7 +598,7 @@ fn import_relations(mut cx: FunctionContext) -> JsResult { rels.insert(name, nr); } - thread::spawn(move || { + rayon::spawn(move || { let result = db.import_relations(rels); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -632,7 +631,7 @@ fn import_from_backup(mut cx: FunctionContext) -> JsResult { let callback = cx.argument::(3)?.root(&mut cx); let channel = cx.channel(); - thread::spawn(move || { + rayon::spawn(move || { let result = db.import_from_backup(path, &relations); channel.send(move |mut cx| { let callback = callback.into_inner(&mut cx); @@ -665,7 +664,7 @@ fn register_callback(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); let (rid, recv) = db.register_callback(&name, capacity); - thread::spawn(move || { + rayon::spawn(move || { for (op, new, old) in recv { let cb = callback.clone(); channel.send(move |mut cx| { @@ -701,7 +700,7 @@ fn register_named_rule(mut cx: FunctionContext) -> JsResult { let msg = cx.string(err.to_string()); return cx.throw(msg); } - thread::spawn(move || { + rayon::spawn(move || { for (inputs, options, sender) in recv { let id = HANDLES.cb_idx.fetch_add(1, Ordering::AcqRel); { diff --git a/cozo-lib-python/Cargo.toml b/cozo-lib-python/Cargo.toml index 79ffb982..03441c49 100644 --- a/cozo-lib-python/Cargo.toml +++ b/cozo-lib-python/Cargo.toml @@ -44,4 +44,5 @@ io-uring = ["cozo/io-uring"] cozo = { version = "0.7.6", path = "../cozo-core", default-features = false } pyo3 = { version = "0.20.0", features = ["extension-module", "abi3", "abi3-py37"] } miette = "5.5.0" -serde_json = "1.0.96" \ No newline at end of file +serde_json = "1.0.96" +rayon = "1.8.0" \ No newline at end of file diff --git a/cozo-lib-python/src/lib.rs b/cozo-lib-python/src/lib.rs index 75b77bee..97698c72 100644 --- a/cozo-lib-python/src/lib.rs +++ b/cozo-lib-python/src/lib.rs @@ -7,7 +7,6 @@ */ use std::collections::{BTreeMap, BTreeSet}; -use std::thread; use miette::{IntoDiagnostic, Report, Result}; use pyo3::exceptions::PyException; @@ -258,7 +257,7 @@ impl CozoDbPy { if let Some(db) = &self.db { let cb: Py = callback.into(); let (id, ch) = db.register_callback(rel, None); - thread::spawn(move || { + rayon::spawn(move || { for (op, new, old) in ch { Python::with_gil(|py| { let op = PyString::new(py, op.as_str()).into();