Wait for threads to start up before returning

Sayan Nandan 2 years ago
parent da645247d2
commit 60d36af34b
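The mechanism behind this change: every worker sends a unit value over a bounded channel the moment its thread is actually running; the pool constructor drops its own sender, then counts the acknowledgements, and fewer than `count` of them means startup failed. Below is a minimal, self-contained sketch of that pattern using `std::sync::mpsc` (the crate itself uses crossbeam channels; the function name and the simplified error handling here are illustrative, not the crate's API):

use std::{sync::mpsc, thread};

// Spawn `count` workers and block until every one has signalled that it is up.
fn spawn_all(count: usize) -> Result<Vec<thread::JoinHandle<()>>, String> {
    // bounded "waitgroup" channel: one slot per expected startup signal
    let (wgtx, wgrx) = mpsc::sync_channel::<()>(count);
    let mut handles = Vec::with_capacity(count);
    for id in 0..count {
        let wgtx = wgtx.clone();
        let handle = thread::Builder::new()
            .name(format!("worker-{id}"))
            .spawn(move || {
                // signal that this worker has started, then release the sender
                wgtx.send(()).unwrap();
                drop(wgtx);
                // ... the worker's job loop would run here ...
            })
            .map_err(|e| format!("spawn failed: {e}"))?;
        handles.push(handle);
    }
    // drop the constructor's sender so the receiver's iterator can terminate
    drop(wgtx);
    // the iterator ends once all senders are gone; count the signals received
    let started = wgrx.iter().count();
    if started == count {
        Ok(handles)
    } else {
        Err(format!("expected {count} threads but started {started}"))
    }
}

The real version appears in the `Workpool::new` hunk below, where a shortfall is reported as `WorkpoolError::ThreadStartFailure(expected, started)`.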

@@ -59,6 +59,9 @@
#![deny(unused_imports)]
pub mod traits;
use std::fmt::Display;
pub use rayon;
use {
@@ -68,6 +71,26 @@ use {
std::thread,
};
#[derive(Debug)]
pub enum WorkpoolError {
ThreadStartFailure(usize, usize),
}
impl Display for WorkpoolError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkpoolError::ThreadStartFailure(expected, started) => {
write!(
f,
"couldn't start all threads. expected {expected} but started {started}"
)
}
}
}
}
pub type WorkpoolResult<T> = Result<T, WorkpoolError>;
/// A Job. The `UIn` type parameter is the type of input the action is executed with.
/// `Nothing` is a variant used by the drop implementation to terminate all the workers
/// and call the exit_loop function
@@ -87,10 +110,12 @@ struct Worker {
impl Worker {
/// Initialize a new worker
fn new<Inp: 'static, UIn, Lv, Lp, Ex>(
id: usize,
job_receiver: CReceiver<JobType<UIn>>,
init_pre_loop_var: Lv,
on_exit: Ex,
on_loop: Lp,
wgtx: CSender<()>,
) -> Self
where
UIn: Send + Sync + 'static,
@@ -98,20 +123,25 @@ impl Worker {
Lp: Fn(&mut Inp, UIn) + Send + Sync + 'static,
Ex: Fn(&mut Inp) + Send + 'static,
{
let thread = thread::spawn(move || {
let on_loop = on_loop;
let mut pre_loop_var = init_pre_loop_var();
loop {
let action = job_receiver.recv().unwrap();
match action {
JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk),
JobType::Nothing => {
on_exit(&mut pre_loop_var);
break;
let thread = thread::Builder::new()
.name(format!("worker-{id}"))
.spawn(move || {
let on_loop = on_loop;
let mut pre_loop_var = init_pre_loop_var();
wgtx.send(()).unwrap();
drop(wgtx);
loop {
let action = job_receiver.recv().unwrap();
match action {
JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk),
JobType::Nothing => {
on_exit(&mut pre_loop_var);
break;
}
}
}
}
});
})
.unwrap();
Self {
thread: Some(thread),
}
@@ -165,11 +195,14 @@
}
}
/// Get a new [`Workpool`] from the current config
pub fn get_pool(&self) -> Workpool<Inp, UIn, Lv, Lp, Ex> {
pub fn get_pool(&self) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Lp, Ex>> {
self.get_pool_with_workers(self.count)
}
/// Get a [`Workpool`] with the base config but with a different number of workers
pub fn get_pool_with_workers(&self, count: usize) -> Workpool<Inp, UIn, Lv, Lp, Ex> {
pub fn get_pool_with_workers(
&self,
count: usize,
) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Lp, Ex>> {
Workpool::new(
count,
self.init_pre_loop_var.clone(),
@@ -180,7 +213,7 @@ where
)
}
/// Get a [`Workpool`] with the base config but with a custom loop-stage closure
pub fn with_loop_closure<Dlp>(&self, lp: Dlp) -> Workpool<Inp, UIn, Lv, Dlp, Ex>
pub fn with_loop_closure<Dlp>(&self, lp: Dlp) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Dlp, Ex>>
where
Dlp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
{
@@ -195,26 +228,6 @@ where
}
}
impl<Inp: 'static, UIn, Lp, Lv, Ex> Clone for Workpool<Inp, UIn, Lv, Lp, Ex>
where
UIn: Send + Sync + 'static,
Inp: Sync,
Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone,
Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
Lp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Workpool::new(
self.workers.len(),
self.init_pre_loop_var.clone(),
self.on_loop.clone(),
self.on_exit.clone(),
self.needs_iterator_pool,
self.expected_max_sends,
)
}
}
/// # Workpool
///
/// A Workpool is a generic synchronous thread pool that can be used to perform, well, anything.
@@ -269,56 +282,75 @@ where
on_exit: Ex,
needs_iterator_pool: bool,
expected_max_sends: Option<usize>,
) -> Self {
) -> WorkpoolResult<Self> {
// init threadpool for iterator
if needs_iterator_pool {
// initialize a global threadpool for parallel iterators
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(count)
.build_global();
}
if count == 0 {
panic!("Runtime panic: Bad value `0` for thread count");
}
assert!(count != 0, "Runtime panic: Bad value `0` for thread count");
let (sender, receiver) = match expected_max_sends {
Some(limit) => bounded(limit),
None => unbounded(),
};
let (wgtx, wgrx) = bounded::<()>(count);
let mut workers = Vec::with_capacity(count);
for _ in 0..count {
for i in 0..count {
workers.push(Worker::new(
i,
receiver.clone(),
init_pre_loop_var.clone(),
on_exit.clone(),
on_loop.clone(),
wgtx.clone(),
));
}
Self {
workers,
job_distributor: sender,
init_pre_loop_var,
on_exit,
on_loop,
_marker: PhantomData,
needs_iterator_pool,
expected_max_sends,
drop(wgtx);
let sum: usize = wgrx.iter().map(|_| 1usize).sum();
if sum == count {
Ok(Self {
workers,
job_distributor: sender,
init_pre_loop_var,
on_exit,
on_loop,
_marker: PhantomData,
needs_iterator_pool,
expected_max_sends,
})
} else {
Err(WorkpoolError::ThreadStartFailure(count, sum))
}
}
pub fn clone(&self) -> WorkpoolResult<Self> {
Self::new(
self.workers.len(),
self.init_pre_loop_var.clone(),
self.on_loop.clone(),
self.on_exit.clone(),
self.needs_iterator_pool,
self.expected_max_sends,
)
}
/// Execute something
pub fn execute(&self, inp: UIn) -> bool {
self.job_distributor.send(JobType::Task(inp)).is_ok()
pub fn execute(&self, inp: UIn) {
self.job_distributor
.send(JobType::Task(inp))
.expect("Worker thread crashed")
}
/// Execute something that can be executed as a parallel iterator.
/// For the best performance, it is recommended that you pass true for `needs_iterator_pool`
/// on initialization of the [`Workpool`]
pub fn execute_iter(&self, iter: impl IntoParallelIterator<Item = UIn>) -> bool {
iter.into_par_iter().all(|inp| self.execute(inp))
pub fn execute_iter(&self, iter: impl IntoParallelIterator<Item = UIn>) {
iter.into_par_iter().for_each(|inp| self.execute(inp))
}
/// Does the same thing as [`execute_iter`] but drops `self`, ensuring that all the
/// workers actually finish their tasks
pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator<Item = UIn>) -> bool {
let ret = self.execute_iter(iter);
pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator<Item = UIn>) {
self.execute_iter(iter);
drop(self);
ret
}
/// Initialize a new [`Workpool`] with the default count of threads. This is equal
/// to 2 * the number of logical cores.
@@ -328,7 +360,7 @@ where
on_exit: Ex,
needs_iterator_pool: bool,
expected_max_sends: Option<usize>,
) -> Self {
) -> WorkpoolResult<Self> {
// we'll naively use the number of CPUs present on the system times 2 to determine
// the number of workers (sure the scheduler does tricks all the time)
let worker_count = thread::available_parallelism().map_or(1, usize::from) * 2;
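Every constructor is now fallible: callers either `.unwrap()` the result (as the stress tests further down do) or propagate it with `?` (as the benchmark runner does). A hypothetical call site, where the closures, item type, and counts are placeholders:

use libstress::Workpool;

fn main() {
    let pool = Workpool::new(
        8,               // worker count
        Vec::<u64>::new, // init_pre_loop_var: fresh per-worker state
        |state: &mut Vec<u64>, item| state.push(item), // on_loop: handle one task
        |state| state.clear(),                         // on_exit: per-worker teardown
        false,           // needs_iterator_pool: no parallel iterators needed
        None,            // expected_max_sends: unbounded job queue
    )
    .expect("could not start all worker threads");
    pool.execute(42);
} // dropping the pool waits for the workers to drain their queue and exit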

@@ -26,8 +26,6 @@
*
*/
use crate::error::Error;
use {
super::{
report::{AggregateReport, SingleReport},
@@ -61,9 +59,8 @@ where
{
// now do our runs
let mut loopmon = loopmon;
let mut run_okay = true;
while loopmon.should_continue() && run_okay {
while loopmon.should_continue() {
// now create our connection pool
let pool = Workpool::new(
bench_config.server.connections(),
@@ -72,7 +69,7 @@ where
on_loop_exit.clone(),
true,
Some(bench_config.query_count()),
);
)?;
// get our local copy
let this_packets = packets.clone();
@@ -80,28 +77,21 @@ where
// run and time our operations
let mut dt = SimpleTimer::new();
dt.start();
let ok = pool.execute_and_finish_iter(this_packets);
pool.execute_and_finish_iter(this_packets);
dt.stop();
loopmon.incr_time(&dt);
run_okay = ok;
// cleanup
loopmon.cleanup()?;
loopmon.step();
}
if run_okay {
// save time
reports.push(SingleReport::new(
loopmon.name(),
loopmon.sum() as f64 / bench_config.runs() as f64,
));
Ok(())
} else {
Err(Error::RuntimeError(format!(
"Worker thread for test `{}` crashed. Unable to send job",
loopmon.name()
)))
}
// save time
reports.push(SingleReport::new(
loopmon.name(),
loopmon.sum() as f64 / bench_config.runs() as f64,
));
Ok(())
}
#[inline(always)]

@@ -24,9 +24,11 @@
*
*/
use std::collections::TryReserveError;
use {skytable::error::Error as SkyError, std::fmt::Display};
use {
libstress::WorkpoolError,
skytable::error::Error as SkyError,
std::{collections::TryReserveError, fmt::Display},
};
pub type BResult<T> = Result<T, Error>;
@@ -61,3 +63,9 @@ impl From<TryReserveError> for Error {
Error::RuntimeError(format!("memory reserve error: {}", e.to_string()))
}
}
impl From<WorkpoolError> for Error {
fn from(e: WorkpoolError) -> Self {
Error::RuntimeError(format!("threadpool error: {}", e))
}
}
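This `From` impl is what lets the benchmark runner above drop its manual `run_okay` bookkeeping and simply apply `?` to `Workpool::new`. A sketch of the flow with a made-up helper (the closures and counts are placeholders):

// hypothetical helper: the `?` converts a WorkpoolError into Error via the impl above
fn try_build_pool() -> BResult<()> {
    let pool = libstress::Workpool::new(
        4,             // worker count
        || (),         // init_pre_loop_var: no per-worker state
        |_, _: ()| (), // on_loop: ignore tasks
        |_| (),        // on_exit: nothing to clean up
        false,         // needs_iterator_pool
        None,          // expected_max_sends
    )?;
    drop(pool); // dropping the pool terminates the workers
    Ok(())
}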

@@ -142,7 +142,8 @@ pub fn stress_linearity_concurrent_clients_set(
|_| {},
true,
Some(DEFAULT_QUERY_COUNT),
);
)
.unwrap();
let mut timer = SimpleTimer::new();
timer.start();
workpool.execute_and_finish_iter(set_packs);
@@ -200,7 +201,8 @@ pub fn stress_linearity_concurrent_clients_get(
|_| {},
true,
Some(DEFAULT_QUERY_COUNT),
);
)
.unwrap();
workpool.execute_and_finish_iter(set_packs);
// initialize the linearity counter
@@ -225,7 +227,8 @@
|_| {},
true,
Some(DEFAULT_QUERY_COUNT),
);
)
.unwrap();
let mut timer = SimpleTimer::new();
timer.start();
wp.execute_and_finish_iter(get_packs);
