First attempt at fixing the websocket test

master
Jessie Murray 3 years ago
parent 80139110c5
commit c46b85ab7a
No known key found for this signature in database
GPG Key ID: E7E4D57EDDA744C5

@ -1,6 +1,6 @@
OUT=websocket pubsub OUT=websocket pubsub
CFLAGS=-O3 -Wall -Wextra CFLAGS=-O3 -Wall -Wextra
LDFLAGS=-levent -lpthread -lrt LDFLAGS=-levent -lpthread
all: $(OUT) Makefile all: $(OUT) Makefile

@ -1,4 +1,4 @@
/* http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 */ /* https://datatracker.ietf.org/doc/html/rfc6455 */
#include <stdlib.h> #include <stdlib.h>
#define _GNU_SOURCE #define _GNU_SOURCE
@ -15,312 +15,391 @@
#include <event.h> #include <event.h>
struct host_info { struct host_info {
char *host; char *host;
short port; short port;
};
enum worker_state {
WS_INITIAL,
WS_SENT_HANDSHAKE,
WS_RECEIVED_RESPONSE,
WS_COMPLETE
}; };
/* worker_thread, with counter of remaining messages */ /* worker_thread, with counter of remaining messages */
struct worker_thread { struct worker_thread {
struct host_info *hi; struct host_info *hi;
struct event_base *base; struct event_base *base;
int msg_target; int msg_target;
int msg_received; int msg_received;
int msg_sent; int msg_sent;
int byte_count; int byte_count;
pthread_t thread; pthread_t thread;
enum worker_state state;
struct evbuffer *buffer; struct evbuffer *rbuffer;
int got_header; int got_header;
int verbose; struct evbuffer *wbuffer;
struct event ev_w;
int verbose;
int fd;
struct event ev_r;
struct event ev_w;
}; };
static void
wait_for_possible_read(struct worker_thread *wt);
static void
wait_for_possible_write(struct worker_thread *wt);
void void
process_message(struct worker_thread *wt, size_t sz) { process_message(struct worker_thread *wt, size_t sz) {
// printf("process_message\n"); // printf("process_message\n");
if(wt->msg_received % 10000 == 0) { if(wt->msg_received % 10000 == 0) {
printf("thread %u: %8d messages left (got %9d bytes so far).\n", printf("thread %u: %8d messages left (got %9d bytes so far).\n",
(unsigned int)wt->thread, (unsigned int)wt->thread,
wt->msg_target - wt->msg_received, wt->byte_count); wt->msg_target - wt->msg_received, wt->byte_count);
} }
wt->byte_count += sz; wt->byte_count += sz;
/* decrement read count, and stop receiving when we reach zero. */ /* decrement read count, and stop receiving when we reach zero. */
wt->msg_received++; wt->msg_received++;
if(wt->msg_received == wt->msg_target) { if(wt->msg_received == wt->msg_target) {
event_base_loopexit(wt->base, NULL); event_base_loopexit(wt->base, NULL);
} }
} }
/**
* Called when we can write to the socket.
*/
void void
websocket_write(int fd, short event, void *ptr) { websocket_can_write(int fd, short event, void *ptr) {
int ret; int ret;
struct worker_thread *wt = ptr; struct worker_thread *wt = ptr;
printf("%s (wt=%p, fd=%d)\n", __func__, wt, fd);
if(event != EV_WRITE) {
return; if(event != EV_WRITE) {
} return;
}
char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff"; switch (wt->state)
ret = write(fd, message, sizeof(message)-1); {
if(ret != sizeof(message)-1) { case WS_INITIAL: /* still sending initial HTTP request */
fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno)); ret = evbuffer_write(wt->wbuffer, fd);
close(fd); printf("evbuffer_write returned %d\n", ret);
} printf("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer));
if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */
wt->msg_sent += 2; wait_for_possible_write(wt);
if(wt->msg_sent < wt->msg_target) { return;
event_set(&wt->ev_w, fd, EV_WRITE, websocket_write, wt); }
event_base_set(wt->base, &wt->ev_w); /* otherwise, we've sent the full request, time to read the response */
ret = event_add(&wt->ev_w, NULL); wt->state = WS_SENT_HANDSHAKE;
} wait_for_possible_read(wt);
return;
default:
break;
}
#if 0
char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff";
ret = write(fd, message, sizeof(message)-1);
if(ret != sizeof(message)-1) {
fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno));
close(fd);
}
wt->msg_sent += 2;
if(wt->msg_sent < wt->msg_target) {
event_set(&wt->ev_w, fd, EV_WRITE, websocket_can_write, wt);
event_base_set(wt->base, &wt->ev_w);
ret = event_add(&wt->ev_w, NULL);
}
#endif
}
static void
websocket_can_read(int fd, short event, void *ptr) {
char packet[2048], *pos;
int ret, success = 1;
struct worker_thread *wt = ptr;
printf("%s (wt=%p)\n", __func__, wt);
if(event != EV_READ) {
return;
}
/* read message */
ret = evbuffer_read(wt->rbuffer, fd, 65536);
printf("evbuffer_read() returned %d\n", ret);
#if 0
pos = packet;
if(ret > 0) {
char *data, *last;
int sz, msg_sz;
if(wt->got_header == 0) { /* first response */
char *frame_start = strstr(packet, "MH"); /* end of the handshake */
if(frame_start == NULL) {
return; /* not yet */
} else { /* start monitoring possible writes */
printf("start monitoring possible writes\n");
evbuffer_add(wt->rbuffer, frame_start + 2, ret - (frame_start + 2 - packet));
wt->got_header = 1;
event_set(&wt->ev_w, fd, EV_WRITE,
websocket_can_write, wt);
event_base_set(wt->base, &wt->ev_w);
ret = event_add(&wt->ev_w, NULL);
}
} else {
/* we've had the header already, now bufffer data. */
evbuffer_add(wt->rbuffer, packet, ret);
}
while(1) {
data = (char*)EVBUFFER_DATA(wt->rbuffer);
sz = EVBUFFER_LENGTH(wt->rbuffer);
if(sz == 0) { /* no data */
break;
}
if(*data != 0) { /* missing frame start */
success = 0;
break;
}
last = memchr(data, 0xff, sz); /* look for frame end */
if(!last) {
/* no end of frame in sight. */
break;
}
msg_sz = last - data - 1;
process_message(ptr, msg_sz); /* record packet */
/* drain including frame delimiters (+2 bytes) */
evbuffer_drain(wt->rbuffer, msg_sz + 2);
}
} else {
printf("ret=%d\n", ret);
success = 0;
}
if(success == 0) {
shutdown(fd, SHUT_RDWR);
close(fd);
event_base_loopexit(wt->base, NULL);
}
#endif
}
static void
wait_for_possible_read(struct worker_thread *wt) {
printf("%s (wt=%p)\n", __func__, wt);
event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt);
event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL);
} }
static void static void
websocket_read(int fd, short event, void *ptr) { wait_for_possible_write(struct worker_thread *wt) {
char packet[2048], *pos; printf("%s (wt=%p)\n", __func__, wt);
int ret, success = 1; event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt);
event_base_set(wt->base, &wt->ev_r);
struct worker_thread *wt = ptr; event_add(&wt->ev_r, NULL);
if(event != EV_READ) {
return;
}
/* read message */
ret = read(fd, packet, sizeof(packet));
pos = packet;
if(ret > 0) {
char *data, *last;
int sz, msg_sz;
if(wt->got_header == 0) { /* first response */
char *frame_start = strstr(packet, "MH"); /* end of the handshake */
if(frame_start == NULL) {
return; /* not yet */
} else { /* start monitoring possible writes */
printf("start monitoring possible writes\n");
evbuffer_add(wt->buffer, frame_start + 2, ret - (frame_start + 2 - packet));
wt->got_header = 1;
event_set(&wt->ev_w, fd, EV_WRITE,
websocket_write, wt);
event_base_set(wt->base, &wt->ev_w);
ret = event_add(&wt->ev_w, NULL);
}
} else {
/* we've had the header already, now bufffer data. */
evbuffer_add(wt->buffer, packet, ret);
}
while(1) {
data = (char*)EVBUFFER_DATA(wt->buffer);
sz = EVBUFFER_LENGTH(wt->buffer);
if(sz == 0) { /* no data */
break;
}
if(*data != 0) { /* missing frame start */
success = 0;
break;
}
last = memchr(data, 0xff, sz); /* look for frame end */
if(!last) {
/* no end of frame in sight. */
break;
}
msg_sz = last - data - 1;
process_message(ptr, msg_sz); /* record packet */
/* drain including frame delimiters (+2 bytes) */
evbuffer_drain(wt->buffer, msg_sz + 2);
}
} else {
printf("ret=%d\n", ret);
success = 0;
}
if(success == 0) {
shutdown(fd, SHUT_RDWR);
close(fd);
event_base_loopexit(wt->base, NULL);
}
} }
void* void*
worker_main(void *ptr) { worker_main(void *ptr) {
char ws_template[] = "GET /.json HTTP/1.1\r\n" char ws_template[] = "GET /.json HTTP/1.1\r\n"
"Host: %s:%d\r\n" "Host: %s:%d\r\n"
"Connection: Upgrade\r\n" "Connection: Upgrade\r\n"
"Upgrade: WebSocket\r\n" "Upgrade: WebSocket\r\n"
"Origin: http://%s:%d\r\n" "Origin: http://%s:%d\r\n"
"Sec-WebSocket-Key1: 18x 6]8vM;54 *(5: { U1]8 z [ 8\r\n" "Sec-WebSocket-Key: webdis-websocket-test-key\r\n"
"Sec-WebSocket-Key2: 1_ tx7X d < nw 334J702) 7]o}` 0\r\n" "\r\n"
"\r\n" ;
"Tm[K T2u";
struct worker_thread *wt = ptr;
struct worker_thread *wt = ptr;
int ret;
int ret; int fd;
int fd; struct sockaddr_in addr;
struct sockaddr_in addr; char *ws_handshake;
char *ws_handshake; size_t ws_handshake_sz;
size_t ws_handshake_sz;
/* connect socket */
/* connect socket */ fd = socket(AF_INET, SOCK_STREAM, 0);
fd = socket(AF_INET, SOCK_STREAM, 0); addr.sin_family = AF_INET;
addr.sin_family = AF_INET; addr.sin_port = htons(wt->hi->port);
addr.sin_port = htons(wt->hi->port); memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr));
memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr)); addr.sin_addr.s_addr = inet_addr(wt->hi->host);
addr.sin_addr.s_addr = inet_addr(wt->hi->host);
ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr));
ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr)); if(ret != 0) {
if(ret != 0) { fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno));
fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno)); return NULL;
return NULL; }
}
/* initialize worker thread */
/* initialize worker thread */ wt->fd = fd;
wt->base = event_base_new(); wt->base = event_base_new();
wt->buffer = evbuffer_new(); wt->rbuffer = evbuffer_new();
wt->byte_count = 0; wt->wbuffer = evbuffer_new(); /* write buffer */
wt->got_header = 0; wt->byte_count = 0;
wt->got_header = 0;
/* send handshake */
ws_handshake_sz = sizeof(ws_handshake) /* build handshake buffer */
+ 2*strlen(wt->hi->host) + 500; /*
ws_handshake = calloc(ws_handshake_sz, 1); ws_handshake_sz = sizeof(ws_handshake)
ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, + 2*strlen(wt->hi->host) + 500;
wt->hi->host, wt->hi->port, ws_handshake = calloc(ws_handshake_sz, 1);
wt->hi->host, wt->hi->port); ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template,
ret = write(fd, ws_handshake, ws_handshake_sz); wt->hi->host, wt->hi->port,
wt->hi->host, wt->hi->port);
struct event ev_r; */
event_set(&ev_r, fd, EV_READ | EV_PERSIST, websocket_read, wt); int added = evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port,
event_base_set(wt->base, &ev_r); wt->hi->host, wt->hi->port);
event_add(&ev_r, NULL); wait_for_possible_write(wt); /* request callback */
/* go! */ /* go! */
event_base_dispatch(wt->base); event_base_dispatch(wt->base);
event_base_free(wt->base); printf("event_base_dispatch returned\n");
free(ws_handshake); event_base_free(wt->base);
return NULL; free(ws_handshake);
return NULL;
} }
void void
usage(const char* argv0, char *host_default, short port_default, usage(const char* argv0, char *host_default, short port_default,
int thread_count_default, int messages_default) { int thread_count_default, int messages_default) {
printf("Usage: %s [options]\n" printf("Usage: %s [options]\n"
"Options are:\n" "Options are:\n"
"\t-h host\t\t(default = \"%s\")\n" "\t-h host\t\t(default = \"%s\")\n"
"\t-p port\t\t(default = %d)\n" "\t-p port\t\t(default = %d)\n"
"\t-c threads\t(default = %d)\n" "\t-c threads\t(default = %d)\n"
"\t-n count\t(number of messages per thread, default = %d)\n" "\t-n count\t(number of messages per thread, default = %d)\n"
"\t-v\t\t(verbose)\n", "\t-v\t\t(verbose)\n",
argv0, host_default, (int)port_default, argv0, host_default, (int)port_default,
thread_count_default, messages_default); thread_count_default, messages_default);
} }
int int
main(int argc, char *argv[]) { main(int argc, char *argv[]) {
struct timespec t0, t1; struct timespec t0, t1;
int messages_default = 100000; int messages_default = 100000;
int thread_count_default = 4; int thread_count_default = 4;
short port_default = 7379; short port_default = 7379;
char *host_default = "127.0.0.1"; char *host_default = "127.0.0.1";
int msg_target = messages_default; int msg_target = messages_default;
int thread_count = thread_count_default; int thread_count = thread_count_default;
int i, opt; int i, opt;
char *colon; char *colon;
double total = 0, total_bytes = 0; double total = 0, total_bytes = 0;
int verbose = 0; int verbose = 0, single = 0;
struct host_info hi = {host_default, port_default}; struct host_info hi = {host_default, port_default};
struct worker_thread *workers; struct worker_thread *workers;
/* getopt */ /* getopt */
while ((opt = getopt(argc, argv, "h:p:c:n:v")) != -1) { while ((opt = getopt(argc, argv, "h:p:c:n:vs")) != -1) {
switch (opt) { switch (opt) {
case 'h': case 'h':
colon = strchr(optarg, ':'); colon = strchr(optarg, ':');
if(!colon) { if(!colon) {
size_t sz = strlen(optarg); size_t sz = strlen(optarg);
hi.host = calloc(1 + sz, 1); hi.host = calloc(1 + sz, 1);
strncpy(hi.host, optarg, sz); strncpy(hi.host, optarg, sz);
} else { } else {
hi.host = calloc(1+colon-optarg, 1); hi.host = calloc(1+colon-optarg, 1);
strncpy(hi.host, optarg, colon-optarg); strncpy(hi.host, optarg, colon-optarg);
hi.port = (short)atol(colon+1); hi.port = (short)atol(colon+1);
} }
break; break;
case 'p': case 'p':
hi.port = (short)atol(optarg); hi.port = (short)atol(optarg);
break; break;
case 'c': case 'c':
thread_count = atoi(optarg); thread_count = atoi(optarg);
break; break;
case 'n': case 'n':
msg_target = atoi(optarg); msg_target = atoi(optarg);
break; break;
case 'v': case 'v':
verbose = 1; verbose = 1;
break; break;
default: /* '?' */
usage(argv[0], host_default, port_default, case 's':
thread_count_default, single = 1;
messages_default); thread_count = 1;
exit(EXIT_FAILURE); break;
} default: /* '?' */
} usage(argv[0], host_default, port_default,
thread_count_default,
/* run threads */ messages_default);
workers = calloc(sizeof(struct worker_thread), thread_count); exit(EXIT_FAILURE);
}
clock_gettime(CLOCK_MONOTONIC, &t0); }
for(i = 0; i < thread_count; ++i) {
workers[i].msg_target = msg_target; /* run threads */
workers[i].hi = &hi; workers = calloc(sizeof(struct worker_thread), thread_count);
workers[i].verbose = verbose;
clock_gettime(CLOCK_MONOTONIC, &t0);
pthread_create(&workers[i].thread, NULL, if (single) {
worker_main, &workers[i]); printf("Single-threaded mode\n");
} workers[0].msg_target = msg_target;
workers[0].hi = &hi;
/* wait for threads to finish */ workers[0].verbose = verbose;
for(i = 0; i < thread_count; ++i) { workers[0].state = WS_INITIAL;
pthread_join(workers[i].thread, NULL); worker_main(&workers[0]);
total += workers[i].msg_received; } else {
total_bytes += workers[i].byte_count; for (i = 0; i < thread_count; ++i) {
} workers[i].msg_target = msg_target;
workers[i].hi = &hi;
/* timing */ workers[i].verbose = verbose;
clock_gettime(CLOCK_MONOTONIC, &t1); workers[i].state = WS_INITIAL;
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; pthread_create(&workers[i].thread, NULL,
worker_main, &workers[i]);
if(total != 0) { }
printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n",
(long)total, /* wait for threads to finish */
(mili1-mili0)/1000.0, for (i = 0; i < thread_count; ++i) {
1000*total/(mili1-mili0), pthread_join(workers[i].thread, NULL);
(int)(total_bytes / (1000*(mili1-mili0))), total += workers[i].msg_received;
(int)(total_bytes / (mili1-mili0))); total_bytes += workers[i].byte_count;
return EXIT_SUCCESS; }
} else { }
printf("No message was read.\n");
return EXIT_FAILURE; /* timing */
} clock_gettime(CLOCK_MONOTONIC, &t1);
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
if(total != 0) {
printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n",
(long)total,
(mili1-mili0)/1000.0,
1000*total/(mili1-mili0),
(int)(total_bytes / (1000*(mili1-mili0))),
(int)(total_bytes / (mili1-mili0)));
return EXIT_SUCCESS;
} else {
printf("No message was read.\n");
return EXIT_FAILURE;
}
} }

Loading…
Cancel
Save