diff --git a/tests/Makefile b/tests/Makefile index b2fa1f3..55c87f5 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,7 +1,7 @@ OUT=websocket pubsub OBJS=../src/http-parser/http_parser.o ../src/b64/cencode.o ../src/sha1/sha1.o CFLAGS=-Wall -Wextra -I../src -I../src/http-parser -LDFLAGS=-levent -lpthread +LDFLAGS=-levent -lpthread -lm # if `make` is run with DEBUG=1, include debug symbols (same as in Makefile in root directory) DEBUG_FLAGS= diff --git a/tests/websocket.c b/tests/websocket.c index cab2300..aa501ca 100644 --- a/tests/websocket.c +++ b/tests/websocket.c @@ -14,6 +14,13 @@ #include #include #include +#include +#include +#ifdef __APPLE__ +#include +#else +#include +#endif #include #include @@ -30,7 +37,8 @@ enum worker_state { WS_SENT_HANDSHAKE, WS_RECEIVED_HANDSHAKE, WS_SENT_FRAME, - WS_COMPLETE + WS_COMPLETE, + WS_BROKEN }; enum mask_config { @@ -87,6 +95,19 @@ struct worker_thread { int (*debug)(const char *fmt, ...); }; +struct progress_thread { + pthread_t thread; +#ifdef __APPLE__ + dispatch_semaphore_t sem_finished; +#else + sem_t sem_finished; +#endif + struct worker_thread *workers; + int worker_count; + int msg_target; + float interval_sec; +}; + int debug_noop(const char *fmt, ...) { (void)fmt; return 0; @@ -142,9 +163,15 @@ wait_for_possible_write(struct worker_thread *wt); static void ws_enqueue_frame(struct worker_thread *wt); +static void +wt_mark_finished(struct worker_thread *wt, enum worker_state state) { + wt->state = state; + event_base_loopbreak(wt->base); +} + void process_message(struct worker_thread *wt, size_t sz) { - if (wt->msg_received && wt->msg_received % 1000 == 0) { + if (0 && wt->msg_received && wt->msg_received % 1000 == 0) { printf("thread %d: %8d messages left (got %9d bytes so far).\n", wt->id, wt->msg_target - wt->msg_received, wt->byte_count); @@ -156,7 +183,7 @@ process_message(struct worker_thread *wt, size_t sz) { if (wt->msg_received == wt->msg_target) { wt->debug("%s: thread %d has received all %d messages it expected\n", __func__, wt->id, wt->msg_received); - event_base_loopexit(wt->base, NULL); + wt_mark_finished(wt, WS_COMPLETE); } } @@ -218,7 +245,7 @@ websocket_can_read(int fd, short event, void *ptr) { evbuffer_debug_dump(wt, wt->rbuffer); if (ret == 0) { wt->debug("We didn't read anything from the socket...\n"); - event_base_loopexit(wt->base, NULL); + wt_mark_finished(wt, WS_BROKEN); return; } @@ -238,7 +265,7 @@ websocket_can_read(int fd, short event, void *ptr) { if (wt->state == WS_SENT_HANDSHAKE || /* haven't encountered end of response yet */ (wt->parser.upgrade && nparsed != (int)avail_sz -1)) { - wt->debug("UPGRADE *and* we have some data left\n"); + wt->debug("UPGRADE *and* we have some data left (nparsed=%d, avail_sz=%lu)\n", nparsed, avail_sz); continue; } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); @@ -431,7 +458,75 @@ ws_on_timeout(evutil_socket_t fd, short event, void *arg) { (void)event; fprintf(stderr, "Time has run out! (thread %d)\n", wt->id); - event_base_loopbreak(wt->base); /* break out of event loop */ + wt_mark_finished(wt, WS_BROKEN); /* break out of event loop */ +} + +void* +progress_thread_main(void *ptr) { + struct progress_thread *pt = ptr; + struct timespec ts_wait; + ts_wait.tv_sec = floor(pt->interval_sec); /* integer seconds */ + ts_wait.tv_nsec = (pt->interval_sec - (float)ts_wait.tv_sec) * 1e9; /* nanoseconds */ + + int last_received = 0; + int num_sleeps = 0; + setlocale(LC_NUMERIC, ""); + + struct timespec ts_start; + clock_gettime(CLOCK_MONOTONIC, &ts_start); + + while(1) { + int sem_received = 0; +#ifdef __APPLE__ + dispatch_time_t sem_timeout = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(ts_wait.tv_sec * NSEC_PER_SEC + ts_wait.tv_nsec)); + if (dispatch_semaphore_wait(pt->sem_finished, sem_timeout) == 0) { + sem_received = 1; + } +#else + struct timespec ts_now, ts_sem_timeout; /* now + timeout */ + + /* get current time */ + if (clock_gettime(CLOCK_REALTIME, &ts_now) == -1) { + fprintf(stderr, "clock_gettime failed: %s\n", strerror(errno)); + exit(EXIT_FAILURE); + } + /* calculate when the timeout should occur */ + long long now_nanos = ts_now.tv_sec * 1e9 + ts_now.tv_nsec; + long long sem_timeout_nanos = now_nanos + (long long)ts_wait.tv_sec * 1e9 + (long long)ts_wait.tv_nsec; + ts_sem_timeout.tv_sec = sem_timeout_nanos / 1000000000LL; + ts_sem_timeout.tv_nsec = sem_timeout_nanos % 1000000000LL; + + int sem_ret = sem_timedwait(&pt->sem_finished, &ts_sem_timeout); + if (sem_ret == 0) { + sem_received = 1; + } +#endif + // nanosleep(&ts, NULL); + num_sleeps++; + int total_sent = 0, total_received = 0, any_broken = 0, num_complete = 0; + for (int i = 0; i < pt->worker_count; i++) { + total_sent += pt->workers[i].msg_sent; + total_received += pt->workers[i].msg_received; + if (pt->workers[i].state == WS_BROKEN) { + any_broken = 1; + } else if (pt->workers[i].state == WS_COMPLETE) { + num_complete++; + } + } + struct timespec ts_after_sleep; + clock_gettime(CLOCK_MONOTONIC, &ts_after_sleep); + fprintf(stderr, "After %0.2f sec: %'d messages sent, %'d received (%.02f%%). Instant rate: %'ld/sec, overall rate: %'ld/sec\n", + ((float)((ts_after_sleep.tv_sec * 1e9 + ts_after_sleep.tv_nsec) - (ts_start.tv_sec * 1e9 + ts_start.tv_nsec))) / (float)1e9, + total_sent, total_received, 100.0f * (float)total_received / (float)(pt->worker_count * pt->msg_target), + lroundf((float)(total_received - last_received) / pt->interval_sec), + lroundf((float)total_received / ((float)num_sleeps) * pt->interval_sec)); + + if (sem_received || total_received == pt->msg_target * pt->worker_count || any_broken || num_complete == pt->worker_count) { + break; + } + last_received = total_received; + } + return NULL; } void* @@ -560,6 +655,7 @@ void usage(const char *argv0, char *host_default, short port_default, "\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n" "\t[--mask|-m] MASK_CFG\t(%d: always, %d: never, %d: alternate, default = always)\n" "\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n" + "\t[--interval|-i] SECONDS\t(interval at which to report progress, default = 1)\n" "\t[--verbose|-v]\t\t(extremely verbose output)\n", argv0, host_default, (int)port_default, thread_count_default, messages_default, @@ -569,8 +665,6 @@ void usage(const char *argv0, char *host_default, short port_default, int main(int argc, char *argv[]) { - struct timespec t0, t1; - int messages_default = 2500; int thread_count_default = 4; short port_default = 7379; @@ -583,6 +677,7 @@ main(int argc, char *argv[]) { long total = 0, total_bytes = 0; int verbose = 0; int timeout_seconds = -1; + float report_interval = 1.0; enum mask_config mask_cfg = MASK_ALWAYS; struct host_info hi = {host_default, port_default}; @@ -598,9 +693,10 @@ main(int argc, char *argv[]) { {"messages", required_argument, NULL, 'n'}, {"mask", required_argument, NULL, 'm'}, {"max-time", required_argument, NULL, 't'}, + {"interval", required_argument, NULL, 'i'}, {"verbose", no_argument, NULL, 'v'}, {0, 0, 0, 0}}; - while ((opt = getopt_long(argc, argv, "h:p:c:n:m:t:vs", long_options, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "h:p:c:n:m:t:i:vs", long_options, NULL)) != -1) { switch (opt) { case 'h': colon = strchr(optarg, ':'); @@ -640,6 +736,10 @@ main(int argc, char *argv[]) { timeout_seconds = atoi(optarg); break; + case 'i': + report_interval = atof(optarg); + break; + case 'v': verbose = 1; break; @@ -655,7 +755,22 @@ main(int argc, char *argv[]) { /* run threads */ workers = calloc(sizeof(struct worker_thread), thread_count); - clock_gettime(CLOCK_MONOTONIC, &t0); + struct progress_thread progress; + progress.interval_sec = report_interval; + progress.msg_target = msg_target; + progress.worker_count = thread_count; + progress.workers = workers; +#ifdef __APPLE__ + dispatch_semaphore_t *sem = &progress.sem_finished; + *sem = dispatch_semaphore_create(0); +#else + if (sem_init(&progress.sem_finished, 0, 0) != 0) { + fprintf(stderr, "sem_init failed: %s\n", strerror(errno)); + exit(EXIT_FAILURE); + } +#endif + pthread_create(&progress.thread, NULL, progress_thread_main, &progress); + for (i = 0; i < thread_count; ++i) { workers[i].id = i; workers[i].msg_target = msg_target; @@ -676,25 +791,16 @@ main(int argc, char *argv[]) { total_bytes += workers[i].byte_count; } - /* timing */ - clock_gettime(CLOCK_MONOTONIC, &t1); - float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; - float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; + /* signal progress thread to stop */ +#ifdef __APPLE__ + dispatch_semaphore_signal(progress.sem_finished); +#else + sem_post(&progress.sem_finished); +#endif - if (total != 0) { - int total_masked = 0; - for (i = 0; i < thread_count; ++i) { - total_masked += workers[i].mask_applied; - } + pthread_join(progress.thread, NULL); - double kb_per_sec = ((double)total_bytes / (double)(mili1 - mili0)) / 1.024; - printf("Sent+received %ld messages (%d sent masked) for a total of %ld bytes in %0.2f sec: %0.2f msg/sec (%0.2f KB/sec)\n", - total, - total_masked, - total_bytes, - (mili1 - mili0) / 1000.0, - 1000 * ((double)total) / (mili1 - mili0), - kb_per_sec); + if (total != 0) { return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE); } else { printf("No message was read.\n");