Refactor benchmarks into separate functions

next
Sayan Nandan 2 years ago
parent 021579d4f8
commit da645247d2
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -304,20 +304,21 @@ where
}
}
/// Execute something
pub fn execute(&self, inp: UIn) {
self.job_distributor.send(JobType::Task(inp)).unwrap();
pub fn execute(&self, inp: UIn) -> bool {
self.job_distributor.send(JobType::Task(inp)).is_ok()
}
/// Execute something that can be executed as a parallel iterator
/// For the best performance, it is recommended that you pass true for `needs_iterator_pool`
/// on initialization of the [`Workpool`]
pub fn execute_iter(&self, iter: impl IntoParallelIterator<Item = UIn>) {
iter.into_par_iter().for_each(|inp| self.execute(inp));
pub fn execute_iter(&self, iter: impl IntoParallelIterator<Item = UIn>) -> bool {
iter.into_par_iter().all(|inp| self.execute(inp))
}
/// Does the same thing as [`execute_iter`] but drops self ensuring that all the
/// workers actually finish their tasks
pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator<Item = UIn>) {
self.execute_iter(iter);
pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator<Item = UIn>) -> bool {
let ret = self.execute_iter(iter);
drop(self);
ret
}
/// Initialize a new [`Workpool`] with the default count of threads. This is equal
/// to 2 * the number of logical cores.

@ -0,0 +1,255 @@
/*
* Created on Sat Aug 13 2022
*
 * This file is a part of Skytable
 * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::error::Error;
use {
super::{
report::{AggregateReport, SingleReport},
validation, vec_with_cap, BenchmarkConfig, LoopMonitor,
},
crate::error::BResult,
devtimer::SimpleTimer,
libstress::Workpool,
skytable::{types::RawString, Connection, Element, Query, RespCode},
std::{
io::{Read, Write},
net::{Shutdown, TcpStream},
},
};
/// Run a benchmark using the given pre-loop, in-loop and post-loop closures
fn run_bench_custom<Inp, Lp, Lv, Ex>(
bench_config: BenchmarkConfig,
packets: Vec<Box<[u8]>>,
on_init: Lv,
on_loop: Lp,
on_loop_exit: Ex,
loopmon: LoopMonitor,
reports: &mut AggregateReport,
) -> BResult<()>
where
Ex: Clone + Fn(&mut Inp) + Send + Sync + 'static,
Inp: Sync + 'static,
Lp: Clone + Fn(&mut Inp, Box<[u8]>) + Send + Sync + 'static,
Lv: Clone + Fn() -> Inp + Send + 'static + Sync,
{
// now do our runs
let mut loopmon = loopmon;
let mut run_okay = true;
while loopmon.should_continue() && run_okay {
// now create our connection pool
let pool = Workpool::new(
bench_config.server.connections(),
on_init.clone(),
on_loop.clone(),
on_loop_exit.clone(),
true,
Some(bench_config.query_count()),
);
// get our local copy
let this_packets = packets.clone();
// run and time our operations
let mut dt = SimpleTimer::new();
dt.start();
let ok = pool.execute_and_finish_iter(this_packets);
dt.stop();
loopmon.incr_time(&dt);
run_okay = ok;
// cleanup
loopmon.cleanup()?;
loopmon.step();
}
if run_okay {
// save time
reports.push(SingleReport::new(
loopmon.name(),
loopmon.sum() as f64 / bench_config.runs() as f64,
));
Ok(())
} else {
Err(Error::RuntimeError(format!(
"Worker thread for test `{}` crashed. Unable to send job",
loopmon.name()
)))
}
}
#[inline(always)]
/// Init connection and buffer
///
/// Connects to the server, sends the given startup command (e.g. a table
/// switch), waits for the okay response code, and returns the stream together
/// with a zeroed read buffer of `bufsize` bytes.
fn init_connection_and_buf(
    host: &str,
    port: u16,
    start_command: Vec<u8>,
    bufsize: usize,
) -> (TcpStream, Vec<u8>) {
    let mut stream = TcpStream::connect((host, port)).unwrap();
    // run the startup command and consume the server's acknowledgement
    stream.write_all(&start_command).unwrap();
    let mut ack = [0u8; validation::RESPCODE_OKAY.len()];
    stream.read_exact(&mut ack).unwrap();
    (stream, vec![0; bufsize])
}
/// Benchmark SET
///
/// Builds one pre-encoded SET packet per key/value pair and times the
/// configured number of runs, flushing `default.tmpbench` between runs
/// (the flush is skipped after the final run so the data survives for the
/// UPDATE/GET benchmarks).
pub fn bench_set(
    keys: &[Vec<u8>],
    values: &[Vec<u8>],
    connection: &mut Connection,
    bench_config: &BenchmarkConfig,
    create_table: &[u8],
    reports: &mut AggregateReport,
) -> BResult<()> {
    // take owned copies so the pool's `'static` init closure can own them
    let bench_config = bench_config.clone();
    let create_table = create_table.to_owned();
    let loopmon = LoopMonitor::new_cleanup(
        bench_config.runs(),
        "set",
        connection,
        Query::from("FLUSHDB").arg("default.tmpbench"),
        Element::RespCode(RespCode::Okay),
        true,
    );
    // pre-encode every SET packet up front so the timed loop only does I/O
    let mut packets = vec_with_cap(bench_config.query_count())?;
    packets.extend((0..bench_config.query_count()).map(|i| {
        Query::from("SET")
            .arg(RawString::from(keys[i].to_owned()))
            .arg(RawString::from(values[i].to_owned()))
            .into_raw_query()
            .into_boxed_slice()
    }));
    run_bench_custom(
        bench_config.clone(),
        packets,
        move || {
            init_connection_and_buf(
                bench_config.server.host(),
                bench_config.server.port(),
                create_table.to_owned(),
                validation::RESPCODE_OKAY.len(),
            )
        },
        |(con, buf), packet| {
            con.write_all(&packet).unwrap();
            con.read_exact(buf).unwrap();
            // every SET must be acknowledged with the okay response code
            assert_eq!(buf, validation::RESPCODE_OKAY);
        },
        |(con, _)| con.shutdown(Shutdown::Both).unwrap(),
        loopmon,
        reports,
    )
}
/// Benchmark UPDATE
///
/// Builds one pre-encoded UPDATE packet per key (all writing `new_value`)
/// and times the configured number of runs. No cleanup is needed: repeated
/// updates leave the keyspace in the same state.
pub fn bench_update(
    keys: &[Vec<u8>],
    new_value: &[u8],
    bench_config: &BenchmarkConfig,
    create_table: &[u8],
    reports: &mut AggregateReport,
) -> BResult<()> {
    // take owned copies so the pool's `'static` init closure can own them
    let bench_config = bench_config.clone();
    let create_table = create_table.to_owned();
    let loopmon = LoopMonitor::new(bench_config.runs(), "update");
    // pre-encode every UPDATE packet up front so the timed loop only does I/O
    let mut packets = vec_with_cap(bench_config.query_count())?;
    (0..bench_config.query_count()).for_each(|i| {
        packets.push(
            // uppercase action name for consistency with `bench_set`'s "SET"
            // (the pre-refactor benchmark code also used "UPDATE")
            Query::from("UPDATE")
                .arg(RawString::from(keys[i].clone()))
                .arg(RawString::from(new_value.to_owned()))
                .into_raw_query()
                .into_boxed_slice(),
        )
    });
    run_bench_custom(
        bench_config.clone(),
        packets,
        move || {
            init_connection_and_buf(
                bench_config.server.host(),
                bench_config.server.port(),
                create_table.to_owned(),
                validation::RESPCODE_OKAY.len(),
            )
        },
        |(con, buf), packet| {
            con.write_all(&packet).unwrap();
            con.read_exact(buf).unwrap();
            // every UPDATE must be acknowledged with the okay response code
            assert_eq!(buf, validation::RESPCODE_OKAY);
        },
        |(con, _)| con.shutdown(Shutdown::Both).unwrap(),
        loopmon,
        reports,
    )
}
/// Benchmark GET
///
/// Builds one pre-encoded GET packet per key and times the configured number
/// of runs. The per-connection read buffer is sized to the expected response
/// for the configured kv size, not to the okay response code.
pub fn bench_get(
    keys: &[Vec<u8>],
    bench_config: &BenchmarkConfig,
    create_table: &[u8],
    reports: &mut AggregateReport,
) -> BResult<()> {
    // take owned copies so the pool's `'static` init closure can own them
    let bench_config = bench_config.clone();
    let create_table = create_table.to_owned();
    let loopmon = LoopMonitor::new(bench_config.runs(), "get");
    // pre-encode every GET packet up front so the timed loop only does I/O
    let mut packets = vec_with_cap(bench_config.query_count())?;
    (0..bench_config.query_count()).for_each(|i| {
        packets.push(
            // uppercase action name for consistency with `bench_set`'s "SET"
            // (the pre-refactor benchmark code also used "GET")
            Query::from("GET")
                .arg(RawString::from(keys[i].clone()))
                .into_raw_query()
                .into_boxed_slice(),
        )
    });
    run_bench_custom(
        bench_config.clone(),
        packets,
        move || {
            init_connection_and_buf(
                bench_config.server.host(),
                bench_config.server.port(),
                create_table.to_owned(),
                // responses carry the value, so size the buffer accordingly
                validation::calculate_response_size(bench_config.kvsize()),
            )
        },
        |(con, buf), packet| {
            con.write_all(&packet).unwrap();
            con.read_exact(buf).unwrap();
        },
        |(con, _)| con.shutdown(Shutdown::Both).unwrap(),
        loopmon,
        reports,
    )
}

@ -25,7 +25,7 @@
*/
use {
self::report::{AggregateReport, SingleReport},
self::report::AggregateReport,
crate::{
config,
config::{BenchmarkConfig, ServerConfig},
@ -34,17 +34,11 @@ use {
},
clap::ArgMatches,
devtimer::SimpleTimer,
libstress::{
utils::{generate_random_byte_vector, ran_bytes},
PoolConfig,
},
skytable::{types::RawString, Connection, Element, Query, RespCode},
std::{
io::{Read, Write},
net::TcpStream,
},
libstress::utils::{generate_random_byte_vector, ran_bytes},
skytable::{Connection, Element, Query, RespCode},
};
mod benches;
mod report;
mod validation;
@ -56,13 +50,136 @@ macro_rules! binfo {
};
}
/// The loop monitor can be used for maintaining a loop for a given benchmark.
/// It tracks the run counter, accumulates per-run timings and optionally
/// carries cleanup instructions to execute between runs.
struct LoopMonitor<'a> {
    /// cleanup instructions (`None` when the benchmark needs no cleanup)
    inner: Option<CleanupInner<'a>>,
    /// maximum iterations (total runs for this benchmark)
    max: usize,
    /// current iteration (0-based)
    current: usize,
    /// total time accumulated across runs, in nanoseconds
    time: u128,
    /// name of test
    name: &'static str,
}
impl<'a> LoopMonitor<'a> {
    /// Create a benchmark loop monitor that doesn't need any cleanup
    pub fn new(max: usize, name: &'static str) -> Self {
        Self {
            inner: None,
            max,
            current: 0,
            time: 0,
            name,
        }
    }
    /// Create a new benchmark loop monitor that uses the given cleanup instructions:
    /// - `max`: Total iterations
    /// - `name`: Name of benchmark
    /// - `connection`: A connection to use for cleanup instructions
    /// - `query`: Query to run for cleanup
    /// - `response`: Response expected when cleaned up
    /// - `skip_on_last`: Skip running the cleanup instructions on the last loop
    pub fn new_cleanup(
        max: usize,
        name: &'static str,
        connection: &'a mut Connection,
        query: Query,
        response: Element,
        skip_on_last: bool,
    ) -> Self {
        Self {
            inner: Some(CleanupInner::new(query, response, connection, skip_on_last)),
            max,
            current: 0,
            time: 0,
            // field init shorthand (was the redundant `name: name`)
            name,
        }
    }
    /// Run cleanup, if any was configured. Honors `skip_on_last` by not
    /// running the cleanup instructions on the final iteration
    fn cleanup(&mut self) -> BResult<()> {
        let last_iter = self.is_last_iter();
        if let Some(ref mut cleanup) = self.inner {
            let should_run_cleanup = (!last_iter) || (last_iter && !cleanup.skip_on_last);
            if should_run_cleanup {
                return cleanup.cleanup(self.name);
            }
        }
        Ok(())
    }
    /// Check if this is the last iteration
    fn is_last_iter(&self) -> bool {
        // `current + 1 == max` rather than `max - 1 == current`: avoids the
        // usize underflow that `max - 1` would hit when `max == 0`
        self.current + 1 == self.max
    }
    /// Step the counter ahead
    fn step(&mut self) {
        self.current += 1;
    }
    /// Determine if we should continue executing
    fn should_continue(&self) -> bool {
        self.current < self.max
    }
    /// Append a new time to the sum (in nanoseconds)
    fn incr_time(&mut self, dt: &SimpleTimer) {
        self.time += dt.time_in_nanos().unwrap();
    }
    /// Return the sum of all run times, in nanoseconds
    fn sum(&self) -> u128 {
        self.time
    }
    /// Return the name of the benchmark
    fn name(&self) -> &'static str {
        self.name
    }
}
/// Cleanup instructions: a query to run between benchmark runs and the
/// response the server must return for the cleanup to count as successful
struct CleanupInner<'a> {
    /// the connection to use for cleanup processes
    connection: &'a mut Connection,
    /// the query to be run
    query: Query,
    /// the response to expect (any other reply fails the cleanup)
    response: Element,
    /// whether we should skip on the last loop
    skip_on_last: bool,
}
impl<'a> CleanupInner<'a> {
/// Init cleanup instructions
fn new(q: Query, r: Element, connection: &'a mut Connection, skip_on_last: bool) -> Self {
Self {
query: q,
response: r,
connection,
skip_on_last,
}
}
/// Run cleanup
fn cleanup(&mut self, name: &'static str) -> BResult<()> {
let r: Element = self.connection.run_query(&self.query)?;
if r.ne(&self.response) {
return Err(Error::RuntimeError(format!(
"Failed to run cleanup for benchmark `{}`",
name
)));
} else {
Ok(())
}
}
}
#[inline(always)]
/// Returns a vec with the given cap, ensuring that we don't overflow memory
///
/// Unlike `Vec::with_capacity`, allocation failure is reported as an error
/// (via `try_reserve_exact`) instead of aborting the process
fn vec_with_cap<T>(cap: usize) -> BResult<Vec<T>> {
    let mut buffer = Vec::new();
    buffer.try_reserve_exact(cap)?;
    Ok(buffer)
}
/// Run the actual benchmarks
pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
// init bench config
let bench_config = BenchmarkConfig::new(servercfg, matches)?;
@ -79,30 +196,12 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
// pool pre-exec setup
let servercfg = servercfg.clone();
let switch_table = Query::from("use default.tmpbench").into_raw_query();
let get_response_size = validation::calculate_response_size(bench_config.kvsize());
let rcode_okay_size = validation::RESPCODE_OKAY.len();
// init pool config; side_connection is for cleanups
let mut misc_connection = Connection::new(servercfg.host(), servercfg.port())?;
let pool_config = PoolConfig::new(
servercfg.connections(),
move || {
let mut stream = TcpStream::connect((servercfg.host(), servercfg.port())).unwrap();
stream.write_all(&switch_table.clone()).unwrap();
let mut v = vec![0; rcode_okay_size];
let _ = stream.read_exact(&mut v).unwrap();
stream
},
move |_sock, _packet: Box<[u8]>| panic!("on_loop exec unset"),
|socket| {
socket.shutdown(std::net::Shutdown::Both).unwrap();
},
true,
Some(bench_config.query_count()),
);
// init timer and reports
let mut report = AggregateReport::new(bench_config.query_count());
let mut reports = AggregateReport::new(bench_config.query_count());
// init test data
binfo!("Initializing test data ...");
@ -125,92 +224,28 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
// such an approach helps us keep memory usage low
// bench set
binfo!("Benchmarking SET ...");
let mut set_packets: Vec<Box<[u8]>> = vec_with_cap(bench_config.query_count())?;
(0..bench_config.query_count()).for_each(|i| {
set_packets.push(
Query::from("SET")
.arg(RawString::from(keys[i].clone()))
.arg(RawString::from(values[i].clone()))
.into_raw_query()
.into_boxed_slice(),
)
});
run_bench_for(
&pool_config,
move |sock, packet: Box<[u8]>| {
sock.write_all(&packet).unwrap();
// expect rcode 0
let mut v = vec![0; rcode_okay_size];
let _ = sock.read_exact(&mut v).unwrap();
assert_eq!(v, validation::RESPCODE_OKAY);
},
"set",
&mut report,
set_packets,
bench_config.runs(),
benches::bench_set(
&keys,
&values,
&mut misc_connection,
Some((
Query::from("FLUSHDB").arg("default.tmpbench"),
Element::RespCode(RespCode::Okay),
true,
)),
&bench_config,
&switch_table,
&mut reports,
)?;
// bench update
binfo!("Benchmarking UPDATE ...");
let mut update_packets = vec_with_cap(bench_config.query_count())?;
(0..bench_config.query_count()).for_each(|i| {
update_packets.push(
Query::from("UPDATE")
.arg(RawString::from(keys[i].clone()))
.arg(RawString::from(new_updated_key.clone()))
.into_raw_query()
.into_boxed_slice(),
)
});
run_bench_for(
&pool_config,
move |sock, packet: Box<[u8]>| {
sock.write_all(&packet).unwrap();
// expect rcode 0
let mut v = vec![0; rcode_okay_size];
let _ = sock.read_exact(&mut v).unwrap();
assert_eq!(v, validation::RESPCODE_OKAY);
},
"update",
&mut report,
update_packets,
bench_config.runs(),
&mut misc_connection,
None,
benches::bench_update(
&keys,
&new_updated_key,
&bench_config,
&switch_table,
&mut reports,
)?;
// bench get
binfo!("Benchmarking GET ...");
let mut get_packets: Vec<Box<[u8]>> = vec_with_cap(bench_config.query_count())?;
(0..bench_config.query_count()).for_each(|i| {
get_packets.push(
Query::from("GET")
.arg(RawString::from(keys[i].clone()))
.into_raw_query()
.into_boxed_slice(),
)
});
run_bench_for(
&pool_config,
move |sock, packet: Box<[u8]>| {
sock.write_all(&packet).unwrap();
// expect kvsize byte count
let mut v = vec![0; get_response_size];
let _ = sock.read_exact(&mut v).unwrap();
},
"get",
&mut report,
get_packets,
bench_config.runs(),
&mut misc_connection,
None,
)?;
benches::bench_get(&keys, &bench_config, &switch_table, &mut reports)?;
// remove all test data
binfo!("Finished benchmarks. Cleaning up ...");
@ -224,7 +259,7 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
if config::should_output_messages() {
// normal output
println!("===========RESULTS===========");
let (maxpad, reports) = report.finish();
let (maxpad, reports) = reports.finish();
for report in reports {
let padding = " ".repeat(maxpad - report.name().len());
println!(
@ -237,53 +272,7 @@ pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
println!("=============================");
} else {
// JSON
println!("{}", report.into_json())
}
Ok(())
}
fn run_bench_for<F, Inp, UIn, Lv, Lp, Ex>(
pool: &PoolConfig<Inp, UIn, Lv, Lp, Ex>,
closure: F,
name: &'static str,
reports: &mut AggregateReport,
input: Vec<UIn>,
runs: usize,
tmp_con: &mut Connection,
cleanup: Option<(Query, Element, bool)>,
) -> BResult<()>
where
F: Send + Sync + Fn(&mut Inp, UIn) + Clone + 'static,
Ex: Clone + Fn(&mut Inp) + Send + Sync + 'static,
Inp: Sync + 'static,
Lp: Clone + Fn(&mut Inp, UIn) + Send + Sync + 'static,
Lv: Clone + Fn() -> Inp + Send + 'static + Sync,
UIn: Clone + Send + Sync + 'static,
{
let mut sum: u128 = 0;
for i in 0..runs {
// run local copy
let this_input = input.clone();
let pool = pool.with_loop_closure(closure.clone());
// time
let mut tm = SimpleTimer::new();
tm.start();
pool.execute_and_finish_iter(this_input);
tm.stop();
sum += tm.time_in_nanos().unwrap();
// cleanup
if let Some((ref cleanup_after_run, ref resp_cleanup_after_run, skip_on_last)) = cleanup {
if !(skip_on_last && (i == runs - 1)) {
let r: Element = tmp_con.run_query(cleanup_after_run)?;
if r.ne(resp_cleanup_after_run) {
return Err(Error::RuntimeError(format!(
"Failed to run cleanup for benchmark `{name}` in iteration {i}"
)));
}
}
}
println!("{}", reports.into_json())
}
// return average time
reports.push(SingleReport::new(name, sum as f64 / runs as f64));
Ok(())
}

@ -103,20 +103,21 @@ impl ServerConfig {
}
/// Benchmark configuration
pub struct BenchmarkConfig<'a> {
pub server: &'a ServerConfig,
#[derive(Clone)]
pub struct BenchmarkConfig {
pub server: ServerConfig,
kvsize: usize,
queries: usize,
runs: usize,
}
impl<'a> BenchmarkConfig<'a> {
impl BenchmarkConfig {
const DEFAULT_QUERIES: usize = 100_000;
const DEFAULT_KVSIZE: usize = 3;
const DEFAULT_RUNS: usize = 5;
pub fn new(server: &'a ServerConfig, matches: ArgMatches) -> BResult<Self> {
pub fn new(server: &ServerConfig, matches: ArgMatches) -> BResult<Self> {
let mut slf = Self {
server,
server: server.clone(),
queries: Self::DEFAULT_QUERIES,
kvsize: Self::DEFAULT_KVSIZE,
runs: Self::DEFAULT_RUNS,

Loading…
Cancel
Save