|
|
@ -13,16 +13,21 @@
|
|
|
|
#include <event.h>
|
|
|
|
#include <event.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
|
|
|
* Connection object.
|
|
|
|
|
|
|
|
*/
|
|
|
|
struct cx {
|
|
|
|
struct cx {
|
|
|
|
int fd;
|
|
|
|
int fd;
|
|
|
|
|
|
|
|
|
|
|
|
int *counter;
|
|
|
|
int *counter; /* shared counter of the number of messages received. */
|
|
|
|
int total;
|
|
|
|
int total; /* total number of messages to send */
|
|
|
|
|
|
|
|
|
|
|
|
char *http_request;
|
|
|
|
char *http_request;
|
|
|
|
|
|
|
|
|
|
|
|
void (*read_fun)(int,short,void*);
|
|
|
|
void (*read_fun)(int,short,void*); /* called when able to read fd */
|
|
|
|
void (*write_fun)(int,short,void*);
|
|
|
|
void (*write_fun)(int,short,void*); /* called when able to write fd */
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* libevent data structures */
|
|
|
|
struct event evr, evw;
|
|
|
|
struct event evr, evw;
|
|
|
|
struct event_base *base;
|
|
|
|
struct event_base *base;
|
|
|
|
};
|
|
|
|
};
|
|
|
@ -50,6 +55,9 @@ webdis_connect(const char *host, short port) {
|
|
|
|
return fd;
|
|
|
|
return fd;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* Send request and read until the delimiter string is reached. blocking.
|
|
|
|
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_http_request(struct cx *c, const char* buffer, const char *limit) {
|
|
|
|
reader_http_request(struct cx *c, const char* buffer, const char *limit) {
|
|
|
|
|
|
|
|
|
|
|
@ -73,21 +81,28 @@ reader_http_request(struct cx *c, const char* buffer, const char *limit) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* (re)install connection in the event loop.
|
|
|
|
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
void
|
|
|
|
cx_install(struct cx *c) {
|
|
|
|
cx_install(struct cx *c) {
|
|
|
|
|
|
|
|
|
|
|
|
if(c->read_fun) {
|
|
|
|
if(c->read_fun) { /* attach callback for read. */
|
|
|
|
event_set(&c->evr, c->fd, EV_READ, c->read_fun, c);
|
|
|
|
event_set(&c->evr, c->fd, EV_READ, c->read_fun, c);
|
|
|
|
event_base_set(c->base, &c->evr);
|
|
|
|
event_base_set(c->base, &c->evr);
|
|
|
|
event_add(&c->evr, NULL);
|
|
|
|
event_add(&c->evr, NULL);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if(c->write_fun) {
|
|
|
|
if(c->write_fun) { /* attach callback for write. */
|
|
|
|
event_set(&c->evw, c->fd, EV_WRITE, c->write_fun, c);
|
|
|
|
event_set(&c->evw, c->fd, EV_WRITE, c->write_fun, c);
|
|
|
|
event_base_set(c->base, &c->evw);
|
|
|
|
event_base_set(c->base, &c->evw);
|
|
|
|
event_add(&c->evw, NULL);
|
|
|
|
event_add(&c->evw, NULL);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* Called when a reader has received data.
|
|
|
|
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_can_read(int fd, short event, void *ptr) {
|
|
|
|
reader_can_read(int fd, short event, void *ptr) {
|
|
|
|
|
|
|
|
|
|
|
@ -106,11 +121,11 @@ reader_can_read(int fd, short event, void *ptr) {
|
|
|
|
/* look for the start of a message */
|
|
|
|
/* look for the start of a message */
|
|
|
|
p = memchr(p, '{', buffer + ret - p);
|
|
|
|
p = memchr(p, '{', buffer + ret - p);
|
|
|
|
|
|
|
|
|
|
|
|
if(!p) break; /* none left */
|
|
|
|
if(!p) break; /* none left. */
|
|
|
|
p++;
|
|
|
|
p++;
|
|
|
|
|
|
|
|
|
|
|
|
(*c->counter)++;
|
|
|
|
(*c->counter)++; /* increment the global message counter. */
|
|
|
|
if(((*c->counter * 100) % c->total) == 0) {
|
|
|
|
if(((*c->counter * 100) % c->total) == 0) { /* show progress. */
|
|
|
|
printf("\r%d %%", 100 * *c->counter / c->total);
|
|
|
|
printf("\r%d %%", 100 * *c->counter / c->total);
|
|
|
|
fflush(stdout);
|
|
|
|
fflush(stdout);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -124,24 +139,31 @@ reader_can_read(int fd, short event, void *ptr) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* create a new reader object.
|
|
|
|
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_new(struct event_base *base, int total, int *counter, int chan) {
|
|
|
|
reader_new(struct event_base *base, const char *host, short port, int total, int *counter, int chan) {
|
|
|
|
|
|
|
|
|
|
|
|
struct cx *c = calloc(1, sizeof(struct cx));
|
|
|
|
struct cx *c = calloc(1, sizeof(struct cx));
|
|
|
|
c->base = base;
|
|
|
|
c->base = base;
|
|
|
|
c->counter = counter;
|
|
|
|
c->counter = counter;
|
|
|
|
c->total = total;
|
|
|
|
c->total = total;
|
|
|
|
c->fd = webdis_connect("127.0.0.1", 7379);
|
|
|
|
c->fd = webdis_connect(host, port);
|
|
|
|
c->read_fun = reader_can_read;
|
|
|
|
c->read_fun = reader_can_read;
|
|
|
|
|
|
|
|
|
|
|
|
/* send read request. */
|
|
|
|
/* send subscription request. */
|
|
|
|
c->http_request = malloc(100);
|
|
|
|
c->http_request = malloc(100);
|
|
|
|
sprintf(c->http_request, "GET /SUBSCRIBE/chan:%d HTTP/1.1\r\n\r\n", chan);
|
|
|
|
sprintf(c->http_request, "GET /SUBSCRIBE/chan:%d HTTP/1.1\r\n\r\n", chan);
|
|
|
|
reader_http_request(c, c->http_request, "{\"SUBSCRIBE\":[\"subscribe\"");
|
|
|
|
reader_http_request(c, c->http_request, "{\"SUBSCRIBE\":[\"subscribe\"");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* add to the event loop. */
|
|
|
|
cx_install(c);
|
|
|
|
cx_install(c);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* Called when a writer has received data back. read and ignore.
|
|
|
|
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
void
|
|
|
|
writer_can_read(int fd, short event, void *ptr) {
|
|
|
|
writer_can_read(int fd, short event, void *ptr) {
|
|
|
|
char buffer[1024];
|
|
|
|
char buffer[1024];
|
|
|
@ -153,8 +175,11 @@ writer_can_read(int fd, short event, void *ptr) {
|
|
|
|
r = read(fd, buffer, sizeof(buffer)); /* discard */
|
|
|
|
r = read(fd, buffer, sizeof(buffer)); /* discard */
|
|
|
|
(void)r;
|
|
|
|
(void)r;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* re-install in the event loop. */
|
|
|
|
cx_install(c);
|
|
|
|
cx_install(c);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* send request */
|
|
|
|
void
|
|
|
|
void
|
|
|
|
writer_can_write(int fd, short event, void *ptr) {
|
|
|
|
writer_can_write(int fd, short event, void *ptr) {
|
|
|
|
|
|
|
|
|
|
|
@ -169,11 +194,11 @@ writer_can_write(int fd, short event, void *ptr) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
void
|
|
|
|
writer_new(struct event_base *base, int chan) {
|
|
|
|
writer_new(struct event_base *base, const char *host, short port, int chan) {
|
|
|
|
|
|
|
|
|
|
|
|
struct cx *c = malloc(sizeof(struct cx));
|
|
|
|
struct cx *c = malloc(sizeof(struct cx));
|
|
|
|
c->base = base;
|
|
|
|
c->base = base;
|
|
|
|
c->fd = webdis_connect("127.0.0.1", 7379);
|
|
|
|
c->fd = webdis_connect(host, port);
|
|
|
|
c->read_fun = writer_can_read;
|
|
|
|
c->read_fun = writer_can_read;
|
|
|
|
c->write_fun = writer_can_write;
|
|
|
|
c->write_fun = writer_can_write;
|
|
|
|
|
|
|
|
|
|
|
@ -186,6 +211,23 @@ writer_new(struct event_base *base, int chan) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
|
|
|
usage(const char* argv0, char *host_default, short port_default,
|
|
|
|
|
|
|
|
int r_default, int w_default, int n_default, int c_default) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
printf("Usage: %s [options]\n"
|
|
|
|
|
|
|
|
"Options are:\n"
|
|
|
|
|
|
|
|
"\t-h host\t\t(default = \"%s\")\n"
|
|
|
|
|
|
|
|
"\t-p port\t\t(default = %d)\n"
|
|
|
|
|
|
|
|
"\t-r readers\t(default = %d)\n"
|
|
|
|
|
|
|
|
"\t-w writers\t(default = %d)\n"
|
|
|
|
|
|
|
|
"\t-c channels\t(default = %d)\n"
|
|
|
|
|
|
|
|
"\t-n messages\t(number of messages to read in total, default = %d)\n",
|
|
|
|
|
|
|
|
argv0, host_default, (int)port_default,
|
|
|
|
|
|
|
|
r_default, w_default,
|
|
|
|
|
|
|
|
c_default, n_default);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int
|
|
|
|
int
|
|
|
|
main(int argc, char *argv[]) {
|
|
|
|
main(int argc, char *argv[]) {
|
|
|
@ -195,18 +237,73 @@ main(int argc, char *argv[]) {
|
|
|
|
struct timespec t0, t1;
|
|
|
|
struct timespec t0, t1;
|
|
|
|
|
|
|
|
|
|
|
|
struct event_base *base = event_base_new();
|
|
|
|
struct event_base *base = event_base_new();
|
|
|
|
int r = 450, w = 10, chans = 1, n = 200000, count = 0;
|
|
|
|
int i, count = 0;
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
|
|
/* getopt vars */
|
|
|
|
(void)argc;
|
|
|
|
int opt;
|
|
|
|
(void)argv;
|
|
|
|
char *colon;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* default values */
|
|
|
|
|
|
|
|
short port_default = 7379;
|
|
|
|
|
|
|
|
char *host_default = "127.0.0.1";
|
|
|
|
|
|
|
|
int r_default = 450, w_default = 10, n_default = 100000, c_default = 1;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* real values */
|
|
|
|
|
|
|
|
int r = r_default, w = w_default, chans = c_default, n = n_default;
|
|
|
|
|
|
|
|
char *host = host_default;
|
|
|
|
|
|
|
|
short port = port_default;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* getopt */
|
|
|
|
|
|
|
|
while ((opt = getopt(argc, argv, "h:p:r:w:c:n:")) != -1) {
|
|
|
|
|
|
|
|
switch (opt) {
|
|
|
|
|
|
|
|
case 'h':
|
|
|
|
|
|
|
|
colon = strchr(optarg, ':');
|
|
|
|
|
|
|
|
if(!colon) {
|
|
|
|
|
|
|
|
size_t sz = strlen(optarg);
|
|
|
|
|
|
|
|
host = calloc(1 + sz, 1);
|
|
|
|
|
|
|
|
strncpy(host, optarg, sz);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
host = calloc(1+colon-optarg, 1);
|
|
|
|
|
|
|
|
strncpy(host, optarg, colon-optarg);
|
|
|
|
|
|
|
|
port = (short)atol(colon+1);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case 'p':
|
|
|
|
|
|
|
|
port = (short)atol(optarg);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case 'r':
|
|
|
|
|
|
|
|
r = atoi(optarg);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case 'w':
|
|
|
|
|
|
|
|
w = atoi(optarg);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case 'c':
|
|
|
|
|
|
|
|
chans = atoi(optarg);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
case 'n':
|
|
|
|
|
|
|
|
n = atoi(optarg);
|
|
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
|
|
|
usage(argv[0], host_default, port_default,
|
|
|
|
|
|
|
|
r_default, w_default,
|
|
|
|
|
|
|
|
n_default, c_default);
|
|
|
|
|
|
|
|
exit(EXIT_FAILURE);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for(i = 0; i < r; ++i) {
|
|
|
|
for(i = 0; i < r; ++i) {
|
|
|
|
reader_new(base, n, &count, i % chans);
|
|
|
|
reader_new(base, host, port, n, &count, i % chans);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for(i = 0; i < w; ++i) {
|
|
|
|
for(i = 0; i < w; ++i) {
|
|
|
|
writer_new(base, i % chans);
|
|
|
|
writer_new(base, host, port, i % chans);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/* save time now */
|
|
|
|
/* save time now */
|
|
|
@ -220,7 +317,7 @@ main(int argc, char *argv[]) {
|
|
|
|
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
|
|
|
|
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
|
|
|
|
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
|
|
|
|
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
|
|
|
|
|
|
|
|
|
|
|
|
printf("\rPushed %ld messages from %d writers to %d readers through %d channels in %0.2f sec: %0.2f msg/sec\n",
|
|
|
|
printf("\rReceived %ld messages from %d writers to %d readers through %d channels in %0.2f sec: received %0.2f msg/sec\n",
|
|
|
|
(long)n, w, r, chans,
|
|
|
|
(long)n, w, r, chans,
|
|
|
|
(mili1-mili0)/1000.0,
|
|
|
|
(mili1-mili0)/1000.0,
|
|
|
|
1000*n/(mili1-mili0));
|
|
|
|
1000*n/(mili1-mili0));
|
|
|
|