diff --git a/libstress/src/lib.rs b/libstress/src/lib.rs
index 7852e3cc..c610e231 100644
--- a/libstress/src/lib.rs
+++ b/libstress/src/lib.rs
@@ -60,9 +60,9 @@
 pub mod traits;
 
 use core::marker::PhantomData;
-use crossbeam_channel::unbounded;
 use crossbeam_channel::Receiver as CReceiver;
 use crossbeam_channel::Sender as CSender;
+use crossbeam_channel::{bounded, unbounded};
 pub use rayon;
 use rayon::prelude::*;
 use std::thread;
@@ -139,6 +139,8 @@ where
     _marker: PhantomData<(Inp, UIn)>,
     /// check if self needs a pool for parallel iterators
     needs_iterator_pool: bool,
+    /// expected maximum number of jobs
+    expected_max_sends: Option<usize>,
 }
 
 impl<Inp, UIn, Lv, Lp, Ex> PoolConfig<Inp, UIn, Lv, Lp, Ex>
@@ -156,6 +158,7 @@ where
         on_loop: Lp,
         on_exit: Ex,
         needs_iterator_pool: bool,
+        expected_max_sends: Option<usize>,
     ) -> Self {
         Self {
             count,
@@ -164,6 +167,7 @@ where
             on_exit,
             needs_iterator_pool,
             _marker: PhantomData,
+            expected_max_sends,
         }
     }
     /// Get a new [`Workpool`] from the current config
@@ -178,6 +182,7 @@ where
             self.on_loop.clone(),
             self.on_exit.clone(),
             self.needs_iterator_pool,
+            self.expected_max_sends,
         )
     }
     /// Get a [`Workpool`] with the base config but with a custom loop-stage closure
@@ -191,6 +196,7 @@ where
             lp,
             self.on_exit.clone(),
             self.needs_iterator_pool,
+            self.expected_max_sends,
         )
     }
 }
@@ -210,6 +216,7 @@ where
             self.on_loop.clone(),
             self.on_exit.clone(),
             self.needs_iterator_pool,
+            self.expected_max_sends,
         )
     }
 }
@@ -248,6 +255,8 @@ pub struct Workpool<Inp, UIn, Lv, Lp, Ex> {
     _marker: PhantomData<(Inp, UIn)>,
     /// check if self needs a pool for parallel iterators
     needs_iterator_pool: bool,
+    /// expected maximum number of sends
+    expected_max_sends: Option<usize>,
 }
 
 impl<Inp, UIn, Lv, Lp, Ex> Workpool<Inp, UIn, Lv, Lp, Ex>
@@ -265,6 +274,7 @@ where
         on_loop: Lp,
         on_exit: Ex,
         needs_iterator_pool: bool,
+        expected_max_sends: Option<usize>,
     ) -> Self {
         if needs_iterator_pool {
             // initialize a global threadpool for parallel iterators
@@ -275,7 +285,10 @@ where
         if count == 0 {
             panic!("Runtime panic: Bad value `0` for thread count");
         }
-        let (sender, receiver) = unbounded();
+        let (sender, receiver) = match expected_max_sends {
+            Some(limit) => bounded(limit),
+            None => unbounded(),
+        };
         let mut workers = Vec::with_capacity(count);
         for _ in 0..count {
             workers.push(Worker::new(
@@ -293,6 +306,7 @@ where
             on_loop,
             _marker: PhantomData,
             needs_iterator_pool,
+            expected_max_sends,
         }
     }
     /// Execute something
@@ -318,6 +332,7 @@ where
         on_loop: Lp,
         on_exit: Ex,
         needs_iterator_pool: bool,
+        expected_max_sends: Option<usize>,
     ) -> Self {
         // we'll naively use the number of CPUs present on the system times 2 to determine
         // the number of workers (sure the scheduler does tricks all the time)
@@ -328,6 +343,7 @@ where
             on_loop,
             on_exit,
             needs_iterator_pool,
+            expected_max_sends,
         )
     }
 }
diff --git a/sky-bench/src/benchtool.rs b/sky-bench/src/benchtool.rs
index 6cc5f4c0..19b68101 100644
--- a/sky-bench/src/benchtool.rs
+++ b/sky-bench/src/benchtool.rs
@@ -163,6 +163,7 @@ pub fn runner(
             socket.shutdown(std::net::Shutdown::Both).unwrap();
         },
         true,
+        Some(max_queries)
     );
 
     // create table
diff --git a/sky-bench/src/testkey.rs b/sky-bench/src/testkey.rs
index 14a878ce..638cd5f4 100644
--- a/sky-bench/src/testkey.rs
+++ b/sky-bench/src/testkey.rs
@@ -50,6 +50,7 @@ pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, si
             socket.shutdown(net::Shutdown::Both).unwrap();
         },
         true,
+        Some(connections)
     );
     println!("Generating keys ...");
     let keys: Vec<String> = (0..num)
diff --git a/stress-test/src/linearity_client.rs b/stress-test/src/linearity_client.rs
index 6cb517f7..ffb47ab1 100644
--- a/stress-test/src/linearity_client.rs
+++ b/stress-test/src/linearity_client.rs
@@ -139,6 +139,7 @@ pub fn stress_linearity_concurrent_clients_set(
         },
         |_| {},
         true,
+        Some(DEFAULT_QUERY_COUNT),
     );
     let mut timer = SimpleTimer::new();
     timer.start();
@@ -189,6 +190,7 @@ pub fn stress_linearity_concurrent_clients_get(
         },
         |_| {},
         true,
+        Some(DEFAULT_QUERY_COUNT),
     );
     workpool.execute_and_finish_iter(set_packs);
 
@@ -213,6 +215,7 @@ pub fn stress_linearity_concurrent_clients_get(
         },
         |_| {},
         true,
+        Some(DEFAULT_QUERY_COUNT),
     );
     let mut timer = SimpleTimer::new();
     timer.start();
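For context, here is a minimal, self-contained sketch of the channel-selection logic this patch threads through `Workpool::new`. The `make_channel` helper and the `Job` type parameter are illustrative only; the patch inlines the `match` directly in the constructor. A `Some(limit)` hint from `expected_max_sends` selects a bounded queue, so senders block once `limit` jobs are in flight, while `None` preserves the old unbounded behaviour.

```rust
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};

/// Pick a job queue based on an optional upper bound on sends.
/// Hypothetical helper; the patch writes this `match` inline.
fn make_channel<Job>(expected_max_sends: Option<usize>) -> (Sender<Job>, Receiver<Job>) {
    match expected_max_sends {
        // the caller expects at most `limit` sends: cap the queue so it
        // never allocates beyond that, and senders apply backpressure
        // instead of growing memory without bound
        Some(limit) => bounded(limit),
        // no hint: fall back to the previous unbounded queue
        None => unbounded(),
    }
}

fn main() {
    let (tx, rx) = make_channel::<u32>(Some(4));
    for i in 0..4 {
        // a fifth send here would block until a worker called `rx.recv()`
        tx.send(i).unwrap();
    }
    drop(tx); // disconnect the channel so the iterator below terminates
    assert_eq!(rx.iter().sum::<u32>(), 6);
}
```

This is also why every call site in sky-bench and stress-test gains one trailing argument: callers such as `runner`, `create_testkeys`, and the linearity stress tests already know how many jobs they will push (`max_queries`, `connections`, `DEFAULT_QUERY_COUNT`), so they pass that count as the bound rather than `None`.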