Add progress thread w/ custom interval for WS test

* Start progress thread that reports once a second by default
* Customizable rate with [--interval|-i]
* Portable semaphore tested on Linux & macOS
master
Jessie Murray 3 years ago
parent 69be409b87
commit 1204f66ec1
No known key found for this signature in database
GPG Key ID: E7E4D57EDDA744C5

@ -1,7 +1,7 @@
OUT=websocket pubsub OUT=websocket pubsub
OBJS=../src/http-parser/http_parser.o ../src/b64/cencode.o ../src/sha1/sha1.o OBJS=../src/http-parser/http_parser.o ../src/b64/cencode.o ../src/sha1/sha1.o
CFLAGS=-Wall -Wextra -I../src -I../src/http-parser CFLAGS=-Wall -Wextra -I../src -I../src/http-parser
LDFLAGS=-levent -lpthread LDFLAGS=-levent -lpthread -lm
# if `make` is run with DEBUG=1, include debug symbols (same as in Makefile in root directory) # if `make` is run with DEBUG=1, include debug symbols (same as in Makefile in root directory)
DEBUG_FLAGS= DEBUG_FLAGS=

@ -14,6 +14,13 @@
#include <b64/cencode.h> #include <b64/cencode.h>
#include <http-parser/http_parser.h> #include <http-parser/http_parser.h>
#include <sha1/sha1.h> #include <sha1/sha1.h>
#include <math.h>
#include <locale.h>
#ifdef __APPLE__
#include <dispatch/dispatch.h>
#else
#include <semaphore.h>
#endif
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -30,7 +37,8 @@ enum worker_state {
WS_SENT_HANDSHAKE, WS_SENT_HANDSHAKE,
WS_RECEIVED_HANDSHAKE, WS_RECEIVED_HANDSHAKE,
WS_SENT_FRAME, WS_SENT_FRAME,
WS_COMPLETE WS_COMPLETE,
WS_BROKEN
}; };
enum mask_config { enum mask_config {
@ -87,6 +95,19 @@ struct worker_thread {
int (*debug)(const char *fmt, ...); int (*debug)(const char *fmt, ...);
}; };
struct progress_thread {
pthread_t thread;
#ifdef __APPLE__
dispatch_semaphore_t sem_finished;
#else
sem_t sem_finished;
#endif
struct worker_thread *workers;
int worker_count;
int msg_target;
float interval_sec;
};
int debug_noop(const char *fmt, ...) { int debug_noop(const char *fmt, ...) {
(void)fmt; (void)fmt;
return 0; return 0;
@ -142,9 +163,15 @@ wait_for_possible_write(struct worker_thread *wt);
static void static void
ws_enqueue_frame(struct worker_thread *wt); ws_enqueue_frame(struct worker_thread *wt);
static void
wt_mark_finished(struct worker_thread *wt, enum worker_state state) {
wt->state = state;
event_base_loopbreak(wt->base);
}
void void
process_message(struct worker_thread *wt, size_t sz) { process_message(struct worker_thread *wt, size_t sz) {
if (wt->msg_received && wt->msg_received % 1000 == 0) { if (0 && wt->msg_received && wt->msg_received % 1000 == 0) {
printf("thread %d: %8d messages left (got %9d bytes so far).\n", printf("thread %d: %8d messages left (got %9d bytes so far).\n",
wt->id, wt->id,
wt->msg_target - wt->msg_received, wt->byte_count); wt->msg_target - wt->msg_received, wt->byte_count);
@ -156,7 +183,7 @@ process_message(struct worker_thread *wt, size_t sz) {
if (wt->msg_received == wt->msg_target) { if (wt->msg_received == wt->msg_target) {
wt->debug("%s: thread %d has received all %d messages it expected\n", wt->debug("%s: thread %d has received all %d messages it expected\n",
__func__, wt->id, wt->msg_received); __func__, wt->id, wt->msg_received);
event_base_loopexit(wt->base, NULL); wt_mark_finished(wt, WS_COMPLETE);
} }
} }
@ -218,7 +245,7 @@ websocket_can_read(int fd, short event, void *ptr) {
evbuffer_debug_dump(wt, wt->rbuffer); evbuffer_debug_dump(wt, wt->rbuffer);
if (ret == 0) { if (ret == 0) {
wt->debug("We didn't read anything from the socket...\n"); wt->debug("We didn't read anything from the socket...\n");
event_base_loopexit(wt->base, NULL); wt_mark_finished(wt, WS_BROKEN);
return; return;
} }
@ -238,7 +265,7 @@ websocket_can_read(int fd, short event, void *ptr) {
if (wt->state == WS_SENT_HANDSHAKE || /* haven't encountered end of response yet */ if (wt->state == WS_SENT_HANDSHAKE || /* haven't encountered end of response yet */
(wt->parser.upgrade && nparsed != (int)avail_sz -1)) { (wt->parser.upgrade && nparsed != (int)avail_sz -1)) {
wt->debug("UPGRADE *and* we have some data left\n"); wt->debug("UPGRADE *and* we have some data left (nparsed=%d, avail_sz=%lu)\n", nparsed, avail_sz);
continue; continue;
} else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */
evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer));
@ -431,7 +458,75 @@ ws_on_timeout(evutil_socket_t fd, short event, void *arg) {
(void)event; (void)event;
fprintf(stderr, "Time has run out! (thread %d)\n", wt->id); fprintf(stderr, "Time has run out! (thread %d)\n", wt->id);
event_base_loopbreak(wt->base); /* break out of event loop */ wt_mark_finished(wt, WS_BROKEN); /* break out of event loop */
}
void*
progress_thread_main(void *ptr) {
struct progress_thread *pt = ptr;
struct timespec ts_wait;
ts_wait.tv_sec = floor(pt->interval_sec); /* integer seconds */
ts_wait.tv_nsec = (pt->interval_sec - (float)ts_wait.tv_sec) * 1e9; /* nanoseconds */
int last_received = 0;
int num_sleeps = 0;
setlocale(LC_NUMERIC, "");
struct timespec ts_start;
clock_gettime(CLOCK_MONOTONIC, &ts_start);
while(1) {
int sem_received = 0;
#ifdef __APPLE__
dispatch_time_t sem_timeout = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(ts_wait.tv_sec * NSEC_PER_SEC + ts_wait.tv_nsec));
if (dispatch_semaphore_wait(pt->sem_finished, sem_timeout) == 0) {
sem_received = 1;
}
#else
struct timespec ts_now, ts_sem_timeout; /* now + timeout */
/* get current time */
if (clock_gettime(CLOCK_REALTIME, &ts_now) == -1) {
fprintf(stderr, "clock_gettime failed: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
/* calculate when the timeout should occur */
long long now_nanos = ts_now.tv_sec * 1e9 + ts_now.tv_nsec;
long long sem_timeout_nanos = now_nanos + (long long)ts_wait.tv_sec * 1e9 + (long long)ts_wait.tv_nsec;
ts_sem_timeout.tv_sec = sem_timeout_nanos / 1000000000LL;
ts_sem_timeout.tv_nsec = sem_timeout_nanos % 1000000000LL;
int sem_ret = sem_timedwait(&pt->sem_finished, &ts_sem_timeout);
if (sem_ret == 0) {
sem_received = 1;
}
#endif
// nanosleep(&ts, NULL);
num_sleeps++;
int total_sent = 0, total_received = 0, any_broken = 0, num_complete = 0;
for (int i = 0; i < pt->worker_count; i++) {
total_sent += pt->workers[i].msg_sent;
total_received += pt->workers[i].msg_received;
if (pt->workers[i].state == WS_BROKEN) {
any_broken = 1;
} else if (pt->workers[i].state == WS_COMPLETE) {
num_complete++;
}
}
struct timespec ts_after_sleep;
clock_gettime(CLOCK_MONOTONIC, &ts_after_sleep);
fprintf(stderr, "After %0.2f sec: %'d messages sent, %'d received (%.02f%%). Instant rate: %'ld/sec, overall rate: %'ld/sec\n",
((float)((ts_after_sleep.tv_sec * 1e9 + ts_after_sleep.tv_nsec) - (ts_start.tv_sec * 1e9 + ts_start.tv_nsec))) / (float)1e9,
total_sent, total_received, 100.0f * (float)total_received / (float)(pt->worker_count * pt->msg_target),
lroundf((float)(total_received - last_received) / pt->interval_sec),
lroundf((float)total_received / ((float)num_sleeps) * pt->interval_sec));
if (sem_received || total_received == pt->msg_target * pt->worker_count || any_broken || num_complete == pt->worker_count) {
break;
}
last_received = total_received;
}
return NULL;
} }
void* void*
@ -560,6 +655,7 @@ void usage(const char *argv0, char *host_default, short port_default,
"\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n" "\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n"
"\t[--mask|-m] MASK_CFG\t(%d: always, %d: never, %d: alternate, default = always)\n" "\t[--mask|-m] MASK_CFG\t(%d: always, %d: never, %d: alternate, default = always)\n"
"\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n" "\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n"
"\t[--interval|-i] SECONDS\t(interval at which to report progress, default = 1)\n"
"\t[--verbose|-v]\t\t(extremely verbose output)\n", "\t[--verbose|-v]\t\t(extremely verbose output)\n",
argv0, host_default, (int)port_default, argv0, host_default, (int)port_default,
thread_count_default, messages_default, thread_count_default, messages_default,
@ -569,8 +665,6 @@ void usage(const char *argv0, char *host_default, short port_default,
int int
main(int argc, char *argv[]) { main(int argc, char *argv[]) {
struct timespec t0, t1;
int messages_default = 2500; int messages_default = 2500;
int thread_count_default = 4; int thread_count_default = 4;
short port_default = 7379; short port_default = 7379;
@ -583,6 +677,7 @@ main(int argc, char *argv[]) {
long total = 0, total_bytes = 0; long total = 0, total_bytes = 0;
int verbose = 0; int verbose = 0;
int timeout_seconds = -1; int timeout_seconds = -1;
float report_interval = 1.0;
enum mask_config mask_cfg = MASK_ALWAYS; enum mask_config mask_cfg = MASK_ALWAYS;
struct host_info hi = {host_default, port_default}; struct host_info hi = {host_default, port_default};
@ -598,9 +693,10 @@ main(int argc, char *argv[]) {
{"messages", required_argument, NULL, 'n'}, {"messages", required_argument, NULL, 'n'},
{"mask", required_argument, NULL, 'm'}, {"mask", required_argument, NULL, 'm'},
{"max-time", required_argument, NULL, 't'}, {"max-time", required_argument, NULL, 't'},
{"interval", required_argument, NULL, 'i'},
{"verbose", no_argument, NULL, 'v'}, {"verbose", no_argument, NULL, 'v'},
{0, 0, 0, 0}}; {0, 0, 0, 0}};
while ((opt = getopt_long(argc, argv, "h:p:c:n:m:t:vs", long_options, NULL)) != -1) { while ((opt = getopt_long(argc, argv, "h:p:c:n:m:t:i:vs", long_options, NULL)) != -1) {
switch (opt) { switch (opt) {
case 'h': case 'h':
colon = strchr(optarg, ':'); colon = strchr(optarg, ':');
@ -640,6 +736,10 @@ main(int argc, char *argv[]) {
timeout_seconds = atoi(optarg); timeout_seconds = atoi(optarg);
break; break;
case 'i':
report_interval = atof(optarg);
break;
case 'v': case 'v':
verbose = 1; verbose = 1;
break; break;
@ -655,7 +755,22 @@ main(int argc, char *argv[]) {
/* run threads */ /* run threads */
workers = calloc(sizeof(struct worker_thread), thread_count); workers = calloc(sizeof(struct worker_thread), thread_count);
clock_gettime(CLOCK_MONOTONIC, &t0); struct progress_thread progress;
progress.interval_sec = report_interval;
progress.msg_target = msg_target;
progress.worker_count = thread_count;
progress.workers = workers;
#ifdef __APPLE__
dispatch_semaphore_t *sem = &progress.sem_finished;
*sem = dispatch_semaphore_create(0);
#else
if (sem_init(&progress.sem_finished, 0, 0) != 0) {
fprintf(stderr, "sem_init failed: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
#endif
pthread_create(&progress.thread, NULL, progress_thread_main, &progress);
for (i = 0; i < thread_count; ++i) { for (i = 0; i < thread_count; ++i) {
workers[i].id = i; workers[i].id = i;
workers[i].msg_target = msg_target; workers[i].msg_target = msg_target;
@ -676,25 +791,16 @@ main(int argc, char *argv[]) {
total_bytes += workers[i].byte_count; total_bytes += workers[i].byte_count;
} }
/* timing */ /* signal progress thread to stop */
clock_gettime(CLOCK_MONOTONIC, &t1); #ifdef __APPLE__
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; dispatch_semaphore_signal(progress.sem_finished);
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; #else
sem_post(&progress.sem_finished);
#endif
if (total != 0) { pthread_join(progress.thread, NULL);
int total_masked = 0;
for (i = 0; i < thread_count; ++i) {
total_masked += workers[i].mask_applied;
}
double kb_per_sec = ((double)total_bytes / (double)(mili1 - mili0)) / 1.024; if (total != 0) {
printf("Sent+received %ld messages (%d sent masked) for a total of %ld bytes in %0.2f sec: %0.2f msg/sec (%0.2f KB/sec)\n",
total,
total_masked,
total_bytes,
(mili1 - mili0) / 1000.0,
1000 * ((double)total) / (mili1 - mili0),
kb_per_sec);
return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE); return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE);
} else { } else {
printf("No message was read.\n"); printf("No message was read.\n");

Loading…
Cancel
Save