diff --git a/tests/websocket.c b/tests/websocket.c index 3c53cb1..c0277d7 100644 --- a/tests/websocket.c +++ b/tests/websocket.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -40,6 +41,7 @@ struct worker_thread { int msg_received; int msg_sent; int byte_count; + int id; pthread_t thread; enum worker_state state; @@ -55,34 +57,48 @@ struct worker_thread { http_parser parser; http_parser_settings settings; + + int (*debug)(const char *fmt, ...); }; +int debug_noop(const char *fmt, ...) { + (void)fmt; + return 0; +} + +int debug_verbose(const char *fmt, ...) { + int ret; + va_list vargs; + va_start(vargs, fmt); + ret = vfprintf(stderr, fmt, vargs); + va_end(vargs); + return ret; +} + void -hex_dump(char *p, size_t sz) { -#if DEBUG_LOGS - printf("hex dump of %p (%ld bytes)\n", p, sz); +hex_dump(struct worker_thread *wt, char *p, size_t sz) { + wt->debug("hex dump of %p (%ld bytes)\n", p, sz); for (char *cur = p; cur < p + sz; cur += 16) { char letters[16] = {0}; int limit = (cur + 16) > p + sz ? (sz % 16) : 16; - printf("%08lx ", cur - p); /* address */ + wt->debug("%08lx ", cur - p); /* address */ for (int i = 0; i < limit; i++) { - printf("%02x ", (unsigned int)(cur[i] & 0xff)); + wt->debug("%02x ", (unsigned int)(cur[i] & 0xff)); letters[i] = isprint(cur[i]) ? cur[i] : '.'; } for (int i = limit; i < 16; i++) { /* pad on last line */ - printf(" "); + wt->debug(" "); } - printf(" %.*s\n", limit, letters); + wt->debug(" %.*s\n", limit, letters); } -#endif } void -evbuffer_debug_dump(struct evbuffer *buffer) { +evbuffer_debug_dump(struct worker_thread *wt, struct evbuffer *buffer) { size_t sz = evbuffer_get_length(buffer); char *data = malloc(sz); evbuffer_remove(buffer, data, sz); - hex_dump(data, sz); + hex_dump(wt, data, sz); evbuffer_prepend(buffer, data, sz); free(data); } @@ -99,9 +115,9 @@ void process_message(struct worker_thread *wt, size_t sz) { // printf("process_message\n"); - if(wt->msg_received % 10000 == 0) { - printf("thread %u: %8d messages left (got %9d bytes so far).\n", - (unsigned int)wt->thread, + if(wt->msg_received && wt->msg_received % 1000 == 0) { + printf("thread %d: %8d messages left (got %9d bytes so far).\n", + wt->id, wt->msg_target - wt->msg_received, wt->byte_count); } wt->byte_count += sz; @@ -120,9 +136,7 @@ void websocket_can_write(int fd, short event, void *ptr) { int ret; struct worker_thread *wt = ptr; -#if DEBUG_LOGS - printf("%s (wt=%p, fd=%d)\n", __func__, wt, fd); -#endif + wt->debug("%s (wt=%p, fd=%d)\n", __func__, wt, fd); if(event != EV_WRITE) { return; @@ -131,31 +145,23 @@ websocket_can_write(int fd, short event, void *ptr) { { case WS_INITIAL: { /* still sending initial HTTP request */ ret = evbuffer_write(wt->wbuffer, fd); -#if DEBUG_LOGS - printf("evbuffer_write returned %d\n", ret); - printf("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); -#endif + wt->debug("evbuffer_write returned %d\n", ret); + wt->debug("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */ wait_for_possible_write(wt); return; } /* otherwise, we've sent the full request, time to read the response */ wt->state = WS_SENT_HANDSHAKE; -#if DEBUG_LOGS - printf("state=WS_SENT_HANDSHAKE\n"); -#endif + wt->debug("state=WS_SENT_HANDSHAKE\n"); wait_for_possible_read(wt); return; } case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */ -#if DEBUG_LOGS - printf("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); -#endif + wt->debug("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); evbuffer_write(wt->wbuffer, fd); size_t write_remains = evbuffer_get_length(wt->wbuffer); -#if DEBUG_LOGS - printf("Sent data for WS frame, still %lu left to write\n", write_remains); -#endif + wt->debug("Sent data for WS frame, still %lu left to write\n", write_remains); if (write_remains == 0) { /* ready to read response */ wt->state = WS_SENT_FRAME; wt->msg_sent++; @@ -168,32 +174,14 @@ websocket_can_write(int fd, short event, void *ptr) { default: break; } -#if 0 - char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff"; - ret = write(fd, message, sizeof(message)-1); - if(ret != sizeof(message)-1) { - fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno)); - close(fd); - } - - wt->msg_sent += 2; - if(wt->msg_sent < wt->msg_target) { - event_set(&wt->ev_w, fd, EV_WRITE, websocket_can_write, wt); - event_base_set(wt->base, &wt->ev_w); - ret = event_add(&wt->ev_w, NULL); - } -#endif } static void websocket_can_read(int fd, short event, void *ptr) { - char packet[2048], *pos; - int ret, success = 1; + int ret; struct worker_thread *wt = ptr; -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); if(event != EV_READ) { return; @@ -201,14 +189,10 @@ websocket_can_read(int fd, short event, void *ptr) { /* read message */ ret = evbuffer_read(wt->rbuffer, fd, 65536); -#if DEBUG_LOGS - printf("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); -#endif - evbuffer_debug_dump(wt->rbuffer); + wt->debug("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); + evbuffer_debug_dump(wt, wt->rbuffer); if (ret == 0) { -#if DEBUG_LOGS - printf("We didn't read anything from the socket...\n"); -#endif + wt->debug("We didn't read anything from the socket...\n"); wait_for_possible_read(wt); return; } @@ -218,41 +202,30 @@ websocket_can_read(int fd, short event, void *ptr) { case WS_SENT_HANDSHAKE: { /* waiting for handshake response */ size_t avail_sz = evbuffer_get_length(wt->rbuffer); char *tmp = calloc(avail_sz, 1); -#if DEBUG_LOGS - printf("avail_sz from rbuffer = %lu\n", avail_sz); -#endif + wt->debug("avail_sz from rbuffer = %lu\n", avail_sz); evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */ -#if DEBUG_LOGS - printf("Giving %lu bytes to http-parser\n", avail_sz); -#endif + wt->debug("Giving %lu bytes to http-parser\n", avail_sz); int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz); -#if DEBUG_LOGS - printf("http-parser returned %d\n", nparsed); -#endif + wt->debug("http-parser returned %d\n", nparsed); if (nparsed != (int)avail_sz) { // put back what we didn't read -#if DEBUG_LOGS - printf("re-attach (prepend) %lu bytes\n", avail_sz - nparsed); -#endif + wt->debug("re-attach (prepend) %lu byte%c\n", avail_sz - nparsed, + avail_sz - nparsed > 1 ? 's' : ' '); evbuffer_prepend(wt->rbuffer, tmp + nparsed, avail_sz - nparsed); } free(tmp); if (wt->state == WS_SENT_HANDSHAKE && /* haven't encountered end of response yet */ wt->parser.upgrade && nparsed != (int)avail_sz) { -#if DEBUG_LOGS - printf("UPGRADE *and* we have some data left\n"); -#endif + wt->debug("UPGRADE *and* we have some data left\n"); continue; } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); } + return; } - return; case WS_SENT_FRAME: { /* waiting for frame response */ -#if DEBUG_LOGS - printf("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); -#endif - evbuffer_debug_dump(wt->rbuffer); + wt->debug("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); + evbuffer_debug_dump(wt, wt->rbuffer); uint8_t flag_opcodes, payload_len; if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */ wait_for_possible_read(wt); @@ -264,116 +237,27 @@ websocket_can_read(int fd, short event, void *ptr) { process_message(wt, payload_len); if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything, let's write again */ -#if DEBUG_LOGS - printf("our turn to write again\n"); -#endif + wt->debug("our turn to write again\n"); wt->state = WS_RECEIVED_HANDSHAKE; ws_enqueue_frame(wt); return; } else { -#if DEBUG_LOGS - printf("there's still data to consume\n"); -#endif + wt->debug("there's still data to consume\n"); continue; } -#if 0 - struct evbuffer_ptr sof = evbuffer_search(wt->rbuffer, "\x00", 1, NULL); - struct evbuffer_ptr eof = evbuffer_search(wt->rbuffer, "\xff", 1, NULL); - if (evbuffer_get_length(wt->rbuffer) >= 1 && sof.pos != 0) { - printf("ERROR: length=%lu, sof at pos %ld\n", evbuffer_get_length(wt->rbuffer), sof.pos); - } - if (eof.pos == -1) { /* not there yet */ - printf("Couldn't find the end-of-frame marker, need to read more\n"); - wait_for_possible_read(wt); - } else { /* we have a frame */ - size_t bounded_frame_sz = eof.pos + 1; - char *bounded_frame = calloc(bounded_frame_sz, 1); - evbuffer_remove(wt->rbuffer, bounded_frame, bounded_frame_sz); - process_message(wt, bounded_frame_sz); - printf("Received frame (%lu bytes total):\n", bounded_frame_sz); - hex_dump(bounded_frame, bounded_frame_sz); - free(bounded_frame); - if (evbuffer_get_length(wt->rbuffer) > 0) { /* we may have more frames to process */ - continue; - } else { /* our turn to send a frame */ - printf("Add frame to write buffer\n"); - ws_enqueue_frame(wt); - } - } -#endif + return; } - return; default: return; } } - -#if 0 - pos = packet; - if(ret > 0) { - char *data, *last; - int sz, msg_sz; - - if(wt->got_header == 0) { /* first response */ - char *frame_start = strstr(packet, "MH"); /* end of the handshake */ - if(frame_start == NULL) { - return; /* not yet */ - } else { /* start monitoring possible writes */ - printf("start monitoring possible writes\n"); - evbuffer_add(wt->rbuffer, frame_start + 2, ret - (frame_start + 2 - packet)); - - wt->got_header = 1; - event_set(&wt->ev_w, fd, EV_WRITE, - websocket_can_write, wt); - event_base_set(wt->base, &wt->ev_w); - ret = event_add(&wt->ev_w, NULL); - } - } else { - /* we've had the header already, now bufffer data. */ - evbuffer_add(wt->rbuffer, packet, ret); - } - - while(1) { - data = (char*)EVBUFFER_DATA(wt->rbuffer); - sz = EVBUFFER_LENGTH(wt->rbuffer); - - if(sz == 0) { /* no data */ - break; - } - if(*data != 0) { /* missing frame start */ - success = 0; - break; - } - last = memchr(data, 0xff, sz); /* look for frame end */ - if(!last) { - /* no end of frame in sight. */ - break; - } - msg_sz = last - data - 1; - process_message(ptr, msg_sz); /* record packet */ - - /* drain including frame delimiters (+2 bytes) */ - evbuffer_drain(wt->rbuffer, msg_sz + 2); - } - } else { - printf("ret=%d\n", ret); - success = 0; - } - if(success == 0) { - shutdown(fd, SHUT_RDWR); - close(fd); - event_base_loopexit(wt->base, NULL); - } -#endif } static void wait_for_possible_read(struct worker_thread *wt) { -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt); event_base_set(wt->base, &wt->ev_r); event_add(&wt->ev_r, NULL); @@ -381,9 +265,7 @@ wait_for_possible_read(struct worker_thread *wt) { static void wait_for_possible_write(struct worker_thread *wt) { -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt); event_base_set(wt->base, &wt->ev_r); event_add(&wt->ev_r, NULL); @@ -393,9 +275,7 @@ static int ws_on_headers_complete(http_parser *p) { struct worker_thread *wt = p->data; -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); // TODO return 0; } @@ -409,7 +289,7 @@ ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */ len |= (1 << 7); /* set masking bit ON */ - for (int i = 0; i < sz; i++) { + for (size_t i = 0; i < sz; i++) { cmd[i] = (cmd[i] ^ mask[i%4]) & 0xff; } /* 0x81 = 10000001b: FIN bit (only one message in the frame), text frame */ @@ -421,13 +301,6 @@ ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { static void ws_enqueue_frame(struct worker_thread *wt) { - -#if 0 - char set_command[] = "[\"SET\",\"key\",\"value\"]"; - ws_enqueue_frame_for_command(wt, set_command, sizeof(set_command) - 1); - char get_command[] = "[\"GET\",\"key\"]"; - ws_enqueue_frame_for_command(wt, get_command, sizeof(get_command) - 1); -#endif char ping_command[] = "[\"PING\"]"; ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1); @@ -438,9 +311,7 @@ static int ws_on_message_complete(http_parser *p) { struct worker_thread *wt = p->data; -#if DEBUG_LOGS - printf("%s (wt=%p)\n", __func__, wt); -#endif + wt->debug("%s (wt=%p)\n", __func__, wt); // we've received the full HTTP response now, so we're ready to send frames wt->state = WS_RECEIVED_HANDSHAKE; ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */ @@ -464,8 +335,6 @@ worker_main(void *ptr) { int ret; int fd; struct sockaddr_in addr; - char *ws_handshake; - size_t ws_handshake_sz; /* connect socket */ fd = socket(AF_INET, SOCK_STREAM, 0); @@ -495,24 +364,15 @@ worker_main(void *ptr) { http_parser_init(&wt->parser, HTTP_RESPONSE); wt->parser.data = wt; - /* build handshake buffer */ - /* - ws_handshake_sz = sizeof(ws_handshake) - + 2*strlen(wt->hi->host) + 500; - ws_handshake = calloc(ws_handshake_sz, 1); - ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, - wt->hi->host, wt->hi->port, - wt->hi->host, wt->hi->port); - */ - int added = evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port, - wt->hi->host, wt->hi->port); + /* add GET request to buffer */ + evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port, + wt->hi->host, wt->hi->port); wait_for_possible_write(wt); /* request callback */ /* go! */ event_base_dispatch(wt->base); - printf("event_base_dispatch returned\n"); + wt->debug("event_base_dispatch returned\n"); event_base_free(wt->base); - // free(ws_handshake); return NULL; } @@ -524,7 +384,7 @@ usage(const char* argv0, char *host_default, short port_default, "Options are:\n" "\t-h host\t\t(default = \"%s\")\n" "\t-p port\t\t(default = %d)\n" - "\t-c threads\t(default = %d)\n" + "\t-t threads\t(default = %d)\n" "\t-n count\t(number of messages per thread, default = %d)\n" "\t-v\t\t(verbose)\n", argv0, host_default, (int)port_default, @@ -545,15 +405,24 @@ main(int argc, char *argv[]) { int thread_count = thread_count_default; int i, opt; char *colon; - double total = 0, total_bytes = 0; - int verbose = 0, single = 0; + long total = 0, total_bytes = 0; + int verbose = 0; struct host_info hi = {host_default, port_default}; struct worker_thread *workers; /* getopt */ - while ((opt = getopt(argc, argv, "h:p:c:n:vs")) != -1) { + struct option long_options[] = { + {"help", no_argument, NULL, '?'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"threads", required_argument, NULL, 't'}, + {"messages", required_argument, NULL, 'n'}, + {"verbose", no_argument, NULL, 'v'}, + {0, 0, 0, 0} + }; + while ((opt = getopt_long(argc, argv, "h:p:t:n:vs", long_options, NULL)) != -1) { switch (opt) { case 'h': colon = strchr(optarg, ':'); @@ -572,7 +441,7 @@ main(int argc, char *argv[]) { hi.port = (short)atol(optarg); break; - case 'c': + case 't': thread_count = atoi(optarg); break; @@ -584,10 +453,6 @@ main(int argc, char *argv[]) { verbose = 1; break; - case 's': - single = 1; - thread_count = 1; - break; default: /* '?' */ usage(argv[0], host_default, port_default, thread_count_default, @@ -600,20 +465,25 @@ main(int argc, char *argv[]) { workers = calloc(sizeof(struct worker_thread), thread_count); clock_gettime(CLOCK_MONOTONIC, &t0); - if (single) { + if (thread_count == 1) { printf("Single-threaded mode\n"); + workers[0].id = 0; workers[0].msg_target = msg_target; workers[0].hi = &hi; workers[0].verbose = verbose; workers[0].state = WS_INITIAL; + workers[0].debug = verbose ? debug_verbose : debug_noop; worker_main(&workers[0]); + total = workers[0].msg_received; + total_bytes = workers[0].byte_count; } else { for (i = 0; i < thread_count; ++i) { + workers[i].id = i; workers[i].msg_target = msg_target; workers[i].hi = &hi; workers[i].verbose = verbose; workers[i].state = WS_INITIAL; - + workers[i].debug = verbose ? debug_verbose : debug_noop; pthread_create(&workers[i].thread, NULL, worker_main, &workers[i]); } @@ -632,12 +502,13 @@ main(int argc, char *argv[]) { float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; if(total != 0) { - printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n", - (long)total, + double kb_per_sec = ((double)total_bytes / (double)(mili1-mili0)) / 1.024; + printf("Read %ld messages (%ld bytes) in %0.2f sec: %0.2f msg/sec (%0.2f KB/sec)\n", + total, + total_bytes, (mili1-mili0)/1000.0, - 1000*total/(mili1-mili0), - (int)(total_bytes / (1000*(mili1-mili0))), - (int)(total_bytes / (mili1-mili0))); + 1000*((double)total)/(mili1-mili0), + kb_per_sec); return EXIT_SUCCESS; } else { printf("No message was read.\n");