Use bounded channel when feasible

Sayan Nandan 3 years ago
parent 2175a3c34e
commit 35bc94f3f1
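Rationale: when the caller knows an upper bound on the number of jobs it will push, the pool can use a bounded channel and get backpressure for free; otherwise it falls back to the old unbounded behaviour. As a minimal sketch (illustrative only, not part of this diff) of how the two crossbeam channel flavours differ:

use crossbeam_channel::{bounded, unbounded};
use std::thread;

fn main() {
    // Bounded: capacity is fixed at creation; `send` blocks once the buffer is full,
    // so a producer that outruns its consumer is paused instead of growing the queue.
    let (tx, rx) = bounded::<u32>(2);
    let producer = thread::spawn(move || {
        for i in 0..4 {
            // The third send parks until the receiver frees a slot.
            tx.send(i).unwrap();
        }
    });
    for msg in rx.iter() {
        println!("bounded recv: {}", msg);
    }
    producer.join().unwrap();

    // Unbounded: `send` never blocks, but the queue can grow without limit.
    let (tx, rx) = unbounded::<u32>();
    for i in 0..4 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the iterator below terminates
    for msg in rx.iter() {
        println!("unbounded recv: {}", msg);
    }
}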

@@ -60,9 +60,9 @@
 pub mod traits;
 use core::marker::PhantomData;
-use crossbeam_channel::unbounded;
 use crossbeam_channel::Receiver as CReceiver;
 use crossbeam_channel::Sender as CSender;
+use crossbeam_channel::{bounded, unbounded};
 pub use rayon;
 use rayon::prelude::*;
 use std::thread;
@@ -139,6 +139,8 @@ where
     _marker: PhantomData<(Inp, UIn)>,
     /// check if self needs a pool for parallel iterators
     needs_iterator_pool: bool,
+    /// expected maximum number of jobs
+    expected_max_sends: Option<usize>,
 }

 impl<Inp: 'static, UIn, Lv, Lp, Ex> PoolConfig<Inp, UIn, Lv, Lp, Ex>
@@ -156,6 +158,7 @@ where
         on_loop: Lp,
         on_exit: Ex,
         needs_iterator_pool: bool,
+        expected_max_sends: Option<usize>,
     ) -> Self {
         Self {
             count,
@@ -164,6 +167,7 @@ where
             on_exit,
             needs_iterator_pool,
             _marker: PhantomData,
+            expected_max_sends,
         }
     }
     /// Get a new [`Workpool`] from the current config
@@ -178,6 +182,7 @@ where
             self.on_loop.clone(),
             self.on_exit.clone(),
             self.needs_iterator_pool,
+            self.expected_max_sends,
         )
     }
     /// Get a [`Workpool`] with the base config but with a custom loop-stage closure
@@ -191,6 +196,7 @@ where
             lp,
             self.on_exit.clone(),
             self.needs_iterator_pool,
+            self.expected_max_sends,
         )
     }
 }
@@ -210,6 +216,7 @@ where
             self.on_loop.clone(),
             self.on_exit.clone(),
             self.needs_iterator_pool,
+            self.expected_max_sends,
         )
     }
 }
@@ -248,6 +255,8 @@ pub struct Workpool<Inp, UIn, Lv, Lp, Ex> {
     _marker: PhantomData<Inp>,
     /// check if self needs a pool for parallel iterators
     needs_iterator_pool: bool,
+    /// expected maximum number of sends
+    expected_max_sends: Option<usize>,
 }

 impl<Inp: 'static, UIn, Lv, Ex, Lp> Workpool<Inp, UIn, Lv, Lp, Ex>
@@ -265,6 +274,7 @@ where
         on_loop: Lp,
         on_exit: Ex,
         needs_iterator_pool: bool,
+        expected_max_sends: Option<usize>,
     ) -> Self {
         if needs_iterator_pool {
             // initialize a global threadpool for parallel iterators
@@ -275,7 +285,10 @@ where
         if count == 0 {
             panic!("Runtime panic: Bad value `0` for thread count");
         }
-        let (sender, receiver) = unbounded();
+        let (sender, receiver) = match expected_max_sends {
+            Some(limit) => bounded(limit),
+            None => unbounded(),
+        };
         let mut workers = Vec::with_capacity(count);
         for _ in 0..count {
             workers.push(Worker::new(
@@ -293,6 +306,7 @@ where
             on_loop,
             _marker: PhantomData,
             needs_iterator_pool,
+            expected_max_sends,
         }
     }
     /// Execute something
@@ -318,6 +332,7 @@ where
         on_loop: Lp,
         on_exit: Ex,
         needs_iterator_pool: bool,
+        expected_max_sends: Option<usize>,
     ) -> Self {
         // we'll naively use the number of CPUs present on the system times 2 to determine
         // the number of workers (sure the scheduler does tricks all the time)
@@ -328,6 +343,7 @@ where
             on_loop,
             on_exit,
             needs_iterator_pool,
+            expected_max_sends,
         )
     }
 }
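For illustration only (the `job_channel` helper below is hypothetical and not part of this commit), the selection logic added to `Workpool::new` boils down to mapping the optional job bound onto a channel constructor; the call sites in the following hunks then simply pass `Some(limit)` wherever the job count is known in advance:

use crossbeam_channel::{bounded, unbounded, Receiver, Sender};

// Hypothetical helper mirroring what the constructor now does: a known upper
// bound on queued jobs picks a bounded (backpressured) channel, otherwise the
// pool falls back to an unbounded one.
fn job_channel<T>(expected_max_sends: Option<usize>) -> (Sender<T>, Receiver<T>) {
    match expected_max_sends {
        Some(limit) => bounded(limit),
        None => unbounded(),
    }
}

fn main() {
    let (tx, rx) = job_channel::<u64>(Some(8));
    tx.send(42).unwrap();
    assert_eq!(rx.recv().unwrap(), 42);
}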

@@ -163,6 +163,7 @@ pub fn runner(
             socket.shutdown(std::net::Shutdown::Both).unwrap();
         },
         true,
+        Some(max_queries)
     );
     // create table

@@ -50,6 +50,7 @@ pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, si
             socket.shutdown(net::Shutdown::Both).unwrap();
         },
         true,
+        Some(connections)
     );
     println!("Generating keys ...");
     let keys: Vec<String> = (0..num)

@@ -139,6 +139,7 @@ pub fn stress_linearity_concurrent_clients_set(
         },
         |_| {},
         true,
+        Some(DEFAULT_QUERY_COUNT),
     );
     let mut timer = SimpleTimer::new();
     timer.start();
@@ -189,6 +190,7 @@ pub fn stress_linearity_concurrent_clients_get(
         },
         |_| {},
         true,
+        Some(DEFAULT_QUERY_COUNT),
     );
     workpool.execute_and_finish_iter(set_packs);
@@ -213,6 +215,7 @@ pub fn stress_linearity_concurrent_clients_get(
         },
         |_| {},
         true,
+        Some(DEFAULT_QUERY_COUNT),
     );
     let mut timer = SimpleTimer::new();
     timer.start();
