diff --git a/Cargo.lock b/Cargo.lock index eb537c47..4dc37767 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -260,17 +260,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-deque" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" -dependencies = [ - "cfg-if", - "crossbeam-epoch", - "crossbeam-utils", -] - [[package]] name = "crossbeam-epoch" version = "0.9.15" @@ -345,12 +334,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "either" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" - [[package]] name = "endian-type" version = "0.1.2" @@ -359,9 +342,9 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "env_logger" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" dependencies = [ "humantime", "is-terminal", @@ -640,16 +623,6 @@ checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" name = "libsky" version = "0.8.0" -[[package]] -name = "libstress" -version = "0.8.0" -dependencies = [ - "crossbeam-channel", - "log", - "rand", - "rayon", -] - [[package]] name = "linux-raw-sys" version = "0.4.5" @@ -668,9 +641,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" @@ -970,28 +943,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "rayon" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" -dependencies = [ - "either", - "rayon-core", -] - -[[package]] -name = "rayon-core" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" -dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-utils", - "num_cpus", -] - [[package]] name = "rcrypt" version = "0.4.0" @@ -1225,7 +1176,11 @@ dependencies = [ name = "sky-bench" version = "0.8.0" dependencies = [ - "libstress", + "crossbeam-channel", + "env_logger", + "libsky", + "log", + "num_cpus", "skytable", ] diff --git a/Cargo.toml b/Cargo.toml index a07132a4..66bc3478 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,14 +1,6 @@ [workspace] resolver = "1" -members = [ - "cli", - "server", - "libsky", - "sky-bench", - "sky-macros", - "libstress", - "harness", -] +members = ["cli", "server", "libsky", "sky-bench", "sky-macros", "harness"] [profile.release] opt-level = 3 diff --git a/libstress/Cargo.toml b/libstress/Cargo.toml deleted file mode 100644 index dc66438b..00000000 --- a/libstress/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "libstress" -version = "0.8.0" -authors = ["Sayan Nandan "] -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -# external deps -crossbeam-channel = "0.5.8" -rayon = "1.7.0" -log = 
"0.4.19" -rand = "0.8.5" diff --git a/libstress/src/lib.rs b/libstress/src/lib.rs deleted file mode 100644 index 4a922773..00000000 --- a/libstress/src/lib.rs +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Created on Wed Jun 16 2021 - * - * This file is a part of Skytable - * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source - * NoSQL database written by Sayan Nandan ("the Author") with the - * vision to provide flexibility in data modelling without compromising - * on performance, queryability or scalability. - * - * Copyright (c) 2021, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ - -//! # libstress -//! -//! Tools for emulating concurrent query behavior to _stress test_ the database server. -//! As of now, this crate provides a [`Workpool`] which is a generic synchronous threadpool -//! for doing multiple operations. But Workpool is a little different from standard threadpool -//! implementations in it categorizing a job to be made up of three parts, namely: -//! -//! - The init_pre_loop_var (the pre-loop stage) -//! - The on_loop (the in-loop stage) -//! - The on_exit (the post-loop stage) -//! -//! These stages form a part of the event loop. -//! -//! ## The event loop -//! -//! A task runs in a loop with the `on_loop` routine to which the a reference of the result of -//! the `init_pre_loop_var` is sent that is initialized. The loop proceeds whenever a worker -//! receives a task or else it blocks the current thread, waiting for a task. Hence the loop -//! cannot be terminated by an execute call. Instead, the _event loop_ is terminated when the -//! Workpool is dropped, either by scoping out, or by using the provided finish-like methods -//! (that call the destructor). -//! -//! ## Worker lifetime -//! -//! If a runtime panic occurs in the pre-loop stage, then the entire worker just terminates. Hence -//! this worker is no longer able to perform any tasks. Similarly, if a runtime panic occurs in -//! the in-loop stage, the worker terminates and is no longer available to do any work. This will -//! be reflected when the workpool attempts to terminate in entirety, i.e when the threads are joined -//! to the parent thread -//! - -#![deny(unused_crate_dependencies)] -#![deny(unused_imports)] - -pub mod traits; -pub use rayon; - -use { - core::marker::PhantomData, - crossbeam_channel::{bounded, unbounded, Receiver as CReceiver, Sender as CSender}, - rayon::prelude::{IntoParallelIterator, ParallelIterator}, - std::{fmt::Display, thread}, -}; - -#[derive(Debug)] -pub enum WorkpoolError { - ThreadStartFailure(usize, usize), -} - -impl Display for WorkpoolError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - WorkpoolError::ThreadStartFailure(expected, started) => { - write!( - f, - "couldn't start all threads. 
expected {expected} but started {started}" - ) - } - } - } -} - -pub type WorkpoolResult = Result; - -/// A Job. The UIn type parameter is the type that will be used to execute the action -/// Nothing is a variant used by the drop implementation to terminate all the workers -/// and call the exit_loop function -enum JobType { - Task(UIn), - Nothing, -} - -/// A worker -/// -/// The only reason we use option is to reduce the effort needed to implement [`Drop`] for the -/// [`Workpool`] -struct Worker { - thread: Option>, -} - -impl Worker { - /// Initialize a new worker - fn new( - id: usize, - job_receiver: CReceiver>, - init_pre_loop_var: Lv, - on_exit: Ex, - on_loop: Lp, - wgtx: CSender<()>, - ) -> Self - where - UIn: Send + Sync + 'static, - Lv: Fn() -> Inp + 'static + Send, - Lp: Fn(&mut Inp, UIn) + Send + Sync + 'static, - Ex: Fn(&mut Inp) + Send + 'static, - { - let thread = thread::Builder::new() - .name(format!("worker-{id}")) - .spawn(move || { - let on_loop = on_loop; - let mut pre_loop_var = init_pre_loop_var(); - wgtx.send(()).unwrap(); - drop(wgtx); - loop { - let action = job_receiver.recv().unwrap(); - match action { - JobType::Task(tsk) => on_loop(&mut pre_loop_var, tsk), - JobType::Nothing => { - on_exit(&mut pre_loop_var); - break; - } - } - } - }) - .unwrap(); - Self { - thread: Some(thread), - } - } -} - -/// A pool configuration setting to easily generate [`Workpool`]s without -/// having to clone an entire pool and its threads upfront -pub struct PoolConfig { - /// the pool size - count: usize, - /// the function that sets the pre-loop variable - init_pre_loop_var: Lv, - /// the function to be executed on worker termination - on_exit: Ex, - /// the function to be executed on loop - on_loop: Lp, - /// a marker for `Inp` since no parameters use it directly - _marker: PhantomData<(Inp, UIn)>, - /// check if self needs a pool for parallel iterators - needs_iterator_pool: bool, - /// expected maximum number of jobs - expected_max_sends: Option, -} - -impl PoolConfig -where - UIn: Send + Sync + 'static, - Inp: Sync, - Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone, - Lv: Fn() -> Inp + Send + Sync + 'static + Clone, - Lp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static, -{ - /// Create a new pool config - pub fn new( - count: usize, - init_pre_loop_var: Lv, - on_loop: Lp, - on_exit: Ex, - needs_iterator_pool: bool, - expected_max_sends: Option, - ) -> Self { - Self { - count, - init_pre_loop_var, - on_loop, - on_exit, - needs_iterator_pool, - _marker: PhantomData, - expected_max_sends, - } - } - /// Get a new [`Workpool`] from the current config - pub fn get_pool(&self) -> WorkpoolResult> { - self.get_pool_with_workers(self.count) - } - /// Get a [`Workpool`] with the base config but with a different number of workers - pub fn get_pool_with_workers( - &self, - count: usize, - ) -> WorkpoolResult> { - Workpool::new( - count, - self.init_pre_loop_var.clone(), - self.on_loop.clone(), - self.on_exit.clone(), - self.needs_iterator_pool, - self.expected_max_sends, - ) - } - /// Get a [`Workpool`] with the base config but with a custom loop-stage closure - pub fn with_loop_closure(&self, lp: Dlp) -> WorkpoolResult> - where - Dlp: Fn(&mut Inp, UIn) + Clone + Send + Sync + 'static, - { - Workpool::new( - self.count, - self.init_pre_loop_var.clone(), - lp, - self.on_exit.clone(), - self.needs_iterator_pool, - self.expected_max_sends, - ) - } -} - -/// # Workpool -/// -/// A Workpool is a generic synchronous thread pool that can be used to perform, well, anything. 
-/// A workpool has to be initialized with the number of workers, the pre_loop_variable (set this -/// to None if there isn't any). what to do on loop and what to do on exit of each worker. The -/// closures are kept as `Clone`able types just to reduce complexity with copy (we were lazy). -/// -/// ## Clones -/// -/// Workpool clones simply create a new workpool with the same on_exit, on_loop and init_pre_loop_var -/// configurations. This provides a very convenient interface if one desires to use multiple workpools -/// to do the _same kind of thing_ -/// -/// ## Actual thread count -/// -/// The actual thread count will depend on whether the caller requests the initialization of an -/// iterator pool or not. If the caller does request for an iterator pool, then the number of threads -/// spawned will be twice the number of the set workers. Else, the number of spawned threads is equal -/// to the number of workers. -pub struct Workpool { - /// the workers - workers: Vec, - /// the sender that sends jobs - job_distributor: CSender>, - /// the function that sets the pre-loop variable - init_pre_loop_var: Lv, - /// the function to be executed on worker termination - on_exit: Ex, - /// the function to be executed on loop - on_loop: Lp, - /// a marker for `Inp` since no parameters use it directly - _marker: PhantomData, - /// check if self needs a pool for parallel iterators - needs_iterator_pool: bool, - /// expected maximum number of sends - expected_max_sends: Option, -} - -impl Workpool -where - UIn: Send + Sync + 'static, - Ex: Fn(&mut Inp) + Send + Sync + 'static + Clone, - Lv: Fn() -> Inp + Send + Sync + 'static + Clone, - Lp: Fn(&mut Inp, UIn) + Send + Sync + 'static + Clone, - Inp: Sync, -{ - /// Create a new workpool - pub fn new( - count: usize, - init_pre_loop_var: Lv, - on_loop: Lp, - on_exit: Ex, - needs_iterator_pool: bool, - expected_max_sends: Option, - ) -> WorkpoolResult { - // init threadpool for iterator - if needs_iterator_pool { - // initialize a global threadpool for parallel iterators - let _ = rayon::ThreadPoolBuilder::new() - .num_threads(count) - .build_global(); - } - assert!(count != 0, "Runtime panic: Bad value `0` for thread count"); - let (sender, receiver) = match expected_max_sends { - Some(limit) => bounded(limit), - None => unbounded(), - }; - let (wgtx, wgrx) = bounded::<()>(count); - let mut workers = Vec::with_capacity(count); - for i in 0..count { - workers.push(Worker::new( - i, - receiver.clone(), - init_pre_loop_var.clone(), - on_exit.clone(), - on_loop.clone(), - wgtx.clone(), - )); - } - drop(wgtx); - let sum: usize = wgrx.iter().map(|_| 1usize).sum(); - if sum == count { - Ok(Self { - workers, - job_distributor: sender, - init_pre_loop_var, - on_exit, - on_loop, - _marker: PhantomData, - needs_iterator_pool, - expected_max_sends, - }) - } else { - Err(WorkpoolError::ThreadStartFailure(count, sum)) - } - } - pub fn clone_pool(&self) -> WorkpoolResult { - Self::new( - self.workers.len(), - self.init_pre_loop_var.clone(), - self.on_loop.clone(), - self.on_exit.clone(), - self.needs_iterator_pool, - self.expected_max_sends, - ) - } - /// Execute something - pub fn execute(&self, inp: UIn) { - self.job_distributor - .send(JobType::Task(inp)) - .expect("Worker thread crashed") - } - /// Execute something that can be executed as a parallel iterator - /// For the best performance, it is recommended that you pass true for `needs_iterator_pool` - /// on initialization of the [`Workpool`] - pub fn execute_iter(&self, iter: impl IntoParallelIterator) { - 
iter.into_par_iter().for_each(|inp| self.execute(inp)) - } - /// Does the same thing as [`execute_iter`] but drops self ensuring that all the - /// workers actually finish their tasks - pub fn execute_and_finish_iter(self, iter: impl IntoParallelIterator) { - self.execute_iter(iter); - drop(self); - } - /// Initialize a new [`Workpool`] with the default count of threads. This is equal - /// to 2 * the number of logical cores. - pub fn new_default_threads( - init_pre_loop_var: Lv, - on_loop: Lp, - on_exit: Ex, - needs_iterator_pool: bool, - expected_max_sends: Option, - ) -> WorkpoolResult { - // we'll naively use the number of CPUs present on the system times 2 to determine - // the number of workers (sure the scheduler does tricks all the time) - let worker_count = thread::available_parallelism().map_or(1, usize::from) * 2; - Self::new( - worker_count, - init_pre_loop_var, - on_loop, - on_exit, - needs_iterator_pool, - expected_max_sends, - ) - } -} - -impl Drop for Workpool { - fn drop(&mut self) { - for _ in &self.workers { - self.job_distributor.send(JobType::Nothing).unwrap(); - } - for worker in &mut self.workers { - if let Some(thread) = worker.thread.take() { - thread.join().unwrap() - } - } - } -} - -pub mod utils { - const CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; - use rand::distributions::{Alphanumeric, Standard}; - use std::collections::HashSet; - use std::collections::TryReserveError; - - /// Generate a random UTF-8 string - pub fn ran_string(len: usize, rand: impl rand::Rng) -> String { - let rand_string: String = rand - .sample_iter(&Alphanumeric) - .take(len) - .map(char::from) - .collect(); - rand_string - } - /// Generate a vector of random bytes - pub fn ran_bytes(len: usize, rand: impl rand::Rng) -> Vec { - rand.sample_iter(&Standard).take(len).collect() - } - /// Generate multiple vectors of random bytes - pub fn generate_random_byte_vector( - count: usize, - size: usize, - mut rng: impl rand::Rng, - unique: bool, - ) -> Result>, TryReserveError> { - if unique { - let mut keys = HashSet::new(); - keys.try_reserve(size)?; - (0..count).into_iter().for_each(|_| { - let mut ran = ran_bytes(size, &mut rng); - while keys.contains(&ran) { - ran = ran_bytes(size, &mut rng); - } - keys.insert(ran); - }); - Ok(keys.into_iter().collect()) - } else { - let mut keys = Vec::new(); - keys.try_reserve_exact(size)?; - let ran_byte_key = ran_bytes(size, &mut rng); - (0..count).for_each(|_| keys.push(ran_byte_key.clone())); - Ok(keys) - } - } - /// Generate a vector of random UTF-8 valid strings - pub fn generate_random_string_vector( - count: usize, - size: usize, - mut rng: impl rand::Rng, - unique: bool, - ) -> Result, TryReserveError> { - if unique { - let mut keys = HashSet::new(); - keys.try_reserve(size)?; - (0..count).into_iter().for_each(|_| { - let mut ran = ran_string(size, &mut rng); - while keys.contains(&ran) { - ran = ran_string(size, &mut rng); - } - keys.insert(ran); - }); - Ok(keys.into_iter().collect()) - } else { - let mut keys = Vec::new(); - keys.try_reserve_exact(size)?; - (0..count) - .into_iter() - .map(|_| ran_string(size, &mut rng)) - .for_each(|bytes| keys.push(bytes)); - Ok(keys) - } - } - pub fn rand_alphastring(len: usize, rng: &mut impl rand::Rng) -> String { - (0..len) - .map(|_| { - let idx = rng.gen_range(0..CHARSET.len()); - CHARSET[idx] as char - }) - .collect() - } -} diff --git a/libstress/src/traits.rs b/libstress/src/traits.rs deleted file mode 100644 index 12d7c343..00000000 --- a/libstress/src/traits.rs +++ 
/dev/null @@ -1,69 +0,0 @@ -/* - * Created on Fri Jun 18 2021 - * - * This file is a part of Skytable - * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source - * NoSQL database written by Sayan Nandan ("the Author") with the - * vision to provide flexibility in data modelling without compromising - * on performance, queryability or scalability. - * - * Copyright (c) 2021, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ - -use std::fmt; - -/// A trait for aggresive erroring -pub trait ExitError { - /// Abort the process if the type errors with an error code or - /// return the type - fn exit_error(self, msg: Ms) -> T - where - Ms: ToString; -} - -impl ExitError for Result -where - E: fmt::Display, -{ - fn exit_error(self, msg: Ms) -> T - where - Ms: ToString, - { - match self { - Self::Err(e) => { - log::error!("{} : '{}'", msg.to_string(), e); - std::process::exit(0x01); - } - Self::Ok(v) => v, - } - } -} - -impl ExitError for Option { - fn exit_error(self, msg: Ms) -> T - where - Ms: ToString, - { - match self { - Self::None => { - log::error!("{}", msg.to_string()); - std::process::exit(0x01); - } - Self::Some(v) => v, - } - } -} diff --git a/sky-bench/Cargo.toml b/sky-bench/Cargo.toml index 19595da4..b04e8c17 100644 --- a/sky-bench/Cargo.toml +++ b/sky-bench/Cargo.toml @@ -9,4 +9,9 @@ version = "0.8.0" [dependencies] # internal deps skytable = { git = "https://github.com/skytable/client-rust.git", branch = "octave" } -libstress = { path = "../libstress" } +libsky = { path = "../libsky" } +# external deps +crossbeam-channel = "0.5.8" +num_cpus = "1.16.0" +env_logger = "0.10.1" +log = "0.4.20" diff --git a/sky-bench/help_text/help b/sky-bench/help_text/help new file mode 100644 index 00000000..508f43f3 --- /dev/null +++ b/sky-bench/help_text/help @@ -0,0 +1,25 @@ +sky-bench 0.8.0 +Sayan N. +Skytable benchmark tool + +USAGE: + sky-bench [OPTIONS] + +FLAGS: + --help Displays this help message + --version Displays the benchmark tool version + +REQUIRED OPTIONS: + --password Provide the password + +OPTIONS: + --endpoint Set the endpoint (defaults to tcp@127.0.0.1:2003) + --threads Set the number of threads to be used (defaults to number of physical CPU cores) + --keysize Set the default primary key size. defaults to 7 + --rowcount Set the number of rows to be manipulated for the benchmark + +NOTES: + - The user for auth will be 'root' since only 'root' accounts allow the creation and deletion of spaces and models + - A space called 'benchmark_[random 8B string]' will be created + - A model called 'benchmark_[random 8B string]' will be created in the space created above. 
The created model has the structure {name: string, pass: string} + - The model and space will be removed once the benchmark is complete \ No newline at end of file diff --git a/sky-bench/src/args.rs b/sky-bench/src/args.rs new file mode 100644 index 00000000..1a8a63aa --- /dev/null +++ b/sky-bench/src/args.rs @@ -0,0 +1,217 @@ +/* + * Created on Sat Nov 18 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + crate::error::{BenchError, BenchResult}, + std::{ + collections::hash_map::{Entry, HashMap}, + env, + }, +}; + +const TXT_HELP: &str = include_str!("../help_text/help"); + +#[derive(Debug)] +enum TaskInner { + HelpMsg(String), + CheckConfig(HashMap), +} + +#[derive(Debug)] +pub enum Task { + HelpMsg(String), + BenchConfig(BenchConfig), +} + +#[derive(Debug)] +pub struct BenchConfig { + pub host: String, + pub port: u16, + pub root_pass: String, + pub threads: usize, + pub key_size: usize, + pub query_count: usize, +} + +impl BenchConfig { + pub fn new( + host: String, + port: u16, + root_pass: String, + threads: usize, + key_size: usize, + query_count: usize, + ) -> Self { + Self { + host, + port, + root_pass, + threads, + key_size, + query_count, + } + } +} + +fn load_env() -> BenchResult { + let mut args = HashMap::new(); + let mut it = env::args().skip(1).into_iter(); + while let Some(arg) = it.next() { + let (arg, arg_val) = match arg.as_str() { + "--help" => return Ok(TaskInner::HelpMsg(TXT_HELP.into())), + "--version" => { + return Ok(TaskInner::HelpMsg(format!( + "sky-bench v{}", + libsky::VERSION + ))) + } + _ if arg.starts_with("--") => match it.next() { + Some(arg_val) => (arg, arg_val), + None => { + // self contained? + let split: Vec<&str> = arg.split("=").collect(); + if split.len() != 2 { + return Err(BenchError::ArgsErr(format!("expected value for {arg}"))); + } + (split[0].into(), split[1].into()) + } + }, + unknown_arg => { + return Err(BenchError::ArgsErr(format!( + "unknown argument: {unknown_arg}" + ))) + } + }; + match args.entry(arg) { + Entry::Occupied(oe) => { + return Err(BenchError::ArgsErr(format!( + "found duplicate values for {}", + oe.key() + ))) + } + Entry::Vacant(ve) => { + ve.insert(arg_val); + } + } + } + Ok(TaskInner::CheckConfig(args)) +} + +fn cdig(n: usize) -> usize { + if n == 0 { + 1 + } else { + (n as f64).log10().floor() as usize + 1 + } +} + +pub fn parse() -> BenchResult { + let mut args = match load_env()? 
{ + TaskInner::HelpMsg(msg) => return Ok(Task::HelpMsg(msg)), + TaskInner::CheckConfig(args) => args, + }; + // endpoint + let (host, port) = match args.remove("--endpoint") { + None => ("127.0.0.1".to_owned(), 2003), + Some(ep) => { + // proto@host:port + let ep: Vec<&str> = ep.split("@").collect(); + if ep.len() != 2 { + return Err(BenchError::ArgsErr( + "value for --endpoint must be in the form `[protocol]@[host]:[port]`".into(), + )); + } + let protocol = ep[0]; + let host_port: Vec<&str> = ep[1].split(":").collect(); + if host_port.len() != 2 { + return Err(BenchError::ArgsErr( + "value for --endpoint must be in the form `[protocol]@[host]:[port]`".into(), + )); + } + let (host, port) = (host_port[0], host_port[1]); + let Ok(port) = port.parse::<u16>() else { + return Err(BenchError::ArgsErr( + "the value for port must be an integer in the range 0-65535".into(), + )); + }; + if protocol != "tcp" { + return Err(BenchError::ArgsErr( + "only TCP endpoints can be benchmarked at the moment".into(), + )); + } + (host.to_owned(), port) + } + }; + // password + let password = match args.remove("--password") { + Some(p) => p, + None => { + return Err(BenchError::ArgsErr( + "you must provide a value for `--password`".into(), + )) + } + }; + // threads + let thread_count = match args.remove("--threads") { + None => num_cpus::get(), + Some(tc) => match tc.parse() { + Ok(tc) if tc > 0 => tc, + Err(_) | Ok(_) => { + return Err(BenchError::ArgsErr( + "incorrect value for `--threads`. must be a nonzero value".into(), + )) + } + }, + }; + // query count + let query_count = match args.remove("--rowcount") { + None => 1_000_000_usize, + Some(rc) => match rc.parse() { + Ok(rc) if rc != 0 => rc, + Err(_) | Ok(_) => { + return Err(BenchError::ArgsErr(format!( + "bad value for `--rowcount`. must be a nonzero value" + ))) + } + }, + }; + let need_atleast = cdig(query_count); + let key_size = match args.remove("--keysize") { + None => need_atleast, + Some(ks) => match ks.parse() { + Ok(s) if s >= need_atleast => s, + Err(_) | Ok(_) => return Err(BenchError::ArgsErr(format!("incorrect value for `--keysize`. must be set to a value that can be used to generate at least {query_count} unique primary keys"))), + } + }; + Ok(Task::BenchConfig(BenchConfig::new( + host, + port, + password, + thread_count, + key_size, + query_count, + ))) +} diff --git a/sky-bench/src/bench.rs b/sky-bench/src/bench.rs new file mode 100644 index 00000000..651b30c6 --- /dev/null +++ b/sky-bench/src/bench.rs @@ -0,0 +1,216 @@ +/* + * Created on Sat Nov 18 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * +*/ + +use crate::error::BenchResult; + +use { + crate::{ + args::BenchConfig, + error::{self, BenchmarkTaskWorkerError}, + pool::{RuntimeStats, Taskpool, ThreadedTask}, + }, + skytable::{query, response::Response, Config, Connection, Query}, + std::time::Instant, +}; + +#[derive(Debug)] +pub struct BenchmarkTask { + cfg: Config, +} + +impl BenchmarkTask { + pub fn new(host: &str, port: u16, username: &str, password: &str) -> Self { + Self { + cfg: Config::new(host, port, username, password), + } + } +} + +impl ThreadedTask for BenchmarkTask { + type TaskWorker = Connection; + type TaskWorkerInitError = BenchmarkTaskWorkerError; + type TaskWorkerTerminateError = BenchmarkTaskWorkerError; + type TaskWorkerWorkError = BenchmarkTaskWorkerError; + type TaskInput = (Query, Response); + fn initialize_worker(&self) -> Result { + self.cfg.connect().map_err(Into::into) + } + fn drive_worker_timed( + worker: &mut Self::TaskWorker, + (query, expected_resp): Self::TaskInput, + ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError> { + let start = Instant::now(); + let resp = worker.query(&query)?; + let stop = Instant::now(); + if resp == expected_resp { + Ok((start, stop)) + } else { + Err(BenchmarkTaskWorkerError::Error(format!( + "response from server did not match expected response: {:?}", + resp + ))) + } + } + fn terminate_worker( + &self, + _: &mut Self::TaskWorker, + ) -> Result<(), Self::TaskWorkerTerminateError> { + Ok(()) + } +} + +pub fn run(bench: BenchConfig) -> error::BenchResult<()> { + let bench_config = BenchmarkTask::new(&bench.host, bench.port, "root", &bench.root_pass); + info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: string, pw: uint8}}`"); + let mut main_thread_db = bench_config.cfg.connect()?; + main_thread_db.query_parse::<()>(&query!("create space bench"))?; + main_thread_db.query_parse::<()>(&query!("create model bench.bench(un: string, pw: uint8)"))?; + info!( + "initializing connection pool with {} connections", + bench.threads + ); + let mut p = Taskpool::new(bench.threads, bench_config)?; + info!( + "pool initialized successfully. preparing {} `INSERT` queries with primary key size={} bytes", + bench.query_count, bench.key_size + ); + let mut insert_stats = Default::default(); + let mut update_stats = Default::default(); + let mut delete_stats = Default::default(); + match || -> BenchResult<()> { + // bench insert + let insert_queries: Vec<(Query, Response)> = (0..bench.query_count) + .into_iter() + .map(|i| { + ( + query!( + "insert into bench.bench(?, ?)", + format!("{:0>width$}", i, width = bench.key_size), + 0u64 + ), + Response::Empty, + ) + }) + .collect(); + info!("benchmarking `INSERT` queries"); + insert_stats = p.blocking_bombard(insert_queries)?; + // bench update + info!("completed benchmarking `INSERT`. preparing `UPDATE` queries"); + let update_queries: Vec<(Query, Response)> = (0..bench.query_count) + .into_iter() + .map(|i| { + ( + query!( + "update bench.bench set pw += ? where un = ?", + 1u64, + format!("{:0>width$}", i, width = bench.key_size), + ), + Response::Empty, + ) + }) + .collect(); + info!("benchmarking `UPDATE` queries"); + update_stats = p.blocking_bombard(update_queries)?; + // bench delete + info!("completed benchmarking `UPDATE`. 
preparing `DELETE` queries"); + let delete_queries: Vec<(Query, Response)> = (0..bench.query_count) + .into_iter() + .map(|i| { + ( + query!( + "delete from bench.bench where un = ?", + format!("{:0>width$}", i, width = bench.key_size), + ), + Response::Empty, + ) + }) + .collect(); + info!("benchmarking `DELETE` queries"); + delete_stats = p.blocking_bombard(delete_queries)?; + info!("completed benchmarking `DELETE` queries"); + Ok(()) + }() { + Ok(()) => {} + Err(e) => { + error!("benchmarking failed. attempting to clean up"); + match cleanup(main_thread_db) { + Ok(()) => return Err(e), + Err(e_cleanup) => { + error!("failed to clean up db: {e_cleanup}. please remove model `bench.bench` manually"); + return Err(e); + } + } + } + } + drop(p); + warn!("benchmarks might appear to be slower. this tool is currently experimental"); + // print results + info!("results:"); + print_table(vec![ + ("INSERT", insert_stats), + ("UPDATE", update_stats), + ("DELETE", delete_stats), + ]); + cleanup(main_thread_db)?; + Ok(()) +} + +fn cleanup(mut main_thread_db: Connection) -> Result<(), error::BenchError> { + trace!("dropping space and table"); + main_thread_db.query_parse::<()>(&query!("drop model bench.bench"))?; + main_thread_db.query_parse::<()>(&query!("drop space bench"))?; + Ok(()) +} + +fn print_table(data: Vec<(&'static str, RuntimeStats)>) { + println!( + "+---------+--------------------------+-----------------------+------------------------+" + ); + println!( + "| Query | Effective real-world QPS | Slowest Query (nanos) | Fastest Query (nanos) |" + ); + println!( + "+---------+--------------------------+-----------------------+------------------------+" + ); + for ( + query, + RuntimeStats { + qps, + avg_per_query_ns: _, + head_ns, + tail_ns, + }, + ) in data + { + println!( + "| {:<7} | {:>24.2} | {:>21} | {:>22} |", + query, qps, tail_ns, head_ns + ); + } + println!( + "+---------+--------------------------+-----------------------+------------------------+" + ); +} diff --git a/sky-bench/src/error.rs b/sky-bench/src/error.rs new file mode 100644 index 00000000..0fe3fcb6 --- /dev/null +++ b/sky-bench/src/error.rs @@ -0,0 +1,85 @@ +/* + * Created on Sat Nov 18 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use { + crate::{bench::BenchmarkTask, pool::TaskpoolError}, + core::fmt, + skytable::error::Error, +}; + +pub type BenchResult = Result; + +#[derive(Debug)] +pub enum BenchError { + ArgsErr(String), + BenchError(TaskpoolError), + DirectDbError(Error), +} + +impl From> for BenchError { + fn from(e: TaskpoolError) -> Self { + Self::BenchError(e) + } +} + +impl From for BenchError { + fn from(e: Error) -> Self { + Self::DirectDbError(e) + } +} + +impl fmt::Display for BenchError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::ArgsErr(e) => write!(f, "args error: {e}"), + Self::BenchError(e) => write!(f, "benchmark system error: {e}"), + Self::DirectDbError(e) => write!(f, "direct operation on db failed. {e}"), + } + } +} + +impl std::error::Error for BenchError {} + +#[derive(Debug)] +pub enum BenchmarkTaskWorkerError { + DbError(Error), + Error(String), +} + +impl From for BenchmarkTaskWorkerError { + fn from(e: Error) -> Self { + Self::DbError(e) + } +} + +impl fmt::Display for BenchmarkTaskWorkerError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::DbError(e) => write!(f, "worker failed due to DB error. {e}"), + Self::Error(e) => write!(f, "worker failed. {e}"), + } + } +} diff --git a/sky-bench/src/main.rs b/sky-bench/src/main.rs index 57a21b8e..27dfc5ae 100644 --- a/sky-bench/src/main.rs +++ b/sky-bench/src/main.rs @@ -24,4 +24,31 @@ * */ -fn main() {} +#[macro_use] +extern crate log; +mod args; +mod bench; +mod error; +mod pool; + +fn main() { + env_logger::Builder::new() + .parse_filters(&std::env::var("SKYBENCH_LOG").unwrap_or_else(|_| "info".to_owned())) + .init(); + match run() { + Ok(()) => {} + Err(e) => { + error!("bench error: {e}"); + std::process::exit(0x01); + } + } +} + +fn run() -> error::BenchResult<()> { + let task = args::parse()?; + match task { + args::Task::HelpMsg(msg) => println!("{msg}"), + args::Task::BenchConfig(bench) => bench::run(bench)?, + } + Ok(()) +} diff --git a/sky-bench/src/pool.rs b/sky-bench/src/pool.rs new file mode 100644 index 00000000..0b6e13e4 --- /dev/null +++ b/sky-bench/src/pool.rs @@ -0,0 +1,279 @@ +/* + * Created on Fri Nov 17 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * +*/ + +use { + crossbeam_channel::{unbounded, Receiver, Sender}, + std::{ + fmt, + marker::PhantomData, + thread::{self, JoinHandle}, + time::Instant, + }, +}; + +pub type TaskPoolResult = Result>; + +#[derive(Debug)] +pub enum TaskpoolError { + InitError(Th::TaskWorkerInitError), + BombardError(&'static str), + WorkerError(Th::TaskWorkerWorkError), +} + +impl fmt::Display for TaskpoolError +where + Th::TaskWorkerInitError: fmt::Display, + Th::TaskWorkerWorkError: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InitError(e) => write!(f, "failed to init worker pool. {e}"), + Self::BombardError(e) => write!(f, "failed to post work to pool. {e}"), + Self::WorkerError(e) => write!(f, "failed running worker task. {e}"), + } + } +} + +pub trait ThreadedTask: Send + Sync + 'static { + /// the per-thread item that does the actual work + /// + /// NB: this is not to be confused with the actual underlying thread pool worker + type TaskWorker: Send + Sync; + /// when attempting initialization of the per-thread task worker, if an error is thrown, this is the type + /// you're looking for + type TaskWorkerInitError: Send + Sync; + /// when attempting to run a single unit of work, if any error occurs this is the error type that is to be returned + type TaskWorkerWorkError: Send + Sync; + /// when attempting to close a worker, if an error occurs this is the error type that is returned + type TaskWorkerTerminateError: Send + Sync; + /// the task that is sent to each worker + type TaskInput: Send + Sync; + // fn + /// initialize the worker + fn initialize_worker(&self) -> Result; + /// drive the worker to complete a task and return the time + fn drive_worker_timed( + worker: &mut Self::TaskWorker, + task: Self::TaskInput, + ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError>; + fn terminate_worker( + &self, + worker: &mut Self::TaskWorker, + ) -> Result<(), Self::TaskWorkerTerminateError>; +} + +#[derive(Debug)] +struct ThreadWorker { + handle: JoinHandle<()>, + _m: PhantomData, +} + +#[derive(Debug)] +enum WorkerTask { + Task(Th::TaskInput), + Exit, +} + +impl ThreadWorker { + fn new( + hl_worker: Th::TaskWorker, + task_rx: Receiver>, + res_tx: Sender>, + ) -> Self { + Self { + handle: thread::spawn(move || { + let mut worker = hl_worker; + loop { + let task = match task_rx.recv().unwrap() { + WorkerTask::Exit => { + drop(task_rx); + return; + } + WorkerTask::Task(t) => t, + }; + res_tx + .send(Th::drive_worker_timed(&mut worker, task)) + .unwrap(); + } + }), + _m: PhantomData, + } + } +} + +#[derive(Debug)] +pub struct Taskpool { + workers: Vec>, + _config: Th, + task_tx: Sender>, + res_rx: Receiver>, + record_real_start: Instant, + record_real_stop: Instant, + stat_run_avg_ns: f64, + stat_run_tail_ns: u128, + stat_run_head_ns: u128, +} + +// TODO(@ohsayan): prepare histogram for report; for now there's no use of the head and tail latencies +#[derive(Default, Debug)] +pub struct RuntimeStats { + pub qps: f64, + pub avg_per_query_ns: f64, + pub head_ns: u128, + pub tail_ns: u128, +} + +impl Taskpool { + pub fn stat_avg(&self) -> f64 { + self.stat_run_avg_ns + } + pub fn stat_tail(&self) -> u128 { + self.stat_run_tail_ns + } + pub fn stat_head(&self) -> u128 { + self.stat_run_head_ns + } + pub fn stat_elapsed(&self) -> u128 { + self.record_real_stop + .duration_since(self.record_real_start) + .as_nanos() + } +} + +fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 { + const NANOS_PER_SECOND: u128 = 1_000_000_000; + let 
time_taken_in_nanos_f64 = time_taken_in_nanos as f64; + let query_count_f64 = query_count as f64; + (query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64 +} + +impl<Th: ThreadedTask> Taskpool<Th> { + pub fn new(size: usize, config: Th) -> TaskPoolResult<Self, Th> { + let (task_tx, task_rx) = unbounded(); + let (res_tx, res_rx) = unbounded(); + let mut workers = Vec::with_capacity(size); + for _ in 0..size { + let con = config + .initialize_worker() + .map_err(TaskpoolError::InitError)?; + workers.push(ThreadWorker::new(con, task_rx.clone(), res_tx.clone())); + } + Ok(Self { + workers, + _config: config, + task_tx, + res_rx, + stat_run_avg_ns: 0.0, + record_real_start: Instant::now(), + record_real_stop: Instant::now(), + stat_run_head_ns: u128::MAX, + stat_run_tail_ns: u128::MIN, + }) + } + pub fn blocking_bombard( + &mut self, + vec: Vec<Th::TaskInput>, + ) -> TaskPoolResult<RuntimeStats, Th> { + let expected = vec.len(); + let mut received = 0usize; + for task in vec { + match self.task_tx.send(WorkerTask::Task(task)) { + Ok(()) => {} + Err(_) => { + // stop bombarding, we hit an error + return Err(TaskpoolError::BombardError( + "all worker threads exited. this indicates a catastrophic failure", + )); + } + } + } + while received != expected { + match self.res_rx.recv() { + Err(_) => { + // all workers exited. that is catastrophic + return Err(TaskpoolError::BombardError( + "detected all worker threads crashed during run check", + )); + } + Ok(r) => self.recompute_stats(&mut received, r)?, + }; + } + // compute stats + let ret = Ok(RuntimeStats { + qps: qps(received, self.stat_elapsed()), + avg_per_query_ns: self.stat_avg(), + head_ns: self.stat_head(), + tail_ns: self.stat_tail(), + }); + // reset stats + self.stat_run_avg_ns = 0.0; + self.record_real_start = Instant::now(); + self.record_real_stop = Instant::now(); + self.stat_run_head_ns = u128::MAX; + self.stat_run_tail_ns = u128::MIN; + // return + ret + } + fn recompute_stats( + &mut self, + received: &mut usize, + result: Result<(Instant, Instant), <Th as ThreadedTask>::TaskWorkerWorkError>, + ) -> Result<(), TaskpoolError<Th>> { + *received += 1; + let (start, stop) = match result { + Ok(time) => time, + Err(e) => return Err(TaskpoolError::WorkerError(e)), + }; + // adjust real start + if start < self.record_real_start { + self.record_real_start = start; + } + if stop > self.record_real_stop { + self.record_real_stop = stop; + } + let current_time = stop.duration_since(start).as_nanos(); + self.stat_run_avg_ns = self.stat_run_avg_ns + + ((current_time as f64 - self.stat_run_avg_ns) / *received as f64); + if current_time > self.stat_run_tail_ns { + self.stat_run_tail_ns = current_time; + } + if current_time < self.stat_run_head_ns { + self.stat_run_head_ns = current_time; + } + Ok(()) + } +} + +impl<Th: ThreadedTask> Drop for Taskpool<Th> { + fn drop(&mut self) { + for _ in 0..self.workers.len() { + self.task_tx.send(WorkerTask::Exit).unwrap(); + } + for worker in self.workers.drain(..) { + worker.handle.join().unwrap() + } + } +}
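
Illustrative usage (not part of the patch): the sketch below shows how the `ThreadedTask`/`Taskpool` API added in `sky-bench/src/pool.rs` is meant to be driven, in the same way `bench.rs` drives it with real `Connection` workers and `(Query, Response)` inputs. The pool API is assumed exactly as defined in the diff; `NoopTask` and `demo` are hypothetical names introduced only for this example.

// A minimal sketch, inside the sky-bench crate, of implementing `ThreadedTask`
// with a trivial worker instead of a database connection. `NoopTask` and `demo`
// are hypothetical and exist only to illustrate the pool's contract.
use std::time::Instant;

use crate::pool::{RuntimeStats, Taskpool, ThreadedTask};

struct NoopTask;

impl ThreadedTask for NoopTask {
    // per-thread state; sky-bench uses a skytable `Connection` here
    type TaskWorker = ();
    type TaskWorkerInitError = String;
    type TaskWorkerWorkError = String;
    type TaskWorkerTerminateError = String;
    // one unit of work sent to a worker; sky-bench uses `(Query, Response)`
    type TaskInput = u64;

    fn initialize_worker(&self) -> Result<Self::TaskWorker, Self::TaskWorkerInitError> {
        Ok(())
    }
    fn drive_worker_timed(
        _worker: &mut Self::TaskWorker,
        task: Self::TaskInput,
    ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError> {
        // time the unit of work; a real task would run a query here
        let start = Instant::now();
        let _ = task.wrapping_mul(2);
        let stop = Instant::now();
        Ok((start, stop))
    }
    fn terminate_worker(
        &self,
        _worker: &mut Self::TaskWorker,
    ) -> Result<(), Self::TaskWorkerTerminateError> {
        Ok(())
    }
}

fn demo() -> Result<(), String> {
    // spin up 4 worker threads, each initialized via `initialize_worker`
    let mut pool = Taskpool::new(4, NoopTask).map_err(|e| e.to_string())?;
    // send 1000 tasks, block until all results are back, and collect stats
    let stats: RuntimeStats = pool
        .blocking_bombard((0..1_000u64).collect())
        .map_err(|e| e.to_string())?;
    println!("effective rate: {:.2} tasks/sec", stats.qps);
    // dropping the pool sends `WorkerTask::Exit` to every worker and joins them
    Ok(())
}

In `bench.rs` above, the same flow is used with `BenchmarkTask`, whose workers are `Connection`s and whose task inputs are prepared `(Query, Response)` pairs, once per `INSERT`, `UPDATE`, and `DELETE` run.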