From b0740d0c871c59195ca4ae11591e244e98a1043e Mon Sep 17 00:00:00 2001 From: Jessie Murray Date: Sun, 27 Jun 2021 18:38:51 -0700 Subject: [PATCH] Major update of websocket test 1. Switch to evbuffer for correct handling of partial writes 2. Implement WS state machine in each worker 3. Clean up debug logging 4. Add detailed network log messages to help find WS issues 5. Switch to getopt_long --- tests/websocket.c | 305 +++++++++++++--------------------------------- 1 file changed, 88 insertions(+), 217 deletions(-) diff --git a/tests/websocket.c b/tests/websocket.c index 3c53cb1..c0277d7 100644 --- a/tests/websocket.c +++ b/tests/websocket.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,7 @@ struct worker_thread { int msg_received; int msg_sent; int byte_count; + int id; pthread_t thread; enum worker_state state; @@ -55,34 +57,48 @@ struct worker_thread { http_parser parser; http_parser_settings settings; + + int (*debug)(const char *fmt, ...); }; +int debug_noop(const char *fmt, ...) { + (void)fmt; + return 0; +} + +int debug_verbose(const char *fmt, ...) { + int ret; + va_list vargs; + va_start(vargs, fmt); + ret = vfprintf(stderr, fmt, vargs); + va_end(vargs); + return ret; +} + void -hex_dump(char *p, size_t sz) { -#if DEBUG_LOGS - printf("hex dump of %p (%ld bytes)\n", p, sz); +hex_dump(struct worker_thread *wt, char *p, size_t sz) { + wt->debug("hex dump of %p (%ld bytes)\n", p, sz); for (char *cur = p; cur < p + sz; cur += 16) { char letters[16] = {0}; int limit = (cur + 16) > p + sz ? (sz % 16) : 16; - printf("%08lx ", cur - p); /* address */ + wt->debug("%08lx ", cur - p); /* address */ for (int i = 0; i < limit; i++) { - printf("%02x ", (unsigned int)(cur[i] & 0xff)); + wt->debug("%02x ", (unsigned int)(cur[i] & 0xff)); letters[i] = isprint(cur[i]) ? cur[i] : '.'; } for (int i = limit; i < 16; i++) { /* pad on last line */ - printf(" "); + wt->debug(" "); } - printf(" %.*s\n", limit, letters); + wt->debug(" %.*s\n", limit, letters); } -#endif } void -evbuffer_debug_dump(struct evbuffer *buffer) { +evbuffer_debug_dump(struct worker_thread *wt, struct evbuffer *buffer) { size_t sz = evbuffer_get_length(buffer); char *data = malloc(sz); evbuffer_remove(buffer, data, sz); - hex_dump(data, sz); + hex_dump(wt, data, sz); evbuffer_prepend(buffer, data, sz); free(data); } @@ -99,9 +115,9 @@ void process_message(struct worker_thread *wt, size_t sz) { // printf("process_message\n"); - if(wt->msg_received % 10000 == 0) { - printf("thread %u: %8d messages left (got %9d bytes so far).\n", - (unsigned int)wt->thread, + if(wt->msg_received && wt->msg_received % 1000 == 0) { + printf("thread %d: %8d messages left (got %9d bytes so far).\n", + wt->id, wt->msg_target - wt->msg_received, wt->byte_count); } wt->byte_count += sz; @@ -120,9 +136,7 @@ void websocket_can_write(int fd, short event, void *ptr) { int ret; struct worker_thread *wt = ptr; -#if DEBUG_LOGS - printf("%s (wt=%p, fd=%d)\n", __func__, wt, fd); -#endif + wt->debug("%s (wt=%p, fd=%d)\n", __func__, wt, fd); if(event != EV_WRITE) { return; @@ -131,31 +145,23 @@ websocket_can_write(int fd, short event, void *ptr) { { case WS_INITIAL: { /* still sending initial HTTP request */ ret = evbuffer_write(wt->wbuffer, fd); -#if DEBUG_LOGS - printf("evbuffer_write returned %d\n", ret); - printf("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); -#endif + wt->debug("evbuffer_write returned %d\n", ret); + wt->debug("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */ wait_for_possible_write(wt); return; } /* otherwise, we've sent the full request, time to read the response */ wt->state = WS_SENT_HANDSHAKE; -#if DEBUG_LOGS - printf("state=WS_SENT_HANDSHAKE\n"); -#endif + wt->debug("state=WS_SENT_HANDSHAKE\n"); wait_for_possible_read(wt); return; } case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */ -#if DEBUG_LOGS - printf("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); -#endif + wt->debug("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); evbuffer_write(wt->wbuffer, fd); size_t write_remains = evbuffer_get_length(wt->wbuffer); -#if DEBUG_LOGS - printf("Sent data for WS frame, still %lu left to write\n", write_remains); -#endif + wt->debug("Sent data for WS frame, still %lu left to write\n", write_remains); if (write_remains == 0) { /* ready to read response */ wt->state = WS_SENT_FRAME; wt->msg_sent++; @@ -168,32 +174,14 @@ websocket_can_write(int fd, short event, void *ptr) { default: break; } -#if 0 - char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff"; - ret = write(fd, message, sizeof(message)-1); - if(ret != sizeof(message)-1) { - fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno)); - close(fd); - } - - wt->msg_sent += 2; - if(wt->msg_sent < wt->msg_target) { - event_set(&wt->ev_w, fd, EV_WRITE, websocket_can_write, wt); - event_base_set(wt->base, &wt->ev_w); - ret = event_add(&wt->ev_w, NULL); - } -#endif } static void websocket_can_read(int fd, short event, void *ptr) { - char packet[2048], *pos; - int ret, success = 1; + int ret; struct worker_thread *wt = ptr; -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); if(event != EV_READ) { return; @@ -201,14 +189,10 @@ websocket_can_read(int fd, short event, void *ptr) { /* read message */ ret = evbuffer_read(wt->rbuffer, fd, 65536); -#if DEBUG_LOGS - printf("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); -#endif - evbuffer_debug_dump(wt->rbuffer); + wt->debug("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); + evbuffer_debug_dump(wt, wt->rbuffer); if (ret == 0) { -#if DEBUG_LOGS - printf("We didn't read anything from the socket...\n"); -#endif + wt->debug("We didn't read anything from the socket...\n"); wait_for_possible_read(wt); return; } @@ -218,41 +202,30 @@ websocket_can_read(int fd, short event, void *ptr) { case WS_SENT_HANDSHAKE: { /* waiting for handshake response */ size_t avail_sz = evbuffer_get_length(wt->rbuffer); char *tmp = calloc(avail_sz, 1); -#if DEBUG_LOGS - printf("avail_sz from rbuffer = %lu\n", avail_sz); -#endif + wt->debug("avail_sz from rbuffer = %lu\n", avail_sz); evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */ -#if DEBUG_LOGS - printf("Giving %lu bytes to http-parser\n", avail_sz); -#endif + wt->debug("Giving %lu bytes to http-parser\n", avail_sz); int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz); -#if DEBUG_LOGS - printf("http-parser returned %d\n", nparsed); -#endif + wt->debug("http-parser returned %d\n", nparsed); if (nparsed != (int)avail_sz) { // put back what we didn't read -#if DEBUG_LOGS - printf("re-attach (prepend) %lu bytes\n", avail_sz - nparsed); -#endif + wt->debug("re-attach (prepend) %lu byte%c\n", avail_sz - nparsed, + avail_sz - nparsed > 1 ? 's' : ' '); evbuffer_prepend(wt->rbuffer, tmp + nparsed, avail_sz - nparsed); } free(tmp); if (wt->state == WS_SENT_HANDSHAKE && /* haven't encountered end of response yet */ wt->parser.upgrade && nparsed != (int)avail_sz) { -#if DEBUG_LOGS - printf("UPGRADE *and* we have some data left\n"); -#endif + wt->debug("UPGRADE *and* we have some data left\n"); continue; } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); } + return; } - return; case WS_SENT_FRAME: { /* waiting for frame response */ -#if DEBUG_LOGS - printf("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); -#endif - evbuffer_debug_dump(wt->rbuffer); + wt->debug("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); + evbuffer_debug_dump(wt, wt->rbuffer); uint8_t flag_opcodes, payload_len; if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */ wait_for_possible_read(wt); @@ -264,116 +237,27 @@ websocket_can_read(int fd, short event, void *ptr) { process_message(wt, payload_len); if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything, let's write again */ -#if DEBUG_LOGS - printf("our turn to write again\n"); -#endif + wt->debug("our turn to write again\n"); wt->state = WS_RECEIVED_HANDSHAKE; ws_enqueue_frame(wt); return; } else { -#if DEBUG_LOGS - printf("there's still data to consume\n"); -#endif + wt->debug("there's still data to consume\n"); continue; } -#if 0 - struct evbuffer_ptr sof = evbuffer_search(wt->rbuffer, "\x00", 1, NULL); - struct evbuffer_ptr eof = evbuffer_search(wt->rbuffer, "\xff", 1, NULL); - if (evbuffer_get_length(wt->rbuffer) >= 1 && sof.pos != 0) { - printf("ERROR: length=%lu, sof at pos %ld\n", evbuffer_get_length(wt->rbuffer), sof.pos); - } - if (eof.pos == -1) { /* not there yet */ - printf("Couldn't find the end-of-frame marker, need to read more\n"); - wait_for_possible_read(wt); - } else { /* we have a frame */ - size_t bounded_frame_sz = eof.pos + 1; - char *bounded_frame = calloc(bounded_frame_sz, 1); - evbuffer_remove(wt->rbuffer, bounded_frame, bounded_frame_sz); - process_message(wt, bounded_frame_sz); - printf("Received frame (%lu bytes total):\n", bounded_frame_sz); - hex_dump(bounded_frame, bounded_frame_sz); - free(bounded_frame); - if (evbuffer_get_length(wt->rbuffer) > 0) { /* we may have more frames to process */ - continue; - } else { /* our turn to send a frame */ - printf("Add frame to write buffer\n"); - ws_enqueue_frame(wt); - } - } -#endif + return; } - return; default: return; } } - -#if 0 - pos = packet; - if(ret > 0) { - char *data, *last; - int sz, msg_sz; - - if(wt->got_header == 0) { /* first response */ - char *frame_start = strstr(packet, "MH"); /* end of the handshake */ - if(frame_start == NULL) { - return; /* not yet */ - } else { /* start monitoring possible writes */ - printf("start monitoring possible writes\n"); - evbuffer_add(wt->rbuffer, frame_start + 2, ret - (frame_start + 2 - packet)); - - wt->got_header = 1; - event_set(&wt->ev_w, fd, EV_WRITE, - websocket_can_write, wt); - event_base_set(wt->base, &wt->ev_w); - ret = event_add(&wt->ev_w, NULL); - } - } else { - /* we've had the header already, now bufffer data. */ - evbuffer_add(wt->rbuffer, packet, ret); - } - - while(1) { - data = (char*)EVBUFFER_DATA(wt->rbuffer); - sz = EVBUFFER_LENGTH(wt->rbuffer); - - if(sz == 0) { /* no data */ - break; - } - if(*data != 0) { /* missing frame start */ - success = 0; - break; - } - last = memchr(data, 0xff, sz); /* look for frame end */ - if(!last) { - /* no end of frame in sight. */ - break; - } - msg_sz = last - data - 1; - process_message(ptr, msg_sz); /* record packet */ - - /* drain including frame delimiters (+2 bytes) */ - evbuffer_drain(wt->rbuffer, msg_sz + 2); - } - } else { - printf("ret=%d\n", ret); - success = 0; - } - if(success == 0) { - shutdown(fd, SHUT_RDWR); - close(fd); - event_base_loopexit(wt->base, NULL); - } -#endif } static void wait_for_possible_read(struct worker_thread *wt) { -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt); event_base_set(wt->base, &wt->ev_r); event_add(&wt->ev_r, NULL); @@ -381,9 +265,7 @@ wait_for_possible_read(struct worker_thread *wt) { static void wait_for_possible_write(struct worker_thread *wt) { -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt); event_base_set(wt->base, &wt->ev_r); event_add(&wt->ev_r, NULL); @@ -393,9 +275,7 @@ static int ws_on_headers_complete(http_parser *p) { struct worker_thread *wt = p->data; -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); // TODO return 0; } @@ -409,7 +289,7 @@ ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */ len |= (1 << 7); /* set masking bit ON */ - for (int i = 0; i < sz; i++) { + for (size_t i = 0; i < sz; i++) { cmd[i] = (cmd[i] ^ mask[i%4]) & 0xff; } /* 0x81 = 10000001b: FIN bit (only one message in the frame), text frame */ @@ -421,13 +301,6 @@ ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { static void ws_enqueue_frame(struct worker_thread *wt) { - -#if 0 - char set_command[] = "[\"SET\",\"key\",\"value\"]"; - ws_enqueue_frame_for_command(wt, set_command, sizeof(set_command) - 1); - char get_command[] = "[\"GET\",\"key\"]"; - ws_enqueue_frame_for_command(wt, get_command, sizeof(get_command) - 1); -#endif char ping_command[] = "[\"PING\"]"; ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1); @@ -438,9 +311,7 @@ static int ws_on_message_complete(http_parser *p) { struct worker_thread *wt = p->data; -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); // we've received the full HTTP response now, so we're ready to send frames wt->state = WS_RECEIVED_HANDSHAKE; ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */ @@ -464,8 +335,6 @@ worker_main(void *ptr) { int ret; int fd; struct sockaddr_in addr; - char *ws_handshake; - size_t ws_handshake_sz; /* connect socket */ fd = socket(AF_INET, SOCK_STREAM, 0); @@ -495,24 +364,15 @@ worker_main(void *ptr) { http_parser_init(&wt->parser, HTTP_RESPONSE); wt->parser.data = wt; - /* build handshake buffer */ - /* - ws_handshake_sz = sizeof(ws_handshake) - + 2*strlen(wt->hi->host) + 500; - ws_handshake = calloc(ws_handshake_sz, 1); - ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, - wt->hi->host, wt->hi->port, - wt->hi->host, wt->hi->port); - */ - int added = evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port, - wt->hi->host, wt->hi->port); + /* add GET request to buffer */ + evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port, + wt->hi->host, wt->hi->port); wait_for_possible_write(wt); /* request callback */ /* go! */ event_base_dispatch(wt->base); - printf("event_base_dispatch returned\n"); + wt->debug("event_base_dispatch returned\n"); event_base_free(wt->base); - // free(ws_handshake); return NULL; } @@ -524,7 +384,7 @@ usage(const char* argv0, char *host_default, short port_default, "Options are:\n" "\t-h host\t\t(default = \"%s\")\n" "\t-p port\t\t(default = %d)\n" - "\t-c threads\t(default = %d)\n" + "\t-t threads\t(default = %d)\n" "\t-n count\t(number of messages per thread, default = %d)\n" "\t-v\t\t(verbose)\n", argv0, host_default, (int)port_default, @@ -545,15 +405,24 @@ main(int argc, char *argv[]) { int thread_count = thread_count_default; int i, opt; char *colon; - double total = 0, total_bytes = 0; - int verbose = 0, single = 0; + long total = 0, total_bytes = 0; + int verbose = 0; struct host_info hi = {host_default, port_default}; struct worker_thread *workers; /* getopt */ - while ((opt = getopt(argc, argv, "h:p:c:n:vs")) != -1) { + struct option long_options[] = { + {"help", no_argument, NULL, '?'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"threads", required_argument, NULL, 't'}, + {"messages", required_argument, NULL, 'n'}, + {"verbose", no_argument, NULL, 'v'}, + {0, 0, 0, 0} + }; + while ((opt = getopt_long(argc, argv, "h:p:t:n:vs", long_options, NULL)) != -1) { switch (opt) { case 'h': colon = strchr(optarg, ':'); @@ -572,7 +441,7 @@ main(int argc, char *argv[]) { hi.port = (short)atol(optarg); break; - case 'c': + case 't': thread_count = atoi(optarg); break; @@ -584,10 +453,6 @@ main(int argc, char *argv[]) { verbose = 1; break; - case 's': - single = 1; - thread_count = 1; - break; default: /* '?' */ usage(argv[0], host_default, port_default, thread_count_default, @@ -600,20 +465,25 @@ main(int argc, char *argv[]) { workers = calloc(sizeof(struct worker_thread), thread_count); clock_gettime(CLOCK_MONOTONIC, &t0); - if (single) { + if (thread_count == 1) { printf("Single-threaded mode\n"); + workers[0].id = 0; workers[0].msg_target = msg_target; workers[0].hi = &hi; workers[0].verbose = verbose; workers[0].state = WS_INITIAL; + workers[0].debug = verbose ? debug_verbose : debug_noop; worker_main(&workers[0]); + total = workers[0].msg_received; + total_bytes = workers[0].byte_count; } else { for (i = 0; i < thread_count; ++i) { + workers[i].id = i; workers[i].msg_target = msg_target; workers[i].hi = &hi; workers[i].verbose = verbose; workers[i].state = WS_INITIAL; - + workers[i].debug = verbose ? debug_verbose : debug_noop; pthread_create(&workers[i].thread, NULL, worker_main, &workers[i]); } @@ -632,12 +502,13 @@ main(int argc, char *argv[]) { float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; if(total != 0) { - printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n", - (long)total, + double kb_per_sec = ((double)total_bytes / (double)(mili1-mili0)) / 1.024; + printf("Read %ld messages (%ld bytes) in %0.2f sec: %0.2f msg/sec (%0.2f KB/sec)\n", + total, + total_bytes, (mili1-mili0)/1000.0, - 1000*total/(mili1-mili0), - (int)(total_bytes / (1000*(mili1-mili0))), - (int)(total_bytes / (mili1-mili0))); + 1000*((double)total)/(mili1-mili0), + kb_per_sec); return EXIT_SUCCESS; } else { printf("No message was read.\n");