diff --git a/Makefile b/Makefile index 23a049e..1978e09 100644 --- a/Makefile +++ b/Makefile @@ -4,10 +4,10 @@ JANSSON_OBJ=jansson/src/dump.o jansson/src/error.o jansson/src/hashtable.o janss FORMAT_OBJS=formats/json.o formats/raw.o formats/common.o formats/custom-type.o formats/bson.o HTTP_PARSER_OBJS=http-parser/http_parser.o DEPS=$(FORMAT_OBJS) $(HIREDIS_OBJ) $(JANSSON_OBJ) $(HTTP_PARSER_OBJS) -OBJS=webdis.o conf.o cmd.o slog.o server.o libb64/cencode.o acl.o md5/md5.o http.o client.o $(DEPS) +OBJS=webdis.o cmd.o worker.o slog.o server.o libb64/cencode.o acl.o md5/md5.o http.o client.o websocket.o pool.o conf.o $(DEPS) CFLAGS=-O3 -Wall -Wextra -I. -Ijansson/src -Ihttp-parser -LDFLAGS=-levent +LDFLAGS=-levent -pthread all: $(OUT) Makefile diff --git a/README.markdown b/README.markdown index 0e2e0de..514abf3 100644 --- a/README.markdown +++ b/README.markdown @@ -20,11 +20,12 @@ curl -d "GET/hello" http://127.0.0.1:7379/ # Features +* Multi-threaded server. * GET and POST are supported. * JSON output by default, optional JSONP parameter (`?jsonp=myFunction`). * Raw Redis 2.0 protocol output with `.raw` suffix * BSON support for compact responses and MongoDB compatibility. -* HTTP 1.1 pipelining (50,000 http requests per second on a desktop Linux machine.) +* HTTP 1.1 pipelining (70,000 http requests per second on a desktop Linux machine.) * Connects to Redis using a TCP or UNIX socket. * Restricted commands by IP range (CIDR subnet + mask) or HTTP Basic Auth, returning 403 errors. * Possible Redis authentication in the config file. diff --git a/acl.c b/acl.c index 1b94d7b..176202c 100644 --- a/acl.c +++ b/acl.c @@ -14,7 +14,7 @@ acl_match_client(struct acl *a, struct http_client *client, in_addr_t *ip) { /* check HTTP Basic Auth */ const char *auth; - auth = client->input_headers.authorization.s; + auth = client_get_header(client, "Authorization"); if(a->http_basic_auth) { if(auth && strncasecmp(auth, "Basic ", 6) == 0) { /* sent auth */ if(strcmp(auth + 6, a->http_basic_auth) != 0) { /* bad password */ @@ -39,6 +39,7 @@ acl_match_client(struct acl *a, struct http_client *client, in_addr_t *ip) { int acl_allow_command(struct cmd *cmd, struct conf *cfg, struct http_client *client) { + /* FIXME */ char *always_off[] = {"MULTI", "EXEC", "WATCH", "DISCARD"}; unsigned int i; diff --git a/client.c b/client.c index 188da3d..fff1cf2 100644 --- a/client.c +++ b/client.c @@ -1,257 +1,104 @@ #include "client.h" -#include "server.h" -#include "cmd.h" +#include "http_parser.h" #include "http.h" -#include "slog.h" +#include "server.h" +#include "worker.h" +#include "websocket.h" #include -#include -#include #include +#include -#include "hiredis/async.h" - -struct http_client * -http_client_new(int fd, struct server *s) { - - struct http_client *c = calloc(1, sizeof(struct http_client)); - c->fd = fd; - c->s = s; - - /* initialize HTTP parser */ - c->settings.on_path = http_on_path; - c->settings.on_body = http_on_body; - c->settings.on_message_complete = http_on_complete; - c->settings.on_header_field = http_on_header_name; - c->settings.on_header_value = http_on_header_value; - c->settings.on_query_string = http_on_query_string; - - http_parser_init(&c->parser, HTTP_REQUEST); - c->parser.data = c; - - c->state = CLIENT_WAITING; - - return c; -} - - -/** - * Called by libevent when read(2) is possible on fd without blocking. - */ -void -http_client_read(int fd, short event, void *ctx) { - - struct http_client *c = ctx; - char buffer[64*1024]; - int ret, nparsed; - - (void)fd; - (void)event; - - if(c->state != CLIENT_WAITING) { /* not expecting anything, fail. */ - http_client_free(c); - return; - } - - ret = read(c->fd, buffer, sizeof(buffer)); - if(ret <= 0) { /* broken connection, bye */ - http_client_free(c); - return; - } - - nparsed = http_parser_execute(&c->parser, &c->settings, buffer, ret); - if(c->state == CLIENT_BROKEN) { - http_client_free(c); - return; - } - - if(c->parser.upgrade) { - /* TODO: upgrade parser (WebSockets & cie) */ - } else if(nparsed != ret) { /* invalid data */ - http_client_free(c); - } else if(c->state == CLIENT_WAITING) { - http_client_serve(c); - } -} - -static void -http_client_cleanup(struct http_client *c) { - - if(c->sub) { - return; /* we need to keep those. */ - } - - free(c->path.s); - memset(&c->path, 0, sizeof(str_t)); - - free(c->body.s); - memset(&c->body, 0, sizeof(str_t)); - - free(c->input_headers.connection.s); - memset(&c->input_headers.connection, 0, sizeof(str_t)); - - free(c->input_headers.if_none_match.s); - memset(&c->input_headers.if_none_match, 0, sizeof(str_t)); - - free(c->input_headers.authorization.s); - memset(&c->input_headers.authorization, 0, sizeof(str_t)); - - free(c->output_headers.content_type.s); - memset(&c->output_headers.content_type, 0, sizeof(str_t)); - - free(c->output_headers.etag.s); - memset(&c->output_headers.etag, 0, sizeof(str_t)); - - free(c->query_string.type.s); - memset(&c->query_string.type, 0, sizeof(str_t)); +static int +http_client_on_url(struct http_parser *p, const char *at, size_t sz) { - free(c->query_string.jsonp.s); - memset(&c->query_string.jsonp, 0, sizeof(str_t)); + struct http_client *c = p->data; - memset(&c->verb, 0, sizeof(c->verb)); + c->path = realloc(c->path, c->path_sz + sz + 1); + memcpy(c->path + c->path_sz, at, sz); + c->path_sz += sz; + c->path[c->path_sz] = 0; - c->state = CLIENT_WAITING; - c->started_responding = 0; + return 0; } -void -http_client_free(struct http_client *c) { - - event_del(&c->ev); - if(c->fd != -1) { - close(c->fd); - } - - if(c->sub) { - /* clean up Redis connection */ - c->sub->s->ac->onDisconnect = NULL; - server_free(c->sub->s); - - /* clean up command object */ - if(c->sub->cmd) { - cmd_free(c->sub->cmd); - } - free(c->sub); - c->sub = NULL; - } +static int +http_client_on_body(struct http_parser *p, const char *at, size_t sz) { - http_client_cleanup(c); - free(c); + struct http_client *c = p->data; + return http_client_set_body(c, at, sz); } int -http_client_keep_alive(struct http_client *c) { - - /* check disconnection */ - int keep_alive = 0; +http_client_set_body(struct http_client *c, const char *at, size_t sz) { - if(c->parser.http_major == 1 && c->parser.http_minor == 1) { - keep_alive = 1; /* HTTP 1.1: keep-alive by default */ - } - if(c->input_headers.connection.s) { - if(strncasecmp(c->input_headers.connection.s, "Keep-Alive", 10) == 0) { - keep_alive = 1; - } else if(strncasecmp(c->input_headers.connection.s, "Close", 5) == 0) { - keep_alive = 0; - } - } - return keep_alive; -} - -void -http_client_reset(struct http_client *c) { - - if(!http_client_keep_alive(c) && !c->sub) { - c->state = CLIENT_BROKEN; - return; - } - - http_client_cleanup(c); - http_parser_init(&c->parser, HTTP_REQUEST); -} + c->body = realloc(c->body, c->body_sz + sz + 1); + memcpy(c->body + c->body_sz, at, sz); + c->body_sz += sz; + c->body[c->body_sz] = 0; -/** - * (Re-)add read event callback - */ -void -http_client_serve(struct http_client *c) { - - event_set(&c->ev, c->fd, EV_READ, http_client_read, c); - event_base_set(c->s->base, &c->ev); - event_add(&c->ev, NULL); + return 0; } -/**** Parser callbacks ****/ - -/** - * Called when the path has been found. This is before any `?query-string'. - */ -int -http_on_path(http_parser *p, const char *at, size_t length) { +static int +http_client_on_header_name(struct http_parser *p, const char *at, size_t sz) { struct http_client *c = p->data; + size_t n = c->header_count; - c->path.s = calloc(length+1, 1); - memcpy(c->path.s, at, length); - c->path.sz = length; - - /* save HTTP verb as well */ - c->verb = (enum http_method)p->method; - - return 0; -} + if(c->last_cb != LAST_CB_KEY) { + n = ++c->header_count; + c->headers = realloc(c->headers, n * sizeof(struct http_header)); + memset(&c->headers[n-1], 0, sizeof(struct http_header)); + } -/** - * Called when the whole body has been read. - */ -int -http_on_body(http_parser *p, const char *at, size_t length) { - struct http_client *c = p->data; + c->headers[n-1].key = realloc(c->headers[n-1].key, + c->headers[n-1].key_sz + sz + 1); + memcpy(c->headers[n-1].key + c->headers[n-1].key_sz, at, sz); + c->headers[n-1].key_sz += sz; + c->headers[n-1].key[c->headers[n-1].key_sz] = 0; - c->body.s = realloc(c->body.s, c->body.sz + length); - memcpy(c->body.s + c->body.sz, at, length); - c->body.sz += length; + c->last_cb = LAST_CB_KEY; return 0; } -/** - * Called when the query string has been completely read. - */ -int -http_on_query_string(http_parser *parser, const char *at, size_t length) { +static int +http_client_on_query_string(struct http_parser *parser, const char *at, size_t sz) { struct http_client *c = parser->data; const char *p = at; - while(p < at + length) { + while(p < at + sz) { const char *key = p, *val; int key_len, val_len; - char *eq = memchr(key, '=', length - (p-at)); - if(!eq || eq > at + length) { /* last argument */ + char *eq = memchr(key, '=', sz - (p-at)); + if(!eq || eq > at + sz) { /* last argument */ break; } else { /* found an '=' */ - char *and; + char *amp; val = eq + 1; key_len = eq - key; p = eq + 1; - and = memchr(p, '&', length - (p-at)); - if(!and || and > at + length) { - val_len = at + length - p; /* last arg */ + amp = memchr(p, '&', sz - (p-at)); + if(!amp || amp > at + sz) { + val_len = at + sz - p; /* last arg */ } else { - val_len = and - val; /* cur arg */ - p = and + 1; + val_len = amp - val; /* cur arg */ + p = amp + 1; } if(key_len == 4 && strncmp(key, "type", 4) == 0) { - http_set_header(&c->query_string.type, val, val_len); + c->type = calloc(1 + val_len, 1); + memcpy(c->type, val, val_len); } else if(key_len == 5 && strncmp(key, "jsonp", 5) == 0) { - http_set_header(&c->query_string.jsonp, val, val_len); + c->jsonp = calloc(1 + val_len, 1); + memcpy(c->jsonp, val, val_len); } - if(!and) { + if(!amp) { break; } } @@ -259,234 +106,187 @@ http_on_query_string(http_parser *parser, const char *at, size_t length) { return 0; } -/** - * Called when the whole request has been parsed. - */ -int -http_on_complete(http_parser *p) { - struct http_client *c = p->data; - int ret = -1; +static int +http_client_on_header_value(struct http_parser *p, const char *at, size_t sz) { - /* check that the command can be executed */ - switch(c->verb) { - case HTTP_GET: - if(c->path.sz == 16 && memcmp(c->path.s, "/crossdomain.xml", 16) == 0) { - return http_crossdomain(c); - } - slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz); - ret = cmd_run(c->s, c, 1+c->path.s, c->path.sz-1, NULL, 0); - break; - - case HTTP_POST: - slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz); - ret = cmd_run(c->s, c, 1+c->body.s, c->body.sz-1, NULL, 0); - break; + struct http_client *c = p->data; + size_t n = c->header_count; - case HTTP_PUT: - slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz); - ret = cmd_run(c->s, c, 1+c->path.s, c->path.sz-1, - c->body.s, c->body.sz); - break; + c->headers[n-1].val = realloc(c->headers[n-1].val, + c->headers[n-1].val_sz + sz + 1); + memcpy(c->headers[n-1].val + c->headers[n-1].val_sz, at, sz); + c->headers[n-1].val_sz += sz; + c->headers[n-1].val[c->headers[n-1].val_sz] = 0; - case HTTP_OPTIONS: - return http_options(c); + c->last_cb = LAST_CB_VAL; - default: - slog(c->s, WEBDIS_DEBUG, "405", 3); - http_send_error(c, 405, "Method Not Allowed"); - return 0; - } - if(ret < 0) { - c->state = CLIENT_WAITING; - if(c->cmd) { - cmd_free(c->cmd); - c->cmd = NULL; + if(strncmp("Expect", c->headers[n-1].key, c->headers[n-1].key_sz) == 0) { + if(sz == 12 && strncasecmp(at, "100-continue", sz) == 0) { + /* support HTTP file upload */ + char http100[] = "HTTP/1.1 100 Continue\r\n\r\n"; + int ret = write(c->fd, http100, sizeof(http100)-1); + (void)ret; + } + } else if(strncasecmp("Connection", c->headers[n-1].key, c->headers[n-1].key_sz) == 0) { + if(sz == 10 && strncasecmp(at, "Keep-Alive", sz) == 0) { + c->keep_alive = 1; } - http_send_error(c, 403, "Forbidden"); } - return ret; -} - -/** - * Called when a header name is read - */ -int -http_on_header_name(http_parser *p, const char *at, size_t length) { - - struct http_client *c = p->data; - - c->last_header_name.s = calloc(length+1, 1); - memcpy(c->last_header_name.s, at, length); - c->last_header_name.sz = length; - return 0; } -/** - * Called when a header value is read - */ -int -http_on_header_value(http_parser *p, const char *at, size_t length) { +static int +http_client_on_message_complete(struct http_parser *p) { struct http_client *c = p->data; - if(strncmp("Connection", c->last_header_name.s, c->last_header_name.sz) == 0) { - http_set_header(&c->input_headers.connection, at, length); - } else if(strncmp("If-None-Match", c->last_header_name.s, c->last_header_name.sz) == 0) { - http_set_header(&c->input_headers.if_none_match, at, length); - } else if(strncmp("Authorization", c->last_header_name.s, c->last_header_name.sz) == 0) { - http_set_header(&c->input_headers.authorization, at, length); - } else if(strncmp("Expect", c->last_header_name.s, c->last_header_name.sz) == 0) { - if(length == 12 && memcmp(at, "100-continue", length) == 0) { - /* support HTTP file upload */ - char http100[] = "HTTP/1.1 100 Continue\r\n\r\n"; - int ret = write(c->fd, http100, sizeof(http100)-1); - if(ret != sizeof(http100)-1) { - c->state = CLIENT_BROKEN; - } - } + /* keep-alive detection */ + if(c->parser.http_major == 1 && c->parser.http_minor == 1) { /* 1.1 */ + c->keep_alive = 1; } - free(c->last_header_name.s); - c->last_header_name.s = NULL; + if(p->upgrade) { /* WebSocket, don't execute just yet */ + c->is_websocket = 1; + return 0; + } + worker_process_client(c); + http_client_reset(c); + return 0; } +struct http_client * +http_client_new(struct worker *w, int fd, in_addr_t addr) { + struct http_client *c = calloc(1, sizeof(struct http_client)); + c->fd = fd; + c->w = w; + c->addr = addr; + c->s = w->s; -/**** HTTP Responses ****/ + /* parser */ + http_parser_init(&c->parser, HTTP_REQUEST); + c->parser.data = c; -static void -http_response_set_connection_header(struct http_client *c, struct http_response *r) { + /* callbacks */ + c->settings.on_url = http_client_on_url; + c->settings.on_query_string = http_client_on_query_string; + c->settings.on_body = http_client_on_body; + c->settings.on_message_complete = http_client_on_message_complete; + c->settings.on_header_field = http_client_on_header_name; + c->settings.on_header_value = http_client_on_header_value; - if(http_client_keep_alive(c)) { - http_response_set_header(r, "Connection", "Keep-Alive"); - } else { - http_response_set_header(r, "Connection", "Close"); - } -} - -void -http_send_reply(struct http_client *c, short code, const char *msg, - const char *body, size_t body_len) { + c->last_cb = LAST_CB_NONE; - struct http_response resp; - const char *ct = c->output_headers.content_type.s; - if(!ct) { - ct = "text/html"; - } + return c; +} - /* respond */ - http_response_init(&resp, code, msg); - http_response_set_connection_header(c, &resp); - if(body_len) { - http_response_set_header(&resp, "Content-Type", ct); - } +void +http_client_reset(struct http_client *c) { - if(c->sub) { - http_response_set_header(&resp, "Transfer-Encoding", "chunked"); - } + int i; - if(code == 200 && c->output_headers.etag.s) { - http_response_set_header(&resp, "ETag", c->output_headers.etag.s); + /* headers */ + for(i = 0; i < c->header_count; ++i) { + free(c->headers[i].key); + free(c->headers[i].val); } - - http_response_set_body(&resp, body, body_len); - - /* flush response in the socket */ - if(http_response_write(&resp, c->fd) || !http_client_keep_alive(c)) { /* failure */ - if(c->state == CLIENT_EXECUTING) { - http_client_free(c); - return; - } else if(c->state == CLIENT_WAITING) { - c->state = CLIENT_BROKEN; - } - } else { - http_client_reset(c); + free(c->headers); + c->headers = NULL; + c->header_count = 0; + + /* other data */ + free(c->body); c->body = NULL; + c->body_sz = 0; + free(c->path); c->path = NULL; + c->path_sz = 0; + free(c->type); c->type = NULL; + free(c->jsonp); c->jsonp = NULL; + + /* no last known header callback */ + c->last_cb = LAST_CB_NONE; + + /* mark as broken if client doesn't support Keep-Alive. */ + if(c->keep_alive == 0) { + c->broken = 1; } - http_client_serve(c); } void -http_send_error(struct http_client *c, short code, const char *msg) { +http_client_free(struct http_client *c) { - http_send_reply(c, code, msg, NULL, 0); - http_client_cleanup(c); -} + /* printf("client free: %p\n", (void*)c); */ -/* Transfer-encoding: chunked */ -void -http_send_reply_start(struct http_client *c, short code, const char *msg) { + http_client_reset(c); + free(c->buffer); + + /* close(c->fd); */ - http_send_reply(c, code, msg, NULL, 0); + free(c); } -void -http_send_reply_chunk(struct http_client *c, const char *p, size_t sz) { +int +http_client_read(struct http_client *c) { - char buf[64]; - int ret, chunk_size; + char buffer[4096]; + int ret; - chunk_size = sprintf(buf, "%x\r\n", (int)sz); - ret = write(c->fd, buf, chunk_size); - ret = write(c->fd, p, sz); - ret = write(c->fd, "\r\n", 2); - if(ret != 2) { - c->state = CLIENT_BROKEN; + ret = read(c->fd, buffer, sizeof(buffer)); + if(ret <= 0) { + /* printf("Broken read on c=%p, fd=%d.\n", (void*)c, c->fd); */ + close(c->fd); + http_client_free(c); + return -1; } -} -/* send nil chunk to mark the end of a stream. */ -void -http_send_reply_end(struct http_client *c) { + c->buffer = realloc(c->buffer, c->sz + ret); + memcpy(c->buffer + c->sz, buffer, ret); + c->sz += ret; + +#if 0 + printf("read %d bytes\n", ret); + write(1, "\n[", 2); + write(1, buffer, ret); + write(1, "]\n", 2); +#endif - http_send_reply_chunk(c, "", 0); + return ret; } -/* Adobe flash cross-domain request */ int -http_crossdomain(struct http_client *c) { - - struct http_response resp; - char out[] = "\n" -"\n" -"\n" - "\n" -"\n"; - - http_response_init(&resp, 200, "OK"); - http_response_set_connection_header(c, &resp); - http_response_set_header(&resp, "Content-Type", "application/xml"); - http_response_set_body(&resp, out, sizeof(out)-1); +http_client_execute(struct http_client *c) { - http_response_write(&resp, c->fd); - http_client_reset(c); + int nparsed = http_parser_execute(&c->parser, &c->settings, c->buffer, c->sz); + /* printf("nparsed=%d\n", nparsed); */ - return 0; + if(!c->is_websocket) { + /* removed consumed data */ + free(c->buffer); + c->buffer = NULL; + c->sz = 0; + } + return nparsed; } -/* reply to OPTIONS HTTP verb */ -int -http_options(struct http_client *c) { - - struct http_response resp; +const char * +client_get_header(struct http_client *c, const char *key) { - http_response_init(&resp, 200, "OK"); - http_response_set_connection_header(c, &resp); + int i; + size_t sz = strlen(key); - http_response_set_header(&resp, "Content-Type", "text/html"); - http_response_set_header(&resp, "Allow", "GET,POST,PUT,OPTIONS"); - http_response_set_header(&resp, "Content-Length", "0"); + for(i = 0; i < c->header_count; ++i) { - /* Cross-Origin Resource Sharing, CORS. */ - http_response_set_header(&resp, "Access-Control-Allow-Origin", "*"); + if(sz == c->headers[i].key_sz && + strncasecmp(key, c->headers[i].key, sz) == 0) { + return c->headers[i].val; + } - http_response_write(&resp, c->fd); - http_client_reset(c); + } - return 0; + return NULL; } + diff --git a/client.h b/client.h index b991097..30fc205 100644 --- a/client.h +++ b/client.h @@ -3,127 +3,73 @@ #include #include -#include "http-parser/http_parser.h" -#include "http.h" +#include "http_parser.h" +struct http_header; struct server; -struct cmd; typedef enum { - CLIENT_WAITING, - CLIENT_EXECUTING, - CLIENT_BROKEN} client_state; + LAST_CB_NONE = 0, + LAST_CB_KEY = 1, + LAST_CB_VAL = 2} last_cb_t; struct http_client { - /* socket and server reference */ int fd; in_addr_t addr; struct event ev; + + struct worker *w; struct server *s; - client_state state; - struct cmd *cmd; - - /* http parser */ - http_parser_settings settings; - http_parser parser; - - /* decoded http */ - enum http_method verb; - str_t path; - str_t body; - - /* input headers from client */ - struct { - str_t connection; - str_t if_none_match; - str_t authorization; - } input_headers; - - /* response headers */ - struct input_headers { - str_t content_type; - str_t etag; - } output_headers; - - /* query string */ - struct { - str_t type; - str_t jsonp; - } query_string; - - /* pub/sub */ - struct subscription *sub; - int started_responding; - - struct http_response resp; - - /* private, used in HTTP parser */ - str_t last_header_name; -}; -struct http_client * -http_client_new(int fd, struct server *s); + /* HTTP parsing */ + struct http_parser parser; + struct http_parser_settings settings; + char *buffer; + size_t sz; + last_cb_t last_cb; -void -http_client_serve(struct http_client *c); + /* various flags. TODO: bit map */ + int keep_alive; + int broken; + int is_websocket; -void -http_client_free(struct http_client *c); + /* HTTP data */ + char *path; + size_t path_sz; -void -http_client_reset(struct http_client *c); + /* headers */ + struct http_header *headers; + int header_count; -int -http_on_path(http_parser*, const char *at, size_t length); + char *body; + size_t body_sz; -int -http_on_path(http_parser*, const char *at, size_t length); + char *type; /* forced output content-type */ + char *jsonp; /* jsonp wrapper */ +}; -int -http_on_body(http_parser*, const char *at, size_t length); +struct http_client * +http_client_new(struct worker *w, int fd, in_addr_t addr); -int -http_on_header_name(http_parser*, const char *at, size_t length); +void +http_client_reset(struct http_client *c); -int -http_on_header_value(http_parser*, const char *at, size_t length); +void +http_client_free(struct http_client *c); int -http_on_complete(http_parser*); +http_client_read(struct http_client *c); int -http_on_query_string(http_parser*, const char *at, size_t length); +http_client_execute(struct http_client *c); int -http_client_keep_alive(struct http_client *c); - -/* responses */ - -void -http_send_reply(struct http_client *c, short code, const char *msg, - const char *body, size_t body_len); - -/* Transfer-encoding: chunked */ -void -http_send_reply_start(struct http_client *c, short code, const char *msg); +http_client_set_body(struct http_client *c, const char *at, size_t sz); -void -http_send_reply_chunk(struct http_client *c, const char *p, size_t sz); - -void -http_send_reply_end(struct http_client *c); - -void -http_send_error(struct http_client *c, short code, const char *msg); - -/* convenience functions */ -int -http_crossdomain(struct http_client *c); +const char * +client_get_header(struct http_client *c, const char *key); -/* reply to OPTIONS HTTP verb */ -int -http_options(struct http_client *c); #endif diff --git a/cmd.c b/cmd.c index 1ed83f3..551ec71 100644 --- a/cmd.c +++ b/cmd.c @@ -1,8 +1,11 @@ #include "cmd.h" -#include "server.h" #include "conf.h" #include "acl.h" #include "client.h" +#include "pool.h" +#include "worker.h" +#include "http.h" +#include "server.h" #include "formats/json.h" #include "formats/bson.h" @@ -12,6 +15,7 @@ #include #include #include +#include #include struct cmd * @@ -31,13 +35,20 @@ cmd_new(int count) { void cmd_free(struct cmd *c) { + int i; if(!c) return; + for(i = 0; i < c->count; ++i) { + free((char*)c->argv[i]); + } free(c->argv); free(c->argv_len); + free(c->jsonp); + free(c->if_none_match); if(c->mime_free) free(c->mime); + free(c); } @@ -69,9 +80,39 @@ decode_uri(const char *uri, size_t length, size_t *out_len, int always_decode_pl return ret; } +/* setup headers */ +static void +cmd_setup(struct cmd *cmd, struct http_client *client) { + + int i; + cmd->keep_alive = client->keep_alive; + + for(i = 0; i < client->header_count; ++i) { + if(strcasecmp(client->headers[i].key, "If-None-Match") == 0) { + cmd->if_none_match = calloc(1+client->headers[i].val_sz, 1); + memcpy(cmd->if_none_match, client->headers[i].val, + client->headers[i].val_sz); + } else if(strcasecmp(client->headers[i].key, "Connection") == 0 && + strcasecmp(client->headers[i].val, "Keep-Alive") == 0) { + cmd->keep_alive = 1; + } + } + + if(client->type) { /* transfer pointer ownership */ + cmd->mime = client->type; + cmd->mime_free = 1; + client->type = NULL; + } + + if(client->jsonp) { /* transfer pointer ownership */ + cmd->jsonp = client->jsonp; + client->jsonp = NULL; + } +} + int -cmd_run(struct server *s, struct http_client *client, +cmd_run(struct worker *w, struct http_client *client, const char *uri, size_t uri_len, const char *body, size_t body_len) { @@ -79,10 +120,10 @@ cmd_run(struct server *s, struct http_client *client, char *slash; const char *p; int cmd_len; - int param_count = 0, cur_param = 1, i; + int param_count = 0, cur_param = 1; struct cmd *cmd; - + redisAsyncContext *ac = NULL; formatting_fun f_format; /* count arguments */ @@ -100,11 +141,15 @@ cmd_run(struct server *s, struct http_client *client, return -1; } - client->cmd = cmd = cmd_new(param_count); + cmd = cmd_new(param_count); + cmd->fd = client->fd; /* get output formatting function */ uri_len = cmd_select_format(client, cmd, uri, uri_len, &f_format); + /* add HTTP info */ + cmd_setup(cmd, client); + /* check if we only have one command or more. */ slash = memchr(uri, '/', uri_len); if(slash) { @@ -114,26 +159,25 @@ cmd_run(struct server *s, struct http_client *client, } /* there is always a first parameter, it's the command name */ - cmd->argv[0] = uri; + cmd->argv[0] = malloc(cmd_len); + memcpy(cmd->argv[0], uri, cmd_len); cmd->argv_len[0] = cmd_len; /* check that the client is able to run this command */ - if(!acl_allow_command(cmd, s->cfg, client)) { + if(!acl_allow_command(cmd, w->s->cfg, client)) { return -1; } /* check if we have to split the connection */ if(cmd_is_subscribe(cmd)) { - - client->sub = malloc(sizeof(struct subscription)); - client->sub->s = s = server_copy(s); - client->sub->cmd = cmd; + ac = (redisAsyncContext*)pool_connect(w->pool, 0); } /* no args (e.g. INFO command) */ if(!slash) { - client->state = CLIENT_EXECUTING; - redisAsyncCommandArgv(s->ac, f_format, client, 1, cmd->argv, cmd->argv_len); + ac = (redisAsyncContext*)pool_get_context(w->pool); + redisAsyncCommandArgv(ac, f_format, cmd, 1, + (const char **)cmd->argv, cmd->argv_len); return 0; } p = slash + 1; @@ -156,21 +200,25 @@ cmd_run(struct server *s, struct http_client *client, } if(body && body_len) { /* PUT request */ - cmd->argv[cur_param] = body; + cmd->argv[cur_param] = malloc(body_len); + memcpy(cmd->argv[cur_param], body, body_len); cmd->argv_len[cur_param] = body_len; } - /* push command to Redis. */ - client->state = CLIENT_EXECUTING; - redisAsyncCommandArgv(s->ac, f_format, client, cmd->count, cmd->argv, cmd->argv_len); - - for(i = 1; i < cur_param; ++i) { - free((char*)cmd->argv[i]); + /* send it off! */ + if(!ac) { + ac = (redisAsyncContext*)pool_get_context(w->pool); } + cmd_send(ac, f_format, cmd); return 0; } +void +cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd) { + redisAsyncCommandArgv(ac, f_format, cmd, cmd->count, + (const char **)cmd->argv, cmd->argv_len); +} /** * Select Content-Type and processing function. @@ -235,9 +283,9 @@ cmd_select_format(struct http_client *client, struct cmd *cmd, } /* the user can force it with ?type=some/thing */ - if(client->query_string.type.s) { + if(client->type) { *f_format = custom_type_reply; - cmd->mime = strdup(client->query_string.type.s); + cmd->mime = strdup(client->type); cmd->mime_free = 1; } diff --git a/cmd.h b/cmd.h index eb4c47c..7a94867 100644 --- a/cmd.h +++ b/cmd.h @@ -10,18 +10,28 @@ struct evhttp_request; struct http_client; struct server; +struct worker; struct cmd; typedef void (*formatting_fun)(redisAsyncContext *, void *, void *); struct cmd { + int fd; + int count; - const char **argv; + char **argv; size_t *argv_len; /* HTTP data */ - char *mime; + char *mime; /* forced output content-type */ int mime_free; + char *if_none_match; /* used with ETags */ + char *jsonp; /* jsonp wrapper */ + int keep_alive; + + /* various flags */ + int started_responding; + int is_websocket; }; struct subscription { @@ -36,7 +46,7 @@ void cmd_free(struct cmd *c); int -cmd_run(struct server *s, struct http_client *client, +cmd_run(struct worker *w, struct http_client *client, const char *uri, size_t uri_len, const char *body, size_t body_len); @@ -47,4 +57,7 @@ cmd_select_format(struct http_client *client, struct cmd *cmd, int cmd_is_subscribe(struct cmd *cmd); +void +cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd); + #endif diff --git a/conf.c b/conf.c index ac20d5d..026716c 100644 --- a/conf.c +++ b/conf.c @@ -30,6 +30,7 @@ conf_read(const char *filename) { conf->redis_port = 6379; conf->http_host = strdup("0.0.0.0"); conf->http_port = 7379; + conf->http_threads = 4; conf->user = getuid(); conf->group = getgid(); conf->logfile = "webdis.log"; @@ -58,6 +59,8 @@ conf_read(const char *filename) { conf->http_host = strdup(json_string_value(jtmp)); } else if(strcmp(json_object_iter_key(kv), "http_port") == 0 && json_typeof(jtmp) == JSON_INTEGER) { conf->http_port = (short)json_integer_value(jtmp); + } else if(strcmp(json_object_iter_key(kv), "threads") == 0 && json_typeof(jtmp) == JSON_INTEGER) { + conf->http_threads = (short)json_integer_value(jtmp); } else if(strcmp(json_object_iter_key(kv), "acl") == 0 && json_typeof(jtmp) == JSON_ARRAY) { conf->perms = conf_parse_acls(jtmp); } else if(strcmp(json_object_iter_key(kv), "user") == 0 && json_typeof(jtmp) == JSON_STRING) { diff --git a/conf.h b/conf.h index 292d646..02d3484 100644 --- a/conf.h +++ b/conf.h @@ -14,6 +14,7 @@ struct conf { /* HTTP server interface */ char *http_host; short http_port; + short http_threads; /* daemonize process, off by default */ int daemonize; diff --git a/formats/bson.c b/formats/bson.c index 5e70348..1993d93 100644 --- a/formats/bson.c +++ b/formats/bson.c @@ -1,7 +1,6 @@ #include "bson.h" #include "common.h" #include "cmd.h" -#include "client.h" #include "http.h" #include @@ -18,30 +17,33 @@ void bson_reply(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; - struct http_client *client = privdata; + struct cmd *cmd = privdata; bson_t *b; char *bstr = NULL; size_t bsz; (void)c; - if(client->cmd == NULL) { + if(cmd == NULL) { /* broken connection */ return; } if (reply == NULL) { + /* FIXME */ + /* http_send_reply(client, 404, "Not Found", NULL, 0); + */ return; } /* encode redis reply as BSON */ - b = bson_wrap_redis_reply(client->cmd, reply); + b = bson_wrap_redis_reply(cmd, reply); /* get BSON as string */ bstr = bson_string_output(b, &bsz); /* send reply */ - format_send_reply(client, bstr, bsz, "application/bson"); + format_send_reply(cmd, bstr, bsz, "application/bson"); /* cleanup */ free(bstr); diff --git a/formats/common.c b/formats/common.c index e9aebf5..8d1af46 100644 --- a/formats/common.c +++ b/formats/common.c @@ -2,11 +2,13 @@ #include "cmd.h" #include "http.h" #include "client.h" +#include "websocket.h" #include "md5/md5.h" #include +#include -/* TODO: replace this with a faster hash function */ +/* TODO: replace this with a faster hash function? */ char *etag_new(const char *p, size_t sz) { md5_byte_t buf[16]; @@ -30,42 +32,59 @@ char *etag_new(const char *p, size_t sz) { } void -format_send_reply(struct http_client *client, const char *p, size_t sz, const char *content_type) { +format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content_type) { int free_cmd = 1; + struct http_response resp; - struct cmd *cmd = client->cmd; + if(cmd->is_websocket) { + ws_reply(cmd, p, sz); + cmd_free(cmd); + return; + } if(cmd_is_subscribe(cmd)) { free_cmd = 0; /* start streaming */ - if(client->started_responding == 0) { + if(cmd->started_responding == 0) { const char *ct = cmd->mime?cmd->mime:content_type; - client->started_responding = 1; - http_set_header(&client->output_headers.content_type, ct, strlen(ct)); - http_send_reply_start(client, 200, "OK"); + cmd->started_responding = 1; + http_response_init(&resp, 200, "OK"); + http_response_set_header(&resp, "Content-Type", ct); + http_response_set_header(&resp, "Connection", "Keep-Alive"); + http_response_set_header(&resp, "Transfer-Encoding", "Chunked"); + http_response_write(&resp, cmd->fd); } - http_send_reply_chunk(client, p, sz); + http_response_write_chunk(cmd->fd, p, sz); } else { /* compute ETag */ char *etag = etag_new(p, sz); - const char *if_none_match = client->input_headers.if_none_match.s; /* check If-None-Match */ - if(if_none_match && strncmp(if_none_match, etag, client->input_headers.if_none_match.sz) == 0) { + if(cmd->if_none_match && strcmp(cmd->if_none_match, etag) == 0) { /* SAME! send 304. */ - http_send_reply(client, 304, "Not Modified", NULL, 0); + http_response_init(&resp, 304, "Not Modified"); } else { const char *ct = cmd->mime?cmd->mime:content_type; - http_set_header(&client->output_headers.content_type, ct, strlen(ct)); - http_set_header(&client->output_headers.etag, etag, strlen(etag)); - http_send_reply(client, 200, "OK", p, sz); + http_response_init(&resp, 200, "OK"); + http_response_set_header(&resp, "Content-Type", ct); + http_response_set_header(&resp, "ETag", etag); + http_response_set_body(&resp, p, sz); + } + if(cmd->keep_alive) { + http_response_set_header(&resp, "Connection", "Keep-Alive"); + } else { + http_response_set_header(&resp, "Connection", "Close"); + } + http_response_write(&resp, cmd->fd); + if(!cmd->keep_alive) { + close(cmd->fd); } - free(etag); } + /* cleanup */ if(free_cmd) { cmd_free(cmd); diff --git a/formats/common.h b/formats/common.h index 199d7f3..41f4d08 100644 --- a/formats/common.h +++ b/formats/common.h @@ -3,10 +3,10 @@ #include -struct http_client; +struct cmd; void -format_send_reply(struct http_client *client, +format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content_type); diff --git a/formats/custom-type.c b/formats/custom-type.c index 0c7e7f8..62ea5d3 100644 --- a/formats/custom-type.c +++ b/formats/custom-type.c @@ -2,7 +2,6 @@ #include "cmd.h" #include "common.h" #include "http.h" -#include "client.h" #include #include @@ -12,35 +11,43 @@ void custom_type_reply(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; - struct http_client *client = privdata; + struct cmd *cmd = privdata; (void)c; char int_buffer[50]; int int_len; + struct http_response resp; if(reply == NULL) { return; } - if(client->cmd->mime) { /* use the given content-type, but only for strings */ + if(cmd->mime) { /* use the given content-type, but only for strings */ switch(reply->type) { case REDIS_REPLY_NIL: /* or nil values */ - format_send_reply(client, "", 0, client->cmd->mime); + format_send_reply(cmd, "", 0, cmd->mime); return; case REDIS_REPLY_STRING: - format_send_reply(client, reply->str, reply->len, client->cmd->mime); + format_send_reply(cmd, reply->str, reply->len, cmd->mime); return; case REDIS_REPLY_INTEGER: int_len = sprintf(int_buffer, "%lld", reply->integer); - format_send_reply(client, int_buffer, int_len, client->cmd->mime); + format_send_reply(cmd, int_buffer, int_len, cmd->mime); return; } } /* couldn't make sense of what the client wanted. */ - http_send_reply(client, 400, "Bad request", NULL, 0); - cmd_free(client->cmd); + http_response_init(&resp, 400, "Bad Request"); + http_response_set_header(&resp, "Content-Length", "0"); + if(cmd->keep_alive) { + http_response_set_header(&resp, "Connection", "Keep-Alive"); + } else { + http_response_set_header(&resp, "Connection", "Close"); + } + http_response_write(&resp, cmd->fd); + cmd_free(cmd); } diff --git a/formats/json.c b/formats/json.c index 1ba6c2d..e52614d 100644 --- a/formats/json.c +++ b/formats/json.c @@ -15,33 +15,34 @@ void json_reply(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; - struct http_client *client = privdata; + struct cmd *cmd = privdata; json_t *j; char *jstr; (void)c; - if(client->cmd == NULL) { + if(cmd == NULL) { /* broken connection */ return; } if (reply == NULL) { +/* FIXME */ +#if 0 if(client->started_responding) { /* broken, close */ http_send_reply_end(client); } +#endif return; } /* encode redis reply as JSON */ - j = json_wrap_redis_reply(client->cmd, r); + j = json_wrap_redis_reply(cmd, r); /* get JSON as string, possibly with JSONP wrapper */ - jstr = json_string_output(j, - client->query_string.jsonp.s, - client->query_string.jsonp.sz); + jstr = json_string_output(j, cmd->jsonp); /* send reply */ - format_send_reply(client, jstr, strlen(jstr), "application/json"); + format_send_reply(cmd, jstr, strlen(jstr), "application/json"); /* cleanup */ json_decref(j); @@ -192,12 +193,13 @@ json_wrap_redis_reply(const struct cmd *cmd, const redisReply *r) { char * -json_string_output(json_t *j, const char *jsonp, size_t jsonp_len) { +json_string_output(json_t *j, const char *jsonp) { char *json_reply = json_dumps(j, JSON_COMPACT); /* check for JSONP */ if(jsonp) { + size_t jsonp_len = strlen(jsonp); size_t json_len = strlen(json_reply); size_t ret_len = jsonp_len + 1 + json_len + 3; char *ret = calloc(1 + ret_len, 1); @@ -214,3 +216,83 @@ json_string_output(json_t *j, const char *jsonp, size_t jsonp_len) { return json_reply; } +/* extract JSON from WebSocket frame and fill struct cmd. */ +struct cmd * +json_ws_extract(struct http_client *c, const char *p, size_t sz) { + + struct cmd *cmd = NULL; + json_t *j; + char *jsonz; /* null-terminated */ + + unsigned int i, cur; + int argc = 0; + json_error_t jerror; + + (void)c; + + jsonz = calloc(sz + 1, 1); + memcpy(jsonz, p, sz); + j = json_loads(jsonz, sz, &jerror); + free(jsonz); + + if(!j) { + return NULL; + } + if(json_typeof(j) != JSON_ARRAY) { + json_decref(j); + return NULL; /* invalid JSON */ + } + + /* count elements */ + for(i = 0; i < json_array_size(j); ++i) { + json_t *jelem = json_array_get(j, i); + + switch(json_typeof(jelem)) { + case JSON_STRING: + case JSON_INTEGER: + argc++; + break; + + default: + break; + } + } + + if(!argc) { /* not a single item could be decoded */ + json_decref(j); + return NULL; + } + + /* create command and add args */ + cmd = cmd_new(argc); + for(i = 0, cur = 0; i < json_array_size(j); ++i) { + json_t *jelem = json_array_get(j, i); + char *tmp; + + switch(json_typeof(jelem)) { + case JSON_STRING: + tmp = strdup(json_string_value(jelem)); + + cmd->argv[cur] = tmp; + cmd->argv_len[cur] = strlen(tmp); + cur++; + break; + + case JSON_INTEGER: + tmp = malloc(40); + sprintf(tmp, "%d", (int)json_integer_value(jelem)); + + cmd->argv[cur] = tmp; + cmd->argv_len[cur] = strlen(tmp); + cur++; + break; + + default: + break; + } + } + + json_decref(j); + return cmd; +} + diff --git a/formats/json.h b/formats/json.h index bc68e8c..911f8f5 100644 --- a/formats/json.h +++ b/formats/json.h @@ -6,11 +6,15 @@ #include struct cmd; +struct http_client; void json_reply(redisAsyncContext *c, void *r, void *privdata); char * -json_string_output(json_t *j, const char *jsonp, size_t jsonp_len); +json_string_output(json_t *j, const char *jsonp); + +struct cmd * +json_ws_extract(struct http_client *c, const char *p, size_t sz); #endif diff --git a/formats/raw.c b/formats/raw.c index 11c293c..26b77b8 100644 --- a/formats/raw.c +++ b/formats/raw.c @@ -13,7 +13,7 @@ void raw_reply(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; - struct http_client *client = privdata; + struct cmd *cmd = privdata; char *raw_out; size_t sz; (void)c; @@ -25,7 +25,7 @@ raw_reply(redisAsyncContext *c, void *r, void *privdata) { raw_out = raw_wrap(r, &sz); /* send reply */ - format_send_reply(client, raw_out, sz, "binary/octet-stream"); + format_send_reply(cmd, raw_out, sz, "binary/octet-stream"); /* cleanup */ free(raw_out); diff --git a/http.c b/http.c index 95de5fe..c5e33f9 100644 --- a/http.c +++ b/http.c @@ -1,19 +1,12 @@ #include "http.h" #include "server.h" -#include "slog.h" -#include "cmd.h" +#include "worker.h" +#include "client.h" #include #include #include - -void -http_set_header(str_t *h, const char *p, size_t sz) { - - h->s = calloc(1, 1+sz); - memcpy(h->s, p, sz); - h->sz = sz; -} +#include /* HTTP Response */ @@ -34,35 +27,39 @@ void http_response_set_header(struct http_response *r, const char *k, const char *v) { int i, pos = r->header_count; - size_t sz; - char *s; - - sz = strlen(k) + 2 + strlen(v) + 2; - s = calloc(sz + 1, 1); - sprintf(s, "%s: %s\r\n", k, v); + size_t key_sz = strlen(k); + size_t val_sz = strlen(v); for(i = 0; i < r->header_count; ++i) { - size_t klen = strlen(k); - if(strncmp(r->headers[i].s, k, klen) == 0 && r->headers[i].s[klen] == ':') { + if(strncmp(r->headers[i].key, k, key_sz) == 0) { pos = i; /* free old value before replacing it. */ - free(r->headers[i].s); + free(r->headers[i].key); + free(r->headers[i].val); break; } } /* extend array */ if(pos == r->header_count) { - r->headers = realloc(r->headers, sizeof(str_t)*(r->header_count + 1)); + r->headers = realloc(r->headers, + sizeof(struct http_header)*(r->header_count + 1)); r->header_count++; } - r->headers[pos].s = s; - r->headers[pos].sz = sz; - if(!strcmp(k, "Transfer-Encoding") && !strcmp(v, "chunked")) { + /* copy key */ + r->headers[pos].key = calloc(key_sz + 1, 1); + memcpy(r->headers[pos].key, k, key_sz); + r->headers[pos].key_sz = key_sz; + + /* copy val */ + r->headers[pos].val = calloc(val_sz + 1, 1); + memcpy(r->headers[pos].val, v, val_sz); + r->headers[pos].val_sz = val_sz; + + if(!strcmp(k, "Transfer-Encoding") && !strcmp(v, "Chunked")) { r->chunked = 1; } - } void @@ -77,13 +74,13 @@ http_response_write(struct http_response *r, int fd) { char *s = NULL, *p; size_t sz = 0; - int i, ret; + int i, ret, keep_alive = 0; sz = sizeof("HTTP/1.x xxx ")-1 + strlen(r->msg) + 2; s = calloc(sz + 1, 1); ret = sprintf(s, "HTTP/1.1 %d %s\r\n", r->code, r->msg); - p = s; // + ret - 3; + p = s; if(r->code == 200 && r->body) { char content_length[10]; @@ -94,12 +91,33 @@ http_response_write(struct http_response *r, int fd) { } for(i = 0; i < r->header_count; ++i) { - s = realloc(s, sz + r->headers[i].sz); + /* "Key: Value\r\n" */ + size_t header_sz = r->headers[i].key_sz + 2 + r->headers[i].val_sz + 2; + s = realloc(s, sz + header_sz); p = s + sz; - memcpy(p, r->headers[i].s, r->headers[i].sz); - p += r->headers[i].sz; - sz += r->headers[i].sz; + /* add key */ + memcpy(p, r->headers[i].key, r->headers[i].key_sz); + p += r->headers[i].key_sz; + + /* add ": " */ + memcpy(p, ": ", 2); + p += 2; + + /* add value */ + memcpy(p, r->headers[i].val, r->headers[i].val_sz); + p += r->headers[i].val_sz; + + /* add "\r\n" */ + memcpy(p, "\r\n", 2); + p += 2; + + sz += header_sz; + + if(strncasecmp("Connection", r->headers[i].key, r->headers[i].key_sz) == 0 && + strncasecmp("Keep-Alive", r->headers[i].val, r->headers[i].val_sz) == 0) { + keep_alive = 1; + } } /* end of headers */ @@ -114,14 +132,98 @@ http_response_write(struct http_response *r, int fd) { } ret = write(fd, s, sz); + if(!keep_alive) { + /* printf("response write, close fd=%d\n", fd); */ + close(fd); + } + /* + write(1, "response: [", 11); + write(1, s, sz); + write(1, "]\n", 2); + */ free(s); /* cleanup response object */ for(i = 0; i < r->header_count; ++i) { - free(r->headers[i].s); + free(r->headers[i].key); + free(r->headers[i].val); } free(r->headers); return ret == (int)sz ? 0 : 1; } +/* Adobe flash cross-domain request */ +void +http_crossdomain(struct http_client *c) { + + struct http_response resp; + char out[] = "\n" +"\n" +"\n" + "\n" +"\n"; + + http_response_init(&resp, 200, "OK"); + http_response_set_connection_header(c, &resp); + http_response_set_header(&resp, "Content-Type", "application/xml"); + http_response_set_body(&resp, out, sizeof(out)-1); + + http_response_write(&resp, c->fd); + http_client_reset(c); +} + +/* Simple error response */ +void +http_send_error(struct http_client *c, short code, const char *msg) { + + struct http_response resp; + http_response_init(&resp, code, msg); + http_response_set_connection_header(c, &resp); + http_response_set_body(&resp, NULL, 0); + + http_response_write(&resp, c->fd); + http_client_reset(c); +} + +void +http_response_set_connection_header(struct http_client *c, struct http_response *r) { + if(c->keep_alive) { + http_response_set_header(r, "Connection", "Keep-Alive"); + } else { + http_response_set_header(r, "Connection", "Close"); + } +} + +/* Response to HTTP OPTIONS */ +void +http_send_options(struct http_client *c) { + + struct http_response resp; + http_response_init(&resp, 200, "OK"); + http_response_set_connection_header(c, &resp); + + http_response_set_header(&resp, "Content-Type", "text/html"); + http_response_set_header(&resp, "Allow", "GET,POST,PUT,OPTIONS"); + http_response_set_header(&resp, "Content-Length", "0"); + + /* Cross-Origin Resource Sharing, CORS. */ + http_response_set_header(&resp, "Access-Control-Allow-Origin", "*"); + + http_response_write(&resp, c->fd); + http_client_reset(c); +} + +void +http_response_write_chunk(int fd, const char *p, size_t sz) { + + char buf[64]; + int ret, chunk_size; + + chunk_size = sprintf(buf, "%x\r\n", (int)sz); + ret = write(fd, buf, chunk_size); + ret = write(fd, p, sz); + ret = write(fd, "\r\n", 2); + (void)ret; +} + diff --git a/http.h b/http.h index 478daaf..7cb0202 100644 --- a/http.h +++ b/http.h @@ -3,16 +3,22 @@ #include -typedef struct { - char *s; - size_t sz; -} str_t; +struct http_client; + +struct http_header { + char *key; + size_t key_sz; + + char *val; + size_t val_sz; +}; + struct http_response { short code; const char *msg; - str_t *headers; + struct http_header *headers; int header_count; const char *body; @@ -21,9 +27,6 @@ struct http_response { int chunked; }; -void -http_set_header(str_t *h, const char *p, size_t sz); - /* HTTP response */ void @@ -38,5 +41,19 @@ http_response_set_body(struct http_response *r, const char *body, size_t body_le int http_response_write(struct http_response *r, int fd); +void +http_crossdomain(struct http_client *c); + +void +http_send_error(struct http_client *c, short code, const char *msg); + +void +http_send_options(struct http_client *c); + +void +http_response_set_connection_header(struct http_client *c, struct http_response *r); + +void +http_response_write_chunk(int fd, const char *p, size_t sz); #endif diff --git a/pool.c b/pool.c new file mode 100644 index 0000000..d69f3af --- /dev/null +++ b/pool.c @@ -0,0 +1,126 @@ +#include "pool.h" +#include "worker.h" +#include "conf.h" +#include "server.h" + +#include +#include +#include + +struct pool * +pool_new(struct worker *w, int count) { + + struct pool *p = calloc(1, sizeof(struct pool)); + + p->count = count; + p->ac = calloc(count, sizeof(redisAsyncContext*)); + + p->w = w; + p->cfg = w->s->cfg; + + return p; +} + +static void +pool_on_connect(const redisAsyncContext *c) { + struct pool *p = c->data; + int i = 0; + + printf("Connected to redis\n"); + if(!p) { + return; + } + + /* add to pool */ + for(i = 0; i < p->count; ++i) { + if(p->ac[i] == NULL) { + p->ac[i] = c; + return; + } + } +} + +static void +pool_on_disconnect(const redisAsyncContext *c, int status) { + + struct pool *p = c->data; + int i = 0; + if (status != REDIS_OK) { + fprintf(stderr, "Error: %s\n", c->errstr); + } + + if(p == NULL) { /* no need to clean anything here. */ + return; + } + + /* remove from the pool */ + for(i = 0; i < p->count; ++i) { + if(p->ac[i] == c) { + p->ac[i] = NULL; + break; + } + } + + /* reconnect */ + pool_connect(p, 1); +} + +/** + * Create new connection. + */ +redisAsyncContext * +pool_connect(struct pool *p, int attach) { + + struct redisAsyncContext *ac; + if(p->cfg->redis_host[0] == '/') { /* unix socket */ + ac = redisAsyncConnectUnix(p->cfg->redis_host); + } else { + ac = redisAsyncConnect(p->cfg->redis_host, p->cfg->redis_port); + } + + if(attach) { + ac->data = p; + } else { + ac->data = NULL; + } + + if(ac->err) { + /* + const char err[] = "Connection failed"; + slog(s, WEBDIS_ERROR, err, sizeof(err)-1); + */ + fprintf(stderr, "Error: %s\n", ac->errstr); + redisAsyncFree(ac); + return NULL; + } + + redisLibeventAttach(ac, p->w->base); + redisAsyncSetConnectCallback(ac, pool_on_connect); + redisAsyncSetDisconnectCallback(ac, pool_on_disconnect); + + if (p->cfg->redis_auth) { /* authenticate. */ + redisAsyncCommand(ac, NULL, NULL, "AUTH %s", p->cfg->redis_auth); + } + if (p->cfg->database) { /* change database. */ + redisAsyncCommand(ac, NULL, NULL, "SELECT %d", p->cfg->database); + } + return ac; +} + +const redisAsyncContext * +pool_get_context(struct pool *p) { + + int orig = p->cur++; + + do { + p->cur++; + p->cur %= p->count; + if(p->ac[p->cur] != NULL) { + return p->ac[p->cur]; + } + } while(p->cur != orig); + + return NULL; + +} + diff --git a/pool.h b/pool.h new file mode 100644 index 0000000..e38cc84 --- /dev/null +++ b/pool.h @@ -0,0 +1,30 @@ +#ifndef POOL_H +#define POOL_H + +#include + +struct conf; +struct worker; + +struct pool { + + struct worker *w; + struct conf *cfg; + + const redisAsyncContext **ac; + int count; + int cur; + +}; + + +struct pool * +pool_new(struct worker *w, int count); + +redisAsyncContext * +pool_connect(struct pool *p, int attach); + +const redisAsyncContext * +pool_get_context(struct pool *p); + +#endif diff --git a/server.c b/server.c index 26dae97..690e83e 100644 --- a/server.c +++ b/server.c @@ -1,14 +1,10 @@ #include "server.h" -#include "conf.h" -#include "cmd.h" -#include "slog.h" -#include "http.h" +#include "worker.h" #include "client.h" +#include "conf.h" -#include -#include -#include - +#include +#include #include #include #include @@ -20,7 +16,7 @@ /** * Sets up a non-blocking socket */ -int +static int socket_setup(const char *ip, short port) { int reuse = 1; @@ -75,167 +71,74 @@ socket_setup(const char *ip, short port) { } struct server * -server_new(const char *filename) { +server_new(const char *cfg_file) { + + int i; struct server *s = calloc(1, sizeof(struct server)); - s->cfg = conf_read(filename); - s->base = event_base_new(); - - return s; -} - -void -server_free(struct server *s) { - - /* cleanup Redis async object, _before_ the 2 struct event. */ - redisAsyncFree(s->ac); - - /* event_del(&s->ev); */ - event_del(&s->ev_reconnect); - free(s); -} - -static void -connectCallback(const redisAsyncContext *c) { - ((void)c); -} + s->cfg = conf_read(cfg_file); -static void -disconnectCallback(const redisAsyncContext *c, int status) { - struct server *s = c->data; - if (status != REDIS_OK) { - fprintf(stderr, "Error: %s\n", c->errstr); - } - s->ac = NULL; - - /* wait 100 msec and reconnect */ - s->tv_reconnect.tv_sec = 0; - s->tv_reconnect.tv_usec = 100*1000; - webdis_connect(s); -} - -static void -on_timer_reconnect(int fd, short event, void *ctx) { - - (void)fd; - (void)event; - struct server *s = ctx; - - if(s->cfg->redis_host[0] == '/') { /* unix socket */ - s->ac = redisAsyncConnectUnix(s->cfg->redis_host); - } else { - s->ac = redisAsyncConnect(s->cfg->redis_host, s->cfg->redis_port); - } - - s->ac->data = s; - - if(s->ac->err) { - const char err[] = "Connection failed"; - slog(s, WEBDIS_ERROR, err, sizeof(err)-1); - fprintf(stderr, "Error: %s\n", s->ac->errstr); - } - - redisLibeventAttach(s->ac, s->base); - redisAsyncSetConnectCallback(s->ac, connectCallback); - redisAsyncSetDisconnectCallback(s->ac, disconnectCallback); - - if (s->cfg->redis_auth) { /* authenticate. */ - redisAsyncCommand(s->ac, NULL, NULL, "AUTH %s", s->cfg->redis_auth); - } - if (s->cfg->database) { /* change database. */ - redisAsyncCommand(s->ac, NULL, NULL, "SELECT %d", s->cfg->database); + /* workers */ + s->w = calloc(s->cfg->http_threads, sizeof(struct worker*)); + for(i = 0; i < s->cfg->http_threads; ++i) { + s->w[i] = worker_new(s); } -} - -void -webdis_connect(struct server *s) { - /* schedule reconnect */ - evtimer_set(&s->ev_reconnect, on_timer_reconnect, s); - event_base_set(s->base, &s->ev_reconnect); - evtimer_add(&s->ev_reconnect, &s->tv_reconnect); -} - -struct server * -server_copy(const struct server *s) { - struct server *ret = calloc(1, sizeof(struct server)); - - *ret = *s; - - /* create a new connection */ - ret->ac = NULL; - on_timer_reconnect(0, 0, ret); - - return ret; + return s; } static void -on_possible_accept(int fd, short event, void *ctx) { +server_can_accept(int fd, short event, void *ptr) { - struct server *s = ctx; + struct server *s = ptr; + struct worker *w; + struct http_client *c; int client_fd; struct sockaddr_in addr; socklen_t addr_sz = sizeof(addr); (void)event; - struct http_client *c; - - client_fd = accept(fd, (struct sockaddr*)&addr, &addr_sz); - c = http_client_new(client_fd, s); - c->addr = addr.sin_addr.s_addr; - http_client_serve(c); -} -/* Taken from Redis. */ -void -server_daemonize(void) { - int fd; + w = s->w[s->next_worker]; /* select worker to send the client to */ - if (fork() != 0) exit(0); /* parent exits */ - setsid(); /* create a new session */ + /* create client and send to worker. */ + client_fd = accept(fd, (struct sockaddr*)&addr, &addr_sz); + /* printf("Just accepted fd=%d, sending to worker %p\n", client_fd, (void*)w); */ + c = http_client_new(w, client_fd, addr.sin_addr.s_addr); + worker_add_client(w, c); - /* Every output goes to /dev/null. */ - if ((fd = open("/dev/null", O_RDWR, 0)) != -1) { - dup2(fd, STDIN_FILENO); - dup2(fd, STDOUT_FILENO); - dup2(fd, STDERR_FILENO); - if (fd > STDERR_FILENO) close(fd); - } + s->next_worker = (s->next_worker + 1) % s->cfg->http_threads; /* loop over ring of workers */ } -void +int server_start(struct server *s) { - - if(s->cfg->daemonize) { - server_daemonize(); - } + int i; /* ignore sigpipe */ #ifdef SIGPIPE signal(SIGPIPE, SIG_IGN); #endif - /* start http server */ - slog(s, WEBDIS_INFO, "Starting HTTP Server", sizeof("Starting HTTP Server")-1); + /* start worker threads */ + for(i = 0; i < s->cfg->http_threads; ++i) { + worker_start(s->w[i]); + } + /* create socket */ s->fd = socket_setup(s->cfg->http_host, s->cfg->http_port); - /* check return value. */ - if(s->fd == -1) { - char err[] = "Failed to create socket."; - slog(s, WEBDIS_ERROR, err, sizeof(err)-1); - _exit(1); + if(s->fd < 0) { + return -1; } - event_set(&s->ev, s->fd, EV_READ | EV_PERSIST, on_possible_accept, s); - event_base_set(s->base, &s->ev); - event_add(&s->ev, NULL); - /* drop privileges */ - slog(s, WEBDIS_INFO, "Dropping Privileges", sizeof("Dropping Privileges")-1); - setuid(s->cfg->user); - setgid(s->cfg->group); + /* initialize libevent */ + s->base = event_base_new(); - /* attach hiredis to libevent base */ - webdis_connect(s); + /* start http server */ + event_set(&s->ev, s->fd, EV_READ | EV_PERSIST, server_can_accept, s); + event_base_set(s->base, &s->ev); + event_add(&s->ev, NULL); - /* loop */ event_base_dispatch(s->base); + + return 0; } + diff --git a/server.h b/server.h index 31e527a..e9d1377 100644 --- a/server.h +++ b/server.h @@ -1,42 +1,30 @@ #ifndef SERVER_H #define SERVER_H -#include -#include -#include #include +#include -struct server { +struct worker; +struct conf; - struct conf *cfg; - struct event_base *base; - redisAsyncContext *ac; +struct server { - /* server socket and event struct */ int fd; struct event ev; + struct event_base *base; - struct event ev_reconnect; - struct timeval tv_reconnect; -}; - -void -webdis_connect(struct server *s); - -struct server * -server_new(const char *filename); + struct conf *cfg; -void -server_free(struct server *s); + /* worker threads */ + struct worker **w; + int next_worker; +}; struct server * -server_copy(const struct server *s); +server_new(const char *cfg_file); -void +int server_start(struct server *s); -void -webdis_log(struct server *s, int level, const char *body); - #endif diff --git a/tests/Makefile b/tests/Makefile new file mode 100644 index 0000000..7cca7e1 --- /dev/null +++ b/tests/Makefile @@ -0,0 +1,15 @@ +OUT=websocket +CFLAGS=-O3 -Wall -Wextra +LDFLAGS=-levent -lpthread + +all: $(OUT) Makefile + +%: %.o Makefile + $(CC) $(LDFLAGS) -o $@ $< + +%.o: %.c Makefile + $(CC) -c $(CFLAGS) -o $@ $< + +clean: + rm -f *.o $(OUT) + diff --git a/tests/basic.py b/tests/basic.py new file mode 100755 index 0000000..0d127da --- /dev/null +++ b/tests/basic.py @@ -0,0 +1,181 @@ +#!/usr/bin/python +import urllib2, unittest, json +from functools import wraps +try: + import bson +except: + bson = None + + +host = '127.0.0.1' +port = 7379 + +class TestWebdis(unittest.TestCase): + + def wrap(self,url): + return 'http://%s:%d/%s' % (host, port, url) + + def query(self, url): + r = urllib2.Request(self.wrap(url)) + return urllib2.urlopen(r) + +class TestBasics(TestWebdis): + + def test_crossdomain(self): + f = self.query('crossdomain.xml') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/xml') + self.assertTrue("allow-access-from domain" in f.read()) + + def test_options(self): + pass + # not sure if OPTIONS is supported by urllib2... + # f = self.query('') # TODO: call with OPTIONS. + # self.assertTrue(f.headers.getheader('Content-Type') == 'text/html') + # self.assertTrue(f.headers.getheader('Allow') == 'GET,POST,PUT,OPTIONS') + # self.assertTrue(f.headers.getheader('Content-Length') == '0') + # self.assertTrue(f.headers.getheader('Access-Control-Allow-Origin') == '*') + + +class TestJSON(TestWebdis): + + def test_set(self): + "success type (+OK)" + self.query('DEL/hello') + f = self.query('SET/hello/world') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/json') + self.assertTrue(f.headers.getheader('ETag') == '"0db1124cf79ffeb80aff6d199d5822f8"') + self.assertTrue(f.read() == '{"SET":[true,"OK"]}') + + def test_get(self): + "string type" + self.query('SET/hello/world') + f = self.query('GET/hello') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/json') + self.assertTrue(f.headers.getheader('ETag') == '"8cf38afc245b7a6a88696566483d1390"') + self.assertTrue(f.read() == '{"GET":"world"}') + + def test_incr(self): + "integer type" + self.query('DEL/hello') + f = self.query('INCR/hello') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/json') + self.assertTrue(f.headers.getheader('ETag') == '"500e9bcdcbb1e98f25c1fbb880a96c99"') + self.assertTrue(f.read() == '{"INCR":1}') + + def test_list(self): + "list type" + self.query('DEL/hello') + self.query('RPUSH/hello/abc') + self.query('RPUSH/hello/def') + f = self.query('LRANGE/hello/0/-1') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/json') + self.assertTrue(f.headers.getheader('ETag') == '"622e51f547a480bef7cf5452fb7782db"') + self.assertTrue(f.read() == '{"LRANGE":["abc","def"]}') + + def test_error(self): + "error return type" + f = self.query('UNKNOWN/COMMAND') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/json') + try: + obj = json.loads(f.read()) + except: + self.assertTrue(False) + return + + self.assertTrue(len(obj) == 1) + self.assertTrue('UNKNOWN' in obj) + self.assertTrue(isinstance(obj['UNKNOWN'], list)) + self.assertTrue(obj['UNKNOWN'][0] == False) + self.assertTrue(isinstance(obj['UNKNOWN'][1], unicode)) + +class TestRaw(TestWebdis): + + def test_set(self): + "success type (+OK)" + self.query('DEL/hello') + f = self.query('SET/hello/world.raw') + self.assertTrue(f.headers.getheader('Content-Type') == 'binary/octet-stream') + self.assertTrue(f.read() == "+OK\r\n") + + def test_get(self): + "string type" + self.query('SET/hello/world') + f = self.query('GET/hello.raw') + self.assertTrue(f.read() == '$5\r\nworld\r\n') + + def test_incr(self): + "integer type" + self.query('DEL/hello') + f = self.query('INCR/hello.raw') + self.assertTrue(f.read() == ':1\r\n') + + def test_list(self): + "list type" + self.query('DEL/hello') + self.query('RPUSH/hello/abc') + self.query('RPUSH/hello/def') + f = self.query('LRANGE/hello/0/-1.raw') + self.assertTrue(f.read() == "*2\r\n$3\r\nabc\r\n$3\r\ndef\r\n") + + def test_error(self): + "error return type" + f = self.query('UNKNOWN/COMMAND.raw') + self.assertTrue(f.read().startswith("-ERR ")) + +def need_bson(fn): + def wrapper(self): + if bson: + fn(self) + return wrapper + +class TestBSon(TestWebdis): + + @need_bson + def test_set(self): + "success type (+OK)" + self.query('DEL/hello') + f = self.query('SET/hello/world.bson') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/bson') + obj = bson.decode_all(f.read()) + self.assertTrue(obj == [{u'SET': [True, bson.Binary('OK', 0)]}]) + + @need_bson + def test_get(self): + "string type" + self.query('SET/hello/world') + f = self.query('GET/hello.bson') + obj = bson.decode_all(f.read()) + self.assertTrue(obj == [{u'GET': bson.Binary('world', 0)}]) + + @need_bson + def test_incr(self): + "integer type" + self.query('DEL/hello') + f = self.query('INCR/hello.bson') + obj = bson.decode_all(f.read()) + self.assertTrue(obj == [{u'INCR': 1L}]) + + @need_bson + def test_list(self): + "list type" + self.query('DEL/hello') + self.query('RPUSH/hello/abc') + self.query('RPUSH/hello/def') + f = self.query('LRANGE/hello/0/-1.bson') + obj = bson.decode_all(f.read()) + self.assertTrue(obj == [{u'LRANGE': [bson.Binary('abc', 0), bson.Binary('def', 0)]}]) + + @need_bson + def test_error(self): + "error return type" + f = self.query('UNKNOWN/COMMAND.bson') + obj = bson.decode_all(f.read()) + self.assertTrue(len(obj) == 1) + self.assertTrue(u'UNKNOWN' in obj[0]) + self.assertTrue(isinstance(obj[0], dict)) + self.assertTrue(isinstance(obj[0][u'UNKNOWN'], list)) + self.assertTrue(obj[0]['UNKNOWN'][0] == False) + self.assertTrue(isinstance(obj[0]['UNKNOWN'][1], bson.Binary)) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/websocket.c b/tests/websocket.c new file mode 100644 index 0000000..b5fdc11 --- /dev/null +++ b/tests/websocket.c @@ -0,0 +1,334 @@ +/* http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 */ + +#include +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + + +#include +#include + + +#include + +struct host_info { + char *host; + short port; +}; + +/* worker_thread, with counter of remaining messages */ +struct worker_thread { + struct host_info *hi; + struct event_base *base; + + int msg_target; + int msg_received; + int msg_sent; + int byte_count; + pthread_t thread; + + struct evbuffer *buffer; + int got_header; + + int verbose; + struct event ev_w; +}; + +void +process_message(struct worker_thread *wt, size_t sz) { + + // printf("process_message\n"); + if(wt->msg_received % 10000 == 0) { + printf("thread %u: %8d messages left (got %9d bytes so far).\n", + (unsigned int)wt->thread, + wt->msg_target - wt->msg_received, wt->byte_count); + } + wt->byte_count += sz; + + /* decrement read count, and stop receiving when we reach zero. */ + wt->msg_received++; + if(wt->msg_received == wt->msg_target) { + event_base_loopexit(wt->base, NULL); + } +} + +void +websocket_write(int fd, short event, void *ptr) { + int ret; + struct worker_thread *wt = ptr; + + if(event != EV_WRITE) { + return; + } + + char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff"; + ret = write(fd, message, sizeof(message)-1); + if(ret != sizeof(message)-1) { + fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno)); + close(fd); + } + + wt->msg_sent += 2; + if(wt->msg_sent < wt->msg_target) { + event_set(&wt->ev_w, fd, EV_WRITE, websocket_write, wt); + event_base_set(wt->base, &wt->ev_w); + ret = event_add(&wt->ev_w, NULL); + } +} + +static void +websocket_read(int fd, short event, void *ptr) { + char packet[2048], *pos; + int ret, success = 1; + + struct worker_thread *wt = ptr; + + if(event != EV_READ) { + return; + } + + /* read message */ + ret = read(fd, packet, sizeof(packet)); + pos = packet; + if(ret > 0) { + char *data, *last; + int sz, msg_sz; + + /* + printf("Received %d bytes: \n", ret); + write(1, packet, ret); + printf("\n"); + */ + + if(wt->got_header == 0) { /* first response */ + char *frame_start = strstr(packet, "MH"); /* end of the handshake */ + if(frame_start == NULL) { + return; /* not yet */ + } else { /* start monitoring possible writes */ + printf("start monitoring possible writes\n"); + evbuffer_add(wt->buffer, frame_start + 2, ret - (frame_start + 2 - packet)); + + wt->got_header = 1; + event_set(&wt->ev_w, fd, EV_WRITE, + websocket_write, wt); + event_base_set(wt->base, &wt->ev_w); + ret = event_add(&wt->ev_w, NULL); + } + } else { + /* we've had the header already, now bufffer data. */ + evbuffer_add(wt->buffer, packet, ret); + } + + while(1) { + data = (char*)EVBUFFER_DATA(wt->buffer); + sz = EVBUFFER_LENGTH(wt->buffer); + + if(sz == 0) { /* no data */ + break; + } + if(*data != 0) { /* missing frame start */ + success = 0; + break; + } + last = memchr(data, 0xff, sz); /* look for frame end */ + if(!last) { + /* no end of frame in sight. */ + break; + } + msg_sz = last - data - 1; + process_message(ptr, msg_sz); /* record packet */ + + /* drain including frame delimiters (+2 bytes) */ + evbuffer_drain(wt->buffer, msg_sz + 2); + } + } else { + printf("ret=%d\n", ret); + success = 0; + } + if(success == 0) { + shutdown(fd, SHUT_RDWR); + close(fd); + event_base_loopexit(wt->base, NULL); + } +} + +void* +worker_main(void *ptr) { + + char ws_template[] = "GET /.json HTTP/1.1\r\n" + "Host: %s:%d\r\n" + "Connection: Upgrade\r\n" + "Upgrade: WebSocket\r\n" + "Origin: http://%s:%d\r\n" + "Sec-WebSocket-Key1: 18x 6]8vM;54 *(5: { U1]8 z [ 8\r\n" + "Sec-WebSocket-Key2: 1_ tx7X d < nw 334J702) 7]o}` 0\r\n" + "\r\n" + "Tm[K T2u"; + + struct worker_thread *wt = ptr; + + int ret; + int fd; + struct sockaddr_in addr; + char *ws_handshake; + size_t ws_handshake_sz; + + /* connect socket */ + fd = socket(AF_INET, SOCK_STREAM, 0); + addr.sin_family = AF_INET; + addr.sin_port = htons(wt->hi->port); + memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr)); + addr.sin_addr.s_addr = inet_addr(wt->hi->host); + + ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr)); + if(ret != 0) { + fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno)); + return NULL; + } + + /* initialize worker thread */ + wt->base = event_base_new(); + wt->buffer = evbuffer_new(); + wt->byte_count = 0; + wt->got_header = 0; + + /* send handshake */ + ws_handshake_sz = sizeof(ws_handshake) + + 2*strlen(wt->hi->host) + 500; + ws_handshake = calloc(ws_handshake_sz, 1); + ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template, + wt->hi->host, wt->hi->port, + wt->hi->host, wt->hi->port); + ret = write(fd, ws_handshake, ws_handshake_sz); + + struct event ev_r; + event_set(&ev_r, fd, EV_READ | EV_PERSIST, websocket_read, wt); + event_base_set(wt->base, &ev_r); + event_add(&ev_r, NULL); + + /* go! */ + event_base_dispatch(wt->base); + event_base_free(wt->base); + free(ws_handshake); + return NULL; +} + +void +usage(const char* argv0, char *host_default, short port_default, + int thread_count_default, int messages_default) { + + printf("Usage: %s [options]\n" + "Options are:\n" + "\t-h host\t\t(default = \"%s\")\n" + "\t-p port\t\t(default = %d)\n" + "\t-c threads\t(default = %d)\n" + "\t-n count\t(number of messages per thread, default = %d)\n" + "\t-v\t\t(verbose)\n", + argv0, host_default, (int)port_default, + thread_count_default, messages_default); +} + +int +main(int argc, char *argv[]) { + + struct timespec t0, t1; + + int messages_default = 100000; + int thread_count_default = 4; + short port_default = 7379; + char *host_default = "127.0.0.1"; + + int msg_target = messages_default; + int thread_count = thread_count_default; + int i, opt; + char *colon; + double total = 0, total_bytes = 0; + int verbose = 0; + + struct host_info hi = {host_default, port_default}; + + struct worker_thread *workers; + + /* getopt */ + while ((opt = getopt(argc, argv, "h:p:c:n:v")) != -1) { + switch (opt) { + case 'h': + colon = strchr(optarg, ':'); + if(!colon) { + size_t sz = strlen(optarg); + hi.host = calloc(1 + sz, 1); + strncpy(hi.host, optarg, sz); + } else { + hi.host = calloc(1+colon-optarg, 1); + strncpy(hi.host, optarg, colon-optarg); + hi.port = (short)atol(colon+1); + } + break; + + case 'p': + hi.port = (short)atol(optarg); + break; + + case 'c': + thread_count = atoi(optarg); + break; + + case 'n': + msg_target = atoi(optarg); + break; + + case 'v': + verbose = 1; + break; + default: /* '?' */ + usage(argv[0], host_default, port_default, + thread_count_default, + messages_default); + exit(EXIT_FAILURE); + } + } + + /* run threads */ + workers = calloc(sizeof(struct worker_thread), thread_count); + + clock_gettime(CLOCK_MONOTONIC, &t0); + for(i = 0; i < thread_count; ++i) { + workers[i].msg_target = msg_target; + workers[i].hi = &hi; + workers[i].verbose = verbose; + + pthread_create(&workers[i].thread, NULL, + worker_main, &workers[i]); + } + + /* wait for threads to finish */ + for(i = 0; i < thread_count; ++i) { + pthread_join(workers[i].thread, NULL); + total += workers[i].msg_received; + total_bytes += workers[i].byte_count; + } + + /* timing */ + clock_gettime(CLOCK_MONOTONIC, &t1); + float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000; + float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000; + + if(total != 0) { + printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n", + (long)total, + (mili1-mili0)/1000.0, + 1000*total/(mili1-mili0), + (int)(total_bytes / (1000*(mili1-mili0))), + (int)(total_bytes / (mili1-mili0))); + } else { + printf("No message was read.\n"); + } + + return EXIT_SUCCESS; +} + diff --git a/webdis.c b/webdis.c index aecfa02..34e88e3 100644 --- a/webdis.c +++ b/webdis.c @@ -1,6 +1,7 @@ -#include #include "server.h" +#include + int main(int argc, char *argv[]) { diff --git a/webdis.json b/webdis.json index 9dcc98c..c74d485 100644 --- a/webdis.json +++ b/webdis.json @@ -6,6 +6,7 @@ "http_host": "0.0.0.0", "http_port": 7379, + "threads": 4, "daemonize": false, diff --git a/websocket.c b/websocket.c new file mode 100644 index 0000000..770903d --- /dev/null +++ b/websocket.c @@ -0,0 +1,234 @@ +#include "md5/md5.h" +#include "websocket.h" +#include "client.h" +#include "formats/json.h" +#include "cmd.h" +#include "worker.h" +#include "pool.h" + +#include +#include +#include +#include +#include + +static uint32_t +ws_read_key(const char *s) { + + uint32_t ret = 0, spaces = 0; + const char *p; + size_t sz; + + if(!s) { + return 0; + } + + sz = strlen(s); + + for(p = s; p < s+sz; ++p) { + if(*p >= '0' && *p <= '9') { + ret *= 10; + ret += (*p) - '0'; + } else if (*p == ' ') { + spaces++; + } + } + return htonl(ret / spaces); +} + +static int +ws_compute_handshake(struct http_client *c, unsigned char *out) { + + char buffer[16]; + md5_state_t ctx; + + // websocket handshake + uint32_t number_1 = ws_read_key(client_get_header(c, "Sec-WebSocket-Key1")); + uint32_t number_2 = ws_read_key(client_get_header(c, "Sec-WebSocket-Key2")); + + if(c->body_sz < 8) { /* we need at least 8 bytes */ + return -1; + } + + memcpy(buffer, &number_1, sizeof(uint32_t)); + memcpy(buffer + sizeof(uint32_t), &number_2, sizeof(uint32_t)); + memcpy(buffer + 2 * sizeof(uint32_t), c->body + c->body_sz - 8, 8); /* last 8 bytes */ + + md5_init(&ctx); + md5_append(&ctx, (const md5_byte_t *)buffer, sizeof(buffer)); + md5_finish(&ctx, out); + + return 0; +} + +int +ws_handshake_reply(struct http_client *c) { + + int ret; + unsigned char md5_handshake[16]; + char *buffer = NULL, *p; + const char *origin = NULL, *host = NULL; + size_t origin_sz = 0, host_sz = 0, sz; + + char template0[] = "HTTP/1.1 101 Websocket Protocol Handshake\r\n" + "Upgrade: WebSocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Origin: "; /* %s */ + char template1[] = "\r\n" + "Sec-WebSocket-Location: ws://"; /* %s%s */ + char template2[] = "\r\n" + "Origin: http://"; /* %s */ + char template3[] = "\r\n\r\n"; + + if((origin = client_get_header(c, "Origin"))) { + origin_sz = strlen(origin); + } + if((host = client_get_header(c, "Host"))) { + host_sz = strlen(host); + } + + /* need those headers */ + if(!origin || !origin_sz || !host || !host_sz || !c->path || !c->path_sz) { + return -1; + } + + if(ws_compute_handshake(c, &md5_handshake[0]) != 0) { + printf("failed to compute handshake.\n"); + return -1; + } + + /* This code uses the WebSocket specification from May 23, 2010. + * The latest copy is available at http://www.whatwg.org/specs/web-socket-protocol/ + */ + + sz = sizeof(template0)-1 + origin_sz + + sizeof(template1)-1 + host_sz + c->path_sz + + sizeof(template2)-1 + host_sz + + sizeof(template3)-1 + sizeof(md5_handshake); + + p = buffer = malloc(sz); + + /* Concat all */ + + /* template0 */ + memcpy(p, template0, sizeof(template0)-1); + p += sizeof(template0)-1; + memcpy(p, origin, origin_sz); + p += origin_sz; + + /* template1 */ + memcpy(p, template1, sizeof(template1)-1); + p += sizeof(template1)-1; + memcpy(p, host, host_sz); + p += host_sz; + memcpy(p, c->path, c->path_sz); + p += c->path_sz; + + /* template2 */ + memcpy(p, template2, sizeof(template2)-1); + p += sizeof(template2)-1; + memcpy(p, host, host_sz); + p += host_sz; + + /* template3 */ + memcpy(p, template3, sizeof(template3)-1); + p += sizeof(template3)-1; + memcpy(p, &md5_handshake[0], sizeof(md5_handshake)); + + ret = write(c->fd, buffer, sz); + free(buffer); + + return 0; +} + + +static int +ws_execute(struct http_client *c, const char *frame, size_t frame_len) { + + struct cmd*(*fun_extract)(struct http_client *, const char *, size_t) = NULL; + + if(strncmp(c->path, "/.json", 6) == 0) { + fun_extract = json_ws_extract; + } + + if(fun_extract) { + struct cmd *cmd = fun_extract(c, frame, frame_len); + if(cmd) { + cmd->is_websocket = 1; + cmd->fd = c->fd; + + /* TODO: clean this mess */ + redisAsyncContext *ac = (redisAsyncContext*)pool_get_context(c->w->pool); + cmd_send(ac, json_reply, cmd); + return 0; + } + } + + return -1; +} + +/** + * Process some data just received on the socket. + */ +enum ws_read_action +ws_add_data(struct http_client *c) { + const char *frame_start, *frame_end; + char *tmp; + + while(1) { + /* look for frame start */ + if(!c->sz || c->buffer[0] != '\x00') { + /* printf("frame start fail\n"); */ + return WS_READ_FAIL; + } + + /* look for frame end */ + int ret; + size_t frame_len; + frame_start = c->buffer; + frame_end = memchr(frame_start, '\xff', c->sz); + if(frame_end == NULL) { + /* continue reading */ + return WS_READ_MORE; + } + + /* parse and execute frame. */ + frame_len = frame_end - frame_start - 1; + + ret = ws_execute(c, frame_start + 1, frame_len); + if(ret != 0) { + return WS_READ_FAIL; + } + + /* remove frame from buffer */ + c->sz -= (2 + frame_len); + tmp = malloc(c->sz); + memcpy(tmp, c->buffer + 2 + frame_len, c->sz); + free(c->buffer); + c->buffer = tmp; + } +} + +int +ws_reply(struct cmd *cmd, const char *p, size_t sz) { + + int ret; + char *buffer = malloc(sz + 2); + + /* create frame */ + buffer[0] = '\x00'; + memcpy(buffer + 1, p, sz); + buffer[sz + 1] = '\xff'; + + /* send WS frame */ + ret = write(cmd->fd, buffer, sz+2); + free(buffer); + + if(ret == (int)sz + 2) { + /* http_client_serve(c); */ + return 0; + } + /* printf("WRITE FAIL on fd=%d, ret=%d (%s)\n", c->fd, ret, strerror(errno)); */ + + return -1; +} diff --git a/websocket.h b/websocket.h new file mode 100644 index 0000000..11c98a2 --- /dev/null +++ b/websocket.h @@ -0,0 +1,23 @@ +#ifndef WEBSOCKET_H +#define WEBSOCKET_H + +#include + +struct http_client; +struct cmd; + +enum ws_read_action { + WS_READ_FAIL, + WS_READ_MORE, + WS_READ_EXEC}; + +int +ws_handshake_reply(struct http_client *c); + +enum ws_read_action +ws_add_data(struct http_client *c); + +int +ws_reply(struct cmd *cmd, const char *p, size_t sz); + +#endif diff --git a/worker.c b/worker.c new file mode 100644 index 0000000..fe7c748 --- /dev/null +++ b/worker.c @@ -0,0 +1,196 @@ +#include "worker.h" +#include "client.h" +#include "http.h" +#include "cmd.h" +#include "pool.h" +#include "slog.h" +#include "websocket.h" + +#include +#include +#include +#include +#include + + +struct worker * +worker_new(struct server *s) { + + struct worker *w = calloc(1, sizeof(struct worker)); + w->s = s; + + /* setup communication link */ + pipe(w->link); + + /* Redis connection pool */ + w->pool = pool_new(w, 8); /* FIXME: change the number? use conf? */ + + return w; + +} + +void +worker_can_read(int fd, short event, void *p) { + + struct http_client *c = p; + int ret, nparsed; + + (void)fd; + (void)event; + + ret = http_client_read(c); + if(ret <= 0) { + printf("client disconnected\n"); + return; + } + + if(c->is_websocket) { + /* printf("Got websocket data! (%d bytes)\n", ret); */ + ws_add_data(c); + } else { + /* run parser */ + nparsed = http_client_execute(c); + + if(c->is_websocket) { + /* we need to use the remaining (unparsed) data as the body. */ + if(nparsed < ret) { + http_client_set_body(c, c->buffer + nparsed + 1, c->sz - nparsed - 1); + ws_handshake_reply(c); + } else { + c->broken = 1; + } + free(c->buffer); + c->buffer = NULL; + c->sz = 0; + } + } + + if(c->broken) { /* terminate client */ + /* printf("terminate client\n"); */ + http_client_free(c); + } else { + /* printf("start monitoring input again.\n"); */ + worker_monitor_input(c); + } +} + +void +worker_monitor_input(struct http_client *c) { + + event_set(&c->ev, c->fd, EV_READ, worker_can_read, c); + event_base_set(c->w->base, &c->ev); + event_add(&c->ev, NULL); +} + +static void +worker_can_accept(int pipefd, short event, void *ptr) { + + struct http_client *c; + unsigned long addr; + + (void)event; + (void)ptr; + + int ret = read(pipefd, &addr, sizeof(addr)); + if(ret == sizeof(addr)) { + c = (struct http_client*)addr; + /* create client, monitor fd for input */ + worker_monitor_input(c); + } +} + +static void +worker_pool_connect(struct worker *w) { + + int i; + /* create connections */ + for(i = 0; i < w->pool->count; ++i) { + pool_connect(w->pool, 1); + } + +} + +static void* +worker_main(void *p) { + + struct worker *w = p; + struct event ev; + + /* setup libevent */ + w->base = event_base_new(); + + /* monitor pipe link */ + event_set(&ev, w->link[0], EV_READ | EV_PERSIST, worker_can_accept, w); + event_base_set(w->base, &ev); + event_add(&ev, NULL); + + /* connect to Redis */ + worker_pool_connect(w); + + /* loop */ + event_base_dispatch(w->base); + + return NULL; +} + +void +worker_start(struct worker *w) { + + pthread_create(&w->thread, NULL, worker_main, w); +} + +/* queue new client to process */ +void +worker_add_client(struct worker *w, struct http_client *c) { + + /* write into pipe link */ + unsigned long addr = (unsigned long)c; + int ret = write(w->link[1], &addr, sizeof(addr)); + (void)ret; + /* printf("[for worker %p] write: %lu, c=%p (ret=%d)\n", (void*)w, addr, (void*)c, ret); */ +} + +/* Called when a client has finished reading input and is ready to be executed. */ +void +worker_process_client(struct http_client *c) { + + /* printf("worker_process_client\n"); */ + /* check that the command can be executed */ + struct worker *w = c->w; + int ret = -1; + switch(c->parser.method) { + case HTTP_GET: + if(c->path_sz == 16 && memcmp(c->path, "/crossdomain.xml", 16) == 0) { + http_crossdomain(c); + return; + } + slog(w->s, WEBDIS_DEBUG, c->path, c->path_sz); + ret = cmd_run(c->w, c, 1+c->path, c->path_sz-1, NULL, 0); + break; + + case HTTP_POST: + slog(w->s, WEBDIS_DEBUG, c->path, c->path_sz); + ret = cmd_run(c->w, c, c->body, c->body_sz, NULL, 0); + break; + + case HTTP_PUT: + slog(w->s, WEBDIS_DEBUG, c->path, c->path_sz); + ret = cmd_run(c->w, c, 1+c->path, c->path_sz-1, + c->body, c->body_sz); + break; + + case HTTP_OPTIONS: + http_send_options(c); + + default: + slog(w->s, WEBDIS_DEBUG, "405", 3); + http_send_error(c, 405, "Method Not Allowed"); + return; + } + + if(ret < 0) { + http_send_error(c, 403, "Forbidden"); + } + +} + diff --git a/worker.h b/worker.h new file mode 100644 index 0000000..b4e9cde --- /dev/null +++ b/worker.h @@ -0,0 +1,41 @@ +#ifndef WORKER_H +#define WORKER_H + +#include + +struct http_client; +struct pool; + +struct worker { + + /* self */ + pthread_t thread; + struct event_base *base; + + /* connection dispatcher */ + struct server *s; + int link[2]; + + /* Redis connection pool */ + struct pool *pool; +}; + +struct worker * +worker_new(struct server *s); + +void +worker_start(struct worker *w); + +void +worker_add_client(struct worker *w, struct http_client *c); + +void +worker_monitor_input(struct http_client *c); + +void +worker_can_read(int fd, short event, void *p); + +void +worker_process_client(struct http_client *c); + +#endif