diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e27538bd..acd78d13 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -10,5 +10,6 @@ edition = "2021" # internal deps libsky = { path = "../libsky" } skytable = { git = "https://github.com/skytable/client-rust.git", branch = "octave" } +# external deps crossterm = "0.27.0" rustyline = "12.0.0" diff --git a/cli/src/args.rs b/cli/src/args.rs index 29923f8d..804afc16 100644 --- a/cli/src/args.rs +++ b/cli/src/args.rs @@ -30,9 +30,10 @@ use { event::{self, Event, KeyCode, KeyEvent}, terminal, }, + libsky::CliAction, std::{ - collections::{hash_map::Entry, HashMap}, - env, fs, + collections::HashMap, + fs, io::{self, Write}, process::exit, }, @@ -75,42 +76,12 @@ enum TaskInner { } fn load_env() -> CliResult { - let mut args = HashMap::new(); - let mut it = env::args().skip(1).into_iter(); - while let Some(arg) = it.next() { - let (arg, arg_val) = match arg.as_str() { - "--help" => return Ok(TaskInner::HelpMsg(TXT_HELP.into())), - "--version" => return Ok(TaskInner::HelpMsg(format!("skysh v{}", libsky::VERSION))), - _ if arg.starts_with("--") => match it.next() { - Some(arg_val) => (arg, arg_val), - None => { - // self contained? - let split: Vec<&str> = arg.split("=").collect(); - if split.len() != 2 { - return Err(CliError::ArgsErr(format!("expected value for {arg}"))); - } - (split[0].into(), split[1].into()) - } - }, - unknown_arg => { - return Err(CliError::ArgsErr(format!( - "unknown argument: {unknown_arg}" - ))) - } - }; - match args.entry(arg) { - Entry::Occupied(oe) => { - return Err(CliError::ArgsErr(format!( - "found duplicate values for {}", - oe.key() - ))) - } - Entry::Vacant(ve) => { - ve.insert(arg_val); - } - } + let action = libsky::parse_cli_args_disallow_duplicate()?; + match action { + CliAction::Help => Ok(TaskInner::HelpMsg(TXT_HELP.into())), + CliAction::Version => Ok(TaskInner::HelpMsg(libsky::version_msg("skysh"))), + CliAction::Action(a) => Ok(TaskInner::OpenShell(a)), } - Ok(TaskInner::OpenShell(args)) } pub fn parse() -> CliResult { diff --git a/cli/src/error.rs b/cli/src/error.rs index 89878bac..913c2e9f 100644 --- a/cli/src/error.rs +++ b/cli/src/error.rs @@ -36,6 +36,19 @@ pub enum CliError { IoError(std::io::Error), } +impl From for CliError { + fn from(e: libsky::ArgParseError) -> Self { + match e { + libsky::ArgParseError::Duplicate(d) => { + Self::ArgsErr(format!("duplicate value for `{d}`")) + } + libsky::ArgParseError::MissingValue(m) => { + Self::ArgsErr(format!("missing value for `{m}`")) + } + } + } +} + impl From for CliError { fn from(cle: skytable::error::Error) -> Self { Self::ClientError(cle) diff --git a/libsky/src/lib.rs b/libsky/src/lib.rs index 15ee9247..e5c93208 100644 --- a/libsky/src/lib.rs +++ b/libsky/src/lib.rs @@ -31,9 +31,6 @@ //! //! This contains modules which are shared by both the `cli` and the `server` modules -use std::error::Error; -/// A generic result -pub type TResult = Result>; /// The size of the read buffer in bytes pub const BUF_CAP: usize = 8 * 1024; // 8 KB per-connection /// The current version @@ -41,15 +38,130 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION"); /// The URL pub const URL: &str = "https://github.com/skytable/skytable"; -#[macro_export] -/// Don't use unwrap_or but use this macro as the optimizer fails to optimize away usages -/// of unwrap_or and creates a lot of LLVM IR bloat. use -// FIXME(@ohsayan): Fix this when https://github.com/rust-lang/rust/issues/68667 is addressed -macro_rules! option_unwrap_or { - ($try:expr, $fallback:expr) => { - match $try { - Some(t) => t, - None => $fallback, +use std::{ + collections::{hash_map::Entry, HashMap}, + env, +}; + +/// Returns a formatted version message `{binary} vx.y.z` +pub fn version_msg(binary: &str) -> String { + format!("{binary} v{VERSION}") +} + +#[derive(Debug, PartialEq)] +/// The CLI action that is expected to be performed +pub enum CliAction { + /// Display the `--help` message + Help, + /// Dipslay the `--version` + Version, + /// Perform an action using the given args + Action(A), +} + +pub type CliActionMulti = CliAction>>; +pub type CliActionSingle = CliAction>; + +/* + generic cli arg parser +*/ + +#[derive(Debug, PartialEq)] +/// Argument parse error +pub enum AnyArgsParseError { + /// The value for the given argument was either incorrectly formatted or missing + MissingValue(String), +} +/// Parse CLI args, allowing duplicates (bucketing them) +pub fn parse_cli_args_allow_duplicate() -> Result { + parse_args(env::args()) +} +/// Parse args allowing and bucketing any duplicates +pub fn parse_args( + args: impl IntoIterator, +) -> Result { + let mut ret: HashMap> = HashMap::new(); + let mut args = args.into_iter().skip(1).peekable(); + while let Some(arg) = args.next() { + if arg == "--help" { + return Ok(CliAction::Help); + } + if arg == "--version" { + return Ok(CliAction::Version); + } + let (arg, value) = extract_arg(arg, &mut args).map_err(AnyArgsParseError::MissingValue)?; + match ret.get_mut(&arg) { + Some(values) => { + values.push(value); + } + None => { + ret.insert(arg, vec![value]); + } + } + } + Ok(CliAction::Action(ret)) +} + +/* + no duplicate arg parser +*/ + +#[derive(Debug, PartialEq)] +/// Argument parse error +pub enum ArgParseError { + /// The given argument had a duplicate value + Duplicate(String), + /// The given argument did not have an appropriate value + MissingValue(String), +} +/// Parse all non-repeating CLI arguments +pub fn parse_cli_args_disallow_duplicate() -> Result { + parse_args_deny_duplicate(env::args()) +} +/// Parse all arguments but deny any duplicates +pub fn parse_args_deny_duplicate( + args: impl IntoIterator, +) -> Result { + let mut ret: HashMap = HashMap::new(); + let mut args = args.into_iter().skip(1).peekable(); + while let Some(arg) = args.next() { + if arg == "--help" { + return Ok(CliAction::Help); + } + if arg == "--version" { + return Ok(CliAction::Version); + } + let (arg, value) = extract_arg(arg, &mut args).map_err(ArgParseError::MissingValue)?; + match ret.entry(arg) { + Entry::Vacant(v) => { + v.insert(value); + } + Entry::Occupied(oe) => return Err(ArgParseError::Duplicate(oe.key().into())), + } + } + Ok(CliAction::Action(ret)) +} + +/// Extract an argument: +/// - `--arg=value` +/// - `--arg value` +fn extract_arg( + arg: String, + args: &mut impl Iterator, +) -> Result<(String, String), String> { + let this_args: Vec<&str> = arg.split("=").collect(); + let (arg, value) = if this_args.len() == 2 { + // self contained arg + (this_args[0].to_owned(), this_args[1].to_owned()) + } else { + if this_args.len() == 1 { + match args.next() { + None => return Err(arg), + Some(val) => (arg, val), + } + } else { + return Err(arg); } }; + Ok((arg, value)) } diff --git a/sky-bench/README.md b/sky-bench/README.md index 69869217..f9327f13 100644 --- a/sky-bench/README.md +++ b/sky-bench/README.md @@ -5,16 +5,10 @@ tool doesn't do anything "fancy" to make benchmarks appear better than they are. Here's how the benchmark tool works (it's dead simple): -1. Depending on the configuration it launches "network pools" which are just thread pools where each worker - holds a persistent connection to the database (something like a connection pool) -2. A collection of unique, random keys are generated using a PRNG provided by the `rand` library that is - seeded using the OS' source of randomness. The values are allowed to repeat -3. The [Skytable Rust driver](https://github.com/skytable/client-rust) is used to generate _raw query packets_. To put it simply, the keys and values are turned into `Query` objects and then into the raw bytes that will be sent over the network. (This just makes it simpler to design the network pool interface) -4. For every type of benchmark (GET,SET,...) we use the network pool to send all the bytes and wait until we receive the expected response. We time how long it takes to send and receive the response for all the queries for the given test (aggregate) -5. We repeat this for all the remaining tests -6. We repeat the entire set of tests 5 times (by default, this can be changed). -7. We do the calculations and output the results. - -## License - -All files in this directory are distributed under the [AGPL-3.0 License](../LICENSE). +1. We start up some threads with each having a thread local connection to the database +2. Each thread attempts to keep running queries until the target number of queries is reached. + - This sort of simulates a real-world scenario where these threads are like your application servers sending requests to the database + - Also there is no ideal distribution and the number of queries each worker runs is unspecified (but owing to low latencies from the database, that should be even) + - We do this to ensure that the distribution of queries executed by each "server" is skewed as it would be in the real world. +3. Once the target number of queries are reached, the workers notify that the task is complete. Each worker keeps track of how long it spent processing queries and this is also notified to the benchmark engine +4. The benchmark engine then computes relevant statistics diff --git a/sky-bench/src/args.rs b/sky-bench/src/args.rs index 1a8a63aa..31828ce2 100644 --- a/sky-bench/src/args.rs +++ b/sky-bench/src/args.rs @@ -26,10 +26,8 @@ use { crate::error::{BenchError, BenchResult}, - std::{ - collections::hash_map::{Entry, HashMap}, - env, - }, + libsky::CliAction, + std::collections::hash_map::HashMap, }; const TXT_HELP: &str = include_str!("../help_text/help"); @@ -77,47 +75,12 @@ impl BenchConfig { } fn load_env() -> BenchResult { - let mut args = HashMap::new(); - let mut it = env::args().skip(1).into_iter(); - while let Some(arg) = it.next() { - let (arg, arg_val) = match arg.as_str() { - "--help" => return Ok(TaskInner::HelpMsg(TXT_HELP.into())), - "--version" => { - return Ok(TaskInner::HelpMsg(format!( - "sky-bench v{}", - libsky::VERSION - ))) - } - _ if arg.starts_with("--") => match it.next() { - Some(arg_val) => (arg, arg_val), - None => { - // self contained? - let split: Vec<&str> = arg.split("=").collect(); - if split.len() != 2 { - return Err(BenchError::ArgsErr(format!("expected value for {arg}"))); - } - (split[0].into(), split[1].into()) - } - }, - unknown_arg => { - return Err(BenchError::ArgsErr(format!( - "unknown argument: {unknown_arg}" - ))) - } - }; - match args.entry(arg) { - Entry::Occupied(oe) => { - return Err(BenchError::ArgsErr(format!( - "found duplicate values for {}", - oe.key() - ))) - } - Entry::Vacant(ve) => { - ve.insert(arg_val); - } - } + let action = libsky::parse_cli_args_disallow_duplicate()?; + match action { + CliAction::Help => Ok(TaskInner::HelpMsg(TXT_HELP.into())), + CliAction::Version => Ok(TaskInner::HelpMsg(libsky::version_msg("sky-bench"))), + CliAction::Action(a) => Ok(TaskInner::CheckConfig(a)), } - Ok(TaskInner::CheckConfig(args)) } fn cdig(n: usize) -> usize { @@ -176,7 +139,7 @@ pub fn parse() -> BenchResult { }; // threads let thread_count = match args.remove("--threads") { - None => num_cpus::get(), + None => num_cpus::get_physical(), Some(tc) => match tc.parse() { Ok(tc) if tc > 0 => tc, Err(_) | Ok(_) => { diff --git a/sky-bench/src/bench.rs b/sky-bench/src/bench.rs index 651b30c6..7325734c 100644 --- a/sky-bench/src/bench.rs +++ b/sky-bench/src/bench.rs @@ -24,136 +24,157 @@ * */ -use crate::error::BenchResult; - use { crate::{ args::BenchConfig, - error::{self, BenchmarkTaskWorkerError}, - pool::{RuntimeStats, Taskpool, ThreadedTask}, + error::{self, BenchResult}, + runtime::{BombardPool, RuntimeStats, ThreadedBombardTask}, }, - skytable::{query, response::Response, Config, Connection, Query}, - std::time::Instant, + skytable::{error::Error, query, response::Response, Config, Connection, Query}, + std::{fmt, time::Instant}, }; +/* + task impl +*/ + +/// A bombard task used for benchmarking + #[derive(Debug)] -pub struct BenchmarkTask { - cfg: Config, +pub struct BombardTask { + config: Config, } -impl BenchmarkTask { - pub fn new(host: &str, port: u16, username: &str, password: &str) -> Self { +impl BombardTask { + pub fn new(config: Config) -> Self { + Self { config } + } +} + +#[derive(Debug, Clone)] +pub enum BombardTaskKind { + Insert(u8), + Update, + Delete, +} + +#[derive(Debug, Clone)] +pub struct BombardTaskSpec { + kind: BombardTaskKind, + base_query: String, + pk_len: usize, +} + +impl BombardTaskSpec { + pub fn insert(base_query: String, pk_len: usize, second_column: u8) -> Self { Self { - cfg: Config::new(host, port, username, password), + kind: BombardTaskKind::Insert(second_column), + base_query, + pk_len, + } + } + pub fn update(base_query: String, pk_len: usize) -> Self { + Self { + kind: BombardTaskKind::Update, + base_query, + pk_len, + } + } + pub fn delete(base_query: String, pk_len: usize) -> Self { + Self { + kind: BombardTaskKind::Delete, + base_query, + pk_len, + } + } + fn generate(&self, current: u64) -> (Query, Response) { + let mut q = query!(&self.base_query); + let resp = match self.kind { + BombardTaskKind::Insert(second_column) => { + q.push_param(format!("{:0>width$}", current, width = self.pk_len)); + q.push_param(second_column); + Response::Empty + } + BombardTaskKind::Update => { + q.push_param(1u64); + q.push_param(format!("{:0>width$}", current, width = self.pk_len)); + Response::Empty + } + BombardTaskKind::Delete => { + q.push_param(format!("{:0>width$}", current, width = self.pk_len)); + Response::Empty + } + }; + (q, resp) + } +} + +/// Errors while running a bombard +#[derive(Debug)] +pub enum BombardTaskError { + DbError(Error), + Mismatch, +} + +impl fmt::Display for BombardTaskError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::DbError(e) => write!(f, "a bombard subtask failed with {e}"), + Self::Mismatch => write!(f, "got unexpected response for bombard subtask"), } } } -impl ThreadedTask for BenchmarkTask { - type TaskWorker = Connection; - type TaskWorkerInitError = BenchmarkTaskWorkerError; - type TaskWorkerTerminateError = BenchmarkTaskWorkerError; - type TaskWorkerWorkError = BenchmarkTaskWorkerError; - type TaskInput = (Query, Response); - fn initialize_worker(&self) -> Result { - self.cfg.connect().map_err(Into::into) +impl From for BombardTaskError { + fn from(dbe: Error) -> Self { + Self::DbError(dbe) + } +} + +impl ThreadedBombardTask for BombardTask { + type Worker = Connection; + type WorkerTask = (Query, Response); + type WorkerTaskSpec = BombardTaskSpec; + type WorkerInitError = Error; + type WorkerTaskError = BombardTaskError; + fn worker_init(&self) -> Result { + self.config.connect() } - fn drive_worker_timed( - worker: &mut Self::TaskWorker, - (query, expected_resp): Self::TaskInput, - ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError> { + fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask { + spec.generate(current) + } + fn worker_drive_timed( + worker: &mut Self::Worker, + (query, response): Self::WorkerTask, + ) -> Result { let start = Instant::now(); - let resp = worker.query(&query)?; + let ret = worker.query(&query)?; let stop = Instant::now(); - if resp == expected_resp { - Ok((start, stop)) + if ret == response { + Ok(stop.duration_since(start).as_nanos()) } else { - Err(BenchmarkTaskWorkerError::Error(format!( - "response from server did not match expected response: {:?}", - resp - ))) + Err(BombardTaskError::Mismatch) } } - fn terminate_worker( - &self, - _: &mut Self::TaskWorker, - ) -> Result<(), Self::TaskWorkerTerminateError> { - Ok(()) - } } +/* + runner +*/ + pub fn run(bench: BenchConfig) -> error::BenchResult<()> { - let bench_config = BenchmarkTask::new(&bench.host, bench.port, "root", &bench.root_pass); + let bench_config = BombardTask::new(Config::new( + &bench.host, + bench.port, + "root", + &bench.root_pass, + )); info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: string, pw: uint8}}`"); - let mut main_thread_db = bench_config.cfg.connect()?; + let mut main_thread_db = bench_config.config.connect()?; main_thread_db.query_parse::<()>(&query!("create space bench"))?; main_thread_db.query_parse::<()>(&query!("create model bench.bench(un: string, pw: uint8)"))?; - info!( - "initializing connection pool with {} connections", - bench.threads - ); - let mut p = Taskpool::new(bench.threads, bench_config)?; - info!( - "pool initialized successfully. preparing {} `INSERT` queries with primary key size={} bytes", - bench.query_count, bench.key_size - ); - let mut insert_stats = Default::default(); - let mut update_stats = Default::default(); - let mut delete_stats = Default::default(); - match || -> BenchResult<()> { - // bench insert - let insert_queries: Vec<(Query, Response)> = (0..bench.query_count) - .into_iter() - .map(|i| { - ( - query!( - "insert into bench.bench(?, ?)", - format!("{:0>width$}", i, width = bench.key_size), - 0u64 - ), - Response::Empty, - ) - }) - .collect(); - info!("benchmarking `INSERT` queries"); - insert_stats = p.blocking_bombard(insert_queries)?; - // bench update - info!("completed benchmarking `INSERT`. preparing `UPDATE` queries"); - let update_queries: Vec<(Query, Response)> = (0..bench.query_count) - .into_iter() - .map(|i| { - ( - query!( - "update bench.bench set pw += ? where un = ?", - 1u64, - format!("{:0>width$}", i, width = bench.key_size), - ), - Response::Empty, - ) - }) - .collect(); - info!("benchmarking `UPDATE` queries"); - update_stats = p.blocking_bombard(update_queries)?; - // bench delete - info!("completed benchmarking `UPDATE`. preparing `DELETE` queries"); - let delete_queries: Vec<(Query, Response)> = (0..bench.query_count) - .into_iter() - .map(|i| { - ( - query!( - "delete from bench.bench where un = ?", - format!("{:0>width$}", i, width = bench.key_size), - ), - Response::Empty, - ) - }) - .collect(); - info!("benchmarking `DELETE` queries"); - delete_stats = p.blocking_bombard(delete_queries)?; - info!("completed benchmarking `DELETE` queries"); - Ok(()) - }() { - Ok(()) => {} + let stats = match bench_internal(bench_config, bench) { + Ok(stats) => stats, Err(e) => { error!("benchmarking failed. attempting to clean up"); match cleanup(main_thread_db) { @@ -164,20 +185,18 @@ pub fn run(bench: BenchConfig) -> error::BenchResult<()> { } } } - } - drop(p); + }; warn!("benchmarks might appear to be slower. this tool is currently experimental"); // print results - info!("results:"); - print_table(vec![ - ("INSERT", insert_stats), - ("UPDATE", update_stats), - ("DELETE", delete_stats), - ]); + print_table(stats); cleanup(main_thread_db)?; Ok(()) } +/* + util +*/ + fn cleanup(mut main_thread_db: Connection) -> Result<(), error::BenchError> { trace!("dropping space and table"); main_thread_db.query_parse::<()>(&query!("drop model bench.bench"))?; @@ -195,22 +214,51 @@ fn print_table(data: Vec<(&'static str, RuntimeStats)>) { println!( "+---------+--------------------------+-----------------------+------------------------+" ); - for ( - query, - RuntimeStats { - qps, - avg_per_query_ns: _, - head_ns, - tail_ns, - }, - ) in data - { + for (query, RuntimeStats { qps, head, tail }) in data { println!( "| {:<7} | {:>24.2} | {:>21} | {:>22} |", - query, qps, tail_ns, head_ns + query, qps, tail, head ); } println!( "+---------+--------------------------+-----------------------+------------------------+" ); } + +/* + bench runner +*/ + +fn bench_internal( + config: BombardTask, + bench: BenchConfig, +) -> BenchResult> { + let mut ret = vec![]; + // initialize pool + info!("initializing connection pool"); + let mut pool = BombardPool::new(bench.threads, config)?; + // bench INSERT + info!("benchmarking `INSERT`"); + let insert = BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0); + let insert_stats = pool.blocking_bombard(insert, bench.query_count)?; + ret.push(("INSERT", insert_stats)); + // bench UPDATE + info!("benchmarking `UPDATE`"); + let update = BombardTaskSpec::update( + "update bench.bench set pw += ? where un = ?".into(), + bench.key_size, + ); + let update_stats = pool.blocking_bombard(update, bench.query_count)?; + ret.push(("UPDATE", update_stats)); + // bench DELETE + info!("benchmarking `DELETE`"); + let delete = BombardTaskSpec::delete( + "delete from bench.bench where un = ?".into(), + bench.key_size, + ); + let delete_stats = pool.blocking_bombard(delete, bench.query_count)?; + ret.push(("DELETE", delete_stats)); + info!("completed benchmarks. closing pool"); + drop(pool); + Ok(ret) +} diff --git a/sky-bench/src/error.rs b/sky-bench/src/error.rs index 0fe3fcb6..6cf5caf9 100644 --- a/sky-bench/src/error.rs +++ b/sky-bench/src/error.rs @@ -25,7 +25,7 @@ */ use { - crate::{bench::BenchmarkTask, pool::TaskpoolError}, + crate::{bench::BombardTask, runtime::BombardError}, core::fmt, skytable::error::Error, }; @@ -35,13 +35,20 @@ pub type BenchResult = Result; #[derive(Debug)] pub enum BenchError { ArgsErr(String), - BenchError(TaskpoolError), + BenchBombardError(BombardError), DirectDbError(Error), } -impl From> for BenchError { - fn from(e: TaskpoolError) -> Self { - Self::BenchError(e) +impl From for BenchError { + fn from(e: libsky::ArgParseError) -> Self { + match e { + libsky::ArgParseError::Duplicate(d) => { + Self::ArgsErr(format!("duplicate value for `{d}`")) + } + libsky::ArgParseError::MissingValue(m) => { + Self::ArgsErr(format!("missing value for `{m}`")) + } + } } } @@ -51,12 +58,18 @@ impl From for BenchError { } } +impl From> for BenchError { + fn from(e: BombardError) -> Self { + Self::BenchBombardError(e) + } +} + impl fmt::Display for BenchError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ArgsErr(e) => write!(f, "args error: {e}"), - Self::BenchError(e) => write!(f, "benchmark system error: {e}"), Self::DirectDbError(e) => write!(f, "direct operation on db failed. {e}"), + Self::BenchBombardError(e) => write!(f, "benchmark failed: {e}"), } } } @@ -66,7 +79,6 @@ impl std::error::Error for BenchError {} #[derive(Debug)] pub enum BenchmarkTaskWorkerError { DbError(Error), - Error(String), } impl From for BenchmarkTaskWorkerError { @@ -79,7 +91,6 @@ impl fmt::Display for BenchmarkTaskWorkerError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::DbError(e) => write!(f, "worker failed due to DB error. {e}"), - Self::Error(e) => write!(f, "worker failed. {e}"), } } } diff --git a/sky-bench/src/main.rs b/sky-bench/src/main.rs index 27dfc5ae..07e5f522 100644 --- a/sky-bench/src/main.rs +++ b/sky-bench/src/main.rs @@ -29,7 +29,7 @@ extern crate log; mod args; mod bench; mod error; -mod pool; +mod runtime; fn main() { env_logger::Builder::new() diff --git a/sky-bench/src/pool.rs b/sky-bench/src/pool.rs deleted file mode 100644 index 0b6e13e4..00000000 --- a/sky-bench/src/pool.rs +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Created on Fri Nov 17 2023 - * - * This file is a part of Skytable - * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source - * NoSQL database written by Sayan Nandan ("the Author") with the - * vision to provide flexibility in data modelling without compromising - * on performance, queryability or scalability. - * - * Copyright (c) 2023, Sayan Nandan - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * -*/ - -use { - crossbeam_channel::{unbounded, Receiver, Sender}, - std::{ - fmt, - marker::PhantomData, - thread::{self, JoinHandle}, - time::Instant, - }, -}; - -pub type TaskPoolResult = Result>; - -#[derive(Debug)] -pub enum TaskpoolError { - InitError(Th::TaskWorkerInitError), - BombardError(&'static str), - WorkerError(Th::TaskWorkerWorkError), -} - -impl fmt::Display for TaskpoolError -where - Th::TaskWorkerInitError: fmt::Display, - Th::TaskWorkerWorkError: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::InitError(e) => write!(f, "failed to init worker pool. {e}"), - Self::BombardError(e) => write!(f, "failed to post work to pool. {e}"), - Self::WorkerError(e) => write!(f, "failed running worker task. {e}"), - } - } -} - -pub trait ThreadedTask: Send + Sync + 'static { - /// the per-thread item that does the actual work - /// - /// NB: this is not to be confused with the actual underlying thread pool worker - type TaskWorker: Send + Sync; - /// when attempting initialization of the per-thread task worker, if an error is thrown, this is the type - /// you're looking for - type TaskWorkerInitError: Send + Sync; - /// when attempting to run a single unit of work, if any error occurs this is the error type that is to be returned - type TaskWorkerWorkError: Send + Sync; - /// when attempting to close a worker, if an error occurs this is the error type that is returned - type TaskWorkerTerminateError: Send + Sync; - /// the task that is sent to each worker - type TaskInput: Send + Sync; - // fn - /// initialize the worker - fn initialize_worker(&self) -> Result; - /// drive the worker to complete a task and return the time - fn drive_worker_timed( - worker: &mut Self::TaskWorker, - task: Self::TaskInput, - ) -> Result<(Instant, Instant), Self::TaskWorkerWorkError>; - fn terminate_worker( - &self, - worker: &mut Self::TaskWorker, - ) -> Result<(), Self::TaskWorkerTerminateError>; -} - -#[derive(Debug)] -struct ThreadWorker { - handle: JoinHandle<()>, - _m: PhantomData, -} - -#[derive(Debug)] -enum WorkerTask { - Task(Th::TaskInput), - Exit, -} - -impl ThreadWorker { - fn new( - hl_worker: Th::TaskWorker, - task_rx: Receiver>, - res_tx: Sender>, - ) -> Self { - Self { - handle: thread::spawn(move || { - let mut worker = hl_worker; - loop { - let task = match task_rx.recv().unwrap() { - WorkerTask::Exit => { - drop(task_rx); - return; - } - WorkerTask::Task(t) => t, - }; - res_tx - .send(Th::drive_worker_timed(&mut worker, task)) - .unwrap(); - } - }), - _m: PhantomData, - } - } -} - -#[derive(Debug)] -pub struct Taskpool { - workers: Vec>, - _config: Th, - task_tx: Sender>, - res_rx: Receiver>, - record_real_start: Instant, - record_real_stop: Instant, - stat_run_avg_ns: f64, - stat_run_tail_ns: u128, - stat_run_head_ns: u128, -} - -// TODO(@ohsayan): prepare histogram for report; for now there's no use of the head and tail latencies -#[derive(Default, Debug)] -pub struct RuntimeStats { - pub qps: f64, - pub avg_per_query_ns: f64, - pub head_ns: u128, - pub tail_ns: u128, -} - -impl Taskpool { - pub fn stat_avg(&self) -> f64 { - self.stat_run_avg_ns - } - pub fn stat_tail(&self) -> u128 { - self.stat_run_tail_ns - } - pub fn stat_head(&self) -> u128 { - self.stat_run_head_ns - } - pub fn stat_elapsed(&self) -> u128 { - self.record_real_stop - .duration_since(self.record_real_start) - .as_nanos() - } -} - -fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 { - const NANOS_PER_SECOND: u128 = 1_000_000_000; - let time_taken_in_nanos_f64 = time_taken_in_nanos as f64; - let query_count_f64 = query_count as f64; - (query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64 -} - -impl Taskpool { - pub fn new(size: usize, config: Th) -> TaskPoolResult { - let (task_tx, task_rx) = unbounded(); - let (res_tx, res_rx) = unbounded(); - let mut workers = Vec::with_capacity(size); - for _ in 0..size { - let con = config - .initialize_worker() - .map_err(TaskpoolError::InitError)?; - workers.push(ThreadWorker::new(con, task_rx.clone(), res_tx.clone())); - } - Ok(Self { - workers, - _config: config, - task_tx, - res_rx, - stat_run_avg_ns: 0.0, - record_real_start: Instant::now(), - record_real_stop: Instant::now(), - stat_run_head_ns: u128::MAX, - stat_run_tail_ns: u128::MIN, - }) - } - pub fn blocking_bombard( - &mut self, - vec: Vec, - ) -> TaskPoolResult { - let expected = vec.len(); - let mut received = 0usize; - for task in vec { - match self.task_tx.send(WorkerTask::Task(task)) { - Ok(()) => {} - Err(_) => { - // stop bombarding, we hit an error - return Err(TaskpoolError::BombardError( - "all worker threads exited. this indicates a catastrophic failure", - )); - } - } - } - while received != expected { - match self.res_rx.recv() { - Err(_) => { - // all workers exited. that is catastrophic - return Err(TaskpoolError::BombardError( - "detected all worker threads crashed during run check", - )); - } - Ok(r) => self.recompute_stats(&mut received, r)?, - }; - } - // compute stats - let ret = Ok(RuntimeStats { - qps: qps(received, self.stat_elapsed()), - avg_per_query_ns: self.stat_avg(), - head_ns: self.stat_head(), - tail_ns: self.stat_tail(), - }); - // reset stats - self.stat_run_avg_ns = 0.0; - self.record_real_start = Instant::now(); - self.record_real_stop = Instant::now(); - self.stat_run_head_ns = u128::MAX; - self.stat_run_tail_ns = u128::MIN; - // return - ret - } - fn recompute_stats( - &mut self, - received: &mut usize, - result: Result<(Instant, Instant), ::TaskWorkerWorkError>, - ) -> Result<(), TaskpoolError> { - *received += 1; - let (start, stop) = match result { - Ok(time) => time, - Err(e) => return Err(TaskpoolError::WorkerError(e)), - }; - // adjust real start - if start < self.record_real_start { - self.record_real_start = start; - } - if stop > self.record_real_stop { - self.record_real_stop = stop; - } - let current_time = stop.duration_since(start).as_nanos(); - self.stat_run_avg_ns = self.stat_run_avg_ns - + ((current_time as f64 - self.stat_run_avg_ns) / *received as f64); - if current_time > self.stat_run_tail_ns { - self.stat_run_tail_ns = current_time; - } - if current_time < self.stat_run_head_ns { - self.stat_run_head_ns = current_time; - } - Ok(()) - } -} - -impl Drop for Taskpool { - fn drop(&mut self) { - for _ in 0..self.workers.len() { - self.task_tx.send(WorkerTask::Exit).unwrap(); - } - for worker in self.workers.drain(..) { - worker.handle.join().unwrap() - } - } -} diff --git a/sky-bench/src/runtime.rs b/sky-bench/src/runtime.rs new file mode 100644 index 00000000..4386e1a9 --- /dev/null +++ b/sky-bench/src/runtime.rs @@ -0,0 +1,406 @@ +/* + * Created on Sun Nov 19 2023 + * + * This file is a part of Skytable + * Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source + * NoSQL database written by Sayan Nandan ("the Author") with the + * vision to provide flexibility in data modelling without compromising + * on performance, queryability or scalability. + * + * Copyright (c) 2023, Sayan Nandan + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * +*/ + +use { + crossbeam_channel::{unbounded, Receiver, Sender}, + std::{ + fmt::{self, Display}, + sync::atomic::{AtomicBool, AtomicU64, Ordering}, + thread::{self, JoinHandle}, + time::{Duration, Instant}, + }, +}; + +pub type BombardResult = Result>; + +/* + state mgmt +*/ + +#[derive(Debug)] +/// The pool state. Be warned **ONLY ONE POOL AT A TIME!** +struct GPState { + current: AtomicU64, + state: AtomicBool, + occupied: AtomicBool, +} + +impl GPState { + #[inline(always)] + fn get() -> &'static Self { + static STATE: GPState = GPState::zero(); + &STATE + } + const fn zero() -> Self { + Self { + current: AtomicU64::new(0), + state: AtomicBool::new(true), + occupied: AtomicBool::new(false), + } + } + fn occupy(&self) { + assert!(!self.occupied.swap(true, Ordering::Release)); + } + fn vacate(&self) { + assert!(self.occupied.swap(false, Ordering::Release)); + } + fn guard(f: impl FnOnce() -> T) -> T { + let slf = Self::get(); + slf.occupy(); + let ret = f(); + slf.vacate(); + ret + } + fn post_failure(&self) { + self.state.store(false, Ordering::Release) + } + fn post_target(&self, target: u64) { + self.current.store(target, Ordering::Release) + } + /// WARNING: this is not atomic! only sensible to run a quiescent state + fn post_reset(&self) { + self.current.store(0, Ordering::Release); + self.state.store(true, Ordering::Release); + } + fn update_target(&self) -> u64 { + let mut current = self.current.load(Ordering::Acquire); + loop { + if current == 0 { + return 0; + } + match self.current.compare_exchange( + current, + current - 1, + Ordering::Release, + Ordering::Acquire, + ) { + Ok(last) => { + return last; + } + Err(new) => { + current = new; + } + } + } + } + fn load_okay(&self) -> bool { + self.state.load(Ordering::Acquire) + } +} + +/* + task spec +*/ + +/// A threaded bombard task specification which drives a global pool of threads towards a common goal +pub trait ThreadedBombardTask: Send + Sync + 'static { + /// The per-task worker that is initialized once in every thread (not to be confused with the actual thread worker!) + type Worker: Send + Sync; + /// The task that the [`ThreadedBombardTask::TaskWorker`] performs + type WorkerTask: Send + Sync; + type WorkerTaskSpec: Clone + Send + Sync + 'static; + /// Errors while running a task + type WorkerTaskError: Send + Sync; + /// Errors while initializing a task worker + type WorkerInitError: Send + Sync; + /// Initialize a task worker + fn worker_init(&self) -> Result; + fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask; + /// Drive a single subtask + fn worker_drive_timed( + worker: &mut Self::Worker, + task: Self::WorkerTask, + ) -> Result; +} + +/* + worker +*/ + +#[derive(Debug)] +enum WorkerResult { + Completed(WorkerLocalStats), + Errored(Bt::WorkerTaskError), +} + +#[derive(Debug)] +struct WorkerLocalStats { + start: Instant, + elapsed: u128, + head: u128, + tail: u128, +} + +impl WorkerLocalStats { + fn new(start: Instant, elapsed: u128, head: u128, tail: u128) -> Self { + Self { + start, + elapsed, + head, + tail, + } + } +} + +#[derive(Debug)] +enum WorkerTask { + Task(Bt::WorkerTaskSpec), + Exit, +} + +#[derive(Debug)] +struct Worker { + handle: JoinHandle<()>, +} + +impl Worker { + fn start( + id: usize, + driver: Bt::Worker, + rx_work: Receiver>, + tx_res: Sender>, + ) -> Self { + Self { + handle: thread::Builder::new() + .name(format!("worker-{id}")) + .spawn(move || { + let mut worker_driver = driver; + 'blocking_wait: loop { + let task = match rx_work.recv().unwrap() { + WorkerTask::Exit => return, + WorkerTask::Task(spec) => spec, + }; + // check global state + let mut global_okay = GPState::get().load_okay(); + let mut global_position = GPState::get().update_target(); + // init local state + let mut local_start = None; + let mut local_elapsed = 0u128; + let mut local_head = u128::MAX; + let mut local_tail = 0; + // bombard + while (global_position != 0) & global_okay { + let task = Bt::generate_task(&task, global_position); + if local_start.is_none() { + local_start = Some(Instant::now()); + } + let this_elapsed = + match Bt::worker_drive_timed(&mut worker_driver, task) { + Ok(elapsed) => elapsed, + Err(e) => { + GPState::get().post_failure(); + tx_res.send(WorkerResult::Errored(e)).unwrap(); + continue 'blocking_wait; + } + }; + local_elapsed += this_elapsed; + if this_elapsed < local_head { + local_head = this_elapsed; + } + if this_elapsed > local_tail { + local_tail = this_elapsed; + } + global_position = GPState::get().update_target(); + global_okay = GPState::get().load_okay(); + } + if global_okay { + // we're done + tx_res + .send(WorkerResult::Completed(WorkerLocalStats::new( + local_start.unwrap(), + local_elapsed, + local_head, + local_tail, + ))) + .unwrap(); + } + } + }) + .expect("failed to start thread"), + } + } +} + +/* + pool +*/ + +#[derive(Debug)] +pub enum BombardError { + InitError(Bt::WorkerInitError), + WorkerTaskError(Bt::WorkerTaskError), + AllWorkersOffline, +} + +impl fmt::Display for BombardError +where + Bt::WorkerInitError: fmt::Display, + Bt::WorkerTaskError: Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::AllWorkersOffline => write!( + f, + "bombard failed because all workers went offline indicating catastrophic failure" + ), + Self::WorkerTaskError(e) => write!(f, "worker task failed. {e}"), + Self::InitError(e) => write!(f, "worker init failed. {e}"), + } + } +} + +#[derive(Debug)] +pub struct RuntimeStats { + pub qps: f64, + pub head: u128, + pub tail: u128, +} + +#[derive(Debug)] +pub struct BombardPool { + workers: Vec<(Worker, Sender>)>, + rx_res: Receiver>, + _config: Bt, +} + +impl BombardPool { + fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 { + const NANOS_PER_SECOND: u128 = 1_000_000_000; + let time_taken_in_nanos_f64 = time_taken_in_nanos as f64; + let query_count_f64 = query_count as f64; + (query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64 + } + pub fn new(size: usize, config: Bt) -> BombardResult { + assert_ne!(size, 0, "pool can't be empty"); + let mut workers = Vec::with_capacity(size); + let (tx_res, rx_res) = unbounded(); + for id in 0..size { + let (tx_work, rx_work) = unbounded(); + let driver = config.worker_init().map_err(BombardError::InitError)?; + workers.push((Worker::start(id, driver, rx_work, tx_res.clone()), tx_work)); + } + Ok(Self { + workers, + rx_res, + _config: config, + }) + } + /// Bombard queries to the workers + pub fn blocking_bombard( + &mut self, + task_description: Bt::WorkerTaskSpec, + count: usize, + ) -> BombardResult { + GPState::guard(|| { + GPState::get().post_target(count as _); + let mut global_start = None; + let mut global_stop = None; + let mut global_head = u128::MAX; + let mut global_tail = 0u128; + let messages: Vec<::WorkerTaskSpec> = + (0..self.workers.len()) + .into_iter() + .map(|_| task_description.clone()) + .collect(); + for ((_, sender), msg) in self.workers.iter().zip(messages) { + sender.send(WorkerTask::Task(msg)).unwrap(); + } + // wait for all workers to complete + let mut received = 0; + while received != self.workers.len() { + let results = match self.rx_res.recv() { + Err(_) => return Err(BombardError::AllWorkersOffline), + Ok(r) => r, + }; + let WorkerLocalStats { + start: this_start, + elapsed, + head, + tail, + } = match results { + WorkerResult::Completed(r) => r, + WorkerResult::Errored(e) => return Err(BombardError::WorkerTaskError(e)), + }; + // update start if required + match global_start.as_mut() { + None => { + global_start = Some(this_start); + } + Some(start) => { + if this_start < *start { + *start = this_start; + } + } + } + let this_task_stopped_at = + this_start + Duration::from_nanos(elapsed.try_into().unwrap()); + match global_stop.as_mut() { + None => { + global_stop = Some(this_task_stopped_at); + } + Some(stop) => { + if this_task_stopped_at > *stop { + // this task stopped later than the previous one + *stop = this_task_stopped_at; + } + } + } + if head < global_head { + global_head = head; + } + if tail > global_tail { + global_tail = tail; + } + received += 1; + } + // reset global pool state + GPState::get().post_reset(); + // compute results + let global_elapsed = global_stop + .unwrap() + .duration_since(global_start.unwrap()) + .as_nanos(); + Ok(RuntimeStats { + qps: Self::qps(count, global_elapsed), + head: global_head, + tail: global_tail, + }) + }) + } +} + +impl Drop for BombardPool { + fn drop(&mut self) { + info!("taking all workers offline"); + for (_, sender) in self.workers.iter() { + sender.send(WorkerTask::Exit).unwrap(); + } + for (worker, _) in self.workers.drain(..) { + worker.handle.join().unwrap(); + } + info!("all workers now offline"); + } +}