make cozo run within a single thread

main
Ziyang Hu 2 years ago
parent 4dc26a8e5a
commit 97ef8d4a2a

@@ -20,7 +20,9 @@ exclude = [
default = ["compact", "storage-rocksdb"]
## Enables the `minimal`, `requests` and `graph-algo` features
compact = ["minimal", "requests", "graph-algo"]
compact = ["minimal", "requests", "graph-algo", "rayon"]
## Enables the `minimal`, `requests` and `graph-algo` features in single threaded mode
compact-single-threaded = ["minimal", "requests", "graph-algo"]
## Enables the `storage-sqlite` feature
minimal = ["storage-sqlite"]
## Enables the [Sqlite](https://www.sqlite.org/index.html) backend, also allows backup and restore with Sqlite data files.
@@ -28,7 +30,7 @@ storage-sqlite = ["dep:sqlite"]
## Enables the [RocksDB](http://rocksdb.org/) backend
storage-rocksdb = ["dep:cozorocks"]
## Enables the graph algorithms
graph-algo = ["dep:rayon", "dep:nalgebra"]
graph-algo = ["dep:nalgebra"]
## Allows the utilities to make web requests to fetch data
requests = ["dep:minreq"]
## Uses jemalloc as the global allocator, can make a difference in performance
@@ -37,6 +39,10 @@ jemalloc = ["dep:tikv-jemallocator-global", "cozorocks?/jemalloc"]
io-uring = ["cozorocks?/io-uring"]
## Enables the WASM target
wasm = ["uuid/js"]
## Allows threading and enables the use of the `rayon` library for parallelizing algorithms
rayon = ["dep:rayon"]
## Disallows the use of threads
nothread = []
#! The following features are highly experimental:

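Not part of the commit, but useful context for the hunks that follow: with the new `rayon` feature the graph algorithms pick a parallel iterator at compile time, and without it the very same body compiles to a plain sequential loop. A minimal sketch of that switch, using a toy `sum_of_squares` function that is not from the codebase:

#[cfg(feature = "rayon")]
use rayon::prelude::*;

fn sum_of_squares(xs: &[u64]) -> u64 {
    // With `rayon`, iterate in parallel; otherwise fall back to a plain iterator.
    #[cfg(feature = "rayon")]
    let it = xs.par_iter();
    #[cfg(not(feature = "rayon"))]
    let it = xs.iter();
    it.map(|x| x * x).sum()
}
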
@@ -13,6 +13,7 @@ use itertools::Itertools;
use miette::Result;
use ordered_float::OrderedFloat;
use priority_queue::PriorityQueue;
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use smartstring::{LazyCompact, SmartString};
@@ -50,8 +51,12 @@ impl AlgoImpl for BetweennessCentrality {
return Ok(());
}
let centrality_segs: Vec<_> = (0..n)
.into_par_iter()
#[cfg(feature = "rayon")]
let it = (0..n).into_par_iter();
#[cfg(not(feature = "rayon"))]
let it = (0..n).into_iter();
let centrality_segs: Vec<_> = it
.map(|start| -> Result<BTreeMap<usize, f64>> {
let res_for_start =
dijkstra_keep_ties(&graph, start, &(), &(), &(), poison.clone())?;
@@ -119,8 +124,12 @@ impl AlgoImpl for ClosenessCentrality {
if n == 0 {
return Ok(());
}
let res: Vec<_> = (0..n)
.into_par_iter()
#[cfg(feature = "rayon")]
let it = (0..n).into_par_iter();
#[cfg(not(feature = "rayon"))]
let it = (0..n).into_iter();
let res: Vec<_> = it
.map(|start| -> Result<f64> {
let distances = dijkstra_cost_only(&graph, start, poison.clone())?;
let total_dist: f64 = distances.iter().filter(|d| d.is_finite()).cloned().sum();

@@ -14,6 +14,7 @@ use itertools::Itertools;
use miette::Result;
use ordered_float::OrderedFloat;
use priority_queue::PriorityQueue;
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use smallvec::{smallvec, SmallVec};
use smartstring::{LazyCompact, SmartString};
@@ -101,8 +102,12 @@ impl AlgoImpl for ShortestPathDijkstra {
}
}
} else {
let all_res: Vec<_> = starting_nodes
.into_par_iter()
#[cfg(feature = "rayon")]
let it = starting_nodes.into_par_iter();
#[cfg(not(feature = "rayon"))]
let it = starting_nodes.into_iter();
let all_res: Vec<_> = it
.map(|start| -> Result<(usize, Vec<(usize, f64, Vec<usize>)>)> {
Ok((
start,

@@ -9,6 +9,7 @@
use std::collections::{BTreeMap, BTreeSet};
use miette::Result;
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use smartstring::{LazyCompact, SmartString};
@@ -68,26 +69,29 @@ fn clustering_coefficients(
graph: &[BTreeSet<usize>],
poison: Poison,
) -> Result<Vec<(f64, usize, usize)>> {
graph
.par_iter()
.map(|edges| -> Result<(f64, usize, usize)> {
let degree = edges.len();
if degree < 2 {
Ok((0., 0, degree))
} else {
let n_triangles = edges
.iter()
.map(|e_src| {
edges
.iter()
.filter(|e_dst| e_src > e_dst && graph[*e_src].contains(*e_dst))
.count()
})
.sum();
let cc = 2. * n_triangles as f64 / ((degree as f64) * ((degree as f64) - 1.));
poison.check()?;
Ok((cc, n_triangles, degree))
}
})
.collect::<Result<_>>()
#[cfg(feature = "rayon")]
let it = graph.par_iter();
#[cfg(not(feature = "rayon"))]
let it = graph.iter();
it.map(|edges| -> Result<(f64, usize, usize)> {
let degree = edges.len();
if degree < 2 {
Ok((0., 0, degree))
} else {
let n_triangles = edges
.iter()
.map(|e_src| {
edges
.iter()
.filter(|e_dst| e_src > e_dst && graph[*e_src].contains(*e_dst))
.count()
})
.sum();
let cc = 2. * n_triangles as f64 / ((degree as f64) * ((degree as f64) - 1.));
poison.check()?;
Ok((cc, n_triangles, degree))
}
})
.collect::<Result<_>>()
}

@@ -10,6 +10,7 @@ use std::collections::{BTreeMap, BTreeSet};
use itertools::Itertools;
use miette::Result;
#[cfg(feature = "rayon")]
use rayon::prelude::*;
use smartstring::{LazyCompact, SmartString};
@@ -80,10 +81,13 @@ impl AlgoImpl for KShortestPathYen {
}
}
} else {
let res_all: Vec<_> = starting_nodes
let first_it = starting_nodes
.iter()
.flat_map(|start| termination_nodes.iter().map(|goal| (*start, *goal)))
.par_bridge()
.flat_map(|start| termination_nodes.iter().map(|goal| (*start, *goal)));
#[cfg(feature = "rayon")]
let first_it = first_it.par_bridge();
let res_all: Vec<_> = first_it
.map(
|(start, goal)| -> Result<(usize, usize, Vec<(f64, Vec<usize>)>)> {
Ok((
@@ -94,6 +98,7 @@ impl AlgoImpl for KShortestPathYen {
},
)
.collect::<Result<_>>()?;
for (start, goal, res) in res_all {
for (cost, path) in res {
let t = vec![

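A sketch, again not the crate's code, of the `par_bridge` variant used above for Yen's algorithm: the start/goal pairs come from an ordinary iterator, which is bridged into a parallel one only when `rayon` is enabled.

#[cfg(feature = "rayon")]
use rayon::prelude::*;

fn start_goal_pairs(starts: &[usize], goals: &[usize]) -> Vec<(usize, usize)> {
    // Build the cartesian product of starts and goals lazily...
    let it = starts
        .iter()
        .flat_map(|s| goals.iter().map(move |g| (*s, *g)));
    // ...and only under `rayon` bridge it into a parallel iterator.
    #[cfg(feature = "rayon")]
    let it = it.par_bridge();
    it.collect()
}

Note that `par_bridge` does not preserve input order, which is presumably why each result tuple in the hunk above carries its `(start, goal)` pair alongside the paths.
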
@@ -8,8 +8,8 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -17,7 +17,7 @@ use either::{Left, Right};
use itertools::Itertools;
use lazy_static::lazy_static;
use miette::{
bail, Diagnostic, ensure, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
bail, ensure, Diagnostic, GraphicalReportHandler, GraphicalTheme, IntoDiagnostic,
JSONReportHandler, Result, WrapErr,
};
use serde_json::{json, Map};
@@ -29,8 +29,8 @@ use crate::data::program::{InputProgram, QueryAssertion, RelationOp};
use crate::data::symb::Symbol;
use crate::data::tuple::Tuple;
use crate::data::value::{DataValue, LARGEST_UTF_CHAR};
use crate::parse::{CozoScript, parse_script, SourceSpan};
use crate::parse::sys::SysOp;
use crate::parse::{parse_script, CozoScript, SourceSpan};
use crate::query::compile::{CompiledProgram, CompiledRule, CompiledRuleSet};
use crate::query::relation::{
FilteredRA, InMemRelationRA, InnerJoin, NegJoin, RelAlgebra, ReorderRA, StoredRA, UnificationRA,
@@ -140,7 +140,11 @@ impl<'s, S: Storage<'s>> Db<S> {
Ok(ret)
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters.
pub fn run_script(&'s self, payload: &str, params: &Map<String, JsonValue>) -> Result<JsonValue> {
pub fn run_script(
&'s self,
payload: &str,
params: &Map<String, JsonValue>,
) -> Result<JsonValue> {
let start = Instant::now();
match self.do_run_script(payload, params) {
Ok(mut json) => {
@@ -155,7 +159,11 @@ impl<'s, S: Storage<'s>> Db<S> {
}
/// Run the CozoScript passed in. The `params` argument is a map of parameters.
/// Fold any error into the return JSON itself.
pub fn run_script_fold_err(&'s self, payload: &str, params: &Map<String, JsonValue>) -> JsonValue {
pub fn run_script_fold_err(
&'s self,
payload: &str,
params: &Map<String, JsonValue>,
) -> JsonValue {
match self.run_script(payload, params) {
Ok(json) => json,
Err(mut err) => {
@@ -198,7 +206,11 @@ impl<'s, S: Storage<'s>> Db<S> {
};
self.run_script_fold_err(payload, &params_json).to_string()
}
fn do_run_script(&'s self, payload: &str, params: &Map<String, JsonValue>) -> Result<JsonValue> {
fn do_run_script(
&'s self,
payload: &str,
params: &Map<String, JsonValue>,
) -> Result<JsonValue> {
let param_pool = params
.iter()
.map(|(k, v)| (k.clone(), DataValue::from(v)))
@@ -206,29 +218,32 @@ impl<'s, S: Storage<'s>> Db<S> {
match parse_script(payload, &param_pool)? {
CozoScript::Multi(ps) => {
let is_write = ps.iter().any(|p| p.out_opts.store_relation.is_some());
let mut tx = if is_write {
self.transact_write()?
} else {
self.transact()?
};
let mut res = json!(null);
let mut cleanups = vec![];
for p in ps {
let sleep_opt = p.out_opts.sleep;
let (q_res, q_cleanups) = self.run_query(&mut tx, p)?;
res = q_res;
cleanups.extend(q_cleanups);
if let Some(secs) = sleep_opt {
thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
let mut res = json!(null);
{
let mut tx = if is_write {
self.transact_write()?
} else {
self.transact()?
};
for p in ps {
let sleep_opt = p.out_opts.sleep;
let (q_res, q_cleanups) = self.run_query(&mut tx, p)?;
res = q_res;
cleanups.extend(q_cleanups);
if let Some(secs) = sleep_opt {
thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
}
}
if is_write {
tx.commit_tx()?;
} else {
tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
}
if is_write {
tx.commit_tx()?;
} else {
tx.commit_tx()?;
assert!(cleanups.is_empty(), "non-empty cleanups on read-only tx");
}
for (lower, upper) in cleanups {
self.db.del_range(&lower, &upper)?;
}
@@ -431,11 +446,18 @@ impl<'s, S: Storage<'s>> Db<S> {
}
SysOp::ListRelations => self.list_relations(),
SysOp::RemoveRelation(rel_names) => {
let mut tx = self.transact_write()?;
for rs in rel_names {
self.remove_relation(&rs, &mut tx)?;
let mut bounds = vec![];
{
let mut tx = self.transact_write()?;
for rs in rel_names {
let bound = tx.destroy_relation(&rs)?;
bounds.push(bound);
}
tx.commit_tx()?;
}
for (lower, upper) in bounds {
self.db.del_range(&lower, &upper)?;
}
tx.commit_tx()?;
Ok(json!({"headers": ["status"], "rows": [["OK"]]}))
}
SysOp::ListRelation(rs) => self.list_relation(&rs),
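
The `RemoveRelation` hunk above, like the `do_run_script` one before it, follows a single pattern: gather the key ranges while the write transaction is open, commit and drop the transaction, and only then ask the storage layer to delete the ranges, presumably so that under `nothread` the now-inline deletion cannot contend with a still-open transaction. A self-contained sketch with hypothetical `Tx` and `Store` types standing in for the real ones:

type Bound = (Vec<u8>, Vec<u8>);

struct Tx;
impl Tx {
    fn destroy_relation(&mut self, _name: &str) -> Result<Bound, String> {
        Ok((vec![0x00], vec![0xff]))
    }
    fn commit_tx(self) -> Result<(), String> {
        Ok(())
    }
}

struct Store;
impl Store {
    fn del_range(&self, _lower: &[u8], _upper: &[u8]) -> Result<(), String> {
        Ok(())
    }
}

fn remove_relations(store: &Store, names: &[&str]) -> Result<(), String> {
    let mut bounds = Vec::new();
    {
        let mut tx = Tx;
        for name in names {
            bounds.push(tx.destroy_relation(name)?);
        }
        tx.commit_tx()?; // the transaction ends inside this block
    }
    // Range deletion runs only after the transaction is gone; with `nothread`
    // it may execute inline rather than on a background thread.
    for (lower, upper) in bounds {
        store.del_range(&lower, &upper)?;
    }
    Ok(())
}
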
@@ -533,7 +555,7 @@ impl<'s, S: Storage<'s>> Db<S> {
let poison = Poison::default();
if let Some(secs) = input_program.out_opts.timeout {
poison.set_timeout(secs);
poison.set_timeout(secs)?;
}
let id = self.queries_count.fetch_add(1, Ordering::AcqRel);
@@ -672,11 +694,6 @@ impl<'s, S: Storage<'s>> Db<S> {
}
}
}
pub(crate) fn remove_relation(&'s self, name: &Symbol, tx: &mut SessionTx<'_>) -> Result<()> {
let (lower, upper) = tx.destroy_relation(name)?;
self.db.del_range(&lower, &upper)?;
Ok(())
}
pub(crate) fn list_running(&self) -> Result<JsonValue> {
let res = self
.running_queries
@@ -768,11 +785,15 @@ impl Poison {
}
Ok(())
}
pub(crate) fn set_timeout(&self, secs: f64) {
pub(crate) fn set_timeout(&self, secs: f64) -> Result<()> {
#[cfg(feature = "nothread")]
bail!("Cannot set timeout when threading is disallowed");
let pill = self.0.clone();
thread::spawn(move || {
thread::sleep(Duration::from_micros((secs * 1000000.) as u64));
pill.store(true, Ordering::Relaxed);
});
Ok(())
}
}

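For context on the `set_timeout` change above: the timeout works as a poison pill, and with threading disallowed there is nothing to flip the pill after the deadline, hence the new `Result` return type and the bail under `nothread`. A sketch of the mechanism, assuming (as `self.0.clone()` suggests) that `Poison` wraps an `Arc<AtomicBool>`:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Clone, Default)]
struct Poison(Arc<AtomicBool>);

impl Poison {
    fn set_timeout(&self, secs: f64) {
        let pill = self.0.clone();
        // A background timer flips the flag once the deadline passes.
        thread::spawn(move || {
            thread::sleep(Duration::from_micros((secs * 1_000_000.) as u64));
            pill.store(true, Ordering::Relaxed);
        });
    }

    fn check(&self) -> Result<(), String> {
        // Long-running loops poll this flag and abort once it is set.
        if self.0.load(Ordering::Relaxed) {
            Err("query timed out".to_string())
        } else {
            Ok(())
        }
    }
}
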
@@ -55,7 +55,7 @@ impl<'s> Storage<'s> for MemStorage {
let store = self.store.clone();
let lower_b = lower.to_vec();
let upper_b = upper.to_vec();
thread::spawn(move || {
let closure = move || {
let keys = {
let rdr = store.read().unwrap();
rdr.range(lower_b..upper_b)
@@ -66,7 +66,11 @@ impl<'s> Storage<'s> for MemStorage {
for k in keys.iter() {
wtr.remove(k);
}
});
};
#[cfg(feature = "nothread")]
closure();
#[cfg(not(feature = "nothread"))]
thread::spawn(closure);
Ok(())
}

@@ -81,14 +81,18 @@ impl<'s> Storage<'s> for SqliteStorage {
"#;
let lock = self.lock.clone();
let name = self.name.clone();
std::thread::spawn(move || {
let closure = move || {
let _locked = lock.write().unwrap();
let conn = sqlite::open(&name).unwrap();
let mut statement = conn.prepare(query).unwrap();
statement.bind((1, &lower_b as &[u8])).unwrap();
statement.bind((2, &upper_b as &[u8])).unwrap();
while statement.next().unwrap() != State::Done {}
});
};
#[cfg(feature = "nothread")]
closure();
#[cfg(not(feature = "nothread"))]
std::thread::spawn(closure);
Ok(())
}

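Both storage backends above share the same shape: the cleanup closure is built once and then either executed inline (under `nothread`) or handed to a background thread. A small sketch of that dispatch, using a hypothetical `run_cleanup` helper that is not part of the commit:

fn run_cleanup<F>(closure: F)
where
    F: FnOnce() + Send + 'static,
{
    // Single-threaded builds run the cleanup immediately...
    #[cfg(feature = "nothread")]
    closure();
    // ...all other builds detach it onto its own thread, as before.
    #[cfg(not(feature = "nothread"))]
    std::thread::spawn(closure);
}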