Merge pull request #198 from jessie-murray/websocket-fixes

master
Nicolas Favre-Felix 3 years ago committed by GitHub
commit 5c223e7869
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -159,9 +159,11 @@ conf_read(const char *filename) {
} }
} else if(strcmp(json_object_iter_key(kv),"verbosity") == 0 && json_typeof(jtmp) == JSON_INTEGER){ } else if(strcmp(json_object_iter_key(kv),"verbosity") == 0 && json_typeof(jtmp) == JSON_INTEGER){
int tmp = json_integer_value(jtmp); int tmp = json_integer_value(jtmp);
if(tmp < 0) conf->verbosity = WEBDIS_ERROR; if(tmp < 0 || tmp > (int)WEBDIS_TRACE) {
else if(tmp > (int)WEBDIS_DEBUG) conf->verbosity = WEBDIS_DEBUG; fprintf(stderr, "Invalid log verbosity: %d. Acceptable range: [%d .. %d]\n",
else conf->verbosity = (log_level)tmp; tmp, WEBDIS_ERROR, WEBDIS_TRACE);
}
conf->verbosity = (tmp < 0 ? WEBDIS_ERROR : (tmp > WEBDIS_TRACE ? WEBDIS_TRACE : (log_level)tmp));
} else if(strcmp(json_object_iter_key(kv), "daemonize") == 0 && json_typeof(jtmp) == JSON_TRUE) { } else if(strcmp(json_object_iter_key(kv), "daemonize") == 0 && json_typeof(jtmp) == JSON_TRUE) {
conf->daemonize = 1; conf->daemonize = 1;
} else if(strcmp(json_object_iter_key(kv), "daemonize") == 0 && json_typeof(jtmp) == JSON_STRING) { } else if(strcmp(json_object_iter_key(kv), "daemonize") == 0 && json_typeof(jtmp) == JSON_STRING) {

@ -2,6 +2,7 @@
#include "server.h" #include "server.h"
#include "worker.h" #include "worker.h"
#include "client.h" #include "client.h"
#include "slog.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -137,7 +138,11 @@ http_schedule_write(int fd, struct http_response *r) {
if(r->w) { /* async */ if(r->w) { /* async */
event_set(&r->ev, fd, EV_WRITE, http_can_write, r); event_set(&r->ev, fd, EV_WRITE, http_can_write, r);
event_base_set(r->w->base, &r->ev); event_base_set(r->w->base, &r->ev);
event_add(&r->ev, NULL); int ret = event_add(&r->ev, NULL);
if (ret != 0) { /* could not schedule write */
slog(r->w->s, WEBDIS_ERROR, "Could not schedule HTTP write", 0);
http_response_cleanup(r, fd, 0);
}
} else { /* blocking */ } else { /* blocking */
http_can_write(fd, 0, r); http_can_write(fd, 0, r);
} }

@ -76,14 +76,22 @@ slog_fsync_init(struct server *s) {
} }
} }
/**
* Returns whether this log level is enabled.
*/
int
slog_enabled(struct server *s, log_level level) {
return level <= s->cfg->verbosity ? 1 : 0;
}
/** /**
* Write log message to disk, or stderr. * Write log message to disk, or stderr.
*/ */
void static void
slog(struct server *s, log_level level, slog_internal(struct server *s, log_level level,
const char *body, size_t sz) { const char *body, size_t sz) {
const char *c = "EWNID"; const char *c = "EWNIDT";
time_t now; time_t now;
struct tm now_tm, *lt_ret; struct tm now_tm, *lt_ret;
char time_buf[64]; char time_buf[64];
@ -91,8 +99,6 @@ slog(struct server *s, log_level level,
char line[256]; /* bounds are checked. */ char line[256]; /* bounds are checked. */
int line_sz, ret; int line_sz, ret;
if(level > s->cfg->verbosity) return; /* too verbose */
if(!s->log.fd) return; if(!s->log.fd) return;
/* limit message size */ /* limit message size */
@ -110,8 +116,9 @@ slog(struct server *s, log_level level,
} }
/* generate output line. */ /* generate output line. */
char letter = (level == WEBDIS_TRACE ? 5 : c[level]);
line_sz = snprintf(line, sizeof(line), line_sz = snprintf(line, sizeof(line),
"[%d] %s %c %s\n", (int)s->log.self, time_buf, c[level], msg); "[%d] %s %c %s\n", (int)s->log.self, time_buf, letter, msg);
/* write to log and maybe flush to disk. */ /* write to log and maybe flush to disk. */
ret = write(s->log.fd, line, line_sz); ret = write(s->log.fd, line, line_sz);
@ -121,3 +128,14 @@ slog(struct server *s, log_level level,
(void)ret; (void)ret;
} }
/**
* Thin wrapper around slog_internal that first checks the log level.
*/
void
slog(struct server *s, log_level level,
const char *body, size_t sz) {
if(level <= s->cfg->verbosity) { /* check log level first */
slog_internal(s, level, body, sz);
}
}

@ -6,7 +6,8 @@ typedef enum {
WEBDIS_WARNING, WEBDIS_WARNING,
WEBDIS_NOTICE, WEBDIS_NOTICE,
WEBDIS_INFO, WEBDIS_INFO,
WEBDIS_DEBUG WEBDIS_DEBUG,
WEBDIS_TRACE = 8
} log_level; } log_level;
typedef enum { typedef enum {
@ -26,6 +27,9 @@ slog_init(struct server *s);
void void
slog_fsync_init(struct server *s); slog_fsync_init(struct server *s);
int
slog_enabled(struct server *s, log_level level);
void slog(struct server *s, log_level level, void slog(struct server *s, log_level level,
const char *body, size_t sz); const char *body, size_t sz);

@ -6,6 +6,7 @@
#include "worker.h" #include "worker.h"
#include "pool.h" #include "pool.h"
#include "http.h" #include "http.h"
#include "slog.h"
/* message parsers */ /* message parsers */
#include "formats/json.h" #include "formats/json.h"
@ -47,7 +48,15 @@ ws_compute_handshake(struct http_client *c, char *out, size_t *out_sz) {
// websocket handshake // websocket handshake
const char *key = client_get_header(c, "Sec-WebSocket-Key"); const char *key = client_get_header(c, "Sec-WebSocket-Key");
size_t key_sz = key?strlen(key):0, buffer_sz = key_sz + sizeof(magic) - 1; size_t key_sz = key?strlen(key):0, buffer_sz = key_sz + sizeof(magic) - 1;
if(!key || key_sz < 16 || key_sz > 32) { /* supposed to be exactly 16 bytes that were b64 encoded */
slog(c->s, WEBDIS_WARNING, "Invalid Sec-WebSocket-Key", 0);
return -1;
}
buffer = calloc(buffer_sz, 1); buffer = calloc(buffer_sz, 1);
if(!buffer) {
slog(c->s, WEBDIS_ERROR, "Failed to allocate memory for WS header", 0);
return -1;
}
// concatenate key and guid in buffer // concatenate key and guid in buffer
memcpy(buffer, key, key_sz); memcpy(buffer, key, key_sz);
@ -57,10 +66,10 @@ ws_compute_handshake(struct http_client *c, char *out, size_t *out_sz) {
SHA1Reset(&ctx); SHA1Reset(&ctx);
SHA1Input(&ctx, buffer, buffer_sz); SHA1Input(&ctx, buffer, buffer_sz);
SHA1Result(&ctx); SHA1Result(&ctx);
for(i = 0; i < 5; ++i) { // put in correct byte order before memcpy. for(i = 0; i < (int)(20/sizeof(int)); ++i) { // put in correct byte order before memcpy.
ctx.Message_Digest[i] = ntohl(ctx.Message_Digest[i]); ctx.Message_Digest[i] = ntohl(ctx.Message_Digest[i]);
} }
memcpy(sha1_output, (unsigned char*)ctx.Message_Digest, 20); memcpy(sha1_output, ctx.Message_Digest, 20);
// encode `sha1_output' in base 64, into `out'. // encode `sha1_output' in base 64, into `out'.
base64_init_encodestate(&b64_ctx); base64_init_encodestate(&b64_ctx);
@ -80,7 +89,6 @@ ws_compute_handshake(struct http_client *c, char *out, size_t *out_sz) {
int int
ws_handshake_reply(struct http_client *c) { ws_handshake_reply(struct http_client *c) {
int ret;
char sha1_handshake[40]; char sha1_handshake[40];
char *buffer = NULL, *p; char *buffer = NULL, *p;
const char *origin = NULL, *host = NULL; const char *origin = NULL, *host = NULL;
@ -109,12 +117,14 @@ ws_handshake_reply(struct http_client *c) {
/* need those headers */ /* need those headers */
if(!origin || !origin_sz || !host || !host_sz || !c->path || !c->path_sz) { if(!origin || !origin_sz || !host || !host_sz || !c->path || !c->path_sz) {
slog(c->s, WEBDIS_WARNING, "Missing headers for WS handshake", 0);
return -1; return -1;
} }
memset(sha1_handshake, 0, sizeof(sha1_handshake)); memset(sha1_handshake, 0, sizeof(sha1_handshake));
if(ws_compute_handshake(c, &sha1_handshake[0], &handshake_sz) != 0) { if(ws_compute_handshake(c, &sha1_handshake[0], &handshake_sz) != 0) {
/* failed to compute handshake. */ /* failed to compute handshake. */
slog(c->s, WEBDIS_WARNING, "Failed to compute handshake", 0);
return -1; return -1;
} }
@ -125,6 +135,10 @@ ws_handshake_reply(struct http_client *c) {
+ sizeof(template4)-1; + sizeof(template4)-1;
p = buffer = malloc(sz); p = buffer = malloc(sz);
if(!p) {
slog(c->s, WEBDIS_ERROR, "Failed to allocate buffer for WS handshake", 0);
return -1;
}
/* Concat all */ /* Concat all */
@ -158,10 +172,19 @@ ws_handshake_reply(struct http_client *c) {
memcpy(p, template4, sizeof(template4)-1); memcpy(p, template4, sizeof(template4)-1);
p += sizeof(template4)-1; p += sizeof(template4)-1;
/* send data to client */ /* build HTTP response object by hand, since we have the full response already */
ret = write(c->fd, buffer, sz); struct http_response *r = calloc(1, sizeof(struct http_response));
(void)ret; if(!r) {
free(buffer); slog(c->s, WEBDIS_ERROR, "Failed to allocate response for WS handshake", 0);
free(buffer);
return -1;
}
r->w = c->w;
r->keep_alive = 1;
r->out = buffer;
r->out_sz = sz;
r->sent = 0;
http_schedule_write(c->fd, r); /* will free buffer and response once sent */
return 0; return 0;
} }
@ -321,6 +344,7 @@ ws_add_data(struct http_client *c) {
if(ret != 0) { if(ret != 0) {
/* can't process frame. */ /* can't process frame. */
slog(c->s, WEBDIS_WARNING, "ws_add_data: ws_execute failed", 0);
return WS_ERROR; return WS_ERROR;
} }
state = ws_parse_data(c->buffer, c->sz, &c->frame); state = ws_parse_data(c->buffer, c->sz, &c->frame);
@ -365,12 +389,14 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) {
/* send WS frame */ /* send WS frame */
r = http_response_init(cmd->w, 0, NULL); r = http_response_init(cmd->w, 0, NULL);
if (cmd_is_subscribe(cmd)) { if (r == NULL) {
r->keep_alive = 1; free(frame);
slog(cmd->w->s, WEBDIS_ERROR, "Failed response allocation in ws_reply", 0);
return -1;
} }
if (r == NULL) /* mark as keep alive, otherwise we'll close the connection after the first reply */
return -1; r->keep_alive = 1;
r->out = frame; r->out = frame;
r->out_sz = frame_sz; r->out_sz = frame_sz;

@ -1,11 +1,26 @@
OUT=websocket pubsub OUT=websocket pubsub
CFLAGS=-O3 -Wall -Wextra OBJS=../src/http-parser/http_parser.o ../src/b64/cencode.o ../src/sha1/sha1.o
LDFLAGS=-levent -lpthread -lrt CFLAGS=-Wall -Wextra -I../src -I../src/http-parser
LDFLAGS=-levent -lpthread -lm
# if `make` is run with DEBUG=1, include debug symbols (same as in Makefile in root directory)
DEBUG_FLAGS=
ifeq ($(DEBUG),1)
DEBUG_FLAGS += -O0
ifeq ($(shell cc -v 2>&1 | grep -cw 'gcc version'),1) # GCC used: add GDB debugging symbols
DEBUG_FLAGS += -ggdb3
else ifeq ($(shell gcc -v 2>&1 | grep -cw 'clang version'),1) # Clang used: add LLDB debugging symbols
DEBUG_FLAGS += -g3 -glldb
endif
else
DEBUG_FLAGS += -O3
endif
CFLAGS += $(DEBUG_FLAGS)
all: $(OUT) Makefile all: $(OUT) Makefile
websocket: websocket.o websocket: websocket.o $(OBJS)
$(CC) -o $@ $< $(LDFLAGS) $(CC) -o $@ $^ $(LDFLAGS)
pubsub: pubsub.o pubsub: pubsub.o
$(CC) -o $@ $< $(LDFLAGS) $(CC) -o $@ $< $(LDFLAGS)
@ -14,5 +29,5 @@ pubsub: pubsub.o
$(CC) -c $(CFLAGS) -o $@ $< $(CC) -c $(CFLAGS) -o $@ $<
clean: clean:
rm -f *.o $(OUT) rm -f *.o $(OUT) $(OBJS)

@ -1,4 +1,4 @@
/* http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 */ /* https://datatracker.ietf.org/doc/html/rfc6455 */
#include <stdlib.h> #include <stdlib.h>
#define _GNU_SOURCE #define _GNU_SOURCE
@ -8,6 +8,19 @@
#include <pthread.h> #include <pthread.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <ctype.h>
#include <getopt.h>
#include <sys/ioctl.h>
#include <b64/cencode.h>
#include <http-parser/http_parser.h>
#include <sha1/sha1.h>
#include <math.h>
#include <locale.h>
#ifdef __APPLE__
#include <dispatch/dispatch.h>
#else
#include <semaphore.h>
#endif
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -19,6 +32,21 @@ struct host_info {
short port; short port;
}; };
enum worker_state {
WS_INITIAL,
WS_SENT_HANDSHAKE,
WS_RECEIVED_HANDSHAKE,
WS_SENT_FRAME,
WS_COMPLETE,
WS_BROKEN
};
enum mask_config {
MASK_NEVER,
MASK_ALWAYS,
MASK_ALTERNATE
};
/* worker_thread, with counter of remaining messages */ /* worker_thread, with counter of remaining messages */
struct worker_thread { struct worker_thread {
struct host_info *hi; struct host_info *hi;
@ -28,147 +56,498 @@ struct worker_thread {
int msg_received; int msg_received;
int msg_sent; int msg_sent;
int byte_count; int byte_count;
int id;
pthread_t thread; pthread_t thread;
enum worker_state state;
struct evbuffer *buffer; int timeout_seconds;
/* non-encoded websocket key */
char ws_key[16];
/* expected response */
char ws_response[28];
size_t ws_response_len;
/* actual response */
char *sec_websocket_accept;
/* masking */
enum mask_config mask_cfg;
int mask_applied;
/* current header */
char *cur_hdr_key;
size_t cur_hdr_key_len; /* not including trailing \0 */
char *cur_hdr_val;
size_t cur_hdr_val_len; /* not including trailing \0 */
int hdr_last_cb_was_name; /* tells us if the last call was header name or value */
struct evbuffer *rbuffer;
int got_header; int got_header;
struct evbuffer *wbuffer;
int verbose; int verbose;
int fd;
struct event ev_r;
struct event ev_w; struct event ev_w;
http_parser parser;
http_parser_settings settings;
int (*debug)(const char *fmt, ...);
};
struct progress_thread {
pthread_t thread;
#ifdef __APPLE__
dispatch_semaphore_t sem_finished;
#else
sem_t sem_finished;
#endif
struct worker_thread *workers;
int worker_count;
int msg_target;
float interval_sec;
}; };
int debug_noop(const char *fmt, ...) {
(void)fmt;
return 0;
}
int debug_verbose(const char *fmt, ...) {
int ret;
va_list vargs;
va_start(vargs, fmt);
ret = vfprintf(stderr, fmt, vargs);
va_end(vargs);
return ret;
}
void void
process_message(struct worker_thread *wt, size_t sz) { hex_dump(struct worker_thread *wt, char *p, size_t sz) {
wt->debug("hex dump of %p (%ld bytes)\n", p, sz);
for (char *cur = p; cur < p + sz; cur += 16) {
char letters[16] = {0};
int limit = (cur + 16) > p + sz ? (sz % 16) : 16;
wt->debug("%08lx ", cur - p); /* address */
for (int i = 0; i < limit; i++) {
wt->debug("%02x ", (unsigned int)(cur[i] & 0xff));
letters[i] = isprint(cur[i]) ? cur[i] : '.';
}
for (int i = limit; i < 16; i++) { /* pad on last line */
wt->debug(" "); /* 3 spaces for "%02x " */
}
wt->debug(" %.*s\n", limit, letters);
}
}
// printf("process_message\n"); void
if(wt->msg_received % 10000 == 0) { evbuffer_debug_dump(struct worker_thread *wt, struct evbuffer *buffer) {
printf("thread %u: %8d messages left (got %9d bytes so far).\n", size_t sz = evbuffer_get_length(buffer);
(unsigned int)wt->thread, char *data = malloc(sz);
wt->msg_target - wt->msg_received, wt->byte_count); if (!data) {
fprintf(stderr, "failed to allocate %ld bytes\n", sz);
return;
}
evbuffer_remove(buffer, data, sz);
hex_dump(wt, data, sz);
evbuffer_prepend(buffer, data, sz);
free(data);
}
static void
wait_for_possible_read(struct worker_thread *wt);
static void
wait_for_possible_write(struct worker_thread *wt);
static void
ws_enqueue_frame(struct worker_thread *wt);
static void
wt_mark_finished(struct worker_thread *wt, enum worker_state state) {
wt->state = state;
event_base_loopbreak(wt->base);
}
void
process_message(struct worker_thread *wt, size_t sz) {
if (0 && wt->msg_received && wt->msg_received % 1000 == 0) {
printf("thread %d: %8d messages left (got %9d bytes so far).\n",
wt->id,
wt->msg_target - wt->msg_received, wt->byte_count);
} }
wt->byte_count += sz; wt->byte_count += sz;
/* decrement read count, and stop receiving when we reach zero. */ /* decrement read count, and stop receiving when we reach zero. */
wt->msg_received++; wt->msg_received++;
if(wt->msg_received == wt->msg_target) { if (wt->msg_received == wt->msg_target) {
event_base_loopexit(wt->base, NULL); wt->debug("%s: thread %d has received all %d messages it expected\n",
__func__, wt->id, wt->msg_received);
wt_mark_finished(wt, WS_COMPLETE);
} }
} }
/**
* Called when we can write to the socket.
*/
void void
websocket_write(int fd, short event, void *ptr) { websocket_can_write(int fd, short event, void *ptr) {
int ret; int ret;
struct worker_thread *wt = ptr; struct worker_thread *wt = ptr;
(void) event;
if(event != EV_WRITE) { wt->debug("%s (wt=%p, fd=%d)\n", __func__, wt, fd);
switch (wt->state) {
case WS_INITIAL: { /* still sending initial HTTP request */
ret = evbuffer_write(wt->wbuffer, fd);
wt->debug("evbuffer_write returned %d\n", ret);
wt->debug("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer));
if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */
wait_for_possible_write(wt);
return;
}
/* otherwise, we've sent the full request, time to read the response */
wt->state = WS_SENT_HANDSHAKE;
wt->debug("state=WS_SENT_HANDSHAKE\n");
wait_for_possible_read(wt);
return; return;
} }
case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */
char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff"; wt->debug("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer));
ret = write(fd, message, sizeof(message)-1); evbuffer_write(wt->wbuffer, fd);
if(ret != sizeof(message)-1) { size_t write_remains = evbuffer_get_length(wt->wbuffer);
fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno)); wt->debug("Sent data for WS frame, still %lu left to write\n", write_remains);
close(fd); if (write_remains == 0) { /* ready to read response */
wt->state = WS_SENT_FRAME;
wt->msg_sent++;
wait_for_possible_read(wt);
} else { /* not finished writing */
wait_for_possible_write(wt);
}
return;
} }
default:
wt->msg_sent += 2; break;
if(wt->msg_sent < wt->msg_target) {
event_set(&wt->ev_w, fd, EV_WRITE, websocket_write, wt);
event_base_set(wt->base, &wt->ev_w);
ret = event_add(&wt->ev_w, NULL);
} }
} }
static void static void
websocket_read(int fd, short event, void *ptr) { websocket_can_read(int fd, short event, void *ptr) {
char packet[2048], *pos; int ret;
int ret, success = 1;
struct worker_thread *wt = ptr; struct worker_thread *wt = ptr;
(void) event;
wt->debug("%s (wt=%p)\n", __func__, wt);
if(event != EV_READ) { /* read message */
ret = evbuffer_read(wt->rbuffer, fd, 65536);
wt->debug("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state);
evbuffer_debug_dump(wt, wt->rbuffer);
if (ret == 0) {
wt->debug("We didn't read anything from the socket...\n");
wt_mark_finished(wt, WS_BROKEN);
return; return;
} }
/* read message */ while (1) {
ret = read(fd, packet, sizeof(packet)); switch (wt->state) {
pos = packet; case WS_SENT_HANDSHAKE: { /* waiting for handshake response */
if(ret > 0) { size_t avail_sz = evbuffer_get_length(wt->rbuffer);
char *data, *last; char *tmp = calloc(avail_sz, 1);
int sz, msg_sz; wt->debug("avail_sz from rbuffer = %lu\n", avail_sz);
evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */
if(wt->got_header == 0) { /* first response */ wt->debug("Giving %lu bytes to http-parser\n", avail_sz);
char *frame_start = strstr(packet, "MH"); /* end of the handshake */ int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz);
if(frame_start == NULL) { wt->debug("http-parser returned %d\n", nparsed);
return; /* not yet */ free(tmp);
} else { /* start monitoring possible writes */ /* http parser will return the offset at which the upgraded protocol begins,
printf("start monitoring possible writes\n"); which in our case is 1 under the total response size. */
evbuffer_add(wt->buffer, frame_start + 2, ret - (frame_start + 2 - packet));
if (wt->state == WS_SENT_HANDSHAKE || /* haven't encountered end of response yet */
wt->got_header = 1; (wt->parser.upgrade && nparsed != (int)avail_sz -1)) {
event_set(&wt->ev_w, fd, EV_WRITE, wt->debug("UPGRADE *and* we have some data left (nparsed=%d, avail_sz=%lu)\n", nparsed, avail_sz);
websocket_write, wt); continue;
event_base_set(wt->base, &wt->ev_w); } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */
ret = event_add(&wt->ev_w, NULL); evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer));
} }
} else { return;
/* we've had the header already, now bufffer data. */
evbuffer_add(wt->buffer, packet, ret);
} }
while(1) { case WS_SENT_FRAME: { /* waiting for frame response */
data = (char*)EVBUFFER_DATA(wt->buffer); wt->debug("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n");
sz = EVBUFFER_LENGTH(wt->buffer); evbuffer_debug_dump(wt, wt->rbuffer);
uint8_t flag_opcodes, payload_len;
if(sz == 0) { /* no data */ if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */
break; wait_for_possible_read(wt);
return;
} }
if(*data != 0) { /* missing frame start */ evbuffer_remove(wt->rbuffer, &flag_opcodes, 1); /* remove flags & opcode */
success = 0; evbuffer_remove(wt->rbuffer, &payload_len, 1); /* remove length */
break; evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */
process_message(wt, payload_len);
if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything */
if (wt->msg_received < wt->msg_target) { /* let's write again */
wt->debug("our turn to write again\n");
wt->state = WS_RECEIVED_HANDSHAKE;
ws_enqueue_frame(wt);
} /* otherwise, we're done */
return;
} else {
wt->debug("there's still data to consume\n");
continue;
} }
last = memchr(data, 0xff, sz); /* look for frame end */ return;
if(!last) { }
/* no end of frame in sight. */
break;
}
msg_sz = last - data - 1;
process_message(ptr, msg_sz); /* record packet */
/* drain including frame delimiters (+2 bytes) */ default:
evbuffer_drain(wt->buffer, msg_sz + 2); return;
} }
} else {
printf("ret=%d\n", ret);
success = 0;
} }
if(success == 0) { }
shutdown(fd, SHUT_RDWR);
close(fd);
event_base_loopexit(wt->base, NULL); static void
wait_for_possible_read(struct worker_thread *wt) {
wt->debug("%s (wt=%p)\n", __func__, wt);
event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt);
event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL);
}
static void
wait_for_possible_write(struct worker_thread *wt) {
wt->debug("%s (wt=%p)\n", __func__, wt);
event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt);
event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL);
}
static int
ws_on_header_field(http_parser *p, const char *at, size_t length) {
(void)length;
struct worker_thread *wt = (struct worker_thread *)p->data;
if (wt->hdr_last_cb_was_name) { /* we're appending to the name */
wt->cur_hdr_key = realloc(wt->cur_hdr_key, wt->cur_hdr_key_len + length + 1);
memcpy(wt->cur_hdr_key + wt->cur_hdr_key_len, at, length);
wt->cur_hdr_key_len += length;
} else { /* first call for this header name */
free(wt->cur_hdr_key); /* free the previous header name if there was one */
wt->cur_hdr_key_len = length;
wt->cur_hdr_key = calloc(length + 1, 1);
memcpy(wt->cur_hdr_key, at, length);
} }
wt->debug("%s appended header name data: currently [%.*s]\n", __func__,
(int)wt->cur_hdr_key_len, wt->cur_hdr_key);
wt->hdr_last_cb_was_name = 1;
return 0;
}
static int
ws_on_header_value(http_parser *p, const char *at, size_t length) {
struct worker_thread *wt = (struct worker_thread *)p->data;
if (wt->hdr_last_cb_was_name == 0) { /* we're appending to the value */
wt->cur_hdr_val = realloc(wt->cur_hdr_val, wt->cur_hdr_val_len + length + 1);
memcpy(wt->cur_hdr_val + wt->cur_hdr_val_len, at, length);
wt->cur_hdr_val_len += length;
} else { /* first call for this header value */
free(wt->cur_hdr_val); /* free the previous header value if there was one */
wt->cur_hdr_val_len = length;
wt->cur_hdr_val = calloc(length + 1, 1);
memcpy(wt->cur_hdr_val, at, length);
}
wt->debug("%s appended header value data: currently [%.*s]\n", __func__,
(int)wt->cur_hdr_val_len, wt->cur_hdr_val);
if (wt->cur_hdr_key_len == 20 && strncasecmp(wt->cur_hdr_key, "Sec-WebSocket-Accept", 20) == 0) {
free(wt->sec_websocket_accept);
wt->sec_websocket_accept = calloc(wt->cur_hdr_val_len + 1, 1);
memcpy(wt->sec_websocket_accept, wt->cur_hdr_val, wt->cur_hdr_val_len);
}
wt->hdr_last_cb_was_name = 0;
return 0;
}
static int
ws_on_headers_complete(http_parser *p) {
struct worker_thread *wt = p->data;
wt->debug("%s (wt=%p)\n", __func__, wt);
free(wt->cur_hdr_key);
free(wt->cur_hdr_val);
/* make sure that we received a Sec-WebSocket-Accept header */
if (!wt->sec_websocket_accept) {
wt->debug("%s: no Sec-WebSocket-Accept header was returned\n", __func__);
return 1;
}
/* and that it matches what we expect */
int ret = 0;
if (strlen(wt->sec_websocket_accept) != wt->ws_response_len
|| memcmp(wt->ws_response, wt->sec_websocket_accept, wt->ws_response_len) != 0) {
wt->debug("Invalid WS handshake: expected [%.*s], got [%s]\n",
(int)wt->ws_response_len, wt->ws_response, wt->sec_websocket_accept);
ret = 1;
}
free(wt->sec_websocket_accept);
return ret;
}
static void
ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) {
int include_mask = (wt->mask_cfg == MASK_ALWAYS ||
(wt->mask_cfg == MASK_ALTERNATE && wt->msg_sent % 2 == 0)) ? 1 : 0;
unsigned char mask[4];
for (int i = 0; include_mask && i < 4; i++) { /* only if mask is needed */
mask[i] = rand() & 0xff;
}
uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */
if (include_mask) {
len |= (1 << 7); /* set masking bit ON */
}
/* apply the mask to the payload */
for (size_t i = 0; include_mask && i < sz; i++) {
cmd[i] = (cmd[i] ^ mask[i % 4]) & 0xff;
}
/* 0x81 = 10000001b:
1: FIN bit (meaning there's only one message in the frame),
0: RSV1 bit (reserved),
0: RSV2 bit (reserved),
0: RSV3 bit (reserved),
0001: text frame */
evbuffer_add(wt->wbuffer, "\x81", 1);
evbuffer_add(wt->wbuffer, &len, 1);
if (include_mask) { /* only include mask in the frame if needed */
evbuffer_add(wt->wbuffer, mask, 4);
}
evbuffer_add(wt->wbuffer, cmd, sz);
wt->mask_applied += include_mask;
}
static void
ws_enqueue_frame(struct worker_thread *wt) {
char ping_command[] = "[\"PING\"]";
ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1);
wait_for_possible_write(wt);
}
static int
ws_on_message_complete(http_parser *p) {
struct worker_thread *wt = p->data;
wt->debug("%s (wt=%p), upgrade=%d\n", __func__, wt, p->upgrade);
/* we've received the full HTTP response now, so we're ready to send frames */
wt->state = WS_RECEIVED_HANDSHAKE;
ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */
return 0;
}
static void
ws_on_timeout(evutil_socket_t fd, short event, void *arg) {
struct worker_thread *wt = arg;
(void)fd;
(void)event;
fprintf(stderr, "Time has run out! (thread %d)\n", wt->id);
wt_mark_finished(wt, WS_BROKEN); /* break out of event loop */
}
void*
progress_thread_main(void *ptr) {
struct progress_thread *pt = ptr;
struct timespec ts_wait;
ts_wait.tv_sec = floor(pt->interval_sec); /* integer seconds */
ts_wait.tv_nsec = (pt->interval_sec - (float)ts_wait.tv_sec) * 1e9; /* nanoseconds */
int last_received = 0;
int num_sleeps = 0;
setlocale(LC_NUMERIC, "");
struct timespec ts_start;
clock_gettime(CLOCK_MONOTONIC, &ts_start);
while(1) {
int sem_received = 0;
#ifdef __APPLE__
dispatch_time_t sem_timeout = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(ts_wait.tv_sec * NSEC_PER_SEC + ts_wait.tv_nsec));
if (dispatch_semaphore_wait(pt->sem_finished, sem_timeout) == 0) {
sem_received = 1;
}
#else
struct timespec ts_now, ts_sem_timeout; /* now + timeout */
/* get current time */
if (clock_gettime(CLOCK_REALTIME, &ts_now) == -1) {
fprintf(stderr, "clock_gettime failed: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
/* calculate when the timeout should occur */
long long now_nanos = ts_now.tv_sec * 1e9 + ts_now.tv_nsec;
long long sem_timeout_nanos = now_nanos + (long long)ts_wait.tv_sec * 1e9 + (long long)ts_wait.tv_nsec;
ts_sem_timeout.tv_sec = sem_timeout_nanos / 1000000000LL;
ts_sem_timeout.tv_nsec = sem_timeout_nanos % 1000000000LL;
int sem_ret = sem_timedwait(&pt->sem_finished, &ts_sem_timeout);
if (sem_ret == 0) {
sem_received = 1;
}
#endif
// nanosleep(&ts, NULL);
num_sleeps++;
int total_sent = 0, total_received = 0, any_broken = 0, num_complete = 0;
for (int i = 0; i < pt->worker_count; i++) {
total_sent += pt->workers[i].msg_sent;
total_received += pt->workers[i].msg_received;
if (pt->workers[i].state == WS_BROKEN) {
any_broken = 1;
} else if (pt->workers[i].state == WS_COMPLETE) {
num_complete++;
}
}
struct timespec ts_after_sleep;
clock_gettime(CLOCK_MONOTONIC, &ts_after_sleep);
fprintf(stderr, "After %0.2f sec: %'d messages sent, %'d received (%.02f%%). Instant rate: %'ld/sec, overall rate: %'ld/sec\n",
((float)((ts_after_sleep.tv_sec * 1e9 + ts_after_sleep.tv_nsec) - (ts_start.tv_sec * 1e9 + ts_start.tv_nsec))) / (float)1e9,
total_sent, total_received, 100.0f * (float)total_received / (float)(pt->worker_count * pt->msg_target),
lroundf((float)(total_received - last_received) / pt->interval_sec),
lroundf((float)total_received / ((float)num_sleeps) * pt->interval_sec));
if (sem_received || total_received == pt->msg_target * pt->worker_count || any_broken || num_complete == pt->worker_count) {
break;
}
last_received = total_received;
}
return NULL;
} }
void* void*
worker_main(void *ptr) { worker_main(void *ptr) {
char ws_template[] = "GET /.json HTTP/1.1\r\n" char ws_template[] = "GET /.json HTTP/1.1\r\n"
"Host: %s:%d\r\n" "Host: %s:%d\r\n"
"Connection: Upgrade\r\n" "Connection: Upgrade\r\n"
"Upgrade: WebSocket\r\n" "Upgrade: WebSocket\r\n"
"Origin: http://%s:%d\r\n" "Origin: http://%s:%d\r\n"
"Sec-WebSocket-Key1: 18x 6]8vM;54 *(5: { U1]8 z [ 8\r\n" "Sec-WebSocket-Key: %s\r\n"
"Sec-WebSocket-Key2: 1_ tx7X d < nw 334J702) 7]o}` 0\r\n" "\r\n";
"\r\n"
"Tm[K T2u";
struct worker_thread *wt = ptr; struct worker_thread *wt = ptr;
int ret; int ret;
int fd; int fd;
int int_one = 1;
struct sockaddr_in addr; struct sockaddr_in addr;
char *ws_handshake; struct timeval timeout_tv;
size_t ws_handshake_sz; struct event *timeout_ev;
/* connect socket */ /* connect socket */
fd = socket(AF_INET, SOCK_STREAM, 0); fd = socket(AF_INET, SOCK_STREAM, 0);
@ -177,60 +556,116 @@ worker_main(void *ptr) {
memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr)); memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr));
addr.sin_addr.s_addr = inet_addr(wt->hi->host); addr.sin_addr.s_addr = inet_addr(wt->hi->host);
ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr)); ret = connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr));
if(ret != 0) { if (ret != 0) {
fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno)); fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno));
return NULL; return NULL;
} }
ret = ioctl(fd, FIONBIO, &int_one);
if (ret != 0) {
fprintf(stderr, "ioctl: ret=%d: %s\n", ret, strerror(errno));
return NULL;
}
/* initialize worker thread */ /* initialize worker thread */
wt->fd = fd;
wt->base = event_base_new(); wt->base = event_base_new();
wt->buffer = evbuffer_new(); wt->rbuffer = evbuffer_new();
wt->wbuffer = evbuffer_new(); /* write buffer */
wt->byte_count = 0; wt->byte_count = 0;
wt->got_header = 0; wt->got_header = 0;
/* send handshake */ /* generate a random key */
ws_handshake_sz = sizeof(ws_handshake) for (int i = 0; i < 16; i++) {
+ 2*strlen(wt->hi->host) + 500; wt->ws_key[i] = rand() & 0xff;
ws_handshake = calloc(ws_handshake_sz, 1); }
ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, wt->debug("Raw WS key:\n");
wt->hi->host, wt->hi->port, hex_dump(wt, wt->ws_key, 16);
wt->hi->host, wt->hi->port);
ret = write(fd, ws_handshake, ws_handshake_sz); char encoded_key[23]; /* it shouldn't be more than 4/3 * 16 */
base64_encodestate b64state;
base64_init_encodestate(&b64state);
int pos = base64_encode_block((const char *)wt->ws_key, 16, encoded_key, &b64state);
int delta = base64_encode_blockend(encoded_key + pos, &b64state);
/* the block ends with a '\n', which we need to remove */
encoded_key[pos+delta-1] = '\0';
wt->debug("Encoded WS key [%s]:\n", encoded_key);
/* compute the expected response, to be validated when we receive it */
char magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
size_t expected_raw_sz = (pos+delta-1) + sizeof(magic)-1;
char *expected_raw = calloc(expected_raw_sz + 1, 1);
memcpy(expected_raw, encoded_key, pos+delta-1); /* add encoded key */
memcpy(expected_raw + pos+delta-1, magic, sizeof(magic)-1); /* then constant guid */
SHA1Context ctx;
SHA1Reset(&ctx);
SHA1Input(&ctx, (const unsigned char*)expected_raw, expected_raw_sz);
SHA1Result(&ctx);
for(int i = 0; i < (int)(20/sizeof(int)); ++i) { /* put in correct byte order */
ctx.Message_Digest[i] = ntohl(ctx.Message_Digest[i]);
}
/* and then base64 encode the hash */
base64_init_encodestate(&b64state);
int resp_pos = base64_encode_block((const char *)ctx.Message_Digest, 20, wt->ws_response, &b64state);
int resp_delta = base64_encode_blockend(wt->ws_response + resp_pos, &b64state);
wt->ws_response_len = resp_pos + resp_delta - 1;
wt->ws_response[wt->ws_response_len] = '\0'; /* again remove the '\n' */
wt->debug("Expected response header: [%s]\n", wt->ws_response);
/* add timeout, if set */
if (wt->timeout_seconds > 0) {
timeout_tv.tv_sec = wt->timeout_seconds;
timeout_tv.tv_usec = 0;
timeout_ev = event_new(wt->base, -1, EV_TIMEOUT, ws_on_timeout, wt);
event_add(timeout_ev, &timeout_tv);
}
struct event ev_r; /* initialize HTTP parser, to parse the server response */
event_set(&ev_r, fd, EV_READ | EV_PERSIST, websocket_read, wt); memset(&wt->settings, 0, sizeof(http_parser_settings));
event_base_set(wt->base, &ev_r); wt->settings.on_header_field = ws_on_header_field;
event_add(&ev_r, NULL); wt->settings.on_header_value = ws_on_header_value;
wt->settings.on_headers_complete = ws_on_headers_complete;
wt->settings.on_message_complete = ws_on_message_complete;
http_parser_init(&wt->parser, HTTP_RESPONSE);
wt->parser.data = wt;
/* add GET request to buffer */
evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port,
wt->hi->host, wt->hi->port, encoded_key);
wait_for_possible_write(wt); /* request callback */
/* go! */ /* go! */
event_base_dispatch(wt->base); event_base_dispatch(wt->base);
wt->debug("event_base_dispatch returned\n");
event_base_free(wt->base); event_base_free(wt->base);
free(ws_handshake);
return NULL; return NULL;
} }
void void usage(const char *argv0, char *host_default, short port_default,
usage(const char* argv0, char *host_default, short port_default, int thread_count_default, int messages_default) {
int thread_count_default, int messages_default) {
printf("Usage: %s [options]\n" printf("Usage: %s [options]\n"
"Options are:\n" "Options are:\n"
"\t-h host\t\t(default = \"%s\")\n" "\t[--host|-h] HOST\t(default = \"%s\")\n"
"\t-p port\t\t(default = %d)\n" "\t[--port|-p] PORT\t(default = %d)\n"
"\t-c threads\t(default = %d)\n" "\t[--clients|-c] THREADS\t(default = %d)\n"
"\t-n count\t(number of messages per thread, default = %d)\n" "\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n"
"\t-v\t\t(verbose)\n", "\t[--mask|-m] MASK_CFG\t(%d: always, %d: never, %d: alternate, default = always)\n"
argv0, host_default, (int)port_default, "\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n"
thread_count_default, messages_default); "\t[--interval|-i] SECONDS\t(interval at which to report progress, default = 1)\n"
"\t[--verbose|-v]\t\t(extremely verbose output)\n",
argv0, host_default, (int)port_default,
thread_count_default, messages_default,
MASK_ALWAYS, MASK_NEVER, MASK_ALTERNATE);
} }
int int
main(int argc, char *argv[]) { main(int argc, char *argv[]) {
struct timespec t0, t1; int messages_default = 2500;
int messages_default = 100000;
int thread_count_default = 4; int thread_count_default = 4;
short port_default = 7379; short port_default = 7379;
char *host_default = "127.0.0.1"; char *host_default = "127.0.0.1";
@ -239,85 +674,134 @@ main(int argc, char *argv[]) {
int thread_count = thread_count_default; int thread_count = thread_count_default;
int i, opt; int i, opt;
char *colon; char *colon;
double total = 0, total_bytes = 0; long total = 0, total_bytes = 0;
int verbose = 0; int verbose = 0;
int timeout_seconds = -1;
float report_interval = 1.0;
enum mask_config mask_cfg = MASK_ALWAYS;
struct host_info hi = {host_default, port_default}; struct host_info hi = {host_default, port_default};
struct worker_thread *workers; struct worker_thread *workers;
/* getopt */ /* getopt */
while ((opt = getopt(argc, argv, "h:p:c:n:v")) != -1) { struct option long_options[] = {
{"help", no_argument, NULL, '?'},
{"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'},
{"clients", required_argument, NULL, 'c'},
{"messages", required_argument, NULL, 'n'},
{"mask", required_argument, NULL, 'm'},
{"max-time", required_argument, NULL, 't'},
{"interval", required_argument, NULL, 'i'},
{"verbose", no_argument, NULL, 'v'},
{0, 0, 0, 0}};
while ((opt = getopt_long(argc, argv, "h:p:c:n:m:t:i:vs", long_options, NULL)) != -1) {
switch (opt) { switch (opt) {
case 'h': case 'h':
colon = strchr(optarg, ':'); colon = strchr(optarg, ':');
if(!colon) { if (!colon) {
size_t sz = strlen(optarg); size_t sz = strlen(optarg);
hi.host = calloc(1 + sz, 1); hi.host = calloc(1 + sz, 1);
strncpy(hi.host, optarg, sz); strncpy(hi.host, optarg, sz);
} else { } else {
hi.host = calloc(1+colon-optarg, 1); hi.host = calloc(1 + colon - optarg, 1);
strncpy(hi.host, optarg, colon-optarg); strncpy(hi.host, optarg, colon - optarg);
hi.port = (short)atol(colon+1); hi.port = (short)atol(colon + 1);
} }
break; break;
case 'p': case 'p':
hi.port = (short)atol(optarg); hi.port = (short)atol(optarg);
break; break;
case 'c': case 'c':
thread_count = atoi(optarg); thread_count = atoi(optarg);
break; break;
case 'n': case 'n':
msg_target = atoi(optarg); msg_target = atoi(optarg);
break; break;
case 'v': case 'm':
verbose = 1; mask_cfg = atoi(optarg);
break; if (mask_cfg < MASK_NEVER || mask_cfg > MASK_ALTERNATE) {
default: /* '?' */ fprintf(stderr, "Invalid mask configuration: %d (range is [%d .. %d])\n",
usage(argv[0], host_default, port_default, mask_cfg, MASK_NEVER, MASK_ALTERNATE);
thread_count_default,
messages_default);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
}
break;
case 't':
timeout_seconds = atoi(optarg);
break;
case 'i':
report_interval = atof(optarg);
break;
case 'v':
verbose = 1;
break;
default: /* '?' */
usage(argv[0], host_default, port_default,
thread_count_default,
messages_default);
exit(EXIT_SUCCESS);
} }
} }
/* run threads */ /* run threads */
workers = calloc(sizeof(struct worker_thread), thread_count); workers = calloc(sizeof(struct worker_thread), thread_count);
clock_gettime(CLOCK_MONOTONIC, &t0); struct progress_thread progress;
for(i = 0; i < thread_count; ++i) { progress.interval_sec = report_interval;
progress.msg_target = msg_target;
progress.worker_count = thread_count;
progress.workers = workers;
#ifdef __APPLE__
dispatch_semaphore_t *sem = &progress.sem_finished;
*sem = dispatch_semaphore_create(0);
#else
if (sem_init(&progress.sem_finished, 0, 0) != 0) {
fprintf(stderr, "sem_init failed: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
#endif
pthread_create(&progress.thread, NULL, progress_thread_main, &progress);
for (i = 0; i < thread_count; ++i) {
workers[i].id = i;
workers[i].msg_target = msg_target; workers[i].msg_target = msg_target;
workers[i].hi = &hi; workers[i].hi = &hi;
workers[i].verbose = verbose; workers[i].verbose = verbose;
workers[i].state = WS_INITIAL;
workers[i].debug = verbose ? debug_verbose : debug_noop;
workers[i].timeout_seconds = timeout_seconds;
workers[i].mask_cfg = mask_cfg;
pthread_create(&workers[i].thread, NULL, pthread_create(&workers[i].thread, NULL,
worker_main, &workers[i]); worker_main, &workers[i]);
} }
/* wait for threads to finish */ /* wait for threads to finish */
for(i = 0; i < thread_count; ++i) { for (i = 0; i < thread_count; ++i) {
pthread_join(workers[i].thread, NULL); pthread_join(workers[i].thread, NULL);
total += workers[i].msg_received; total += workers[i].msg_received;
total_bytes += workers[i].byte_count; total_bytes += workers[i].byte_count;
} }
/* timing */ /* signal progress thread to stop */
clock_gettime(CLOCK_MONOTONIC, &t1); #ifdef __APPLE__
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; dispatch_semaphore_signal(progress.sem_finished);
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; #else
sem_post(&progress.sem_finished);
if(total != 0) { #endif
printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n",
(long)total, pthread_join(progress.thread, NULL);
(mili1-mili0)/1000.0,
1000*total/(mili1-mili0), if (total != 0) {
(int)(total_bytes / (1000*(mili1-mili0))), return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE);
(int)(total_bytes / (mili1-mili0)));
return EXIT_SUCCESS;
} else { } else {
printf("No message was read.\n"); printf("No message was read.\n");
return EXIT_FAILURE; return EXIT_FAILURE;

@ -3,150 +3,210 @@
<head> <head>
<title>WebSocket example</title> <title>WebSocket example</title>
<meta charset="utf-8" /> <meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://unpkg.com/purecss@2.0.6/build/pure-min.css" integrity="sha384-Uu6IeWbM+gzNVXJcM9XV3SohHtmWE+3VGi496jvgX1jyvDTXfdK+rfZc8C1Aehk5" crossorigin="anonymous">
<style type="text/css"> <style type="text/css">
h1, h3 {
body {
width: 800px;
margin: auto;
}
header {
font-size: 36pt;
width: 100%;
text-align: center; text-align: center;
margin-bottom: 1em;
color: #541F14;
}
section.proto {
float: left;
/*background-color: #f8f8f8;*/
}
section#json {
width: 380px;
margin-right: 20px;
}
section#raw {
width: 380px;
padding-left: 16px;
border-left: 4px solid #626266;
} }
div.desc {
margin: 0px;
}
pre.sent, pre.received { pre.sent, pre.received {
margin-top: 0px; margin-top: 0px;
border-radius: 4px; border-radius: 4px;
padding: 5px; padding: 5px;
} }
pre.sent { pre.sent {
border: 1px solid #938172; border: 1px solid #4b63cc;
background-color: white; background-color: white;
} }
pre.received { pre.received {
border: 1px solid #CC9E61; border: 1px solid #4db96d;
text-align: right; text-align: right;
} }
.ws-state {
font-weight: bold;
line-height: 18px;
vertical-align: middle;
}
div.log {
font-size: 13pt;
}
</style> </style>
</head> </head>
<body> <body>
<header>Webdis with HTML5 WebSockets</header> <div class="pure-g">
<h1 class="pure-u-1">Webdis with HTML5 WebSockets</h1>
</div>
<section class="proto" id="json"> <div class="pure-g">
<h3>JSON</h3> <div class="pure-u-1-8"></div>
<div class="log" id="json-log"> <div class="pure-u-1-3" id="json-container"></div>
Connecting...
</div>
</section>
<section class="proto" id="raw">
<h3>Raw</h3>
<div class="log" id="raw-log">
Connecting...
</div>
</section>
<!-- spacer -->
<div class="pure-u-1-12"></div>
<script type="text/javascript"> <div class="pure-u-1-3" id="raw-container"></div>
<div class="pure-u-1-8"></div>
</div>
$ = function(id) {return document.getElementById(id);}; <script type="text/javascript">
var host = "127.0.0.1"; </script>
var port = 7379;
function log(id, dir, msg) {
var desc = document.createElement("div"); <script type="text/javascript">
desc.setAttribute("class", "desc");
desc.innerHTML = dir;
$(id).appendChild(desc);
var e = document.createElement("pre"); $ = function(id) {return document.getElementById(id);};
e.setAttribute("class", dir); const host = "127.0.0.1";
e.innerHTML = msg; const port = 7379;
$(id).appendChild(e);
function installBlock(title, type) {
const contents = `
<h3>$TITLE</h3>
<form class="pure-form">
<fieldset>
<div class="pure-g">
<div class="pure-u-2-3"><label class="ws-state pure-u-23-24" id="$type-state">State: Disconnected</label></div>
<div class="pure-u-1-3"><button type="submit" class="pure-u-23-24 pure-button pure-button-primary" id="$type-btn-connect">Connect</button></div>
</div>
</fieldset>
</form>
<form class="pure-form">
<fieldset>
<div class="pure-g">
<div class="pure-u-2-3">&nbsp;</div>
<div class="pure-u-1-3"><button disabled type="submit" class="pure-u-23-24 pure-button pure-button-primary" id="$type-btn-ping">Ping</button></div>
</div>
</fieldset>
</form>
<form class="pure-form">
<fieldset>
<div class="pure-g">
<div class="pure-u-1-3"><input disabled class="pure-u-23-24" type="text" placeholder="key" id="$type-set-key" value="hello" /></div>
<div class="pure-u-1-3"><input disabled class="pure-u-23-24" type="text" placeholder="value" id="$type-set-value" value="world" /></div>
<div class="pure-u-1-3"><button disabled type="submit" class="pure-u-23-24 pure-button pure-button-primary" id="$type-btn-set">SET</button></div>
</div>
</fieldset>
</form>
<form class="pure-form">
<fieldset>
<div class="pure-g">
<div class="pure-u-1-3">&nbsp;</div>
<div class="pure-u-1-3"><input disabled class="pure-u-23-24" type="text" placeholder="key" id="$type-get-key" value="hello" /></div>
<div class="pure-u-1-3"><button disabled type="submit" class="pure-u-23-24 pure-button pure-button-primary" id="$type-btn-get">GET</button></div>
</div>
</fieldset>
</form>
<div class="pure-g">
<div class="pure-u-2-3">&nbsp;</div>
<div class="pure-u-1-3"><button disabled type="submit" class="pure-u-23-24 pure-button pure-button-primary" id="$type-btn-clear">Clear logs</button></div>
<div class="log pure-u-1-1" id="$type-log">
</div>
</div>
`;
$(`${type}-container`).innerHTML = contents.replace(/\$TITLE/g, title).replace(/\$type/g, type);
} }
function testJSON() {
if(typeof(WebSocket) == 'function')
f = WebSocket;
if(typeof(MozWebSocket) == 'function')
f = MozWebSocket;
var jsonSocket = new f("ws://"+host+":"+port+"/.json"); class Client {
var self = this; constructor(type, pingSerializer, getSerializer, setSerializer) {
this.type = type;
send = function(j) { this.pingSerializer = pingSerializer;
var json = JSON.stringify(j); this.getSerializer = getSerializer;
jsonSocket.send(json); this.setSerializer = setSerializer;
log("json-log", "sent", json); this.ws = null;
};
$(`${this.type}-btn-connect`).addEventListener('click', event => {
jsonSocket.onopen = function() { event.preventDefault();
$("json-log").innerHTML = ""; console.log('Connecting...');
self.send(["SET", "hello", "world"]); this.ws = new WebSocket(`ws://${ host }:${ port }/.${ this.type }`);
self.send(["GET", "hello"]); this.ws.onopen = event => {
}; console.log('Connected');
this.setConnectedState(true);
jsonSocket.onmessage = function(messageEvent) { };
log("json-log", "received", messageEvent.data);
}; // log received messages
this.ws.onmessage = messageEvent => {
this.log("received", messageEvent.data);
};
this.ws.onclose = event => {
$(`${this.type}-btn-connect`).disabled = false;
this.setConnectedState(false);
};
});
$(`${this.type}-btn-ping`).addEventListener('click', event => {
event.preventDefault();
const serialized = this.pingSerializer();
this.log("sent", serialized);
this.ws.send(serialized);
});
$(`${this.type}-btn-set`).addEventListener('click', event => {
event.preventDefault();
const serialized = this.setSerializer($(`${this.type}-set-key`).value, $(`${this.type}-set-value`).value);
this.log("sent", serialized);
this.ws.send(serialized);
});
$(`${this.type}-btn-get`).addEventListener('click', event => {
event.preventDefault();
const serialized = this.getSerializer($(`${this.type}-set-key`).value);
this.log("sent", serialized);
this.ws.send(serialized);
});
$(`${this.type}-btn-clear`).addEventListener('click', event => {
event.preventDefault();
$(`${this.type}-log`).innerText = "";
});
}
setConnectedState(connected) {
$(`${this.type}-btn-connect`).disabled = connected;
$(`${this.type}-btn-ping`).disabled = !connected;
$(`${this.type}-set-key`).disabled = !connected;
$(`${this.type}-set-value`).disabled = !connected;
$(`${this.type}-btn-set`).disabled = !connected;
$(`${this.type}-get-key`).disabled = !connected;
$(`${this.type}-btn-get`).disabled = !connected;
$(`${this.type}-btn-clear`).disabled = !connected;
$(`${this.type}-state`).innerText = `State: ${connected ? 'Connected' : 'Disconnected'}`;
}
log(dir, msg) {
const id = `${this.type}-log`;
const description = document.createElement("div");
description.innerHTML = dir;
$(id).appendChild(description);
const contents = document.createElement("pre");
contents.setAttribute("class", dir);
contents.innerHTML = msg;
$(id).appendChild(contents);
}
} }
function testRAW() { addEventListener("DOMContentLoaded", () => {
installBlock('JSON', 'json');
if(typeof(WebSocket) == 'function') installBlock('Raw', 'raw');
f = WebSocket;
if(typeof(MozWebSocket) == 'function')
f = MozWebSocket;
var rawSocket = new f("ws://"+host+":"+port+"/.raw"); const jsonClient = new Client('json',
var self = this; () => JSON.stringify(['PING']),
(key) => JSON.stringify(['GET', key]),
sendRaw = function(raw) { (key, value) => JSON.stringify(['SET', key, value]));
rawSocket.send(raw);
log("raw-log", "sent", raw);
};
rawSocket.onopen = function() {
$("raw-log").innerHTML = "";
self.sendRaw("*3\r\n$3\r\nSET\r\n$5\r\nhello\r\n$5\r\nworld\r\n");
self.sendRaw("*2\r\n$3\r\nGET\r\n$5\r\nhello\r\n");
};
rawSocket.onmessage = function(messageEvent) {
log("raw-log", "received", messageEvent.data);
};
}
addEventListener("DOMContentLoaded", function(){ const rawClient = new Client('raw',
testJSON(); () => '*1\r\n$4\r\nPING\r\n',
testRAW(); (key) => `*2\r\n$3\r\nGET\r\n$${key.length}\r\n${key}\r\n`,
}, null); (key, value) => `*3\r\n$3\r\nSET\r\n$${key.length}\r\n${key}\r\n$${value.length}\r\n${value}\r\n`);
});
</script> </script>
</body> </body>

@ -26,6 +26,6 @@
} }
], ],
"verbosity": 6, "verbosity": 4,
"logfile": "webdis.log" "logfile": "webdis.log"
} }

Loading…
Cancel
Save