diff --git a/src/conf.c b/src/conf.c index 92a68fc..f886b57 100644 --- a/src/conf.c +++ b/src/conf.c @@ -159,9 +159,11 @@ conf_read(const char *filename) { } } else if(strcmp(json_object_iter_key(kv),"verbosity") == 0 && json_typeof(jtmp) == JSON_INTEGER){ int tmp = json_integer_value(jtmp); - if(tmp < 0) conf->verbosity = WEBDIS_ERROR; - else if(tmp > (int)WEBDIS_DEBUG) conf->verbosity = WEBDIS_DEBUG; - else conf->verbosity = (log_level)tmp; + if(tmp < 0 || tmp > (int)WEBDIS_TRACE) { + fprintf(stderr, "Invalid log verbosity: %d. Acceptable range: [%d .. %d]\n", + tmp, WEBDIS_ERROR, WEBDIS_TRACE); + } + conf->verbosity = (tmp < 0 ? WEBDIS_ERROR : (tmp > WEBDIS_TRACE ? WEBDIS_TRACE : (log_level)tmp)); } else if(strcmp(json_object_iter_key(kv), "daemonize") == 0 && json_typeof(jtmp) == JSON_TRUE) { conf->daemonize = 1; } else if(strcmp(json_object_iter_key(kv), "daemonize") == 0 && json_typeof(jtmp) == JSON_STRING) { diff --git a/src/http.c b/src/http.c index ef4ea9a..66f8135 100644 --- a/src/http.c +++ b/src/http.c @@ -2,6 +2,7 @@ #include "server.h" #include "worker.h" #include "client.h" +#include "slog.h" #include #include @@ -137,7 +138,11 @@ http_schedule_write(int fd, struct http_response *r) { if(r->w) { /* async */ event_set(&r->ev, fd, EV_WRITE, http_can_write, r); event_base_set(r->w->base, &r->ev); - event_add(&r->ev, NULL); + int ret = event_add(&r->ev, NULL); + if (ret != 0) { /* could not schedule write */ + slog(r->w->s, WEBDIS_ERROR, "Could not schedule HTTP write", 0); + http_response_cleanup(r, fd, 0); + } } else { /* blocking */ http_can_write(fd, 0, r); } diff --git a/src/slog.c b/src/slog.c index 4c30438..9bae415 100644 --- a/src/slog.c +++ b/src/slog.c @@ -76,14 +76,22 @@ slog_fsync_init(struct server *s) { } } +/** + * Returns whether this log level is enabled. + */ +int +slog_enabled(struct server *s, log_level level) { + return level <= s->cfg->verbosity ? 1 : 0; +} + /** * Write log message to disk, or stderr. */ -void -slog(struct server *s, log_level level, +static void +slog_internal(struct server *s, log_level level, const char *body, size_t sz) { - const char *c = "EWNID"; + const char *c = "EWNIDT"; time_t now; struct tm now_tm, *lt_ret; char time_buf[64]; @@ -91,8 +99,6 @@ slog(struct server *s, log_level level, char line[256]; /* bounds are checked. */ int line_sz, ret; - if(level > s->cfg->verbosity) return; /* too verbose */ - if(!s->log.fd) return; /* limit message size */ @@ -110,8 +116,9 @@ slog(struct server *s, log_level level, } /* generate output line. */ + char letter = (level == WEBDIS_TRACE ? 5 : c[level]); line_sz = snprintf(line, sizeof(line), - "[%d] %s %c %s\n", (int)s->log.self, time_buf, c[level], msg); + "[%d] %s %c %s\n", (int)s->log.self, time_buf, letter, msg); /* write to log and maybe flush to disk. */ ret = write(s->log.fd, line, line_sz); @@ -121,3 +128,14 @@ slog(struct server *s, log_level level, (void)ret; } + +/** + * Thin wrapper around slog_internal that first checks the log level. + */ +void +slog(struct server *s, log_level level, + const char *body, size_t sz) { + if(level <= s->cfg->verbosity) { /* check log level first */ + slog_internal(s, level, body, sz); + } +} diff --git a/src/slog.h b/src/slog.h index f521d9a..14d9c66 100644 --- a/src/slog.h +++ b/src/slog.h @@ -6,7 +6,8 @@ typedef enum { WEBDIS_WARNING, WEBDIS_NOTICE, WEBDIS_INFO, - WEBDIS_DEBUG + WEBDIS_DEBUG, + WEBDIS_TRACE = 8 } log_level; typedef enum { @@ -26,6 +27,9 @@ slog_init(struct server *s); void slog_fsync_init(struct server *s); +int +slog_enabled(struct server *s, log_level level); + void slog(struct server *s, log_level level, const char *body, size_t sz); diff --git a/src/websocket.c b/src/websocket.c index 44ee6a6..53d48c5 100644 --- a/src/websocket.c +++ b/src/websocket.c @@ -6,6 +6,7 @@ #include "worker.h" #include "pool.h" #include "http.h" +#include "slog.h" /* message parsers */ #include "formats/json.h" @@ -47,7 +48,15 @@ ws_compute_handshake(struct http_client *c, char *out, size_t *out_sz) { // websocket handshake const char *key = client_get_header(c, "Sec-WebSocket-Key"); size_t key_sz = key?strlen(key):0, buffer_sz = key_sz + sizeof(magic) - 1; + if(!key || key_sz < 16 || key_sz > 32) { /* supposed to be exactly 16 bytes that were b64 encoded */ + slog(c->s, WEBDIS_WARNING, "Invalid Sec-WebSocket-Key", 0); + return -1; + } buffer = calloc(buffer_sz, 1); + if(!buffer) { + slog(c->s, WEBDIS_ERROR, "Failed to allocate memory for WS header", 0); + return -1; + } // concatenate key and guid in buffer memcpy(buffer, key, key_sz); @@ -57,10 +66,10 @@ ws_compute_handshake(struct http_client *c, char *out, size_t *out_sz) { SHA1Reset(&ctx); SHA1Input(&ctx, buffer, buffer_sz); SHA1Result(&ctx); - for(i = 0; i < 5; ++i) { // put in correct byte order before memcpy. + for(i = 0; i < (int)(20/sizeof(int)); ++i) { // put in correct byte order before memcpy. ctx.Message_Digest[i] = ntohl(ctx.Message_Digest[i]); } - memcpy(sha1_output, (unsigned char*)ctx.Message_Digest, 20); + memcpy(sha1_output, ctx.Message_Digest, 20); // encode `sha1_output' in base 64, into `out'. base64_init_encodestate(&b64_ctx); @@ -80,7 +89,6 @@ ws_compute_handshake(struct http_client *c, char *out, size_t *out_sz) { int ws_handshake_reply(struct http_client *c) { - int ret; char sha1_handshake[40]; char *buffer = NULL, *p; const char *origin = NULL, *host = NULL; @@ -109,12 +117,14 @@ ws_handshake_reply(struct http_client *c) { /* need those headers */ if(!origin || !origin_sz || !host || !host_sz || !c->path || !c->path_sz) { + slog(c->s, WEBDIS_WARNING, "Missing headers for WS handshake", 0); return -1; } memset(sha1_handshake, 0, sizeof(sha1_handshake)); if(ws_compute_handshake(c, &sha1_handshake[0], &handshake_sz) != 0) { /* failed to compute handshake. */ + slog(c->s, WEBDIS_WARNING, "Failed to compute handshake", 0); return -1; } @@ -125,6 +135,10 @@ ws_handshake_reply(struct http_client *c) { + sizeof(template4)-1; p = buffer = malloc(sz); + if(!p) { + slog(c->s, WEBDIS_ERROR, "Failed to allocate buffer for WS handshake", 0); + return -1; + } /* Concat all */ @@ -158,10 +172,19 @@ ws_handshake_reply(struct http_client *c) { memcpy(p, template4, sizeof(template4)-1); p += sizeof(template4)-1; - /* send data to client */ - ret = write(c->fd, buffer, sz); - (void)ret; - free(buffer); + /* build HTTP response object by hand, since we have the full response already */ + struct http_response *r = calloc(1, sizeof(struct http_response)); + if(!r) { + slog(c->s, WEBDIS_ERROR, "Failed to allocate response for WS handshake", 0); + free(buffer); + return -1; + } + r->w = c->w; + r->keep_alive = 1; + r->out = buffer; + r->out_sz = sz; + r->sent = 0; + http_schedule_write(c->fd, r); /* will free buffer and response once sent */ return 0; } @@ -321,6 +344,7 @@ ws_add_data(struct http_client *c) { if(ret != 0) { /* can't process frame. */ + slog(c->s, WEBDIS_WARNING, "ws_add_data: ws_execute failed", 0); return WS_ERROR; } state = ws_parse_data(c->buffer, c->sz, &c->frame); @@ -365,12 +389,14 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) { /* send WS frame */ r = http_response_init(cmd->w, 0, NULL); - if (cmd_is_subscribe(cmd)) { - r->keep_alive = 1; + if (r == NULL) { + free(frame); + slog(cmd->w->s, WEBDIS_ERROR, "Failed response allocation in ws_reply", 0); + return -1; } - if (r == NULL) - return -1; + /* mark as keep alive, otherwise we'll close the connection after the first reply */ + r->keep_alive = 1; r->out = frame; r->out_sz = frame_sz; diff --git a/tests/Makefile b/tests/Makefile index a0b8a11..55c87f5 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,11 +1,26 @@ OUT=websocket pubsub -CFLAGS=-O3 -Wall -Wextra -LDFLAGS=-levent -lpthread -lrt +OBJS=../src/http-parser/http_parser.o ../src/b64/cencode.o ../src/sha1/sha1.o +CFLAGS=-Wall -Wextra -I../src -I../src/http-parser +LDFLAGS=-levent -lpthread -lm + +# if `make` is run with DEBUG=1, include debug symbols (same as in Makefile in root directory) +DEBUG_FLAGS= +ifeq ($(DEBUG),1) + DEBUG_FLAGS += -O0 + ifeq ($(shell cc -v 2>&1 | grep -cw 'gcc version'),1) # GCC used: add GDB debugging symbols + DEBUG_FLAGS += -ggdb3 + else ifeq ($(shell gcc -v 2>&1 | grep -cw 'clang version'),1) # Clang used: add LLDB debugging symbols + DEBUG_FLAGS += -g3 -glldb + endif +else + DEBUG_FLAGS += -O3 +endif +CFLAGS += $(DEBUG_FLAGS) all: $(OUT) Makefile -websocket: websocket.o - $(CC) -o $@ $< $(LDFLAGS) +websocket: websocket.o $(OBJS) + $(CC) -o $@ $^ $(LDFLAGS) pubsub: pubsub.o $(CC) -o $@ $< $(LDFLAGS) @@ -14,5 +29,5 @@ pubsub: pubsub.o $(CC) -c $(CFLAGS) -o $@ $< clean: - rm -f *.o $(OUT) + rm -f *.o $(OUT) $(OBJS) diff --git a/tests/websocket.c b/tests/websocket.c index 1b23ed8..aa501ca 100644 --- a/tests/websocket.c +++ b/tests/websocket.c @@ -1,4 +1,4 @@ -/* http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 */ +/* https://datatracker.ietf.org/doc/html/rfc6455 */ #include #define _GNU_SOURCE @@ -8,6 +8,19 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef __APPLE__ +#include +#else +#include +#endif #include #include @@ -19,6 +32,21 @@ struct host_info { short port; }; +enum worker_state { + WS_INITIAL, + WS_SENT_HANDSHAKE, + WS_RECEIVED_HANDSHAKE, + WS_SENT_FRAME, + WS_COMPLETE, + WS_BROKEN +}; + +enum mask_config { + MASK_NEVER, + MASK_ALWAYS, + MASK_ALTERNATE +}; + /* worker_thread, with counter of remaining messages */ struct worker_thread { struct host_info *hi; @@ -28,147 +56,498 @@ struct worker_thread { int msg_received; int msg_sent; int byte_count; + int id; pthread_t thread; - - struct evbuffer *buffer; + enum worker_state state; + int timeout_seconds; + + /* non-encoded websocket key */ + char ws_key[16]; + /* expected response */ + char ws_response[28]; + size_t ws_response_len; + /* actual response */ + char *sec_websocket_accept; + /* masking */ + enum mask_config mask_cfg; + int mask_applied; + + /* current header */ + char *cur_hdr_key; + size_t cur_hdr_key_len; /* not including trailing \0 */ + char *cur_hdr_val; + size_t cur_hdr_val_len; /* not including trailing \0 */ + int hdr_last_cb_was_name; /* tells us if the last call was header name or value */ + + struct evbuffer *rbuffer; int got_header; + struct evbuffer *wbuffer; + int verbose; + int fd; + struct event ev_r; struct event ev_w; + + http_parser parser; + http_parser_settings settings; + + int (*debug)(const char *fmt, ...); +}; + +struct progress_thread { + pthread_t thread; +#ifdef __APPLE__ + dispatch_semaphore_t sem_finished; +#else + sem_t sem_finished; +#endif + struct worker_thread *workers; + int worker_count; + int msg_target; + float interval_sec; }; +int debug_noop(const char *fmt, ...) { + (void)fmt; + return 0; +} + +int debug_verbose(const char *fmt, ...) { + int ret; + va_list vargs; + va_start(vargs, fmt); + ret = vfprintf(stderr, fmt, vargs); + va_end(vargs); + return ret; +} + void -process_message(struct worker_thread *wt, size_t sz) { +hex_dump(struct worker_thread *wt, char *p, size_t sz) { + wt->debug("hex dump of %p (%ld bytes)\n", p, sz); + for (char *cur = p; cur < p + sz; cur += 16) { + char letters[16] = {0}; + int limit = (cur + 16) > p + sz ? (sz % 16) : 16; + wt->debug("%08lx ", cur - p); /* address */ + for (int i = 0; i < limit; i++) { + wt->debug("%02x ", (unsigned int)(cur[i] & 0xff)); + letters[i] = isprint(cur[i]) ? cur[i] : '.'; + } + for (int i = limit; i < 16; i++) { /* pad on last line */ + wt->debug(" "); /* 3 spaces for "%02x " */ + } + wt->debug(" %.*s\n", limit, letters); + } +} - // printf("process_message\n"); - if(wt->msg_received % 10000 == 0) { - printf("thread %u: %8d messages left (got %9d bytes so far).\n", - (unsigned int)wt->thread, - wt->msg_target - wt->msg_received, wt->byte_count); +void +evbuffer_debug_dump(struct worker_thread *wt, struct evbuffer *buffer) { + size_t sz = evbuffer_get_length(buffer); + char *data = malloc(sz); + if (!data) { + fprintf(stderr, "failed to allocate %ld bytes\n", sz); + return; + } + evbuffer_remove(buffer, data, sz); + hex_dump(wt, data, sz); + evbuffer_prepend(buffer, data, sz); + free(data); +} + +static void +wait_for_possible_read(struct worker_thread *wt); + +static void +wait_for_possible_write(struct worker_thread *wt); + +static void +ws_enqueue_frame(struct worker_thread *wt); + +static void +wt_mark_finished(struct worker_thread *wt, enum worker_state state) { + wt->state = state; + event_base_loopbreak(wt->base); +} + +void +process_message(struct worker_thread *wt, size_t sz) { + if (0 && wt->msg_received && wt->msg_received % 1000 == 0) { + printf("thread %d: %8d messages left (got %9d bytes so far).\n", + wt->id, + wt->msg_target - wt->msg_received, wt->byte_count); } wt->byte_count += sz; /* decrement read count, and stop receiving when we reach zero. */ wt->msg_received++; - if(wt->msg_received == wt->msg_target) { - event_base_loopexit(wt->base, NULL); + if (wt->msg_received == wt->msg_target) { + wt->debug("%s: thread %d has received all %d messages it expected\n", + __func__, wt->id, wt->msg_received); + wt_mark_finished(wt, WS_COMPLETE); } } +/** + * Called when we can write to the socket. + */ void -websocket_write(int fd, short event, void *ptr) { +websocket_can_write(int fd, short event, void *ptr) { int ret; struct worker_thread *wt = ptr; - - if(event != EV_WRITE) { + (void) event; + wt->debug("%s (wt=%p, fd=%d)\n", __func__, wt, fd); + + switch (wt->state) { + case WS_INITIAL: { /* still sending initial HTTP request */ + ret = evbuffer_write(wt->wbuffer, fd); + wt->debug("evbuffer_write returned %d\n", ret); + wt->debug("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); + if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */ + wait_for_possible_write(wt); + return; + } + /* otherwise, we've sent the full request, time to read the response */ + wt->state = WS_SENT_HANDSHAKE; + wt->debug("state=WS_SENT_HANDSHAKE\n"); + wait_for_possible_read(wt); return; } - - char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff"; - ret = write(fd, message, sizeof(message)-1); - if(ret != sizeof(message)-1) { - fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno)); - close(fd); + case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */ + wt->debug("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); + evbuffer_write(wt->wbuffer, fd); + size_t write_remains = evbuffer_get_length(wt->wbuffer); + wt->debug("Sent data for WS frame, still %lu left to write\n", write_remains); + if (write_remains == 0) { /* ready to read response */ + wt->state = WS_SENT_FRAME; + wt->msg_sent++; + wait_for_possible_read(wt); + } else { /* not finished writing */ + wait_for_possible_write(wt); + } + return; } - - wt->msg_sent += 2; - if(wt->msg_sent < wt->msg_target) { - event_set(&wt->ev_w, fd, EV_WRITE, websocket_write, wt); - event_base_set(wt->base, &wt->ev_w); - ret = event_add(&wt->ev_w, NULL); + default: + break; } } static void -websocket_read(int fd, short event, void *ptr) { - char packet[2048], *pos; - int ret, success = 1; +websocket_can_read(int fd, short event, void *ptr) { + int ret; struct worker_thread *wt = ptr; + (void) event; + wt->debug("%s (wt=%p)\n", __func__, wt); - if(event != EV_READ) { + /* read message */ + ret = evbuffer_read(wt->rbuffer, fd, 65536); + wt->debug("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); + evbuffer_debug_dump(wt, wt->rbuffer); + if (ret == 0) { + wt->debug("We didn't read anything from the socket...\n"); + wt_mark_finished(wt, WS_BROKEN); return; } - /* read message */ - ret = read(fd, packet, sizeof(packet)); - pos = packet; - if(ret > 0) { - char *data, *last; - int sz, msg_sz; - - if(wt->got_header == 0) { /* first response */ - char *frame_start = strstr(packet, "MH"); /* end of the handshake */ - if(frame_start == NULL) { - return; /* not yet */ - } else { /* start monitoring possible writes */ - printf("start monitoring possible writes\n"); - evbuffer_add(wt->buffer, frame_start + 2, ret - (frame_start + 2 - packet)); - - wt->got_header = 1; - event_set(&wt->ev_w, fd, EV_WRITE, - websocket_write, wt); - event_base_set(wt->base, &wt->ev_w); - ret = event_add(&wt->ev_w, NULL); + while (1) { + switch (wt->state) { + case WS_SENT_HANDSHAKE: { /* waiting for handshake response */ + size_t avail_sz = evbuffer_get_length(wt->rbuffer); + char *tmp = calloc(avail_sz, 1); + wt->debug("avail_sz from rbuffer = %lu\n", avail_sz); + evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */ + wt->debug("Giving %lu bytes to http-parser\n", avail_sz); + int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz); + wt->debug("http-parser returned %d\n", nparsed); + free(tmp); + /* http parser will return the offset at which the upgraded protocol begins, + which in our case is 1 under the total response size. */ + + if (wt->state == WS_SENT_HANDSHAKE || /* haven't encountered end of response yet */ + (wt->parser.upgrade && nparsed != (int)avail_sz -1)) { + wt->debug("UPGRADE *and* we have some data left (nparsed=%d, avail_sz=%lu)\n", nparsed, avail_sz); + continue; + } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ + evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); } - } else { - /* we've had the header already, now bufffer data. */ - evbuffer_add(wt->buffer, packet, ret); + return; } - while(1) { - data = (char*)EVBUFFER_DATA(wt->buffer); - sz = EVBUFFER_LENGTH(wt->buffer); - - if(sz == 0) { /* no data */ - break; + case WS_SENT_FRAME: { /* waiting for frame response */ + wt->debug("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); + evbuffer_debug_dump(wt, wt->rbuffer); + uint8_t flag_opcodes, payload_len; + if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */ + wait_for_possible_read(wt); + return; } - if(*data != 0) { /* missing frame start */ - success = 0; - break; + evbuffer_remove(wt->rbuffer, &flag_opcodes, 1); /* remove flags & opcode */ + evbuffer_remove(wt->rbuffer, &payload_len, 1); /* remove length */ + evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */ + process_message(wt, payload_len); + + if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything */ + if (wt->msg_received < wt->msg_target) { /* let's write again */ + wt->debug("our turn to write again\n"); + wt->state = WS_RECEIVED_HANDSHAKE; + ws_enqueue_frame(wt); + } /* otherwise, we're done */ + return; + } else { + wt->debug("there's still data to consume\n"); + continue; } - last = memchr(data, 0xff, sz); /* look for frame end */ - if(!last) { - /* no end of frame in sight. */ - break; - } - msg_sz = last - data - 1; - process_message(ptr, msg_sz); /* record packet */ + return; + } - /* drain including frame delimiters (+2 bytes) */ - evbuffer_drain(wt->buffer, msg_sz + 2); + default: + return; } - } else { - printf("ret=%d\n", ret); - success = 0; } - if(success == 0) { - shutdown(fd, SHUT_RDWR); - close(fd); - event_base_loopexit(wt->base, NULL); +} + + +static void +wait_for_possible_read(struct worker_thread *wt) { + wt->debug("%s (wt=%p)\n", __func__, wt); + event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt); + event_base_set(wt->base, &wt->ev_r); + event_add(&wt->ev_r, NULL); +} + +static void +wait_for_possible_write(struct worker_thread *wt) { + wt->debug("%s (wt=%p)\n", __func__, wt); + event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt); + event_base_set(wt->base, &wt->ev_r); + event_add(&wt->ev_r, NULL); +} + +static int +ws_on_header_field(http_parser *p, const char *at, size_t length) { + (void)length; + struct worker_thread *wt = (struct worker_thread *)p->data; + + if (wt->hdr_last_cb_was_name) { /* we're appending to the name */ + wt->cur_hdr_key = realloc(wt->cur_hdr_key, wt->cur_hdr_key_len + length + 1); + memcpy(wt->cur_hdr_key + wt->cur_hdr_key_len, at, length); + wt->cur_hdr_key_len += length; + } else { /* first call for this header name */ + free(wt->cur_hdr_key); /* free the previous header name if there was one */ + wt->cur_hdr_key_len = length; + wt->cur_hdr_key = calloc(length + 1, 1); + memcpy(wt->cur_hdr_key, at, length); } + wt->debug("%s appended header name data: currently [%.*s]\n", __func__, + (int)wt->cur_hdr_key_len, wt->cur_hdr_key); + + wt->hdr_last_cb_was_name = 1; + return 0; +} + +static int +ws_on_header_value(http_parser *p, const char *at, size_t length) { + struct worker_thread *wt = (struct worker_thread *)p->data; + + if (wt->hdr_last_cb_was_name == 0) { /* we're appending to the value */ + wt->cur_hdr_val = realloc(wt->cur_hdr_val, wt->cur_hdr_val_len + length + 1); + memcpy(wt->cur_hdr_val + wt->cur_hdr_val_len, at, length); + wt->cur_hdr_val_len += length; + } else { /* first call for this header value */ + free(wt->cur_hdr_val); /* free the previous header value if there was one */ + wt->cur_hdr_val_len = length; + wt->cur_hdr_val = calloc(length + 1, 1); + memcpy(wt->cur_hdr_val, at, length); + } + wt->debug("%s appended header value data: currently [%.*s]\n", __func__, + (int)wt->cur_hdr_val_len, wt->cur_hdr_val); + + if (wt->cur_hdr_key_len == 20 && strncasecmp(wt->cur_hdr_key, "Sec-WebSocket-Accept", 20) == 0) { + free(wt->sec_websocket_accept); + wt->sec_websocket_accept = calloc(wt->cur_hdr_val_len + 1, 1); + memcpy(wt->sec_websocket_accept, wt->cur_hdr_val, wt->cur_hdr_val_len); + } + + wt->hdr_last_cb_was_name = 0; + return 0; +} + + +static int +ws_on_headers_complete(http_parser *p) { + struct worker_thread *wt = p->data; + wt->debug("%s (wt=%p)\n", __func__, wt); + free(wt->cur_hdr_key); + free(wt->cur_hdr_val); + + /* make sure that we received a Sec-WebSocket-Accept header */ + if (!wt->sec_websocket_accept) { + wt->debug("%s: no Sec-WebSocket-Accept header was returned\n", __func__); + return 1; + } + + /* and that it matches what we expect */ + int ret = 0; + if (strlen(wt->sec_websocket_accept) != wt->ws_response_len + || memcmp(wt->ws_response, wt->sec_websocket_accept, wt->ws_response_len) != 0) { + wt->debug("Invalid WS handshake: expected [%.*s], got [%s]\n", + (int)wt->ws_response_len, wt->ws_response, wt->sec_websocket_accept); + ret = 1; + } + + free(wt->sec_websocket_accept); + return ret; +} + +static void +ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { + int include_mask = (wt->mask_cfg == MASK_ALWAYS || + (wt->mask_cfg == MASK_ALTERNATE && wt->msg_sent % 2 == 0)) ? 1 : 0; + + unsigned char mask[4]; + for (int i = 0; include_mask && i < 4; i++) { /* only if mask is needed */ + mask[i] = rand() & 0xff; + } + uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */ + if (include_mask) { + len |= (1 << 7); /* set masking bit ON */ + } + + /* apply the mask to the payload */ + for (size_t i = 0; include_mask && i < sz; i++) { + cmd[i] = (cmd[i] ^ mask[i % 4]) & 0xff; + } + /* 0x81 = 10000001b: + 1: FIN bit (meaning there's only one message in the frame), + 0: RSV1 bit (reserved), + 0: RSV2 bit (reserved), + 0: RSV3 bit (reserved), + 0001: text frame */ + evbuffer_add(wt->wbuffer, "\x81", 1); + evbuffer_add(wt->wbuffer, &len, 1); + if (include_mask) { /* only include mask in the frame if needed */ + evbuffer_add(wt->wbuffer, mask, 4); + } + evbuffer_add(wt->wbuffer, cmd, sz); + wt->mask_applied += include_mask; +} + +static void +ws_enqueue_frame(struct worker_thread *wt) { + char ping_command[] = "[\"PING\"]"; + ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1); + + wait_for_possible_write(wt); +} + +static int +ws_on_message_complete(http_parser *p) { + struct worker_thread *wt = p->data; + + wt->debug("%s (wt=%p), upgrade=%d\n", __func__, wt, p->upgrade); + /* we've received the full HTTP response now, so we're ready to send frames */ + wt->state = WS_RECEIVED_HANDSHAKE; + ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */ + return 0; +} + +static void +ws_on_timeout(evutil_socket_t fd, short event, void *arg) { + struct worker_thread *wt = arg; + (void)fd; + (void)event; + + fprintf(stderr, "Time has run out! (thread %d)\n", wt->id); + wt_mark_finished(wt, WS_BROKEN); /* break out of event loop */ +} + +void* +progress_thread_main(void *ptr) { + struct progress_thread *pt = ptr; + struct timespec ts_wait; + ts_wait.tv_sec = floor(pt->interval_sec); /* integer seconds */ + ts_wait.tv_nsec = (pt->interval_sec - (float)ts_wait.tv_sec) * 1e9; /* nanoseconds */ + + int last_received = 0; + int num_sleeps = 0; + setlocale(LC_NUMERIC, ""); + + struct timespec ts_start; + clock_gettime(CLOCK_MONOTONIC, &ts_start); + + while(1) { + int sem_received = 0; +#ifdef __APPLE__ + dispatch_time_t sem_timeout = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(ts_wait.tv_sec * NSEC_PER_SEC + ts_wait.tv_nsec)); + if (dispatch_semaphore_wait(pt->sem_finished, sem_timeout) == 0) { + sem_received = 1; + } +#else + struct timespec ts_now, ts_sem_timeout; /* now + timeout */ + + /* get current time */ + if (clock_gettime(CLOCK_REALTIME, &ts_now) == -1) { + fprintf(stderr, "clock_gettime failed: %s\n", strerror(errno)); + exit(EXIT_FAILURE); + } + /* calculate when the timeout should occur */ + long long now_nanos = ts_now.tv_sec * 1e9 + ts_now.tv_nsec; + long long sem_timeout_nanos = now_nanos + (long long)ts_wait.tv_sec * 1e9 + (long long)ts_wait.tv_nsec; + ts_sem_timeout.tv_sec = sem_timeout_nanos / 1000000000LL; + ts_sem_timeout.tv_nsec = sem_timeout_nanos % 1000000000LL; + + int sem_ret = sem_timedwait(&pt->sem_finished, &ts_sem_timeout); + if (sem_ret == 0) { + sem_received = 1; + } +#endif + // nanosleep(&ts, NULL); + num_sleeps++; + int total_sent = 0, total_received = 0, any_broken = 0, num_complete = 0; + for (int i = 0; i < pt->worker_count; i++) { + total_sent += pt->workers[i].msg_sent; + total_received += pt->workers[i].msg_received; + if (pt->workers[i].state == WS_BROKEN) { + any_broken = 1; + } else if (pt->workers[i].state == WS_COMPLETE) { + num_complete++; + } + } + struct timespec ts_after_sleep; + clock_gettime(CLOCK_MONOTONIC, &ts_after_sleep); + fprintf(stderr, "After %0.2f sec: %'d messages sent, %'d received (%.02f%%). Instant rate: %'ld/sec, overall rate: %'ld/sec\n", + ((float)((ts_after_sleep.tv_sec * 1e9 + ts_after_sleep.tv_nsec) - (ts_start.tv_sec * 1e9 + ts_start.tv_nsec))) / (float)1e9, + total_sent, total_received, 100.0f * (float)total_received / (float)(pt->worker_count * pt->msg_target), + lroundf((float)(total_received - last_received) / pt->interval_sec), + lroundf((float)total_received / ((float)num_sleeps) * pt->interval_sec)); + + if (sem_received || total_received == pt->msg_target * pt->worker_count || any_broken || num_complete == pt->worker_count) { + break; + } + last_received = total_received; + } + return NULL; } void* worker_main(void *ptr) { char ws_template[] = "GET /.json HTTP/1.1\r\n" - "Host: %s:%d\r\n" - "Connection: Upgrade\r\n" - "Upgrade: WebSocket\r\n" - "Origin: http://%s:%d\r\n" - "Sec-WebSocket-Key1: 18x 6]8vM;54 *(5: { U1]8 z [ 8\r\n" - "Sec-WebSocket-Key2: 1_ tx7X d < nw 334J702) 7]o}` 0\r\n" - "\r\n" - "Tm[K T2u"; + "Host: %s:%d\r\n" + "Connection: Upgrade\r\n" + "Upgrade: WebSocket\r\n" + "Origin: http://%s:%d\r\n" + "Sec-WebSocket-Key: %s\r\n" + "\r\n"; struct worker_thread *wt = ptr; int ret; int fd; + int int_one = 1; struct sockaddr_in addr; - char *ws_handshake; - size_t ws_handshake_sz; + struct timeval timeout_tv; + struct event *timeout_ev; /* connect socket */ fd = socket(AF_INET, SOCK_STREAM, 0); @@ -177,60 +556,116 @@ worker_main(void *ptr) { memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr)); addr.sin_addr.s_addr = inet_addr(wt->hi->host); - ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr)); - if(ret != 0) { + ret = connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr)); + if (ret != 0) { fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno)); return NULL; } + ret = ioctl(fd, FIONBIO, &int_one); + if (ret != 0) { + fprintf(stderr, "ioctl: ret=%d: %s\n", ret, strerror(errno)); + return NULL; + } /* initialize worker thread */ + wt->fd = fd; wt->base = event_base_new(); - wt->buffer = evbuffer_new(); + wt->rbuffer = evbuffer_new(); + wt->wbuffer = evbuffer_new(); /* write buffer */ wt->byte_count = 0; wt->got_header = 0; - /* send handshake */ - ws_handshake_sz = sizeof(ws_handshake) - + 2*strlen(wt->hi->host) + 500; - ws_handshake = calloc(ws_handshake_sz, 1); - ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, - wt->hi->host, wt->hi->port, - wt->hi->host, wt->hi->port); - ret = write(fd, ws_handshake, ws_handshake_sz); + /* generate a random key */ + for (int i = 0; i < 16; i++) { + wt->ws_key[i] = rand() & 0xff; + } + wt->debug("Raw WS key:\n"); + hex_dump(wt, wt->ws_key, 16); + + char encoded_key[23]; /* it shouldn't be more than 4/3 * 16 */ + base64_encodestate b64state; + base64_init_encodestate(&b64state); + int pos = base64_encode_block((const char *)wt->ws_key, 16, encoded_key, &b64state); + int delta = base64_encode_blockend(encoded_key + pos, &b64state); + /* the block ends with a '\n', which we need to remove */ + encoded_key[pos+delta-1] = '\0'; + wt->debug("Encoded WS key [%s]:\n", encoded_key); + + /* compute the expected response, to be validated when we receive it */ + char magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + size_t expected_raw_sz = (pos+delta-1) + sizeof(magic)-1; + char *expected_raw = calloc(expected_raw_sz + 1, 1); + memcpy(expected_raw, encoded_key, pos+delta-1); /* add encoded key */ + memcpy(expected_raw + pos+delta-1, magic, sizeof(magic)-1); /* then constant guid */ + + SHA1Context ctx; + SHA1Reset(&ctx); + SHA1Input(&ctx, (const unsigned char*)expected_raw, expected_raw_sz); + SHA1Result(&ctx); + for(int i = 0; i < (int)(20/sizeof(int)); ++i) { /* put in correct byte order */ + ctx.Message_Digest[i] = ntohl(ctx.Message_Digest[i]); + } + + /* and then base64 encode the hash */ + base64_init_encodestate(&b64state); + int resp_pos = base64_encode_block((const char *)ctx.Message_Digest, 20, wt->ws_response, &b64state); + int resp_delta = base64_encode_blockend(wt->ws_response + resp_pos, &b64state); + wt->ws_response_len = resp_pos + resp_delta - 1; + wt->ws_response[wt->ws_response_len] = '\0'; /* again remove the '\n' */ + + wt->debug("Expected response header: [%s]\n", wt->ws_response); + + /* add timeout, if set */ + if (wt->timeout_seconds > 0) { + timeout_tv.tv_sec = wt->timeout_seconds; + timeout_tv.tv_usec = 0; + timeout_ev = event_new(wt->base, -1, EV_TIMEOUT, ws_on_timeout, wt); + event_add(timeout_ev, &timeout_tv); + } - struct event ev_r; - event_set(&ev_r, fd, EV_READ | EV_PERSIST, websocket_read, wt); - event_base_set(wt->base, &ev_r); - event_add(&ev_r, NULL); + /* initialize HTTP parser, to parse the server response */ + memset(&wt->settings, 0, sizeof(http_parser_settings)); + wt->settings.on_header_field = ws_on_header_field; + wt->settings.on_header_value = ws_on_header_value; + wt->settings.on_headers_complete = ws_on_headers_complete; + wt->settings.on_message_complete = ws_on_message_complete; + http_parser_init(&wt->parser, HTTP_RESPONSE); + wt->parser.data = wt; + + /* add GET request to buffer */ + evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port, + wt->hi->host, wt->hi->port, encoded_key); + wait_for_possible_write(wt); /* request callback */ /* go! */ event_base_dispatch(wt->base); + wt->debug("event_base_dispatch returned\n"); event_base_free(wt->base); - free(ws_handshake); return NULL; } -void -usage(const char* argv0, char *host_default, short port_default, - int thread_count_default, int messages_default) { +void usage(const char *argv0, char *host_default, short port_default, + int thread_count_default, int messages_default) { printf("Usage: %s [options]\n" - "Options are:\n" - "\t-h host\t\t(default = \"%s\")\n" - "\t-p port\t\t(default = %d)\n" - "\t-c threads\t(default = %d)\n" - "\t-n count\t(number of messages per thread, default = %d)\n" - "\t-v\t\t(verbose)\n", - argv0, host_default, (int)port_default, - thread_count_default, messages_default); + "Options are:\n" + "\t[--host|-h] HOST\t(default = \"%s\")\n" + "\t[--port|-p] PORT\t(default = %d)\n" + "\t[--clients|-c] THREADS\t(default = %d)\n" + "\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n" + "\t[--mask|-m] MASK_CFG\t(%d: always, %d: never, %d: alternate, default = always)\n" + "\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n" + "\t[--interval|-i] SECONDS\t(interval at which to report progress, default = 1)\n" + "\t[--verbose|-v]\t\t(extremely verbose output)\n", + argv0, host_default, (int)port_default, + thread_count_default, messages_default, + MASK_ALWAYS, MASK_NEVER, MASK_ALTERNATE); } int main(int argc, char *argv[]) { - struct timespec t0, t1; - - int messages_default = 100000; + int messages_default = 2500; int thread_count_default = 4; short port_default = 7379; char *host_default = "127.0.0.1"; @@ -239,85 +674,134 @@ main(int argc, char *argv[]) { int thread_count = thread_count_default; int i, opt; char *colon; - double total = 0, total_bytes = 0; + long total = 0, total_bytes = 0; int verbose = 0; + int timeout_seconds = -1; + float report_interval = 1.0; + enum mask_config mask_cfg = MASK_ALWAYS; struct host_info hi = {host_default, port_default}; struct worker_thread *workers; /* getopt */ - while ((opt = getopt(argc, argv, "h:p:c:n:v")) != -1) { + struct option long_options[] = { + {"help", no_argument, NULL, '?'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"clients", required_argument, NULL, 'c'}, + {"messages", required_argument, NULL, 'n'}, + {"mask", required_argument, NULL, 'm'}, + {"max-time", required_argument, NULL, 't'}, + {"interval", required_argument, NULL, 'i'}, + {"verbose", no_argument, NULL, 'v'}, + {0, 0, 0, 0}}; + while ((opt = getopt_long(argc, argv, "h:p:c:n:m:t:i:vs", long_options, NULL)) != -1) { switch (opt) { - case 'h': - colon = strchr(optarg, ':'); - if(!colon) { - size_t sz = strlen(optarg); - hi.host = calloc(1 + sz, 1); - strncpy(hi.host, optarg, sz); - } else { - hi.host = calloc(1+colon-optarg, 1); - strncpy(hi.host, optarg, colon-optarg); - hi.port = (short)atol(colon+1); - } - break; - - case 'p': - hi.port = (short)atol(optarg); - break; - - case 'c': - thread_count = atoi(optarg); - break; - - case 'n': - msg_target = atoi(optarg); - break; - - case 'v': - verbose = 1; - break; - default: /* '?' */ - usage(argv[0], host_default, port_default, - thread_count_default, - messages_default); + case 'h': + colon = strchr(optarg, ':'); + if (!colon) { + size_t sz = strlen(optarg); + hi.host = calloc(1 + sz, 1); + strncpy(hi.host, optarg, sz); + } else { + hi.host = calloc(1 + colon - optarg, 1); + strncpy(hi.host, optarg, colon - optarg); + hi.port = (short)atol(colon + 1); + } + break; + + case 'p': + hi.port = (short)atol(optarg); + break; + + case 'c': + thread_count = atoi(optarg); + break; + + case 'n': + msg_target = atoi(optarg); + break; + + case 'm': + mask_cfg = atoi(optarg); + if (mask_cfg < MASK_NEVER || mask_cfg > MASK_ALTERNATE) { + fprintf(stderr, "Invalid mask configuration: %d (range is [%d .. %d])\n", + mask_cfg, MASK_NEVER, MASK_ALTERNATE); exit(EXIT_FAILURE); + } + break; + + case 't': + timeout_seconds = atoi(optarg); + break; + + case 'i': + report_interval = atof(optarg); + break; + + case 'v': + verbose = 1; + break; + + default: /* '?' */ + usage(argv[0], host_default, port_default, + thread_count_default, + messages_default); + exit(EXIT_SUCCESS); } } /* run threads */ workers = calloc(sizeof(struct worker_thread), thread_count); - clock_gettime(CLOCK_MONOTONIC, &t0); - for(i = 0; i < thread_count; ++i) { + struct progress_thread progress; + progress.interval_sec = report_interval; + progress.msg_target = msg_target; + progress.worker_count = thread_count; + progress.workers = workers; +#ifdef __APPLE__ + dispatch_semaphore_t *sem = &progress.sem_finished; + *sem = dispatch_semaphore_create(0); +#else + if (sem_init(&progress.sem_finished, 0, 0) != 0) { + fprintf(stderr, "sem_init failed: %s\n", strerror(errno)); + exit(EXIT_FAILURE); + } +#endif + pthread_create(&progress.thread, NULL, progress_thread_main, &progress); + + for (i = 0; i < thread_count; ++i) { + workers[i].id = i; workers[i].msg_target = msg_target; workers[i].hi = &hi; workers[i].verbose = verbose; - + workers[i].state = WS_INITIAL; + workers[i].debug = verbose ? debug_verbose : debug_noop; + workers[i].timeout_seconds = timeout_seconds; + workers[i].mask_cfg = mask_cfg; pthread_create(&workers[i].thread, NULL, - worker_main, &workers[i]); + worker_main, &workers[i]); } /* wait for threads to finish */ - for(i = 0; i < thread_count; ++i) { + for (i = 0; i < thread_count; ++i) { pthread_join(workers[i].thread, NULL); total += workers[i].msg_received; total_bytes += workers[i].byte_count; } - /* timing */ - clock_gettime(CLOCK_MONOTONIC, &t1); - float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; - float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; - - if(total != 0) { - printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n", - (long)total, - (mili1-mili0)/1000.0, - 1000*total/(mili1-mili0), - (int)(total_bytes / (1000*(mili1-mili0))), - (int)(total_bytes / (mili1-mili0))); - return EXIT_SUCCESS; + /* signal progress thread to stop */ +#ifdef __APPLE__ + dispatch_semaphore_signal(progress.sem_finished); +#else + sem_post(&progress.sem_finished); +#endif + + pthread_join(progress.thread, NULL); + + if (total != 0) { + return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE); } else { printf("No message was read.\n"); return EXIT_FAILURE; diff --git a/tests/websocket.html b/tests/websocket.html index a30f936..0f06fc8 100644 --- a/tests/websocket.html +++ b/tests/websocket.html @@ -3,150 +3,210 @@ WebSocket example + + -
Webdis with HTML5 WebSockets
+
+

Webdis with HTML5 WebSockets

+
-
-

JSON

-
- Connecting... -
-
-
-

Raw

-
- Connecting... -
-
+
+
+
+ +
- - var desc = document.createElement("div"); - desc.setAttribute("class", "desc"); - desc.innerHTML = dir; - $(id).appendChild(desc); + diff --git a/webdis.json b/webdis.json index 534ee8c..09d782c 100644 --- a/webdis.json +++ b/webdis.json @@ -26,6 +26,6 @@ } ], - "verbosity": 6, + "verbosity": 4, "logfile": "webdis.log" }