From da645247d2f02a023a54fc8ef251de86975d179f Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Sat, 13 Aug 2022 22:42:01 +0530 Subject: [PATCH] Refactor benchmarks into separate functions --- libstress/src/lib.rs | 13 +- sky-bench/src/bench/benches.rs | 255 ++++++++++++++++++++++++++++ sky-bench/src/bench/mod.rs | 297 ++++++++++++++++----------------- sky-bench/src/config.rs | 11 +- 4 files changed, 411 insertions(+), 165 deletions(-) create mode 100644 sky-bench/src/bench/benches.rs diff --git a/libstress/src/lib.rs b/libstress/src/lib.rs index fec4c2dc..c741acb2 100644 --- a/libstress/src/lib.rs +++ b/libstress/src/lib.rs @@ -304,20 +304,21 @@ where } } /// Execute something - pub fn execute(&self, inp: UIn) { - self.job_distributor.send(JobType::Task(inp)).unwrap(); + pub fn execute(&self, inp: UIn) -> bool { + self.job_distributor.send(JobType::Task(inp)).is_ok() } /// Execute something that can be executed as a parallel iterator /// For the best performance, it is recommended that you pass true for `needs_iterator_pool` /// on initialization of the [`Workpool`] - pub fn execute_iter(&self, iter: impl IntoParallelIterator) { - iter.into_par_iter().for_each(|inp| self.execute(inp)); + pub fn execute_iter(&self, iter: impl IntoParallelIterator) -> bool { + iter.into_par_iter().all(|inp| self.execute(inp)) } /// Does the same thing as [`execute_iter`] but drops self ensuring that all the /// workers actually finish their tasks - pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator) { - self.execute_iter(iter); + pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator) -> bool { + let ret = self.execute_iter(iter); drop(self); + ret } /// Initialize a new [`Workpool`] with the default count of threads. This is equal /// to 2 * the number of logical cores. 
diff --git a/sky-bench/src/bench/benches.rs b/sky-bench/src/bench/benches.rs new file mode 100644 index 00000000..70c87354 --- /dev/null +++ b/sky-bench/src/bench/benches.rs @@ -0,0 +1,255 @@ +/* + * Created on Sat Aug 13 2022 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2022, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use crate::error::Error; + +use { + super::{ + report::{AggregateReport, SingleReport}, + validation, vec_with_cap, BenchmarkConfig, LoopMonitor, + }, + crate::error::BResult, + devtimer::SimpleTimer, + libstress::Workpool, + skytable::{types::RawString, Connection, Element, Query, RespCode}, + std::{ + io::{Read, Write}, + net::{Shutdown, TcpStream}, + }, +}; + +/// Run a benchmark using the given pre-loop, in-loop and post-loop closures +fn run_bench_custom( + bench_config: BenchmarkConfig, + packets: Vec>, + on_init: Lv, + on_loop: Lp, + on_loop_exit: Ex, + loopmon: LoopMonitor, + reports: &mut AggregateReport, +) -> BResult<()> +where + Ex: Clone + Fn(&mut Inp) + Send + Sync + 'static, + Inp: Sync + 'static, + Lp: Clone + Fn(&mut Inp, Box<[u8]>) + Send + Sync + 'static, + Lv: Clone + Fn() -> Inp + Send + 'static + Sync, +{ + // now do our runs + let mut loopmon = loopmon; + let mut run_okay = true; + + while loopmon.should_continue() && run_okay { + // now create our connection pool + let pool = Workpool::new( + bench_config.server.connections(), + on_init.clone(), + on_loop.clone(), + on_loop_exit.clone(), + true, + Some(bench_config.query_count()), + ); + + // get our local copy + let this_packets = packets.clone(); + + // run and time our operations + let mut dt = SimpleTimer::new(); + dt.start(); + let ok = pool.execute_and_finish_iter(this_packets); + dt.stop(); + loopmon.incr_time(&dt); + run_okay = ok; + + // cleanup + loopmon.cleanup()?; + loopmon.step(); + } + if run_okay { + // save time + reports.push(SingleReport::new( + loopmon.name(), + loopmon.sum() as f64 / bench_config.runs() as f64, + )); + Ok(()) + } else { + Err(Error::RuntimeError(format!( + "Worker thread for test `{}` crashed. 
Unable to send job", + loopmon.name() + ))) + } +} + +#[inline(always)] +/// Init connection and buffer +fn init_connection_and_buf( + host: &str, + port: u16, + start_command: Vec, + bufsize: usize, +) -> (TcpStream, Vec) { + let mut con = TcpStream::connect((host, port)).unwrap(); + con.write_all(&start_command).unwrap(); + let mut ret = [0u8; validation::RESPCODE_OKAY.len()]; + con.read_exact(&mut ret).unwrap(); + let readbuf = vec![0; bufsize]; + (con, readbuf) +} + +/// Benchmark SET +pub fn bench_set( + keys: &[Vec], + values: &[Vec], + connection: &mut Connection, + bench_config: &BenchmarkConfig, + create_table: &[u8], + reports: &mut AggregateReport, +) -> BResult<()> { + let bench_config = bench_config.clone(); + let create_table = create_table.to_owned(); + let loopmon = LoopMonitor::new_cleanup( + bench_config.runs(), + "set", + connection, + Query::from("FLUSHDB").arg("default.tmpbench"), + Element::RespCode(RespCode::Okay), + true, + ); + let mut packets = vec_with_cap(bench_config.query_count())?; + (0..bench_config.query_count()).for_each(|i| { + packets.push( + Query::from("SET") + .arg(RawString::from(keys[i].to_owned())) + .arg(RawString::from(values[i].to_owned())) + .into_raw_query() + .into_boxed_slice(), + ) + }); + run_bench_custom( + bench_config.clone(), + packets, + move || { + init_connection_and_buf( + bench_config.server.host(), + bench_config.server.port(), + create_table.to_owned(), + validation::RESPCODE_OKAY.len(), + ) + }, + |(con, buf), packet| { + con.write_all(&packet).unwrap(); + con.read_exact(buf).unwrap(); + assert_eq!(buf, validation::RESPCODE_OKAY); + }, + |(con, _)| con.shutdown(Shutdown::Both).unwrap(), + loopmon, + reports, + ) +} + +/// Benchmark UPDATE +pub fn bench_update( + keys: &[Vec], + new_value: &[u8], + bench_config: &BenchmarkConfig, + create_table: &[u8], + reports: &mut AggregateReport, +) -> BResult<()> { + let bench_config = bench_config.clone(); + let create_table = create_table.to_owned(); + let 
loopmon = LoopMonitor::new(bench_config.runs(), "update"); + let mut packets = vec_with_cap(bench_config.query_count())?; + (0..bench_config.query_count()).for_each(|i| { + packets.push( + Query::from("update") + .arg(RawString::from(keys[i].clone())) + .arg(RawString::from(new_value.to_owned())) + .into_raw_query() + .into_boxed_slice(), + ) + }); + run_bench_custom( + bench_config.clone(), + packets, + move || { + init_connection_and_buf( + bench_config.server.host(), + bench_config.server.port(), + create_table.to_owned(), + validation::RESPCODE_OKAY.len(), + ) + }, + |(con, buf), packet| { + con.write_all(&packet).unwrap(); + con.read_exact(buf).unwrap(); + assert_eq!(buf, validation::RESPCODE_OKAY); + }, + |(con, _)| con.shutdown(Shutdown::Both).unwrap(), + loopmon, + reports, + ) +} + +/// Benchmark GET +pub fn bench_get( + keys: &[Vec], + bench_config: &BenchmarkConfig, + create_table: &[u8], + reports: &mut AggregateReport, +) -> BResult<()> { + let bench_config = bench_config.clone(); + let create_table = create_table.to_owned(); + let loopmon = LoopMonitor::new(bench_config.runs(), "get"); + let mut packets = vec_with_cap(bench_config.query_count())?; + (0..bench_config.query_count()).for_each(|i| { + packets.push( + Query::from("get") + .arg(RawString::from(keys[i].clone())) + .into_raw_query() + .into_boxed_slice(), + ) + }); + run_bench_custom( + bench_config.clone(), + packets, + move || { + init_connection_and_buf( + bench_config.server.host(), + bench_config.server.port(), + create_table.to_owned(), + validation::calculate_response_size(bench_config.kvsize()), + ) + }, + |(con, buf), packet| { + con.write_all(&packet).unwrap(); + con.read_exact(buf).unwrap(); + }, + |(con, _)| con.shutdown(Shutdown::Both).unwrap(), + loopmon, + reports, + ) +} diff --git a/sky-bench/src/bench/mod.rs b/sky-bench/src/bench/mod.rs index b61cc903..9ca83487 100644 --- a/sky-bench/src/bench/mod.rs +++ b/sky-bench/src/bench/mod.rs @@ -25,7 +25,7 @@ */ use { - 
self::report::{AggregateReport, SingleReport}, + self::report::AggregateReport, crate::{ config, config::{BenchmarkConfig, ServerConfig}, @@ -34,17 +34,11 @@ use { }, clap::ArgMatches, devtimer::SimpleTimer, - libstress::{ - utils::{generate_random_byte_vector, ran_bytes}, - PoolConfig, - }, - skytable::{types::RawString, Connection, Element, Query, RespCode}, - std::{ - io::{Read, Write}, - net::TcpStream, - }, + libstress::utils::{generate_random_byte_vector, ran_bytes}, + skytable::{Connection, Element, Query, RespCode}, }; +mod benches; mod report; mod validation; @@ -56,13 +50,136 @@ macro_rules! binfo { }; } +/// The loop monitor can be used for maintaining a loop for a given benchmark +struct LoopMonitor<'a> { + /// cleanup instructions + inner: Option>, + /// maximum iterations + max: usize, + /// current iteration + current: usize, + /// total time + time: u128, + /// name of test + name: &'static str, +} + +impl<'a> LoopMonitor<'a> { + /// Create a benchmark loop monitor that doesn't need any cleanup + pub fn new(max: usize, name: &'static str) -> Self { + Self { + inner: None, + max, + current: 0, + time: 0, + name, + } + } + /// Create a new benchmark loop monitor that uses the given cleanup instructions: + /// - `max`: Total iterations + /// - `name`: Name of benchmark + /// - `connection`: A connection to use for cleanup instructions + /// - `query`: Query to run for cleanup + /// - `response`: Response expected when cleaned up + /// - `skip_on_last`: Skip running the cleanup instructions on the last loop + pub fn new_cleanup( + max: usize, + name: &'static str, + connection: &'a mut Connection, + query: Query, + response: Element, + skip_on_last: bool, + ) -> Self { + Self { + inner: Some(CleanupInner::new(query, response, connection, skip_on_last)), + max, + current: 0, + time: 0, + name: name, + } + } + /// Run cleanup + fn cleanup(&mut self) -> BResult<()> { + let last_iter = self.is_last_iter(); + if let Some(ref mut cleanup) = self.inner { + let 
should_run_cleanup = (!last_iter) || (last_iter && !cleanup.skip_on_last); + if should_run_cleanup { + return cleanup.cleanup(self.name); + } + } + Ok(()) + } + /// Check if this is the last iteration + fn is_last_iter(&self) -> bool { + (self.max - 1) == self.current + } + /// Step the counter ahead + fn step(&mut self) { + self.current += 1; + } + /// Determine if we should continue executing + fn should_continue(&self) -> bool { + self.current < self.max + } + /// Append a new time to the sum + fn incr_time(&mut self, dt: &SimpleTimer) { + self.time += dt.time_in_nanos().unwrap(); + } + /// Return the sum + fn sum(&self) -> u128 { + self.time + } + /// Return the name of the benchmark + fn name(&self) -> &'static str { + self.name + } +} + +/// Cleanup instructions +struct CleanupInner<'a> { + /// the connection to use for cleanup processes + connection: &'a mut Connection, + /// the query to be run + query: Query, + /// the response to expect + response: Element, + /// whether we should skip on the last loop + skip_on_last: bool, +} + +impl<'a> CleanupInner<'a> { + /// Init cleanup instructions + fn new(q: Query, r: Element, connection: &'a mut Connection, skip_on_last: bool) -> Self { + Self { + query: q, + response: r, + connection, + skip_on_last, + } + } + /// Run cleanup + fn cleanup(&mut self, name: &'static str) -> BResult<()> { + let r: Element = self.connection.run_query(&self.query)?; + if r.ne(&self.response) { + return Err(Error::RuntimeError(format!( + "Failed to run cleanup for benchmark `{}`", + name + ))); + } else { + Ok(()) + } + } +} + #[inline(always)] +/// Returns a vec with the given cap, ensuring that we don't overflow memory fn vec_with_cap(cap: usize) -> BResult> { let mut v = Vec::new(); v.try_reserve_exact(cap)?; Ok(v) } +/// Run the actual benchmarks pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> { // init bench config let bench_config = BenchmarkConfig::new(servercfg, matches)?; @@ -79,30 +196,12 @@ 
pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> { // pool pre-exec setup let servercfg = servercfg.clone(); let switch_table = Query::from("use default.tmpbench").into_raw_query(); - let get_response_size = validation::calculate_response_size(bench_config.kvsize()); - let rcode_okay_size = validation::RESPCODE_OKAY.len(); // init pool config; side_connection is for cleanups let mut misc_connection = Connection::new(servercfg.host(), servercfg.port())?; - let pool_config = PoolConfig::new( - servercfg.connections(), - move || { - let mut stream = TcpStream::connect((servercfg.host(), servercfg.port())).unwrap(); - stream.write_all(&switch_table.clone()).unwrap(); - let mut v = vec![0; rcode_okay_size]; - let _ = stream.read_exact(&mut v).unwrap(); - stream - }, - move |_sock, _packet: Box<[u8]>| panic!("on_loop exec unset"), - |socket| { - socket.shutdown(std::net::Shutdown::Both).unwrap(); - }, - true, - Some(bench_config.query_count()), - ); // init timer and reports - let mut report = AggregateReport::new(bench_config.query_count()); + let mut reports = AggregateReport::new(bench_config.query_count()); // init test data binfo!("Initializing test data ..."); @@ -125,92 +224,28 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> { // such an approach helps us keep memory usage low // bench set binfo!("Benchmarking SET ..."); - let mut set_packets: Vec> = vec_with_cap(bench_config.query_count())?; - (0..bench_config.query_count()).for_each(|i| { - set_packets.push( - Query::from("SET") - .arg(RawString::from(keys[i].clone())) - .arg(RawString::from(values[i].clone())) - .into_raw_query() - .into_boxed_slice(), - ) - }); - run_bench_for( - &pool_config, - move |sock, packet: Box<[u8]>| { - sock.write_all(&packet).unwrap(); - // expect rcode 0 - let mut v = vec![0; rcode_okay_size]; - let _ = sock.read_exact(&mut v).unwrap(); - assert_eq!(v, validation::RESPCODE_OKAY); - }, - "set", - &mut report, - 
set_packets, - bench_config.runs(), + benches::bench_set( + &keys, + &values, &mut misc_connection, - Some(( - Query::from("FLUSHDB").arg("default.tmpbench"), - Element::RespCode(RespCode::Okay), - true, - )), + &bench_config, + &switch_table, + &mut reports, )?; // bench update binfo!("Benchmarking UPDATE ..."); - let mut update_packets = vec_with_cap(bench_config.query_count())?; - (0..bench_config.query_count()).for_each(|i| { - update_packets.push( - Query::from("UPDATE") - .arg(RawString::from(keys[i].clone())) - .arg(RawString::from(new_updated_key.clone())) - .into_raw_query() - .into_boxed_slice(), - ) - }); - run_bench_for( - &pool_config, - move |sock, packet: Box<[u8]>| { - sock.write_all(&packet).unwrap(); - // expect rcode 0 - let mut v = vec![0; rcode_okay_size]; - let _ = sock.read_exact(&mut v).unwrap(); - assert_eq!(v, validation::RESPCODE_OKAY); - }, - "update", - &mut report, - update_packets, - bench_config.runs(), - &mut misc_connection, - None, + benches::bench_update( + &keys, + &new_updated_key, + &bench_config, + &switch_table, + &mut reports, )?; // bench get binfo!("Benchmarking GET ..."); - let mut get_packets: Vec> = vec_with_cap(bench_config.query_count())?; - (0..bench_config.query_count()).for_each(|i| { - get_packets.push( - Query::from("GET") - .arg(RawString::from(keys[i].clone())) - .into_raw_query() - .into_boxed_slice(), - ) - }); - run_bench_for( - &pool_config, - move |sock, packet: Box<[u8]>| { - sock.write_all(&packet).unwrap(); - // expect kvsize byte count - let mut v = vec![0; get_response_size]; - let _ = sock.read_exact(&mut v).unwrap(); - }, - "get", - &mut report, - get_packets, - bench_config.runs(), - &mut misc_connection, - None, - )?; + benches::bench_get(&keys, &bench_config, &switch_table, &mut reports)?; // remove all test data binfo!("Finished benchmarks. 
Cleaning up ..."); @@ -224,7 +259,7 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> { if config::should_output_messages() { // normal output println!("===========RESULTS==========="); - let (maxpad, reports) = report.finish(); + let (maxpad, reports) = reports.finish(); for report in reports { let padding = " ".repeat(maxpad - report.name().len()); println!( @@ -237,53 +272,7 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> { println!("============================="); } else { // JSON - println!("{}", report.into_json()) - } - Ok(()) -} - -fn run_bench_for( - pool: &PoolConfig, - closure: F, - name: &'static str, - reports: &mut AggregateReport, - input: Vec, - runs: usize, - tmp_con: &mut Connection, - cleanup: Option<(Query, Element, bool)>, -) -> BResult<()> -where - F: Send + Sync + Fn(&mut Inp, UIn) + Clone + 'static, - Ex: Clone + Fn(&mut Inp) + Send + Sync + 'static, - Inp: Sync + 'static, - Lp: Clone + Fn(&mut Inp, UIn) + Send + Sync + 'static, - Lv: Clone + Fn() -> Inp + Send + 'static + Sync, - UIn: Clone + Send + Sync + 'static, -{ - let mut sum: u128 = 0; - for i in 0..runs { - // run local copy - let this_input = input.clone(); - let pool = pool.with_loop_closure(closure.clone()); - // time - let mut tm = SimpleTimer::new(); - tm.start(); - pool.execute_and_finish_iter(this_input); - tm.stop(); - sum += tm.time_in_nanos().unwrap(); - // cleanup - if let Some((ref cleanup_after_run, ref resp_cleanup_after_run, skip_on_last)) = cleanup { - if !(skip_on_last && (i == runs - 1)) { - let r: Element = tmp_con.run_query(cleanup_after_run)?; - if r.ne(resp_cleanup_after_run) { - return Err(Error::RuntimeError(format!( - "Failed to run cleanup for benchmark `{name}` in iteration {i}" - ))); - } - } - } + println!("{}", reports.into_json()) } - // return average time - reports.push(SingleReport::new(name, sum as f64 / runs as f64)); Ok(()) } diff --git a/sky-bench/src/config.rs 
b/sky-bench/src/config.rs index e9ebc7e4..5d4efa83 100644 --- a/sky-bench/src/config.rs +++ b/sky-bench/src/config.rs @@ -103,20 +103,21 @@ impl ServerConfig { } /// Benchmark configuration -pub struct BenchmarkConfig<'a> { - pub server: &'a ServerConfig, +#[derive(Clone)] +pub struct BenchmarkConfig { + pub server: ServerConfig, kvsize: usize, queries: usize, runs: usize, } -impl<'a> BenchmarkConfig<'a> { +impl BenchmarkConfig { const DEFAULT_QUERIES: usize = 100_000; const DEFAULT_KVSIZE: usize = 3; const DEFAULT_RUNS: usize = 5; - pub fn new(server: &'a ServerConfig, matches: ArgMatches) -> BResult { + pub fn new(server: &ServerConfig, matches: ArgMatches) -> BResult { let mut slf = Self { - server, + server: server.clone(), queries: Self::DEFAULT_QUERIES, kvsize: Self::DEFAULT_KVSIZE, runs: Self::DEFAULT_RUNS,