Re-implement benchmark tool
This commit re-implements the benchmark tool, doing a lot of refactoring and speeding up things, while reducing memory usage. Some notes: 1. The `testkey` subcommand was removed. This was needed because it makes no sense to have it with data types like lists. I'll leave support for creating test data for another PR (if at all needed) 2. The bench tool now uses lesser memory "in a go." Previously, all the get, set and update packets were generated in one single step. This is no longer done. Instead, we generate the packets for one type, run its corresponding tests and completely deallocate and cleanup once we're done with that test. This helps alleviate memory usage. 3. Instead of going in the sequence (GET, SET, UPDATE, ...) and running that set some n times, we now run each test n times, and repeat that for the other tests. The advantage of this is code clarity. 4. Finally, more validation checks were added ensuring that the benchmark tool very closely resembles a "real-life" scenario wherein a connection is picked up from a pool by a thread, a task run-on and evaluated. 5. Better logging and error reporting: instead of vaguely crashing, the benchmark tool provides more clarity on errors. However, panics in worker threads are yet to be improved in terms of reporting.next
parent
a512b8b617
commit
0f11dcfb88
@ -0,0 +1,279 @@
|
||||
/*
|
||||
* Created on Tue Aug 09 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
self::report::{AggregateReport, SingleReport},
|
||||
crate::{
|
||||
config,
|
||||
config::{BenchmarkConfig, ServerConfig},
|
||||
error::{BResult, Error},
|
||||
util,
|
||||
},
|
||||
clap::ArgMatches,
|
||||
devtimer::SimpleTimer,
|
||||
libstress::{
|
||||
utils::{generate_random_byte_vector, ran_bytes},
|
||||
PoolConfig,
|
||||
},
|
||||
skytable::{types::RawString, Connection, Element, Query, RespCode},
|
||||
std::{
|
||||
io::{Read, Write},
|
||||
net::TcpStream,
|
||||
},
|
||||
};
|
||||
|
||||
mod report;
|
||||
mod validation;
|
||||
|
||||
macro_rules! binfo {
|
||||
($($arg:tt)+) => {
|
||||
if $crate::config::should_output_messages() {
|
||||
::log::info!($($arg)+)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
|
||||
// init bench config
|
||||
let bench_config = BenchmarkConfig::new(servercfg, matches)?;
|
||||
// check if we have enough combinations for the given query count and key size
|
||||
if !util::has_enough_ncr(bench_config.kvsize(), bench_config.query_count()) {
|
||||
return Err(Error::RuntimeError(
|
||||
"too low sample space for given query count. use larger kvsize".into(),
|
||||
));
|
||||
}
|
||||
// run sanity test; this will also set up the temporary table for benchmarking
|
||||
binfo!("Running sanity test ...");
|
||||
util::run_sanity_test(&bench_config.server)?;
|
||||
|
||||
// pool pre-exec setup
|
||||
let servercfg = servercfg.clone();
|
||||
let switch_table = Query::from("use default.tmpbench").into_raw_query();
|
||||
let get_response_size = validation::calculate_response_size(bench_config.kvsize());
|
||||
let rcode_okay_size = validation::RESPCODE_OKAY.len();
|
||||
|
||||
// init pool config; side_connection is for cleanups
|
||||
let mut misc_connection = Connection::new(servercfg.host(), servercfg.port())?;
|
||||
let pool_config = PoolConfig::new(
|
||||
servercfg.connections(),
|
||||
move || {
|
||||
let mut stream = TcpStream::connect((servercfg.host(), servercfg.port())).unwrap();
|
||||
stream.write_all(&switch_table.clone()).unwrap();
|
||||
let mut v = vec![0; rcode_okay_size];
|
||||
let _ = stream.read_exact(&mut v).unwrap();
|
||||
stream
|
||||
},
|
||||
move |_sock, _packet: Box<[u8]>| panic!("on_loop exec unset"),
|
||||
|socket| {
|
||||
socket.shutdown(std::net::Shutdown::Both).unwrap();
|
||||
},
|
||||
true,
|
||||
Some(bench_config.query_count()),
|
||||
);
|
||||
|
||||
// init timer and reports
|
||||
let mut report = AggregateReport::new(bench_config.query_count());
|
||||
|
||||
// init test data
|
||||
binfo!("Initializing test data ...");
|
||||
let mut rng = rand::thread_rng();
|
||||
let keys = generate_random_byte_vector(
|
||||
bench_config.query_count(),
|
||||
bench_config.kvsize(),
|
||||
&mut rng,
|
||||
true,
|
||||
)?;
|
||||
let values = generate_random_byte_vector(
|
||||
bench_config.query_count(),
|
||||
bench_config.kvsize(),
|
||||
&mut rng,
|
||||
false,
|
||||
)?;
|
||||
let new_updated_key = ran_bytes(bench_config.kvsize(), &mut rng);
|
||||
|
||||
// run tests; the idea here is to run all tests one-by-one instead of generating all packets at once
|
||||
// such an approach helps us keep memory usage low
|
||||
// bench set
|
||||
binfo!("Benchmarking SET ...");
|
||||
let set_packets: Vec<Box<[u8]>> = (0..bench_config.query_count())
|
||||
.map(|i| {
|
||||
Query::from("SET")
|
||||
.arg(RawString::from(keys[i].clone()))
|
||||
.arg(RawString::from(values[i].clone()))
|
||||
.into_raw_query()
|
||||
.into_boxed_slice()
|
||||
})
|
||||
.collect();
|
||||
run_bench_for(
|
||||
&pool_config,
|
||||
move |sock, packet: Box<[u8]>| {
|
||||
sock.write_all(&packet).unwrap();
|
||||
// expect rcode 0
|
||||
let mut v = vec![0; rcode_okay_size];
|
||||
let _ = sock.read_exact(&mut v).unwrap();
|
||||
assert_eq!(v, validation::RESPCODE_OKAY);
|
||||
},
|
||||
"set",
|
||||
&mut report,
|
||||
set_packets,
|
||||
bench_config.runs(),
|
||||
&mut misc_connection,
|
||||
Some((
|
||||
Query::from("FLUSHDB").arg("default.tmpbench"),
|
||||
Element::RespCode(RespCode::Okay),
|
||||
true,
|
||||
)),
|
||||
)?;
|
||||
|
||||
// bench update
|
||||
binfo!("Benchmarking UPDATE ...");
|
||||
let update_packets: Vec<Box<[u8]>> = (0..bench_config.query_count())
|
||||
.map(|i| {
|
||||
Query::from("UPDATE")
|
||||
.arg(RawString::from(keys[i].clone()))
|
||||
.arg(RawString::from(new_updated_key.clone()))
|
||||
.into_raw_query()
|
||||
.into_boxed_slice()
|
||||
})
|
||||
.collect();
|
||||
run_bench_for(
|
||||
&pool_config,
|
||||
move |sock, packet: Box<[u8]>| {
|
||||
sock.write_all(&packet).unwrap();
|
||||
// expect rcode 0
|
||||
let mut v = vec![0; rcode_okay_size];
|
||||
let _ = sock.read_exact(&mut v).unwrap();
|
||||
assert_eq!(v, validation::RESPCODE_OKAY);
|
||||
},
|
||||
"update",
|
||||
&mut report,
|
||||
update_packets,
|
||||
bench_config.runs(),
|
||||
&mut misc_connection,
|
||||
None,
|
||||
)?;
|
||||
|
||||
// bench get
|
||||
binfo!("Benchmarking GET ...");
|
||||
let get_packets: Vec<Box<[u8]>> = (0..bench_config.query_count())
|
||||
.map(|i| {
|
||||
Query::from("GET")
|
||||
.arg(RawString::from(keys[i].clone()))
|
||||
.into_raw_query()
|
||||
.into_boxed_slice()
|
||||
})
|
||||
.collect();
|
||||
run_bench_for(
|
||||
&pool_config,
|
||||
move |sock, packet: Box<[u8]>| {
|
||||
sock.write_all(&packet).unwrap();
|
||||
// expect kvsize byte count
|
||||
let mut v = vec![0; get_response_size];
|
||||
let _ = sock.read_exact(&mut v).unwrap();
|
||||
},
|
||||
"get",
|
||||
&mut report,
|
||||
get_packets,
|
||||
bench_config.runs(),
|
||||
&mut misc_connection,
|
||||
None,
|
||||
)?;
|
||||
|
||||
// remove all test data
|
||||
binfo!("Finished benchmarks. Cleaning up ...");
|
||||
let r: Element = misc_connection.run_query(Query::from("drop model default.tmpbench force"))?;
|
||||
if r != Element::RespCode(RespCode::Okay) {
|
||||
return Err(Error::RuntimeError(
|
||||
"failed to clean up after benchmarks".into(),
|
||||
));
|
||||
}
|
||||
|
||||
if config::should_output_messages() {
|
||||
// normal output
|
||||
println!("===========RESULTS===========");
|
||||
let (maxpad, reports) = report.finish();
|
||||
for report in reports {
|
||||
let padding = " ".repeat(maxpad - report.name().len());
|
||||
println!(
|
||||
"{}{} {:.6}/sec",
|
||||
report.name().to_uppercase(),
|
||||
padding,
|
||||
report.stat(),
|
||||
);
|
||||
}
|
||||
println!("=============================");
|
||||
} else {
|
||||
// JSON
|
||||
println!("{}", report.into_json())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_bench_for<F, Inp, UIn, Lv, Lp, Ex>(
|
||||
pool: &PoolConfig<Inp, UIn, Lv, Lp, Ex>,
|
||||
closure: F,
|
||||
name: &'static str,
|
||||
reports: &mut AggregateReport,
|
||||
input: Vec<UIn>,
|
||||
runs: usize,
|
||||
tmp_con: &mut Connection,
|
||||
cleanup: Option<(Query, Element, bool)>,
|
||||
) -> BResult<()>
|
||||
where
|
||||
F: Send + Sync + Fn(&mut Inp, UIn) + Clone + 'static,
|
||||
Ex: Clone + Fn(&mut Inp) + Send + Sync + 'static,
|
||||
Inp: Sync + 'static,
|
||||
Lp: Clone + Fn(&mut Inp, UIn) + Send + Sync + 'static,
|
||||
Lv: Clone + Fn() -> Inp + Send + 'static + Sync,
|
||||
UIn: Clone + Send + Sync + 'static,
|
||||
{
|
||||
let mut sum: u128 = 0;
|
||||
for i in 0..runs {
|
||||
// run local copy
|
||||
let this_input = input.clone();
|
||||
let pool = pool.with_loop_closure(closure.clone());
|
||||
// time
|
||||
let mut tm = SimpleTimer::new();
|
||||
tm.start();
|
||||
pool.execute_and_finish_iter(this_input);
|
||||
tm.stop();
|
||||
sum += tm.time_in_nanos().unwrap();
|
||||
// cleanup
|
||||
if let Some((ref cleanup_after_run, ref resp_cleanup_after_run, skip_on_last)) = cleanup {
|
||||
if !(skip_on_last && (i == runs - 1)) {
|
||||
let r: Element = tmp_con.run_query(cleanup_after_run)?;
|
||||
if r.ne(resp_cleanup_after_run) {
|
||||
return Err(Error::RuntimeError(format!(
|
||||
"Failed to run cleanup for benchmark `{name}` in iteration {i}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// return average time
|
||||
reports.push(SingleReport::new(name, sum as f64 / runs as f64));
|
||||
Ok(())
|
||||
}
|
@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Created on Wed Aug 10 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct SingleReport {
|
||||
name: &'static str,
|
||||
stat: f64,
|
||||
}
|
||||
|
||||
impl SingleReport {
|
||||
pub fn new(name: &'static str, stat: f64) -> Self {
|
||||
Self { name, stat }
|
||||
}
|
||||
|
||||
pub fn stat(&self) -> f64 {
|
||||
self.stat
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
self.name
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AggregateReport {
|
||||
names: Vec<SingleReport>,
|
||||
query_count: usize,
|
||||
}
|
||||
|
||||
impl AggregateReport {
|
||||
pub fn new(query_count: usize) -> Self {
|
||||
Self {
|
||||
names: Vec::new(),
|
||||
query_count,
|
||||
}
|
||||
}
|
||||
pub fn push(&mut self, report: SingleReport) {
|
||||
self.names.push(report)
|
||||
}
|
||||
pub(crate) fn into_json(self) -> String {
|
||||
let (_, report) = self.finish();
|
||||
serde_json::to_string(&report).unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn finish(self) -> (usize, Vec<SingleReport>) {
|
||||
let mut maxpad = self.names[0].name.len();
|
||||
let mut reps = self.names;
|
||||
reps.iter_mut().for_each(|rep| {
|
||||
let total_time = rep.stat;
|
||||
let qps = (self.query_count as f64 / total_time) * 1_000_000_000_f64;
|
||||
rep.stat = qps;
|
||||
if rep.name.len() > maxpad {
|
||||
maxpad = rep.name.len();
|
||||
}
|
||||
});
|
||||
(maxpad, reps)
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Created on Tue Aug 09 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
pub const RESPCODE_OKAY: &[u8] = b"*!0\n";
|
||||
|
||||
pub fn calculate_response_size(keylen: usize) -> usize {
|
||||
/*
|
||||
*+5\n
|
||||
hello
|
||||
*/
|
||||
let mut size = 2; // simple query byte + tsymbol
|
||||
size += keylen.to_string().len(); // bytes in length
|
||||
size += 1; // LF
|
||||
size += keylen; // payload
|
||||
size
|
||||
}
|
@ -1,242 +0,0 @@
|
||||
/*
|
||||
* Created on Thu Jun 17 2021
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
self::validation::SQ_RESPCODE_SIZE,
|
||||
crate::{report::AggregatedReport, util},
|
||||
devtimer::DevTime,
|
||||
libstress::{utils::generate_random_byte_vector, PoolConfig},
|
||||
rand::thread_rng,
|
||||
skytable::{types::RawString, Query},
|
||||
std::{
|
||||
io::{Read, Write},
|
||||
net::TcpStream,
|
||||
},
|
||||
};
|
||||
pub mod validation;
|
||||
|
||||
const NOTICE_INIT_BENCH: &str = "Finished sanity test. Initializing benchmark ...";
|
||||
const NOTICE_INIT_COMPLETE: &str = "Initialization complete! Benchmark started";
|
||||
const CONFIG_TABLE_MODEL: &str = "keymap(binstr,binstr)";
|
||||
const CONFIG_TABLE_VOLATILITY: &str = "volatile";
|
||||
|
||||
/// Run the benchmark tool
|
||||
pub fn runner(
|
||||
host: String,
|
||||
port: u16,
|
||||
max_connections: usize,
|
||||
max_queries: usize,
|
||||
per_kv_size: usize,
|
||||
json_out: bool,
|
||||
runs: usize,
|
||||
) {
|
||||
if !util::enough_ncr(per_kv_size, max_queries) {
|
||||
err!("Too low sample space for given k/v size and query count. Try a higher k/v size.");
|
||||
}
|
||||
|
||||
if !json_out {
|
||||
println!("Running sanity test ...");
|
||||
}
|
||||
|
||||
if let Err(e) = sanity_test!(host, port) {
|
||||
err!(format!("Sanity test failed with error: {}", e));
|
||||
}
|
||||
|
||||
if !json_out {
|
||||
println!("{}", NOTICE_INIT_BENCH);
|
||||
println!("Connections: {}", max_connections);
|
||||
println!("Queries: {}", max_queries);
|
||||
println!("Data size (key+value): {} bytes", (per_kv_size * 2));
|
||||
}
|
||||
|
||||
let host = hoststr!(host, port);
|
||||
let mut rand = thread_rng();
|
||||
let temp_table = init_temp_table(&mut rand, &host);
|
||||
let switch_table = Query::from("use")
|
||||
.arg(format!("default:{}", &temp_table))
|
||||
.into_raw_query();
|
||||
|
||||
let pool_config = PoolConfig::new(
|
||||
max_connections,
|
||||
move || {
|
||||
let mut stream = TcpStream::connect(&host).unwrap();
|
||||
stream.write_all(&switch_table.clone()).unwrap();
|
||||
let mut v = vec![0; SQ_RESPCODE_SIZE];
|
||||
let _ = stream.read_exact(&mut v).unwrap();
|
||||
stream
|
||||
},
|
||||
move |sock, packet: Vec<u8>| {
|
||||
sock.write_all(&packet).unwrap();
|
||||
// all `okay`s are returned (for both update and set)
|
||||
let mut v = vec![0; SQ_RESPCODE_SIZE];
|
||||
let _ = sock.read_exact(&mut v).unwrap();
|
||||
},
|
||||
|socket| {
|
||||
socket.shutdown(std::net::Shutdown::Both).unwrap();
|
||||
},
|
||||
true,
|
||||
Some(max_queries),
|
||||
);
|
||||
|
||||
// Create separate connection pools for get and set operations
|
||||
|
||||
let keys = generate_random_byte_vector(max_queries, per_kv_size, &mut rand, true);
|
||||
let values = generate_random_byte_vector(max_queries, per_kv_size, &mut rand, false);
|
||||
let (keys, values) = match (keys, values) {
|
||||
(Ok(k), Ok(v)) => (k, v),
|
||||
_ => err!("Allocation error"),
|
||||
};
|
||||
|
||||
/*
|
||||
We create three vectors of vectors: `set_packs`, `get_packs` and `del_packs`
|
||||
The bytes in each of `set_packs` has a query packet for setting data;
|
||||
The bytes in each of `get_packs` has a query packet for getting a key set by one of `set_packs`
|
||||
since we use the same key/value pairs for all;
|
||||
The bytes in each of `del_packs` has a query packet for deleting a key created by
|
||||
one of `set_packs`
|
||||
*/
|
||||
let set_packs: Vec<Vec<u8>> = (0..max_queries)
|
||||
.map(|idx| {
|
||||
let mut q = Query::from("SET");
|
||||
q.push(RawString::from(keys[idx].clone()));
|
||||
q.push(RawString::from(values[idx].clone()));
|
||||
q.into_raw_query()
|
||||
})
|
||||
.collect();
|
||||
let get_packs: Vec<Vec<u8>> = (0..max_queries)
|
||||
.map(|idx| {
|
||||
let mut q = Query::from("GET");
|
||||
q.push(RawString::from(keys[idx].clone()));
|
||||
q.into_raw_query()
|
||||
})
|
||||
.collect();
|
||||
// just update key -> value to key -> key to avoid unnecessary memory usage
|
||||
let update_packs: Vec<Vec<u8>> = (0..max_queries)
|
||||
.map(|idx| {
|
||||
let mut q = Query::from("UPDATE");
|
||||
q.push(RawString::from(keys[idx].clone()));
|
||||
q.push(RawString::from(keys[idx].clone()));
|
||||
q.into_raw_query()
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !json_out {
|
||||
println!("Per-packet size (GET): {} bytes", get_packs[0].len());
|
||||
println!("Per-packet size (SET): {} bytes", set_packs[0].len());
|
||||
println!("Per-packet size (UPDATE): {} bytes", update_packs[0].len());
|
||||
println!("{}", NOTICE_INIT_COMPLETE);
|
||||
}
|
||||
|
||||
let mut report = AggregatedReport::new(3, runs, max_queries);
|
||||
for i in 1..runs + 1 {
|
||||
let mut dt = DevTime::new_complex();
|
||||
// clone in the keys
|
||||
let set_packs = set_packs.clone();
|
||||
let get_packs = get_packs.clone();
|
||||
let update_packs = update_packs.clone();
|
||||
|
||||
// bench SET
|
||||
let setpool = pool_config.get_pool();
|
||||
dt.create_timer("SET").unwrap();
|
||||
dt.start_timer("SET").unwrap();
|
||||
setpool.execute_and_finish_iter(set_packs);
|
||||
dt.stop_timer("SET").unwrap();
|
||||
|
||||
let get_response_packet_size =
|
||||
validation::calculate_monoelement_dataframe_size(per_kv_size)
|
||||
+ validation::calculate_metaframe_size(1);
|
||||
let getpool =
|
||||
pool_config.with_loop_closure(move |sock: &mut TcpStream, packet: Vec<u8>| {
|
||||
sock.write_all(&packet).unwrap();
|
||||
// read exact for the key size
|
||||
let mut v = vec![0; get_response_packet_size];
|
||||
let _ = sock.read_exact(&mut v).unwrap();
|
||||
});
|
||||
dt.create_timer("GET").unwrap();
|
||||
dt.start_timer("GET").unwrap();
|
||||
getpool.execute_and_finish_iter(get_packs);
|
||||
dt.stop_timer("GET").unwrap();
|
||||
|
||||
// bench UPDATE
|
||||
let update_pool = pool_config.get_pool();
|
||||
dt.create_timer("UPDATE").unwrap();
|
||||
dt.start_timer("UPDATE").unwrap();
|
||||
update_pool.execute_and_finish_iter(update_packs);
|
||||
dt.stop_timer("UPDATE").unwrap();
|
||||
|
||||
if !json_out {
|
||||
println!("Finished run: {}", i);
|
||||
}
|
||||
|
||||
// drop table
|
||||
let flushdb = Query::new()
|
||||
.arg("FLUSHDB")
|
||||
.arg(format!("default:{}", &temp_table))
|
||||
.into_raw_query();
|
||||
let drop_pool = pool_config.get_pool_with_workers(1);
|
||||
drop_pool.execute(flushdb);
|
||||
drop(drop_pool);
|
||||
dt.iter()
|
||||
.for_each(|(name, timer)| report.insert(name, timer.time_in_nanos().unwrap()));
|
||||
}
|
||||
print_results(json_out, report);
|
||||
}
|
||||
|
||||
fn init_temp_table(rand: &mut impl rand::Rng, host: &str) -> String {
|
||||
let temp_table = libstress::utils::rand_alphastring(10, rand);
|
||||
let create_table = Query::from("create")
|
||||
.arg("table")
|
||||
.arg(&temp_table)
|
||||
.arg(CONFIG_TABLE_MODEL)
|
||||
.arg(CONFIG_TABLE_VOLATILITY)
|
||||
.into_raw_query();
|
||||
let mut create_table_connection = TcpStream::connect(host).unwrap();
|
||||
// create table
|
||||
create_table_connection.write_all(&create_table).unwrap();
|
||||
let mut v = [0u8; SQ_RESPCODE_SIZE];
|
||||
let _ = create_table_connection.read_exact(&mut v).unwrap();
|
||||
temp_table
|
||||
}
|
||||
|
||||
fn print_results(flag_json: bool, report: AggregatedReport) {
|
||||
if flag_json {
|
||||
let serialized = report.into_json();
|
||||
println!("{}", serialized);
|
||||
} else {
|
||||
println!("===========RESULTS===========");
|
||||
let (report, maxpad) = report.into_sorted_stat();
|
||||
let pad = |clen: usize| " ".repeat(maxpad - clen);
|
||||
report.into_iter().for_each(|block| {
|
||||
println!(
|
||||
"{}{} {:.6}/sec",
|
||||
block.get_report(),
|
||||
pad(block.get_report().len()),
|
||||
block.get_stat()
|
||||
);
|
||||
});
|
||||
println!("=============================");
|
||||
}
|
||||
}
|
@ -1,131 +0,0 @@
|
||||
/*
|
||||
* Created on Fri Nov 26 2021
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
pub const SQ_RESPCODE_SIZE: usize = b"*!1\n".len();
|
||||
|
||||
/// For a dataframe, this returns the dataframe size for array responses.
|
||||
///
|
||||
/// For example,
|
||||
/// ```text
|
||||
/// &<n>\n
|
||||
/// (<tsymbol><size>\n<element>)*
|
||||
/// ```
|
||||
#[allow(dead_code)] // TODO(@ohsayan): Remove this lint
|
||||
pub fn calculate_array_dataframe_size(element_count: usize, per_element_size: usize) -> usize {
|
||||
let mut s = 0;
|
||||
s += 1; // `&`
|
||||
s += element_count.to_string().len(); // `<n>`
|
||||
s += 1; // `\n`
|
||||
let mut subsize = 0;
|
||||
subsize += 1; // `+`
|
||||
subsize += per_element_size.to_string().len(); // `<n>`
|
||||
subsize += 1; // `\n`
|
||||
subsize += per_element_size; // the element size itself
|
||||
subsize += 1; // `\n`
|
||||
s += subsize * element_count;
|
||||
s
|
||||
}
|
||||
|
||||
/// For a dataframe with a typed array, calculate its size
|
||||
///
|
||||
/// **Warning:** Null entries are not yet supported (for a full null array, just pass `1` for the `per_element_size`)
|
||||
#[allow(dead_code)]
|
||||
pub fn calculate_typed_array_dataframe_size(
|
||||
element_count: usize,
|
||||
per_element_size: usize,
|
||||
) -> usize {
|
||||
let mut s = 0usize;
|
||||
s += 2; // `@<tsymbol>`
|
||||
s += element_count.to_string().len(); // `<n>`
|
||||
s += 1; // `\n`
|
||||
|
||||
// now for the payload
|
||||
let mut subsize = 0usize;
|
||||
subsize += per_element_size.to_string().len(); // `<n>`
|
||||
subsize += 1; // `\n`
|
||||
subsize += per_element_size; // the payload itself
|
||||
subsize += 1; // `\n`
|
||||
|
||||
s += subsize * element_count;
|
||||
s
|
||||
}
|
||||
|
||||
/// For a monoelement dataframe, this returns the size:
|
||||
/// ```text
|
||||
/// <tsymbol><size>\n
|
||||
/// <element>
|
||||
/// ```
|
||||
///
|
||||
/// For an `okay` respcode, it will look like this:
|
||||
/// ```text
|
||||
/// !1\n
|
||||
/// 0\n
|
||||
/// ```
|
||||
pub fn calculate_monoelement_dataframe_size(per_element_size: usize) -> usize {
|
||||
let mut s = 0;
|
||||
s += 1; // the tsymbol (always one byte)
|
||||
s += per_element_size.to_string().len(); // the bytes in size string
|
||||
s += 1; // the LF
|
||||
s += per_element_size; // the element itself
|
||||
s
|
||||
}
|
||||
|
||||
/// Returns the metaframe size
|
||||
/// ```text
|
||||
/// *<n>\n
|
||||
/// ```
|
||||
#[allow(dead_code)] // TODO(@ohsayan): Remove this lint
|
||||
pub fn calculate_metaframe_size(queries: usize) -> usize {
|
||||
if queries == 1 {
|
||||
// just `*`
|
||||
1
|
||||
} else {
|
||||
let mut s = 0;
|
||||
s += 1; // `$`
|
||||
s += queries.to_string().len(); // the bytes in size string
|
||||
s += 1; // `\n`
|
||||
s
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_monoelement_calculation() {
|
||||
assert_eq!(calculate_monoelement_dataframe_size(1), 4);
|
||||
}
|
||||
#[test]
|
||||
fn test_simple_query_metaframe_size() {
|
||||
assert_eq!(calculate_metaframe_size(1), 1);
|
||||
}
|
||||
#[test]
|
||||
fn test_typed_array_dataframe_size() {
|
||||
let packet = b"@+3\n3\nhow\n3\nyou\n3\ndng\n";
|
||||
assert_eq!(calculate_typed_array_dataframe_size(3, 3), packet.len());
|
||||
}
|
||||
}
|
@ -0,0 +1,147 @@
|
||||
/*
|
||||
* Created on Mon Aug 08 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
crate::error::{BResult, Error},
|
||||
crate::util,
|
||||
clap::ArgMatches,
|
||||
std::{fmt::Display, str::FromStr},
|
||||
};
|
||||
|
||||
static mut OUTPUT_JSON: bool = false;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ServerConfig {
|
||||
/// host
|
||||
host: Box<str>,
|
||||
/// port
|
||||
port: u16,
|
||||
/// connection count for network pool
|
||||
connections: usize,
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
fn try_update<T: FromStr, S: AsRef<str>>(input: Option<S>, target: &mut T) -> BResult<()>
|
||||
where
|
||||
<T as FromStr>::Err: Display,
|
||||
{
|
||||
if let Some(input) = input {
|
||||
let parsed = input
|
||||
.as_ref()
|
||||
.parse::<T>()
|
||||
.map_err(|e| Error::ConfigError(format!("parse error: `{}`", e)))?;
|
||||
*target = parsed;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
const DEFAULT_HOST: &'static str = "127.0.0.1";
|
||||
const DEFAULT_PORT: u16 = 2003;
|
||||
const DEFAULT_CONNECTIONS: usize = 10;
|
||||
/// Init the default server config
|
||||
pub fn new(matches: &ArgMatches) -> BResult<Self> {
|
||||
let mut slf = Self {
|
||||
host: Self::DEFAULT_HOST.into(),
|
||||
port: Self::DEFAULT_PORT,
|
||||
connections: Self::DEFAULT_CONNECTIONS,
|
||||
};
|
||||
slf.try_host(matches.value_of_lossy("host"));
|
||||
slf.try_port(matches.value_of_lossy("port"))?;
|
||||
slf.try_connections(matches.value_of_lossy("connections"))?;
|
||||
Ok(slf)
|
||||
}
|
||||
/// Update the host
|
||||
pub fn try_host<T: AsRef<str>>(&mut self, host: Option<T>) {
|
||||
if let Some(host) = host {
|
||||
self.host = host.as_ref().into();
|
||||
}
|
||||
}
|
||||
/// Attempt to update the port
|
||||
pub fn try_port<T: AsRef<str>>(&mut self, port: Option<T>) -> BResult<()> {
|
||||
try_update(port, &mut self.port)
|
||||
}
|
||||
/// Attempt to update the connections
|
||||
pub fn try_connections<T: AsRef<str>>(&mut self, con: Option<T>) -> BResult<()> {
|
||||
try_update(con, &mut self.connections)
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
pub fn host(&self) -> &str {
|
||||
self.host.as_ref()
|
||||
}
|
||||
pub fn port(&self) -> u16 {
|
||||
self.port
|
||||
}
|
||||
pub fn connections(&self) -> usize {
|
||||
self.connections
|
||||
}
|
||||
}
|
||||
|
||||
/// Benchmark configuration
|
||||
pub struct BenchmarkConfig<'a> {
|
||||
pub server: &'a ServerConfig,
|
||||
kvsize: usize,
|
||||
queries: usize,
|
||||
runs: usize,
|
||||
}
|
||||
|
||||
impl<'a> BenchmarkConfig<'a> {
|
||||
const DEFAULT_QUERIES: usize = 100_000;
|
||||
const DEFAULT_KVSIZE: usize = 3;
|
||||
const DEFAULT_RUNS: usize = 5;
|
||||
pub fn new(server: &'a ServerConfig, matches: ArgMatches) -> BResult<Self> {
|
||||
let mut slf = Self {
|
||||
server,
|
||||
queries: Self::DEFAULT_QUERIES,
|
||||
kvsize: Self::DEFAULT_KVSIZE,
|
||||
runs: Self::DEFAULT_RUNS,
|
||||
};
|
||||
try_update(matches.value_of_lossy("queries"), &mut slf.queries)?;
|
||||
try_update(matches.value_of_lossy("size"), &mut slf.kvsize)?;
|
||||
try_update(matches.value_of_lossy("runs"), &mut slf.runs)?;
|
||||
util::ensure_main_thread();
|
||||
unsafe {
|
||||
OUTPUT_JSON = matches.is_present("json");
|
||||
}
|
||||
Ok(slf)
|
||||
}
|
||||
pub fn kvsize(&self) -> usize {
|
||||
self.kvsize
|
||||
}
|
||||
pub fn query_count(&self) -> usize {
|
||||
self.queries
|
||||
}
|
||||
pub fn runs(&self) -> usize {
|
||||
self.runs
|
||||
}
|
||||
}
|
||||
|
||||
pub fn should_output_messages() -> bool {
|
||||
util::ensure_main_thread();
|
||||
unsafe { !OUTPUT_JSON }
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Created on Mon Aug 08 2022
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use std::collections::TryReserveError;
|
||||
|
||||
use {skytable::error::Error as SkyError, std::fmt::Display};
|
||||
|
||||
pub type BResult<T> = Result<T, Error>;
|
||||
|
||||
/// Benchmark tool errors
|
||||
pub enum Error {
|
||||
/// An error originating from the Skytable client
|
||||
ClientError(SkyError),
|
||||
/// An error originating from the benchmark/server configuration
|
||||
ConfigError(String),
|
||||
/// A runtime error
|
||||
RuntimeError(String),
|
||||
}
|
||||
|
||||
impl From<SkyError> for Error {
|
||||
fn from(e: SkyError) -> Self {
|
||||
Self::ClientError(e)
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Error::ClientError(e) => write!(f, "client error: {}", e),
|
||||
Error::ConfigError(e) => write!(f, "config error: {}", e),
|
||||
Error::RuntimeError(e) => write!(f, "runtime error: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TryReserveError> for Error {
|
||||
fn from(e: TryReserveError) -> Self {
|
||||
Error::RuntimeError(format!("memory reserve error: {}", e.to_string()))
|
||||
}
|
||||
}
|
@ -1,148 +0,0 @@
|
||||
/*
|
||||
* Created on Tue Aug 10 2021
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
crate::util,
|
||||
core::cmp::Ordering,
|
||||
std::collections::{hash_map::Entry, HashMap},
|
||||
};
|
||||
|
||||
/// A map of reports
|
||||
pub struct AggregatedReport {
|
||||
map: HashMap<&'static str, Report>,
|
||||
queries: usize,
|
||||
cap: usize,
|
||||
}
|
||||
|
||||
impl AggregatedReport {
|
||||
/// Create a new aggregated report instance. Here:
|
||||
/// - `report_count`: Is the count of benches you will be running. For example, if you
|
||||
/// are testing GET and SET, this will be 2
|
||||
/// - `cap`: Is the number of repeats you will be running
|
||||
/// - `queries`: Is the number of queries you will run
|
||||
pub fn new(report_count: usize, cap: usize, queries: usize) -> Self {
|
||||
Self {
|
||||
map: HashMap::with_capacity(report_count),
|
||||
cap,
|
||||
queries,
|
||||
}
|
||||
}
|
||||
/// Insert a new statistic. The `name` should correspond to the bench name (for example GET)
|
||||
/// while the `time` should be the time taken for that bench to complete
|
||||
pub fn insert(&mut self, name: &'static str, time: u128) {
|
||||
match self.map.entry(name) {
|
||||
Entry::Occupied(mut oe) => oe.get_mut().times.push(time),
|
||||
Entry::Vacant(ve) => {
|
||||
let mut rep = Report::with_capacity(self.cap);
|
||||
rep.times.push(time);
|
||||
let _ = ve.insert(rep);
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Returns a vector of sorted statistics (lexicographical) and the length of the longest
|
||||
/// bench name. `(Vec<Stat>, longest_bench_name)`
|
||||
pub fn into_sorted_stat(self) -> (Vec<Stat>, usize) {
|
||||
let Self { map, queries, .. } = self;
|
||||
let mut maxpad = 0usize;
|
||||
let mut repvec: Vec<Stat> = map
|
||||
.into_iter()
|
||||
.map(|(name, report)| {
|
||||
if name.len() > maxpad {
|
||||
maxpad = name.len();
|
||||
}
|
||||
report.into_stat(queries, name)
|
||||
})
|
||||
.collect();
|
||||
repvec.sort();
|
||||
(repvec, maxpad)
|
||||
}
|
||||
/// Returns a minified JSON string
|
||||
pub fn into_json(self) -> String {
|
||||
serde_json::to_string(&self.into_sorted_stat().0).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// A report with a collection of times
|
||||
pub struct Report {
|
||||
times: Vec<u128>,
|
||||
}
|
||||
|
||||
impl Report {
|
||||
/// Returns a new report with space for atleast `cap` number of times
|
||||
pub fn with_capacity(cap: usize) -> Self {
|
||||
Self {
|
||||
times: Vec::with_capacity(cap),
|
||||
}
|
||||
}
|
||||
/// Returns a [`Stat`] with the average time
|
||||
pub fn into_stat(self, reqs: usize, name: &'static str) -> Stat {
|
||||
let count = self.times.len();
|
||||
let avg: u128 = self.times.into_iter().sum();
|
||||
let avg = avg / count as u128;
|
||||
Stat {
|
||||
name,
|
||||
stat: util::calc(reqs, avg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Debug)]
|
||||
/// A statistic: name of the bench and the result
|
||||
pub struct Stat {
|
||||
name: &'static str,
|
||||
stat: f64,
|
||||
}
|
||||
|
||||
impl Stat {
|
||||
/// Get a reference to the report name
|
||||
pub fn get_report(&self) -> &str {
|
||||
self.name
|
||||
}
|
||||
/// Get the statistic
|
||||
pub fn get_stat(&self) -> f64 {
|
||||
self.stat
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Stat {
|
||||
fn eq(&self, oth: &Self) -> bool {
|
||||
self.name == oth.name
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Stat {}
|
||||
impl PartialOrd for Stat {
|
||||
fn partial_cmp(&self, oth: &Self) -> Option<Ordering> {
|
||||
self.name.partial_cmp(oth.name)
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for Stat {
|
||||
fn cmp(&self, oth: &Self) -> std::cmp::Ordering {
|
||||
self.name.cmp(oth.name)
|
||||
}
|
||||
}
|
@ -1,81 +0,0 @@
|
||||
/*
|
||||
* Created on Thu Jun 17 2021
|
||||
*
|
||||
* This file is a part of Skytable
|
||||
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
|
||||
* NoSQL database written by Sayan Nandan ("the Author") with the
|
||||
* vision to provide flexibility in data modelling without compromising
|
||||
* on performance, queryability or scalability.
|
||||
*
|
||||
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*
|
||||
*/
|
||||
|
||||
use {
|
||||
crate::{benchtool::validation::SQ_RESPCODE_SIZE, hoststr, sanity_test},
|
||||
libstress::{utils::generate_random_string_vector, Workpool},
|
||||
rand::thread_rng,
|
||||
skytable::Query,
|
||||
std::{
|
||||
io::{Read, Write},
|
||||
net::{self, TcpStream},
|
||||
},
|
||||
};
|
||||
|
||||
pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, size: usize) {
|
||||
if let Err(e) = sanity_test!(host, port) {
|
||||
err!(format!("Sanity test failed with error: {}", e));
|
||||
}
|
||||
|
||||
let host = hoststr!(host, port);
|
||||
let mut rand = thread_rng();
|
||||
let np = Workpool::new(
|
||||
connections,
|
||||
move || TcpStream::connect(host.clone()).unwrap(),
|
||||
|sock, packet: Vec<u8>| {
|
||||
sock.write_all(&packet).unwrap();
|
||||
let mut buf = [0u8; SQ_RESPCODE_SIZE];
|
||||
let _ = sock.read_exact(&mut buf).unwrap();
|
||||
},
|
||||
|socket| {
|
||||
socket.shutdown(net::Shutdown::Both).unwrap();
|
||||
},
|
||||
true,
|
||||
Some(connections),
|
||||
);
|
||||
println!("Generating keys ...");
|
||||
let keys = generate_random_string_vector(num, size, &mut rand, true);
|
||||
let values = generate_random_string_vector(num, size, &mut rand, false);
|
||||
let (keys, values) = match (keys, values) {
|
||||
(Ok(k), Ok(v)) => (k, v),
|
||||
_ => err!("Allocation error"),
|
||||
};
|
||||
{
|
||||
let np = np;
|
||||
(0..num)
|
||||
.map(|idx| {
|
||||
Query::new()
|
||||
.arg("SET")
|
||||
.arg(&keys[idx])
|
||||
.arg(&values[idx])
|
||||
.into_raw_query()
|
||||
})
|
||||
.for_each(|packet| {
|
||||
np.execute(packet);
|
||||
});
|
||||
}
|
||||
println!("Created mock keys successfully");
|
||||
}
|
Loading…
Reference in New Issue