From c01adb1122a0e8c877d6e802a91519ae44736549 Mon Sep 17 00:00:00 2001 From: Nicolas Favre-Felix Date: Wed, 13 Apr 2011 19:27:10 +0200 Subject: [PATCH] Started pub/sub test. --- .gitignore | 1 + tests/Makefile | 2 +- tests/pubsub.c | 126 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 tests/pubsub.c diff --git a/.gitignore b/.gitignore index a7cef2b..67cccf6 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ webdis websocket *.png +pubsub diff --git a/tests/Makefile b/tests/Makefile index 7cca7e1..f7a9927 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,4 +1,4 @@ -OUT=websocket +OUT=websocket pubsub CFLAGS=-O3 -Wall -Wextra LDFLAGS=-levent -lpthread diff --git a/tests/pubsub.c b/tests/pubsub.c new file mode 100644 index 0000000..7446474 --- /dev/null +++ b/tests/pubsub.c @@ -0,0 +1,126 @@ +#include +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +struct reader { + int fd; + + struct event ev; + struct event_base *base; +}; + +int +webdis_connect(const char *host, short port) { + + int ret; + int fd; + struct sockaddr_in addr; + + /* connect socket */ + fd = socket(AF_INET, SOCK_STREAM, 0); + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr)); + addr.sin_addr.s_addr = inet_addr(host); + + ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr)); + if(ret != 0) { + fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno)); + return -1; + } + + return fd; +} + +void +reader_http_request(struct reader *r) { + + char buffer[] = "GET /SUBSCRIBE/chan HTTP/1.1\r\n\r\n"; + char first_msg[] = "{\"SUBSCRIBE\":[\"subscribe\",\"chan\",1]}"; + char resp[2048]; + int pos = 0; + + write(r->fd, buffer, sizeof(buffer)-1); + + memset(resp, 0, sizeof(resp)); + while(1) { + int ret = read(r->fd, resp+pos, sizeof(resp)-pos); + if(ret <= 0) { + printf("fd=%d, ret=%d\n", r->fd, ret); + return; + } + pos += ret; + + if(strstr(resp, first_msg) != NULL) { + break; + } + } +} + +void +reader_can_read(int fd, short event, void *ptr) { + + struct reader *r = ptr; + printf("Reader can read on fd=%d\n", fd); + + // reader_install(r); +} + +void +reader_install(struct reader *r) { + + event_set(&r->ev, r->fd, EV_READ, reader_can_read, r); + event_base_set(r->base, &r->ev); + event_add(&r->ev, NULL); + +} + +void +reader_new(struct event_base *base) { + + struct reader *r = malloc(sizeof(struct reader)); + r->base = base; + r->fd = webdis_connect("127.0.0.1", 7379); + + /* send read request. */ + reader_http_request(r); + + reader_install(r); +} + +void +write_can_write(int fd, short event, void *ptr) { + + printf("Can write on fd=%d\n", fd); + +} + + +int +main(int argc, char *argv[]) { + + /* Create R readers and W writers, send N messages in total. */ + + struct event_base *base = event_base_new(); + int r = 10, w = 10, n = 1000; + int i; + + for(i = 0; i < w; ++i) { + reader_new(base); + } + + event_base_dispatch(base); + + return EXIT_SUCCESS; +} +