Use parallel iterators for benches

next
Sayan Nandan 3 years ago
parent a87478dcba
commit 1b8fda51fc

86
Cargo.lock generated

@ -108,6 +108,50 @@ dependencies = [
"winapi",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94af6efb46fef72616855b036a624cf27ba656ffc9be1b9a3c931cfc7749a9a9"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "crossterm"
version = "0.20.0"
@ -171,6 +215,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "endian-type"
version = "0.1.2"
@ -413,6 +463,7 @@ dependencies = [
name = "libstress"
version = "0.1.0"
dependencies = [
"crossbeam-channel",
"num_cpus",
]
@ -440,6 +491,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
name = "memoffset"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "mio"
version = "0.7.11"
@ -713,6 +773,31 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rayon"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
[[package]]
name = "redox_syscall"
version = "0.2.8"
@ -855,6 +940,7 @@ dependencies = [
"libsky",
"libstress",
"rand",
"rayon",
"serde",
"serde_json",
"skytable",

@ -9,3 +9,4 @@ edition = "2018"
[dependencies]
# external deps
num_cpus = "1.13.0"
crossbeam-channel = "0.5.1"

@ -28,9 +28,9 @@
#![deny(unused_imports)]
use core::marker::PhantomData;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use crossbeam_channel::unbounded;
use crossbeam_channel::Receiver as CReceiver;
use crossbeam_channel::Sender as CSender;
use std::thread;
/// A Job. The UIn type parameter is the type that will be used to execute the action
@ -52,7 +52,7 @@ struct Worker {
impl Worker {
/// Initialize a new worker
fn new<Inp: 'static, UIn>(
job_receiver: Arc<Mutex<mpsc::Receiver<JobType<UIn>>>>,
job_receiver: CReceiver<JobType<UIn>>,
init_pre_loop_var: impl Fn() -> Inp + 'static + Send,
on_exit: impl Fn(&mut Inp) + Send + 'static,
on_loop: impl Fn(&mut Inp, UIn) + Send + Sync + 'static,
@ -64,7 +64,7 @@ impl Worker {
let on_loop = on_loop;
let mut pre_loop_var = init_pre_loop_var();
loop {
let action = job_receiver.lock().unwrap().recv().unwrap();
let action = job_receiver.recv().unwrap();
match action {
JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk),
JobType::Nothing => {
@ -113,7 +113,7 @@ pub struct Workpool<Inp, UIn, Lv, Lp, Ex> {
/// the workers
workers: Vec<Worker>,
/// the sender that sends jobs
job_distributor: mpsc::Sender<JobType<UIn>>,
job_distributor: CSender<JobType<UIn>>,
/// the function that sets the pre-loop variable
init_pre_loop_var: Lv,
/// the function to be executed on worker termination
@ -136,12 +136,11 @@ where
if count == 0 {
panic!("Runtime panic: Bad value `0` for thread count");
}
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let (sender, receiver) = unbounded();
let mut workers = Vec::with_capacity(count);
for _ in 0..count {
workers.push(Worker::new(
Arc::clone(&receiver),
receiver.clone(),
init_pre_loop_var.clone(),
on_exit.clone(),
on_loop.clone(),
@ -157,7 +156,7 @@ where
}
}
/// Execute something
pub fn execute(&mut self, inp: UIn) {
pub fn execute(&self, inp: UIn) {
self.job_distributor.send(JobType::Task(inp)).unwrap();
}
pub fn new_default_threads(init_pre_loop_var: Lv, on_loop: Lp, on_exit: Ex) -> Self {

@ -17,3 +17,4 @@ devtimer = "4.0.1"
clap = { version="2.33.3", features=["yaml"] }
serde = { version="1.0.126", features=["derive"] }
serde_json = "1.0.64"
rayon = "1.5.1"

@ -32,6 +32,7 @@ use crate::util::JSONReportBlock;
use devtimer::DevTime;
use libstress::Workpool;
use rand::thread_rng;
use rayon::prelude::*;
use std::io::{Read, Write};
use std::net::TcpStream;
@ -44,6 +45,10 @@ pub fn runner(
packet_size: usize,
json_out: bool,
) {
rayon::ThreadPoolBuilder::new()
.num_threads(max_connections)
.build_global()
.unwrap();
sanity_test!(host, port);
if !json_out {
println!(
@ -57,7 +62,7 @@ pub fn runner(
let mut rand = thread_rng();
let mut dt = DevTime::new_complex();
// Create separate connection pools for get and set operations
let mut setpool = Workpool::new(
let setpool = Workpool::new(
max_connections,
move || TcpStream::connect(host.clone()).unwrap(),
|sock, packet: Vec<u8>| {
@ -69,8 +74,8 @@ pub fn runner(
socket.shutdown(std::net::Shutdown::Both).unwrap();
},
);
let mut getpool = setpool.clone();
let mut delpool = getpool.clone();
let getpool = setpool.clone();
let delpool = getpool.clone();
let keys: Vec<String> = (0..max_queries)
.into_iter()
.map(|_| ran_string(packet_size, &mut rand))
@ -103,26 +108,24 @@ pub fn runner(
}
dt.create_timer("SET").unwrap();
dt.start_timer("SET").unwrap();
for packet in set_packs {
setpool.execute(packet);
}
set_packs
.into_par_iter()
.for_each(|packet| setpool.execute(packet));
drop(setpool);
dt.stop_timer("SET").unwrap();
dt.create_timer("GET").unwrap();
dt.start_timer("GET").unwrap();
for packet in get_packs {
getpool.execute(packet);
}
get_packs
.into_par_iter()
.for_each(|packet| getpool.execute(packet));
drop(getpool);
dt.stop_timer("GET").unwrap();
if !json_out {
println!("Benchmark completed! Removing created keys...");
}
// Create a connection pool for del operations
// Delete all the created keys
for packet in del_packs {
delpool.execute(packet);
}
del_packs
.into_par_iter()
.for_each(|packet| delpool.execute(packet));
drop(delpool);
let gets_per_sec = calc(max_queries, dt.time_in_nanos("GET").unwrap());
let sets_per_sec = calc(max_queries, dt.time_in_nanos("SET").unwrap());

@ -36,7 +36,7 @@ pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, si
sanity_test!(host, port);
let host = hoststr!(host, port);
let mut rand = thread_rng();
let mut np = Workpool::new(
let np = Workpool::new(
connections,
move || TcpStream::connect(host.clone()).unwrap(),
|sock, packet: Vec<u8>| {

@ -33,7 +33,7 @@ use skytable::Query;
use skytable::{Element, Response};
fn main() {
let mut pool = Workpool::new(
let pool = Workpool::new(
10,
|| Connection::new("127.0.0.1", 2003).unwrap(),
|con, query| {

Loading…
Cancel
Save