Add worker-based bench tool

next
Sayan Nandan 4 years ago
parent e2fba6e6f4
commit 7919f5b40e
No known key found for this signature in database
GPG Key ID: C31EFD7DDA12AEE0

@ -9,7 +9,10 @@ edition = "2018"
[dependencies]
corelib = {path = "../corelib"}
tokio = {version = "0.2.22", features = ["full"]}
[dev-dependencies]
rand = "0.7.3"
devtimer = "4.0.0"
devtimer = "4.0.0"
[[bin]]
path="src/bin/benchmark.rs"
name = "tdb-bench"
version = "0.1.0"

@ -1,91 +0,0 @@
//! A generic module for benchmarking SET/GET operations
//! **NOTE:** This is experimental and only uses a single connection. So any
//! benchmark comparisons you might do - aren't fair since it is likely
//! that most of them were done using parallel connections
#[cfg(test)]
#[test]
#[ignore]
fn benchmark_server() {
const MAX_TESTS: usize = 1000000;
use corelib::terrapipe::QueryBuilder;
use devtimer::DevTime;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::io::{Read, Write};
use std::net::TcpStream;
let rand = thread_rng();
let mut dt = DevTime::new_complex();
let keys: Vec<String> = (0..MAX_TESTS)
.into_iter()
.map(|_| {
let rand_string: String = rand.sample_iter(&Alphanumeric).take(32).collect();
rand_string
})
.collect();
let values: Vec<String> = (0..MAX_TESTS)
.into_iter()
.map(|_| {
let rand_string: String = rand.sample_iter(&Alphanumeric).take(32).collect();
rand_string
})
.collect();
let set_packs: Vec<Vec<u8>> = (0..MAX_TESTS)
.map(|idx| {
let mut q = QueryBuilder::new_simple();
q.add("SET");
q.add(&keys[idx]);
q.add(&values[idx]);
q.prepare_response().1
})
.collect();
let get_packs: Vec<Vec<u8>> = (0..MAX_TESTS)
.map(|idx| {
let mut q = QueryBuilder::new_simple();
q.add("GET");
q.add(&keys[idx]);
q.prepare_response().1
})
.collect();
let del_packs: Vec<Vec<u8>> = (0..MAX_TESTS)
.map(|idx| {
let mut q = QueryBuilder::new_simple();
q.add("DEL");
q.add(&keys[idx]);
q.prepare_response().1
})
.collect();
let mut con = TcpStream::connect("127.0.0.1:2003").unwrap();
dt.create_timer("SET").unwrap();
dt.start_timer("SET").unwrap();
for packet in set_packs {
con.write_all(&packet).unwrap();
// We don't care about the return
let _ = con.read(&mut vec![0; 1024]).unwrap();
}
dt.stop_timer("SET").unwrap();
dt.create_timer("GET").unwrap();
dt.start_timer("GET").unwrap();
for packet in get_packs {
con.write_all(&packet).unwrap();
// We don't need the return
let _ = con.read(&mut vec![0; 1024]).unwrap();
}
dt.stop_timer("GET").unwrap();
// Delete all the created keys
for packet in del_packs {
con.write_all(&packet).unwrap();
// We don't need the return
let _ = con.read(&mut vec![0; 1024]).unwrap();
}
println!(
"Time for {} SETs: {} ns",
MAX_TESTS,
dt.time_in_nanos("SET").unwrap()
);
println!(
"Time for {} GETs: {} ns",
MAX_TESTS,
dt.time_in_nanos("GET").unwrap()
);
}

