Merge branch 'newbench' into next

next
Sayan Nandan 2 years ago
commit a4daa47c65
No known key found for this signature in database
GPG Key ID: 8BC07A0A4D41DD52

@ -6,24 +6,33 @@ All changes in this project will be noted in this file.
### Additions
- New protocol: Skyhash 2.0
- Reduced bandwidth usage (as much as 50%)
- Even simpler client implementations
- Backward compatibility with Skyhash 1.0:
- Simply set the protocol version you want to use in the config file or env vars, or pass it as a CLI
argument
- Even faster implementation, even for Skyhash 1.0
- New query language: BlueQL
- `create keyspace` is now `create space`
- `create table` is now `create model`
- Similarly, all `inspect` queries have been changed
- Entities are now of the form `space.model` instead of `ks:tbl`
- `skyd`:
- New protocol: Skyhash 2.0
- Reduced bandwidth usage (as much as 50%)
- Even simpler client implementations
- Backward compatibility with Skyhash 1.0:
- Simply set the protocol version you want to use in the config file or env vars, or pass it as a CLI
argument
- Even faster implementation, even for Skyhash 1.0
- New query language: BlueQL
- `create keyspace` is now `create space`
- `create table` is now `create model`
- Similarly, all `inspect` queries have been changed
- Entities are now of the form `space.model` instead of `ks:tbl` (see the example just below)
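As a quick illustration (an editor's addition, not part of the changelog): the benchmark tool's setup queries further down in this diff show the before/after shape (the old tool actually used a randomly named table; `tmpbench` stands in for both sides here for comparison):
old: `create table tmpbench keymap(binstr,binstr) volatile`, then `use default:tmpbench`
new: `create model default.tmpbench(binary, binary)`, then `use default.tmpbench`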
## Version 0.7.6
### Fixes
- Fixed erroneous removal of `auth` system table during tree cleanup (see #276)
- `skyd`:
- Fixed erroneous removal of `auth` system table during tree cleanup (see #276)
- `sky-bench`:
- Fixed sample space calculation
### Breaking changes
- The `testkey` subcommand was removed. With the addition of different data types, this command became
redundant since it only works with simple key/value pairs
## Version 0.7.5

Cargo.lock generated

@ -1150,18 +1150,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.142"
version = "1.0.143"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e590c437916fb6b221e1d00df6e3294f3fccd70ca7e92541c475d6ed6ef5fee2"
checksum = "53e8e5d5b70924f74ff5c6d64d9a5acd91422117c60f48c4e07855238a254553"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.142"
version = "1.0.143"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34b5b8d809babe02f538c2cfec6f2c1ed10804c0e5a6a041a049a4f5588ccc2e"
checksum = "d3d8e8de557aee63c26b85b947f5e59b690d0454c753f3adeb5cd7835ab88391"
dependencies = [
"proc-macro2",
"quote",
@ -1237,11 +1237,13 @@ version = "0.8.0"
dependencies = [
"clap",
"devtimer",
"env_logger",
"libstress",
"log",
"rand",
"serde",
"serde_json",
"skytable 0.8.0 (git+https://github.com/skytable/client-rust?branch=next)",
"skytable 0.8.0 (git+https://github.com/skytable/client-rust.git)",
]
[[package]]

@ -65,9 +65,29 @@ use {
core::marker::PhantomData,
crossbeam_channel::{bounded, unbounded, Receiver as CReceiver, Sender as CSender},
rayon::prelude::{IntoParallelIterator, ParallelIterator},
std::thread,
std::{fmt::Display, thread},
};
#[derive(Debug)]
pub enum WorkpoolError {
ThreadStartFailure(usize, usize),
}
impl Display for WorkpoolError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkpoolError::ThreadStartFailure(expected, started) => {
write!(
f,
"couldn't start all threads. expected {expected} but started {started}"
)
}
}
}
}
pub type WorkpoolResult<T> = Result<T, WorkpoolError>;
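// Editor's note (not part of this commit): a minimal, hypothetical usage sketch of the new
// fallible constructor. Each worker collects strings into its own buffer and reports a count
// when the pool winds down; a failed thread spawn now surfaces as a value instead of a panic.
#[allow(dead_code)]
fn _usage_sketch() {
    let pool = Workpool::new(
        4,                                                     // worker count
        Vec::<String>::new,                                    // per-worker pre-loop init
        |buf: &mut Vec<String>, item: String| buf.push(item),  // loop stage
        |buf: &mut Vec<String>| println!("handled {} items", buf.len()), // exit stage
        false,                                                 // no rayon iterator pool needed
        None,                                                  // unbounded job channel
    );
    match pool {
        Ok(pool) => pool.execute("hello".to_owned()),
        Err(WorkpoolError::ThreadStartFailure(expected, started)) => {
            eprintln!("only {started} of {expected} workers started")
        }
    }
}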
/// A Job. The UIn type parameter is the type that will be used to execute the action
/// Nothing is a variant used by the drop implementation to terminate all the workers
/// and call the exit_loop function
@ -87,10 +107,12 @@ struct Worker {
impl Worker {
/// Initialize a new worker
fn new<Inp: 'static, UIn, Lv, Lp, Ex>(
id: usize,
job_receiver: CReceiver<JobType<UIn>>,
init_pre_loop_var: Lv,
on_exit: Ex,
on_loop: Lp,
wgtx: CSender<()>,
) -> Self
where
UIn: Send + Sync + 'static,
@ -98,20 +120,25 @@ impl Worker {
Lp: Fn(&mut Inp, UIn) + Send + Sync + 'static,
Ex: Fn(&mut Inp) + Send + 'static,
{
let thread = thread::spawn(move || {
let on_loop = on_loop;
let mut pre_loop_var = init_pre_loop_var();
loop {
let action = job_receiver.recv().unwrap();
match action {
JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk),
JobType::Nothing => {
on_exit(&mut pre_loop_var);
break;
let thread = thread::Builder::new()
.name(format!("worker-{id}"))
.spawn(move || {
let on_loop = on_loop;
let mut pre_loop_var = init_pre_loop_var();
wgtx.send(()).unwrap();
drop(wgtx);
loop {
let action = job_receiver.recv().unwrap();
match action {
JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk),
JobType::Nothing => {
on_exit(&mut pre_loop_var);
break;
}
}
}
}
});
})
.unwrap();
Self {
thread: Some(thread),
}
@ -120,14 +147,7 @@ impl Worker {
/// A pool configuration setting to easily generate [`Workpool`]s without
/// having to clone an entire pool and its threads upfront
pub struct PoolConfig<Inp, UIn, Lv, Lp, Ex>
where
UIn: Send + Sync + 'static,
Inp: Sync,
Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone,
Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
Lp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
{
pub struct PoolConfig<Inp, UIn, Lv, Lp, Ex> {
/// the pool size
count: usize,
/// the function that sets the pre-loop variable
@ -172,11 +192,14 @@ where
}
}
/// Get a new [`Workpool`] from the current config
pub fn get_pool(&self) -> Workpool<Inp, UIn, Lv, Lp, Ex> {
pub fn get_pool(&self) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Lp, Ex>> {
self.get_pool_with_workers(self.count)
}
/// Get a [`Workpool`] with the base config but with a different number of workers
pub fn get_pool_with_workers(&self, count: usize) -> Workpool<Inp, UIn, Lv, Lp, Ex> {
pub fn get_pool_with_workers(
&self,
count: usize,
) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Lp, Ex>> {
Workpool::new(
count,
self.init_pre_loop_var.clone(),
@ -187,7 +210,7 @@ where
)
}
/// Get a [`Workpool`] with the base config but with a custom loop-stage closure
pub fn with_loop_closure<Dlp>(&self, lp: Dlp) -> Workpool<Inp, UIn, Lv, Dlp, Ex>
pub fn with_loop_closure<Dlp>(&self, lp: Dlp) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Dlp, Ex>>
where
Dlp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
{
@ -202,26 +225,6 @@ where
}
}
impl<Inp: 'static, UIn, Lp, Lv, Ex> Clone for Workpool<Inp, UIn, Lv, Lp, Ex>
where
UIn: Send + Sync + 'static,
Inp: Sync,
Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone,
Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
Lp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Workpool::new(
self.workers.len(),
self.init_pre_loop_var.clone(),
self.on_loop.clone(),
self.on_exit.clone(),
self.needs_iterator_pool,
self.expected_max_sends,
)
}
}
/// # Workpool
///
/// A Workpool is a generic synchronous thread pool that can be used to perform, well, anything.
@ -276,49 +279,69 @@ where
on_exit: Ex,
needs_iterator_pool: bool,
expected_max_sends: Option<usize>,
) -> Self {
) -> WorkpoolResult<Self> {
// init threadpool for iterator
if needs_iterator_pool {
// initialize a global threadpool for parallel iterators
let _ = rayon::ThreadPoolBuilder::new()
.num_threads(count)
.build_global();
}
if count == 0 {
panic!("Runtime panic: Bad value `0` for thread count");
}
assert!(count != 0, "Runtime panic: Bad value `0` for thread count");
let (sender, receiver) = match expected_max_sends {
Some(limit) => bounded(limit),
None => unbounded(),
};
let (wgtx, wgrx) = bounded::<()>(count);
let mut workers = Vec::with_capacity(count);
for _ in 0..count {
for i in 0..count {
workers.push(Worker::new(
i,
receiver.clone(),
init_pre_loop_var.clone(),
on_exit.clone(),
on_loop.clone(),
wgtx.clone(),
));
}
Self {
workers,
job_distributor: sender,
init_pre_loop_var,
on_exit,
on_loop,
_marker: PhantomData,
needs_iterator_pool,
expected_max_sends,
drop(wgtx);
let sum: usize = wgrx.iter().map(|_| 1usize).sum();
if sum == count {
Ok(Self {
workers,
job_distributor: sender,
init_pre_loop_var,
on_exit,
on_loop,
_marker: PhantomData,
needs_iterator_pool,
expected_max_sends,
})
} else {
Err(WorkpoolError::ThreadStartFailure(count, sum))
}
}
pub fn clone(&self) -> WorkpoolResult<Self> {
Self::new(
self.workers.len(),
self.init_pre_loop_var.clone(),
self.on_loop.clone(),
self.on_exit.clone(),
self.needs_iterator_pool,
self.expected_max_sends,
)
}
/// Execute something
pub fn execute(&self, inp: UIn) {
self.job_distributor.send(JobType::Task(inp)).unwrap();
self.job_distributor
.send(JobType::Task(inp))
.expect("Worker thread crashed")
}
/// Execute something that can be executed as a parallel iterator
/// For the best performance, it is recommended that you pass true for `needs_iterator_pool`
/// on initialization of the [`Workpool`]
pub fn execute_iter(&self, iter: impl IntoParallelIterator<Item = UIn>) {
iter.into_par_iter().for_each(|inp| self.execute(inp));
iter.into_par_iter().for_each(|inp| self.execute(inp))
}
/// Does the same thing as [`execute_iter`] but drops self ensuring that all the
/// workers actually finish their tasks
@ -334,7 +357,7 @@ where
on_exit: Ex,
needs_iterator_pool: bool,
expected_max_sends: Option<usize>,
) -> Self {
) -> WorkpoolResult<Self> {
// we'll naively use the number of CPUs present on the system times 2 to determine
// the number of workers (sure the scheduler does tricks all the time)
let worker_count = thread::available_parallelism().map_or(1, usize::from) * 2;
@ -402,10 +425,8 @@ pub mod utils {
} else {
let mut keys = Vec::new();
keys.try_reserve_exact(size)?;
(0..count)
.into_iter()
.map(|_| ran_bytes(size, &mut rng))
.for_each(|bytes| keys.push(bytes));
let ran_byte_key = ran_bytes(size, &mut rng);
(0..count).for_each(|_| keys.push(ran_byte_key.clone()));
Ok(keys)
}
}

@ -8,13 +8,16 @@ version = "0.8.0"
[dependencies]
# internal deps
libstress = { path = "../libstress" }
skytable = { git = "https://github.com/skytable/client-rust", branch = "next", features = [
skytable = { git = "https://github.com/skytable/client-rust.git", features = [
"sync",
"dbg",
] }
devtimer = "4.0.1"
libstress = { path = "../libstress" }
# external deps
clap = { version = "2", features = ["yaml"] }
rand = "0.8.5"
serde = { version = "1.0.142", features = ["derive"] }
log = "0.4.17"
env_logger = "0.9.0"
devtimer = "4.0.1"
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.83"
rand = "0.8.5"

@ -0,0 +1,245 @@
/*
* Created on Sat Aug 13 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{
report::{AggregateReport, SingleReport},
validation, vec_with_cap, BenchmarkConfig, LoopMonitor,
},
crate::error::BResult,
devtimer::SimpleTimer,
libstress::Workpool,
skytable::{types::RawString, Connection, Element, Query, RespCode},
std::{
io::{Read, Write},
net::{Shutdown, TcpStream},
},
};
/// Run a benchmark using the given pre-loop, in-loop and post-loop closures
fn run_bench_custom<Inp, Lp, Lv, Ex>(
bench_config: BenchmarkConfig,
packets: Vec<Box<[u8]>>,
on_init: Lv,
on_loop: Lp,
on_loop_exit: Ex,
loopmon: LoopMonitor,
reports: &mut AggregateReport,
) -> BResult<()>
where
Ex: Clone + Fn(&mut Inp) + Send + Sync + 'static,
Inp: Sync + 'static,
Lp: Clone + Fn(&mut Inp, Box<[u8]>) + Send + Sync + 'static,
Lv: Clone + Fn() -> Inp + Send + 'static + Sync,
{
// now do our runs
let mut loopmon = loopmon;
while loopmon.should_continue() {
// now create our connection pool
let pool = Workpool::new(
bench_config.server.connections(),
on_init.clone(),
on_loop.clone(),
on_loop_exit.clone(),
true,
Some(bench_config.query_count()),
)?;
// get our local copy
let this_packets = packets.clone();
// run and time our operations
let mut dt = SimpleTimer::new();
dt.start();
pool.execute_and_finish_iter(this_packets);
dt.stop();
loopmon.incr_time(&dt);
// cleanup
loopmon.cleanup()?;
loopmon.step();
}
// save time
reports.push(SingleReport::new(
loopmon.name(),
loopmon.sum() as f64 / bench_config.runs() as f64,
));
Ok(())
}
#[inline(always)]
/// Init connection and buffer
fn init_connection_and_buf(
host: &str,
port: u16,
start_command: Vec<u8>,
bufsize: usize,
) -> (TcpStream, Vec<u8>) {
let mut con = TcpStream::connect((host, port)).unwrap();
con.write_all(&start_command).unwrap();
let mut ret = [0u8; validation::RESPCODE_OKAY.len()];
con.read_exact(&mut ret).unwrap();
let readbuf = vec![0; bufsize];
(con, readbuf)
}
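// Editor's note (not part of this commit): `bufsize` is the exact byte length of the
// expected response (RESPCODE_OKAY.len() for SET/UPDATE, calculate_response_size(kvsize)
// for GET), which is what lets each loop stage call read_exact instead of parsing the
// response stream.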
/// Benchmark SET
pub fn bench_set(
keys: &[Vec<u8>],
values: &[Vec<u8>],
connection: &mut Connection,
bench_config: &BenchmarkConfig,
create_table: &[u8],
reports: &mut AggregateReport,
) -> BResult<()> {
let bench_config = bench_config.clone();
let create_table = create_table.to_owned();
let loopmon = LoopMonitor::new_cleanup(
bench_config.runs(),
"set",
connection,
Query::from("FLUSHDB").arg("default.tmpbench"),
Element::RespCode(RespCode::Okay),
true,
);
let mut packets = vec_with_cap(bench_config.query_count())?;
(0..bench_config.query_count()).for_each(|i| {
packets.push(
Query::from("SET")
.arg(RawString::from(keys[i].to_owned()))
.arg(RawString::from(values[i].to_owned()))
.into_raw_query()
.into_boxed_slice(),
)
});
run_bench_custom(
bench_config.clone(),
packets,
move || {
init_connection_and_buf(
bench_config.server.host(),
bench_config.server.port(),
create_table.to_owned(),
validation::RESPCODE_OKAY.len(),
)
},
|(con, buf), packet| {
con.write_all(&packet).unwrap();
con.read_exact(buf).unwrap();
assert_eq!(buf, validation::RESPCODE_OKAY);
},
|(con, _)| con.shutdown(Shutdown::Both).unwrap(),
loopmon,
reports,
)
}
/// Benchmark UPDATE
pub fn bench_update(
keys: &[Vec<u8>],
new_value: &[u8],
bench_config: &BenchmarkConfig,
create_table: &[u8],
reports: &mut AggregateReport,
) -> BResult<()> {
let bench_config = bench_config.clone();
let create_table = create_table.to_owned();
let loopmon = LoopMonitor::new(bench_config.runs(), "update");
let mut packets = vec_with_cap(bench_config.query_count())?;
(0..bench_config.query_count()).for_each(|i| {
packets.push(
Query::from("update")
.arg(RawString::from(keys[i].clone()))
.arg(RawString::from(new_value.to_owned()))
.into_raw_query()
.into_boxed_slice(),
)
});
run_bench_custom(
bench_config.clone(),
packets,
move || {
init_connection_and_buf(
bench_config.server.host(),
bench_config.server.port(),
create_table.to_owned(),
validation::RESPCODE_OKAY.len(),
)
},
|(con, buf), packet| {
con.write_all(&packet).unwrap();
con.read_exact(buf).unwrap();
assert_eq!(buf, validation::RESPCODE_OKAY);
},
|(con, _)| con.shutdown(Shutdown::Both).unwrap(),
loopmon,
reports,
)
}
/// Benchmark GET
pub fn bench_get(
keys: &[Vec<u8>],
bench_config: &BenchmarkConfig,
create_table: &[u8],
reports: &mut AggregateReport,
) -> BResult<()> {
let bench_config = bench_config.clone();
let create_table = create_table.to_owned();
let loopmon = LoopMonitor::new(bench_config.runs(), "get");
let mut packets = vec_with_cap(bench_config.query_count())?;
(0..bench_config.query_count()).for_each(|i| {
packets.push(
Query::from("get")
.arg(RawString::from(keys[i].clone()))
.into_raw_query()
.into_boxed_slice(),
)
});
run_bench_custom(
bench_config.clone(),
packets,
move || {
init_connection_and_buf(
bench_config.server.host(),
bench_config.server.port(),
create_table.to_owned(),
validation::calculate_response_size(bench_config.kvsize()),
)
},
|(con, buf), packet| {
con.write_all(&packet).unwrap();
con.read_exact(buf).unwrap();
},
|(con, _)| con.shutdown(Shutdown::Both).unwrap(),
loopmon,
reports,
)
}

@ -0,0 +1,278 @@
/*
* Created on Tue Aug 09 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
self::report::AggregateReport,
crate::{
config,
config::{BenchmarkConfig, ServerConfig},
error::{BResult, Error},
util,
},
clap::ArgMatches,
devtimer::SimpleTimer,
libstress::utils::{generate_random_byte_vector, ran_bytes},
skytable::{Connection, Element, Query, RespCode},
};
mod benches;
mod report;
mod validation;
macro_rules! binfo {
($($arg:tt)+) => {
if $crate::config::should_output_messages() {
::log::info!($($arg)+)
}
};
}
/// The loop monitor can be used for maintaining a loop for a given benchmark
struct LoopMonitor<'a> {
/// cleanup instructions
inner: Option<CleanupInner<'a>>,
/// maximum iterations
max: usize,
/// current iteration
current: usize,
/// total time
time: u128,
/// name of test
name: &'static str,
}
impl<'a> LoopMonitor<'a> {
/// Create a benchmark loop monitor that doesn't need any cleanup
pub fn new(max: usize, name: &'static str) -> Self {
Self {
inner: None,
max,
current: 0,
time: 0,
name,
}
}
/// Create a new benchmark loop monitor that uses the given cleanup instructions:
/// - `max`: Total iterations
/// - `name`: Name of benchmark
/// - `connection`: A connection to use for cleanup instructions
/// - `query`: Query to run for cleanup
/// - `response`: Response expected when cleaned up
/// - `skip_on_last`: Skip running the cleanup instructions on the last loop
pub fn new_cleanup(
max: usize,
name: &'static str,
connection: &'a mut Connection,
query: Query,
response: Element,
skip_on_last: bool,
) -> Self {
Self {
inner: Some(CleanupInner::new(query, response, connection, skip_on_last)),
max,
current: 0,
time: 0,
name: name,
}
}
/// Run cleanup
fn cleanup(&mut self) -> BResult<()> {
let last_iter = self.is_last_iter();
if let Some(ref mut cleanup) = self.inner {
let should_run_cleanup = (!last_iter) || (last_iter && !cleanup.skip_on_last);
if should_run_cleanup {
return cleanup.cleanup(self.name);
}
}
Ok(())
}
/// Check if this is the last iteration
fn is_last_iter(&self) -> bool {
(self.max - 1) == self.current
}
/// Step the counter ahead
fn step(&mut self) {
self.current += 1;
}
/// Determine if we should continue executing
fn should_continue(&self) -> bool {
self.current < self.max
}
/// Append a new time to the sum
fn incr_time(&mut self, dt: &SimpleTimer) {
self.time += dt.time_in_nanos().unwrap();
}
/// Return the sum
fn sum(&self) -> u128 {
self.time
}
/// Return the name of the benchmark
fn name(&self) -> &'static str {
self.name
}
}
/// Cleanup instructions
struct CleanupInner<'a> {
/// the connection to use for cleanup processes
connection: &'a mut Connection,
/// the query to be run
query: Query,
/// the response to expect
response: Element,
/// whether we should skip on the last loop
skip_on_last: bool,
}
impl<'a> CleanupInner<'a> {
/// Init cleanup instructions
fn new(q: Query, r: Element, connection: &'a mut Connection, skip_on_last: bool) -> Self {
Self {
query: q,
response: r,
connection,
skip_on_last,
}
}
/// Run cleanup
fn cleanup(&mut self, name: &'static str) -> BResult<()> {
let r: Element = self.connection.run_query(&self.query)?;
if r.ne(&self.response) {
return Err(Error::RuntimeError(format!(
"Failed to run cleanup for benchmark `{}`",
name
)));
} else {
Ok(())
}
}
}
#[inline(always)]
/// Returns a vec with the given cap, ensuring that we don't overflow memory
fn vec_with_cap<T>(cap: usize) -> BResult<Vec<T>> {
let mut v = Vec::new();
v.try_reserve_exact(cap)?;
Ok(v)
}
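// Editor's note (not part of this commit): `try_reserve_exact` reports allocation failure
// as a `TryReserveError`, which `?` converts into `Error::RuntimeError` via the `From` impl
// in error.rs; an oversized query count therefore fails with a readable error rather than
// aborting the process the way an infallible `Vec::with_capacity` would.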
/// Run the actual benchmarks
pub fn run_bench(servercfg: &ServerConfig, matches: ArgMatches) -> BResult<()> {
// init bench config
let bench_config = BenchmarkConfig::new(servercfg, matches)?;
// check if we have enough combinations for the given query count and key size
if !util::has_enough_ncr(bench_config.kvsize(), bench_config.query_count()) {
return Err(Error::RuntimeError(
"too low sample space for given query count. use larger kvsize".into(),
));
}
// run sanity test; this will also set up the temporary table for benchmarking
binfo!("Running sanity test ...");
util::run_sanity_test(&bench_config.server)?;
// pool pre-exec setup
let servercfg = servercfg.clone();
let switch_table = Query::from("use default.tmpbench").into_raw_query();
// init pool config; side_connection is for cleanups
let mut misc_connection = Connection::new(servercfg.host(), servercfg.port())?;
// init timer and reports
let mut reports = AggregateReport::new(bench_config.query_count());
// init test data
binfo!("Initializing test data ...");
let mut rng = rand::thread_rng();
let keys = generate_random_byte_vector(
bench_config.query_count(),
bench_config.kvsize(),
&mut rng,
true,
)?;
let values = generate_random_byte_vector(
bench_config.query_count(),
bench_config.kvsize(),
&mut rng,
false,
)?;
let new_updated_key = ran_bytes(bench_config.kvsize(), &mut rng);
// run tests; the idea here is to run all tests one-by-one instead of generating all packets at once
// such an approach helps us keep memory usage low
// bench set
binfo!("Benchmarking SET ...");
benches::bench_set(
&keys,
&values,
&mut misc_connection,
&bench_config,
&switch_table,
&mut reports,
)?;
// bench update
binfo!("Benchmarking UPDATE ...");
benches::bench_update(
&keys,
&new_updated_key,
&bench_config,
&switch_table,
&mut reports,
)?;
// bench get
binfo!("Benchmarking GET ...");
benches::bench_get(&keys, &bench_config, &switch_table, &mut reports)?;
// remove all test data
binfo!("Finished benchmarks. Cleaning up ...");
let r: Element = misc_connection.run_query(Query::from("drop model default.tmpbench force"))?;
if r != Element::RespCode(RespCode::Okay) {
return Err(Error::RuntimeError(
"failed to clean up after benchmarks".into(),
));
}
if config::should_output_messages() {
// normal output
println!("===========RESULTS===========");
let (maxpad, reports) = reports.finish();
for report in reports {
let padding = " ".repeat(maxpad - report.name().len());
println!(
"{}{} {:.6}/sec",
report.name().to_uppercase(),
padding,
report.stat(),
);
}
println!("=============================");
} else {
// JSON
println!("{}", reports.into_json())
}
Ok(())
}

@ -0,0 +1,82 @@
/*
* Created on Wed Aug 10 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use serde::Serialize;
#[derive(Serialize)]
pub struct SingleReport {
name: &'static str,
stat: f64,
}
impl SingleReport {
pub fn new(name: &'static str, stat: f64) -> Self {
Self { name, stat }
}
pub fn stat(&self) -> f64 {
self.stat
}
pub fn name(&self) -> &str {
self.name
}
}
pub struct AggregateReport {
names: Vec<SingleReport>,
query_count: usize,
}
impl AggregateReport {
pub fn new(query_count: usize) -> Self {
Self {
names: Vec::new(),
query_count,
}
}
pub fn push(&mut self, report: SingleReport) {
self.names.push(report)
}
pub(crate) fn into_json(self) -> String {
let (_, report) = self.finish();
serde_json::to_string(&report).unwrap()
}
pub(crate) fn finish(self) -> (usize, Vec<SingleReport>) {
let mut maxpad = self.names[0].name.len();
let mut reps = self.names;
reps.iter_mut().for_each(|rep| {
let total_time = rep.stat;
let qps = (self.query_count as f64 / total_time) * 1_000_000_000_f64;
rep.stat = qps;
if rep.name.len() > maxpad {
maxpad = rep.name.len();
}
});
(maxpad, reps)
}
}
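// Editor's sketch (not part of this commit): the value stored in each report is the
// average run time in nanoseconds, and finish() converts it to queries/second.
// 100_000 queries averaging 0.5 s (5e8 ns) works out to 200_000 queries/sec.
#[cfg(test)]
mod qps_tests {
    use super::*;
    #[test]
    fn finish_converts_avg_nanos_to_qps() {
        let mut agg = AggregateReport::new(100_000);
        agg.push(SingleReport::new("set", 500_000_000.0));
        let (_maxpad, reports) = agg.finish();
        assert_eq!(reports[0].stat().round(), 200_000.0);
    }
}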

@ -0,0 +1,39 @@
/*
* Created on Tue Aug 09 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub const RESPCODE_OKAY: &[u8] = b"*!0\n";
pub fn calculate_response_size(keylen: usize) -> usize {
/*
*+5\n
hello
*/
let mut size = 2; // simple query byte + tsymbol
size += keylen.to_string().len(); // bytes in length
size += 1; // LF
size += keylen; // payload
size
}
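// Editor's sketch (not part of this commit): a quick check of the arithmetic above.
// For a 5-byte payload the wire form is b"*+5\nhello" (9 bytes): simple-query byte and
// tsymbol (2) + length digits (1) + LF (1) + payload (5).
#[cfg(test)]
mod response_size_tests {
    use super::*;
    #[test]
    fn matches_wire_form() {
        assert_eq!(calculate_response_size(5), b"*+5\nhello".len());
    }
}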

@ -1,242 +0,0 @@
/*
* Created on Thu Jun 17 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
self::validation::SQ_RESPCODE_SIZE,
crate::{report::AggregatedReport, util},
devtimer::DevTime,
libstress::{utils::generate_random_byte_vector, PoolConfig},
rand::thread_rng,
skytable::{types::RawString, Query},
std::{
io::{Read, Write},
net::TcpStream,
},
};
pub mod validation;
const NOTICE_INIT_BENCH: &str = "Finished sanity test. Initializing benchmark ...";
const NOTICE_INIT_COMPLETE: &str = "Initialization complete! Benchmark started";
const CONFIG_TABLE_MODEL: &str = "keymap(binstr,binstr)";
const CONFIG_TABLE_VOLATILITY: &str = "volatile";
/// Run the benchmark tool
pub fn runner(
host: String,
port: u16,
max_connections: usize,
max_queries: usize,
per_kv_size: usize,
json_out: bool,
runs: usize,
) {
if !util::enough_ncr(per_kv_size, max_queries) {
err!("Too low sample space for given k/v size and query count. Try a higher k/v size.");
}
if !json_out {
println!("Running sanity test ...");
}
if let Err(e) = sanity_test!(host, port) {
err!(format!("Sanity test failed with error: {}", e));
}
if !json_out {
println!("{}", NOTICE_INIT_BENCH);
println!("Connections: {}", max_connections);
println!("Queries: {}", max_queries);
println!("Data size (key+value): {} bytes", (per_kv_size * 2));
}
let host = hoststr!(host, port);
let mut rand = thread_rng();
let temp_table = init_temp_table(&mut rand, &host);
let switch_table = Query::from("use")
.arg(format!("default:{}", &temp_table))
.into_raw_query();
let pool_config = PoolConfig::new(
max_connections,
move || {
let mut stream = TcpStream::connect(&host).unwrap();
stream.write_all(&switch_table.clone()).unwrap();
let mut v = vec![0; SQ_RESPCODE_SIZE];
let _ = stream.read_exact(&mut v).unwrap();
stream
},
move |sock, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
// all `okay`s are returned (for both update and set)
let mut v = vec![0; SQ_RESPCODE_SIZE];
let _ = sock.read_exact(&mut v).unwrap();
},
|socket| {
socket.shutdown(std::net::Shutdown::Both).unwrap();
},
true,
Some(max_queries),
);
// Create separate connection pools for get and set operations
let keys = generate_random_byte_vector(max_queries, per_kv_size, &mut rand, true);
let values = generate_random_byte_vector(max_queries, per_kv_size, &mut rand, false);
let (keys, values) = match (keys, values) {
(Ok(k), Ok(v)) => (k, v),
_ => err!("Allocation error"),
};
/*
We create three vectors of vectors: `set_packs`, `get_packs` and `del_packs`
The bytes in each of `set_packs` has a query packet for setting data;
The bytes in each of `get_packs` has a query packet for getting a key set by one of `set_packs`
since we use the same key/value pairs for all;
The bytes in each of `del_packs` has a query packet for deleting a key created by
one of `set_packs`
*/
let set_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| {
let mut q = Query::from("SET");
q.push(RawString::from(keys[idx].clone()));
q.push(RawString::from(values[idx].clone()));
q.into_raw_query()
})
.collect();
let get_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| {
let mut q = Query::from("GET");
q.push(RawString::from(keys[idx].clone()));
q.into_raw_query()
})
.collect();
// just update key -> value to key -> key to avoid unnecessary memory usage
let update_packs: Vec<Vec<u8>> = (0..max_queries)
.map(|idx| {
let mut q = Query::from("UPDATE");
q.push(RawString::from(keys[idx].clone()));
q.push(RawString::from(keys[idx].clone()));
q.into_raw_query()
})
.collect();
if !json_out {
println!("Per-packet size (GET): {} bytes", get_packs[0].len());
println!("Per-packet size (SET): {} bytes", set_packs[0].len());
println!("Per-packet size (UPDATE): {} bytes", update_packs[0].len());
println!("{}", NOTICE_INIT_COMPLETE);
}
let mut report = AggregatedReport::new(3, runs, max_queries);
for i in 1..runs + 1 {
let mut dt = DevTime::new_complex();
// clone in the keys
let set_packs = set_packs.clone();
let get_packs = get_packs.clone();
let update_packs = update_packs.clone();
// bench SET
let setpool = pool_config.get_pool();
dt.create_timer("SET").unwrap();
dt.start_timer("SET").unwrap();
setpool.execute_and_finish_iter(set_packs);
dt.stop_timer("SET").unwrap();
let get_response_packet_size =
validation::calculate_monoelement_dataframe_size(per_kv_size)
+ validation::calculate_metaframe_size(1);
let getpool =
pool_config.with_loop_closure(move |sock: &mut TcpStream, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
// read exact for the key size
let mut v = vec![0; get_response_packet_size];
let _ = sock.read_exact(&mut v).unwrap();
});
dt.create_timer("GET").unwrap();
dt.start_timer("GET").unwrap();
getpool.execute_and_finish_iter(get_packs);
dt.stop_timer("GET").unwrap();
// bench UPDATE
let update_pool = pool_config.get_pool();
dt.create_timer("UPDATE").unwrap();
dt.start_timer("UPDATE").unwrap();
update_pool.execute_and_finish_iter(update_packs);
dt.stop_timer("UPDATE").unwrap();
if !json_out {
println!("Finished run: {}", i);
}
// drop table
let flushdb = Query::new()
.arg("FLUSHDB")
.arg(format!("default:{}", &temp_table))
.into_raw_query();
let drop_pool = pool_config.get_pool_with_workers(1);
drop_pool.execute(flushdb);
drop(drop_pool);
dt.iter()
.for_each(|(name, timer)| report.insert(name, timer.time_in_nanos().unwrap()));
}
print_results(json_out, report);
}
fn init_temp_table(rand: &mut impl rand::Rng, host: &str) -> String {
let temp_table = libstress::utils::rand_alphastring(10, rand);
let create_table = Query::from("create")
.arg("table")
.arg(&temp_table)
.arg(CONFIG_TABLE_MODEL)
.arg(CONFIG_TABLE_VOLATILITY)
.into_raw_query();
let mut create_table_connection = TcpStream::connect(host).unwrap();
// create table
create_table_connection.write_all(&create_table).unwrap();
let mut v = [0u8; SQ_RESPCODE_SIZE];
let _ = create_table_connection.read_exact(&mut v).unwrap();
temp_table
}
fn print_results(flag_json: bool, report: AggregatedReport) {
if flag_json {
let serialized = report.into_json();
println!("{}", serialized);
} else {
println!("===========RESULTS===========");
let (report, maxpad) = report.into_sorted_stat();
let pad = |clen: usize| " ".repeat(maxpad - clen);
report.into_iter().for_each(|block| {
println!(
"{}{} {:.6}/sec",
block.get_report(),
pad(block.get_report().len()),
block.get_stat()
);
});
println!("=============================");
}
}

@ -1,131 +0,0 @@
/*
* Created on Fri Nov 26 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
pub const SQ_RESPCODE_SIZE: usize = b"*!1\n".len();
/// For a dataframe, this returns the dataframe size for array responses.
///
/// For example,
/// ```text
/// &<n>\n
/// (<tsymbol><size>\n<element>)*
/// ```
#[allow(dead_code)] // TODO(@ohsayan): Remove this lint
pub fn calculate_array_dataframe_size(element_count: usize, per_element_size: usize) -> usize {
let mut s = 0;
s += 1; // `&`
s += element_count.to_string().len(); // `<n>`
s += 1; // `\n`
let mut subsize = 0;
subsize += 1; // `+`
subsize += per_element_size.to_string().len(); // `<n>`
subsize += 1; // `\n`
subsize += per_element_size; // the element size itself
subsize += 1; // `\n`
s += subsize * element_count;
s
}
/// For a dataframe with a typed array, calculate its size
///
/// **Warning:** Null entries are not yet supported (for a full null array, just pass `1` for the `per_element_size`)
#[allow(dead_code)]
pub fn calculate_typed_array_dataframe_size(
element_count: usize,
per_element_size: usize,
) -> usize {
let mut s = 0usize;
s += 2; // `@<tsymbol>`
s += element_count.to_string().len(); // `<n>`
s += 1; // `\n`
// now for the payload
let mut subsize = 0usize;
subsize += per_element_size.to_string().len(); // `<n>`
subsize += 1; // `\n`
subsize += per_element_size; // the payload itself
subsize += 1; // `\n`
s += subsize * element_count;
s
}
/// For a monoelement dataframe, this returns the size:
/// ```text
/// <tsymbol><size>\n
/// <element>
/// ```
///
/// For an `okay` respcode, it will look like this:
/// ```text
/// !1\n
/// 0\n
/// ```
pub fn calculate_monoelement_dataframe_size(per_element_size: usize) -> usize {
let mut s = 0;
s += 1; // the tsymbol (always one byte)
s += per_element_size.to_string().len(); // the bytes in size string
s += 1; // the LF
s += per_element_size; // the element itself
s
}
/// Returns the metaframe size
/// ```text
/// *<n>\n
/// ```
#[allow(dead_code)] // TODO(@ohsayan): Remove this lint
pub fn calculate_metaframe_size(queries: usize) -> usize {
if queries == 1 {
// just `*`
1
} else {
let mut s = 0;
s += 1; // `$`
s += queries.to_string().len(); // the bytes in size string
s += 1; // `\n`
s
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_monoelement_calculation() {
assert_eq!(calculate_monoelement_dataframe_size(1), 4);
}
#[test]
fn test_simple_query_metaframe_size() {
assert_eq!(calculate_metaframe_size(1), 1);
}
#[test]
fn test_typed_array_dataframe_size() {
let packet = b"@+3\n3\nhow\n3\nyou\n3\ndng\n";
assert_eq!(calculate_typed_array_dataframe_size(3, 3), packet.len());
}
}

@ -70,14 +70,3 @@ args:
value_name: runs
takes_value: true
help: Sets the number of times the entire test should be run
subcommands:
- testkey:
about: This can be used to create 'mock' keys
args:
- count:
short: c
required: true
long: count
value_name: NUMBEROFKEYS
help: Sets the number of keys to create
takes_value: true

@ -0,0 +1,148 @@
/*
* Created on Mon Aug 08 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::error::{BResult, Error},
crate::util,
clap::ArgMatches,
std::{fmt::Display, str::FromStr},
};
static mut OUTPUT_JSON: bool = false;
#[derive(Clone)]
pub struct ServerConfig {
/// host
host: Box<str>,
/// port
port: u16,
/// connection count for network pool
connections: usize,
}
#[inline(always)]
fn try_update<T: FromStr, S: AsRef<str>>(input: Option<S>, target: &mut T) -> BResult<()>
where
<T as FromStr>::Err: Display,
{
if let Some(input) = input {
let parsed = input
.as_ref()
.parse::<T>()
.map_err(|e| Error::ConfigError(format!("parse error: `{}`", e)))?;
*target = parsed;
}
Ok(())
}
impl ServerConfig {
const DEFAULT_HOST: &'static str = "127.0.0.1";
const DEFAULT_PORT: u16 = 2003;
const DEFAULT_CONNECTIONS: usize = 10;
/// Init the default server config
pub fn new(matches: &ArgMatches) -> BResult<Self> {
let mut slf = Self {
host: Self::DEFAULT_HOST.into(),
port: Self::DEFAULT_PORT,
connections: Self::DEFAULT_CONNECTIONS,
};
slf.try_host(matches.value_of_lossy("host"));
slf.try_port(matches.value_of_lossy("port"))?;
slf.try_connections(matches.value_of_lossy("connections"))?;
Ok(slf)
}
/// Update the host
pub fn try_host<T: AsRef<str>>(&mut self, host: Option<T>) {
if let Some(host) = host {
self.host = host.as_ref().into();
}
}
/// Attempt to update the port
pub fn try_port<T: AsRef<str>>(&mut self, port: Option<T>) -> BResult<()> {
try_update(port, &mut self.port)
}
/// Attempt to update the connections
pub fn try_connections<T: AsRef<str>>(&mut self, con: Option<T>) -> BResult<()> {
try_update(con, &mut self.connections)
}
}
impl ServerConfig {
pub fn host(&self) -> &str {
self.host.as_ref()
}
pub fn port(&self) -> u16 {
self.port
}
pub fn connections(&self) -> usize {
self.connections
}
}
/// Benchmark configuration
#[derive(Clone)]
pub struct BenchmarkConfig {
pub server: ServerConfig,
kvsize: usize,
queries: usize,
runs: usize,
}
impl BenchmarkConfig {
const DEFAULT_QUERIES: usize = 100_000;
const DEFAULT_KVSIZE: usize = 3;
const DEFAULT_RUNS: usize = 5;
pub fn new(server: &ServerConfig, matches: ArgMatches) -> BResult<Self> {
let mut slf = Self {
server: server.clone(),
queries: Self::DEFAULT_QUERIES,
kvsize: Self::DEFAULT_KVSIZE,
runs: Self::DEFAULT_RUNS,
};
try_update(matches.value_of_lossy("queries"), &mut slf.queries)?;
try_update(matches.value_of_lossy("size"), &mut slf.kvsize)?;
try_update(matches.value_of_lossy("runs"), &mut slf.runs)?;
util::ensure_main_thread();
unsafe {
OUTPUT_JSON = matches.is_present("json");
}
Ok(slf)
}
pub fn kvsize(&self) -> usize {
self.kvsize
}
pub fn query_count(&self) -> usize {
self.queries
}
pub fn runs(&self) -> usize {
self.runs
}
}
pub fn should_output_messages() -> bool {
util::ensure_main_thread();
unsafe { !OUTPUT_JSON }
}
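// Editor's note (not part of this commit): the `static mut OUTPUT_JSON` above is written
// only in BenchmarkConfig::new and read only here, and both call sites invoke
// util::ensure_main_thread() first; that single-thread invariant is what the unsafe
// accesses rely on.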

@ -0,0 +1,71 @@
/*
* Created on Mon Aug 08 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
libstress::WorkpoolError,
skytable::error::Error as SkyError,
std::{collections::TryReserveError, fmt::Display},
};
pub type BResult<T> = Result<T, Error>;
/// Benchmark tool errors
pub enum Error {
/// An error originating from the Skytable client
ClientError(SkyError),
/// An error originating from the benchmark/server configuration
ConfigError(String),
/// A runtime error
RuntimeError(String),
}
impl From<SkyError> for Error {
fn from(e: SkyError) -> Self {
Self::ClientError(e)
}
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::ClientError(e) => write!(f, "client error: {}", e),
Error::ConfigError(e) => write!(f, "config error: {}", e),
Error::RuntimeError(e) => write!(f, "runtime error: {}", e),
}
}
}
impl From<TryReserveError> for Error {
fn from(e: TryReserveError) -> Self {
Error::RuntimeError(format!("memory reserve error: {}", e.to_string()))
}
}
impl From<WorkpoolError> for Error {
fn from(e: WorkpoolError) -> Self {
Error::RuntimeError(format!("threadpool error: {}", e))
}
}

@ -1,5 +1,5 @@
/*
* Created on Sun Sep 13 2020
* Created on Mon Aug 08 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2020, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -24,86 +24,40 @@
*
*/
#![deny(unused_crate_dependencies)]
#![deny(unused_imports)]
//! A generic module for benchmarking SET/GET operations
//! **NOTE:** This is experimental and may show incorrect results - that is,
//! the response times may be shown to be slower than they actually are
#[macro_use]
mod util;
mod benchtool;
mod report;
mod testkey;
use {
crate::util::{DEFAULT_PACKET_SIZE, DEFAULT_QUERY_COUNT, DEFAULT_REPEAT, DEFAULT_WORKER_COUNT},
clap::{load_yaml, App},
core::hint::unreachable_unchecked,
config::ServerConfig,
env_logger::Builder,
std::{env, process},
};
#[macro_use]
extern crate log;
mod bench;
mod config;
mod error;
mod util;
fn main() {
let cfg_layout = load_yaml!("./cli.yml");
let matches = App::from_yaml(cfg_layout).get_matches();
let host = match matches.value_of("host") {
Some(h) => h.to_owned(),
None => "127.0.0.1".to_owned(),
};
let port = match matches.value_of("port") {
Some(p) => match p.parse::<u16>() {
Ok(p) => p,
Err(_) => err!("Invalid Port"),
},
None => 2003,
};
let json_out = matches.is_present("json");
let max_connections = match matches.value_of("connections").map(|v| v.parse::<u16>()) {
Some(Ok(con)) => con as _,
None => DEFAULT_WORKER_COUNT,
_ => err!("Bad value for maximum connections"),
};
let max_queries = match matches.value_of("queries").map(|v| v.parse::<usize>()) {
Some(Ok(qr)) => qr,
None => DEFAULT_QUERY_COUNT,
_ => err!("Bad value for max queries"),
};
let packet_size = match matches.value_of("size").map(|v| v.parse::<usize>()) {
Some(Ok(size)) => size,
None => DEFAULT_PACKET_SIZE,
_ => err!("Bad value for key/value size"),
};
let runs: usize = match matches.value_of("runs").map(|v| v.parse()) {
Some(Ok(r)) => r,
Some(Err(_)) => err!("Bad value for runs"),
None => DEFAULT_REPEAT,
};
if packet_size == 0 || max_queries == 0 || max_connections == 0 {
err!("All inputs must be non-zero values");
}
if let Some(cmd) = matches.subcommand_matches("testkey") {
let count = match cmd.value_of_lossy("count") {
Some(cnt) => match cnt.to_string().parse::<usize>() {
Ok(cnt) => cnt,
Err(_) => err!("Bad value for testkey count"),
},
None => unsafe {
// UNSAFE(@ohsayan): This is completely safe because clap takes care that
// the count argument is supplied
unreachable_unchecked();
},
};
println!("warning: Ignoring any other invalid flags/options (if they were supplied)");
testkey::create_testkeys(&host, port, count, max_connections, packet_size);
} else {
benchtool::runner(
host,
port,
max_connections,
max_queries,
packet_size,
json_out,
runs,
);
Builder::new()
.parse_filters(&env::var("SKYBENCH_LOG").unwrap_or_else(|_| "info".to_owned()))
.init();
if let Err(e) = run() {
error!("sky-bench exited with error: {}", e);
process::exit(0x01);
}
}
fn run() -> error::BResult<()> {
// init CLI arg parser
let cli_args = load_yaml!("cli.yml");
let cli = App::from_yaml(cli_args);
let matches = cli.get_matches();
// parse args
let cfg = ServerConfig::new(&matches)?;
// run our task
bench::run_bench(&cfg, matches)?;
util::cleanup(&cfg)
}

@ -1,148 +0,0 @@
/*
* Created on Tue Aug 10 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::util,
core::cmp::Ordering,
std::collections::{hash_map::Entry, HashMap},
};
/// A map of reports
pub struct AggregatedReport {
map: HashMap<&'static str, Report>,
queries: usize,
cap: usize,
}
impl AggregatedReport {
/// Create a new aggregated report instance. Here:
/// - `report_count`: Is the count of benches you will be running. For example, if you
/// are testing GET and SET, this will be 2
/// - `cap`: Is the number of repeats you will be running
/// - `queries`: Is the number of queries you will run
pub fn new(report_count: usize, cap: usize, queries: usize) -> Self {
Self {
map: HashMap::with_capacity(report_count),
cap,
queries,
}
}
/// Insert a new statistic. The `name` should correspond to the bench name (for example GET)
/// while the `time` should be the time taken for that bench to complete
pub fn insert(&mut self, name: &'static str, time: u128) {
match self.map.entry(name) {
Entry::Occupied(mut oe) => oe.get_mut().times.push(time),
Entry::Vacant(ve) => {
let mut rep = Report::with_capacity(self.cap);
rep.times.push(time);
let _ = ve.insert(rep);
}
}
}
/// Returns a vector of sorted statistics (lexicographical) and the length of the longest
/// bench name. `(Vec<Stat>, longest_bench_name)`
pub fn into_sorted_stat(self) -> (Vec<Stat>, usize) {
let Self { map, queries, .. } = self;
let mut maxpad = 0usize;
let mut repvec: Vec<Stat> = map
.into_iter()
.map(|(name, report)| {
if name.len() > maxpad {
maxpad = name.len();
}
report.into_stat(queries, name)
})
.collect();
repvec.sort();
(repvec, maxpad)
}
/// Returns a minified JSON string
pub fn into_json(self) -> String {
serde_json::to_string(&self.into_sorted_stat().0).unwrap()
}
}
#[derive(Debug)]
/// A report with a collection of times
pub struct Report {
times: Vec<u128>,
}
impl Report {
/// Returns a new report with space for at least `cap` time entries
pub fn with_capacity(cap: usize) -> Self {
Self {
times: Vec::with_capacity(cap),
}
}
/// Returns a [`Stat`] with the average time
pub fn into_stat(self, reqs: usize, name: &'static str) -> Stat {
let count = self.times.len();
let avg: u128 = self.times.into_iter().sum();
let avg = avg / count as u128;
Stat {
name,
stat: util::calc(reqs, avg),
}
}
}
#[derive(serde::Serialize, Debug)]
/// A statistic: name of the bench and the result
pub struct Stat {
name: &'static str,
stat: f64,
}
impl Stat {
/// Get a reference to the report name
pub fn get_report(&self) -> &str {
self.name
}
/// Get the statistic
pub fn get_stat(&self) -> f64 {
self.stat
}
}
impl PartialEq for Stat {
fn eq(&self, oth: &Self) -> bool {
self.name == oth.name
}
}
impl Eq for Stat {}
impl PartialOrd for Stat {
fn partial_cmp(&self, oth: &Self) -> Option<Ordering> {
self.name.partial_cmp(oth.name)
}
}
impl Ord for Stat {
fn cmp(&self, oth: &Self) -> std::cmp::Ordering {
self.name.cmp(oth.name)
}
}

@ -1,81 +0,0 @@
/*
* Created on Thu Jun 17 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{benchtool::validation::SQ_RESPCODE_SIZE, hoststr, sanity_test},
libstress::{utils::generate_random_string_vector, Workpool},
rand::thread_rng,
skytable::Query,
std::{
io::{Read, Write},
net::{self, TcpStream},
},
};
pub fn create_testkeys(host: &str, port: u16, num: usize, connections: usize, size: usize) {
if let Err(e) = sanity_test!(host, port) {
err!(format!("Sanity test failed with error: {}", e));
}
let host = hoststr!(host, port);
let mut rand = thread_rng();
let np = Workpool::new(
connections,
move || TcpStream::connect(host.clone()).unwrap(),
|sock, packet: Vec<u8>| {
sock.write_all(&packet).unwrap();
let mut buf = [0u8; SQ_RESPCODE_SIZE];
let _ = sock.read_exact(&mut buf).unwrap();
},
|socket| {
socket.shutdown(net::Shutdown::Both).unwrap();
},
true,
Some(connections),
);
println!("Generating keys ...");
let keys = generate_random_string_vector(num, size, &mut rand, true);
let values = generate_random_string_vector(num, size, &mut rand, false);
let (keys, values) = match (keys, values) {
(Ok(k), Ok(v)) => (k, v),
_ => err!("Allocation error"),
};
{
let np = np;
(0..num)
.map(|idx| {
Query::new()
.arg("SET")
.arg(&keys[idx])
.arg(&values[idx])
.into_raw_query()
})
.for_each(|packet| {
np.execute(packet);
});
}
println!("Created mock keys successfully");
}

@ -1,5 +1,5 @@
/*
* Created on Thu Jun 17 2021
* Created on Tue Aug 09 2022
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
@ -7,7 +7,7 @@
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
* Copyright (c) 2022, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -24,103 +24,14 @@
*
*/
use {libstress::utils::ran_string, rand::thread_rng, std::error::Error};
pub const DEFAULT_WORKER_COUNT: usize = 10;
pub const DEFAULT_PACKET_SIZE: usize = 4;
pub const DEFAULT_QUERY_COUNT: usize = 100_000;
pub const DEFAULT_REPEAT: usize = 5;
#[macro_export]
macro_rules! hoststr {
($host:expr, $port:expr) => {{
let mut hst: String = $host.to_string();
hst.push(':');
hst.push_str(&$port.to_string());
hst
}};
}
#[macro_export]
macro_rules! sanity_test {
($host:expr, $port:expr) => {{
// Run a sanity test
if let Err(e) = $crate::util::run_sanity_test(&$host, $port) {
Err(e)
} else {
Ok(())
}
}};
}
#[macro_export]
macro_rules! err {
($note:expr) => {{
eprintln!("ERROR: {}", $note);
std::process::exit(0x01);
}};
}
/// Returns the number of queries/sec
pub fn calc(reqs: usize, time: u128) -> f64 {
reqs as f64 / (time as f64 / 1_000_000_000_f64)
}
/// # Sanity Test
///
/// This function performs a 'sanity test' to determine if the benchmarks should be run; this test ensures
/// that the server is functioning as expected and we'll run the benchmarks assuming that the server will
/// act similarly in the future. This test currently runs a HEYA, SET, GET and DEL test, the latter three of which
/// are the ones that are benchmarked
///
/// ## Limitations
/// A 65536-character key/value pair is created and fetched. This random string has extremely low
/// chances of colliding with any existing key
pub fn run_sanity_test(host: &str, port: u16) -> Result<(), Box<dyn Error>> {
use skytable::{Connection, Element, Query, RespCode};
let mut rng = thread_rng();
let mut connection = Connection::new(host, port)?;
// test heya
let mut query = Query::new();
query.push("heya");
if !connection
.run_query_raw(&query)?
.eq(&Element::String("HEY!".to_owned()))
{
return Err("HEYA test failed".into());
}
let key = ran_string(65536, &mut rng);
let value = ran_string(65536, &mut rng);
let mut query = Query::new();
query.push("set");
query.push(&key);
query.push(&value);
if !connection
.run_query_raw(&query)?
.eq(&Element::RespCode(RespCode::Okay))
{
return Err("SET test failed".into());
}
let mut query = Query::new();
query.push("get");
query.push(&key);
if !connection
.run_query_raw(&query)?
.eq(&Element::Binstr(value.as_bytes().to_owned()))
{
return Err("GET test failed".into());
}
let mut query = Query::new();
query.push("del");
query.push(&key);
if !connection
.run_query_raw(&query)?
.eq(&Element::UnsignedInt(1))
{
return Err("DEL test failed".into());
}
Ok(())
}
use {
crate::{
config::ServerConfig,
error::{BResult, Error},
},
skytable::{Connection, Element, Query, RespCode},
std::thread,
};
/// Check if the provided keysize has enough combinations to support the given `queries` count
///
@ -132,7 +43,7 @@ pub fn run_sanity_test(host: &str, port: u16) -> Result<(), Box<dyn Error>> {
/// - For 32-bit address spaces: `(256!)/(r!(256-r)!)`; for a value of r >= 5, we'll hit the maximum
/// of the address space and hence this will always return true (because of the size of `usize`)
/// > The value for r = 5 is `8.81e+9`, which far exceeds `4.3e+9`
pub const fn enough_ncr(keysize: usize, queries: usize) -> bool {
pub const fn has_enough_ncr(keysize: usize, queries: usize) -> bool {
const LUT: [u64; 11] = [
// 1B
256,
@ -163,3 +74,70 @@ pub const fn enough_ncr(keysize: usize, queries: usize) -> bool {
const ALWAYS_TRUE_FACTOR: usize = 5;
keysize >= ALWAYS_TRUE_FACTOR || (LUT[keysize - 1] >= queries as _)
}
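// Editor's sketch (not part of this commit): with 1-byte keys only 256 distinct values
// exist (the first LUT entry above), so the default 100_000-query run is rejected, while
// any keysize of 5 or more always passes via ALWAYS_TRUE_FACTOR.
#[cfg(test)]
mod ncr_tests {
    use super::*;
    #[test]
    fn keyspace_check() {
        assert!(!has_enough_ncr(1, 100_000));
        assert!(has_enough_ncr(5, 100_000));
    }
}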
/// Run a sanity test, making sure that the server is ready for benchmarking. This function will do the
/// following tests:
/// - Connect to the instance
/// - Run a `heya` as a preliminary test
/// - Create a new table `tmpbench`. This is where we're supposed to run all the benchmarks.
/// - Switch to the new table
/// - Set a key, and get it checking the equality of the returned value
pub fn run_sanity_test(server_config: &ServerConfig) -> BResult<()> {
let mut con = Connection::new(server_config.host(), server_config.port())?;
let tests: [(Query, Element, &str); 5] = [
(
Query::from("HEYA"),
Element::String("HEY!".to_owned()),
"heya",
),
(
Query::from("CREATE MODEL default.tmpbench(binary, binary)"),
Element::RespCode(RespCode::Okay),
"create model",
),
(
Query::from("use default.tmpbench"),
Element::RespCode(RespCode::Okay),
"use",
),
(
Query::from("set").arg("x").arg("100"),
Element::RespCode(RespCode::Okay),
"set",
),
(
Query::from("get").arg("x"),
Element::Binstr("100".as_bytes().to_owned()),
"get",
),
];
for (query, expected, test_kind) in tests {
let r: Element = con.run_query(query)?;
if r != expected {
return Err(Error::RuntimeError(format!(
"sanity test for `{test_kind}` failed"
)));
}
}
Ok(())
}
/// Ensures that the current thread is the main thread. If not, this function will panic
pub fn ensure_main_thread() {
assert_eq!(
thread::current().name().unwrap(),
"main",
"unsafe function called from non-main thread"
)
}
/// Run a cleanup. This function attempts to remove the `default.tmpbench` entity
pub fn cleanup(server_config: &ServerConfig) -> BResult<()> {
let mut c = Connection::new(server_config.host(), server_config.port())?;
let r: Element = c.run_query(Query::from("drop model default.tmpbench force"))?;
if r == Element::RespCode(RespCode::Okay) {
Ok(())
} else {
Err(Error::RuntimeError("failed to run cleanup".into()))
}
}

@ -142,7 +142,8 @@ pub fn stress_linearity_concurrent_clients_set(
|_| {},
true,
Some(DEFAULT_QUERY_COUNT),
);
)
.unwrap();
let mut timer = SimpleTimer::new();
timer.start();
workpool.execute_and_finish_iter(set_packs);
@ -200,7 +201,8 @@ pub fn stress_linearity_concurrent_clients_get(
|_| {},
true,
Some(DEFAULT_QUERY_COUNT),
);
)
.unwrap();
workpool.execute_and_finish_iter(set_packs);
// initialize the linearity counter
@ -225,7 +227,8 @@ pub fn stress_linearity_concurrent_clients_get(
|_| {},
true,
Some(DEFAULT_QUERY_COUNT),
);
)
.unwrap();
let mut timer = SimpleTimer::new();
timer.start();
wp.execute_and_finish_iter(get_packs);
