Implement new benchmark tool

next
Sayan Nandan 10 months ago
parent 3cdd814067
commit daf0f32c30
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

63
Cargo.lock generated

@ -260,17 +260,6 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.15"
@ -345,12 +334,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "either"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
[[package]]
name = "endian-type"
version = "0.1.2"
@ -359,9 +342,9 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "env_logger"
version = "0.10.0"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0"
checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece"
dependencies = [
"humantime",
"is-terminal",
@ -640,16 +623,6 @@ checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
name = "libsky"
version = "0.8.0"
[[package]]
name = "libstress"
version = "0.8.0"
dependencies = [
"crossbeam-channel",
"log",
"rand",
"rayon",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.5"
@ -668,9 +641,9 @@ dependencies = [
[[package]]
name = "log"
version = "0.4.19"
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "memchr"
@ -970,28 +943,6 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rayon"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
name = "rcrypt"
version = "0.4.0"
@ -1225,7 +1176,11 @@ dependencies = [
name = "sky-bench"
version = "0.8.0"
dependencies = [
"libstress",
"crossbeam-channel",
"env_logger",
"libsky",
"log",
"num_cpus",
"skytable",
]

@ -1,14 +1,6 @@
[workspace]
resolver = "1"
members = [
"cli",
"server",
"libsky",
"sky-bench",
"sky-macros",
"libstress",
"harness",
]
members = ["cli", "server", "libsky", "sky-bench", "sky-macros", "harness"]
[profile.release]
opt-level = 3

@ -1,14 +0,0 @@
[package]
name = "libstress"
version = "0.8.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# external deps
crossbeam-channel = "0.5.8"
rayon = "1.7.0"
log = "0.4.19"
rand = "0.8.5"

@ -1,469 +0,0 @@
/*
* Created on Wed Jun 16 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! # libstress
//!
//! Tools for emulating concurrent query behavior to _stress test_ the database server.
//! As of now, this crate provides a [`Workpool`] which is a generic synchronous threadpool
//! for doing multiple operations. But Workpool is a little different from standard threadpool
//! implementations in it categorizing a job to be made up of three parts, namely:
//!
//! - The init_pre_loop_var (the pre-loop stage)
//! - The on_loop (the in-loop stage)
//! - The on_exit (the post-loop stage)
//!
//! These stages form a part of the event loop.
//!
//! ## The event loop
//!
//! A task runs in a loop with the `on_loop` routine to which the a reference of the result of
//! the `init_pre_loop_var` is sent that is initialized. The loop proceeds whenever a worker
//! receives a task or else it blocks the current thread, waiting for a task. Hence the loop
//! cannot be terminated by an execute call. Instead, the _event loop_ is terminated when the
//! Workpool is dropped, either by scoping out, or by using the provided finish-like methods
//! (that call the destructor).
//!
//! ## Worker lifetime
//!
//! If a runtime panic occurs in the pre-loop stage, then the entire worker just terminates. Hence
//! this worker is no longer able to perform any tasks. Similarly, if a runtime panic occurs in
//! the in-loop stage, the worker terminates and is no longer available to do any work. This will
//! be reflected when the workpool attempts to terminate in entirety, i.e when the threads are joined
//! to the parent thread
//!
#![deny(unused_crate_dependencies)]
#![deny(unused_imports)]
pub mod traits;
pub use rayon;
use {
core::marker::PhantomData,
crossbeam_channel::{bounded, unbounded, Receiver as CReceiver, Sender as CSender},
rayon::prelude::{IntoParallelIterator, ParallelIterator},
std::{fmt::Display, thread},
};
/// Errors that can occur while initializing a [`Workpool`]
#[derive(Debug)]
pub enum WorkpoolError {
    /// Not every worker thread could be started: `(expected, actually_started)`
    ThreadStartFailure(usize, usize),
}
impl Display for WorkpoolError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WorkpoolError::ThreadStartFailure(expected, started) => {
                write!(
                    f,
                    "couldn't start all threads. expected {expected} but started {started}"
                )
            }
        }
    }
}
/// Convenience alias for fallible workpool operations
pub type WorkpoolResult<T> = Result<T, WorkpoolError>;
/// A Job. The UIn type parameter is the type that will be used to execute the action
/// Nothing is a variant used by the drop implementation to terminate all the workers
/// and call the exit_loop function
enum JobType<UIn> {
    /// a payload for the in-loop stage
    Task(UIn),
    /// termination signal: run the post-loop stage and stop the worker
    Nothing,
}
/// A worker
///
/// The only reason we use option is to reduce the effort needed to implement [`Drop`] for the
/// [`Workpool`]
struct Worker {
    thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
    /// Initialize a new worker.
    ///
    /// The spawned thread runs `init_pre_loop_var` once (the pre-loop stage),
    /// signals readiness over `wgtx`, and then blocks on `job_receiver`:
    /// `Task` payloads are handed to `on_loop`, while a `Nothing` message runs
    /// `on_exit` (the post-loop stage) and terminates the worker.
    fn new<Inp: 'static, UIn, Lv, Lp, Ex>(
        id: usize,
        job_receiver: CReceiver<JobType<UIn>>,
        init_pre_loop_var: Lv,
        on_exit: Ex,
        on_loop: Lp,
        wgtx: CSender<()>,
    ) -> Self
    where
        UIn: Send + Sync + 'static,
        Lv: Fn() -> Inp + 'static + Send,
        Lp: Fn(&mut Inp, UIn) + Send + Sync + 'static,
        Ex: Fn(&mut Inp) + Send + 'static,
    {
        let thread = thread::Builder::new()
            .name(format!("worker-{id}"))
            .spawn(move || {
                // NOTE: the original `let on_loop = on_loop;` rebinding was
                // removed — the `move` closure already captures it by value.
                // run the pre-loop stage exactly once per worker
                let mut pre_loop_var = init_pre_loop_var();
                // signal the pool that this worker started successfully, then
                // drop our end of the wait-group channel
                wgtx.send(()).unwrap();
                drop(wgtx);
                loop {
                    // blocks until a job (or the termination signal) arrives
                    let action = job_receiver.recv().unwrap();
                    match action {
                        JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk),
                        JobType::Nothing => {
                            // post-loop stage, then terminate this worker
                            on_exit(&mut pre_loop_var);
                            break;
                        }
                    }
                }
            })
            .unwrap();
        Self {
            thread: Some(thread),
        }
    }
}
/// A pool configuration setting to easily generate [`Workpool`]s without
/// having to clone an entire pool and its threads upfront
pub struct PoolConfig<Inp, UIn, Lv, Lp, Ex> {
    /// the pool size
    count: usize,
    /// the function that sets the pre-loop variable
    init_pre_loop_var: Lv,
    /// the function to be executed on worker termination
    on_exit: Ex,
    /// the function to be executed on loop
    on_loop: Lp,
    /// a marker for `Inp` since no parameters use it directly
    _marker: PhantomData<(Inp, UIn)>,
    /// check if self needs a pool for parallel iterators
    needs_iterator_pool: bool,
    /// expected maximum number of jobs
    expected_max_sends: Option<usize>,
}
impl<Inp: 'static, UIn, Lv, Lp, Ex> PoolConfig<Inp, UIn, Lv, Lp, Ex>
where
    UIn: Send + Sync + 'static,
    Inp: Sync,
    Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone,
    Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
    Lp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
{
    /// Create a new pool config
    ///
    /// `count` is the number of workers; `init_pre_loop_var`, `on_loop` and
    /// `on_exit` are the three event-loop stages (see the crate-level docs);
    /// `needs_iterator_pool` requests a global rayon pool for parallel
    /// iterators; `expected_max_sends`, when given, bounds the job channel
    pub fn new(
        count: usize,
        init_pre_loop_var: Lv,
        on_loop: Lp,
        on_exit: Ex,
        needs_iterator_pool: bool,
        expected_max_sends: Option<usize>,
    ) -> Self {
        Self {
            count,
            init_pre_loop_var,
            on_loop,
            on_exit,
            needs_iterator_pool,
            _marker: PhantomData,
            expected_max_sends,
        }
    }
    /// Get a new [`Workpool`] from the current config
    pub fn get_pool(&self) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Lp, Ex>> {
        self.get_pool_with_workers(self.count)
    }
    /// Get a [`Workpool`] with the base config but with a different number of workers
    pub fn get_pool_with_workers(
        &self,
        count: usize,
    ) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Lp, Ex>> {
        Workpool::new(
            count,
            self.init_pre_loop_var.clone(),
            self.on_loop.clone(),
            self.on_exit.clone(),
            self.needs_iterator_pool,
            self.expected_max_sends,
        )
    }
    /// Get a [`Workpool`] with the base config but with a custom loop-stage closure
    pub fn with_loop_closure<Dlp>(&self, lp: Dlp) -> WorkpoolResult<Workpool<Inp, UIn, Lv, Dlp, Ex>>
    where
        Dlp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static,
    {
        Workpool::new(
            self.count,
            self.init_pre_loop_var.clone(),
            lp,
            self.on_exit.clone(),
            self.needs_iterator_pool,
            self.expected_max_sends,
        )
    }
}
/// # Workpool
///
/// A Workpool is a generic synchronous thread pool that can be used to perform, well, anything.
/// A workpool has to be initialized with the number of workers, the pre_loop_variable (set this
/// to None if there isn't any). what to do on loop and what to do on exit of each worker. The
/// closures are kept as `Clone`able types just to reduce complexity with copy (we were lazy).
///
/// ## Clones
///
/// Workpool clones simply create a new workpool with the same on_exit, on_loop and init_pre_loop_var
/// configurations. This provides a very convenient interface if one desires to use multiple workpools
/// to do the _same kind of thing_
///
/// ## Actual thread count
///
/// The actual thread count will depend on whether the caller requests the initialization of an
/// iterator pool or not. If the caller does request for an iterator pool, then the number of threads
/// spawned will be twice the number of the set workers. Else, the number of spawned threads is equal
/// to the number of workers.
///
/// ## Termination
///
/// Dropping the pool sends one termination message per worker and then joins
/// every worker thread, so all queued tasks complete before the drop returns
pub struct Workpool<Inp, UIn, Lv, Lp, Ex> {
    /// the workers
    workers: Vec<Worker>,
    /// the sender that sends jobs
    job_distributor: CSender<JobType<UIn>>,
    /// the function that sets the pre-loop variable
    init_pre_loop_var: Lv,
    /// the function to be executed on worker termination
    on_exit: Ex,
    /// the function to be executed on loop
    on_loop: Lp,
    /// a marker for `Inp` since no parameters use it directly
    _marker: PhantomData<Inp>,
    /// check if self needs a pool for parallel iterators
    needs_iterator_pool: bool,
    /// expected maximum number of sends
    expected_max_sends: Option<usize>,
}
impl<Inp: 'static, UIn, Lv, Ex, Lp> Workpool<Inp, UIn, Lv, Lp, Ex>
where
    UIn: Send + Sync + 'static,
    Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone,
    Lv: Fn() -> Inp + Send + Sync + 'static + Clone,
    Lp: Fn(&mut Inp, UIn) + Send + Sync + 'static + Clone,
    Inp: Sync,
{
    /// Create a new workpool with `count` workers.
    ///
    /// Each worker signals on an internal wait-group channel once its
    /// pre-loop stage has completed; if fewer than `count` signals arrive
    /// (a worker panicked during startup), initialization fails with
    /// [`WorkpoolError::ThreadStartFailure`].
    ///
    /// # Panics
    /// Panics if `count` is zero.
    pub fn new(
        count: usize,
        init_pre_loop_var: Lv,
        on_loop: Lp,
        on_exit: Ex,
        needs_iterator_pool: bool,
        expected_max_sends: Option<usize>,
    ) -> WorkpoolResult<Self> {
        // FIX: validate the thread count up front; the original asserted only
        // *after* building the global rayon pool, doing global setup work that
        // would then be wasted on the panic path
        assert!(count != 0, "Runtime panic: Bad value `0` for thread count");
        if needs_iterator_pool {
            // initialize a global threadpool for parallel iterators; the error
            // is ignored because the global pool may already have been built
            let _ = rayon::ThreadPoolBuilder::new()
                .num_threads(count)
                .build_global();
        }
        let (sender, receiver) = match expected_max_sends {
            Some(limit) => bounded(limit),
            None => unbounded(),
        };
        // wait-group channel: every worker sends exactly one unit on startup
        let (wgtx, wgrx) = bounded::<()>(count);
        let mut workers = Vec::with_capacity(count);
        for i in 0..count {
            workers.push(Worker::new(
                i,
                receiver.clone(),
                init_pre_loop_var.clone(),
                on_exit.clone(),
                on_loop.clone(),
                wgtx.clone(),
            ));
        }
        // drop our copy of the sender so the receive loop below terminates
        // once every surviving worker has reported in (or panicked, dropping
        // its sender)
        drop(wgtx);
        // FIX: `wgrx.iter().map(|_| 1usize).sum()` was a hand-rolled `count()`
        let started = wgrx.iter().count();
        if started == count {
            Ok(Self {
                workers,
                job_distributor: sender,
                init_pre_loop_var,
                on_exit,
                on_loop,
                _marker: PhantomData,
                needs_iterator_pool,
                expected_max_sends,
            })
        } else {
            Err(WorkpoolError::ThreadStartFailure(count, started))
        }
    }
    /// Create a fresh pool (new worker threads) with this pool's configuration
    pub fn clone_pool(&self) -> WorkpoolResult<Self> {
        Self::new(
            self.workers.len(),
            self.init_pre_loop_var.clone(),
            self.on_loop.clone(),
            self.on_exit.clone(),
            self.needs_iterator_pool,
            self.expected_max_sends,
        )
    }
    /// Execute something
    ///
    /// # Panics
    /// Panics if the job channel is disconnected (i.e. a worker crashed)
    pub fn execute(&self, inp: UIn) {
        self.job_distributor
            .send(JobType::Task(inp))
            .expect("Worker thread crashed")
    }
    /// Execute something that can be executed as a parallel iterator
    /// For the best performance, it is recommended that you pass true for `needs_iterator_pool`
    /// on initialization of the [`Workpool`]
    pub fn execute_iter(&self, iter: impl IntoParallelIterator<Item = UIn>) {
        iter.into_par_iter().for_each(|inp| self.execute(inp))
    }
    /// Does the same thing as [`execute_iter`] but drops self ensuring that all the
    /// workers actually finish their tasks
    pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator<Item = UIn>) {
        self.execute_iter(iter);
        drop(self);
    }
    /// Initialize a new [`Workpool`] with the default count of threads. This is equal
    /// to 2 * the number of logical cores.
    pub fn new_default_threads(
        init_pre_loop_var: Lv,
        on_loop: Lp,
        on_exit: Ex,
        needs_iterator_pool: bool,
        expected_max_sends: Option<usize>,
    ) -> WorkpoolResult<Self> {
        // number of logical CPUs times 2, falling back to a single worker if
        // the parallelism level cannot be queried
        let worker_count = thread::available_parallelism().map_or(1, usize::from) * 2;
        Self::new(
            worker_count,
            init_pre_loop_var,
            on_loop,
            on_exit,
            needs_iterator_pool,
            expected_max_sends,
        )
    }
}
// NOTE(review): the generic parameters are now listed in the same order as on
// the struct definition (`Workpool<Inp, UIn, Lv, Lp, Ex>`). The original impl
// swapped `Lv` and `Lp` in the target type — semantically identical (they are
// mere parameter names here), but needlessly confusing to read.
impl<Inp, UIn, Lv, Lp, Ex> Drop for Workpool<Inp, UIn, Lv, Lp, Ex> {
    /// Send one termination message per worker, then join every worker
    /// thread, guaranteeing that all pending tasks have completed
    fn drop(&mut self) {
        for _ in &self.workers {
            self.job_distributor.send(JobType::Nothing).unwrap();
        }
        for worker in &mut self.workers {
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap()
            }
        }
    }
}
pub mod utils {
    //! Helpers for generating random keys/values for stress testing
    const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
    use rand::distributions::{Alphanumeric, Standard};
    use std::collections::HashSet;
    use std::collections::TryReserveError;
    /// Generate a random UTF-8 string of `len` alphanumeric characters
    pub fn ran_string(len: usize, rand: impl rand::Rng) -> String {
        let rand_string: String = rand
            .sample_iter(&Alphanumeric)
            .take(len)
            .map(char::from)
            .collect();
        rand_string
    }
    /// Generate a vector of `len` random bytes
    pub fn ran_bytes(len: usize, rand: impl rand::Rng) -> Vec<u8> {
        rand.sample_iter(&Standard).take(len).collect()
    }
    /// Generate `count` byte vectors of `size` bytes each.
    ///
    /// When `unique` is set, all returned keys are distinct (collisions are
    /// re-rolled); otherwise one random key is generated and repeated.
    pub fn generate_random_byte_vector(
        count: usize,
        size: usize,
        mut rng: impl rand::Rng,
        unique: bool,
    ) -> Result<Vec<Vec<u8>>, TryReserveError> {
        if unique {
            let mut keys = HashSet::new();
            // BUGFIX: reserve room for `count` keys; the original reserved
            // `size` (the per-key byte length), which is unrelated to how
            // many entries the set will hold
            keys.try_reserve(count)?;
            (0..count).for_each(|_| {
                let mut ran = ran_bytes(size, &mut rng);
                // re-roll on collision to keep exactly `count` unique entries
                while keys.contains(&ran) {
                    ran = ran_bytes(size, &mut rng);
                }
                keys.insert(ran);
            });
            Ok(keys.into_iter().collect())
        } else {
            let mut keys = Vec::new();
            // BUGFIX: ditto — reserve for `count` elements, not `size`
            keys.try_reserve_exact(count)?;
            let ran_byte_key = ran_bytes(size, &mut rng);
            (0..count).for_each(|_| keys.push(ran_byte_key.clone()));
            Ok(keys)
        }
    }
    /// Generate `count` random UTF-8 valid strings of `size` characters each.
    ///
    /// When `unique` is set, all returned strings are distinct (collisions
    /// are re-rolled).
    pub fn generate_random_string_vector(
        count: usize,
        size: usize,
        mut rng: impl rand::Rng,
        unique: bool,
    ) -> Result<Vec<String>, TryReserveError> {
        if unique {
            let mut keys = HashSet::new();
            // BUGFIX: reserve for `count` keys, not the per-key length `size`
            keys.try_reserve(count)?;
            (0..count).for_each(|_| {
                let mut ran = ran_string(size, &mut rng);
                while keys.contains(&ran) {
                    ran = ran_string(size, &mut rng);
                }
                keys.insert(ran);
            });
            Ok(keys.into_iter().collect())
        } else {
            let mut keys = Vec::new();
            // BUGFIX: reserve for `count` elements, not `size`
            keys.try_reserve_exact(count)?;
            (0..count)
                .map(|_| ran_string(size, &mut rng))
                .for_each(|bytes| keys.push(bytes));
            Ok(keys)
        }
    }
    /// Generate a random string of `len` ASCII letters drawn from [`CHARSET`]
    pub fn rand_alphastring(len: usize, rng: &mut impl rand::Rng) -> String {
        (0..len)
            .map(|_| {
                let idx = rng.gen_range(0..CHARSET.len());
                CHARSET[idx] as char
            })
            .collect()
    }
}

@ -1,69 +0,0 @@
/*
* Created on Fri Jun 18 2021
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2021, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use std::fmt;
/// A trait for aggressive erroring: either yield the wrapped value, or log
/// the supplied message and abort the entire process
pub trait ExitError<T> {
    /// Abort the process if the type errors with an error code or
    /// return the type
    fn exit_error<Ms>(self, msg: Ms) -> T
    where
        Ms: ToString;
}
impl<T, E> ExitError<T> for Result<T, E>
where
    E: fmt::Display,
{
    fn exit_error<Ms>(self, msg: Ms) -> T
    where
        Ms: ToString,
    {
        // log the caller's message alongside the error, then abort with code 1
        self.unwrap_or_else(|e| {
            log::error!("{} : '{}'", msg.to_string(), e);
            std::process::exit(0x01);
        })
    }
}
impl<T> ExitError<T> for Option<T> {
    fn exit_error<Ms>(self, msg: Ms) -> T
    where
        Ms: ToString,
    {
        // log the caller's message on `None`, then abort with code 1
        self.unwrap_or_else(|| {
            log::error!("{}", msg.to_string());
            std::process::exit(0x01);
        })
    }
}

@ -9,4 +9,9 @@ version = "0.8.0"
[dependencies]
# internal deps
skytable = { git = "https://github.com/skytable/client-rust.git", branch = "octave" }
libstress = { path = "../libstress" }
libsky = { path = "../libsky" }
# external deps
crossbeam-channel = "0.5.8"
num_cpus = "1.16.0"
env_logger = "0.10.1"
log = "0.4.20"

@ -0,0 +1,25 @@
sky-bench 0.8.0
Sayan N. <ohsayan@outlook.com>
Skytable benchmark tool
USAGE:
sky-bench [OPTIONS]
FLAGS:
--help Displays this help message
--version Displays the benchmark tool version
REQUIRED OPTIONS:
--password Provide the password
OPTIONS:
--endpoint Set the endpoint (defaults to tcp@127.0.0.1:2003)
--threads Set the number of threads to be used (defaults to number of physical CPU cores)
--keysize Set the primary key size. Defaults to the number of digits in --rowcount (7 for the default rowcount)
--rowcount Set the number of rows to be manipulated for the benchmark
NOTES:
- The user for auth will be 'root' since only 'root' accounts allow the creation and deletion of spaces and models
- A space called 'bench' will be created
- A model called 'bench' will be created in the space created above. The created model has the structure {un: string, pw: uint8}
- The model and space will be removed once the benchmark is complete

@ -0,0 +1,217 @@
/*
* Created on Sat Nov 18 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::error::{BenchError, BenchResult},
std::{
collections::hash_map::{Entry, HashMap},
env,
},
};
const TXT_HELP: &str = include_str!("../help_text/help");
/// Outcome of reading the raw environment, before validation
#[derive(Debug)]
enum TaskInner {
    /// display a help/version message and exit
    HelpMsg(String),
    /// a raw `--flag -> value` map still to be validated into a [`BenchConfig`]
    CheckConfig(HashMap<String, String>),
}
/// The task the benchmark binary has been asked to perform
#[derive(Debug)]
pub enum Task {
    /// print this message and exit
    HelpMsg(String),
    /// run a benchmark with this validated configuration
    BenchConfig(BenchConfig),
}
/// A validated benchmark configuration
#[derive(Debug)]
pub struct BenchConfig {
    // server host
    pub host: String,
    // server TCP port
    pub port: u16,
    // password for the `root` account
    pub root_pass: String,
    // number of worker connections/threads
    pub threads: usize,
    // primary key size in bytes
    pub key_size: usize,
    // number of queries to run per benchmark stage
    pub query_count: usize,
}
impl BenchConfig {
    /// Construct a [`BenchConfig`] from already-validated values
    pub fn new(
        host: String,
        port: u16,
        root_pass: String,
        threads: usize,
        key_size: usize,
        query_count: usize,
    ) -> Self {
        Self {
            host,
            port,
            root_pass,
            threads,
            key_size,
            query_count,
        }
    }
}
/// Read `env::args()` into a raw `--flag -> value` map.
///
/// Recognized shapes: `--help` / `--version` (short-circuit into a help
/// message), `--key value`, and the self-contained `--key=value`.
/// Duplicate flags and non-`--` arguments are rejected.
fn load_env() -> BenchResult<TaskInner> {
    let mut args = HashMap::new();
    // FIX: removed the redundant `.into_iter()` — `env::args()` is already
    // an iterator
    let mut it = env::args().skip(1);
    while let Some(arg) = it.next() {
        let (arg, arg_val) = match arg.as_str() {
            "--help" => return Ok(TaskInner::HelpMsg(TXT_HELP.into())),
            "--version" => {
                return Ok(TaskInner::HelpMsg(format!(
                    "sky-bench v{}",
                    libsky::VERSION
                )))
            }
            // FIX: check the self-contained `--key=value` form FIRST. The
            // original tried `it.next()` first, so `--key=value` followed by
            // any other argument silently consumed that argument as the
            // value. `split_once` (instead of requiring `split('=')` to yield
            // exactly two pieces) also permits values that contain `=`.
            _ if arg.starts_with("--") => match arg.split_once('=') {
                Some((key, value)) => (key.to_owned(), value.to_owned()),
                None => match it.next() {
                    Some(arg_val) => (arg, arg_val),
                    None => {
                        return Err(BenchError::ArgsErr(format!("expected value for {arg}")));
                    }
                },
            },
            unknown_arg => {
                return Err(BenchError::ArgsErr(format!(
                    "unknown argument: {unknown_arg}"
                )))
            }
        };
        // reject duplicate flags
        match args.entry(arg) {
            Entry::Occupied(oe) => {
                return Err(BenchError::ArgsErr(format!(
                    "found duplicate values for {}",
                    oe.key()
                )))
            }
            Entry::Vacant(ve) => {
                ve.insert(arg_val);
            }
        }
    }
    Ok(TaskInner::CheckConfig(args))
}
/// Count the decimal digits of `n` (`0` counts as one digit).
///
/// FIX: uses pure integer arithmetic. The original `(n as f64).log10().floor()`
/// can misreport for large values, since `usize` values above 2^53 are not
/// exactly representable in an `f64` mantissa.
fn cdig(mut n: usize) -> usize {
    let mut digits = 1;
    while n >= 10 {
        n /= 10;
        digits += 1;
    }
    digits
}
/// Validate the raw argument map into a [`Task`].
///
/// Defaults: endpoint `tcp@127.0.0.1:2003`, threads = logical CPU count,
/// rowcount = 1,000,000, keysize = minimum digits needed to render `rowcount`
/// distinct zero-padded keys. `--password` is mandatory.
pub fn parse() -> BenchResult<Task> {
    let mut args = match load_env()? {
        TaskInner::HelpMsg(msg) => return Ok(Task::HelpMsg(msg)),
        TaskInner::CheckConfig(args) => args,
    };
    // endpoint: `proto@host:port`; only `tcp` is supported at the moment
    let (host, port) = match args.remove("--endpoint") {
        None => ("127.0.0.1".to_owned(), 2003),
        Some(ep) => {
            let ep: Vec<&str> = ep.split("@").collect();
            if ep.len() != 2 {
                return Err(BenchError::ArgsErr(
                    "value for --endpoint must be in the form `[protocol]@[host]:[port]`".into(),
                ));
            }
            let protocol = ep[0];
            let host_port: Vec<&str> = ep[1].split(":").collect();
            if host_port.len() != 2 {
                return Err(BenchError::ArgsErr(
                    "value for --endpoint must be in the form `[protocol]@[host]:[port]`".into(),
                ));
            }
            let (host, port) = (host_port[0], host_port[1]);
            let Ok(port) = port.parse::<u16>() else {
                return Err(BenchError::ArgsErr(
                    "the value for port must be an integer in the range 0-65535".into(),
                ));
            };
            if protocol != "tcp" {
                return Err(BenchError::ArgsErr(
                    "only TCP endpoints can be benchmarked at the moment".into(),
                ));
            }
            (host.to_owned(), port)
        }
    };
    // password (required; the auth user is always `root`)
    // FIX: local was misspelt `passsword`
    let password = match args.remove("--password") {
        Some(p) => p,
        None => {
            return Err(BenchError::ArgsErr(
                "you must provide a value for `--password`".into(),
            ))
        }
    };
    // threads: must be nonzero; defaults to the logical CPU count
    let thread_count = match args.remove("--threads") {
        None => num_cpus::get(),
        Some(tc) => match tc.parse() {
            Ok(tc) if tc > 0 => tc,
            _ => {
                return Err(BenchError::ArgsErr(
                    "incorrect value for `--threads`. must be a nonzero value".into(),
                ))
            }
        },
    };
    // query/row count: must be nonzero; defaults to 1M
    let query_count = match args.remove("--rowcount") {
        None => 1_000_000_usize,
        Some(rc) => match rc.parse() {
            Ok(rc) if rc != 0 => rc,
            // FIX: was `format!` with no interpolation (clippy::useless_format)
            _ => {
                return Err(BenchError::ArgsErr(
                    "bad value for `--rowcount` must be a nonzero value".into(),
                ))
            }
        },
    };
    // keysize must be large enough to zero-pad `query_count` distinct keys
    let need_atleast = cdig(query_count);
    let key_size = match args.remove("--keysize") {
        None => need_atleast,
        Some(ks) => match ks.parse() {
            Ok(s) if s >= need_atleast => s,
            _ => return Err(BenchError::ArgsErr(format!("incorrect value for `--keysize`. must be set to a value that can be used to generate atleast {query_count} unique primary keys"))),
        },
    };
    // FIX: flags that were not consumed above used to be silently ignored;
    // report them so typos don't go unnoticed
    if let Some(unknown) = args.keys().next() {
        return Err(BenchError::ArgsErr(format!("unknown argument: {unknown}")));
    }
    Ok(Task::BenchConfig(BenchConfig::new(
        host,
        port,
        password,
        thread_count,
        key_size,
        query_count,
    )))
}

@ -0,0 +1,216 @@
/*
* Created on Sat Nov 18 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::error::BenchResult;
use {
crate::{
args::BenchConfig,
error::{self, BenchmarkTaskWorkerError},
pool::{RuntimeStats, Taskpool, ThreadedTask},
},
skytable::{query, response::Response, Config, Connection, Query},
std::time::Instant,
};
/// A benchmark task: holds the connection [`Config`] from which every pool
/// worker opens its own [`Connection`]
#[derive(Debug)]
pub struct BenchmarkTask {
    // connection configuration shared by all workers
    cfg: Config,
}
impl BenchmarkTask {
    /// Describe a task for the given endpoint and credentials
    pub fn new(host: &str, port: u16, username: &str, password: &str) -> Self {
        Self {
            cfg: Config::new(host, port, username, password),
        }
    }
}
impl ThreadedTask for BenchmarkTask {
    type TaskWorker = Connection;
    type TaskWorkerInitError = BenchmarkTaskWorkerError;
    type TaskWorkerTerminateError = BenchmarkTaskWorkerError;
    type TaskWorkerWorkError = BenchmarkTaskWorkerError;
    // a task input is the query to run plus the response we expect back
    type TaskInput = (Query, Response);
    /// Each worker opens its own connection from the shared config
    fn initialize_worker(&self) -> Result<Self::TaskWorker, Self::TaskWorkerInitError> {
        self.cfg.connect().map_err(Into::into)
    }
    /// Run one query, timing only the round-trip; errors if the server's
    /// response differs from the expected one
    fn drive_worker_timed(
        worker: &mut Self::TaskWorker,
        (query, expected_resp): Self::TaskInput,
    ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError> {
        let start = Instant::now();
        let resp = worker.query(&query)?;
        let stop = Instant::now();
        if resp == expected_resp {
            Ok((start, stop))
        } else {
            Err(BenchmarkTaskWorkerError::Error(format!(
                "response from server did not match expected response: {:?}",
                resp
            )))
        }
    }
    /// No per-worker teardown needed; dropping the connection closes it
    fn terminate_worker(
        &self,
        _: &mut Self::TaskWorker,
    ) -> Result<(), Self::TaskWorkerTerminateError> {
        Ok(())
    }
}
/// Execute the full benchmark: `INSERT`, then `UPDATE`, then `DELETE`.
///
/// Creates space `bench` and model `bench.bench(un: string, pw: uint8)` over a
/// dedicated main-thread connection, runs the three stages through a
/// [`Taskpool`] of `bench.threads` worker connections, prints the collected
/// statistics and finally drops the model and space. On a benchmarking error,
/// cleanup is attempted before the original error is propagated.
pub fn run(bench: BenchConfig) -> error::BenchResult<()> {
    // all pool workers authenticate as `root` (see the help text)
    let bench_config = BenchmarkTask::new(&bench.host, bench.port, "root", &bench.root_pass);
    info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: string, pw: uint8}}`");
    let mut main_thread_db = bench_config.cfg.connect()?;
    main_thread_db.query_parse::<()>(&query!("create space bench"))?;
    main_thread_db.query_parse::<()>(&query!("create model bench.bench(un: string, pw: uint8)"))?;
    info!(
        "initializing connection pool with {} connections",
        bench.threads
    );
    let mut p = Taskpool::new(bench.threads, bench_config)?;
    info!(
        "pool initialized successfully. preparing {} `INSERT` queries with primary key size={} bytes",
        bench.query_count, bench.key_size
    );
    let mut insert_stats = Default::default();
    let mut update_stats = Default::default();
    let mut delete_stats = Default::default();
    // the three stages run inside an immediately-invoked closure so that any
    // error can be intercepted here for cleanup before being propagated
    match || -> BenchResult<()> {
        // bench insert: primary keys are zero-padded decimal strings of
        // exactly `key_size` characters (which is why `--keysize` must be at
        // least the digit count of `--rowcount`)
        let insert_queries: Vec<(Query, Response)> = (0..bench.query_count)
            .into_iter()
            .map(|i| {
                (
                    query!(
                        "insert into bench.bench(?, ?)",
                        format!("{:0>width$}", i, width = bench.key_size),
                        0u64
                    ),
                    Response::Empty,
                )
            })
            .collect();
        info!("benchmarking `INSERT` queries");
        insert_stats = p.blocking_bombard(insert_queries)?;
        // bench update: increment `pw` for every previously inserted key
        info!("completed benchmarking `INSERT`. preparing `UPDATE` queries");
        let update_queries: Vec<(Query, Response)> = (0..bench.query_count)
            .into_iter()
            .map(|i| {
                (
                    query!(
                        "update bench.bench set pw += ? where un = ?",
                        1u64,
                        format!("{:0>width$}", i, width = bench.key_size),
                    ),
                    Response::Empty,
                )
            })
            .collect();
        info!("benchmarking `UPDATE` queries");
        update_stats = p.blocking_bombard(update_queries)?;
        // bench delete: remove every inserted key again
        info!("completed benchmarking `UPDATE`. preparing `DELETE` queries");
        let delete_queries: Vec<(Query, Response)> = (0..bench.query_count)
            .into_iter()
            .map(|i| {
                (
                    query!(
                        "delete from bench.bench where un = ?",
                        format!("{:0>width$}", i, width = bench.key_size),
                    ),
                    Response::Empty,
                )
            })
            .collect();
        info!("benchmarking `DELETE` queries");
        delete_stats = p.blocking_bombard(delete_queries)?;
        info!("completed benchmarking `DELETE` queries");
        Ok(())
    }() {
        Ok(()) => {}
        Err(e) => {
            error!("benchmarking failed. attempting to clean up");
            // best-effort cleanup; the original benchmark error takes
            // precedence over any cleanup error
            match cleanup(main_thread_db) {
                Ok(()) => return Err(e),
                Err(e_cleanup) => {
                    error!("failed to clean up db: {e_cleanup}. please remove model `bench.bench` manually");
                    return Err(e);
                }
            }
        }
    }
    // dropping the pool joins the workers, ensuring all queries completed
    drop(p);
    warn!("benchmarks might appear to be slower. this tool is currently experimental");
    // print results
    info!("results:");
    print_table(vec![
        ("INSERT", insert_stats),
        ("UPDATE", update_stats),
        ("DELETE", delete_stats),
    ]);
    cleanup(main_thread_db)?;
    Ok(())
}
/// Drop the benchmark model and space. Consumes the connection, which is
/// closed when it goes out of scope.
fn cleanup(mut main_thread_db: Connection) -> Result<(), error::BenchError> {
    trace!("dropping space and table");
    // the model must go first: a space containing models can't be dropped
    // here (the reverse order is what the server would reject; presumably —
    // NOTE(review): confirm against server semantics)
    main_thread_db.query_parse::<()>(&query!("drop model bench.bench"))?;
    main_thread_db.query_parse::<()>(&query!("drop space bench"))?;
    Ok(())
}
/// Render the collected per-stage statistics as a fixed-width ASCII table
/// on stdout.
fn print_table(data: Vec<(&'static str, RuntimeStats)>) {
    // horizontal rule shared by the header and footer
    let hr =
        "+---------+--------------------------+-----------------------+------------------------+";
    println!("{hr}");
    println!(
        "| Query | Effective real-world QPS | Slowest Query (nanos) | Fastest Query (nanos) |"
    );
    println!("{hr}");
    for (query, stats) in data {
        let RuntimeStats {
            qps,
            avg_per_query_ns: _,
            head_ns,
            tail_ns,
        } = stats;
        // head/tail are the fastest/slowest single-query round-trips
        println!(
            "| {:<7} | {:>24.2} | {:>21} | {:>22} |",
            query, qps, tail_ns, head_ns
        );
    }
    println!("{hr}");
}

@ -0,0 +1,85 @@
/*
* Created on Sat Nov 18 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crate::{bench::BenchmarkTask, pool::TaskpoolError},
core::fmt,
skytable::error::Error,
};
/// Convenience alias for results produced by the benchmark tool
pub type BenchResult<T> = Result<T, BenchError>;
/// Top-level error type for the benchmark tool
#[derive(Debug)]
pub enum BenchError {
    // invalid command-line arguments (human-readable message)
    ArgsErr(String),
    // the worker pool failed while running the benchmark
    BenchError(TaskpoolError<BenchmarkTask>),
    // a query issued directly (outside the pool) failed
    DirectDbError(Error),
}
impl From<TaskpoolError<BenchmarkTask>> for BenchError {
fn from(e: TaskpoolError<BenchmarkTask>) -> Self {
Self::BenchError(e)
}
}
impl From<Error> for BenchError {
fn from(e: Error) -> Self {
Self::DirectDbError(e)
}
}
// human-readable rendering, used by the error path in `main`
impl fmt::Display for BenchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ArgsErr(e) => write!(f, "args error: {e}"),
            Self::BenchError(e) => write!(f, "benchmark system error: {e}"),
            Self::DirectDbError(e) => write!(f, "direct operation on db failed. {e}"),
        }
    }
}
// marker impl so BenchError can be used as a standard error trait object
impl std::error::Error for BenchError {}
/// Error type for a single benchmark pool worker
#[derive(Debug)]
pub enum BenchmarkTaskWorkerError {
    // the worker's database operation failed
    DbError(Error),
    // any other worker-local failure (human-readable message)
    Error(String),
}
impl From<Error> for BenchmarkTaskWorkerError {
fn from(e: Error) -> Self {
Self::DbError(e)
}
}
// human-readable rendering, surfaced through the pool's WorkerError variant
impl fmt::Display for BenchmarkTaskWorkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DbError(e) => write!(f, "worker failed due to DB error. {e}"),
            Self::Error(e) => write!(f, "worker failed. {e}"),
        }
    }
}

@ -24,4 +24,31 @@
*
*/
fn main() {}
#[macro_use]
extern crate log;
mod args;
mod bench;
mod error;
mod pool;
/// Entry point: set up logging, run the benchmark, and exit non-zero on failure.
fn main() {
    // log filter comes from SKYBENCH_LOG, defaulting to `info`
    let filter = std::env::var("SKYBENCH_LOG").unwrap_or_else(|_| "info".to_owned());
    env_logger::Builder::new().parse_filters(&filter).init();
    if let Err(e) = run() {
        error!("bench error: {e}");
        std::process::exit(0x01);
    }
}
/// Parse the CLI arguments and dispatch: either print the help text or run the benchmark.
fn run() -> error::BenchResult<()> {
    match args::parse()? {
        args::Task::HelpMsg(msg) => println!("{msg}"),
        args::Task::BenchConfig(bench) => bench::run(bench)?,
    }
    Ok(())
}

@ -0,0 +1,279 @@
/*
* Created on Fri Nov 17 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crossbeam_channel::{unbounded, Receiver, Sender},
std::{
fmt,
marker::PhantomData,
thread::{self, JoinHandle},
time::Instant,
},
};
/// Result alias for operations on a task pool
pub type TaskPoolResult<T, Th> = Result<T, TaskpoolError<Th>>;
/// Errors raised by the pool itself or bubbled up from its workers
#[derive(Debug)]
pub enum TaskpoolError<Th: ThreadedTask> {
    // a per-thread worker could not be initialized
    InitError(Th::TaskWorkerInitError),
    // the pool could not dispatch or collect work (all workers gone)
    BombardError(&'static str),
    // a worker failed while executing one unit of work
    WorkerError(Th::TaskWorkerWorkError),
}
// human-readable rendering; only available when the task's own error types are Display
impl<Th: ThreadedTask> fmt::Display for TaskpoolError<Th>
where
    Th::TaskWorkerInitError: fmt::Display,
    Th::TaskWorkerWorkError: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InitError(e) => write!(f, "failed to init worker pool. {e}"),
            Self::BombardError(e) => write!(f, "failed to post work to pool. {e}"),
            Self::WorkerError(e) => write!(f, "failed running worker task. {e}"),
        }
    }
}
/// A threaded task definition: implementors describe how to initialize, drive and
/// terminate one worker per pool thread.
pub trait ThreadedTask: Send + Sync + 'static {
    /// the per-thread item that does the actual work
    ///
    /// NB: this is not to be confused with the actual underlying thread pool worker
    type TaskWorker: Send + Sync;
    /// when attempting initialization of the per-thread task worker, if an error is thrown, this is the type
    /// you're looking for
    type TaskWorkerInitError: Send + Sync;
    /// when attempting to run a single unit of work, if any error occurs this is the error type that is to be returned
    type TaskWorkerWorkError: Send + Sync;
    /// when attempting to close a worker, if an error occurs this is the error type that is returned
    type TaskWorkerTerminateError: Send + Sync;
    /// the task that is sent to each worker
    type TaskInput: Send + Sync;
    // fn
    /// initialize the worker
    fn initialize_worker(&self) -> Result<Self::TaskWorker, Self::TaskWorkerInitError>;
    /// drive the worker to complete a task and return the time as a
    /// `(start, stop)` pair of instants bracketing the unit of work
    fn drive_worker_timed(
        worker: &mut Self::TaskWorker,
        task: Self::TaskInput,
    ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError>;
    /// terminate the worker, releasing whatever resources it holds
    fn terminate_worker(
        &self,
        worker: &mut Self::TaskWorker,
    ) -> Result<(), Self::TaskWorkerTerminateError>;
}
/// Handle to one OS thread of the pool; `Th` is carried only at the type level
#[derive(Debug)]
struct ThreadWorker<Th> {
    // join handle for the spawned thread
    handle: JoinHandle<()>,
    // ties the worker to its task type without storing one
    _m: PhantomData<Th>,
}
/// Message sent from the pool to a worker thread
#[derive(Debug)]
enum WorkerTask<Th: ThreadedTask> {
    // run one unit of work
    Task(Th::TaskInput),
    // shut the worker thread down
    Exit,
}
impl<Th: ThreadedTask> ThreadWorker<Th> {
    /// Spawn one worker thread that pulls tasks off `task_rx`, times each unit of
    /// work via `Th::drive_worker_timed` and pushes the result onto `res_tx`,
    /// running until it receives [`WorkerTask::Exit`].
    fn new(
        hl_worker: Th::TaskWorker,
        task_rx: Receiver<WorkerTask<Th>>,
        res_tx: Sender<Result<(Instant, Instant), Th::TaskWorkerWorkError>>,
    ) -> Self {
        let handle = thread::spawn(move || {
            let mut worker = hl_worker;
            // a closed channel is treated as a bug (the pool always signals `Exit`
            // before dropping the sender), hence the unwraps; on `Exit` the loop
            // ends and `task_rx` is dropped with the closure
            while let WorkerTask::Task(task) = task_rx.recv().unwrap() {
                res_tx
                    .send(Th::drive_worker_timed(&mut worker, task))
                    .unwrap();
            }
        });
        Self {
            handle,
            _m: PhantomData,
        }
    }
}
/// A pool of worker threads driving a [`ThreadedTask`], collecting per-task timings
#[derive(Debug)]
pub struct Taskpool<Th: ThreadedTask> {
    // handles for all spawned worker threads
    workers: Vec<ThreadWorker<Th>>,
    // the task configuration, kept alive for the pool's lifetime
    _config: Th,
    // sending side: work (and exit signals) posted to the workers
    task_tx: Sender<WorkerTask<Th>>,
    // receiving side: timed results coming back from the workers
    res_rx: Receiver<Result<(Instant, Instant), Th::TaskWorkerWorkError>>,
    // earliest recorded start of the current run (see recompute_stats)
    record_real_start: Instant,
    // latest recorded stop of the current run
    record_real_stop: Instant,
    // running average latency in nanoseconds
    stat_run_avg_ns: f64,
    // slowest sample (max latency) seen so far, nanoseconds
    stat_run_tail_ns: u128,
    // fastest sample (min latency) seen so far, nanoseconds
    stat_run_head_ns: u128,
}
// TODO(@ohsayan): prepare histogram for report; for now there's no use of the head and tail latencies
/// Aggregate statistics for one benchmark run
#[derive(Default, Debug)]
pub struct RuntimeStats {
    // effective queries per second over the wall-clock span of the run
    pub qps: f64,
    // running average per-query latency in nanoseconds
    pub avg_per_query_ns: f64,
    // fastest (minimum) observed per-query latency in nanoseconds
    pub head_ns: u128,
    // slowest (maximum) observed per-query latency in nanoseconds
    pub tail_ns: u128,
}
impl<Th: ThreadedTask> Taskpool<Th> {
    /// running average per-query latency in nanoseconds
    pub fn stat_avg(&self) -> f64 {
        self.stat_run_avg_ns
    }
    /// slowest (maximum) observed per-query latency in nanoseconds
    pub fn stat_tail(&self) -> u128 {
        self.stat_run_tail_ns
    }
    /// fastest (minimum) observed per-query latency in nanoseconds
    pub fn stat_head(&self) -> u128 {
        self.stat_run_head_ns
    }
    /// wall-clock span between the recorded run start and stop, in nanoseconds
    pub fn stat_elapsed(&self) -> u128 {
        self.record_real_stop
            .duration_since(self.record_real_start)
            .as_nanos()
    }
}
/// Convert a query count and elapsed time (nanoseconds) into queries per second.
fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 {
    const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
    // same operation order as before: (count / nanos) * 1e9
    (query_count as f64 / time_taken_in_nanos as f64) * NANOS_PER_SECOND
}
impl<Th: ThreadedTask> Taskpool<Th> {
    /// Create a pool with `size` worker threads, initializing one task worker per
    /// thread via `config`. Fails with `InitError` if any worker cannot be set up.
    pub fn new(size: usize, config: Th) -> TaskPoolResult<Self, Th> {
        let (task_tx, task_rx) = unbounded();
        let (res_tx, res_rx) = unbounded();
        let mut workers = Vec::with_capacity(size);
        for _ in 0..size {
            let con = config
                .initialize_worker()
                .map_err(TaskpoolError::InitError)?;
            workers.push(ThreadWorker::new(con, task_rx.clone(), res_tx.clone()));
        }
        Ok(Self {
            workers,
            _config: config,
            task_tx,
            res_rx,
            stat_run_avg_ns: 0.0,
            record_real_start: Instant::now(),
            record_real_stop: Instant::now(),
            // head starts at MAX and tail at MIN so the first sample replaces both
            stat_run_head_ns: u128::MAX,
            stat_run_tail_ns: u128::MIN,
        })
    }
    /// Post every task in `vec` to the pool, block until all results are back, and
    /// return the aggregated [`RuntimeStats`]. The stats fields are reset before
    /// returning so the pool can be reused for another run.
    pub fn blocking_bombard(
        &mut self,
        vec: Vec<Th::TaskInput>,
    ) -> TaskPoolResult<RuntimeStats, Th> {
        let expected = vec.len();
        let mut received = 0usize;
        for task in vec {
            match self.task_tx.send(WorkerTask::Task(task)) {
                Ok(()) => {}
                Err(_) => {
                    // stop bombarding, we hit an error
                    // (send only fails when every receiver — i.e. every worker — is gone)
                    return Err(TaskpoolError::BombardError(
                        "all worker threads exited. this indicates a catastrophic failure",
                    ));
                }
            }
        }
        // collect exactly one result per posted task
        while received != expected {
            match self.res_rx.recv() {
                Err(_) => {
                    // all workers exited. that is catastrophic
                    return Err(TaskpoolError::BombardError(
                        "detected all worker threads crashed during run check",
                    ));
                }
                Ok(r) => self.recompute_stats(&mut received, r)?,
            };
        }
        // compute stats
        let ret = Ok(RuntimeStats {
            qps: qps(received, self.stat_elapsed()),
            avg_per_query_ns: self.stat_avg(),
            head_ns: self.stat_head(),
            tail_ns: self.stat_tail(),
        });
        // reset stats (same initial values as `new`)
        self.stat_run_avg_ns = 0.0;
        self.record_real_start = Instant::now();
        self.record_real_stop = Instant::now();
        self.stat_run_head_ns = u128::MAX;
        self.stat_run_tail_ns = u128::MIN;
        // return
        ret
    }
    /// Fold one worker result into the running statistics, bumping `received`.
    fn recompute_stats(
        &mut self,
        received: &mut usize,
        result: Result<(Instant, Instant), <Th as ThreadedTask>::TaskWorkerWorkError>,
    ) -> Result<(), TaskpoolError<Th>> {
        *received += 1;
        let (start, stop) = match result {
            Ok(time) => time,
            Err(e) => return Err(TaskpoolError::WorkerError(e)),
        };
        // adjust real start
        // NOTE(review): record_real_start is (re)set to Instant::now() in `new` and at the
        // end of blocking_bombard — i.e. before any task starts — so this branch appears
        // never to fire and the measured span includes send/queueing overhead; confirm
        // whether the span was intended to begin at the earliest task start instead
        if start < self.record_real_start {
            self.record_real_start = start;
        }
        if stop > self.record_real_stop {
            self.record_real_stop = stop;
        }
        let current_time = stop.duration_since(start).as_nanos();
        // incremental running mean: avg += (x - avg) / n
        self.stat_run_avg_ns = self.stat_run_avg_ns
            + ((current_time as f64 - self.stat_run_avg_ns) / *received as f64);
        // tail = slowest sample seen so far
        if current_time > self.stat_run_tail_ns {
            self.stat_run_tail_ns = current_time;
        }
        // head = fastest sample seen so far
        if current_time < self.stat_run_head_ns {
            self.stat_run_head_ns = current_time;
        }
        Ok(())
    }
}
impl<Th: ThreadedTask> Drop for Taskpool<Th> {
    /// Signal every worker to exit, then join each thread so no work is left dangling.
    fn drop(&mut self) {
        // one Exit message per worker thread ...
        (0..self.workers.len())
            .for_each(|_| self.task_tx.send(WorkerTask::Exit).unwrap());
        // ... then wait for each of them to actually terminate
        self.workers
            .drain(..)
            .for_each(|worker| worker.handle.join().unwrap());
    }
}
Loading…
Cancel
Save