From 1b8fda51fc23eb5ce8c58ba48402cb7f525a9634 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 17 Jun 2021 10:41:29 +0530 Subject: [PATCH] Use parallel iterators for benches --- Cargo.lock | 86 ++++++++++++++++++++++++++++++++++++++ libstress/Cargo.toml | 1 + libstress/src/lib.rs | 19 ++++----- sky-bench/Cargo.toml | 1 + sky-bench/src/benchtool.rs | 31 +++++++------- sky-bench/src/testkey.rs | 2 +- stress-test/src/main.rs | 2 +- 7 files changed, 116 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86e1b572..a3287557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,50 @@ dependencies = [ "winapi", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +dependencies = [ + "cfg-if", + "lazy_static", +] + [[package]] name = "crossterm" version = "0.20.0" @@ -171,6 +215,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + [[package]] name = "endian-type" version = "0.1.2" @@ -413,6 +463,7 @@ dependencies = [ name = "libstress" version = "0.1.0" dependencies = [ + "crossbeam-channel", "num_cpus", ] @@ -440,6 +491,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +[[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + [[package]] name = "mio" version = "0.7.11" @@ -713,6 +773,31 @@ dependencies = [ "rand_core", ] +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.2.8" @@ -855,6 +940,7 @@ dependencies = [ "libsky", "libstress", "rand", + "rayon", "serde", "serde_json", "skytable", diff --git a/libstress/Cargo.toml b/libstress/Cargo.toml index 9a3e7af5..b761570d 100644 --- a/libstress/Cargo.toml +++ b/libstress/Cargo.toml @@ -9,3 +9,4 @@ edition = "2018" [dependencies] # external deps num_cpus = "1.13.0" +crossbeam-channel = "0.5.1" diff --git a/libstress/src/lib.rs b/libstress/src/lib.rs index b7a817fc..62bc121d 100644 --- a/libstress/src/lib.rs +++ b/libstress/src/lib.rs @@ -28,9 +28,9 @@ #![deny(unused_imports)] use core::marker::PhantomData; -use std::sync::mpsc; -use std::sync::Arc; -use std::sync::Mutex; +use crossbeam_channel::unbounded; +use crossbeam_channel::Receiver as CReceiver; +use crossbeam_channel::Sender as CSender; use std::thread; /// A Job. The UIn type parameter is the type that will be used to execute the action @@ -52,7 +52,7 @@ struct Worker { impl Worker { /// Initialize a new worker fn new( - job_receiver: Arc>>>, + job_receiver: CReceiver>, init_pre_loop_var: impl Fn() -> Inp + 'static + Send, on_exit: impl Fn(&mut Inp) + Send + 'static, on_loop: impl Fn(&mut Inp, UIn) + Send + Sync + 'static, @@ -64,7 +64,7 @@ impl Worker { let on_loop = on_loop; let mut pre_loop_var = init_pre_loop_var(); loop { - let action = job_receiver.lock().unwrap().recv().unwrap(); + let action = job_receiver.recv().unwrap(); match action { JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk), JobType::Nothing => { @@ -113,7 +113,7 @@ pub struct Workpool { /// the workers workers: Vec, /// the sender that sends jobs - job_distributor: mpsc::Sender>, + job_distributor: CSender>, /// the function that sets the pre-loop variable init_pre_loop_var: Lv, /// the function to be executed on worker termination @@ -136,12 +136,11 @@ where if count == 0 { panic!("Runtime panic: Bad value `0` for thread count"); } - let (sender, receiver) = mpsc::channel(); - let receiver = Arc::new(Mutex::new(receiver)); + let (sender, receiver) = unbounded(); let mut workers = Vec::with_capacity(count); for _ in 0..count { workers.push(Worker::new( - Arc::clone(&receiver), + receiver.clone(), init_pre_loop_var.clone(), on_exit.clone(), on_loop.clone(), @@ -157,7 +156,7 @@ where } } /// Execute something - pub fn execute(&mut self, inp: UIn) { + pub fn execute(&self, inp: UIn) { self.job_distributor.send(JobType::Task(inp)).unwrap(); } pub fn new_default_threads(init_pre_loop_var: Lv, on_loop: Lp, on_exit: Ex) -> Self { diff --git a/sky-bench/Cargo.toml b/sky-bench/Cargo.toml index 8463db3e..f721da90 100644 --- a/sky-bench/Cargo.toml +++ b/sky-bench/Cargo.toml @@ -17,3 +17,4 @@ devtimer = "4.0.1" clap = { version="2.33.3", features=["yaml"] } serde = { version="1.0.126", features=["derive"] } serde_json = "1.0.64" +rayon = "1.5.1" diff --git a/sky-bench/src/benchtool.rs b/sky-bench/src/benchtool.rs index 34f269a8..a10ff054 100644 --- a/sky-bench/src/benchtool.rs +++ b/sky-bench/src/benchtool.rs @@ -32,6 +32,7 @@ use crate::util::JSONReportBlock; use devtimer::DevTime; use libstress::Workpool; use rand::thread_rng; +use rayon::prelude::*; use std::io::{Read, Write}; use std::net::TcpStream; @@ -44,6 +45,10 @@ pub fn runner( packet_size: usize, json_out: bool, ) { + rayon::ThreadPoolBuilder::new() + .num_threads(max_connections) + .build_global() + .unwrap(); sanity_test!(host, port); if !json_out { println!( @@ -57,7 +62,7 @@ pub fn runner( let mut rand = thread_rng(); let mut dt = DevTime::new_complex(); // Create separate connection pools for get and set operations - let mut setpool = Workpool::new( + let setpool = Workpool::new( max_connections, move || TcpStream::connect(host.clone()).unwrap(), |sock, packet: Vec| { @@ -69,8 +74,8 @@ pub fn runner( socket.shutdown(std::net::Shutdown::Both).unwrap(); }, ); - let mut getpool = setpool.clone(); - let mut delpool = getpool.clone(); + let getpool = setpool.clone(); + let delpool = getpool.clone(); let keys: Vec = (0..max_queries) .into_iter() .map(|_| ran_string(packet_size, &mut rand)) @@ -103,26 +108,24 @@ pub fn runner( } dt.create_timer("SET").unwrap(); dt.start_timer("SET").unwrap(); - for packet in set_packs { - setpool.execute(packet); - } + set_packs + .into_par_iter() + .for_each(|packet| setpool.execute(packet)); drop(setpool); dt.stop_timer("SET").unwrap(); dt.create_timer("GET").unwrap(); dt.start_timer("GET").unwrap(); - for packet in get_packs { - getpool.execute(packet); - } + get_packs + .into_par_iter() + .for_each(|packet| getpool.execute(packet)); drop(getpool); dt.stop_timer("GET").unwrap(); if !json_out { println!("Benchmark completed! Removing created keys..."); } - // Create a connection pool for del operations - // Delete all the created keys - for packet in del_packs { - delpool.execute(packet); - } + del_packs + .into_par_iter() + .for_each(|packet| delpool.execute(packet)); drop(delpool); let gets_per_sec = calc(max_queries, dt.time_in_nanos("GET").unwrap()); let sets_per_sec = calc(max_queries, dt.time_in_nanos("SET").unwrap()); diff --git a/sky-bench/src/testkey.rs b/sky-bench/src/testkey.rs index 40ad2c8e..2264536c 100644 --- a/sky-bench/src/testkey.rs +++ b/sky-bench/src/testkey.rs @@ -36,7 +36,7 @@ pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, si sanity_test!(host, port); let host = hoststr!(host, port); let mut rand = thread_rng(); - let mut np = Workpool::new( + let np = Workpool::new( connections, move || TcpStream::connect(host.clone()).unwrap(), |sock, packet: Vec| { diff --git a/stress-test/src/main.rs b/stress-test/src/main.rs index f700561d..9bf693cb 100644 --- a/stress-test/src/main.rs +++ b/stress-test/src/main.rs @@ -33,7 +33,7 @@ use skytable::Query; use skytable::{Element, Response}; fn main() { - let mut pool = Workpool::new( + let pool = Workpool::new( 10, || Connection::new("127.0.0.1", 2003).unwrap(), |con, query| {