websocket test: formatting/indentation

master
Jessie Murray 3 years ago
parent e97056f4cf
commit 8141c00ab7
No known key found for this signature in database
GPG Key ID: E7E4D57EDDA744C5

@ -17,95 +17,98 @@
#include <event.h> #include <event.h>
#include <http_parser.h> #include <http_parser.h>
#define DEBUG_LOGS 0 struct host_info{
char *host;
struct host_info { short port;
char *host;
short port;
}; };
enum worker_state { enum worker_state {
WS_INITIAL, WS_INITIAL,
WS_SENT_HANDSHAKE, WS_SENT_HANDSHAKE,
WS_RECEIVED_HANDSHAKE, WS_RECEIVED_HANDSHAKE,
WS_SENT_FRAME, WS_SENT_FRAME,
WS_COMPLETE WS_COMPLETE
}; };
/* worker_thread, with counter of remaining messages */ /* worker_thread, with counter of remaining messages */
struct worker_thread { struct worker_thread {
struct host_info *hi; struct host_info *hi;
struct event_base *base; struct event_base *base;
int msg_target; int msg_target;
int msg_received; int msg_received;
int msg_sent; int msg_sent;
int byte_count; int byte_count;
int id; int id;
pthread_t thread; pthread_t thread;
enum worker_state state; enum worker_state state;
int timeout_seconds; int timeout_seconds;
struct evbuffer *rbuffer; struct evbuffer *rbuffer;
int got_header; int got_header;
struct evbuffer *wbuffer; struct evbuffer *wbuffer;
int verbose; int verbose;
int fd; int fd;
struct event ev_r; struct event ev_r;
struct event ev_w; struct event ev_w;
http_parser parser; http_parser parser;
http_parser_settings settings; http_parser_settings settings;
int (*debug)(const char *fmt, ...); int (*debug)(const char *fmt, ...);
}; };
int debug_noop(const char *fmt, ...) { int debug_noop(const char *fmt, ...) {
(void)fmt; (void)fmt;
return 0; return 0;
} }
int debug_verbose(const char *fmt, ...) { int debug_verbose(const char *fmt, ...) {
int ret; int ret;
va_list vargs; va_list vargs;
va_start(vargs, fmt); va_start(vargs, fmt);
ret = vfprintf(stderr, fmt, vargs); ret = vfprintf(stderr, fmt, vargs);
va_end(vargs); va_end(vargs);
return ret; return ret;
} }
void void
hex_dump(struct worker_thread *wt, char *p, size_t sz) { hex_dump(struct worker_thread *wt, char *p, size_t sz) {
wt->debug("hex dump of %p (%ld bytes)\n", p, sz); wt->debug("hex dump of %p (%ld bytes)\n", p, sz);
for (char *cur = p; cur < p + sz; cur += 16) { for (char *cur = p; cur < p + sz; cur += 16) {
char letters[16] = {0}; char letters[16] = {0};
int limit = (cur + 16) > p + sz ? (sz % 16) : 16; int limit = (cur + 16) > p + sz ? (sz % 16) : 16;
wt->debug("%08lx ", cur - p); /* address */ wt->debug("%08lx ", cur - p); /* address */
for (int i = 0; i < limit; i++) { for (int i = 0; i < limit; i++) {
wt->debug("%02x ", (unsigned int)(cur[i] & 0xff)); wt->debug("%02x ", (unsigned int)(cur[i] & 0xff));
letters[i] = isprint(cur[i]) ? cur[i] : '.'; letters[i] = isprint(cur[i]) ? cur[i] : '.';
} }
for (int i = limit; i < 16; i++) { /* pad on last line */ for (int i = limit; i < 16; i++) { /* pad on last line */
wt->debug(" "); wt->debug(" "); /* 3 spaces for "%02x " */
} }
wt->debug(" %.*s\n", limit, letters); wt->debug(" %.*s\n", limit, letters);
} }
} }
void void
evbuffer_debug_dump(struct worker_thread *wt, struct evbuffer *buffer) { evbuffer_debug_dump(struct worker_thread *wt, struct evbuffer *buffer) {
size_t sz = evbuffer_get_length(buffer); size_t sz = evbuffer_get_length(buffer);
char *data = malloc(sz); char *data = malloc(sz);
evbuffer_remove(buffer, data, sz); if (!data) {
hex_dump(wt, data, sz); fprintf(stderr, "failed to allocate %ld bytes\n", sz);
evbuffer_prepend(buffer, data, sz); return;
free(data); }
evbuffer_remove(buffer, data, sz);
hex_dump(wt, data, sz);
evbuffer_prepend(buffer, data, sz);
free(data);
} }
static void static void
wait_for_possible_read(struct worker_thread *wt); wait_for_possible_read(struct worker_thread *wt);
static void static void
wait_for_possible_write(struct worker_thread *wt); wait_for_possible_write(struct worker_thread *wt);
@ -114,22 +117,20 @@ ws_enqueue_frame(struct worker_thread *wt);
void void
process_message(struct worker_thread *wt, size_t sz) { process_message(struct worker_thread *wt, size_t sz) {
if (wt->msg_received && wt->msg_received % 1000 == 0) {
// printf("process_message\n"); printf("thread %d: %8d messages left (got %9d bytes so far).\n",
if(wt->msg_received && wt->msg_received % 1000 == 0) { wt->id,
printf("thread %d: %8d messages left (got %9d bytes so far).\n", wt->msg_target - wt->msg_received, wt->byte_count);
wt->id, }
wt->msg_target - wt->msg_received, wt->byte_count); wt->byte_count += sz;
}
wt->byte_count += sz; /* decrement read count, and stop receiving when we reach zero. */
wt->msg_received++;
/* decrement read count, and stop receiving when we reach zero. */ if (wt->msg_received == wt->msg_target) {
wt->msg_received++; wt->debug("%s: thread %d has received all %d messages it expected\n",
if(wt->msg_received == wt->msg_target) { __func__, wt->id, wt->msg_received);
wt->debug("%s: thread %d has received all %d messages it expected\n", event_base_loopexit(wt->base, NULL);
__func__, wt->id, wt->msg_received); }
event_base_loopexit(wt->base, NULL);
}
} }
/** /**
@ -137,408 +138,396 @@ process_message(struct worker_thread *wt, size_t sz) {
*/ */
void void
websocket_can_write(int fd, short event, void *ptr) { websocket_can_write(int fd, short event, void *ptr) {
int ret; int ret;
struct worker_thread *wt = ptr; struct worker_thread *wt = ptr;
wt->debug("%s (wt=%p, fd=%d)\n", __func__, wt, fd); wt->debug("%s (wt=%p, fd=%d)\n", __func__, wt, fd);
if(event != EV_WRITE) { switch (wt->state) {
return; case WS_INITIAL: { /* still sending initial HTTP request */
} ret = evbuffer_write(wt->wbuffer, fd);
switch (wt->state) wt->debug("evbuffer_write returned %d\n", ret);
{ wt->debug("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer));
case WS_INITIAL: { /* still sending initial HTTP request */ if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */
ret = evbuffer_write(wt->wbuffer, fd); wait_for_possible_write(wt);
wt->debug("evbuffer_write returned %d\n", ret); return;
wt->debug("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer)); }
if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */ /* otherwise, we've sent the full request, time to read the response */
wait_for_possible_write(wt); wt->state = WS_SENT_HANDSHAKE;
return; wt->debug("state=WS_SENT_HANDSHAKE\n");
} wait_for_possible_read(wt);
/* otherwise, we've sent the full request, time to read the response */ return;
wt->state = WS_SENT_HANDSHAKE; }
wt->debug("state=WS_SENT_HANDSHAKE\n"); case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */
wait_for_possible_read(wt); wt->debug("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer));
return; evbuffer_write(wt->wbuffer, fd);
} size_t write_remains = evbuffer_get_length(wt->wbuffer);
case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */ wt->debug("Sent data for WS frame, still %lu left to write\n", write_remains);
wt->debug("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer)); if (write_remains == 0) { /* ready to read response */
evbuffer_write(wt->wbuffer, fd); wt->state = WS_SENT_FRAME;
size_t write_remains = evbuffer_get_length(wt->wbuffer); wt->msg_sent++;
wt->debug("Sent data for WS frame, still %lu left to write\n", write_remains); wait_for_possible_read(wt);
if (write_remains == 0) { /* ready to read response */ } else { /* not finished writing */
wt->state = WS_SENT_FRAME; wait_for_possible_write(wt);
wt->msg_sent++; }
wait_for_possible_read(wt); return;
} else { /* not finished writing */ }
wait_for_possible_write(wt); default:
} break;
return; }
}
default:
break;
}
} }
static void static void
websocket_can_read(int fd, short event, void *ptr) { websocket_can_read(int fd, short event, void *ptr) {
int ret; int ret;
struct worker_thread *wt = ptr; struct worker_thread *wt = ptr;
wt->debug("%s (wt=%p)\n", __func__, wt); wt->debug("%s (wt=%p)\n", __func__, wt);
if(event != EV_READ) { /* read message */
return; ret = evbuffer_read(wt->rbuffer, fd, 65536);
} wt->debug("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state);
evbuffer_debug_dump(wt, wt->rbuffer);
/* read message */ if (ret == 0) {
ret = evbuffer_read(wt->rbuffer, fd, 65536); wt->debug("We didn't read anything from the socket...\n");
wt->debug("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state); event_base_loopexit(wt->base, NULL);
evbuffer_debug_dump(wt, wt->rbuffer); return;
if (ret == 0) { }
wt->debug("We didn't read anything from the socket...\n");
return; while (1) {
} switch (wt->state) {
case WS_SENT_HANDSHAKE: { /* waiting for handshake response */
while(1) { size_t avail_sz = evbuffer_get_length(wt->rbuffer);
switch (wt->state) { char *tmp = calloc(avail_sz, 1);
case WS_SENT_HANDSHAKE: { /* waiting for handshake response */ wt->debug("avail_sz from rbuffer = %lu\n", avail_sz);
size_t avail_sz = evbuffer_get_length(wt->rbuffer); evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */
char *tmp = calloc(avail_sz, 1); wt->debug("Giving %lu bytes to http-parser\n", avail_sz);
wt->debug("avail_sz from rbuffer = %lu\n", avail_sz); int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz);
evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */ wt->debug("http-parser returned %d\n", nparsed);
wt->debug("Giving %lu bytes to http-parser\n", avail_sz); if (nparsed != (int)avail_sz) { // put back what we didn't read
int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz); wt->debug("re-attach (prepend) %lu byte%c\n", avail_sz - nparsed,
wt->debug("http-parser returned %d\n", nparsed); avail_sz - nparsed > 1 ? 's' : ' ');
if (nparsed != (int)avail_sz) { // put back what we didn't read evbuffer_prepend(wt->rbuffer, tmp + nparsed, avail_sz - nparsed);
wt->debug("re-attach (prepend) %lu byte%c\n", avail_sz - nparsed, }
avail_sz - nparsed > 1 ? 's' : ' '); free(tmp);
evbuffer_prepend(wt->rbuffer, tmp + nparsed, avail_sz - nparsed); if (wt->state == WS_SENT_HANDSHAKE && /* haven't encountered end of response yet */
} wt->parser.upgrade && nparsed != (int)avail_sz) {
free(tmp); wt->debug("UPGRADE *and* we have some data left\n");
if (wt->state == WS_SENT_HANDSHAKE && /* haven't encountered end of response yet */ continue;
wt->parser.upgrade && nparsed != (int)avail_sz) { } else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */
wt->debug("UPGRADE *and* we have some data left\n"); evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer));
continue; }
} else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */ return;
evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer)); }
}
return; case WS_SENT_FRAME: { /* waiting for frame response */
} wt->debug("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n");
evbuffer_debug_dump(wt, wt->rbuffer);
case WS_SENT_FRAME: { /* waiting for frame response */ uint8_t flag_opcodes, payload_len;
wt->debug("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n"); if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */
evbuffer_debug_dump(wt, wt->rbuffer); wait_for_possible_read(wt);
uint8_t flag_opcodes, payload_len; return;
if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */ }
wait_for_possible_read(wt); evbuffer_remove(wt->rbuffer, &flag_opcodes, 1); /* remove flags & opcode */
return; evbuffer_remove(wt->rbuffer, &payload_len, 1); /* remove length */
} evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */
evbuffer_remove(wt->rbuffer, &flag_opcodes, 1); /* remove flags & opcode */ process_message(wt, payload_len);
evbuffer_remove(wt->rbuffer, &payload_len, 1); /* remove length */
evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */ if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything */
process_message(wt, payload_len); if (wt->msg_received < wt->msg_target) { /* let's write again */
wt->debug("our turn to write again\n");
if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything */ wt->state = WS_RECEIVED_HANDSHAKE;
if (wt->msg_received < wt->msg_target) { /* let's write again */ ws_enqueue_frame(wt);
wt->debug("our turn to write again\n"); } /* otherwise, we're done */
wt->state = WS_RECEIVED_HANDSHAKE; return;
ws_enqueue_frame(wt); } else {
} /* otherwise, we're done */ wt->debug("there's still data to consume\n");
return; continue;
} else { }
wt->debug("there's still data to consume\n"); return;
continue; }
}
return; default:
} return;
}
default: }
return;
}
}
} }
static void static void
wait_for_possible_read(struct worker_thread *wt) { wait_for_possible_read(struct worker_thread *wt) {
wt->debug("%s (wt=%p)\n", __func__, wt); wt->debug("%s (wt=%p)\n", __func__, wt);
event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt); event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt);
event_base_set(wt->base, &wt->ev_r); event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL); event_add(&wt->ev_r, NULL);
} }
static void static void
wait_for_possible_write(struct worker_thread *wt) { wait_for_possible_write(struct worker_thread *wt) {
wt->debug("%s (wt=%p)\n", __func__, wt); wt->debug("%s (wt=%p)\n", __func__, wt);
event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt); event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt);
event_base_set(wt->base, &wt->ev_r); event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL); event_add(&wt->ev_r, NULL);
} }
static int static int
ws_on_headers_complete(http_parser *p) { ws_on_headers_complete(http_parser *p) {
struct worker_thread *wt = p->data; struct worker_thread *wt = p->data;
wt->debug("%s (wt=%p)\n", __func__, wt);
wt->debug("%s (wt=%p)\n", __func__, wt); return 0;
// TODO
return 0;
} }
static void static void
ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) { ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) {
unsigned char mask[4]; unsigned char mask[4];
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
mask[i] = rand() & 0xff; mask[i] = rand() & 0xff;
} }
uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */ uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */
len |= (1 << 7); /* set masking bit ON */ len |= (1 << 7); /* set masking bit ON */
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
cmd[i] = (cmd[i] ^ mask[i%4]) & 0xff; cmd[i] = (cmd[i] ^ mask[i % 4]) & 0xff;
} }
/* 0x81 = 10000001b: FIN bit (only one message in the frame), text frame */ /* 0x81 = 10000001b: FIN bit (only one message in the frame), text frame */
evbuffer_add(wt->wbuffer, "\x81", 1); evbuffer_add(wt->wbuffer, "\x81", 1);
evbuffer_add(wt->wbuffer, &len, 1); evbuffer_add(wt->wbuffer, &len, 1);
evbuffer_add(wt->wbuffer, mask, 4); evbuffer_add(wt->wbuffer, mask, 4);
evbuffer_add(wt->wbuffer, cmd, sz); evbuffer_add(wt->wbuffer, cmd, sz);
} }
static void static void
ws_enqueue_frame(struct worker_thread *wt) { ws_enqueue_frame(struct worker_thread *wt) {
char ping_command[] = "[\"PING\"]"; char ping_command[] = "[\"PING\"]";
ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1); ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1);
wait_for_possible_write(wt); wait_for_possible_write(wt);
} }
static int static int
ws_on_message_complete(http_parser *p) { ws_on_message_complete(http_parser *p) {
struct worker_thread *wt = p->data; struct worker_thread *wt = p->data;
wt->debug("%s (wt=%p)\n", __func__, wt); wt->debug("%s (wt=%p)\n", __func__, wt);
// we've received the full HTTP response now, so we're ready to send frames // we've received the full HTTP response now, so we're ready to send frames
wt->state = WS_RECEIVED_HANDSHAKE; wt->state = WS_RECEIVED_HANDSHAKE;
ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */ ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */
return 0; return 0;
} }
static void static void
ws_on_timeout(evutil_socket_t fd, short event, void *arg) { ws_on_timeout(evutil_socket_t fd, short event, void *arg) {
struct worker_thread *wt = arg; struct worker_thread *wt = arg;
(void) fd; (void)fd;
(void) event; (void)event;
fprintf(stderr, "Time has run out! (thread %d)\n", wt->id); fprintf(stderr, "Time has run out! (thread %d)\n", wt->id);
event_base_loopbreak(wt->base); /* break out of event loop */ event_base_loopbreak(wt->base); /* break out of event loop */
} }
void* void*
worker_main(void *ptr) { worker_main(void *ptr) {
char ws_template[] = "GET /.json HTTP/1.1\r\n" char ws_template[] = "GET /.json HTTP/1.1\r\n"
"Host: %s:%d\r\n" "Host: %s:%d\r\n"
"Connection: Upgrade\r\n" "Connection: Upgrade\r\n"
"Upgrade: WebSocket\r\n" "Upgrade: WebSocket\r\n"
"Origin: http://%s:%d\r\n" "Origin: http://%s:%d\r\n"
"Sec-WebSocket-Key: webdis-websocket-test-key\r\n" "Sec-WebSocket-Key: webdis-websocket-test-key\r\n"
"\r\n" "\r\n";
;
struct worker_thread *wt = ptr;
struct worker_thread *wt = ptr;
int ret;
int ret; int fd;
int fd; struct sockaddr_in addr;
struct sockaddr_in addr; struct timeval timeout_tv;
struct timeval timeout_tv; struct event *timeout_ev;
struct event *timeout_ev;
/* connect socket */
/* connect socket */ fd = socket(AF_INET, SOCK_STREAM, 0);
fd = socket(AF_INET, SOCK_STREAM, 0); addr.sin_family = AF_INET;
addr.sin_family = AF_INET; addr.sin_port = htons(wt->hi->port);
addr.sin_port = htons(wt->hi->port); memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr));
memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr)); addr.sin_addr.s_addr = inet_addr(wt->hi->host);
addr.sin_addr.s_addr = inet_addr(wt->hi->host);
ret = connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr));
ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr)); if (ret != 0) {
if(ret != 0) { fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno));
fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno)); return NULL;
return NULL; }
}
/* initialize worker thread */
/* initialize worker thread */ wt->fd = fd;
wt->fd = fd; wt->base = event_base_new();
wt->base = event_base_new(); wt->rbuffer = evbuffer_new();
wt->rbuffer = evbuffer_new(); wt->wbuffer = evbuffer_new(); /* write buffer */
wt->wbuffer = evbuffer_new(); /* write buffer */ wt->byte_count = 0;
wt->byte_count = 0; wt->got_header = 0;
wt->got_header = 0;
/* add timeout, if set */
/* add timeout, if set */ if (wt->timeout_seconds > 0) {
if (wt->timeout_seconds > 0) { timeout_tv.tv_sec = wt->timeout_seconds;
timeout_tv.tv_sec = wt->timeout_seconds; timeout_tv.tv_usec = 0;
timeout_tv.tv_usec = 0; timeout_ev = event_new(wt->base, -1, EV_TIMEOUT, ws_on_timeout, wt);
timeout_ev = event_new(wt->base, -1, EV_TIMEOUT, ws_on_timeout, wt); event_add(timeout_ev, &timeout_tv);
event_add(timeout_ev, &timeout_tv); }
}
/* initialize HTTP parser, to parse the server response */
/* initialize HTTP parser, to parse the server response */ memset(&wt->settings, 0, sizeof(http_parser_settings));
memset(&wt->settings, 0, sizeof(http_parser_settings)); wt->settings.on_headers_complete = ws_on_headers_complete;
wt->settings.on_headers_complete = ws_on_headers_complete; wt->settings.on_message_complete = ws_on_message_complete;
wt->settings.on_message_complete = ws_on_message_complete; http_parser_init(&wt->parser, HTTP_RESPONSE);
http_parser_init(&wt->parser, HTTP_RESPONSE); wt->parser.data = wt;
wt->parser.data = wt;
/* add GET request to buffer */
/* add GET request to buffer */ evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port,
evbuffer_add_printf(wt->wbuffer, ws_template, wt->hi->host, wt->hi->port, wt->hi->host, wt->hi->port);
wt->hi->host, wt->hi->port); wait_for_possible_write(wt); /* request callback */
wait_for_possible_write(wt); /* request callback */
/* go! */
/* go! */ event_base_dispatch(wt->base);
event_base_dispatch(wt->base); wt->debug("event_base_dispatch returned\n");
wt->debug("event_base_dispatch returned\n"); event_base_free(wt->base);
event_base_free(wt->base); return NULL;
return NULL;
} }
void void usage(const char *argv0, char *host_default, short port_default,
usage(const char* argv0, char *host_default, short port_default, int thread_count_default, int messages_default) {
int thread_count_default, int messages_default) {
printf("Usage: %s [options]\n"
printf("Usage: %s [options]\n" "Options are:\n"
"Options are:\n" "\t[--host|-h] HOST\t(default = \"%s\")\n"
"\t[--host|-h] HOST\t(default = \"%s\")\n" "\t[--port|-p] PORT\t(default = %d)\n"
"\t[--port|-p] PORT\t(default = %d)\n" "\t[--clients|-c] THREADS\t(default = %d)\n"
"\t[--clients|-c] THREADS\t(default = %d)\n" "\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n"
"\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n" "\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n"
"\t[--max-time|-t] SECONDS\t(max time to give to the run, default = unlimited)\n" "\t[--verbose|-v]\t\t(extremely verbose output)\n",
"\t[--verbose|-v]\t\t(extremely verbose output)\n", argv0, host_default, (int)port_default,
argv0, host_default, (int)port_default, thread_count_default, messages_default);
thread_count_default, messages_default);
} }
int int
main(int argc, char *argv[]) { main(int argc, char *argv[]) {
struct timespec t0, t1; struct timespec t0, t1;
int messages_default = 2500; int messages_default = 2500;
int thread_count_default = 4; int thread_count_default = 4;
short port_default = 7379; short port_default = 7379;
char *host_default = "127.0.0.1"; char *host_default = "127.0.0.1";
int msg_target = messages_default; int msg_target = messages_default;
int thread_count = thread_count_default; int thread_count = thread_count_default;
int i, opt; int i, opt;
char *colon; char *colon;
long total = 0, total_bytes = 0; long total = 0, total_bytes = 0;
int verbose = 0; int verbose = 0;
int timeout_seconds = -1; int timeout_seconds = -1;
struct host_info hi = {host_default, port_default}; struct host_info hi = {host_default, port_default};
struct worker_thread *workers; struct worker_thread *workers;
/* getopt */ /* getopt */
struct option long_options[] = { struct option long_options[] = {
{"help", no_argument, NULL, '?'}, {"help", no_argument, NULL, '?'},
{"host", required_argument, NULL, 'h'}, {"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'}, {"port", required_argument, NULL, 'p'},
{"clients", required_argument, NULL, 'c'}, {"clients", required_argument, NULL, 'c'},
{"messages", required_argument, NULL, 'n'}, {"messages", required_argument, NULL, 'n'},
{"max-time", required_argument, NULL, 't'}, {"max-time", required_argument, NULL, 't'},
{"verbose", no_argument, NULL, 'v'}, {"verbose", no_argument, NULL, 'v'},
{0, 0, 0, 0} {0, 0, 0, 0}};
}; while ((opt = getopt_long(argc, argv, "h:p:c:n:t:vs", long_options, NULL)) != -1) {
while ((opt = getopt_long(argc, argv, "h:p:c:n:t:vs", long_options, NULL)) != -1) { switch (opt) {
switch (opt) { case 'h':
case 'h': colon = strchr(optarg, ':');
colon = strchr(optarg, ':'); if (!colon) {
if(!colon) { size_t sz = strlen(optarg);
size_t sz = strlen(optarg); hi.host = calloc(1 + sz, 1);
hi.host = calloc(1 + sz, 1); strncpy(hi.host, optarg, sz);
strncpy(hi.host, optarg, sz); } else {
} else { hi.host = calloc(1 + colon - optarg, 1);
hi.host = calloc(1+colon-optarg, 1); strncpy(hi.host, optarg, colon - optarg);
strncpy(hi.host, optarg, colon-optarg); hi.port = (short)atol(colon + 1);
hi.port = (short)atol(colon+1); }
} break;
break;
case 'p':
case 'p': hi.port = (short)atol(optarg);
hi.port = (short)atol(optarg); break;
break;
case 'c':
case 'c': thread_count = atoi(optarg);
thread_count = atoi(optarg); break;
break;
case 'n':
case 'n': msg_target = atoi(optarg);
msg_target = atoi(optarg); break;
break;
case 't':
case 't': timeout_seconds = atoi(optarg);
timeout_seconds = atoi(optarg); break;
break;
case 'v':
case 'v': verbose = 1;
verbose = 1; break;
break;
default: /* '?' */
default: /* '?' */ usage(argv[0], host_default, port_default,
usage(argv[0], host_default, port_default, thread_count_default,
thread_count_default, messages_default);
messages_default); exit(EXIT_SUCCESS);
exit(EXIT_SUCCESS); }
} }
}
/* run threads */
/* run threads */ workers = calloc(sizeof(struct worker_thread), thread_count);
workers = calloc(sizeof(struct worker_thread), thread_count);
clock_gettime(CLOCK_MONOTONIC, &t0);
clock_gettime(CLOCK_MONOTONIC, &t0); for (i = 0; i < thread_count; ++i) {
for (i = 0; i < thread_count; ++i) { workers[i].id = i;
workers[i].id = i; workers[i].msg_target = msg_target;
workers[i].msg_target = msg_target; workers[i].hi = &hi;
workers[i].hi = &hi; workers[i].verbose = verbose;
workers[i].verbose = verbose; workers[i].state = WS_INITIAL;
workers[i].state = WS_INITIAL; workers[i].debug = verbose ? debug_verbose : debug_noop;
workers[i].debug = verbose ? debug_verbose : debug_noop; workers[i].timeout_seconds = timeout_seconds;
workers[i].timeout_seconds = timeout_seconds; if (thread_count == 1) {
if (thread_count == 1) { printf("Single-threaded mode\n");
printf("Single-threaded mode\n"); worker_main(&workers[0]);
worker_main(&workers[0]); } else { /* create threads */
} else { /* create threads */ pthread_create(&workers[i].thread, NULL,
pthread_create(&workers[i].thread, NULL, worker_main, &workers[i]);
worker_main, &workers[i]); }
} }
}
/* wait for threads to finish */
/* wait for threads to finish */ for (i = 0; i < thread_count; ++i) {
for (i = 0; i < thread_count; ++i) { if (thread_count > 1) {
if (thread_count > 1) { pthread_join(workers[i].thread, NULL);
pthread_join(workers[i].thread, NULL); }
} total += workers[i].msg_received;
total += workers[i].msg_received; total_bytes += workers[i].byte_count;
total_bytes += workers[i].byte_count; }
}
/* timing */
/* timing */ clock_gettime(CLOCK_MONOTONIC, &t1);
clock_gettime(CLOCK_MONOTONIC, &t1); float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
if (total != 0) {
if(total != 0) { double kb_per_sec = ((double)total_bytes / (double)(mili1 - mili0)) / 1.024;
double kb_per_sec = ((double)total_bytes / (double)(mili1-mili0)) / 1.024; printf("Read %ld messages (%ld bytes) in %0.2f sec: %0.2f msg/sec (%0.2f KB/sec)\n",
printf("Read %ld messages (%ld bytes) in %0.2f sec: %0.2f msg/sec (%0.2f KB/sec)\n", total,
total, total_bytes,
total_bytes, (mili1 - mili0) / 1000.0,
(mili1-mili0)/1000.0, 1000 * ((double)total) / (mili1 - mili0),
1000*((double)total)/(mili1-mili0), kb_per_sec);
kb_per_sec); return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE);
return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE); } else {
} else { printf("No message was read.\n");
printf("No message was read.\n"); return EXIT_FAILURE;
return EXIT_FAILURE; }
}
} }

Loading…
Cancel
Save