Add new fury benchmark engine

next
Sayan Nandan 10 months ago
parent a3d87e4850
commit f5866bc22e
No known key found for this signature in database
GPG Key ID: 42EEDF4AE9D96B54

3
Cargo.lock generated

@ -1182,6 +1182,7 @@ dependencies = [
"log",
"num_cpus",
"skytable",
"tokio",
]
[[package]]
@ -1232,7 +1233,7 @@ dependencies = [
[[package]]
name = "skytable"
version = "0.8.0"
source = "git+https://github.com/skytable/client-rust.git?branch=octave#18aad43142fbe013ace64d92c3ffd71ab1394a18"
source = "git+https://github.com/skytable/client-rust.git?branch=octave#691e15578c1a09a5d2b6b85c8752e6eee31fbda2"
dependencies = [
"async-trait",
"bb8",

@ -15,3 +15,4 @@ crossbeam-channel = "0.5.8"
num_cpus = "1.16.0"
env_logger = "0.10.1"
log = "0.4.20"
tokio = { version = "1.34.0", features = ["full"] }

@ -13,13 +13,21 @@ REQUIRED OPTIONS:
--password Provide the password
OPTIONS:
--endpoint Set the endpoint (defaults to tcp@127.0.0.1:2003)
--threads Set the number of threads to be used (defaults to number of physical CPU cores)
--keysize Set the default primary key size. defaults to 7
--rowcount Set the number of rows to be manipulated for the benchmark
--endpoint Set the endpoint (defaults to tcp@127.0.0.1:2003)
--threads Set the number of threads to be used (defaults to logical
CPU count)
--connections Set the number of connections. Defaults to twice the logical CPU
count but is only supported by the `fury` engine
--keysize Set the default primary key size. defaults to 7
--rowcount Set the number of rows to be manipulated for the benchmark
Defaults to 1,000,000 rows.
--engine Set the engine for benchmarking. `rookie` is the stable engine
and `fury` is the experimental engine
NOTES:
- The user for auth will be 'root' since only 'root' accounts allow the creation and deletion of spaces and models
- The user for auth will be 'root' since only 'root' accounts allow the
creation and deletion of spaces and models
- A space called 'benchmark_[random 8B string]' will be created
- A model called 'benchmark_[random 8B string]' will be created in the space created above. The created model has the structure {name: string, pass: string}
- The model and space will be removed once the benchmark is complete
- A model called 'benchmark_[random 8B string]' will be created in the space
created above. The created model has the structure {name: string, pass: string}
- The model and space will be removed once the benchmark is complete

@ -44,6 +44,12 @@ pub enum Task {
BenchConfig(BenchConfig),
}
#[derive(Debug, PartialEq)]
pub enum BenchEngine {
Rookie,
Fury,
}
#[derive(Debug)]
pub struct BenchConfig {
pub host: String,
@ -52,6 +58,8 @@ pub struct BenchConfig {
pub threads: usize,
pub key_size: usize,
pub query_count: usize,
pub engine: BenchEngine,
pub connections: usize,
}
impl BenchConfig {
@ -62,6 +70,8 @@ impl BenchConfig {
threads: usize,
key_size: usize,
query_count: usize,
engine: BenchEngine,
connections: usize,
) -> Self {
Self {
host,
@ -70,6 +80,8 @@ impl BenchConfig {
threads,
key_size,
query_count,
engine,
connections,
}
}
}
@ -139,7 +151,7 @@ pub fn parse() -> BenchResult<Task> {
};
// threads
let thread_count = match args.remove("--threads") {
None => num_cpus::get_physical(),
None => num_cpus::get(),
Some(tc) => match tc.parse() {
Ok(tc) if tc > 0 => tc,
Err(_) | Ok(_) => {
@ -169,12 +181,51 @@ pub fn parse() -> BenchResult<Task> {
Err(_) | Ok(_) => return Err(BenchError::ArgsErr(format!("incorrect value for `--keysize`. must be set to a value that can be used to generate atleast {query_count} unique primary keys"))),
}
};
Ok(Task::BenchConfig(BenchConfig::new(
host,
port,
passsword,
thread_count,
key_size,
query_count,
)))
let engine = match args.remove("--engine") {
None => {
warn!("engine unspecified. choosing 'fury'");
BenchEngine::Fury
}
Some(engine) => match engine.as_str() {
"rookie" => BenchEngine::Rookie,
"fury" => BenchEngine::Fury,
_ => {
return Err(BenchError::ArgsErr(format!(
"bad value for `--engine`. got `{engine}` but expected warp or rookie"
)))
}
},
};
let connections = match args.remove("--connections") {
None => num_cpus::get() * 2,
Some(c) => match c.parse::<usize>() {
Ok(s) if s != 0 => {
if engine == BenchEngine::Rookie {
return Err(BenchError::ArgsErr(format!(
"the 'rookie' engine does not support explicit connection count. the number of threads is the connection count"
)));
}
s
}
_ => {
return Err(BenchError::ArgsErr(format!(
"bad value for `--connections`. must be a nonzero value"
)))
}
},
};
if args.is_empty() {
Ok(Task::BenchConfig(BenchConfig::new(
host,
port,
passsword,
thread_count,
key_size,
query_count,
engine,
connections,
)))
} else {
Err(BenchError::ArgsErr(format!("unrecognized arguments")))
}
}

@ -24,11 +24,13 @@
*
*/
use crate::args::BenchEngine;
use {
crate::{
args::BenchConfig,
error::{self, BenchResult},
runtime::{BombardPool, RuntimeStats, ThreadedBombardTask},
runtime::{fury, rookie, RuntimeStats},
},
skytable::{error::Error, query, response::Response, Config, Connection, Query},
std::{fmt, time::Instant},
@ -87,7 +89,7 @@ impl BombardTaskSpec {
pk_len,
}
}
fn generate(&self, current: u64) -> (Query, Response) {
pub fn generate(&self, current: u64) -> (Query, Response) {
let mut q = query!(&self.base_query);
let resp = match self.kind {
BombardTaskKind::Insert(second_column) => {
@ -137,7 +139,7 @@ impl From<Error> for BombardTaskError {
}
}
impl ThreadedBombardTask for BombardTask {
impl rookie::ThreadedBombardTask for BombardTask {
type Worker = Connection;
type WorkerTask = (Query, Response);
type WorkerTaskSpec = BombardTaskSpec;
@ -179,7 +181,11 @@ pub fn run(bench: BenchConfig) -> error::BenchResult<()> {
let mut main_thread_db = bench_config.config.connect()?;
main_thread_db.query_parse::<()>(&query!("create space bench"))?;
main_thread_db.query_parse::<()>(&query!("create model bench.bench(un: string, pw: uint8)"))?;
let stats = match bench_internal(bench_config, bench) {
let stats = match bench.engine {
BenchEngine::Rookie => bench_rookie(bench_config, bench),
BenchEngine::Fury => bench_fury(bench),
};
let stats = match stats {
Ok(stats) => stats,
Err(e) => {
error!("benchmarking failed. attempting to clean up");
@ -253,24 +259,19 @@ impl BenchItem {
self.count
)
}
fn run(self, pool: &mut BombardPool<BombardTask>) -> BenchResult<RuntimeStats> {
fn run(self, pool: &mut rookie::BombardPool<BombardTask>) -> BenchResult<RuntimeStats> {
pool.blocking_bombard(self.spec, self.count)
.map_err(From::from)
}
async fn run_async(self, pool: &mut fury::Fury) -> BenchResult<RuntimeStats> {
pool.bombard(self.count, self.spec)
.await
.map_err(From::from)
}
}
fn bench_internal(
config: BombardTask,
bench: BenchConfig,
) -> BenchResult<Vec<(&'static str, RuntimeStats)>> {
// initialize pool
info!(
"initializing connections. threads={}, primary key size ={} bytes",
bench.threads, bench.key_size
);
let mut pool = BombardPool::new(bench.threads, config)?;
// prepare benches
let benches = vec![
fn prepare_bench_spec(bench: &BenchConfig) -> Vec<BenchItem> {
vec![
BenchItem::new(
"INSERT",
BombardTaskSpec::insert("insert into bench.bench(?, ?)".into(), bench.key_size, 0),
@ -292,7 +293,34 @@ fn bench_internal(
),
bench.query_count,
),
];
]
}
fn fmt_u64(n: u64) -> String {
let num_str = n.to_string();
let mut result = String::new();
let chars_rev: Vec<_> = num_str.chars().rev().collect();
for (i, ch) in chars_rev.iter().enumerate() {
if i % 3 == 0 && i != 0 {
result.push(',');
}
result.push(*ch);
}
result.chars().rev().collect()
}
fn bench_rookie(
task: BombardTask,
bench: BenchConfig,
) -> BenchResult<Vec<(&'static str, RuntimeStats)>> {
// initialize pool
info!(
"initializing connections. engine=rookie, threads={}, primary key size ={} bytes",
bench.threads, bench.key_size
);
let mut pool = rookie::BombardPool::new(bench.threads, task)?;
// prepare benches
let benches = prepare_bench_spec(&bench);
// bench
let total_queries = bench.query_count as u64 * benches.len() as u64;
let mut results = vec![];
@ -309,15 +337,37 @@ fn bench_internal(
Ok(results)
}
fn fmt_u64(n: u64) -> String {
let num_str = n.to_string();
let mut result = String::new();
let chars_rev: Vec<_> = num_str.chars().rev().collect();
for (i, ch) in chars_rev.iter().enumerate() {
if i % 3 == 0 && i != 0 {
result.push(',');
fn bench_fury(bench: BenchConfig) -> BenchResult<Vec<(&'static str, RuntimeStats)>> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(bench.threads)
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
info!(
"initializing connections. engine=fury, threads={}, connections={}, primary key size ={} bytes",
bench.threads, bench.connections, bench.key_size
);
let mut pool = fury::Fury::new(
bench.connections,
Config::new(&bench.host, bench.port, "root", &bench.root_pass),
)
.await?;
// prepare benches
let benches = prepare_bench_spec(&bench);
// bench
let total_queries = bench.query_count as u64 * benches.len() as u64;
let mut results = vec![];
for task in benches {
let name = task.name;
task.print_log_start();
let this_result = task.run_async(&mut pool).await?;
results.push((name, this_result));
}
result.push(*ch);
}
result.chars().rev().collect()
info!(
"benchmark complete. finished executing {} queries",
fmt_u64(total_queries)
);
Ok(results)
})
}

@ -25,7 +25,10 @@
*/
use {
crate::{bench::BombardTask, runtime::BombardError},
crate::{
bench::BombardTask,
runtime::{fury, rookie::BombardError},
},
core::fmt,
skytable::error::Error,
};
@ -35,10 +38,17 @@ pub type BenchResult<T> = Result<T, BenchError>;
#[derive(Debug)]
pub enum BenchError {
ArgsErr(String),
BenchBombardError(BombardError<BombardTask>),
RookieEngineError(BombardError<BombardTask>),
FuryEngineError(fury::FuryError),
DirectDbError(Error),
}
impl From<fury::FuryError> for BenchError {
fn from(e: fury::FuryError) -> Self {
Self::FuryEngineError(e)
}
}
impl From<libsky::ArgParseError> for BenchError {
fn from(e: libsky::ArgParseError) -> Self {
match e {
@ -60,7 +70,7 @@ impl From<Error> for BenchError {
impl From<BombardError<BombardTask>> for BenchError {
fn from(e: BombardError<BombardTask>) -> Self {
Self::BenchBombardError(e)
Self::RookieEngineError(e)
}
}
@ -69,7 +79,8 @@ impl fmt::Display for BenchError {
match self {
Self::ArgsErr(e) => write!(f, "args error: {e}"),
Self::DirectDbError(e) => write!(f, "direct operation on db failed. {e}"),
Self::BenchBombardError(e) => write!(f, "benchmark failed: {e}"),
Self::RookieEngineError(e) => write!(f, "benchmark failed (rookie engine): {e}"),
Self::FuryEngineError(e) => write!(f, "benchmark failed (fury engine): {e}"),
}
}
}

@ -24,126 +24,29 @@
*
*/
use {
crossbeam_channel::{unbounded, Receiver, Sender},
std::{
fmt::{self, Display},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
thread::{self, JoinHandle},
time::{Duration, Instant},
},
};
pub mod fury;
pub mod rookie;
pub type BombardResult<T, Bt> = Result<T, BombardError<Bt>>;
use std::time::Instant;
/*
state mgmt
*/
#[derive(Debug)]
/// The pool state. Be warned **ONLY ONE POOL AT A TIME!**
struct GPState {
current: AtomicU64,
state: AtomicBool,
occupied: AtomicBool,
}
impl GPState {
#[inline(always)]
fn get() -> &'static Self {
static STATE: GPState = GPState::zero();
&STATE
}
const fn zero() -> Self {
Self {
current: AtomicU64::new(0),
state: AtomicBool::new(true),
occupied: AtomicBool::new(false),
}
}
fn occupy(&self) {
assert!(!self.occupied.swap(true, Ordering::Release));
}
fn vacate(&self) {
assert!(self.occupied.swap(false, Ordering::Release));
}
fn guard<T>(f: impl FnOnce() -> T) -> T {
let slf = Self::get();
slf.occupy();
let ret = f();
slf.vacate();
ret
}
fn post_failure(&self) {
self.state.store(false, Ordering::Release)
}
fn post_target(&self, target: u64) {
self.current.store(target, Ordering::Release)
}
/// WARNING: this is not atomic! only sensible to run a quiescent state
fn post_reset(&self) {
self.current.store(0, Ordering::Release);
self.state.store(true, Ordering::Release);
}
fn update_target(&self) -> u64 {
let mut current = self.current.load(Ordering::Acquire);
loop {
if current == 0 {
return 0;
}
match self.current.compare_exchange(
current,
current - 1,
Ordering::Release,
Ordering::Acquire,
) {
Ok(last) => {
return last;
}
Err(new) => {
current = new;
}
}
}
}
fn load_okay(&self) -> bool {
self.state.load(Ordering::Acquire)
}
fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 {
const NANOS_PER_SECOND: u128 = 1_000_000_000;
let time_taken_in_nanos_f64 = time_taken_in_nanos as f64;
let query_count_f64 = query_count as f64;
(query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64
}
/*
task spec
*/
/// A threaded bombard task specification which drives a global pool of threads towards a common goal
pub trait ThreadedBombardTask: Send + Sync + 'static {
/// The per-task worker that is initialized once in every thread (not to be confused with the actual thread worker!)
type Worker: Send + Sync;
/// The task that the [`ThreadedBombardTask::TaskWorker`] performs
type WorkerTask: Send + Sync;
type WorkerTaskSpec: Clone + Send + Sync + 'static;
/// Errors while running a task
type WorkerTaskError: Send + Sync;
/// Errors while initializing a task worker
type WorkerInitError: Send + Sync;
/// Initialize a task worker
fn worker_init(&self) -> Result<Self::Worker, Self::WorkerInitError>;
fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask;
/// Drive a single subtask
fn worker_drive_timed(
worker: &mut Self::Worker,
task: Self::WorkerTask,
) -> Result<u128, Self::WorkerTaskError>;
#[derive(Debug, Clone)]
pub(self) enum WorkerTask<T> {
Task(T),
Exit,
}
/*
worker
*/
#[derive(Debug)]
enum WorkerResult<Bt: ThreadedBombardTask> {
Completed(WorkerLocalStats),
Errored(Bt::WorkerTaskError),
pub struct RuntimeStats {
pub qps: f64,
pub head: u128,
pub tail: u128,
}
#[derive(Debug)]
@ -164,240 +67,3 @@ impl WorkerLocalStats {
}
}
}
#[derive(Debug)]
enum WorkerTask<Bt: ThreadedBombardTask> {
Task(Bt::WorkerTaskSpec),
Exit,
}
#[derive(Debug)]
struct Worker {
handle: JoinHandle<()>,
}
impl Worker {
fn start<Bt: ThreadedBombardTask>(
id: usize,
driver: Bt::Worker,
rx_work: Receiver<WorkerTask<Bt>>,
tx_res: Sender<WorkerResult<Bt>>,
) -> Self {
Self {
handle: thread::Builder::new()
.name(format!("worker-{id}"))
.spawn(move || {
let mut worker_driver = driver;
'blocking_wait: loop {
let task = match rx_work.recv().unwrap() {
WorkerTask::Exit => return,
WorkerTask::Task(spec) => spec,
};
// check global state
let mut global_okay = GPState::get().load_okay();
let mut global_position = GPState::get().update_target();
// init local state
let mut local_start = None;
let mut local_elapsed = 0u128;
let mut local_head = u128::MAX;
let mut local_tail = 0;
// bombard
while (global_position != 0) & global_okay {
let task = Bt::generate_task(&task, global_position);
if local_start.is_none() {
local_start = Some(Instant::now());
}
let this_elapsed =
match Bt::worker_drive_timed(&mut worker_driver, task) {
Ok(elapsed) => elapsed,
Err(e) => {
GPState::get().post_failure();
tx_res.send(WorkerResult::Errored(e)).unwrap();
continue 'blocking_wait;
}
};
local_elapsed += this_elapsed;
if this_elapsed < local_head {
local_head = this_elapsed;
}
if this_elapsed > local_tail {
local_tail = this_elapsed;
}
global_position = GPState::get().update_target();
global_okay = GPState::get().load_okay();
}
if global_okay {
// we're done
tx_res
.send(WorkerResult::Completed(WorkerLocalStats::new(
local_start.unwrap(),
local_elapsed,
local_head,
local_tail,
)))
.unwrap();
}
}
})
.expect("failed to start thread"),
}
}
}
/*
pool
*/
#[derive(Debug)]
pub enum BombardError<Bt: ThreadedBombardTask> {
InitError(Bt::WorkerInitError),
WorkerTaskError(Bt::WorkerTaskError),
AllWorkersOffline,
}
impl<Bt: ThreadedBombardTask> fmt::Display for BombardError<Bt>
where
Bt::WorkerInitError: fmt::Display,
Bt::WorkerTaskError: Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AllWorkersOffline => write!(
f,
"bombard failed because all workers went offline indicating catastrophic failure"
),
Self::WorkerTaskError(e) => write!(f, "worker task failed. {e}"),
Self::InitError(e) => write!(f, "worker init failed. {e}"),
}
}
}
#[derive(Debug)]
pub struct RuntimeStats {
pub qps: f64,
pub head: u128,
pub tail: u128,
}
#[derive(Debug)]
pub struct BombardPool<Bt: ThreadedBombardTask> {
workers: Vec<(Worker, Sender<WorkerTask<Bt>>)>,
rx_res: Receiver<WorkerResult<Bt>>,
_config: Bt,
}
impl<Bt: ThreadedBombardTask> BombardPool<Bt> {
fn qps(query_count: usize, time_taken_in_nanos: u128) -> f64 {
const NANOS_PER_SECOND: u128 = 1_000_000_000;
let time_taken_in_nanos_f64 = time_taken_in_nanos as f64;
let query_count_f64 = query_count as f64;
(query_count_f64 / time_taken_in_nanos_f64) * NANOS_PER_SECOND as f64
}
pub fn new(size: usize, config: Bt) -> BombardResult<Self, Bt> {
assert_ne!(size, 0, "pool can't be empty");
let mut workers = Vec::with_capacity(size);
let (tx_res, rx_res) = unbounded();
for id in 0..size {
let (tx_work, rx_work) = unbounded();
let driver = config.worker_init().map_err(BombardError::InitError)?;
workers.push((Worker::start(id, driver, rx_work, tx_res.clone()), tx_work));
}
Ok(Self {
workers,
rx_res,
_config: config,
})
}
/// Bombard queries to the workers
pub fn blocking_bombard(
&mut self,
task_description: Bt::WorkerTaskSpec,
count: usize,
) -> BombardResult<RuntimeStats, Bt> {
GPState::guard(|| {
GPState::get().post_target(count as _);
let mut global_start = None;
let mut global_stop = None;
let mut global_head = u128::MAX;
let mut global_tail = 0u128;
for (_, sender) in self.workers.iter() {
sender
.send(WorkerTask::Task(task_description.clone()))
.unwrap();
}
// wait for all workers to complete
let mut received = 0;
while received != self.workers.len() {
let results = match self.rx_res.recv() {
Err(_) => return Err(BombardError::AllWorkersOffline),
Ok(r) => r,
};
let WorkerLocalStats {
start: this_start,
elapsed,
head,
tail,
} = match results {
WorkerResult::Completed(r) => r,
WorkerResult::Errored(e) => return Err(BombardError::WorkerTaskError(e)),
};
// update start if required
match global_start.as_mut() {
None => {
global_start = Some(this_start);
}
Some(start) => {
if this_start < *start {
*start = this_start;
}
}
}
let this_task_stopped_at =
this_start + Duration::from_nanos(elapsed.try_into().unwrap());
match global_stop.as_mut() {
None => {
global_stop = Some(this_task_stopped_at);
}
Some(stop) => {
if this_task_stopped_at > *stop {
// this task stopped later than the previous one
*stop = this_task_stopped_at;
}
}
}
if head < global_head {
global_head = head;
}
if tail > global_tail {
global_tail = tail;
}
received += 1;
}
// reset global pool state
GPState::get().post_reset();
// compute results
let global_elapsed = global_stop
.unwrap()
.duration_since(global_start.unwrap())
.as_nanos();
Ok(RuntimeStats {
qps: Self::qps(count, global_elapsed),
head: global_head,
tail: global_tail,
})
})
}
}
impl<Bt: ThreadedBombardTask> Drop for BombardPool<Bt> {
fn drop(&mut self) {
info!("taking all workers offline");
for (_, sender) in self.workers.iter() {
sender.send(WorkerTask::Exit).unwrap();
}
for (worker, _) in self.workers.drain(..) {
worker.handle.join().unwrap();
}
info!("all workers now offline");
}
}

@ -0,0 +1,315 @@
/*
* Created on Wed Nov 22 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{RuntimeStats, WorkerLocalStats, WorkerTask},
crate::bench::BombardTaskSpec,
skytable::Config,
std::{
fmt,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
time::{Duration, Instant},
},
tokio::sync::{broadcast, mpsc, RwLock},
};
/*
state
*/
static GLOBAL_START: RwLock<()> = RwLock::const_new(());
static GLOBAL_TARGET: AtomicUsize = AtomicUsize::new(0);
static GLOBAL_EXIT: AtomicBool = AtomicBool::new(false);
fn gset_target(target: usize) {
GLOBAL_TARGET.store(target, Ordering::Release)
}
fn gset_exit() {
GLOBAL_EXIT.store(true, Ordering::Release)
}
fn grefresh_target() -> usize {
let mut current = GLOBAL_TARGET.load(Ordering::Acquire);
loop {
if current == 0 {
return 0;
}
match GLOBAL_TARGET.compare_exchange(
current,
current - 1,
Ordering::Release,
Ordering::Acquire,
) {
Ok(prev) => return prev,
Err(new) => current = new,
}
}
}
fn grefresh_early_exit() -> bool {
GLOBAL_EXIT.load(Ordering::Acquire)
}
/*
errors
*/
pub type FuryResult<T> = Result<T, FuryError>;
#[derive(Debug)]
pub enum FuryError {
Init(skytable::error::Error),
Worker(FuryWorkerError),
Dead,
}
impl fmt::Display for FuryError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Init(e) => write!(f, "fury init failed. {e}"),
Self::Worker(e) => write!(f, "worker failed. {e}"),
Self::Dead => write!(f, "all workers offline"),
}
}
}
impl From<FuryWorkerError> for FuryError {
fn from(e: FuryWorkerError) -> Self {
Self::Worker(e)
}
}
#[derive(Debug)]
pub enum FuryWorkerError {
DbError(skytable::error::Error),
Mismatch,
}
impl fmt::Display for FuryWorkerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DbError(e) => write!(f, "client errored. {e}"),
Self::Mismatch => write!(f, "server response did not match expected response"),
}
}
}
/*
impl
*/
#[derive(Debug)]
pub struct Fury {
tx_task: broadcast::Sender<WorkerTask<BombardTaskSpec>>,
rx_task_result: mpsc::Receiver<FuryResult<WorkerLocalStats>>,
client_count: usize,
}
impl Fury {
pub async fn new(client_count: usize, config: Config) -> FuryResult<Self> {
let (tx_task, rx_task) = broadcast::channel(1);
let (tx_task_result, rx_task_result) = mpsc::channel(client_count);
let (tx_ack, mut rx_ack) = mpsc::channel(1);
for _ in 0..client_count {
let rx_task = tx_task.subscribe();
let tx_task_result = tx_task_result.clone();
let tx_ack = tx_ack.clone();
let config = config.clone();
tokio::spawn(async move { worker_svc(rx_task, tx_task_result, tx_ack, config).await });
}
drop((tx_ack, rx_task));
match rx_ack.recv().await {
None => {}
Some(e) => return Err(FuryError::Init(e)),
}
info!("all workers online. ready for event loop");
Ok(Self {
tx_task,
rx_task_result,
client_count,
})
}
pub async fn bombard(
&mut self,
count: usize,
task: BombardTaskSpec,
) -> FuryResult<RuntimeStats> {
// pause workers and set target
let start_guard = GLOBAL_START.write().await;
gset_target(count);
// send tasks
self.tx_task.send(WorkerTask::Task(task)).unwrap();
// begin work
drop(start_guard);
// init stats
let mut global_start = None;
let mut global_stop = None;
let mut global_head = u128::MAX;
let mut global_tail = 0u128;
let mut remaining = self.client_count;
while remaining != 0 {
let WorkerLocalStats {
start: this_start,
elapsed: this_elapsed,
head: this_head,
tail: this_tail,
} = match self.rx_task_result.recv().await {
None => {
return Err(FuryError::Dead);
}
Some(res) => res,
}?;
match global_start.as_mut() {
None => global_start = Some(this_start),
Some(current_start) => {
if this_start < *current_start {
*current_start = this_start;
}
}
}
let this_stop = this_start + Duration::from_nanos(this_elapsed.try_into().unwrap());
match global_stop.as_mut() {
None => global_stop = Some(this_stop),
Some(current_gstop) => {
if this_stop > *current_gstop {
*current_gstop = this_stop;
}
}
}
if this_head < global_head {
global_head = this_head;
}
if this_tail > global_tail {
global_tail = this_tail;
}
remaining -= 1;
}
Ok(RuntimeStats {
qps: super::qps(
count,
global_stop
.unwrap()
.duration_since(global_start.unwrap())
.as_nanos(),
),
head: global_head,
tail: global_tail,
})
}
}
async fn worker_svc(
mut rx_task: broadcast::Receiver<WorkerTask<BombardTaskSpec>>,
tx_task_result: mpsc::Sender<FuryResult<WorkerLocalStats>>,
tx_ack: mpsc::Sender<skytable::error::Error>,
connection_cfg: Config,
) {
let mut db = match connection_cfg.connect_async().await {
Ok(c) => c,
Err(e) => {
tx_ack.send(e).await.unwrap();
return;
}
};
// we're connected and ready to server
drop(tx_ack);
'wait: loop {
let task = match rx_task.recv().await.unwrap() {
WorkerTask::Exit => return,
WorkerTask::Task(t) => t,
};
// received a task; ready to roll; wait for begin signal
let permit = GLOBAL_START.read().await;
// off to the races
let mut current = grefresh_target();
let mut exit_now = grefresh_early_exit();
// init local stats
let mut local_start = None;
let mut local_elapsed = 0u128;
let mut local_head = u128::MAX;
let mut local_tail = 0u128;
while (current != 0) && !exit_now {
// prepare query
let (query, response) = task.generate(current as u64);
// execute timed
let start = Instant::now();
let ret = db.query(&query).await;
let stop = Instant::now();
// check response
let resp = match ret {
Ok(resp) => resp,
Err(e) => {
tx_task_result
.send(Err(FuryError::Worker(FuryWorkerError::DbError(e))))
.await
.unwrap();
gset_exit();
continue 'wait;
}
};
if resp != response {
tx_task_result
.send(Err(FuryError::Worker(FuryWorkerError::Mismatch)))
.await
.unwrap();
gset_exit();
continue 'wait;
}
// update stats
if local_start.is_none() {
local_start = Some(start);
}
let elapsed = stop.duration_since(start).as_nanos();
local_elapsed += elapsed;
if elapsed > local_tail {
local_tail = elapsed;
}
if elapsed < local_head {
local_head = elapsed;
}
current = grefresh_target();
exit_now = grefresh_early_exit();
}
if exit_now {
continue 'wait;
}
// good! send these results
tx_task_result
.send(Ok(WorkerLocalStats::new(
local_start.unwrap(),
local_elapsed,
local_head,
local_tail,
)))
.await
.unwrap();
drop(permit);
}
}
impl Drop for Fury {
fn drop(&mut self) {
self.tx_task.send(WorkerTask::Exit).unwrap();
}
}

@ -0,0 +1,378 @@
/*
* Created on Tue Nov 21 2023
*
* This file is a part of Skytable
* Skytable (formerly known as TerrabaseDB or Skybase) is a free and open-source
* NoSQL database written by Sayan Nandan ("the Author") with the
* vision to provide flexibility in data modelling without compromising
* on performance, queryability or scalability.
*
* Copyright (c) 2023, Sayan Nandan <ohsayan@outlook.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use {
super::{RuntimeStats, WorkerLocalStats, WorkerTask},
crossbeam_channel::{unbounded, Receiver, Sender},
std::{
fmt::{self, Display},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard,
},
thread::{self, JoinHandle},
time::{Duration, Instant},
},
};
pub type BombardResult<T, Bt> = Result<T, BombardError<Bt>>;
/*
state mgmt
*/
#[derive(Debug)]
/// The pool state. Be warned **ONLY ONE POOL AT A TIME!**
struct GPState {
current: AtomicU64,
state: AtomicBool,
occupied: AtomicBool,
start_sig: RwLock<()>,
}
impl GPState {
#[inline(always)]
fn get() -> &'static Self {
static STATE: GPState = GPState::zero();
&STATE
}
const fn zero() -> Self {
Self {
current: AtomicU64::new(0),
state: AtomicBool::new(true),
occupied: AtomicBool::new(false),
start_sig: RwLock::new(()),
}
}
fn wait_for_global_begin(&self) -> RwLockReadGuard<'_, ()> {
self.start_sig.read().unwrap()
}
fn occupy(&self) {
assert!(!self.occupied.swap(true, Ordering::Release));
}
fn vacate(&self) {
assert!(self.occupied.swap(false, Ordering::Release));
}
fn guard<T>(f: impl FnOnce(RwLockWriteGuard<'static, ()>) -> T) -> T {
let slf = Self::get();
slf.occupy();
let ret = f(slf.start_sig.write().unwrap());
slf.vacate();
ret
}
fn post_failure(&self) {
self.state.store(false, Ordering::Release)
}
fn post_target(&self, target: u64) {
self.current.store(target, Ordering::Release)
}
/// WARNING: this is not atomic! only sensible to run a quiescent state
fn post_reset(&self) {
self.current.store(0, Ordering::Release);
self.state.store(true, Ordering::Release);
}
fn update_target(&self) -> u64 {
let mut current = self.current.load(Ordering::Acquire);
loop {
if current == 0 {
return 0;
}
match self.current.compare_exchange(
current,
current - 1,
Ordering::Release,
Ordering::Acquire,
) {
Ok(last) => {
return last;
}
Err(new) => {
current = new;
}
}
}
}
fn load_okay(&self) -> bool {
self.state.load(Ordering::Acquire)
}
}
/*
task spec
*/
/// A threaded bombard task specification which drives a global pool of threads towards a common goal
pub trait ThreadedBombardTask: Send + Sync + 'static {
/// The per-task worker that is initialized once in every thread (not to be confused with the actual thread worker!)
type Worker: Send + Sync;
/// The task that the [`ThreadedBombardTask::TaskWorker`] performs
type WorkerTask: Send + Sync;
type WorkerTaskSpec: Clone + Send + Sync + 'static;
/// Errors while running a task
type WorkerTaskError: Send + Sync;
/// Errors while initializing a task worker
type WorkerInitError: Send + Sync;
/// Initialize a task worker
fn worker_init(&self) -> Result<Self::Worker, Self::WorkerInitError>;
fn generate_task(spec: &Self::WorkerTaskSpec, current: u64) -> Self::WorkerTask;
/// Drive a single subtask
fn worker_drive_timed(
worker: &mut Self::Worker,
task: Self::WorkerTask,
) -> Result<u128, Self::WorkerTaskError>;
}
/*
worker
*/
#[derive(Debug)]
enum WorkerResult<Bt: ThreadedBombardTask> {
Completed(WorkerLocalStats),
Errored(Bt::WorkerTaskError),
}
#[derive(Debug)]
struct Worker {
handle: JoinHandle<()>,
}
impl Worker {
fn start<Bt: ThreadedBombardTask>(
id: usize,
driver: Bt::Worker,
rx_work: Receiver<WorkerTask<Bt::WorkerTaskSpec>>,
tx_res: Sender<WorkerResult<Bt>>,
) -> Self {
Self {
handle: thread::Builder::new()
.name(format!("worker-{id}"))
.spawn(move || {
let mut worker_driver = driver;
'blocking_wait: loop {
let task = match rx_work.recv().unwrap() {
WorkerTask::Exit => return,
WorkerTask::Task(spec) => spec,
};
let guard = GPState::get().wait_for_global_begin();
// check global state
let mut global_okay = GPState::get().load_okay();
let mut global_position = GPState::get().update_target();
// init local state
let mut local_start = None;
let mut local_elapsed = 0u128;
let mut local_head = u128::MAX;
let mut local_tail = 0;
// bombard
while (global_position != 0) & global_okay {
let task = Bt::generate_task(&task, global_position);
if local_start.is_none() {
local_start = Some(Instant::now());
}
let this_elapsed =
match Bt::worker_drive_timed(&mut worker_driver, task) {
Ok(elapsed) => elapsed,
Err(e) => {
GPState::get().post_failure();
tx_res.send(WorkerResult::Errored(e)).unwrap();
continue 'blocking_wait;
}
};
local_elapsed += this_elapsed;
if this_elapsed < local_head {
local_head = this_elapsed;
}
if this_elapsed > local_tail {
local_tail = this_elapsed;
}
global_position = GPState::get().update_target();
global_okay = GPState::get().load_okay();
}
if global_okay {
// we're done
tx_res
.send(WorkerResult::Completed(WorkerLocalStats::new(
local_start.unwrap(),
local_elapsed,
local_head,
local_tail,
)))
.unwrap();
}
drop(guard);
}
})
.expect("failed to start thread"),
}
}
}
/*
pool
*/
#[derive(Debug)]
pub enum BombardError<Bt: ThreadedBombardTask> {
InitError(Bt::WorkerInitError),
WorkerTaskError(Bt::WorkerTaskError),
AllWorkersOffline,
}
impl<Bt: ThreadedBombardTask> fmt::Display for BombardError<Bt>
where
Bt::WorkerInitError: fmt::Display,
Bt::WorkerTaskError: Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::AllWorkersOffline => write!(
f,
"bombard failed because all workers went offline indicating catastrophic failure"
),
Self::WorkerTaskError(e) => write!(f, "worker task failed. {e}"),
Self::InitError(e) => write!(f, "worker init failed. {e}"),
}
}
}
#[derive(Debug)]
pub struct BombardPool<Bt: ThreadedBombardTask> {
workers: Vec<(Worker, Sender<WorkerTask<Bt::WorkerTaskSpec>>)>,
rx_res: Receiver<WorkerResult<Bt>>,
_config: Bt,
}
impl<Bt: ThreadedBombardTask> BombardPool<Bt> {
pub fn new(size: usize, config: Bt) -> BombardResult<Self, Bt> {
assert_ne!(size, 0, "pool can't be empty");
let mut workers = Vec::with_capacity(size);
let (tx_res, rx_res) = unbounded();
for id in 0..size {
let (tx_work, rx_work) = unbounded();
let driver = config.worker_init().map_err(BombardError::InitError)?;
workers.push((Worker::start(id, driver, rx_work, tx_res.clone()), tx_work));
}
Ok(Self {
workers,
rx_res,
_config: config,
})
}
/// Bombard queries to the workers
pub fn blocking_bombard(
&mut self,
task_description: Bt::WorkerTaskSpec,
count: usize,
) -> BombardResult<RuntimeStats, Bt> {
GPState::guard(|paused| {
GPState::get().post_target(count as _);
let mut global_start = None;
let mut global_stop = None;
let mut global_head = u128::MAX;
let mut global_tail = 0u128;
for (_, sender) in self.workers.iter() {
sender
.send(WorkerTask::Task(task_description.clone()))
.unwrap();
}
// now let them begin!
drop(paused);
// wait for all workers to complete
let mut received = 0;
while received != self.workers.len() {
let results = match self.rx_res.recv() {
Err(_) => return Err(BombardError::AllWorkersOffline),
Ok(r) => r,
};
let WorkerLocalStats {
start: this_start,
elapsed,
head,
tail,
} = match results {
WorkerResult::Completed(r) => r,
WorkerResult::Errored(e) => return Err(BombardError::WorkerTaskError(e)),
};
// update start if required
match global_start.as_mut() {
None => {
global_start = Some(this_start);
}
Some(start) => {
if this_start < *start {
*start = this_start;
}
}
}
let this_task_stopped_at =
this_start + Duration::from_nanos(elapsed.try_into().unwrap());
match global_stop.as_mut() {
None => {
global_stop = Some(this_task_stopped_at);
}
Some(stop) => {
if this_task_stopped_at > *stop {
// this task stopped later than the previous one
*stop = this_task_stopped_at;
}
}
}
if head < global_head {
global_head = head;
}
if tail > global_tail {
global_tail = tail;
}
received += 1;
}
// reset global pool state
GPState::get().post_reset();
// compute results
let global_elapsed = global_stop
.unwrap()
.duration_since(global_start.unwrap())
.as_nanos();
Ok(RuntimeStats {
qps: super::qps(count, global_elapsed),
head: global_head,
tail: global_tail,
})
})
}
}
impl<Bt: ThreadedBombardTask> Drop for BombardPool<Bt> {
fn drop(&mut self) {
info!("taking all workers offline");
for (_, sender) in self.workers.iter() {
sender.send(WorkerTask::Exit).unwrap();
}
for (worker, _) in self.workers.drain(..) {
worker.handle.join().unwrap();
}
info!("all workers now offline");
}
}
Loading…
Cancel
Save