@ -0,0 +1,220 @@
/*
* Created on Sun Aug 02 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! A generic module for benchmarking SET/GET operations
//! **NOTE:** This is experimental and only uses a single connection. So any
//! benchmark comparisons you might do - aren't fair since it is likely
//! that most of them were done using parallel connections
mod benchtool {
use corelib::terrapipe::QueryBuilder;
use devtimer::DevTime;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::io::prelude::*;
use std::net::TcpStream;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
pub struct Netpool {
workers: Vec<Worker>,
sender: mpsc::Sender<WhatToDo>,
}
enum WhatToDo {
NewJob(Vec<u8>),
Nothing,
}
struct Worker {
thread: Option<thread::JoinHandle<()>>,
}
impl Netpool {
pub fn new(size: usize) -> Netpool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
workers.push(Worker::new(Arc::clone(&receiver)));
}
Netpool { workers, sender }
}
/// Execute the job
pub fn execute(&mut self, action: Vec<u8>) {
self.sender.send(WhatToDo::NewJob(action)).unwrap();
}
}
impl Worker {
fn new(receiver: Arc<Mutex<mpsc::Receiver<WhatToDo>>>) -> Worker {
let thread = thread::spawn(move || {
let mut connection = TcpStream::connect("127.0.0.1:2003").unwrap();
loop {
let action = receiver.lock().unwrap().recv().unwrap();
match action {
WhatToDo::NewJob(someaction) => {
connection.write_all(&someaction).unwrap();
connection.read(&mut vec![0; 1024]).unwrap();
}
WhatToDo::Nothing => break,
}
}
});
Worker {
thread: Some(thread),
}
}
}
impl Drop for Netpool {
fn drop(&mut self) {
for _ in &mut self.workers {
self.sender.send(WhatToDo::Nothing).unwrap();
}
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
pub fn runner() {
let mut args: Vec<String> = std::env::args().collect();
args.remove(0);
println!(
"------------------------------------------------------------\
\nTerrabaseDB Benchmark Tool v0.1.0\
\nReport issues here: https://github.com/terrabasedb/terrabase\
\n------------------------------------------------------------"
);
// connections queries packetsize
if args.len() != 3 {
eprintln!(
"Insufficient arguments!\
\nUSAGE: benchmark <connections> <queries> <packetsize-in-bytes>"
);
std::process::exit(0x100);
}
let (max_connections, max_queries, packet_size) = match (
args[0].parse::<usize>(),
args[1].parse::<usize>(),
args[2].parse::<usize>(),
) {
(Ok(mx), Ok(mc), Ok(ps)) => (mx, mc, ps),
_ => {
eprintln!("Incorrect arguments");
std::process::exit(0x100);
}
};
println!(
"Initializing benchmark\nConnections: {}\nQueries: {}\nData size (key+value): {} bytes",
max_connections,
max_queries,
(packet_size * 2), // key size + value size
);
let rand = thread_rng();
let mut dt = DevTime::new_complex();
let mut setpool = Netpool::new(max_connections);
let mut getpool = Netpool::new(max_connections);
let keys: Vec<String> = (0..max_queries)
.into_iter()
.map(|_| {
let rand_string: String =
rand.sample_iter(&Alphanumeric).take(packet_size).collect();
rand_string
})
.collect();
let values: Vec<String> = (0..max_queries)
.into_iter()
.map(|_| {
let rand_string: String =
rand.sample_iter(&Alphanumeric).take(packet_size).collect();
rand_string
})
.collect();
let set_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| {
let mut q = QueryBuilder::new_simple();
q.add("SET");
q.add(&keys[idx]);
q.add(&values[idx]);
q.prepare_response().1
})
.collect();
let get_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| {
let mut q = QueryBuilder::new_simple();
q.add("GET");
q.add(&keys[idx]);
q.prepare_response().1
})
.collect();
let del_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| {
let mut q = QueryBuilder::new_simple();
q.add("DEL");
q.add(&keys[idx]);
q.prepare_response().1
})
.collect();
println!("Per-packet size (GET): {} bytes", get_packs[0].len());
println!("Per-packet size (SET): {} bytes", set_packs[0].len());
println!("Initialization complete! Benchmark started");
dt.create_timer("SET").unwrap();
dt.start_timer("SET").unwrap();
for packet in set_packs {
setpool.execute(packet);
}
drop(setpool);
dt.stop_timer("SET").unwrap();
dt.create_timer("GET").unwrap();
dt.start_timer("GET").unwrap();
for packet in get_packs {
getpool.execute(packet);
}
drop(getpool);
dt.stop_timer("GET").unwrap();
println!("Benchmark completed! Removing created keys");
let mut delpool = Netpool::new(max_connections);
// Delete all the created keys
for packet in del_packs {
delpool.execute(packet);
}
drop(delpool);
println!("==========RESULTS==========");
println!(
"{} SETs/sec",
calc(max_queries, dt.time_in_nanos("SET").unwrap())
);
println!(
"{} GETs/sec",
calc(max_queries, dt.time_in_nanos("GET").unwrap())
);
println!("===========================");
}
fn calc(reqs: usize, time: u128) -> f64 {
reqs as f64 / (time as f64 / 1_000_000_000 as f64)
}
}
fn main() {
benchtool::runner();
}

@ -24,9 +24,6 @@ mod client;
use tokio;
const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0";
#[cfg(test)]
mod benchmark;
#[tokio::main]
async fn main() {
println!("{}", MSG_WELCOME);

@ -11,4 +11,5 @@ tokio = { version = "0.2.22", features = ["full"] }
bytes = "0.5.6"
corelib = {path ="../corelib"}
bincode = "1.3.1"
parking_lot = "0.11.0"
parking_lot = "0.11.0"

Loading…
Cancel
Save