From 5cddc7d33146251cbe95cfd1c51072885cf55fa4 Mon Sep 17 00:00:00 2001 From: Nicolas Favre-Felix Date: Tue, 19 Apr 2011 23:28:45 +0200 Subject: [PATCH] Better pub/sub benchmark. --- tests/pubsub.c | 141 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 119 insertions(+), 22 deletions(-) diff --git a/tests/pubsub.c b/tests/pubsub.c index 21a76f2..d84d448 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -13,16 +13,21 @@ #include +/* + * Connection object. + */ struct cx { int fd; - int *counter; - int total; + int *counter; /* shared counter of the number of messages received. */ + int total; /* total number of messages to send */ char *http_request; - void (*read_fun)(int,short,void*); - void (*write_fun)(int,short,void*); + void (*read_fun)(int,short,void*); /* called when able to read fd */ + void (*write_fun)(int,short,void*); /* called when able to write fd */ + + /* libevent data structures */ struct event evr, evw; struct event_base *base; }; @@ -50,6 +55,9 @@ webdis_connect(const char *host, short port) { return fd; } +/** + * Send request and read until the delimiter string is reached. blocking. + */ void reader_http_request(struct cx *c, const char* buffer, const char *limit) { @@ -73,21 +81,28 @@ reader_http_request(struct cx *c, const char* buffer, const char *limit) { } } +/** + * (re)install connection in the event loop. + */ void cx_install(struct cx *c) { - if(c->read_fun) { + if(c->read_fun) { /* attach callback for read. */ event_set(&c->evr, c->fd, EV_READ, c->read_fun, c); event_base_set(c->base, &c->evr); event_add(&c->evr, NULL); } - if(c->write_fun) { + if(c->write_fun) { /* attach callback for write. */ event_set(&c->evw, c->fd, EV_WRITE, c->write_fun, c); event_base_set(c->base, &c->evw); event_add(&c->evw, NULL); } } + +/** + * Called when a reader has received data. + */ void reader_can_read(int fd, short event, void *ptr) { @@ -106,11 +121,11 @@ reader_can_read(int fd, short event, void *ptr) { /* look for the start of a message */ p = memchr(p, '{', buffer + ret - p); - if(!p) break; /* none left */ + if(!p) break; /* none left. */ p++; - (*c->counter)++; - if(((*c->counter * 100) % c->total) == 0) { + (*c->counter)++; /* increment the global message counter. */ + if(((*c->counter * 100) % c->total) == 0) { /* show progress. */ printf("\r%d %%", 100 * *c->counter / c->total); fflush(stdout); } @@ -124,24 +139,31 @@ reader_can_read(int fd, short event, void *ptr) { } +/** + * create a new reader object. + */ void -reader_new(struct event_base *base, int total, int *counter, int chan) { +reader_new(struct event_base *base, const char *host, short port, int total, int *counter, int chan) { struct cx *c = calloc(1, sizeof(struct cx)); c->base = base; c->counter = counter; c->total = total; - c->fd = webdis_connect("127.0.0.1", 7379); + c->fd = webdis_connect(host, port); c->read_fun = reader_can_read; - /* send read request. */ + /* send subscription request. */ c->http_request = malloc(100); sprintf(c->http_request, "GET /SUBSCRIBE/chan:%d HTTP/1.1\r\n\r\n", chan); reader_http_request(c, c->http_request, "{\"SUBSCRIBE\":[\"subscribe\""); + /* add to the event loop. */ cx_install(c); } +/** + * Called when a writer has received data back. read and ignore. + */ void writer_can_read(int fd, short event, void *ptr) { char buffer[1024]; @@ -153,8 +175,11 @@ writer_can_read(int fd, short event, void *ptr) { r = read(fd, buffer, sizeof(buffer)); /* discard */ (void)r; + /* re-install in the event loop. */ cx_install(c); } + +/* send request */ void writer_can_write(int fd, short event, void *ptr) { @@ -169,11 +194,11 @@ writer_can_write(int fd, short event, void *ptr) { } void -writer_new(struct event_base *base, int chan) { +writer_new(struct event_base *base, const char *host, short port, int chan) { struct cx *c = malloc(sizeof(struct cx)); c->base = base; - c->fd = webdis_connect("127.0.0.1", 7379); + c->fd = webdis_connect(host, port); c->read_fun = writer_can_read; c->write_fun = writer_can_write; @@ -186,6 +211,23 @@ writer_new(struct event_base *base, int chan) { } +void +usage(const char* argv0, char *host_default, short port_default, + int r_default, int w_default, int n_default, int c_default) { + + printf("Usage: %s [options]\n" + "Options are:\n" + "\t-h host\t\t(default = \"%s\")\n" + "\t-p port\t\t(default = %d)\n" + "\t-r readers\t(default = %d)\n" + "\t-w writers\t(default = %d)\n" + "\t-c channels\t(default = %d)\n" + "\t-n messages\t(number of messages to read in total, default = %d)\n", + argv0, host_default, (int)port_default, + r_default, w_default, + c_default, n_default); +} + int main(int argc, char *argv[]) { @@ -195,18 +237,73 @@ main(int argc, char *argv[]) { struct timespec t0, t1; struct event_base *base = event_base_new(); - int r = 450, w = 10, chans = 1, n = 200000, count = 0; - int i; - - (void)argc; - (void)argv; + int i, count = 0; + + /* getopt vars */ + int opt; + char *colon; + + /* default values */ + short port_default = 7379; + char *host_default = "127.0.0.1"; + int r_default = 450, w_default = 10, n_default = 100000, c_default = 1; + + + /* real values */ + int r = r_default, w = w_default, chans = c_default, n = n_default; + char *host = host_default; + short port = port_default; + + /* getopt */ + while ((opt = getopt(argc, argv, "h:p:r:w:c:n:")) != -1) { + switch (opt) { + case 'h': + colon = strchr(optarg, ':'); + if(!colon) { + size_t sz = strlen(optarg); + host = calloc(1 + sz, 1); + strncpy(host, optarg, sz); + } else { + host = calloc(1+colon-optarg, 1); + strncpy(host, optarg, colon-optarg); + port = (short)atol(colon+1); + } + break; + + case 'p': + port = (short)atol(optarg); + break; + + case 'r': + r = atoi(optarg); + break; + + case 'w': + w = atoi(optarg); + break; + + case 'c': + chans = atoi(optarg); + break; + + case 'n': + n = atoi(optarg); + break; + + default: + usage(argv[0], host_default, port_default, + r_default, w_default, + n_default, c_default); + exit(EXIT_FAILURE); + } + } for(i = 0; i < r; ++i) { - reader_new(base, n, &count, i % chans); + reader_new(base, host, port, n, &count, i % chans); } for(i = 0; i < w; ++i) { - writer_new(base, i % chans); + writer_new(base, host, port, i % chans); } /* save time now */ @@ -220,7 +317,7 @@ main(int argc, char *argv[]) { float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; - printf("\rPushed %ld messages from %d writers to %d readers through %d channels in %0.2f sec: %0.2f msg/sec\n", + printf("\rReceived %ld messages from %d writers to %d readers through %d channels in %0.2f sec: received %0.2f msg/sec\n", (long)n, w, r, chans, (mili1-mili0)/1000.0, 1000*n/(mili1-mili0));