Add `libstress` and `Workpool` object

The `Workpool` is a more generic, modular version of the `Netpool`
that sky-bench previously used.
next
Sayan Nandan 3 years ago
parent c2030d676e
commit 6b4d27cf43

5
Cargo.lock generated

@ -410,6 +410,10 @@ dependencies = [
"termcolor",
]
[[package]]
name = "libstress"
version = "0.1.0"
[[package]]
name = "lock_api"
version = "0.4.4"
@ -838,6 +842,7 @@ dependencies = [
"devtimer",
"lazy_static",
"libsky",
"libstress",
"rand",
"regex",
"serde",

@ -4,7 +4,8 @@ members = [
"server",
"libsky",
"sky-bench",
"sky-macros"
"sky-macros",
"libstress"
]
[profile.release]

@ -0,0 +1,9 @@
[package]
name = "libstress"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

@ -0,0 +1,173 @@
/*
* Created on Wed Jun 16 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use core::marker::PhantomData;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
/// A unit of work for a worker. The `UIn` type parameter is the type that
/// will be used to execute the action.
///
/// `Task` carries the user-supplied input handed to the worker's `on_loop`
/// closure. `Nothing` is a sentinel sent by the [`Drop`] implementation of
/// the pool: on receiving it a worker runs its `on_exit` closure and
/// terminates.
pub enum JobType<UIn> {
    Task(UIn),
    Nothing,
}
/// A worker: a handle to one pool thread.
///
/// The only reason we use `Option` is to reduce the effort needed to
/// implement [`Drop`] for the [`Workpool`]: `drop` can `take()` the join
/// handle and `join()` it by value.
struct Worker {
    thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
    /// Spawn a new worker thread.
    ///
    /// The thread first runs `init_pre_loop_var` to build its per-thread
    /// state (`Inp`), then loops: it pulls the next [`JobType`] off the
    /// shared receiver and either runs `on_loop` with the task, or — on
    /// [`JobType::Nothing`] — runs `on_exit` and terminates.
    ///
    /// # Panics
    /// The worker thread panics if the channel mutex is poisoned or if the
    /// sending half of the channel is dropped before a `Nothing` job is
    /// delivered (the pool's `Drop` impl guarantees one per worker).
    fn new<Inp: 'static, UIn>(
        job_receiver: Arc<Mutex<mpsc::Receiver<JobType<UIn>>>>,
        init_pre_loop_var: impl Fn() -> Inp + 'static + Send,
        on_exit: impl Fn(&Inp) + Send + 'static,
        on_loop: impl Fn(&Inp, UIn) + Send + Sync + 'static,
    ) -> Self
    where
        UIn: Send + Sync + 'static,
    {
        let thread = thread::spawn(move || {
            // Per-thread state is created inside the thread, so `Inp` itself
            // does not need to be `Send` — only the initializer closure does.
            let pre_loop_var = init_pre_loop_var();
            loop {
                // Hold the mutex only long enough to receive one job.
                let action = job_receiver.lock().unwrap().recv().unwrap();
                match action {
                    JobType::Task(tsk) => on_loop(&pre_loop_var, tsk),
                    JobType::Nothing => {
                        on_exit(&pre_loop_var);
                        break;
                    }
                }
            }
        });
        Self {
            thread: Some(thread),
        }
    }
}
/// Cloning a [`Workpool`] builds a brand-new pool — fresh channel and fresh
/// worker threads — configured with the same `init_pre_loop_var`, `on_loop`
/// and `on_exit` closures and the same worker count as the original.
///
/// NOTE: the generic parameters are deliberately declared in the same order
/// as on the `Workpool` definition (`Lv` then `Lp`) so the bounds read
/// against the right closures.
impl<Inp: 'static, UIn, Lv, Lp, Ex> Clone for Workpool<Inp, UIn, Lv, Lp, Ex>
where
    UIn: Send + Sync + 'static,
    Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
    Lp: Fn(&Inp, UIn) + Clone + Send + Sync + 'static,
    Ex: Fn(&Inp) + Send + Sync + 'static + Clone,
{
    fn clone(&self) -> Self {
        Workpool::new(
            self.workers.len(),
            self.init_pre_loop_var.clone(),
            self.on_loop.clone(),
            self.on_exit.clone(),
        )
    }
}
/// # Workpool
///
/// A Workpool is a generic synchronous thread pool that distributes jobs of
/// type `UIn` to a fixed number of worker threads over an mpsc channel.
/// A workpool has to be initialized with the number of workers, a closure
/// that builds the per-thread pre-loop variable (use a closure returning
/// `()` if no per-thread state is needed), what to do on loop and what to
/// do on exit of each worker. The closures are kept as `Clone`able types
/// just to reduce complexity with copy (we were lazy).
///
/// ## Clones
///
/// Workpool clones simply create a new workpool with the same on_exit, on_loop and init_pre_loop_var
/// configurations. This provides a very convenient interface if one desires to use multiple workpools
/// to do the _same kind of thing_
pub struct Workpool<Inp, UIn, Lv, Lp, Ex> {
    /// the workers
    workers: Vec<Worker>,
    /// the sender that sends jobs
    job_distributor: mpsc::Sender<JobType<UIn>>,
    /// the function that sets the pre-loop variable
    init_pre_loop_var: Lv,
    /// the function to be executed on worker termination
    on_exit: Ex,
    /// the function to be executed on loop
    on_loop: Lp,
    /// a marker for `Inp` since no parameters use it directly
    _marker: PhantomData<Inp>,
}
impl<Inp: 'static, UIn, Lv, Lp, Ex> Workpool<Inp, UIn, Lv, Lp, Ex>
where
    UIn: Send + Sync + 'static,
    Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
    Lp: Fn(&Inp, UIn) + Send + Sync + 'static + Clone,
    Ex: Fn(&Inp) + Send + Sync + 'static + Clone,
{
    /// Create a new workpool with `count` worker threads.
    ///
    /// Each worker runs `init_pre_loop_var` once on startup, `on_loop` for
    /// every job it receives, and `on_exit` when the pool is dropped.
    ///
    /// # Panics
    /// Panics if `count` is zero.
    pub fn new(count: usize, init_pre_loop_var: Lv, on_loop: Lp, on_exit: Ex) -> Self {
        if count == 0 {
            panic!("Runtime panic: Bad value `0` for thread count");
        }
        let (sender, receiver) = mpsc::channel();
        // every worker pulls jobs from the same receiver, guarded by a mutex
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(count);
        for _ in 0..count {
            workers.push(Worker::new(
                Arc::clone(&receiver),
                init_pre_loop_var.clone(),
                on_exit.clone(),
                on_loop.clone(),
            ));
        }
        Self {
            workers,
            job_distributor: sender,
            init_pre_loop_var,
            on_exit,
            on_loop,
            _marker: PhantomData,
        }
    }
    /// Submit a job to the pool.
    ///
    /// Takes `&self` (not `&mut self`): `mpsc::Sender::send` only needs a
    /// shared reference, so submitting work never requires exclusive access.
    ///
    /// # Panics
    /// Panics if all worker threads have already terminated (i.e. the
    /// receiving half of the channel has been dropped).
    pub fn execute(&self, inp: UIn) {
        self.job_distributor.send(JobType::Task(inp)).unwrap();
    }
}
/// Graceful shutdown for the pool.
///
/// NOTE: generic parameters are named in the same order as on the struct
/// definition (`Lv` then `Lp`) — the previous ordering swapped the two
/// names, which compiled but read misleadingly.
impl<Inp, UIn, Lv, Lp, Ex> Drop for Workpool<Inp, UIn, Lv, Lp, Ex> {
    fn drop(&mut self) {
        // Send exactly one termination signal per worker. Each worker
        // consumes at most one `Nothing` before exiting its loop, so every
        // worker is guaranteed to receive one.
        for _ in &mut self.workers {
            self.job_distributor.send(JobType::Nothing).unwrap();
        }
        // Join every thread so all `on_exit` closures have finished by the
        // time the pool is fully dropped.
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap()
            }
        }
    }
}

@ -16,3 +16,4 @@ serde_json = "1.0.64"
regex = "1.5.4"
lazy_static = "1.4.0"
libsky = { path = "../libsky" }
libstress = { path = "../libstress" }

@ -31,100 +31,13 @@
mod benchtool {
use clap::{load_yaml, App};
use devtimer::DevTime;
use libstress::Workpool;
use rand::distributions::Alphanumeric;
use rand::thread_rng;
use serde::Serialize;
use std::error::Error;
use std::io::prelude::*;
use std::net::{self, TcpStream};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
/// A Netpool is a threadpool that holds several workers
///
/// Essentially, a `NetPool` is a connection pool: each worker owns one
/// persistent `TcpStream` and jobs are raw byte payloads written to it
pub struct Netpool {
    /// the worker threads (one connection each)
    workers: Vec<Worker>,
    /// the sending half of the job channel shared by all workers
    sender: mpsc::Sender<WhatToDo>,
}
/// The job
///
/// A `NewJob` has a `Vec<u8>` field for the bytes it has to write to a stream held
/// by the worker. If the `Job` is `Nothing`, then it is time for the worker
/// to terminate
enum WhatToDo {
    /// write these bytes to the worker's connection and read the reply
    NewJob(Vec<u8>),
    /// termination signal: shut the connection down and exit the loop
    Nothing,
}
/// A worker holds a thread which also holds a persistent connection to
/// the target host (passed to [`Worker::new`]), as long as the thread is
/// not told to terminate
struct Worker {
    /// `Option` so that `Drop` for `Netpool` can `take()` and `join()` it
    thread: Option<thread::JoinHandle<()>>,
}
impl Netpool {
    /// Create a new `Netpool` with `size` workers, each of which opens its
    /// own persistent connection to `host`.
    pub fn new(size: usize, host: &str) -> Netpool {
        assert!(size > 0);
        let (sender, job_queue) = mpsc::channel();
        let job_queue = Arc::new(Mutex::new(job_queue));
        // every worker shares the same mutex-guarded receiving end
        let workers = (0..size)
            .map(|_| Worker::new(Arc::clone(&job_queue), host.to_owned()))
            .collect();
        Netpool { workers, sender }
    }
    /// Queue `action` to be written to some worker's connection
    pub fn execute(&mut self, action: Vec<u8>) {
        self.sender.send(WhatToDo::NewJob(action)).unwrap();
    }
}
impl Worker {
    /// Create a new `Worker`, which also means that a persistent TCP
    /// connection to `host` will be established (the spawned thread panics
    /// if the connection fails)
    fn new(
        receiver: Arc<Mutex<mpsc::Receiver<WhatToDo>>>,
        host: std::string::String,
    ) -> Worker {
        let thread = thread::spawn(move || {
            let mut connection = TcpStream::connect(host).unwrap();
            loop {
                // the mutex guard is a temporary, so the lock is released
                // as soon as one job has been received
                let action = receiver.lock().unwrap().recv().unwrap();
                match action {
                    WhatToDo::NewJob(someaction) => {
                        // We have to write something to the socket
                        connection.write_all(&someaction).unwrap();
                        // Ignore whatever we get, we don't need them
                        // (reads at most 1024 bytes of the reply)
                        let _ = connection.read(&mut vec![0; 1024]).unwrap();
                    }
                    WhatToDo::Nothing => {
                        // A termination signal - just close the stream and
                        // return
                        connection.shutdown(net::Shutdown::Both).unwrap();
                        break;
                    }
                }
            }
        });
        Worker {
            thread: Some(thread),
        }
    }
}
impl Drop for Netpool {
    /// Graceful shutdown: one termination signal per worker, then join
    /// every thread so all connections are closed before the pool goes away
    fn drop(&mut self) {
        // Signal all the workers to shut down; each consumes exactly one
        // `Nothing` before exiting
        for _ in 0..self.workers.len() {
            self.sender.send(WhatToDo::Nothing).unwrap();
        }
        // Terminate all the threads
        for worker in self.workers.iter_mut() {
            if let Some(handle) = worker.thread.take() {
                handle.join().unwrap();
            }
        }
    }
}
#[derive(Serialize)]
/// A `JSONReportBlock` represents a JSON object which contains the type of report
@ -185,7 +98,17 @@ mod benchtool {
if let Some(matches) = matches.subcommand_matches("testkey") {
let numkeys = matches.value_of("count").unwrap();
if let Ok(num) = numkeys.parse::<usize>() {
let mut np = Netpool::new(10, &host);
let mut np = Workpool::new(
10,
move || TcpStream::connect(host.clone()).unwrap(),
|mut sock, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
let _ = sock.read(&mut vec![0; 1024]).unwrap();
},
|socket| {
socket.shutdown(net::Shutdown::Both).unwrap();
},
);
println!("Generating keys ...");
let keys: Vec<String> = (0..num)
.into_iter()
@ -238,8 +161,20 @@ mod benchtool {
let mut rand = thread_rng();
let mut dt = DevTime::new_complex();
// Create separate connection pools for get and set operations
let mut setpool = Netpool::new(max_connections, &host);
let mut getpool = Netpool::new(max_connections, &host);
let mut setpool = Workpool::new(
10,
move || TcpStream::connect(host.clone()).unwrap(),
|mut sock, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
// we don't care much about what's returned
let _ = sock.read(&mut vec![0; 1024]).unwrap();
},
|socket| {
socket.shutdown(std::net::Shutdown::Both).unwrap();
},
);
let mut getpool = setpool.clone();
let mut delpool = getpool.clone();
let keys: Vec<String> = (0..max_queries)
.into_iter()
.map(|_| ran_string(packet_size, &mut rand))
@ -284,7 +219,6 @@ mod benchtool {
dt.stop_timer("GET").unwrap();
eprintln!("Benchmark completed! Removing created keys...");
// Create a connection pool for del operations
let mut delpool = Netpool::new(max_connections, &host);
// Delete all the created keys
for packet in del_packs {
delpool.execute(packet);

Loading…
Cancel
Save