diff --git a/src/http.c b/src/http.c index ef4ea9a..6b7acd2 100644 --- a/src/http.c +++ b/src/http.c @@ -98,6 +98,7 @@ http_response_cleanup(struct http_response *r, int fd, int success) { free(r->out); if(!r->keep_alive || !success) { /* Close fd is client doesn't support Keep-Alive. */ + fprintf(stderr, "http_response_cleanup: keep_alive=%d, success=%d -> closing\n", r->keep_alive, success); close(fd); } @@ -124,6 +125,8 @@ http_can_write(int fd, short event, void *p) { if(ret > 0) r->sent += ret; + fprintf(stderr, "http_can_write: ret=%d, r->out_sz=%lu, r->sent=%d\n", + ret, r->out_sz, r->sent); if(ret <= 0 || r->out_sz - r->sent == 0) { /* error or done */ http_response_cleanup(r, fd, (int)r->out_sz == r->sent ? 1 : 0); } else { /* reschedule write */ diff --git a/src/websocket.c b/src/websocket.c index 44ee6a6..687fbfe 100644 --- a/src/websocket.c +++ b/src/websocket.c @@ -220,6 +220,7 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) { static struct ws_msg * ws_msg_new() { + fprintf(stderr, "------------ NEW -----------\n"); return calloc(1, sizeof(struct ws_msg)); } @@ -235,6 +236,7 @@ ws_msg_add(struct ws_msg *m, const char *p, size_t psz, const unsigned char *mas for(i = 0; i < psz && mask; ++i) { m->payload[m->payload_sz + i] = (unsigned char)p[i] ^ mask[i%4]; } + fprintf(stderr, "CONTENTS=[%.*s] (%lu)\n", (int)psz, m->payload, psz); /* save new size */ m->payload_sz += psz; @@ -242,7 +244,7 @@ ws_msg_add(struct ws_msg *m, const char *p, size_t psz, const unsigned char *mas static void ws_msg_free(struct ws_msg **m) { - + fprintf(stderr, "------------ /FREE -----------\n"); free((*m)->payload); free(*m); *m = NULL; @@ -262,12 +264,15 @@ ws_parse_data(const char *frame, size_t sz, struct ws_msg **msg) { } has_mask = frame[1] & 0x80 ? 1:0; + fprintf(stderr, "has_mask=%d\n", has_mask); /* get payload length */ len = frame[1] & 0x7f; /* remove leftmost bit */ + fprintf(stderr, "len=%llu\n", len); if(len <= 125) { /* data starts right after the mask */ p = frame + 2 + (has_mask ? 4 : 0); if(has_mask) memcpy(&mask, frame + 2, sizeof(mask)); + if (has_mask) fprintf(stderr, "mask= %02x %02x %02x %02x\n", mask[0], mask[1], mask[2], mask[3]); } else if(len == 126) { uint16_t sz16; memcpy(&sz16, frame + 2, sizeof(uint16_t)); @@ -293,8 +298,10 @@ ws_parse_data(const char *frame, size_t sz, struct ws_msg **msg) { (*msg)->total_sz += len + (p - frame); if(frame[0] & 0x80) { /* FIN bit set */ + fprintf(stderr, "FIN bit: SET\n"); return WS_MSG_COMPLETE; } else { + fprintf(stderr, "FIN bit: NOT SET\n"); return WS_READING; /* need more data */ } } @@ -312,6 +319,7 @@ ws_add_data(struct http_client *c) { while(state == WS_MSG_COMPLETE) { int ret = ws_execute(c, c->frame->payload, c->frame->payload_sz); + fprintf(stderr, "ws_execute returned %d\n", ret); /* remove frame from client buffer */ http_client_remove_data(c, c->frame->total_sz); @@ -323,14 +331,16 @@ ws_add_data(struct http_client *c) { /* can't process frame. */ return WS_ERROR; } + fprintf(stderr, "Calling ws_parse_data again...\n"); state = ws_parse_data(c->buffer, c->sz, &c->frame); + fprintf(stderr, "ws_parse_data returned %d\n", (int)state); } return state; } int ws_reply(struct cmd *cmd, const char *p, size_t sz) { - + fprintf(stderr, "ws_reply: '%.*s' (%lu bytes)\n", (int)sz, p, sz); char *frame = malloc(sz + 8); /* create frame by prepending header */ size_t frame_sz = 0; struct http_response *r; @@ -365,7 +375,7 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) { /* send WS frame */ r = http_response_init(cmd->w, 0, NULL); - if (cmd_is_subscribe(cmd)) { + if (1 || cmd_is_subscribe(cmd)) { r->keep_alive = 1; } diff --git a/src/worker.c b/src/worker.c index d5decef..1d5b838 100644 --- a/src/worker.c +++ b/src/worker.c @@ -35,7 +35,7 @@ worker_new(struct server *s) { void worker_can_read(int fd, short event, void *p) { - + fprintf(stderr, "worker_can_read\n"); struct http_client *c = p; int ret, nparsed; @@ -87,9 +87,11 @@ worker_can_read(int fd, short event, void *p) { } if(c->broken) { /* terminate client */ + fprintf(stderr, "c->broken: http_client_free()\n"); http_client_free(c); } else { /* start monitoring input again */ + fprintf(stderr, "worker_monitor_input()\n"); worker_monitor_input(c); } } diff --git a/tests/Makefile b/tests/Makefile index f382c4e..be3caa0 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,11 +1,11 @@ OUT=websocket pubsub -CFLAGS=-O3 -Wall -Wextra -LDFLAGS=-levent -lpthread +CFLAGS=-O0 -g -Wall -Wextra -I../src/http-parser/ +LDFLAGS=-g -levent -lpthread all: $(OUT) Makefile -websocket: websocket.o - $(CC) -o $@ $< $(LDFLAGS) +websocket: websocket.o ../src/http-parser/http_parser.o + $(CC) -o $@ $^ $(LDFLAGS) pubsub: pubsub.o $(CC) -o $@ $< $(LDFLAGS) diff --git a/tests/websocket.c b/tests/websocket.c index 39cb492..3c53cb1 100644 --- a/tests/websocket.c +++ b/tests/websocket.c @@ -8,11 +8,15 @@ #include #include #include +#include #include #include #include +#include + +#define DEBUG_LOGS 0 struct host_info { char *host; @@ -22,7 +26,8 @@ struct host_info { enum worker_state { WS_INITIAL, WS_SENT_HANDSHAKE, - WS_RECEIVED_RESPONSE, + WS_RECEIVED_HANDSHAKE, + WS_SENT_FRAME, WS_COMPLETE }; @@ -47,13 +52,49 @@ struct worker_thread { int fd; struct event ev_r; struct event ev_w; + + http_parser parser; + http_parser_settings settings; }; +void +hex_dump(char *p, size_t sz) { +#if DEBUG_LOGS + printf("hex dump of %p (%ld bytes)\n", p, sz); + for (char *cur = p; cur < p + sz; cur += 16) { + char letters[16] = {0}; + int limit = (cur + 16) > p + sz ? (sz % 16) : 16; + printf("%08lx ", cur - p); /* address */ + for (int i = 0; i < limit; i++) { + printf("%02x ", (unsigned int)(cur[i] & 0xff)); + letters[i] = isprint(cur[i]) ? cur[i] : '.'; + } + for (int i = limit; i < 16; i++) { /* pad on last line */ + printf(" "); + } + printf(" %.*s\n", limit, letters); + } +#endif +} + +void +evbuffer_debug_dump(struct evbuffer *buffer) { + size_t sz = evbuffer_get_length(buffer); + char *data = malloc(sz); + evbuffer_remove(buffer, data, sz); + hex_dump(data, sz); + evbuffer_prepend(buffer, data, sz); + free(data); +} + static void wait_for_possible_read(struct worker_thread *wt); static void wait_for_possible_write(struct worker_thread *wt); +static void +ws_enqueue_frame(struct worker_thread *wt); + void process_message(struct worker_thread *wt, size_t sz) { @@ -79,26 +120,51 @@ void websocket_can_write(int fd, short event, void *ptr) { int ret; struct worker_thread *wt = ptr; +#if DEBUG_LOGS printf("%s (wt=%p, fd=%d)\n", __func__, wt, fd); +#endif if(event != EV_WRITE) { return; } switch (wt->state) { - case WS_INITIAL: /* still sending initial HTTP request */ + case WS_INITIAL: { /* still sending initial HTTP request */ ret = evbuffer_write(wt->wbuffer, fd); +#if DEBUG_LOGS printf("evbuffer_write returned %d\n", ret); printf("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); - if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */ +#endif + if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */ wait_for_possible_write(wt); return; } /* otherwise, we've sent the full request, time to read the response */ wt->state = WS_SENT_HANDSHAKE; +#if DEBUG_LOGS + printf("state=WS_SENT_HANDSHAKE\n"); +#endif wait_for_possible_read(wt); return; - + } + case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */ +#if DEBUG_LOGS + printf("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); +#endif + evbuffer_write(wt->wbuffer, fd); + size_t write_remains = evbuffer_get_length(wt->wbuffer); +#if DEBUG_LOGS + printf("Sent data for WS frame, still %lu left to write\n", write_remains); +#endif + if (write_remains == 0) { /* ready to read response */ + wt->state = WS_SENT_FRAME; + wt->msg_sent++; + wait_for_possible_read(wt); + } else { /* not finished writing */ + wait_for_possible_write(wt); + } + return; + } default: break; } @@ -125,7 +191,9 @@ websocket_can_read(int fd, short event, void *ptr) { int ret, success = 1; struct worker_thread *wt = ptr; +#if DEBUG_LOGS printf("%s (wt=%p)\n", __func__, wt); +#endif if(event != EV_READ) { return; @@ -133,8 +201,113 @@ websocket_can_read(int fd, short event, void *ptr) { /* read message */ ret = evbuffer_read(wt->rbuffer, fd, 65536); - printf("evbuffer_read() returned %d\n", ret); - +#if DEBUG_LOGS + printf("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); +#endif + evbuffer_debug_dump(wt->rbuffer); + if (ret == 0) { +#if DEBUG_LOGS + printf("We didn't read anything from the socket...\n"); +#endif + wait_for_possible_read(wt); + return; + } + + while(1) { + switch (wt->state) { + case WS_SENT_HANDSHAKE: { /* waiting for handshake response */ + size_t avail_sz = evbuffer_get_length(wt->rbuffer); + char *tmp = calloc(avail_sz, 1); +#if DEBUG_LOGS + printf("avail_sz from rbuffer = %lu\n", avail_sz); +#endif + evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */ +#if DEBUG_LOGS + printf("Giving %lu bytes to http-parser\n", avail_sz); +#endif + int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz); +#if DEBUG_LOGS + printf("http-parser returned %d\n", nparsed); +#endif + if (nparsed != (int)avail_sz) { // put back what we didn't read +#if DEBUG_LOGS + printf("re-attach (prepend) %lu bytes\n", avail_sz - nparsed); +#endif + evbuffer_prepend(wt->rbuffer, tmp + nparsed, avail_sz - nparsed); + } + free(tmp); + if (wt->state == WS_SENT_HANDSHAKE && /* haven't encountered end of response yet */ + wt->parser.upgrade && nparsed != (int)avail_sz) { +#if DEBUG_LOGS + printf("UPGRADE *and* we have some data left\n"); +#endif + continue; + } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ + evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); + } + } + return; + + case WS_SENT_FRAME: { /* waiting for frame response */ +#if DEBUG_LOGS + printf("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); +#endif + evbuffer_debug_dump(wt->rbuffer); + uint8_t flag_opcodes, payload_len; + if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */ + wait_for_possible_read(wt); + return; + } + evbuffer_remove(wt->rbuffer, &flag_opcodes, 1); /* remove flags & opcode */ + evbuffer_remove(wt->rbuffer, &payload_len, 1); /* remove length */ + evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */ + process_message(wt, payload_len); + + if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything, let's write again */ +#if DEBUG_LOGS + printf("our turn to write again\n"); +#endif + wt->state = WS_RECEIVED_HANDSHAKE; + ws_enqueue_frame(wt); + return; + } else { +#if DEBUG_LOGS + printf("there's still data to consume\n"); +#endif + continue; + } +#if 0 + struct evbuffer_ptr sof = evbuffer_search(wt->rbuffer, "\x00", 1, NULL); + struct evbuffer_ptr eof = evbuffer_search(wt->rbuffer, "\xff", 1, NULL); + if (evbuffer_get_length(wt->rbuffer) >= 1 && sof.pos != 0) { + printf("ERROR: length=%lu, sof at pos %ld\n", evbuffer_get_length(wt->rbuffer), sof.pos); + } + if (eof.pos == -1) { /* not there yet */ + printf("Couldn't find the end-of-frame marker, need to read more\n"); + wait_for_possible_read(wt); + } else { /* we have a frame */ + size_t bounded_frame_sz = eof.pos + 1; + char *bounded_frame = calloc(bounded_frame_sz, 1); + evbuffer_remove(wt->rbuffer, bounded_frame, bounded_frame_sz); + process_message(wt, bounded_frame_sz); + printf("Received frame (%lu bytes total):\n", bounded_frame_sz); + hex_dump(bounded_frame, bounded_frame_sz); + free(bounded_frame); + if (evbuffer_get_length(wt->rbuffer) > 0) { /* we may have more frames to process */ + continue; + } else { /* our turn to send a frame */ + printf("Add frame to write buffer\n"); + ws_enqueue_frame(wt); + } + } +#endif + } + return; + + default: + return; + } + } #if 0 pos = packet; @@ -181,7 +354,7 @@ websocket_can_read(int fd, short event, void *ptr) { process_message(ptr, msg_sz); /* record packet */ /* drain including frame delimiters (+2 bytes) */ - evbuffer_drain(wt->rbuffer, msg_sz + 2); + evbuffer_drain(wt->rbuffer, msg_sz + 2); } } else { printf("ret=%d\n", ret); @@ -192,13 +365,15 @@ websocket_can_read(int fd, short event, void *ptr) { close(fd); event_base_loopexit(wt->base, NULL); } - #endif +#endif } static void wait_for_possible_read(struct worker_thread *wt) { +#if DEBUG_LOGS printf("%s (wt=%p)\n", __func__, wt); +#endif event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt); event_base_set(wt->base, &wt->ev_r); event_add(&wt->ev_r, NULL); @@ -206,12 +381,72 @@ wait_for_possible_read(struct worker_thread *wt) { static void wait_for_possible_write(struct worker_thread *wt) { +#if DEBUG_LOGS printf("%s (wt=%p)\n", __func__, wt); +#endif event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt); event_base_set(wt->base, &wt->ev_r); event_add(&wt->ev_r, NULL); } +static int +ws_on_headers_complete(http_parser *p) { + struct worker_thread *wt = p->data; + +#if DEBUG_LOGS + printf("%s (wt=%p)\n", __func__, wt); +#endif + // TODO + return 0; +} + +static void +ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { + unsigned char mask[4]; + for (int i = 0; i < 4; i++) { + mask[i] = rand() & 0xff; + } + uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */ + len |= (1 << 7); /* set masking bit ON */ + + for (int i = 0; i < sz; i++) { + cmd[i] = (cmd[i] ^ mask[i%4]) & 0xff; + } + /* 0x81 = 10000001b: FIN bit (only one message in the frame), text frame */ + evbuffer_add(wt->wbuffer, "\x81", 1); + evbuffer_add(wt->wbuffer, &len, 1); + evbuffer_add(wt->wbuffer, mask, 4); + evbuffer_add(wt->wbuffer, cmd, sz); +} + +static void +ws_enqueue_frame(struct worker_thread *wt) { + +#if 0 + char set_command[] = "[\"SET\",\"key\",\"value\"]"; + ws_enqueue_frame_for_command(wt, set_command, sizeof(set_command) - 1); + char get_command[] = "[\"GET\",\"key\"]"; + ws_enqueue_frame_for_command(wt, get_command, sizeof(get_command) - 1); +#endif + char ping_command[] = "[\"PING\"]"; + ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1); + + wait_for_possible_write(wt); +} + +static int +ws_on_message_complete(http_parser *p) { + struct worker_thread *wt = p->data; + +#if DEBUG_LOGS + printf("%s (wt=%p)\n", __func__, wt); +#endif + // we've received the full HTTP response now, so we're ready to send frames + wt->state = WS_RECEIVED_HANDSHAKE; + ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */ + return 0; +} + void* worker_main(void *ptr) { @@ -253,12 +488,19 @@ worker_main(void *ptr) { wt->byte_count = 0; wt->got_header = 0; + /* initialize HTTP parser, to parse the server response */ + memset(&wt->settings, 0, sizeof(http_parser_settings)); + wt->settings.on_headers_complete = ws_on_headers_complete; + wt->settings.on_message_complete = ws_on_message_complete; + http_parser_init(&wt->parser, HTTP_RESPONSE); + wt->parser.data = wt; + /* build handshake buffer */ /* ws_handshake_sz = sizeof(ws_handshake) + 2*strlen(wt->hi->host) + 500; ws_handshake = calloc(ws_handshake_sz, 1); - ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, + ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, wt->hi->host, wt->hi->port, wt->hi->host, wt->hi->port); */ @@ -270,7 +512,7 @@ worker_main(void *ptr) { event_base_dispatch(wt->base); printf("event_base_dispatch returned\n"); event_base_free(wt->base); - free(ws_handshake); + // free(ws_handshake); return NULL; } @@ -391,7 +633,7 @@ main(int argc, char *argv[]) { if(total != 0) { printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n", - (long)total, + (long)total, (mili1-mili0)/1000.0, 1000*total/(mili1-mili0), (int)(total_bytes / (1000*(mili1-mili0))),