Implement new benchmark engine

next
Sayan Nandan 10 months ago
parent daf0f32c30
commit 250a2b3c16
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

@ -10,5 +10,6 @@ edition = "2021"
# internal deps
libsky = { path = "../libsky" }
skytable = { git = "https://github.com/skytable/client-rust.git", branch = "octave" }
# external deps
crossterm = "0.27.0"
rustyline = "12.0.0"

@ -30,9 +30,10 @@ use {
event::{self, Event, KeyCode, KeyEvent},
terminal,
},
libsky::CliAction,
std::{
collections::{hash_map::Entry, HashMap},
env, fs,
collections::HashMap,
fs,
io::{self, Write},
process::exit,
},
@ -75,42 +76,12 @@ enum TaskInner {
}
fn load_env() -> CliResult<TaskInner> {
let mut args = HashMap::new();
let mut it = env::args().skip(1).into_iter();
while let Some(arg) = it.next() {
let (arg, arg_val) = match arg.as_str() {
"--help" => return Ok(TaskInner::HelpMsg(TXT_HELP.into())),
"--version" => return Ok(TaskInner::HelpMsg(format!("skysh v{}", libsky::VERSION))),
_ if arg.starts_with("--") => match it.next() {
Some(arg_val) => (arg, arg_val),
None => {
// self contained?
let split: Vec<&str> = arg.split("=").collect();
if split.len() != 2 {
return Err(CliError::ArgsErr(format!("expected value for {arg}")));
}
(split[0].into(), split[1].into())
}
},
unknown_arg => {
return Err(CliError::ArgsErr(format!(
"unknown argument: {unknown_arg}"
)))
}
};
match args.entry(arg) {
Entry::Occupied(oe) => {
return Err(CliError::ArgsErr(format!(
"found duplicate values for {}",
oe.key()
)))
}
Entry::Vacant(ve) => {
ve.insert(arg_val);
}
}
let action = libsky::parse_cli_args_disallow_duplicate()?;
match action {
CliAction::Help => Ok(TaskInner::HelpMsg(TXT_HELP.into())),
CliAction::Version => Ok(TaskInner::HelpMsg(libsky::version_msg("skysh"))),
CliAction::Action(a) => Ok(TaskInner::OpenShell(a)),
}
Ok(TaskInner::OpenShell(args))
}
pub fn parse() -> CliResult<Task> {

@ -36,6 +36,19 @@ pub enum CliError {
IoError(std::io::Error),
}
impl From<libsky::ArgParseError> for CliError {
fn from(e: libsky::ArgParseError) -> Self {
match e {
libsky::ArgParseError::Duplicate(d) => {
Self::ArgsErr(format!("duplicate value for `{d}`"))
}
libsky::ArgParseError::MissingValue(m) => {
Self::ArgsErr(format!("missing value for `{m}`"))
}
}
}
}
impl From<skytable::error::Error> for CliError {
fn from(cle: skytable::error::Error) -> Self {
Self::ClientError(cle)

@ -31,9 +31,6 @@
//!
//! This contains modules which are shared by both the `cli` and the `server` modules
use std::error::Error;
/// A generic result
pub type TResult<T> = Result<T, Box<dyn Error>>;
/// The size of the read buffer in bytes
pub const BUF_CAP: usize = 8 * 1024; // 8 KB per-connection
/// The current version
@ -41,15 +38,130 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION");
/// The URL
pub const URL: &str = "https://github.com/skytable/skytable";
#[macro_export]
/// Don't use unwrap_or but use this macro as the optimizer fails to optimize away usages
/// of unwrap_or and creates a lot of LLVM IR bloat. use
// FIXME(@ohsayan): Fix this when https://github.com/rust-lang/rust/issues/68667 is addressed
macro_rules! option_unwrap_or {
($try:expr, $fallback:expr) => {
match $try {
Some(t) => t,
None => $fallback,
use std::{
collections::{hash_map::Entry, HashMap},
env,
};
/// Returns a formatted version message `{binary} vx.y.z`
pub fn version_msg(binary: &str) -> String {
format!("{binary} v{VERSION}")
}
#[derive(Debug, PartialEq)]
/// The CLI action that is expected to be performed
pub enum CliAction<A> {
/// Display the `--help` message
Help,
/// Dipslay the `--version`
Version,
/// Perform an action using the given args
Action(A),
}
pub type CliActionMulti = CliAction<HashMap<String, Vec<String>>>;
pub type CliActionSingle = CliAction<HashMap<String, String>>;
/*
generic cli arg parser
*/
#[derive(Debug, PartialEq)]
/// Argument parse error
pub enum AnyArgsParseError {
/// The value for the given argument was either incorrectly formatted or missing
MissingValue(String),
}
/// Parse CLI args, allowing duplicates (bucketing them)
pub fn parse_cli_args_allow_duplicate() -> Result<CliActionMulti, AnyArgsParseError> {
parse_args(env::args())
}
/// Parse args allowing and bucketing any duplicates
pub fn parse_args(
args: impl IntoIterator<Item = String>,
) -> Result<CliActionMulti, AnyArgsParseError> {
let mut ret: HashMap<String, Vec<String>> = HashMap::new();
let mut args = args.into_iter().skip(1).peekable();
while let Some(arg) = args.next() {
if arg == "--help" {
return Ok(CliAction::Help);
}
if arg == "--version" {
return Ok(CliAction::Version);
}
let (arg, value) = extract_arg(arg, &mut args).map_err(AnyArgsParseError::MissingValue)?;
match ret.get_mut(&arg) {
Some(values) => {
values.push(value);
}
None => {
ret.insert(arg, vec![value]);
}
}
}
Ok(CliAction::Action(ret))
}
/*
no duplicate arg parser
*/
#[derive(Debug, PartialEq)]
/// Argument parse error
pub enum ArgParseError {
/// The given argument had a duplicate value
Duplicate(String),
/// The given argument did not have an appropriate value
MissingValue(String),
}
/// Parse all non-repeating CLI arguments
pub fn parse_cli_args_disallow_duplicate() -> Result<CliActionSingle, ArgParseError> {
parse_args_deny_duplicate(env::args())
}
/// Parse all arguments but deny any duplicates
pub fn parse_args_deny_duplicate(
args: impl IntoIterator<Item = String>,
) -> Result<CliActionSingle, ArgParseError> {
let mut ret: HashMap<String, String> = HashMap::new();
let mut args = args.into_iter().skip(1).peekable();
while let Some(arg) = args.next() {
if arg == "--help" {
return Ok(CliAction::Help);
}
if arg == "--version" {
return Ok(CliAction::Version);
}
let (arg, value) = extract_arg(arg, &mut args).map_err(ArgParseError::MissingValue)?;
match ret.entry(arg) {
Entry::Vacant(v) => {
v.insert(value);
}
Entry::Occupied(oe) => return Err(ArgParseError::Duplicate(oe.key().into())),
}
}
Ok(CliAction::Action(ret))
}
/// Extract an argument:
/// - `--arg=value`
/// - `--arg value`
fn extract_arg(
arg: String,
args: &mut impl Iterator<Item = String>,
) -> Result<(String, String), String> {
let this_args: Vec<&str> = arg.split("=").collect();
let (arg, value) = if this_args.len() == 2 {
// self contained arg
(this_args[0].to_owned(), this_args[1].to_owned())
} else {
if this_args.len() == 1 {
match args.next() {
None => return Err(arg),
Some(val) => (arg, val),
}
} else {
return Err(arg);
}
};
Ok((arg, value))
}

@ -5,16 +5,10 @@ tool doesn't do anything "fancy" to make benchmarks appear better than they are.
Here's how the benchmark tool works (it's dead simple):
1. Depending on the configuration it launches "network pools" which are just thread pools where each worker
holds a persistent connection to the database (something like a connection pool)
2. A collection of unique, random keys are generated using a PRNG provided by the `rand` library that is
seeded using the OS' source of randomness. The values are allowed to repeat
3. The [Skytable Rust driver](https://github.com/skytable/client-rust) is used to generate _raw query packets_. To put it simply, the keys and values are turned into `Query` objects and then into the raw bytes that will be sent over the network. (This just makes it simpler to design the network pool interface)
4. For every type of benchmark (GET,SET,...) we use the network pool to send all the bytes and wait until we receive the expected response. We time how long it takes to send and receive the response for all the queries for the given test (aggregate)
5. We repeat this for all the remaining tests
6. We repeat the entire set of tests 5 times (by default, this can be changed).
7. We do the calculations and output the results.
## License
All files in this directory are distributed under the [AGPL-3.0 License](../LICENSE).
1. We start up some threads with each having a thread local connection to the database
2. Each thread attempts to keep running queries until the target number of queries is reached.
- This sort of simulates a real-world scenario where these threads are like your application servers sending requests to the database
- Also there is no ideal distribution and the number of queries each worker runs is unspecified (but owing to low latencies from the database, that should be even)
- We do this to ensure that the distribution of queries executed by each "server" is skewed as it would be in the real world.
3. Once the target number of queries are reached, the workers notify that the task is complete. Each worker keeps track of how long it spent processing queries and this is also notified to the benchmark engine
4. The benchmark engine then computes relevant statistics

@ -26,10 +26,8 @@
use {
crate::error::{BenchError, BenchResult},
std::{
collections::hash_map::{Entry, HashMap},
env,
},
libsky::CliAction,
std::collections::hash_map::HashMap,
};
const TXT_HELP: &str = include_str!("../help_text/help");
@ -77,47 +75,12 @@ impl BenchConfig {
}
fn load_env() -> BenchResult<TaskInner> {
let mut args = HashMap::new();
let mut it = env::args().skip(1).into_iter();
while let Some(arg) = it.next() {
let (arg, arg_val) = match arg.as_str() {
"--help" => return Ok(TaskInner::HelpMsg(TXT_HELP.into())),
"--version" => {
return Ok(TaskInner::HelpMsg(format!(
"sky-bench v{}",
libsky::VERSION
)))
}
_ if arg.starts_with("--") => match it.next() {
Some(arg_val) => (arg, arg_val),
None => {
// self contained?
let split: Vec<&str> = arg.split("=").collect();
if split.len() != 2 {
return Err(BenchError::ArgsErr(format!("expected value for {arg}")));
}
(split[0].into(), split[1].into())
}
},
unknown_arg => {
return Err(BenchError::ArgsErr(format!(
"unknown argument: {unknown_arg}"
)))
}
};
match args.entry(arg) {
Entry::Occupied(oe) => {
return Err(BenchError::ArgsErr(format!(
"found duplicate values for {}",
oe.key()
)))
}
Entry::Vacant(ve) => {
ve.insert(arg_val);
}
}
let action = libsky::parse_cli_args_disallow_duplicate()?;
match action {
CliAction::Help => Ok(TaskInner::HelpMsg(TXT_HELP.into())),
CliAction::Version => Ok(TaskInner::HelpMsg(libsky::version_msg("sky-bench"))),
CliAction::Action(a) => Ok(TaskInner::CheckConfig(a)),
}
Ok(TaskInner::CheckConfig(args))
}
fn cdig(n: usize) -> usize {
@ -176,7 +139,7 @@ pub fn parse() -> BenchResult<Task> {
};
// threads
let thread_count = match args.remove("--threads") {
None => num_cpus::get(),
None => num_cpus::get_physical(),
Some(tc) => match tc.parse() {
Ok(tc) if tc > 0 => tc,
Err(_) | Ok(_) => {

@ -24,136 +24,157 @@
*
*/
use crate::error::BenchResult;
use {
crate::{
args::BenchConfig,
error::{self, BenchmarkTaskWorkerError},
pool::{RuntimeStats, Taskpool, ThreadedTask},
error::{self, BenchResult},
runtime::{BombardPool, RuntimeStats, ThreadedBombardTask},
},
skytable::{query, response::Response, Config, Connection, Query},
std::time::Instant,
skytable::{error::Error, query, response::Response, Config, Connection, Query},
std::{fmt, time::Instant},
};
/*
task impl
*/
/// A bombard task used for benchmarking
#[derive(Debug)]
pub struct BenchmarkTask {
cfg: Config,
pub struct BombardTask {
config: Config,
}
impl BenchmarkTask {
pub fn new(host: &str, port: u16, username: &str, password: &str) -> Self {
impl BombardTask {
pub fn new(config: Config) -> Self {
Self { config }
}
}
#[derive(Debug, Clone)]
pub enum BombardTaskKind {
Insert(u8),
Update,
Delete,
}
#[derive(Debug, Clone)]
pub struct BombardTaskSpec {
kind: BombardTaskKind,
base_query: String,
pk_len: usize,
}
impl BombardTaskSpec {
pub fn insert(base_query: String, pk_len: usize, second_column: u8) -> Self {
Self {
cfg: Config::new(host, port, username, password),
kind: BombardTaskKind::Insert(second_column),
base_query,
pk_len,
}
}
pub fn update(base_query: String, pk_len: usize) -> Self {
Self {
kind: BombardTaskKind::Update,
base_query,
pk_len,
}
}
pub fn delete(base_query: String, pk_len: usize) -> Self {
Self {
kind: BombardTaskKind::Delete,
base_query,
pk_len,
}
}
fn generate(&self, current: u64) -> (Query, Response) {
let mut q = query!(&self.base_query);
let resp = match self.kind {
BombardTaskKind::Insert(second_column) => {
q.push_param(format!("{:0>width$}", current, width = self.pk_len));
q.push_param(second_column);
Response::Empty
}
BombardTaskKind::Update => {
q.push_param(1u64);
q.push_param(format!("{:0>width$}", current, width = self.pk_len));
Response::Empty
}
BombardTaskKind::Delete => {
q.push_param(format!("{:0>width$}", current, width = self.pk_len));
Response::Empty
}
};
(q, resp)
}
}
/// Errors while running a bombard
#[derive(Debug)]
pub enum BombardTaskError {
DbError(Error),
Mismatch,
}
impl fmt::Display for BombardTaskError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DbError(e) => write!(f, "a bombard subtask failed with {e}"),
Self::Mismatch => write!(f, "got unexpected response for bombard subtask"),
}
}
}
impl ThreadedTask for BenchmarkTask {
type TaskWorker = Connection;
type TaskWorkerInitError = BenchmarkTaskWorkerError;
type TaskWorkerTerminateError = BenchmarkTaskWorkerError;
type TaskWorkerWorkError = BenchmarkTaskWorkerError;
type TaskInput = (Query, Response);
fn initialize_worker(&self) -> Result<Self::TaskWorker, Self::TaskWorkerInitError> {
self.cfg.connect().map_err(Into::into)
impl From<Error> for BombardTaskError {
fn from(dbe: Error) -> Self {
Self::DbError(dbe)
}
}
impl ThreadedBombardTask for BombardTask {
type Worker = Connection;
type WorkerTask = (Query, Response);
type WorkerTaskSpec = BombardTaskSpec;
type WorkerInitError = Error;
type WorkerTaskError = BombardTaskError;
fn worker_init(&self) -> Result<Self::Worker, Self::WorkerInitError> {
self.config.connect()
}
fn drive_worker_timed(
worker: &mut Self::TaskWorker,
(query, expected_resp): Self::TaskInput,
) -> Result<(Instant, Instant), Self::TaskWorkerWorkError> {
fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask {
spec.generate(current)
}
fn worker_drive_timed(
worker: &mut Self::Worker,
(query, response): Self::WorkerTask,
) -> Result<u128, Self::WorkerTaskError> {
let start = Instant::now();
let resp = worker.query(&query)?;
let ret = worker.query(&query)?;
let stop = Instant::now();
if resp == expected_resp {
Ok((start, stop))
if ret == response {
Ok(stop.duration_since(start).as_nanos())
} else {
Err(BenchmarkTaskWorkerError::Error(format!(
"response from server did not match expected response: {:?}",
resp
)))
Err(BombardTaskError::Mismatch)
}
}
fn terminate_worker(
&self,
_: &mut Self::TaskWorker,
) -> Result<(), Self::TaskWorkerTerminateError> {
Ok(())
}
}
/*
runner
*/
pub fn run(bench: BenchConfig) -> error::BenchResult<()> {
let bench_config = BenchmarkTask::new(&bench.host, bench.port, "root", &bench.root_pass);
let bench_config = BombardTask::new(Config::new(
&bench.host,
bench.port,
"root",
&bench.root_pass,
));
info!("running preliminary checks and creating model `bench.bench` with definition: `{{un: string, pw: uint8}}`");
let mut main_thread_db = bench_config.cfg.connect()?;
let mut main_thread_db = bench_config.config.connect()?;
main_thread_db.query_parse::<()>(&query!("create space bench"))?;
main_thread_db.query_parse::<()>(&query!("create model bench.bench(un: string, pw: uint8)"))?;
info!(
"initializing connection pool with {} connections",
bench.threads
);
let mut p = Taskpool::new(bench.threads, bench_config)?;
info!(
"pool initialized successfully. preparing {} `INSERT` queries with primary key size={} bytes",
bench.query_count, bench.key_size
);
let mut insert_stats = Default::default();
let mut update_stats = Default::default();
let mut delete_stats = Default::default();
match || -> BenchResult<()> {
// bench insert
let insert_queries: Vec<(Query, Response)> = (0..bench.query_count)
.into_iter()
.map(|i| {
(
query!(
"insert into bench.bench(?, ?)",
format!("{:0>width$}", i, width = bench.key_size),
0u64
),
Response::Empty,
)
})
.collect();
info!("benchmarking `INSERT` queries");
insert_stats = p.blocking_bombard(insert_queries)?;
// bench update
info!("completed benchmarking `INSERT`. preparing `UPDATE` queries");
let update_queries: Vec<(Query, Response)> = (0..bench.query_count)
.into_iter()
.map(|i| {
(
query!(
"update bench.bench set pw += ? where un = ?",
1u64,
format!("{:0>width$}", i, width = bench.key_size),
),
Response::Empty,
)
})
.collect();
info!("benchmarking `UPDATE` queries");
update_stats = p.blocking_bombard(update_queries)?;
// bench delete
info!("completed benchmarking `UPDATE`. preparing `DELETE` queries");
let delete_queries: Vec<(Query, Response)> = (0..bench.query_count)
.into_iter()
.map(|i| {
(
query!(
"delete from bench.bench where un = ?",
format!("{:0>width$}", i, width = bench.key_size),
),
Response::Empty,
)
})
.collect();
info!("benchmarking `DELETE` queries");
delete_stats = p.blocking_bombard(delete_queries)?;
info!("completed benchmarking `DELETE` queries");
Ok(())
}() {
Ok(()) => {}
let stats = match bench_internal(bench_config, bench) {
Ok(stats) => stats,
Err(e) => {
error!("benchmarking failed. attempting to clean up");
match cleanup(main_thread_db) {
@ -164,20 +185,18 @@ pub fn run(bench: BenchConfig) -> error::BenchResult<()> {
}
}
}
}
drop(p);
};
warn!("benchmarks might appear to be slower. this tool is currently experimental");
// print results
info!("results:");
print_table(vec![
("INSERT", insert_stats),
("UPDATE", update_stats),
("DELETE", delete_stats),
]);
print_table(stats);
cleanup(main_thread_db)?;
Ok(())
}
/*
util
*/
fn cleanup(mut main_thread_db: Connection) -> Result<(), error::BenchError> {
trace!("dropping space and table");
main_thread_db.query_parse::<()>(&query!("drop model bench.bench"))?;
@ -195,22 +214,51 @@ fn print_table(data: Vec<(&'static str, RuntimeStats)>) {
println!(
"+---------+--------------------------+-----------------------+------------------------+"
);
for (
query,
RuntimeStats {
qps,
avg_per_query_ns: _,
head_ns,
tail_ns,
},
) in data
{
for (query, RuntimeStats { qps, head, tail }) in data {
println!(
"| {:<7} | {:>24.2} | {:>21} | {:>22} |",
query, qps, tail_ns, head_ns
query, qps, tail, head
);
}
println!(
"+---------+--------------------------+-----------------------+------------------------+"
);
}
/*
bench runner
*/
fn bench_internal(
config: BombardTask,
bench: BenchConfig,
) -> BenchResult<Vec<(&'static str, RuntimeStats)>> {
let mut ret = vec![];
// initialize pool
info!("initializing connection pool");
let mut pool = BombardPool::new(bench.threads, config)?;
// bench INSERT
info!("benchmarking `INSERT`");
let insert = BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0);
let insert_stats = pool.blocking_bombard(insert, bench.query_count)?;
ret.push(("INSERT", insert_stats));
// bench UPDATE
info!("benchmarking `UPDATE`");
let update = BombardTaskSpec::update(
"update bench.bench set pw += ? where un = ?".into(),
bench.key_size,
);
let update_stats = pool.blocking_bombard(update, bench.query_count)?;
ret.push(("UPDATE", update_stats));
// bench DELETE
info!("benchmarking `DELETE`");
let delete = BombardTaskSpec::delete(
"delete from bench.bench where un = ?".into(),
bench.key_size,
);
let delete_stats = pool.blocking_bombard(delete, bench.query_count)?;
ret.push(("DELETE", delete_stats));
info!("completed benchmarks. closing pool");
drop(pool);
Ok(ret)
}

@ -25,7 +25,7 @@
*/
use {
crate::{bench::BenchmarkTask, pool::TaskpoolError},
crate::{bench::BombardTask, runtime::BombardError},
core::fmt,
skytable::error::Error,
};
@ -35,13 +35,20 @@ pub type BenchResult<T> = Result<T, BenchError>;
#[derive(Debug)]
pub enum BenchError {
ArgsErr(String),
BenchError(TaskpoolError<BenchmarkTask>),
BenchBombardError(BombardError<BombardTask>),
DirectDbError(Error),
}
impl From<TaskpoolError<BenchmarkTask>> for BenchError {
fn from(e: TaskpoolError<BenchmarkTask>) -> Self {
Self::BenchError(e)
impl From<libsky::ArgParseError> for BenchError {
fn from(e: libsky::ArgParseError) -> Self {
match e {
libsky::ArgParseError::Duplicate(d) => {
Self::ArgsErr(format!("duplicate value for `{d}`"))
}
libsky::ArgParseError::MissingValue(m) => {
Self::ArgsErr(format!("missing value for `{m}`"))
}
}
}
}
@ -51,12 +58,18 @@ impl From<Error> for BenchError {
}
}
impl From<BombardError<BombardTask>> for BenchError {
fn from(e: BombardError<BombardTask>) -> Self {
Self::BenchBombardError(e)
}
}
impl fmt::Display for BenchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::ArgsErr(e) => write!(f, "args error: {e}"),
Self::BenchError(e) => write!(f, "benchmark system error: {e}"),
Self::DirectDbError(e) => write!(f, "direct operation on db failed. {e}"),
Self::BenchBombardError(e) => write!(f, "benchmark failed: {e}"),
}
}
}
@ -66,7 +79,6 @@ impl std::error::Error for BenchError {}
#[derive(Debug)]
pub enum BenchmarkTaskWorkerError {
DbError(Error),
Error(String),
}
impl From<Error> for BenchmarkTaskWorkerError {
@ -79,7 +91,6 @@ impl fmt::Display for BenchmarkTaskWorkerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DbError(e) => write!(f, "worker failed due to DB error. {e}"),
Self::Error(e) => write!(f, "worker failed. {e}"),
}
}
}

@ -29,7 +29,7 @@ extern crate log;
mod args;
mod bench;
mod error;
mod pool;
mod runtime;
fn main() {
env_logger::Builder::new()

@ -1,279 +0,0 @@
/*
* Created on Fri Nov 17 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crossbeam_channel::{unbounded, Receiver, Sender},
std::{
fmt,
marker::PhantomData,
thread::{self, JoinHandle},
time::Instant,
},
};
pub type TaskPoolResult<T, Th> = Result<T, TaskpoolError<Th>>;
#[derive(Debug)]
pub enum TaskpoolError<Th: ThreadedTask> {
InitError(Th::TaskWorkerInitError),
BombardError(&'static str),
WorkerError(Th::TaskWorkerWorkError),
}
impl<Th: ThreadedTask> fmt::Display for TaskpoolError<Th>
where
Th::TaskWorkerInitError: fmt::Display,
Th::TaskWorkerWorkError: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InitError(e) => write!(f, "failed to init worker pool. {e}"),
Self::BombardError(e) => write!(f, "failed to post work to pool. {e}"),
Self::WorkerError(e) => write!(f, "failed running worker task. {e}"),
}
}
}
pub trait ThreadedTask: Send + Sync + 'static {
/// the per-thread item that does the actual work
///
/// NB: this is not to be confused with the actual underlying thread pool worker
type TaskWorker: Send + Sync;
/// when attempting initialization of the per-thread task worker, if an error is thrown, this is the type
/// you're looking for
type TaskWorkerInitError: Send + Sync;
/// when attempting to run a single unit of work, if any error occurs this is the error type that is to be returned
type TaskWorkerWorkError: Send + Sync;
/// when attempting to close a worker, if an error occurs this is the error type that is returned
type TaskWorkerTerminateError: Send + Sync;
/// the task that is sent to each worker
type TaskInput: Send + Sync;
// fn
/// initialize the worker
fn initialize_worker(&self) -> Result<Self::TaskWorker, Self::TaskWorkerInitError>;
/// drive the worker to complete a task and return the time
fn drive_worker_timed(
worker: &mut Self::TaskWorker,
task: Self::TaskInput,
) -> Result<(Instant, Instant), Self::TaskWorkerWorkError>;
fn terminate_worker(
&self,
worker: &mut Self::TaskWorker,
) -> Result<(), Self::TaskWorkerTerminateError>;
}
#[derive(Debug)]
struct ThreadWorker<Th> {
handle: JoinHandle<()>,
_m: PhantomData<Th>,
}
#[derive(Debug)]
enum WorkerTask<Th: ThreadedTask> {
Task(Th::TaskInput),
Exit,
}
impl<Th: ThreadedTask> ThreadWorker<Th> {
fn new(
hl_worker: Th::TaskWorker,
task_rx: Receiver<WorkerTask<Th>>,
res_tx: Sender<Result<(Instant, Instant), Th::TaskWorkerWorkError>>,
) -> Self {
Self {
handle: thread::spawn(move || {
let mut worker = hl_worker;
loop {
let task = match task_rx.recv().unwrap() {
WorkerTask::Exit => {
drop(task_rx);
return;
}
WorkerTask::Task(t) => t,
};
res_tx
.send(Th::drive_worker_timed(&mut worker, task))
.unwrap();
}
}),
_m: PhantomData,
}
}
}
#[derive(Debug)]
pub struct Taskpool<Th: ThreadedTask> {
workers: Vec<ThreadWorker<Th>>,
_config: Th,
task_tx: Sender<WorkerTask<Th>>,
res_rx: Receiver<Result<(Instant, Instant), Th::TaskWorkerWorkError>>,
record_real_start: Instant,
record_real_stop: Instant,
stat_run_avg_ns: f64,
stat_run_tail_ns: u128,
stat_run_head_ns: u128,
}
// TODO(@ohsayan): prepare histogram for report; for now there's no use of the head and tail latencies
#[derive(Default, Debug)]
pub struct RuntimeStats {
pub qps: f64,
pub avg_per_query_ns: f64,
pub head_ns: u128,
pub tail_ns: u128,
}
impl<Th: ThreadedTask> Taskpool<Th> {
pub fn stat_avg(&self) -> f64 {
self.stat_run_avg_ns
}
pub fn stat_tail(&self) -> u128 {
self.stat_run_tail_ns
}
pub fn stat_head(&self) -> u128 {
self.stat_run_head_ns
}
pub fn stat_elapsed(&self) -> u128 {
self.record_real_stop
.duration_since(self.record_real_start)
.as_nanos()
}
}
fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 {
const NANOS_PER_SECOND: u128 = 1_000_000_000;
let time_taken_in_nanos_f64 = time_taken_in_nanos as f64;
let query_count_f64 = query_count as f64;
(query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64
}
impl<Th: ThreadedTask> Taskpool<Th> {
pub fn new(size: usize, config: Th) -> TaskPoolResult<Self, Th> {
let (task_tx, task_rx) = unbounded();
let (res_tx, res_rx) = unbounded();
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let con = config
.initialize_worker()
.map_err(TaskpoolError::InitError)?;
workers.push(ThreadWorker::new(con, task_rx.clone(), res_tx.clone()));
}
Ok(Self {
workers,
_config: config,
task_tx,
res_rx,
stat_run_avg_ns: 0.0,
record_real_start: Instant::now(),
record_real_stop: Instant::now(),
stat_run_head_ns: u128::MAX,
stat_run_tail_ns: u128::MIN,
})
}
pub fn blocking_bombard(
&mut self,
vec: Vec<Th::TaskInput>,
) -> TaskPoolResult<RuntimeStats, Th> {
let expected = vec.len();
let mut received = 0usize;
for task in vec {
match self.task_tx.send(WorkerTask::Task(task)) {
Ok(()) => {}
Err(_) => {
// stop bombarding, we hit an error
return Err(TaskpoolError::BombardError(
"all worker threads exited. this indicates a catastrophic failure",
));
}
}
}
while received != expected {
match self.res_rx.recv() {
Err(_) => {
// all workers exited. that is catastrophic
return Err(TaskpoolError::BombardError(
"detected all worker threads crashed during run check",
));
}
Ok(r) => self.recompute_stats(&mut received, r)?,
};
}
// compute stats
let ret = Ok(RuntimeStats {
qps: qps(received, self.stat_elapsed()),
avg_per_query_ns: self.stat_avg(),
head_ns: self.stat_head(),
tail_ns: self.stat_tail(),
});
// reset stats
self.stat_run_avg_ns = 0.0;
self.record_real_start = Instant::now();
self.record_real_stop = Instant::now();
self.stat_run_head_ns = u128::MAX;
self.stat_run_tail_ns = u128::MIN;
// return
ret
}
fn recompute_stats(
&mut self,
received: &mut usize,
result: Result<(Instant, Instant), <Th as ThreadedTask>::TaskWorkerWorkError>,
) -> Result<(), TaskpoolError<Th>> {
*received += 1;
let (start, stop) = match result {
Ok(time) => time,
Err(e) => return Err(TaskpoolError::WorkerError(e)),
};
// adjust real start
if start < self.record_real_start {
self.record_real_start = start;
}
if stop > self.record_real_stop {
self.record_real_stop = stop;
}
let current_time = stop.duration_since(start).as_nanos();
self.stat_run_avg_ns = self.stat_run_avg_ns
+ ((current_time as f64 - self.stat_run_avg_ns) / *received as f64);
if current_time > self.stat_run_tail_ns {
self.stat_run_tail_ns = current_time;
}
if current_time < self.stat_run_head_ns {
self.stat_run_head_ns = current_time;
}
Ok(())
}
}
impl<Th: ThreadedTask> Drop for Taskpool<Th> {
fn drop(&mut self) {
for _ in 0..self.workers.len() {
self.task_tx.send(WorkerTask::Exit).unwrap();
}
for worker in self.workers.drain(..) {
worker.handle.join().unwrap()
}
}
}

@ -0,0 +1,406 @@
/*
* Created on Sun Nov 19 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
crossbeam_channel::{unbounded, Receiver, Sender},
std::{
fmt::{self, Display},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
thread::{self, JoinHandle},
time::{Duration, Instant},
},
};
pub type BombardResult<T, Bt> = Result<T, BombardError<Bt>>;
/*
state mgmt
*/
#[derive(Debug)]
/// The pool state. Be warned **ONLY ONE POOL AT A TIME!**
struct GPState {
current: AtomicU64,
state: AtomicBool,
occupied: AtomicBool,
}
impl GPState {
#[inline(always)]
fn get() -> &'static Self {
static STATE: GPState = GPState::zero();
&STATE
}
const fn zero() -> Self {
Self {
current: AtomicU64::new(0),
state: AtomicBool::new(true),
occupied: AtomicBool::new(false),
}
}
fn occupy(&self) {
assert!(!self.occupied.swap(true, Ordering::Release));
}
fn vacate(&self) {
assert!(self.occupied.swap(false, Ordering::Release));
}
fn guard<T>(f: impl FnOnce() -> T) -> T {
let slf = Self::get();
slf.occupy();
let ret = f();
slf.vacate();
ret
}
fn post_failure(&self) {
self.state.store(false, Ordering::Release)
}
fn post_target(&self, target: u64) {
self.current.store(target, Ordering::Release)
}
/// WARNING: this is not atomic! only sensible to run a quiescent state
fn post_reset(&self) {
self.current.store(0, Ordering::Release);
self.state.store(true, Ordering::Release);
}
fn update_target(&self) -> u64 {
let mut current = self.current.load(Ordering::Acquire);
loop {
if current == 0 {
return 0;
}
match self.current.compare_exchange(
current,
current - 1,
Ordering::Release,
Ordering::Acquire,
) {
Ok(last) => {
return last;
}
Err(new) => {
current = new;
}
}
}
}
fn load_okay(&self) -> bool {
self.state.load(Ordering::Acquire)
}
}
/*
task spec
*/
/// A threaded bombard task specification which drives a global pool of threads towards a common goal
pub trait ThreadedBombardTask: Send + Sync + 'static {
/// The per-task worker that is initialized once in every thread (not to be confused with the actual thread worker!)
type Worker: Send + Sync;
/// The task that the [`ThreadedBombardTask::TaskWorker`] performs
type WorkerTask: Send + Sync;
type WorkerTaskSpec: Clone + Send + Sync + 'static;
/// Errors while running a task
type WorkerTaskError: Send + Sync;
/// Errors while initializing a task worker
type WorkerInitError: Send + Sync;
/// Initialize a task worker
fn worker_init(&self) -> Result<Self::Worker, Self::WorkerInitError>;
fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask;
/// Drive a single subtask
fn worker_drive_timed(
worker: &mut Self::Worker,
task: Self::WorkerTask,
) -> Result<u128, Self::WorkerTaskError>;
}
/*
worker
*/
#[derive(Debug)]
enum WorkerResult<Bt: ThreadedBombardTask> {
Completed(WorkerLocalStats),
Errored(Bt::WorkerTaskError),
}
#[derive(Debug)]
struct WorkerLocalStats {
start: Instant,
elapsed: u128,
head: u128,
tail: u128,
}
impl WorkerLocalStats {
fn new(start: Instant, elapsed: u128, head: u128, tail: u128) -> Self {
Self {
start,
elapsed,
head,
tail,
}
}
}
#[derive(Debug)]
enum WorkerTask<Bt: ThreadedBombardTask> {
Task(Bt::WorkerTaskSpec),
Exit,
}
#[derive(Debug)]
struct Worker {
handle: JoinHandle<()>,
}
impl Worker {
fn start<Bt: ThreadedBombardTask>(
id: usize,
driver: Bt::Worker,
rx_work: Receiver<WorkerTask<Bt>>,
tx_res: Sender<WorkerResult<Bt>>,
) -> Self {
Self {
handle: thread::Builder::new()
.name(format!("worker-{id}"))
.spawn(move || {
let mut worker_driver = driver;
'blocking_wait: loop {
let task = match rx_work.recv().unwrap() {
WorkerTask::Exit => return,
WorkerTask::Task(spec) => spec,
};
// check global state
let mut global_okay = GPState::get().load_okay();
let mut global_position = GPState::get().update_target();
// init local state
let mut local_start = None;
let mut local_elapsed = 0u128;
let mut local_head = u128::MAX;
let mut local_tail = 0;
// bombard
while (global_position != 0) & global_okay {
let task = Bt::generate_task(&task, global_position);
if local_start.is_none() {
local_start = Some(Instant::now());
}
let this_elapsed =
match Bt::worker_drive_timed(&mut worker_driver, task) {
Ok(elapsed) => elapsed,
Err(e) => {
GPState::get().post_failure();
tx_res.send(WorkerResult::Errored(e)).unwrap();
continue 'blocking_wait;
}
};
local_elapsed += this_elapsed;
if this_elapsed < local_head {
local_head = this_elapsed;
}
if this_elapsed > local_tail {
local_tail = this_elapsed;
}
global_position = GPState::get().update_target();
global_okay = GPState::get().load_okay();
}
if global_okay {
// we're done
tx_res
.send(WorkerResult::Completed(WorkerLocalStats::new(
local_start.unwrap(),
local_elapsed,
local_head,
local_tail,
)))
.unwrap();
}
}
})
.expect("failed to start thread"),
}
}
}
/*
pool
*/
#[derive(Debug)]
pub enum BombardError<Bt: ThreadedBombardTask> {
InitError(Bt::WorkerInitError),
WorkerTaskError(Bt::WorkerTaskError),
AllWorkersOffline,
}
impl<Bt: ThreadedBombardTask> fmt::Display for BombardError<Bt>
where
Bt::WorkerInitError: fmt::Display,
Bt::WorkerTaskError: Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AllWorkersOffline => write!(
f,
"bombard failed because all workers went offline indicating catastrophic failure"
),
Self::WorkerTaskError(e) => write!(f, "worker task failed. {e}"),
Self::InitError(e) => write!(f, "worker init failed. {e}"),
}
}
}
#[derive(Debug)]
pub struct RuntimeStats {
pub qps: f64,
pub head: u128,
pub tail: u128,
}
#[derive(Debug)]
pub struct BombardPool<Bt: ThreadedBombardTask> {
workers: Vec<(Worker, Sender<WorkerTask<Bt>>)>,
rx_res: Receiver<WorkerResult<Bt>>,
_config: Bt,
}
impl<Bt: ThreadedBombardTask> BombardPool<Bt> {
fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 {
const NANOS_PER_SECOND: u128 = 1_000_000_000;
let time_taken_in_nanos_f64 = time_taken_in_nanos as f64;
let query_count_f64 = query_count as f64;
(query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64
}
pub fn new(size: usize, config: Bt) -> BombardResult<Self, Bt> {
assert_ne!(size, 0, "pool can't be empty");
let mut workers = Vec::with_capacity(size);
let (tx_res, rx_res) = unbounded();
for id in 0..size {
let (tx_work, rx_work) = unbounded();
let driver = config.worker_init().map_err(BombardError::InitError)?;
workers.push((Worker::start(id, driver, rx_work, tx_res.clone()), tx_work));
}
Ok(Self {
workers,
rx_res,
_config: config,
})
}
/// Bombard queries to the workers
pub fn blocking_bombard(
&mut self,
task_description: Bt::WorkerTaskSpec,
count: usize,
) -> BombardResult<RuntimeStats, Bt> {
GPState::guard(|| {
GPState::get().post_target(count as _);
let mut global_start = None;
let mut global_stop = None;
let mut global_head = u128::MAX;
let mut global_tail = 0u128;
let messages: Vec<<Bt as ThreadedBombardTask>::WorkerTaskSpec> =
(0..self.workers.len())
.into_iter()
.map(|_| task_description.clone())
.collect();
for ((_, sender), msg) in self.workers.iter().zip(messages) {
sender.send(WorkerTask::Task(msg)).unwrap();
}
// wait for all workers to complete
let mut received = 0;
while received != self.workers.len() {
let results = match self.rx_res.recv() {
Err(_) => return Err(BombardError::AllWorkersOffline),
Ok(r) => r,
};
let WorkerLocalStats {
start: this_start,
elapsed,
head,
tail,
} = match results {
WorkerResult::Completed(r) => r,
WorkerResult::Errored(e) => return Err(BombardError::WorkerTaskError(e)),
};
// update start if required
match global_start.as_mut() {
None => {
global_start = Some(this_start);
}
Some(start) => {
if this_start < *start {
*start = this_start;
}
}
}
let this_task_stopped_at =
this_start + Duration::from_nanos(elapsed.try_into().unwrap());
match global_stop.as_mut() {
None => {
global_stop = Some(this_task_stopped_at);
}
Some(stop) => {
if this_task_stopped_at > *stop {
// this task stopped later than the previous one
*stop = this_task_stopped_at;
}
}
}
if head < global_head {
global_head = head;
}
if tail > global_tail {
global_tail = tail;
}
received += 1;
}
// reset global pool state
GPState::get().post_reset();
// compute results
let global_elapsed = global_stop
.unwrap()
.duration_since(global_start.unwrap())
.as_nanos();
Ok(RuntimeStats {
qps: Self::qps(count, global_elapsed),
head: global_head,
tail: global_tail,
})
})
}
}
impl<Bt: ThreadedBombardTask> Drop for BombardPool<Bt> {
fn drop(&mut self) {
info!("taking all workers offline");
for (_, sender) in self.workers.iter() {
sender.send(WorkerTask::Exit).unwrap();
}
for (worker, _) in self.workers.drain(..) {
worker.handle.join().unwrap();
}
info!("all workers now offline");
}
}
Loading…
Cancel
Save