use rayon to spawn threads in the global pool

main
Ziyang Hu 10 months ago
parent ea5f3f2e01
commit 303ab1f7ee

3
Cargo.lock generated

@ -772,6 +772,7 @@ dependencies = [
"minreq",
"prettytable",
"rand 0.8.5",
"rayon",
"rustyline",
"serde",
"serde_derive",
@ -800,6 +801,7 @@ dependencies = [
"lazy_static",
"miette",
"neon",
"rayon",
"serde_json",
]
@ -837,6 +839,7 @@ dependencies = [
"cozo",
"miette",
"pyo3",
"rayon",
"serde_json",
]

@ -65,3 +65,4 @@ futures = "0.3.25"
crossbeam = "0.8.2"
eventsource-client = "0.12.0"
tower-http = { version = "0.5.0", features = ["full"] }
rayon = "1.8.0"

@ -10,11 +10,8 @@ use std::collections::BTreeMap;
use std::convert::Infallible;
use std::net::{Ipv6Addr, SocketAddr};
use std::str::FromStr;
// use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU32, Ordering};
use std::thread;
// use std::thread;
use axum::body::Body;
use axum::extract::{DefaultBodyLimit, Path, Query, State};
@ -583,7 +580,7 @@ async fn register_rule(
let rule_senders = st.rule_senders.clone();
let rule_counter = st.rule_counter.clone();
thread::spawn(move || {
rayon::spawn(move || {
for (inputs, options, sender) in task_receiver {
let id = rule_counter.fetch_add(1, Ordering::AcqRel);
let inputs: serde_json::Value =

@ -33,7 +33,6 @@
use std::collections::BTreeMap;
use std::path::Path;
use std::thread;
#[allow(unused_imports)]
use std::time::Instant;
@ -506,7 +505,7 @@ impl DbInstance {
let (app2db_send, app2db_recv) = bounded(1);
let (db2app_send, db2app_recv) = bounded(1);
let db = self.clone();
thread::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
rayon::spawn(move || db.run_multi_transaction(write, app2db_recv, db2app_send));
MultiTransaction {
sender: app2db_send,
receiver: db2app_recv,

@ -45,6 +45,7 @@ lazy_static = "1.4.0"
crossbeam = "0.8.2"
miette = "5.5.0"
serde_json = "1.0.96"
rayon = "1.8.0"
[dependencies.neon]
version = "0.10"

@ -8,7 +8,6 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use crossbeam::channel::Sender;
use lazy_static::lazy_static;
@ -410,7 +409,7 @@ fn query_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let channel = cx.channel();
thread::spawn(move || {
rayon::spawn(move || {
let result = db.run_script(
&query,
params,
@ -457,7 +456,7 @@ fn query_tx(mut cx: FunctionContext) -> JsResult<JsUndefined> {
.send(TransactionPayload::Query((query.clone(), params)))
{
Ok(_) => {
thread::spawn(move || {
rayon::spawn(move || {
let result = tx.receiver.recv();
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
@ -497,7 +496,7 @@ fn backup_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(2)?.root(&mut cx);
let channel = cx.channel();
thread::spawn(move || {
rayon::spawn(move || {
let result = db.backup_db(&path);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
@ -523,7 +522,7 @@ fn restore_db(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(2)?.root(&mut cx);
let channel = cx.channel();
thread::spawn(move || {
rayon::spawn(move || {
let result = db.restore_backup(&path);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
@ -553,7 +552,7 @@ fn export_relations(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(2)?.root(&mut cx);
let channel = cx.channel();
thread::spawn(move || {
rayon::spawn(move || {
let result = db.export_relations(relations.iter());
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
@ -599,7 +598,7 @@ fn import_relations(mut cx: FunctionContext) -> JsResult<JsUndefined> {
rels.insert(name, nr);
}
thread::spawn(move || {
rayon::spawn(move || {
let result = db.import_relations(rels);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
@ -632,7 +631,7 @@ fn import_from_backup(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let callback = cx.argument::<JsFunction>(3)?.root(&mut cx);
let channel = cx.channel();
thread::spawn(move || {
rayon::spawn(move || {
let result = db.import_from_backup(path, &relations);
channel.send(move |mut cx| {
let callback = callback.into_inner(&mut cx);
@ -665,7 +664,7 @@ fn register_callback(mut cx: FunctionContext) -> JsResult<JsNumber> {
let channel = cx.channel();
let (rid, recv) = db.register_callback(&name, capacity);
thread::spawn(move || {
rayon::spawn(move || {
for (op, new, old) in recv {
let cb = callback.clone();
channel.send(move |mut cx| {
@ -701,7 +700,7 @@ fn register_named_rule(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let msg = cx.string(err.to_string());
return cx.throw(msg);
}
thread::spawn(move || {
rayon::spawn(move || {
for (inputs, options, sender) in recv {
let id = HANDLES.cb_idx.fetch_add(1, Ordering::AcqRel);
{

@ -45,3 +45,4 @@ cozo = { version = "0.7.6", path = "../cozo-core", default-features = false }
pyo3 = { version = "0.20.0", features = ["extension-module", "abi3", "abi3-py37"] }
miette = "5.5.0"
serde_json = "1.0.96"
rayon = "1.8.0"

@ -7,7 +7,6 @@
*/
use std::collections::{BTreeMap, BTreeSet};
use std::thread;
use miette::{IntoDiagnostic, Report, Result};
use pyo3::exceptions::PyException;
@ -258,7 +257,7 @@ impl CozoDbPy {
if let Some(db) = &self.db {
let cb: Py<PyAny> = callback.into();
let (id, ch) = db.register_callback(rel, None);
thread::spawn(move || {
rayon::spawn(move || {
for (op, new, old) in ch {
Python::with_gil(|py| {
let op = PyString::new(py, op.as_str()).into();

Loading…
Cancel
Save