diff --git a/cli/Cargo.toml b/cli/Cargo.toml index fda0c310..959726fd 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -9,7 +9,10 @@ edition = "2018" [dependencies] corelib = {path = "../corelib"} tokio = {version = "0.2.22", features = ["full"]} - -[dev-dependencies] rand = "0.7.3" -devtimer = "4.0.0" \ No newline at end of file +devtimer = "4.0.0" + +[[bin]] +path="src/bin/benchmark.rs" +name = "tdb-bench" +version = "0.1.0" \ No newline at end of file diff --git a/cli/src/benchmark.rs b/cli/src/benchmark.rs deleted file mode 100644 index 5305f456..00000000 --- a/cli/src/benchmark.rs +++ /dev/null @@ -1,91 +0,0 @@ -//! A generic module for benchmarking SET/GET operations -//! **NOTE:** This is experimental and only uses a single connection. So any -//! benchmark comparisons you might do - aren't fair since it is likely -//! that most of them were done using parallel connections -#[cfg(test)] -#[test] -#[ignore] -fn benchmark_server() { - const MAX_TESTS: usize = 1000000; - use corelib::terrapipe::QueryBuilder; - use devtimer::DevTime; - use rand::distributions::Alphanumeric; - use rand::{thread_rng, Rng}; - use std::io::{Read, Write}; - use std::net::TcpStream; - let rand = thread_rng(); - let mut dt = DevTime::new_complex(); - let keys: Vec = (0..MAX_TESTS) - .into_iter() - .map(|_| { - let rand_string: String = rand.sample_iter(&Alphanumeric).take(32).collect(); - rand_string - }) - .collect(); - let values: Vec = (0..MAX_TESTS) - .into_iter() - .map(|_| { - let rand_string: String = rand.sample_iter(&Alphanumeric).take(32).collect(); - rand_string - }) - .collect(); - let set_packs: Vec> = (0..MAX_TESTS) - .map(|idx| { - let mut q = QueryBuilder::new_simple(); - q.add("SET"); - q.add(&keys[idx]); - q.add(&values[idx]); - q.prepare_response().1 - }) - .collect(); - let get_packs: Vec> = (0..MAX_TESTS) - .map(|idx| { - let mut q = QueryBuilder::new_simple(); - q.add("GET"); - q.add(&keys[idx]); - q.prepare_response().1 - }) - .collect(); - let del_packs: Vec> = (0..MAX_TESTS) - .map(|idx| { - let mut q = QueryBuilder::new_simple(); - q.add("DEL"); - q.add(&keys[idx]); - q.prepare_response().1 - }) - .collect(); - let mut con = TcpStream::connect("127.0.0.1:2003").unwrap(); - dt.create_timer("SET").unwrap(); - dt.start_timer("SET").unwrap(); - for packet in set_packs { - con.write_all(&packet).unwrap(); - // We don't care about the return - let _ = con.read(&mut vec![0; 1024]).unwrap(); - } - dt.stop_timer("SET").unwrap(); - dt.create_timer("GET").unwrap(); - dt.start_timer("GET").unwrap(); - for packet in get_packs { - con.write_all(&packet).unwrap(); - // We don't need the return - let _ = con.read(&mut vec![0; 1024]).unwrap(); - } - dt.stop_timer("GET").unwrap(); - - // Delete all the created keys - for packet in del_packs { - con.write_all(&packet).unwrap(); - // We don't need the return - let _ = con.read(&mut vec![0; 1024]).unwrap(); - } - println!( - "Time for {} SETs: {} ns", - MAX_TESTS, - dt.time_in_nanos("SET").unwrap() - ); - println!( - "Time for {} GETs: {} ns", - MAX_TESTS, - dt.time_in_nanos("GET").unwrap() - ); -} diff --git a/cli/src/bin/benchmark.rs b/cli/src/bin/benchmark.rs new file mode 100644 index 00000000..cea52859 --- /dev/null +++ b/cli/src/bin/benchmark.rs @@ -0,0 +1,220 @@ +/* + * Created on Sun Aug 02 2020 + * + * This file is a part of the source code for the Terrabase database + * Copyright (c) 2020, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +//! A generic module for benchmarking SET/GET operations +//! **NOTE:** This is experimental and only uses a single connection. So any +//! benchmark comparisons you might do - aren't fair since it is likely +//! that most of them were done using parallel connections + +mod benchtool { + use corelib::terrapipe::QueryBuilder; + use devtimer::DevTime; + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; + use std::io::prelude::*; + use std::net::TcpStream; + use std::sync::mpsc; + use std::sync::Arc; + use std::sync::Mutex; + use std::thread; + pub struct Netpool { + workers: Vec, + sender: mpsc::Sender, + } + enum WhatToDo { + NewJob(Vec), + Nothing, + } + struct Worker { + thread: Option>, + } + impl Netpool { + pub fn new(size: usize) -> Netpool { + assert!(size > 0); + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + let mut workers = Vec::with_capacity(size); + for _ in 0..size { + workers.push(Worker::new(Arc::clone(&receiver))); + } + Netpool { workers, sender } + } + /// Execute the job + pub fn execute(&mut self, action: Vec) { + self.sender.send(WhatToDo::NewJob(action)).unwrap(); + } + } + impl Worker { + fn new(receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || { + let mut connection = TcpStream::connect("127.0.0.1:2003").unwrap(); + loop { + let action = receiver.lock().unwrap().recv().unwrap(); + match action { + WhatToDo::NewJob(someaction) => { + connection.write_all(&someaction).unwrap(); + connection.read(&mut vec![0; 1024]).unwrap(); + } + WhatToDo::Nothing => break, + } + } + }); + Worker { + thread: Some(thread), + } + } + } + impl Drop for Netpool { + fn drop(&mut self) { + for _ in &mut self.workers { + self.sender.send(WhatToDo::Nothing).unwrap(); + } + for worker in &mut self.workers { + if let Some(thread) = worker.thread.take() { + thread.join().unwrap(); + } + } + } + } + + pub fn runner() { + let mut args: Vec = std::env::args().collect(); + args.remove(0); + println!( + "------------------------------------------------------------\ + \nTerrabaseDB Benchmark Tool v0.1.0\ + \nReport issues here: https://github.com/terrabasedb/terrabase\ + \n------------------------------------------------------------" + ); + // connections queries packetsize + if args.len() != 3 { + eprintln!( + "Insufficient arguments!\ + \nUSAGE: benchmark " + ); + std::process::exit(0x100); + } + let (max_connections, max_queries, packet_size) = match ( + args[0].parse::(), + args[1].parse::(), + args[2].parse::(), + ) { + (Ok(mx), Ok(mc), Ok(ps)) => (mx, mc, ps), + _ => { + eprintln!("Incorrect arguments"); + std::process::exit(0x100); + } + }; + println!( + "Initializing benchmark\nConnections: {}\nQueries: {}\nData size (key+value): {} bytes", + max_connections, + max_queries, + (packet_size * 2), // key size + value size + ); + let rand = thread_rng(); + let mut dt = DevTime::new_complex(); + let mut setpool = Netpool::new(max_connections); + let mut getpool = Netpool::new(max_connections); + let keys: Vec = (0..max_queries) + .into_iter() + .map(|_| { + let rand_string: String = + rand.sample_iter(&Alphanumeric).take(packet_size).collect(); + rand_string + }) + .collect(); + let values: Vec = (0..max_queries) + .into_iter() + .map(|_| { + let rand_string: String = + rand.sample_iter(&Alphanumeric).take(packet_size).collect(); + rand_string + }) + .collect(); + let set_packs: Vec> = (0..max_queries) + .map(|idx| { + let mut q = QueryBuilder::new_simple(); + q.add("SET"); + q.add(&keys[idx]); + q.add(&values[idx]); + q.prepare_response().1 + }) + .collect(); + let get_packs: Vec> = (0..max_queries) + .map(|idx| { + let mut q = QueryBuilder::new_simple(); + q.add("GET"); + q.add(&keys[idx]); + q.prepare_response().1 + }) + .collect(); + let del_packs: Vec> = (0..max_queries) + .map(|idx| { + let mut q = QueryBuilder::new_simple(); + q.add("DEL"); + q.add(&keys[idx]); + q.prepare_response().1 + }) + .collect(); + println!("Per-packet size (GET): {} bytes", get_packs[0].len()); + println!("Per-packet size (SET): {} bytes", set_packs[0].len()); + println!("Initialization complete! Benchmark started"); + dt.create_timer("SET").unwrap(); + dt.start_timer("SET").unwrap(); + for packet in set_packs { + setpool.execute(packet); + } + drop(setpool); + dt.stop_timer("SET").unwrap(); + dt.create_timer("GET").unwrap(); + dt.start_timer("GET").unwrap(); + for packet in get_packs { + getpool.execute(packet); + } + drop(getpool); + dt.stop_timer("GET").unwrap(); + println!("Benchmark completed! Removing created keys"); + let mut delpool = Netpool::new(max_connections); + // Delete all the created keys + for packet in del_packs { + delpool.execute(packet); + } + drop(delpool); + println!("==========RESULTS=========="); + println!( + "{} SETs/sec", + calc(max_queries, dt.time_in_nanos("SET").unwrap()) + ); + println!( + "{} GETs/sec", + calc(max_queries, dt.time_in_nanos("GET").unwrap()) + ); + println!("==========================="); + } + + fn calc(reqs: usize, time: u128) -> f64 { + reqs as f64 / (time as f64 / 1_000_000_000 as f64) + } +} + +fn main() { + benchtool::runner(); +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 58ad25ee..c1911ffe 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -24,9 +24,6 @@ mod client; use tokio; const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0"; -#[cfg(test)] -mod benchmark; - #[tokio::main] async fn main() { println!("{}", MSG_WELCOME); diff --git a/server/Cargo.toml b/server/Cargo.toml index f4b29874..64aa0dfe 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,4 +11,5 @@ tokio = { version = "0.2.22", features = ["full"] } bytes = "0.5.6" corelib = {path ="../corelib"} bincode = "1.3.1" -parking_lot = "0.11.0" \ No newline at end of file +parking_lot = "0.11.0" +