|
|
|
@ -34,18 +34,29 @@ mod benchtool {
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::sync::Mutex;
|
|
|
|
|
use std::thread;
|
|
|
|
|
/// A Netpool is a threadpool that holds several workers
|
|
|
|
|
///
|
|
|
|
|
/// Essentially, a `NetPool` is a connection pool
|
|
|
|
|
pub struct Netpool {
|
|
|
|
|
workers: Vec<Worker>,
|
|
|
|
|
sender: mpsc::Sender<WhatToDo>,
|
|
|
|
|
}
|
|
|
|
|
/// The job
|
|
|
|
|
///
|
|
|
|
|
/// A `NewJob` has a `Vec<u8>` field for the bytes it has to write to a stream held
|
|
|
|
|
/// by the worker. If the `Job` is `Nothing`, then it is time for the worker
|
|
|
|
|
/// to terminate
|
|
|
|
|
enum WhatToDo {
|
|
|
|
|
NewJob(Vec<u8>),
|
|
|
|
|
Nothing,
|
|
|
|
|
}
|
|
|
|
|
/// A worker holds a thread which also holds a persistent connection to
|
|
|
|
|
/// `localhost:2003`, as long as the thread is not told to terminate
|
|
|
|
|
struct Worker {
|
|
|
|
|
thread: Option<thread::JoinHandle<()>>,
|
|
|
|
|
}
|
|
|
|
|
impl Netpool {
|
|
|
|
|
/// Create a new `Netpool` instance with `size` number of connections (and threads)
|
|
|
|
|
pub fn new(size: usize) -> Netpool {
|
|
|
|
|
assert!(size > 0);
|
|
|
|
|
let (sender, receiver) = mpsc::channel();
|
|
|
|
@ -62,6 +73,8 @@ mod benchtool {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
impl Worker {
|
|
|
|
|
/// Create a new `Worker` which also means that a connection to port 2003
|
|
|
|
|
/// will be established
|
|
|
|
|
fn new(receiver: Arc<Mutex<mpsc::Receiver<WhatToDo>>>) -> Worker {
|
|
|
|
|
let thread = thread::spawn(move || {
|
|
|
|
|
let mut connection = TcpStream::connect("127.0.0.1:2003").unwrap();
|
|
|
|
@ -69,10 +82,14 @@ mod benchtool {
|
|
|
|
|
let action = receiver.lock().unwrap().recv().unwrap();
|
|
|
|
|
match action {
|
|
|
|
|
WhatToDo::NewJob(someaction) => {
|
|
|
|
|
// We have to write something to the socket
|
|
|
|
|
connection.write_all(&someaction).unwrap();
|
|
|
|
|
// Ignore whatever we get, we don't need them
|
|
|
|
|
connection.read(&mut vec![0; 1024]).unwrap();
|
|
|
|
|
}
|
|
|
|
|
WhatToDo::Nothing => {
|
|
|
|
|
// A termination signal - just close the stream and
|
|
|
|
|
// return
|
|
|
|
|
connection.shutdown(net::Shutdown::Both).unwrap();
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
@ -86,9 +103,11 @@ mod benchtool {
|
|
|
|
|
}
|
|
|
|
|
impl Drop for Netpool {
|
|
|
|
|
fn drop(&mut self) {
|
|
|
|
|
// Signal all the workers to shut down
|
|
|
|
|
for _ in &mut self.workers {
|
|
|
|
|
self.sender.send(WhatToDo::Nothing).unwrap();
|
|
|
|
|
}
|
|
|
|
|
// Terminate all the threads
|
|
|
|
|
for worker in &mut self.workers {
|
|
|
|
|
if let Some(thread) = worker.thread.take() {
|
|
|
|
|
thread.join().unwrap();
|
|
|
|
@ -97,13 +116,14 @@ mod benchtool {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Run the benchmark tool
|
|
|
|
|
pub fn runner() {
|
|
|
|
|
let mut args: Vec<String> = std::env::args().collect();
|
|
|
|
|
args.remove(0);
|
|
|
|
|
println!(
|
|
|
|
|
"------------------------------------------------------------\
|
|
|
|
|
\nTerrabaseDB Benchmark Tool v0.1.0\
|
|
|
|
|
\nReport issues here: https://github.com/terrabasedb/terrabase\
|
|
|
|
|
\nReport issues here: https://github.com/terrabasedb/terrabasedb\
|
|
|
|
|
\n------------------------------------------------------------"
|
|
|
|
|
);
|
|
|
|
|
// connections queries packetsize
|
|
|
|
@ -133,6 +153,7 @@ mod benchtool {
|
|
|
|
|
);
|
|
|
|
|
let rand = thread_rng();
|
|
|
|
|
let mut dt = DevTime::new_complex();
|
|
|
|
|
// Create separate connection pools for get and set operations
|
|
|
|
|
let mut setpool = Netpool::new(max_connections);
|
|
|
|
|
let mut getpool = Netpool::new(max_connections);
|
|
|
|
|
let keys: Vec<String> = (0..max_queries)
|
|
|
|
@ -151,6 +172,14 @@ mod benchtool {
|
|
|
|
|
rand_string
|
|
|
|
|
})
|
|
|
|
|
.collect();
|
|
|
|
|
/*
|
|
|
|
|
We create three vectors of vectors: `set_packs`, `get_packs` and `del_packs`
|
|
|
|
|
The bytes in each of `set_packs` has a query packet for setting data;
|
|
|
|
|
The bytes in each of `get_packs` has a query packet for getting a key set by one of `set_packs`
|
|
|
|
|
since we use the same key/value pairs for all;
|
|
|
|
|
The bytes in each of `del_packs` has a query packet for deleting a key created by
|
|
|
|
|
one of `set_packs`
|
|
|
|
|
*/
|
|
|
|
|
let set_packs: Vec<Vec<u8>> = (0..max_queries)
|
|
|
|
|
.map(|idx| terrapipe::proc_query(format!("SET {} {}", keys[idx], values[idx])))
|
|
|
|
|
.collect();
|
|
|
|
@ -178,6 +207,7 @@ mod benchtool {
|
|
|
|
|
drop(getpool);
|
|
|
|
|
dt.stop_timer("GET").unwrap();
|
|
|
|
|
println!("Benchmark completed! Removing created keys...");
|
|
|
|
|
// Create a connection pool for del operations
|
|
|
|
|
let mut delpool = Netpool::new(max_connections);
|
|
|
|
|
// Delete all the created keys
|
|
|
|
|
for packet in del_packs {
|
|
|
|
@ -196,6 +226,7 @@ mod benchtool {
|
|
|
|
|
println!("===========================");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns the number of queries/sec
|
|
|
|
|
fn calc(reqs: usize, time: u128) -> f64 {
|
|
|
|
|
reqs as f64 / (time as f64 / 1_000_000_000 as f64)
|
|
|
|
|
}
|
|
|
|
|