|
|
@ -12,10 +12,18 @@
|
|
|
|
|
|
|
|
|
|
|
|
#include <event.h>
|
|
|
|
#include <event.h>
|
|
|
|
|
|
|
|
|
|
|
|
struct reader {
|
|
|
|
|
|
|
|
|
|
|
|
struct cx {
|
|
|
|
int fd;
|
|
|
|
int fd;
|
|
|
|
|
|
|
|
|
|
|
|
struct event ev;
|
|
|
|
int *counter;
|
|
|
|
|
|
|
|
int total;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
char *http_request;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void (*read_fun)(int,short,void*);
|
|
|
|
|
|
|
|
void (*write_fun)(int,short,void*);
|
|
|
|
|
|
|
|
struct event evr, evw;
|
|
|
|
struct event_base *base;
|
|
|
|
struct event_base *base;
|
|
|
|
};
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
@ -43,84 +51,180 @@ webdis_connect(const char *host, short port) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_http_request(struct reader *r) {
|
|
|
|
reader_http_request(struct cx *c, const char* buffer, const char *limit) {
|
|
|
|
|
|
|
|
|
|
|
|
char buffer[] = "GET /SUBSCRIBE/chan HTTP/1.1\r\n\r\n";
|
|
|
|
|
|
|
|
char first_msg[] = "{\"SUBSCRIBE\":[\"subscribe\",\"chan\",1]}";
|
|
|
|
|
|
|
|
char resp[2048];
|
|
|
|
char resp[2048];
|
|
|
|
int pos = 0;
|
|
|
|
int pos = 0;
|
|
|
|
|
|
|
|
|
|
|
|
write(r->fd, buffer, sizeof(buffer)-1);
|
|
|
|
int r = write(c->fd, buffer, strlen(buffer));
|
|
|
|
|
|
|
|
(void)r;
|
|
|
|
|
|
|
|
|
|
|
|
memset(resp, 0, sizeof(resp));
|
|
|
|
memset(resp, 0, sizeof(resp));
|
|
|
|
while(1) {
|
|
|
|
while(1) {
|
|
|
|
int ret = read(r->fd, resp+pos, sizeof(resp)-pos);
|
|
|
|
int ret = read(c->fd, resp+pos, sizeof(resp)-pos);
|
|
|
|
if(ret <= 0) {
|
|
|
|
if(ret <= 0) {
|
|
|
|
printf("fd=%d, ret=%d\n", r->fd, ret);
|
|
|
|
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
pos += ret;
|
|
|
|
pos += ret;
|
|
|
|
|
|
|
|
|
|
|
|
if(strstr(resp, first_msg) != NULL) {
|
|
|
|
if(strstr(resp, limit) != NULL) {
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
|
|
|
cx_install(struct cx *c) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(c->read_fun) {
|
|
|
|
|
|
|
|
event_set(&c->evr, c->fd, EV_READ, c->read_fun, c);
|
|
|
|
|
|
|
|
event_base_set(c->base, &c->evr);
|
|
|
|
|
|
|
|
event_add(&c->evr, NULL);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if(c->write_fun) {
|
|
|
|
|
|
|
|
event_set(&c->evw, c->fd, EV_WRITE, c->write_fun, c);
|
|
|
|
|
|
|
|
event_base_set(c->base, &c->evw);
|
|
|
|
|
|
|
|
event_add(&c->evw, NULL);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_can_read(int fd, short event, void *ptr) {
|
|
|
|
reader_can_read(int fd, short event, void *ptr) {
|
|
|
|
|
|
|
|
|
|
|
|
struct reader *r = ptr;
|
|
|
|
char buffer[1024];
|
|
|
|
printf("Reader can read on fd=%d\n", fd);
|
|
|
|
struct cx *c = ptr;
|
|
|
|
|
|
|
|
const char *p;
|
|
|
|
|
|
|
|
|
|
|
|
// reader_install(r);
|
|
|
|
(void)event;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int ret = read(fd, buffer, sizeof(buffer));
|
|
|
|
|
|
|
|
if(ret > 0) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* count messages, each message starts with '{' */
|
|
|
|
|
|
|
|
p = buffer;
|
|
|
|
|
|
|
|
do {
|
|
|
|
|
|
|
|
/* look for the start of a message */
|
|
|
|
|
|
|
|
p = memchr(p, '{', buffer + ret - p);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if(!p) break; /* none left */
|
|
|
|
|
|
|
|
p++;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(*c->counter)++;
|
|
|
|
|
|
|
|
if(((*c->counter * 100) % c->total) == 0) {
|
|
|
|
|
|
|
|
printf("\r%d %%", 100 * *c->counter / c->total);
|
|
|
|
|
|
|
|
fflush(stdout);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if(*c->counter > c->total) {
|
|
|
|
|
|
|
|
event_base_loopbreak(c->base);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} while(1);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cx_install(c);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_install(struct reader *r) {
|
|
|
|
reader_new(struct event_base *base, int total, int *counter, int chan) {
|
|
|
|
|
|
|
|
|
|
|
|
event_set(&r->ev, r->fd, EV_READ, reader_can_read, r);
|
|
|
|
struct cx *c = calloc(1, sizeof(struct cx));
|
|
|
|
event_base_set(r->base, &r->ev);
|
|
|
|
c->base = base;
|
|
|
|
event_add(&r->ev, NULL);
|
|
|
|
c->counter = counter;
|
|
|
|
|
|
|
|
c->total = total;
|
|
|
|
|
|
|
|
c->fd = webdis_connect("127.0.0.1", 7379);
|
|
|
|
|
|
|
|
c->read_fun = reader_can_read;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* send read request. */
|
|
|
|
|
|
|
|
c->http_request = malloc(100);
|
|
|
|
|
|
|
|
sprintf(c->http_request, "GET /SUBSCRIBE/chan:%d HTTP/1.1\r\n\r\n", chan);
|
|
|
|
|
|
|
|
reader_http_request(c, c->http_request, "{\"SUBSCRIBE\":[\"subscribe\"");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cx_install(c);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
void
|
|
|
|
reader_new(struct event_base *base) {
|
|
|
|
writer_can_read(int fd, short event, void *ptr) {
|
|
|
|
|
|
|
|
char buffer[1024];
|
|
|
|
|
|
|
|
struct cx *c = ptr;
|
|
|
|
|
|
|
|
int r;
|
|
|
|
|
|
|
|
|
|
|
|
struct reader *r = malloc(sizeof(struct reader));
|
|
|
|
(void)event;
|
|
|
|
r->base = base;
|
|
|
|
|
|
|
|
r->fd = webdis_connect("127.0.0.1", 7379);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* send read request. */
|
|
|
|
r = read(fd, buffer, sizeof(buffer)); /* discard */
|
|
|
|
reader_http_request(r);
|
|
|
|
(void)r;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cx_install(c);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
void
|
|
|
|
|
|
|
|
writer_can_write(int fd, short event, void *ptr) {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct cx *c = ptr;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(void)fd;
|
|
|
|
|
|
|
|
(void)event;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
reader_http_request(c, c->http_request, "{\"PUBLISH\":");
|
|
|
|
|
|
|
|
|
|
|
|
reader_install(r);
|
|
|
|
cx_install(c);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
void
|
|
|
|
write_can_write(int fd, short event, void *ptr) {
|
|
|
|
writer_new(struct event_base *base, int chan) {
|
|
|
|
|
|
|
|
|
|
|
|
printf("Can write on fd=%d\n", fd);
|
|
|
|
struct cx *c = malloc(sizeof(struct cx));
|
|
|
|
|
|
|
|
c->base = base;
|
|
|
|
|
|
|
|
c->fd = webdis_connect("127.0.0.1", 7379);
|
|
|
|
|
|
|
|
c->read_fun = writer_can_read;
|
|
|
|
|
|
|
|
c->write_fun = writer_can_write;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* send request. */
|
|
|
|
|
|
|
|
c->http_request = malloc(100);
|
|
|
|
|
|
|
|
sprintf(c->http_request, "GET /PUBLISH/chan:%d/hi HTTP/1.1\r\n\r\n", chan);
|
|
|
|
|
|
|
|
reader_http_request(c, c->http_request, "{\"PUBLISH\":");
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cx_install(c);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int
|
|
|
|
int
|
|
|
|
main(int argc, char *argv[]) {
|
|
|
|
main(int argc, char *argv[]) {
|
|
|
|
|
|
|
|
|
|
|
|
/* Create R readers and W writers, send N messages in total. */
|
|
|
|
/* Create R readers and W writers, send N messages in total. */
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct timespec t0, t1;
|
|
|
|
|
|
|
|
|
|
|
|
struct event_base *base = event_base_new();
|
|
|
|
struct event_base *base = event_base_new();
|
|
|
|
int r = 10, w = 10, n = 1000;
|
|
|
|
int r = 450, w = 10, chans = 1, n = 200000, count = 0;
|
|
|
|
int i;
|
|
|
|
int i;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(void)argc;
|
|
|
|
|
|
|
|
(void)argv;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for(i = 0; i < r; ++i) {
|
|
|
|
|
|
|
|
reader_new(base, n, &count, i % chans);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for(i = 0; i < w; ++i) {
|
|
|
|
for(i = 0; i < w; ++i) {
|
|
|
|
reader_new(base);
|
|
|
|
writer_new(base, i % chans);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* save time now */
|
|
|
|
|
|
|
|
clock_gettime(CLOCK_MONOTONIC, &t0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* run test */
|
|
|
|
event_base_dispatch(base);
|
|
|
|
event_base_dispatch(base);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* timing */
|
|
|
|
|
|
|
|
clock_gettime(CLOCK_MONOTONIC, &t1);
|
|
|
|
|
|
|
|
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
|
|
|
|
|
|
|
|
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
printf("\rPushed %ld messages from %d writers to %d readers through %d channels in %0.2f sec: %0.2f msg/sec\n",
|
|
|
|
|
|
|
|
(long)n, w, r, chans,
|
|
|
|
|
|
|
|
(mili1-mili0)/1000.0,
|
|
|
|
|
|
|
|
1000*n/(mili1-mili0));
|
|
|
|
|
|
|
|
|
|
|
|
return EXIT_SUCCESS;
|
|
|
|
return EXIT_SUCCESS;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|