From 6b4d27cf4331f6173edea3933efeac857400b53c Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Wed, 16 Jun 2021 20:07:21 +0530 Subject: [PATCH] Add `libstress` and `Workpool` object The `Workpool` is a more generic version of the Netpool that we used, making it modular. --- Cargo.lock | 5 ++ Cargo.toml | 3 +- libstress/Cargo.toml | 9 +++ libstress/src/lib.rs | 173 ++++++++++++++++++++++++++++++++++++++++++ sky-bench/Cargo.toml | 1 + sky-bench/src/main.rs | 118 +++++++--------------------- 6 files changed, 216 insertions(+), 93 deletions(-) create mode 100644 libstress/Cargo.toml create mode 100644 libstress/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index cc14f48c..ad373dc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,6 +410,10 @@ dependencies = [ "termcolor", ] +[[package]] +name = "libstress" +version = "0.1.0" + [[package]] name = "lock_api" version = "0.4.4" @@ -838,6 +842,7 @@ dependencies = [ "devtimer", "lazy_static", "libsky", + "libstress", "rand", "regex", "serde", diff --git a/Cargo.toml b/Cargo.toml index 0185f9ac..3c9ceeea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "server", "libsky", "sky-bench", - "sky-macros" + "sky-macros", + "libstress" ] [profile.release] diff --git a/libstress/Cargo.toml b/libstress/Cargo.toml new file mode 100644 index 00000000..b9b37711 --- /dev/null +++ b/libstress/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "libstress" +version = "0.1.0" +authors = ["Sayan Nandan "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/libstress/src/lib.rs b/libstress/src/lib.rs new file mode 100644 index 00000000..adf3cc5b --- /dev/null +++ b/libstress/src/lib.rs @@ -0,0 +1,173 @@ +/* + * Created on Wed Jun 16 2021 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2021, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use core::marker::PhantomData; +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; + +/// A Job. The UIn type parameter is the type that will be used to execute the action +/// Nothing is a variant used by the drop implementation to terminate all the workers +/// and call the exit_loop function +pub enum JobType { + Task(UIn), + Nothing, +} + +/// A worker +/// +/// The only reason we use option is to reduce the effort needed to implement [`Drop`] for the +/// [`Workpool`] +struct Worker { + thread: Option>, +} + +impl Worker { + /// Initialize a new worker + fn new( + job_receiver: Arc>>>, + init_pre_loop_var: impl Fn() -> Inp + 'static + Send, + on_exit: impl Fn(&Inp) + Send + 'static, + on_loop: impl Fn(&Inp, UIn) + Send + Sync + 'static, + ) -> Self + where + UIn: Send + Sync + 'static, + { + let thread = thread::spawn(move || { + let on_loop = on_loop; + let pre_loop_var = init_pre_loop_var(); + loop { + let action = job_receiver.lock().unwrap().recv().unwrap(); + match action { + JobType::Task(tsk) => on_loop(&pre_loop_var, tsk), + JobType::Nothing => { + on_exit(&pre_loop_var); + break; + } + } + } + }); + Self { + thread: Some(thread), + } + } +} + +impl Clone for Workpool +where + UIn: Send + Sync + 'static, + Ex: Fn(&Inp) + Send + Sync + 'static + Clone, + Lv: Fn() -> Inp + Send + Sync + 'static + Clone, + Lp: Fn(&Inp, UIn) + Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Workpool::new( + self.workers.len(), + self.init_pre_loop_var.clone(), + self.on_loop.clone(), + self.on_exit.clone(), + ) + } +} + +/// # Workpool +/// +/// A Workpool is a generic synchronous thread pool that can be used to perform, well, anything. +/// A workpool has to be initialized with the number of workers, the pre_loop_variable (set this +/// to None if there isn't any). what to do on loop and what to do on exit of each worker. The +/// closures are kept as `Clone`able types just to reduce complexity with copy (we were lazy). +/// +/// ## Clones +/// +/// Workpool clones simply create a new workpool with the same on_exit, on_loop and init_pre_loop_var +/// configurations. This provides a very convenient interface if one desires to use multiple workpools +/// to do the _same kind of thing_ +pub struct Workpool { + /// the workers + workers: Vec, + /// the sender that sends jobs + job_distributor: mpsc::Sender>, + /// the function that sets the pre-loop variable + init_pre_loop_var: Lv, + /// the function to be executed on worker termination + on_exit: Ex, + /// the function to be executed on loop + on_loop: Lp, + /// a marker for `Inp` since no parameters use it directly + _marker: PhantomData, +} + +impl Workpool +where + UIn: Send + Sync + 'static, + Ex: Fn(&Inp) + Send + Sync + 'static + Clone, + Lv: Fn() -> Inp + Send + Sync + 'static + Clone, + Lp: Fn(&Inp, UIn) + Send + Sync + 'static + Clone, +{ + /// Create a new workpool + pub fn new(count: usize, init_pre_loop_var: Lv, on_loop: Lp, on_exit: Ex) -> Self { + if count == 0 { + panic!("Runtime panic: Bad value `0` for thread count"); + } + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + let mut workers = Vec::with_capacity(count); + for _ in 0..count { + workers.push(Worker::new( + Arc::clone(&receiver), + init_pre_loop_var.clone(), + on_exit.clone(), + on_loop.clone(), + )); + } + Self { + workers, + job_distributor: sender, + init_pre_loop_var, + on_exit, + on_loop, + _marker: PhantomData, + } + } + /// Execute something + pub fn execute(&mut self, inp: UIn) { + self.job_distributor.send(JobType::Task(inp)).unwrap(); + } +} + +impl Drop for Workpool { + fn drop(&mut self) { + for _ in &mut self.workers { + self.job_distributor.send(JobType::Nothing).unwrap(); + } + for worker in &mut self.workers { + if let Some(thread) = worker.thread.take() { + thread.join().unwrap() + } + } + } +} diff --git a/sky-bench/Cargo.toml b/sky-bench/Cargo.toml index e551a94d..c9b35390 100644 --- a/sky-bench/Cargo.toml +++ b/sky-bench/Cargo.toml @@ -16,3 +16,4 @@ serde_json = "1.0.64" regex = "1.5.4" lazy_static = "1.4.0" libsky = { path = "../libsky" } +libstress = {path = "../libstress"} diff --git a/sky-bench/src/main.rs b/sky-bench/src/main.rs index e53d1d86..cfa68619 100644 --- a/sky-bench/src/main.rs +++ b/sky-bench/src/main.rs @@ -31,100 +31,13 @@ mod benchtool { use clap::{load_yaml, App}; use devtimer::DevTime; + use libstress::Workpool; use rand::distributions::Alphanumeric; use rand::thread_rng; use serde::Serialize; use std::error::Error; use std::io::prelude::*; use std::net::{self, TcpStream}; - use std::sync::mpsc; - use std::sync::Arc; - use std::sync::Mutex; - use std::thread; - /// A Netpool is a threadpool that holds several workers - /// - /// Essentially, a `NetPool` is a connection pool - pub struct Netpool { - workers: Vec, - sender: mpsc::Sender, - } - /// The job - /// - /// A `NewJob` has a `Vec` field for the bytes it has to write to a stream held - /// by the worker. If the `Job` is `Nothing`, then it is time for the worker - /// to terminate - enum WhatToDo { - NewJob(Vec), - Nothing, - } - /// A worker holds a thread which also holds a persistent connection to - /// `localhost:2003`, as long as the thread is not told to terminate - struct Worker { - thread: Option>, - } - impl Netpool { - /// Create a new `Netpool` instance with `size` number of connections (and threads) - pub fn new(size: usize, host: &str) -> Netpool { - assert!(size > 0); - let (sender, receiver) = mpsc::channel(); - let receiver = Arc::new(Mutex::new(receiver)); - let mut workers = Vec::with_capacity(size); - for _ in 0..size { - workers.push(Worker::new(Arc::clone(&receiver), host.to_owned())); - } - Netpool { workers, sender } - } - /// Execute the job - pub fn execute(&mut self, action: Vec) { - self.sender.send(WhatToDo::NewJob(action)).unwrap(); - } - } - impl Worker { - /// Create a new `Worker` which also means that a connection to port 2003 - /// will be established - fn new( - receiver: Arc>>, - host: std::string::String, - ) -> Worker { - let thread = thread::spawn(move || { - let mut connection = TcpStream::connect(host).unwrap(); - loop { - let action = receiver.lock().unwrap().recv().unwrap(); - match action { - WhatToDo::NewJob(someaction) => { - // We have to write something to the socket - connection.write_all(&someaction).unwrap(); - // Ignore whatever we get, we don't need them - let _ = connection.read(&mut vec![0; 1024]).unwrap(); - } - WhatToDo::Nothing => { - // A termination signal - just close the stream and - // return - connection.shutdown(net::Shutdown::Both).unwrap(); - break; - } - } - } - }); - Worker { - thread: Some(thread), - } - } - } - impl Drop for Netpool { - fn drop(&mut self) { - // Signal all the workers to shut down - for _ in &mut self.workers { - self.sender.send(WhatToDo::Nothing).unwrap(); - } - // Terminate all the threads - for worker in &mut self.workers { - if let Some(thread) = worker.thread.take() { - thread.join().unwrap(); - } - } - } - } #[derive(Serialize)] /// A `JSONReportBlock` represents a JSON object which contains the type of report @@ -185,7 +98,17 @@ mod benchtool { if let Some(matches) = matches.subcommand_matches("testkey") { let numkeys = matches.value_of("count").unwrap(); if let Ok(num) = numkeys.parse::() { - let mut np = Netpool::new(10, &host); + let mut np = Workpool::new( + 10, + move || TcpStream::connect(host.clone()).unwrap(), + |mut sock, packet: Vec| { + sock.write_all(&packet).unwrap(); + let _ = sock.read(&mut vec![0; 1024]).unwrap(); + }, + |socket| { + socket.shutdown(net::Shutdown::Both).unwrap(); + }, + ); println!("Generating keys ..."); let keys: Vec = (0..num) .into_iter() @@ -238,8 +161,20 @@ mod benchtool { let mut rand = thread_rng(); let mut dt = DevTime::new_complex(); // Create separate connection pools for get and set operations - let mut setpool = Netpool::new(max_connections, &host); - let mut getpool = Netpool::new(max_connections, &host); + let mut setpool = Workpool::new( + 10, + move || TcpStream::connect(host.clone()).unwrap(), + |mut sock, packet: Vec| { + sock.write_all(&packet).unwrap(); + // we don't care much about what's returned + let _ = sock.read(&mut vec![0; 1024]).unwrap(); + }, + |socket| { + socket.shutdown(std::net::Shutdown::Both).unwrap(); + }, + ); + let mut getpool = setpool.clone(); + let mut delpool = getpool.clone(); let keys: Vec = (0..max_queries) .into_iter() .map(|_| ran_string(packet_size, &mut rand)) @@ -284,7 +219,6 @@ mod benchtool { dt.stop_timer("GET").unwrap(); eprintln!("Benchmark completed! Removing created keys..."); // Create a connection pool for del operations - let mut delpool = Netpool::new(max_connections, &host); // Delete all the created keys for packet in del_packs { delpool.execute(packet);