Partial rewrite, adding WebSockets, threads, pool.

master
Nicolas Favre-Felix 14 years ago
parent dda2236e4b
commit 5b7aa50e62

@ -4,10 +4,10 @@ JANSSON_OBJ=jansson/src/dump.o jansson/src/error.o jansson/src/hashtable.o janss
FORMAT_OBJS=formats/json.o formats/raw.o formats/common.o formats/custom-type.o formats/bson.o
HTTP_PARSER_OBJS=http-parser/http_parser.o
DEPS=$(FORMAT_OBJS) $(HIREDIS_OBJ) $(JANSSON_OBJ) $(HTTP_PARSER_OBJS)
OBJS=webdis.o conf.o cmd.o slog.o server.o libb64/cencode.o acl.o md5/md5.o http.o client.o $(DEPS)
OBJS=webdis.o cmd.o worker.o slog.o server.o libb64/cencode.o acl.o md5/md5.o http.o client.o websocket.o pool.o conf.o $(DEPS)
CFLAGS=-O3 -Wall -Wextra -I. -Ijansson/src -Ihttp-parser
LDFLAGS=-levent
LDFLAGS=-levent -pthread
all: $(OUT) Makefile

@ -20,11 +20,12 @@ curl -d "GET/hello" http://127.0.0.1:7379/
</pre>
# Features
* Multi-threaded server.
* GET and POST are supported.
* JSON output by default, optional JSONP parameter (`?jsonp=myFunction`).
* Raw Redis 2.0 protocol output with `.raw` suffix
* BSON support for compact responses and MongoDB compatibility.
* HTTP 1.1 pipelining (50,000 http requests per second on a desktop Linux machine.)
* HTTP 1.1 pipelining (70,000 http requests per second on a desktop Linux machine.)
* Connects to Redis using a TCP or UNIX socket.
* Restricted commands by IP range (CIDR subnet + mask) or HTTP Basic Auth, returning 403 errors.
* Possible Redis authentication in the config file.

@ -14,7 +14,7 @@ acl_match_client(struct acl *a, struct http_client *client, in_addr_t *ip) {
/* check HTTP Basic Auth */
const char *auth;
auth = client->input_headers.authorization.s;
auth = client_get_header(client, "Authorization");
if(a->http_basic_auth) {
if(auth && strncasecmp(auth, "Basic ", 6) == 0) { /* sent auth */
if(strcmp(auth + 6, a->http_basic_auth) != 0) { /* bad password */
@ -39,6 +39,7 @@ acl_match_client(struct acl *a, struct http_client *client, in_addr_t *ip) {
int
acl_allow_command(struct cmd *cmd, struct conf *cfg, struct http_client *client) {
/* FIXME */
char *always_off[] = {"MULTI", "EXEC", "WATCH", "DISCARD"};
unsigned int i;

@ -1,257 +1,104 @@
#include "client.h"
#include "server.h"
#include "cmd.h"
#include "http_parser.h"
#include "http.h"
#include "slog.h"
#include "server.h"
#include "worker.h"
#include "websocket.h"
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <unistd.h>
#include "hiredis/async.h"
struct http_client *
http_client_new(int fd, struct server *s) {
struct http_client *c = calloc(1, sizeof(struct http_client));
c->fd = fd;
c->s = s;
/* initialize HTTP parser */
c->settings.on_path = http_on_path;
c->settings.on_body = http_on_body;
c->settings.on_message_complete = http_on_complete;
c->settings.on_header_field = http_on_header_name;
c->settings.on_header_value = http_on_header_value;
c->settings.on_query_string = http_on_query_string;
http_parser_init(&c->parser, HTTP_REQUEST);
c->parser.data = c;
c->state = CLIENT_WAITING;
return c;
}
/**
* Called by libevent when read(2) is possible on fd without blocking.
*/
void
http_client_read(int fd, short event, void *ctx) {
struct http_client *c = ctx;
char buffer[64*1024];
int ret, nparsed;
(void)fd;
(void)event;
if(c->state != CLIENT_WAITING) { /* not expecting anything, fail. */
http_client_free(c);
return;
}
ret = read(c->fd, buffer, sizeof(buffer));
if(ret <= 0) { /* broken connection, bye */
http_client_free(c);
return;
}
nparsed = http_parser_execute(&c->parser, &c->settings, buffer, ret);
if(c->state == CLIENT_BROKEN) {
http_client_free(c);
return;
}
if(c->parser.upgrade) {
/* TODO: upgrade parser (WebSockets & cie) */
} else if(nparsed != ret) { /* invalid data */
http_client_free(c);
} else if(c->state == CLIENT_WAITING) {
http_client_serve(c);
}
}
static void
http_client_cleanup(struct http_client *c) {
if(c->sub) {
return; /* we need to keep those. */
}
free(c->path.s);
memset(&c->path, 0, sizeof(str_t));
free(c->body.s);
memset(&c->body, 0, sizeof(str_t));
free(c->input_headers.connection.s);
memset(&c->input_headers.connection, 0, sizeof(str_t));
free(c->input_headers.if_none_match.s);
memset(&c->input_headers.if_none_match, 0, sizeof(str_t));
free(c->input_headers.authorization.s);
memset(&c->input_headers.authorization, 0, sizeof(str_t));
free(c->output_headers.content_type.s);
memset(&c->output_headers.content_type, 0, sizeof(str_t));
free(c->output_headers.etag.s);
memset(&c->output_headers.etag, 0, sizeof(str_t));
free(c->query_string.type.s);
memset(&c->query_string.type, 0, sizeof(str_t));
static int
http_client_on_url(struct http_parser *p, const char *at, size_t sz) {
free(c->query_string.jsonp.s);
memset(&c->query_string.jsonp, 0, sizeof(str_t));
struct http_client *c = p->data;
memset(&c->verb, 0, sizeof(c->verb));
c->path = realloc(c->path, c->path_sz + sz + 1);
memcpy(c->path + c->path_sz, at, sz);
c->path_sz += sz;
c->path[c->path_sz] = 0;
c->state = CLIENT_WAITING;
c->started_responding = 0;
return 0;
}
void
http_client_free(struct http_client *c) {
event_del(&c->ev);
if(c->fd != -1) {
close(c->fd);
}
if(c->sub) {
/* clean up Redis connection */
c->sub->s->ac->onDisconnect = NULL;
server_free(c->sub->s);
/* clean up command object */
if(c->sub->cmd) {
cmd_free(c->sub->cmd);
}
free(c->sub);
c->sub = NULL;
}
static int
http_client_on_body(struct http_parser *p, const char *at, size_t sz) {
http_client_cleanup(c);
free(c);
struct http_client *c = p->data;
return http_client_set_body(c, at, sz);
}
int
http_client_keep_alive(struct http_client *c) {
/* check disconnection */
int keep_alive = 0;
http_client_set_body(struct http_client *c, const char *at, size_t sz) {
if(c->parser.http_major == 1 && c->parser.http_minor == 1) {
keep_alive = 1; /* HTTP 1.1: keep-alive by default */
}
if(c->input_headers.connection.s) {
if(strncasecmp(c->input_headers.connection.s, "Keep-Alive", 10) == 0) {
keep_alive = 1;
} else if(strncasecmp(c->input_headers.connection.s, "Close", 5) == 0) {
keep_alive = 0;
}
}
return keep_alive;
}
void
http_client_reset(struct http_client *c) {
if(!http_client_keep_alive(c) && !c->sub) {
c->state = CLIENT_BROKEN;
return;
}
http_client_cleanup(c);
http_parser_init(&c->parser, HTTP_REQUEST);
}
c->body = realloc(c->body, c->body_sz + sz + 1);
memcpy(c->body + c->body_sz, at, sz);
c->body_sz += sz;
c->body[c->body_sz] = 0;
/**
* (Re-)add read event callback
*/
void
http_client_serve(struct http_client *c) {
event_set(&c->ev, c->fd, EV_READ, http_client_read, c);
event_base_set(c->s->base, &c->ev);
event_add(&c->ev, NULL);
return 0;
}
/**** Parser callbacks ****/
/**
* Called when the path has been found. This is before any `?query-string'.
*/
int
http_on_path(http_parser *p, const char *at, size_t length) {
static int
http_client_on_header_name(struct http_parser *p, const char *at, size_t sz) {
struct http_client *c = p->data;
size_t n = c->header_count;
c->path.s = calloc(length+1, 1);
memcpy(c->path.s, at, length);
c->path.sz = length;
/* save HTTP verb as well */
c->verb = (enum http_method)p->method;
return 0;
}
if(c->last_cb != LAST_CB_KEY) {
n = ++c->header_count;
c->headers = realloc(c->headers, n * sizeof(struct http_header));
memset(&c->headers[n-1], 0, sizeof(struct http_header));
}
/**
* Called when the whole body has been read.
*/
int
http_on_body(http_parser *p, const char *at, size_t length) {
struct http_client *c = p->data;
c->headers[n-1].key = realloc(c->headers[n-1].key,
c->headers[n-1].key_sz + sz + 1);
memcpy(c->headers[n-1].key + c->headers[n-1].key_sz, at, sz);
c->headers[n-1].key_sz += sz;
c->headers[n-1].key[c->headers[n-1].key_sz] = 0;
c->body.s = realloc(c->body.s, c->body.sz + length);
memcpy(c->body.s + c->body.sz, at, length);
c->body.sz += length;
c->last_cb = LAST_CB_KEY;
return 0;
}
/**
* Called when the query string has been completely read.
*/
int
http_on_query_string(http_parser *parser, const char *at, size_t length) {
static int
http_client_on_query_string(struct http_parser *parser, const char *at, size_t sz) {
struct http_client *c = parser->data;
const char *p = at;
while(p < at + length) {
while(p < at + sz) {
const char *key = p, *val;
int key_len, val_len;
char *eq = memchr(key, '=', length - (p-at));
if(!eq || eq > at + length) { /* last argument */
char *eq = memchr(key, '=', sz - (p-at));
if(!eq || eq > at + sz) { /* last argument */
break;
} else { /* found an '=' */
char *and;
char *amp;
val = eq + 1;
key_len = eq - key;
p = eq + 1;
and = memchr(p, '&', length - (p-at));
if(!and || and > at + length) {
val_len = at + length - p; /* last arg */
amp = memchr(p, '&', sz - (p-at));
if(!amp || amp > at + sz) {
val_len = at + sz - p; /* last arg */
} else {
val_len = and - val; /* cur arg */
p = and + 1;
val_len = amp - val; /* cur arg */
p = amp + 1;
}
if(key_len == 4 && strncmp(key, "type", 4) == 0) {
http_set_header(&c->query_string.type, val, val_len);
c->type = calloc(1 + val_len, 1);
memcpy(c->type, val, val_len);
} else if(key_len == 5 && strncmp(key, "jsonp", 5) == 0) {
http_set_header(&c->query_string.jsonp, val, val_len);
c->jsonp = calloc(1 + val_len, 1);
memcpy(c->jsonp, val, val_len);
}
if(!and) {
if(!amp) {
break;
}
}
@ -259,234 +106,187 @@ http_on_query_string(http_parser *parser, const char *at, size_t length) {
return 0;
}
/**
* Called when the whole request has been parsed.
*/
int
http_on_complete(http_parser *p) {
struct http_client *c = p->data;
int ret = -1;
static int
http_client_on_header_value(struct http_parser *p, const char *at, size_t sz) {
/* check that the command can be executed */
switch(c->verb) {
case HTTP_GET:
if(c->path.sz == 16 && memcmp(c->path.s, "/crossdomain.xml", 16) == 0) {
return http_crossdomain(c);
}
slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz);
ret = cmd_run(c->s, c, 1+c->path.s, c->path.sz-1, NULL, 0);
break;
case HTTP_POST:
slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz);
ret = cmd_run(c->s, c, 1+c->body.s, c->body.sz-1, NULL, 0);
break;
struct http_client *c = p->data;
size_t n = c->header_count;
case HTTP_PUT:
slog(c->s, WEBDIS_DEBUG, c->path.s, c->path.sz);
ret = cmd_run(c->s, c, 1+c->path.s, c->path.sz-1,
c->body.s, c->body.sz);
break;
c->headers[n-1].val = realloc(c->headers[n-1].val,
c->headers[n-1].val_sz + sz + 1);
memcpy(c->headers[n-1].val + c->headers[n-1].val_sz, at, sz);
c->headers[n-1].val_sz += sz;
c->headers[n-1].val[c->headers[n-1].val_sz] = 0;
case HTTP_OPTIONS:
return http_options(c);
c->last_cb = LAST_CB_VAL;
default:
slog(c->s, WEBDIS_DEBUG, "405", 3);
http_send_error(c, 405, "Method Not Allowed");
return 0;
}
if(ret < 0) {
c->state = CLIENT_WAITING;
if(c->cmd) {
cmd_free(c->cmd);
c->cmd = NULL;
if(strncmp("Expect", c->headers[n-1].key, c->headers[n-1].key_sz) == 0) {
if(sz == 12 && strncasecmp(at, "100-continue", sz) == 0) {
/* support HTTP file upload */
char http100[] = "HTTP/1.1 100 Continue\r\n\r\n";
int ret = write(c->fd, http100, sizeof(http100)-1);
(void)ret;
}
} else if(strncasecmp("Connection", c->headers[n-1].key, c->headers[n-1].key_sz) == 0) {
if(sz == 10 && strncasecmp(at, "Keep-Alive", sz) == 0) {
c->keep_alive = 1;
}
http_send_error(c, 403, "Forbidden");
}
return ret;
}
/**
* Called when a header name is read
*/
int
http_on_header_name(http_parser *p, const char *at, size_t length) {
struct http_client *c = p->data;
c->last_header_name.s = calloc(length+1, 1);
memcpy(c->last_header_name.s, at, length);
c->last_header_name.sz = length;
return 0;
}
/**
* Called when a header value is read
*/
int
http_on_header_value(http_parser *p, const char *at, size_t length) {
static int
http_client_on_message_complete(struct http_parser *p) {
struct http_client *c = p->data;
if(strncmp("Connection", c->last_header_name.s, c->last_header_name.sz) == 0) {
http_set_header(&c->input_headers.connection, at, length);
} else if(strncmp("If-None-Match", c->last_header_name.s, c->last_header_name.sz) == 0) {
http_set_header(&c->input_headers.if_none_match, at, length);
} else if(strncmp("Authorization", c->last_header_name.s, c->last_header_name.sz) == 0) {
http_set_header(&c->input_headers.authorization, at, length);
} else if(strncmp("Expect", c->last_header_name.s, c->last_header_name.sz) == 0) {
if(length == 12 && memcmp(at, "100-continue", length) == 0) {
/* support HTTP file upload */
char http100[] = "HTTP/1.1 100 Continue\r\n\r\n";
int ret = write(c->fd, http100, sizeof(http100)-1);
if(ret != sizeof(http100)-1) {
c->state = CLIENT_BROKEN;
}
}
/* keep-alive detection */
if(c->parser.http_major == 1 && c->parser.http_minor == 1) { /* 1.1 */
c->keep_alive = 1;
}
free(c->last_header_name.s);
c->last_header_name.s = NULL;
if(p->upgrade) { /* WebSocket, don't execute just yet */
c->is_websocket = 1;
return 0;
}
worker_process_client(c);
http_client_reset(c);
return 0;
}
struct http_client *
http_client_new(struct worker *w, int fd, in_addr_t addr) {
struct http_client *c = calloc(1, sizeof(struct http_client));
c->fd = fd;
c->w = w;
c->addr = addr;
c->s = w->s;
/**** HTTP Responses ****/
/* parser */
http_parser_init(&c->parser, HTTP_REQUEST);
c->parser.data = c;
static void
http_response_set_connection_header(struct http_client *c, struct http_response *r) {
/* callbacks */
c->settings.on_url = http_client_on_url;
c->settings.on_query_string = http_client_on_query_string;
c->settings.on_body = http_client_on_body;
c->settings.on_message_complete = http_client_on_message_complete;
c->settings.on_header_field = http_client_on_header_name;
c->settings.on_header_value = http_client_on_header_value;
if(http_client_keep_alive(c)) {
http_response_set_header(r, "Connection", "Keep-Alive");
} else {
http_response_set_header(r, "Connection", "Close");
}
}
void
http_send_reply(struct http_client *c, short code, const char *msg,
const char *body, size_t body_len) {
c->last_cb = LAST_CB_NONE;
struct http_response resp;
const char *ct = c->output_headers.content_type.s;
if(!ct) {
ct = "text/html";
}
return c;
}
/* respond */
http_response_init(&resp, code, msg);
http_response_set_connection_header(c, &resp);
if(body_len) {
http_response_set_header(&resp, "Content-Type", ct);
}
void
http_client_reset(struct http_client *c) {
if(c->sub) {
http_response_set_header(&resp, "Transfer-Encoding", "chunked");
}
int i;
if(code == 200 && c->output_headers.etag.s) {
http_response_set_header(&resp, "ETag", c->output_headers.etag.s);
/* headers */
for(i = 0; i < c->header_count; ++i) {
free(c->headers[i].key);
free(c->headers[i].val);
}
http_response_set_body(&resp, body, body_len);
/* flush response in the socket */
if(http_response_write(&resp, c->fd) || !http_client_keep_alive(c)) { /* failure */
if(c->state == CLIENT_EXECUTING) {
http_client_free(c);
return;
} else if(c->state == CLIENT_WAITING) {
c->state = CLIENT_BROKEN;
}
} else {
http_client_reset(c);
free(c->headers);
c->headers = NULL;
c->header_count = 0;
/* other data */
free(c->body); c->body = NULL;
c->body_sz = 0;
free(c->path); c->path = NULL;
c->path_sz = 0;
free(c->type); c->type = NULL;
free(c->jsonp); c->jsonp = NULL;
/* no last known header callback */
c->last_cb = LAST_CB_NONE;
/* mark as broken if client doesn't support Keep-Alive. */
if(c->keep_alive == 0) {
c->broken = 1;
}
http_client_serve(c);
}
void
http_send_error(struct http_client *c, short code, const char *msg) {
http_client_free(struct http_client *c) {
http_send_reply(c, code, msg, NULL, 0);
http_client_cleanup(c);
}
/* printf("client free: %p\n", (void*)c); */
/* Transfer-encoding: chunked */
void
http_send_reply_start(struct http_client *c, short code, const char *msg) {
http_client_reset(c);
free(c->buffer);
/* close(c->fd); */
http_send_reply(c, code, msg, NULL, 0);
free(c);
}
void
http_send_reply_chunk(struct http_client *c, const char *p, size_t sz) {
int
http_client_read(struct http_client *c) {
char buf[64];
int ret, chunk_size;
char buffer[4096];
int ret;
chunk_size = sprintf(buf, "%x\r\n", (int)sz);
ret = write(c->fd, buf, chunk_size);
ret = write(c->fd, p, sz);
ret = write(c->fd, "\r\n", 2);
if(ret != 2) {
c->state = CLIENT_BROKEN;
ret = read(c->fd, buffer, sizeof(buffer));
if(ret <= 0) {
/* printf("Broken read on c=%p, fd=%d.\n", (void*)c, c->fd); */
close(c->fd);
http_client_free(c);
return -1;
}
}
/* send nil chunk to mark the end of a stream. */
void
http_send_reply_end(struct http_client *c) {
c->buffer = realloc(c->buffer, c->sz + ret);
memcpy(c->buffer + c->sz, buffer, ret);
c->sz += ret;
#if 0
printf("read %d bytes\n", ret);
write(1, "\n[", 2);
write(1, buffer, ret);
write(1, "]\n", 2);
#endif
http_send_reply_chunk(c, "", 0);
return ret;
}
/* Adobe flash cross-domain request */
int
http_crossdomain(struct http_client *c) {
struct http_response resp;
char out[] = "<?xml version=\"1.0\"?>\n"
"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd\">\n"
"<cross-domain-policy>\n"
"<allow-access-from domain=\"*\" />\n"
"</cross-domain-policy>\n";
http_response_init(&resp, 200, "OK");
http_response_set_connection_header(c, &resp);
http_response_set_header(&resp, "Content-Type", "application/xml");
http_response_set_body(&resp, out, sizeof(out)-1);
http_client_execute(struct http_client *c) {
http_response_write(&resp, c->fd);
http_client_reset(c);
int nparsed = http_parser_execute(&c->parser, &c->settings, c->buffer, c->sz);
/* printf("nparsed=%d\n", nparsed); */
return 0;
if(!c->is_websocket) {
/* removed consumed data */
free(c->buffer);
c->buffer = NULL;
c->sz = 0;
}
return nparsed;
}
/* reply to OPTIONS HTTP verb */
int
http_options(struct http_client *c) {
struct http_response resp;
const char *
client_get_header(struct http_client *c, const char *key) {
http_response_init(&resp, 200, "OK");
http_response_set_connection_header(c, &resp);
int i;
size_t sz = strlen(key);
http_response_set_header(&resp, "Content-Type", "text/html");
http_response_set_header(&resp, "Allow", "GET,POST,PUT,OPTIONS");
http_response_set_header(&resp, "Content-Length", "0");
for(i = 0; i < c->header_count; ++i) {
/* Cross-Origin Resource Sharing, CORS. */
http_response_set_header(&resp, "Access-Control-Allow-Origin", "*");
if(sz == c->headers[i].key_sz &&
strncasecmp(key, c->headers[i].key, sz) == 0) {
return c->headers[i].val;
}
http_response_write(&resp, c->fd);
http_client_reset(c);
}
return 0;
return NULL;
}

@ -3,127 +3,73 @@
#include <event.h>
#include <arpa/inet.h>
#include "http-parser/http_parser.h"
#include "http.h"
#include "http_parser.h"
struct http_header;
struct server;
struct cmd;
typedef enum {
CLIENT_WAITING,
CLIENT_EXECUTING,
CLIENT_BROKEN} client_state;
LAST_CB_NONE = 0,
LAST_CB_KEY = 1,
LAST_CB_VAL = 2} last_cb_t;
struct http_client {
/* socket and server reference */
int fd;
in_addr_t addr;
struct event ev;
struct worker *w;
struct server *s;
client_state state;
struct cmd *cmd;
/* http parser */
http_parser_settings settings;
http_parser parser;
/* decoded http */
enum http_method verb;
str_t path;
str_t body;
/* input headers from client */
struct {
str_t connection;
str_t if_none_match;
str_t authorization;
} input_headers;
/* response headers */
struct input_headers {
str_t content_type;
str_t etag;
} output_headers;
/* query string */
struct {
str_t type;
str_t jsonp;
} query_string;
/* pub/sub */
struct subscription *sub;
int started_responding;
struct http_response resp;
/* private, used in HTTP parser */
str_t last_header_name;
};
struct http_client *
http_client_new(int fd, struct server *s);
/* HTTP parsing */
struct http_parser parser;
struct http_parser_settings settings;
char *buffer;
size_t sz;
last_cb_t last_cb;
void
http_client_serve(struct http_client *c);
/* various flags. TODO: bit map */
int keep_alive;
int broken;
int is_websocket;
void
http_client_free(struct http_client *c);
/* HTTP data */
char *path;
size_t path_sz;
void
http_client_reset(struct http_client *c);
/* headers */
struct http_header *headers;
int header_count;
int
http_on_path(http_parser*, const char *at, size_t length);
char *body;
size_t body_sz;
int
http_on_path(http_parser*, const char *at, size_t length);
char *type; /* forced output content-type */
char *jsonp; /* jsonp wrapper */
};
int
http_on_body(http_parser*, const char *at, size_t length);
struct http_client *
http_client_new(struct worker *w, int fd, in_addr_t addr);
int
http_on_header_name(http_parser*, const char *at, size_t length);
void
http_client_reset(struct http_client *c);
int
http_on_header_value(http_parser*, const char *at, size_t length);
void
http_client_free(struct http_client *c);
int
http_on_complete(http_parser*);
http_client_read(struct http_client *c);
int
http_on_query_string(http_parser*, const char *at, size_t length);
http_client_execute(struct http_client *c);
int
http_client_keep_alive(struct http_client *c);
/* responses */
void
http_send_reply(struct http_client *c, short code, const char *msg,
const char *body, size_t body_len);
/* Transfer-encoding: chunked */
void
http_send_reply_start(struct http_client *c, short code, const char *msg);
http_client_set_body(struct http_client *c, const char *at, size_t sz);
void
http_send_reply_chunk(struct http_client *c, const char *p, size_t sz);
void
http_send_reply_end(struct http_client *c);
void
http_send_error(struct http_client *c, short code, const char *msg);
/* convenience functions */
int
http_crossdomain(struct http_client *c);
const char *
client_get_header(struct http_client *c, const char *key);
/* reply to OPTIONS HTTP verb */
int
http_options(struct http_client *c);
#endif

92
cmd.c

@ -1,8 +1,11 @@
#include "cmd.h"
#include "server.h"
#include "conf.h"
#include "acl.h"
#include "client.h"
#include "pool.h"
#include "worker.h"
#include "http.h"
#include "server.h"
#include "formats/json.h"
#include "formats/bson.h"
@ -12,6 +15,7 @@
#include <stdlib.h>
#include <string.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <ctype.h>
struct cmd *
@ -31,13 +35,20 @@ cmd_new(int count) {
void
cmd_free(struct cmd *c) {
int i;
if(!c) return;
for(i = 0; i < c->count; ++i) {
free((char*)c->argv[i]);
}
free(c->argv);
free(c->argv_len);
free(c->jsonp);
free(c->if_none_match);
if(c->mime_free) free(c->mime);
free(c);
}
@ -69,9 +80,39 @@ decode_uri(const char *uri, size_t length, size_t *out_len, int always_decode_pl
return ret;
}
/* setup headers */
static void
cmd_setup(struct cmd *cmd, struct http_client *client) {
int i;
cmd->keep_alive = client->keep_alive;
for(i = 0; i < client->header_count; ++i) {
if(strcasecmp(client->headers[i].key, "If-None-Match") == 0) {
cmd->if_none_match = calloc(1+client->headers[i].val_sz, 1);
memcpy(cmd->if_none_match, client->headers[i].val,
client->headers[i].val_sz);
} else if(strcasecmp(client->headers[i].key, "Connection") == 0 &&
strcasecmp(client->headers[i].val, "Keep-Alive") == 0) {
cmd->keep_alive = 1;
}
}
if(client->type) { /* transfer pointer ownership */
cmd->mime = client->type;
cmd->mime_free = 1;
client->type = NULL;
}
if(client->jsonp) { /* transfer pointer ownership */
cmd->jsonp = client->jsonp;
client->jsonp = NULL;
}
}
int
cmd_run(struct server *s, struct http_client *client,
cmd_run(struct worker *w, struct http_client *client,
const char *uri, size_t uri_len,
const char *body, size_t body_len) {
@ -79,10 +120,10 @@ cmd_run(struct server *s, struct http_client *client,
char *slash;
const char *p;
int cmd_len;
int param_count = 0, cur_param = 1, i;
int param_count = 0, cur_param = 1;
struct cmd *cmd;
redisAsyncContext *ac = NULL;
formatting_fun f_format;
/* count arguments */
@ -100,11 +141,15 @@ cmd_run(struct server *s, struct http_client *client,
return -1;
}
client->cmd = cmd = cmd_new(param_count);
cmd = cmd_new(param_count);
cmd->fd = client->fd;
/* get output formatting function */
uri_len = cmd_select_format(client, cmd, uri, uri_len, &f_format);
/* add HTTP info */
cmd_setup(cmd, client);
/* check if we only have one command or more. */
slash = memchr(uri, '/', uri_len);
if(slash) {
@ -114,26 +159,25 @@ cmd_run(struct server *s, struct http_client *client,
}
/* there is always a first parameter, it's the command name */
cmd->argv[0] = uri;
cmd->argv[0] = malloc(cmd_len);
memcpy(cmd->argv[0], uri, cmd_len);
cmd->argv_len[0] = cmd_len;
/* check that the client is able to run this command */
if(!acl_allow_command(cmd, s->cfg, client)) {
if(!acl_allow_command(cmd, w->s->cfg, client)) {
return -1;
}
/* check if we have to split the connection */
if(cmd_is_subscribe(cmd)) {
client->sub = malloc(sizeof(struct subscription));
client->sub->s = s = server_copy(s);
client->sub->cmd = cmd;
ac = (redisAsyncContext*)pool_connect(w->pool, 0);
}
/* no args (e.g. INFO command) */
if(!slash) {
client->state = CLIENT_EXECUTING;
redisAsyncCommandArgv(s->ac, f_format, client, 1, cmd->argv, cmd->argv_len);
ac = (redisAsyncContext*)pool_get_context(w->pool);
redisAsyncCommandArgv(ac, f_format, cmd, 1,
(const char **)cmd->argv, cmd->argv_len);
return 0;
}
p = slash + 1;
@ -156,21 +200,25 @@ cmd_run(struct server *s, struct http_client *client,
}
if(body && body_len) { /* PUT request */
cmd->argv[cur_param] = body;
cmd->argv[cur_param] = malloc(body_len);
memcpy(cmd->argv[cur_param], body, body_len);
cmd->argv_len[cur_param] = body_len;
}
/* push command to Redis. */
client->state = CLIENT_EXECUTING;
redisAsyncCommandArgv(s->ac, f_format, client, cmd->count, cmd->argv, cmd->argv_len);
for(i = 1; i < cur_param; ++i) {
free((char*)cmd->argv[i]);
/* send it off! */
if(!ac) {
ac = (redisAsyncContext*)pool_get_context(w->pool);
}
cmd_send(ac, f_format, cmd);
return 0;
}
void
cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd) {
redisAsyncCommandArgv(ac, f_format, cmd, cmd->count,
(const char **)cmd->argv, cmd->argv_len);
}
/**
* Select Content-Type and processing function.
@ -235,9 +283,9 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
}
/* the user can force it with ?type=some/thing */
if(client->query_string.type.s) {
if(client->type) {
*f_format = custom_type_reply;
cmd->mime = strdup(client->query_string.type.s);
cmd->mime = strdup(client->type);
cmd->mime_free = 1;
}

19
cmd.h

@ -10,18 +10,28 @@
struct evhttp_request;
struct http_client;
struct server;
struct worker;
struct cmd;
typedef void (*formatting_fun)(redisAsyncContext *, void *, void *);
struct cmd {
int fd;
int count;
const char **argv;
char **argv;
size_t *argv_len;
/* HTTP data */
char *mime;
char *mime; /* forced output content-type */
int mime_free;
char *if_none_match; /* used with ETags */
char *jsonp; /* jsonp wrapper */
int keep_alive;
/* various flags */
int started_responding;
int is_websocket;
};
struct subscription {
@ -36,7 +46,7 @@ void
cmd_free(struct cmd *c);
int
cmd_run(struct server *s, struct http_client *client,
cmd_run(struct worker *w, struct http_client *client,
const char *uri, size_t uri_len,
const char *body, size_t body_len);
@ -47,4 +57,7 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
int
cmd_is_subscribe(struct cmd *cmd);
void
cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd);
#endif

@ -30,6 +30,7 @@ conf_read(const char *filename) {
conf->redis_port = 6379;
conf->http_host = strdup("0.0.0.0");
conf->http_port = 7379;
conf->http_threads = 4;
conf->user = getuid();
conf->group = getgid();
conf->logfile = "webdis.log";
@ -58,6 +59,8 @@ conf_read(const char *filename) {
conf->http_host = strdup(json_string_value(jtmp));
} else if(strcmp(json_object_iter_key(kv), "http_port") == 0 && json_typeof(jtmp) == JSON_INTEGER) {
conf->http_port = (short)json_integer_value(jtmp);
} else if(strcmp(json_object_iter_key(kv), "threads") == 0 && json_typeof(jtmp) == JSON_INTEGER) {
conf->http_threads = (short)json_integer_value(jtmp);
} else if(strcmp(json_object_iter_key(kv), "acl") == 0 && json_typeof(jtmp) == JSON_ARRAY) {
conf->perms = conf_parse_acls(jtmp);
} else if(strcmp(json_object_iter_key(kv), "user") == 0 && json_typeof(jtmp) == JSON_STRING) {

@ -14,6 +14,7 @@ struct conf {
/* HTTP server interface */
char *http_host;
short http_port;
short http_threads;
/* daemonize process, off by default */
int daemonize;

@ -1,7 +1,6 @@
#include "bson.h"
#include "common.h"
#include "cmd.h"
#include "client.h"
#include "http.h"
#include <string.h>
@ -18,30 +17,33 @@ void
bson_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
struct http_client *client = privdata;
struct cmd *cmd = privdata;
bson_t *b;
char *bstr = NULL;
size_t bsz;
(void)c;
if(client->cmd == NULL) {
if(cmd == NULL) {
/* broken connection */
return;
}
if (reply == NULL) {
/* FIXME */
/*
http_send_reply(client, 404, "Not Found", NULL, 0);
*/
return;
}
/* encode redis reply as BSON */
b = bson_wrap_redis_reply(client->cmd, reply);
b = bson_wrap_redis_reply(cmd, reply);
/* get BSON as string */
bstr = bson_string_output(b, &bsz);
/* send reply */
format_send_reply(client, bstr, bsz, "application/bson");
format_send_reply(cmd, bstr, bsz, "application/bson");
/* cleanup */
free(bstr);

@ -2,11 +2,13 @@
#include "cmd.h"
#include "http.h"
#include "client.h"
#include "websocket.h"
#include "md5/md5.h"
#include <string.h>
#include <unistd.h>
/* TODO: replace this with a faster hash function */
/* TODO: replace this with a faster hash function? */
char *etag_new(const char *p, size_t sz) {
md5_byte_t buf[16];
@ -30,42 +32,59 @@ char *etag_new(const char *p, size_t sz) {
}
void
format_send_reply(struct http_client *client, const char *p, size_t sz, const char *content_type) {
format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content_type) {
int free_cmd = 1;
struct http_response resp;
struct cmd *cmd = client->cmd;
if(cmd->is_websocket) {
ws_reply(cmd, p, sz);
cmd_free(cmd);
return;
}
if(cmd_is_subscribe(cmd)) {
free_cmd = 0;
/* start streaming */
if(client->started_responding == 0) {
if(cmd->started_responding == 0) {
const char *ct = cmd->mime?cmd->mime:content_type;
client->started_responding = 1;
http_set_header(&client->output_headers.content_type, ct, strlen(ct));
http_send_reply_start(client, 200, "OK");
cmd->started_responding = 1;
http_response_init(&resp, 200, "OK");
http_response_set_header(&resp, "Content-Type", ct);
http_response_set_header(&resp, "Connection", "Keep-Alive");
http_response_set_header(&resp, "Transfer-Encoding", "Chunked");
http_response_write(&resp, cmd->fd);
}
http_send_reply_chunk(client, p, sz);
http_response_write_chunk(cmd->fd, p, sz);
} else {
/* compute ETag */
char *etag = etag_new(p, sz);
const char *if_none_match = client->input_headers.if_none_match.s;
/* check If-None-Match */
if(if_none_match && strncmp(if_none_match, etag, client->input_headers.if_none_match.sz) == 0) {
if(cmd->if_none_match && strcmp(cmd->if_none_match, etag) == 0) {
/* SAME! send 304. */
http_send_reply(client, 304, "Not Modified", NULL, 0);
http_response_init(&resp, 304, "Not Modified");
} else {
const char *ct = cmd->mime?cmd->mime:content_type;
http_set_header(&client->output_headers.content_type, ct, strlen(ct));
http_set_header(&client->output_headers.etag, etag, strlen(etag));
http_send_reply(client, 200, "OK", p, sz);
http_response_init(&resp, 200, "OK");
http_response_set_header(&resp, "Content-Type", ct);
http_response_set_header(&resp, "ETag", etag);
http_response_set_body(&resp, p, sz);
}
if(cmd->keep_alive) {
http_response_set_header(&resp, "Connection", "Keep-Alive");
} else {
http_response_set_header(&resp, "Connection", "Close");
}
http_response_write(&resp, cmd->fd);
if(!cmd->keep_alive) {
close(cmd->fd);
}
free(etag);
}
/* cleanup */
if(free_cmd) {
cmd_free(cmd);

@ -3,10 +3,10 @@
#include <stdlib.h>
struct http_client;
struct cmd;
void
format_send_reply(struct http_client *client,
format_send_reply(struct cmd *cmd,
const char *p, size_t sz,
const char *content_type);

@ -2,7 +2,6 @@
#include "cmd.h"
#include "common.h"
#include "http.h"
#include "client.h"
#include <string.h>
#include <hiredis/hiredis.h>
@ -12,35 +11,43 @@ void
custom_type_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
struct http_client *client = privdata;
struct cmd *cmd = privdata;
(void)c;
char int_buffer[50];
int int_len;
struct http_response resp;
if(reply == NULL) {
return;
}
if(client->cmd->mime) { /* use the given content-type, but only for strings */
if(cmd->mime) { /* use the given content-type, but only for strings */
switch(reply->type) {
case REDIS_REPLY_NIL: /* or nil values */
format_send_reply(client, "", 0, client->cmd->mime);
format_send_reply(cmd, "", 0, cmd->mime);
return;
case REDIS_REPLY_STRING:
format_send_reply(client, reply->str, reply->len, client->cmd->mime);
format_send_reply(cmd, reply->str, reply->len, cmd->mime);
return;
case REDIS_REPLY_INTEGER:
int_len = sprintf(int_buffer, "%lld", reply->integer);
format_send_reply(client, int_buffer, int_len, client->cmd->mime);
format_send_reply(cmd, int_buffer, int_len, cmd->mime);
return;
}
}
/* couldn't make sense of what the client wanted. */
http_send_reply(client, 400, "Bad request", NULL, 0);
cmd_free(client->cmd);
http_response_init(&resp, 400, "Bad Request");
http_response_set_header(&resp, "Content-Length", "0");
if(cmd->keep_alive) {
http_response_set_header(&resp, "Connection", "Keep-Alive");
} else {
http_response_set_header(&resp, "Connection", "Close");
}
http_response_write(&resp, cmd->fd);
cmd_free(cmd);
}

@ -15,33 +15,34 @@ void
json_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
struct http_client *client = privdata;
struct cmd *cmd = privdata;
json_t *j;
char *jstr;
(void)c;
if(client->cmd == NULL) {
if(cmd == NULL) {
/* broken connection */
return;
}
if (reply == NULL) {
/* FIXME */
#if 0
if(client->started_responding) { /* broken, close */
http_send_reply_end(client);
}
#endif
return;
}
/* encode redis reply as JSON */
j = json_wrap_redis_reply(client->cmd, r);
j = json_wrap_redis_reply(cmd, r);
/* get JSON as string, possibly with JSONP wrapper */
jstr = json_string_output(j,
client->query_string.jsonp.s,
client->query_string.jsonp.sz);
jstr = json_string_output(j, cmd->jsonp);
/* send reply */
format_send_reply(client, jstr, strlen(jstr), "application/json");
format_send_reply(cmd, jstr, strlen(jstr), "application/json");
/* cleanup */
json_decref(j);
@ -192,12 +193,13 @@ json_wrap_redis_reply(const struct cmd *cmd, const redisReply *r) {
char *
json_string_output(json_t *j, const char *jsonp, size_t jsonp_len) {
json_string_output(json_t *j, const char *jsonp) {
char *json_reply = json_dumps(j, JSON_COMPACT);
/* check for JSONP */
if(jsonp) {
size_t jsonp_len = strlen(jsonp);
size_t json_len = strlen(json_reply);
size_t ret_len = jsonp_len + 1 + json_len + 3;
char *ret = calloc(1 + ret_len, 1);
@ -214,3 +216,83 @@ json_string_output(json_t *j, const char *jsonp, size_t jsonp_len) {
return json_reply;
}
/* extract JSON from WebSocket frame and fill struct cmd. */
struct cmd *
json_ws_extract(struct http_client *c, const char *p, size_t sz) {
struct cmd *cmd = NULL;
json_t *j;
char *jsonz; /* null-terminated */
unsigned int i, cur;
int argc = 0;
json_error_t jerror;
(void)c;
jsonz = calloc(sz + 1, 1);
memcpy(jsonz, p, sz);
j = json_loads(jsonz, sz, &jerror);
free(jsonz);
if(!j) {
return NULL;
}
if(json_typeof(j) != JSON_ARRAY) {
json_decref(j);
return NULL; /* invalid JSON */
}
/* count elements */
for(i = 0; i < json_array_size(j); ++i) {
json_t *jelem = json_array_get(j, i);
switch(json_typeof(jelem)) {
case JSON_STRING:
case JSON_INTEGER:
argc++;
break;
default:
break;
}
}
if(!argc) { /* not a single item could be decoded */
json_decref(j);
return NULL;
}
/* create command and add args */
cmd = cmd_new(argc);
for(i = 0, cur = 0; i < json_array_size(j); ++i) {
json_t *jelem = json_array_get(j, i);
char *tmp;
switch(json_typeof(jelem)) {
case JSON_STRING:
tmp = strdup(json_string_value(jelem));
cmd->argv[cur] = tmp;
cmd->argv_len[cur] = strlen(tmp);
cur++;
break;
case JSON_INTEGER:
tmp = malloc(40);
sprintf(tmp, "%d", (int)json_integer_value(jelem));
cmd->argv[cur] = tmp;
cmd->argv_len[cur] = strlen(tmp);
cur++;
break;
default:
break;
}
}
json_decref(j);
return cmd;
}

@ -6,11 +6,15 @@
#include <hiredis/async.h>
struct cmd;
struct http_client;
void
json_reply(redisAsyncContext *c, void *r, void *privdata);
char *
json_string_output(json_t *j, const char *jsonp, size_t jsonp_len);
json_string_output(json_t *j, const char *jsonp);
struct cmd *
json_ws_extract(struct http_client *c, const char *p, size_t sz);
#endif

@ -13,7 +13,7 @@ void
raw_reply(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
struct http_client *client = privdata;
struct cmd *cmd = privdata;
char *raw_out;
size_t sz;
(void)c;
@ -25,7 +25,7 @@ raw_reply(redisAsyncContext *c, void *r, void *privdata) {
raw_out = raw_wrap(r, &sz);
/* send reply */
format_send_reply(client, raw_out, sz, "binary/octet-stream");
format_send_reply(cmd, raw_out, sz, "binary/octet-stream");
/* cleanup */
free(raw_out);

164
http.c

@ -1,19 +1,12 @@
#include "http.h"
#include "server.h"
#include "slog.h"
#include "cmd.h"
#include "worker.h"
#include "client.h"
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
void
http_set_header(str_t *h, const char *p, size_t sz) {
h->s = calloc(1, 1+sz);
memcpy(h->s, p, sz);
h->sz = sz;
}
#include <stdio.h>
/* HTTP Response */
@ -34,35 +27,39 @@ void
http_response_set_header(struct http_response *r, const char *k, const char *v) {
int i, pos = r->header_count;
size_t sz;
char *s;
sz = strlen(k) + 2 + strlen(v) + 2;
s = calloc(sz + 1, 1);
sprintf(s, "%s: %s\r\n", k, v);
size_t key_sz = strlen(k);
size_t val_sz = strlen(v);
for(i = 0; i < r->header_count; ++i) {
size_t klen = strlen(k);
if(strncmp(r->headers[i].s, k, klen) == 0 && r->headers[i].s[klen] == ':') {
if(strncmp(r->headers[i].key, k, key_sz) == 0) {
pos = i;
/* free old value before replacing it. */
free(r->headers[i].s);
free(r->headers[i].key);
free(r->headers[i].val);
break;
}
}
/* extend array */
if(pos == r->header_count) {
r->headers = realloc(r->headers, sizeof(str_t)*(r->header_count + 1));
r->headers = realloc(r->headers,
sizeof(struct http_header)*(r->header_count + 1));
r->header_count++;
}
r->headers[pos].s = s;
r->headers[pos].sz = sz;
if(!strcmp(k, "Transfer-Encoding") && !strcmp(v, "chunked")) {
/* copy key */
r->headers[pos].key = calloc(key_sz + 1, 1);
memcpy(r->headers[pos].key, k, key_sz);
r->headers[pos].key_sz = key_sz;
/* copy val */
r->headers[pos].val = calloc(val_sz + 1, 1);
memcpy(r->headers[pos].val, v, val_sz);
r->headers[pos].val_sz = val_sz;
if(!strcmp(k, "Transfer-Encoding") && !strcmp(v, "Chunked")) {
r->chunked = 1;
}
}
void
@ -77,13 +74,13 @@ http_response_write(struct http_response *r, int fd) {
char *s = NULL, *p;
size_t sz = 0;
int i, ret;
int i, ret, keep_alive = 0;
sz = sizeof("HTTP/1.x xxx ")-1 + strlen(r->msg) + 2;
s = calloc(sz + 1, 1);
ret = sprintf(s, "HTTP/1.1 %d %s\r\n", r->code, r->msg);
p = s; // + ret - 3;
p = s;
if(r->code == 200 && r->body) {
char content_length[10];
@ -94,12 +91,33 @@ http_response_write(struct http_response *r, int fd) {
}
for(i = 0; i < r->header_count; ++i) {
s = realloc(s, sz + r->headers[i].sz);
/* "Key: Value\r\n" */
size_t header_sz = r->headers[i].key_sz + 2 + r->headers[i].val_sz + 2;
s = realloc(s, sz + header_sz);
p = s + sz;
memcpy(p, r->headers[i].s, r->headers[i].sz);
p += r->headers[i].sz;
sz += r->headers[i].sz;
/* add key */
memcpy(p, r->headers[i].key, r->headers[i].key_sz);
p += r->headers[i].key_sz;
/* add ": " */
memcpy(p, ": ", 2);
p += 2;
/* add value */
memcpy(p, r->headers[i].val, r->headers[i].val_sz);
p += r->headers[i].val_sz;
/* add "\r\n" */
memcpy(p, "\r\n", 2);
p += 2;
sz += header_sz;
if(strncasecmp("Connection", r->headers[i].key, r->headers[i].key_sz) == 0 &&
strncasecmp("Keep-Alive", r->headers[i].val, r->headers[i].val_sz) == 0) {
keep_alive = 1;
}
}
/* end of headers */
@ -114,14 +132,98 @@ http_response_write(struct http_response *r, int fd) {
}
ret = write(fd, s, sz);
if(!keep_alive) {
/* printf("response write, close fd=%d\n", fd); */
close(fd);
}
/*
write(1, "response: [", 11);
write(1, s, sz);
write(1, "]\n", 2);
*/
free(s);
/* cleanup response object */
for(i = 0; i < r->header_count; ++i) {
free(r->headers[i].s);
free(r->headers[i].key);
free(r->headers[i].val);
}
free(r->headers);
return ret == (int)sz ? 0 : 1;
}
/* Adobe flash cross-domain request */
void
http_crossdomain(struct http_client *c) {
struct http_response resp;
char out[] = "<?xml version=\"1.0\"?>\n"
"<!DOCTYPE cross-domain-policy SYSTEM \"http://www.macromedia.com/xml/dtds/cross-domain-policy.dtd\">\n"
"<cross-domain-policy>\n"
"<allow-access-from domain=\"*\" />\n"
"</cross-domain-policy>\n";
http_response_init(&resp, 200, "OK");
http_response_set_connection_header(c, &resp);
http_response_set_header(&resp, "Content-Type", "application/xml");
http_response_set_body(&resp, out, sizeof(out)-1);
http_response_write(&resp, c->fd);
http_client_reset(c);
}
/* Simple error response */
void
http_send_error(struct http_client *c, short code, const char *msg) {
struct http_response resp;
http_response_init(&resp, code, msg);
http_response_set_connection_header(c, &resp);
http_response_set_body(&resp, NULL, 0);
http_response_write(&resp, c->fd);
http_client_reset(c);
}
void
http_response_set_connection_header(struct http_client *c, struct http_response *r) {
if(c->keep_alive) {
http_response_set_header(r, "Connection", "Keep-Alive");
} else {
http_response_set_header(r, "Connection", "Close");
}
}
/* Response to HTTP OPTIONS */
void
http_send_options(struct http_client *c) {
struct http_response resp;
http_response_init(&resp, 200, "OK");
http_response_set_connection_header(c, &resp);
http_response_set_header(&resp, "Content-Type", "text/html");
http_response_set_header(&resp, "Allow", "GET,POST,PUT,OPTIONS");
http_response_set_header(&resp, "Content-Length", "0");
/* Cross-Origin Resource Sharing, CORS. */
http_response_set_header(&resp, "Access-Control-Allow-Origin", "*");
http_response_write(&resp, c->fd);
http_client_reset(c);
}
void
http_response_write_chunk(int fd, const char *p, size_t sz) {
char buf[64];
int ret, chunk_size;
chunk_size = sprintf(buf, "%x\r\n", (int)sz);
ret = write(fd, buf, chunk_size);
ret = write(fd, p, sz);
ret = write(fd, "\r\n", 2);
(void)ret;
}

@ -3,16 +3,22 @@
#include <sys/types.h>
typedef struct {
char *s;
size_t sz;
} str_t;
struct http_client;
struct http_header {
char *key;
size_t key_sz;
char *val;
size_t val_sz;
};
struct http_response {
short code;
const char *msg;
str_t *headers;
struct http_header *headers;
int header_count;
const char *body;
@ -21,9 +27,6 @@ struct http_response {
int chunked;
};
void
http_set_header(str_t *h, const char *p, size_t sz);
/* HTTP response */
void
@ -38,5 +41,19 @@ http_response_set_body(struct http_response *r, const char *body, size_t body_le
int
http_response_write(struct http_response *r, int fd);
void
http_crossdomain(struct http_client *c);
void
http_send_error(struct http_client *c, short code, const char *msg);
void
http_send_options(struct http_client *c);
void
http_response_set_connection_header(struct http_client *c, struct http_response *r);
void
http_response_write_chunk(int fd, const char *p, size_t sz);
#endif

126
pool.c

@ -0,0 +1,126 @@
#include "pool.h"
#include "worker.h"
#include "conf.h"
#include "server.h"
#include <stdlib.h>
#include <event.h>
#include <hiredis/adapters/libevent.h>
struct pool *
pool_new(struct worker *w, int count) {
struct pool *p = calloc(1, sizeof(struct pool));
p->count = count;
p->ac = calloc(count, sizeof(redisAsyncContext*));
p->w = w;
p->cfg = w->s->cfg;
return p;
}
static void
pool_on_connect(const redisAsyncContext *c) {
struct pool *p = c->data;
int i = 0;
printf("Connected to redis\n");
if(!p) {
return;
}
/* add to pool */
for(i = 0; i < p->count; ++i) {
if(p->ac[i] == NULL) {
p->ac[i] = c;
return;
}
}
}
static void
pool_on_disconnect(const redisAsyncContext *c, int status) {
struct pool *p = c->data;
int i = 0;
if (status != REDIS_OK) {
fprintf(stderr, "Error: %s\n", c->errstr);
}
if(p == NULL) { /* no need to clean anything here. */
return;
}
/* remove from the pool */
for(i = 0; i < p->count; ++i) {
if(p->ac[i] == c) {
p->ac[i] = NULL;
break;
}
}
/* reconnect */
pool_connect(p, 1);
}
/**
* Create new connection.
*/
redisAsyncContext *
pool_connect(struct pool *p, int attach) {
struct redisAsyncContext *ac;
if(p->cfg->redis_host[0] == '/') { /* unix socket */
ac = redisAsyncConnectUnix(p->cfg->redis_host);
} else {
ac = redisAsyncConnect(p->cfg->redis_host, p->cfg->redis_port);
}
if(attach) {
ac->data = p;
} else {
ac->data = NULL;
}
if(ac->err) {
/*
const char err[] = "Connection failed";
slog(s, WEBDIS_ERROR, err, sizeof(err)-1);
*/
fprintf(stderr, "Error: %s\n", ac->errstr);
redisAsyncFree(ac);
return NULL;
}
redisLibeventAttach(ac, p->w->base);
redisAsyncSetConnectCallback(ac, pool_on_connect);
redisAsyncSetDisconnectCallback(ac, pool_on_disconnect);
if (p->cfg->redis_auth) { /* authenticate. */
redisAsyncCommand(ac, NULL, NULL, "AUTH %s", p->cfg->redis_auth);
}
if (p->cfg->database) { /* change database. */
redisAsyncCommand(ac, NULL, NULL, "SELECT %d", p->cfg->database);
}
return ac;
}
const redisAsyncContext *
pool_get_context(struct pool *p) {
int orig = p->cur++;
do {
p->cur++;
p->cur %= p->count;
if(p->ac[p->cur] != NULL) {
return p->ac[p->cur];
}
} while(p->cur != orig);
return NULL;
}

@ -0,0 +1,30 @@
#ifndef POOL_H
#define POOL_H
#include <hiredis/async.h>
struct conf;
struct worker;
struct pool {
struct worker *w;
struct conf *cfg;
const redisAsyncContext **ac;
int count;
int cur;
};
struct pool *
pool_new(struct worker *w, int count);
redisAsyncContext *
pool_connect(struct pool *p, int attach);
const redisAsyncContext *
pool_get_context(struct pool *p);
#endif

@ -1,14 +1,10 @@
#include "server.h"
#include "conf.h"
#include "cmd.h"
#include "slog.h"
#include "http.h"
#include "worker.h"
#include "client.h"
#include "conf.h"
#include <hiredis/hiredis.h>
#include <hiredis/adapters/libevent.h>
#include <jansson.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
@ -20,7 +16,7 @@
/**
* Sets up a non-blocking socket
*/
int
static int
socket_setup(const char *ip, short port) {
int reuse = 1;
@ -75,167 +71,74 @@ socket_setup(const char *ip, short port) {
}
struct server *
server_new(const char *filename) {
struct server *s = calloc(1, sizeof(struct server));
s->cfg = conf_read(filename);
s->base = event_base_new();
return s;
}
void
server_free(struct server *s) {
/* cleanup Redis async object, _before_ the 2 struct event. */
redisAsyncFree(s->ac);
/* event_del(&s->ev); */
event_del(&s->ev_reconnect);
free(s);
}
static void
connectCallback(const redisAsyncContext *c) {
((void)c);
}
static void
disconnectCallback(const redisAsyncContext *c, int status) {
struct server *s = c->data;
if (status != REDIS_OK) {
fprintf(stderr, "Error: %s\n", c->errstr);
}
s->ac = NULL;
/* wait 100 msec and reconnect */
s->tv_reconnect.tv_sec = 0;
s->tv_reconnect.tv_usec = 100*1000;
webdis_connect(s);
}
static void
on_timer_reconnect(int fd, short event, void *ctx) {
server_new(const char *cfg_file) {
(void)fd;
(void)event;
struct server *s = ctx;
if(s->cfg->redis_host[0] == '/') { /* unix socket */
s->ac = redisAsyncConnectUnix(s->cfg->redis_host);
} else {
s->ac = redisAsyncConnect(s->cfg->redis_host, s->cfg->redis_port);
}
s->ac->data = s;
if(s->ac->err) {
const char err[] = "Connection failed";
slog(s, WEBDIS_ERROR, err, sizeof(err)-1);
fprintf(stderr, "Error: %s\n", s->ac->errstr);
}
int i;
struct server *s = calloc(1, sizeof(struct server));
redisLibeventAttach(s->ac, s->base);
redisAsyncSetConnectCallback(s->ac, connectCallback);
redisAsyncSetDisconnectCallback(s->ac, disconnectCallback);
s->cfg = conf_read(cfg_file);
if (s->cfg->redis_auth) { /* authenticate. */
redisAsyncCommand(s->ac, NULL, NULL, "AUTH %s", s->cfg->redis_auth);
}
if (s->cfg->database) { /* change database. */
redisAsyncCommand(s->ac, NULL, NULL, "SELECT %d", s->cfg->database);
/* workers */
s->w = calloc(s->cfg->http_threads, sizeof(struct worker*));
for(i = 0; i < s->cfg->http_threads; ++i) {
s->w[i] = worker_new(s);
}
}
void
webdis_connect(struct server *s) {
/* schedule reconnect */
evtimer_set(&s->ev_reconnect, on_timer_reconnect, s);
event_base_set(s->base, &s->ev_reconnect);
evtimer_add(&s->ev_reconnect, &s->tv_reconnect);
}
struct server *
server_copy(const struct server *s) {
struct server *ret = calloc(1, sizeof(struct server));
*ret = *s;
/* create a new connection */
ret->ac = NULL;
on_timer_reconnect(0, 0, ret);
return ret;
return s;
}
static void
on_possible_accept(int fd, short event, void *ctx) {
server_can_accept(int fd, short event, void *ptr) {
struct server *s = ctx;
struct server *s = ptr;
struct worker *w;
struct http_client *c;
int client_fd;
struct sockaddr_in addr;
socklen_t addr_sz = sizeof(addr);
(void)event;
struct http_client *c;
client_fd = accept(fd, (struct sockaddr*)&addr, &addr_sz);
c = http_client_new(client_fd, s);
c->addr = addr.sin_addr.s_addr;
http_client_serve(c);
}
/* Taken from Redis. */
void
server_daemonize(void) {
int fd;
w = s->w[s->next_worker]; /* select worker to send the client to */
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
/* create client and send to worker. */
client_fd = accept(fd, (struct sockaddr*)&addr, &addr_sz);
/* printf("Just accepted fd=%d, sending to worker %p\n", client_fd, (void*)w); */
c = http_client_new(w, client_fd, addr.sin_addr.s_addr);
worker_add_client(w, c);
/* Every output goes to /dev/null. */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
s->next_worker = (s->next_worker + 1) % s->cfg->http_threads; /* loop over ring of workers */
}
void
int
server_start(struct server *s) {
if(s->cfg->daemonize) {
server_daemonize();
}
int i;
/* ignore sigpipe */
#ifdef SIGPIPE
signal(SIGPIPE, SIG_IGN);
#endif
/* start http server */
slog(s, WEBDIS_INFO, "Starting HTTP Server", sizeof("Starting HTTP Server")-1);
/* start worker threads */
for(i = 0; i < s->cfg->http_threads; ++i) {
worker_start(s->w[i]);
}
/* create socket */
s->fd = socket_setup(s->cfg->http_host, s->cfg->http_port);
/* check return value. */
if(s->fd == -1) {
char err[] = "Failed to create socket.";
slog(s, WEBDIS_ERROR, err, sizeof(err)-1);
_exit(1);
if(s->fd < 0) {
return -1;
}
event_set(&s->ev, s->fd, EV_READ | EV_PERSIST, on_possible_accept, s);
event_base_set(s->base, &s->ev);
event_add(&s->ev, NULL);
/* drop privileges */
slog(s, WEBDIS_INFO, "Dropping Privileges", sizeof("Dropping Privileges")-1);
setuid(s->cfg->user);
setgid(s->cfg->group);
/* initialize libevent */
s->base = event_base_new();
/* attach hiredis to libevent base */
webdis_connect(s);
/* start http server */
event_set(&s->ev, s->fd, EV_READ | EV_PERSIST, server_can_accept, s);
event_base_set(s->base, &s->ev);
event_add(&s->ev, NULL);
/* loop */
event_base_dispatch(s->base);
return 0;
}

@ -1,42 +1,30 @@
#ifndef SERVER_H
#define SERVER_H
#include <hiredis/async.h>
#include <time.h>
#include <sys/queue.h>
#include <event.h>
#include <hiredis/async.h>
struct server {
struct worker;
struct conf;
struct conf *cfg;
struct event_base *base;
redisAsyncContext *ac;
struct server {
/* server socket and event struct */
int fd;
struct event ev;
struct event_base *base;
struct event ev_reconnect;
struct timeval tv_reconnect;
};
void
webdis_connect(struct server *s);
struct server *
server_new(const char *filename);
struct conf *cfg;
void
server_free(struct server *s);
/* worker threads */
struct worker **w;
int next_worker;
};
struct server *
server_copy(const struct server *s);
server_new(const char *cfg_file);
void
int
server_start(struct server *s);
void
webdis_log(struct server *s, int level, const char *body);
#endif

@ -0,0 +1,15 @@
OUT=websocket
CFLAGS=-O3 -Wall -Wextra
LDFLAGS=-levent -lpthread
all: $(OUT) Makefile
%: %.o Makefile
$(CC) $(LDFLAGS) -o $@ $<
%.o: %.c Makefile
$(CC) -c $(CFLAGS) -o $@ $<
clean:
rm -f *.o $(OUT)

@ -0,0 +1,181 @@
#!/usr/bin/python
import urllib2, unittest, json
from functools import wraps
try:
import bson
except:
bson = None
host = '127.0.0.1'
port = 7379
class TestWebdis(unittest.TestCase):
def wrap(self,url):
return 'http://%s:%d/%s' % (host, port, url)
def query(self, url):
r = urllib2.Request(self.wrap(url))
return urllib2.urlopen(r)
class TestBasics(TestWebdis):
def test_crossdomain(self):
f = self.query('crossdomain.xml')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/xml')
self.assertTrue("allow-access-from domain" in f.read())
def test_options(self):
pass
# not sure if OPTIONS is supported by urllib2...
# f = self.query('') # TODO: call with OPTIONS.
# self.assertTrue(f.headers.getheader('Content-Type') == 'text/html')
# self.assertTrue(f.headers.getheader('Allow') == 'GET,POST,PUT,OPTIONS')
# self.assertTrue(f.headers.getheader('Content-Length') == '0')
# self.assertTrue(f.headers.getheader('Access-Control-Allow-Origin') == '*')
class TestJSON(TestWebdis):
def test_set(self):
"success type (+OK)"
self.query('DEL/hello')
f = self.query('SET/hello/world')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/json')
self.assertTrue(f.headers.getheader('ETag') == '"0db1124cf79ffeb80aff6d199d5822f8"')
self.assertTrue(f.read() == '{"SET":[true,"OK"]}')
def test_get(self):
"string type"
self.query('SET/hello/world')
f = self.query('GET/hello')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/json')
self.assertTrue(f.headers.getheader('ETag') == '"8cf38afc245b7a6a88696566483d1390"')
self.assertTrue(f.read() == '{"GET":"world"}')
def test_incr(self):
"integer type"
self.query('DEL/hello')
f = self.query('INCR/hello')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/json')
self.assertTrue(f.headers.getheader('ETag') == '"500e9bcdcbb1e98f25c1fbb880a96c99"')
self.assertTrue(f.read() == '{"INCR":1}')
def test_list(self):
"list type"
self.query('DEL/hello')
self.query('RPUSH/hello/abc')
self.query('RPUSH/hello/def')
f = self.query('LRANGE/hello/0/-1')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/json')
self.assertTrue(f.headers.getheader('ETag') == '"622e51f547a480bef7cf5452fb7782db"')
self.assertTrue(f.read() == '{"LRANGE":["abc","def"]}')
def test_error(self):
"error return type"
f = self.query('UNKNOWN/COMMAND')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/json')
try:
obj = json.loads(f.read())
except:
self.assertTrue(False)
return
self.assertTrue(len(obj) == 1)
self.assertTrue('UNKNOWN' in obj)
self.assertTrue(isinstance(obj['UNKNOWN'], list))
self.assertTrue(obj['UNKNOWN'][0] == False)
self.assertTrue(isinstance(obj['UNKNOWN'][1], unicode))
class TestRaw(TestWebdis):
def test_set(self):
"success type (+OK)"
self.query('DEL/hello')
f = self.query('SET/hello/world.raw')
self.assertTrue(f.headers.getheader('Content-Type') == 'binary/octet-stream')
self.assertTrue(f.read() == "+OK\r\n")
def test_get(self):
"string type"
self.query('SET/hello/world')
f = self.query('GET/hello.raw')
self.assertTrue(f.read() == '$5\r\nworld\r\n')
def test_incr(self):
"integer type"
self.query('DEL/hello')
f = self.query('INCR/hello.raw')
self.assertTrue(f.read() == ':1\r\n')
def test_list(self):
"list type"
self.query('DEL/hello')
self.query('RPUSH/hello/abc')
self.query('RPUSH/hello/def')
f = self.query('LRANGE/hello/0/-1.raw')
self.assertTrue(f.read() == "*2\r\n$3\r\nabc\r\n$3\r\ndef\r\n")
def test_error(self):
"error return type"
f = self.query('UNKNOWN/COMMAND.raw')
self.assertTrue(f.read().startswith("-ERR "))
def need_bson(fn):
def wrapper(self):
if bson:
fn(self)
return wrapper
class TestBSon(TestWebdis):
@need_bson
def test_set(self):
"success type (+OK)"
self.query('DEL/hello')
f = self.query('SET/hello/world.bson')
self.assertTrue(f.headers.getheader('Content-Type') == 'application/bson')
obj = bson.decode_all(f.read())
self.assertTrue(obj == [{u'SET': [True, bson.Binary('OK', 0)]}])
@need_bson
def test_get(self):
"string type"
self.query('SET/hello/world')
f = self.query('GET/hello.bson')
obj = bson.decode_all(f.read())
self.assertTrue(obj == [{u'GET': bson.Binary('world', 0)}])
@need_bson
def test_incr(self):
"integer type"
self.query('DEL/hello')
f = self.query('INCR/hello.bson')
obj = bson.decode_all(f.read())
self.assertTrue(obj == [{u'INCR': 1L}])
@need_bson
def test_list(self):
"list type"
self.query('DEL/hello')
self.query('RPUSH/hello/abc')
self.query('RPUSH/hello/def')
f = self.query('LRANGE/hello/0/-1.bson')
obj = bson.decode_all(f.read())
self.assertTrue(obj == [{u'LRANGE': [bson.Binary('abc', 0), bson.Binary('def', 0)]}])
@need_bson
def test_error(self):
"error return type"
f = self.query('UNKNOWN/COMMAND.bson')
obj = bson.decode_all(f.read())
self.assertTrue(len(obj) == 1)
self.assertTrue(u'UNKNOWN' in obj[0])
self.assertTrue(isinstance(obj[0], dict))
self.assertTrue(isinstance(obj[0][u'UNKNOWN'], list))
self.assertTrue(obj[0]['UNKNOWN'][0] == False)
self.assertTrue(isinstance(obj[0]['UNKNOWN'][1], bson.Binary))
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,334 @@
/* http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 */
#include <stdlib.h>
#define _GNU_SOURCE
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <event.h>
struct host_info {
char *host;
short port;
};
/* worker_thread, with counter of remaining messages */
struct worker_thread {
struct host_info *hi;
struct event_base *base;
int msg_target;
int msg_received;
int msg_sent;
int byte_count;
pthread_t thread;
struct evbuffer *buffer;
int got_header;
int verbose;
struct event ev_w;
};
void
process_message(struct worker_thread *wt, size_t sz) {
// printf("process_message\n");
if(wt->msg_received % 10000 == 0) {
printf("thread %u: %8d messages left (got %9d bytes so far).\n",
(unsigned int)wt->thread,
wt->msg_target - wt->msg_received, wt->byte_count);
}
wt->byte_count += sz;
/* decrement read count, and stop receiving when we reach zero. */
wt->msg_received++;
if(wt->msg_received == wt->msg_target) {
event_base_loopexit(wt->base, NULL);
}
}
void
websocket_write(int fd, short event, void *ptr) {
int ret;
struct worker_thread *wt = ptr;
if(event != EV_WRITE) {
return;
}
char message[] = "\x00[\"SET\",\"key\",\"value\"]\xff\x00[\"GET\",\"key\"]\xff";
ret = write(fd, message, sizeof(message)-1);
if(ret != sizeof(message)-1) {
fprintf(stderr, "write on %d failed: %s\n", fd, strerror(errno));
close(fd);
}
wt->msg_sent += 2;
if(wt->msg_sent < wt->msg_target) {
event_set(&wt->ev_w, fd, EV_WRITE, websocket_write, wt);
event_base_set(wt->base, &wt->ev_w);
ret = event_add(&wt->ev_w, NULL);
}
}
static void
websocket_read(int fd, short event, void *ptr) {
char packet[2048], *pos;
int ret, success = 1;
struct worker_thread *wt = ptr;
if(event != EV_READ) {
return;
}
/* read message */
ret = read(fd, packet, sizeof(packet));
pos = packet;
if(ret > 0) {
char *data, *last;
int sz, msg_sz;
/*
printf("Received %d bytes: \n", ret);
write(1, packet, ret);
printf("\n");
*/
if(wt->got_header == 0) { /* first response */
char *frame_start = strstr(packet, "MH"); /* end of the handshake */
if(frame_start == NULL) {
return; /* not yet */
} else { /* start monitoring possible writes */
printf("start monitoring possible writes\n");
evbuffer_add(wt->buffer, frame_start + 2, ret - (frame_start + 2 - packet));
wt->got_header = 1;
event_set(&wt->ev_w, fd, EV_WRITE,
websocket_write, wt);
event_base_set(wt->base, &wt->ev_w);
ret = event_add(&wt->ev_w, NULL);
}
} else {
/* we've had the header already, now bufffer data. */
evbuffer_add(wt->buffer, packet, ret);
}
while(1) {
data = (char*)EVBUFFER_DATA(wt->buffer);
sz = EVBUFFER_LENGTH(wt->buffer);
if(sz == 0) { /* no data */
break;
}
if(*data != 0) { /* missing frame start */
success = 0;
break;
}
last = memchr(data, 0xff, sz); /* look for frame end */
if(!last) {
/* no end of frame in sight. */
break;
}
msg_sz = last - data - 1;
process_message(ptr, msg_sz); /* record packet */
/* drain including frame delimiters (+2 bytes) */
evbuffer_drain(wt->buffer, msg_sz + 2);
}
} else {
printf("ret=%d\n", ret);
success = 0;
}
if(success == 0) {
shutdown(fd, SHUT_RDWR);
close(fd);
event_base_loopexit(wt->base, NULL);
}
}
void*
worker_main(void *ptr) {
char ws_template[] = "GET /.json HTTP/1.1\r\n"
"Host: %s:%d\r\n"
"Connection: Upgrade\r\n"
"Upgrade: WebSocket\r\n"
"Origin: http://%s:%d\r\n"
"Sec-WebSocket-Key1: 18x 6]8vM;54 *(5: { U1]8 z [ 8\r\n"
"Sec-WebSocket-Key2: 1_ tx7X d < nw 334J702) 7]o}` 0\r\n"
"\r\n"
"Tm[K T2u";
struct worker_thread *wt = ptr;
int ret;
int fd;
struct sockaddr_in addr;
char *ws_handshake;
size_t ws_handshake_sz;
/* connect socket */
fd = socket(AF_INET, SOCK_STREAM, 0);
addr.sin_family = AF_INET;
addr.sin_port = htons(wt->hi->port);
memset(&(addr.sin_addr), 0, sizeof(addr.sin_addr));
addr.sin_addr.s_addr = inet_addr(wt->hi->host);
ret = connect(fd, (struct sockaddr*)&addr, sizeof(struct sockaddr));
if(ret != 0) {
fprintf(stderr, "connect: ret=%d: %s\n", ret, strerror(errno));
return NULL;
}
/* initialize worker thread */
wt->base = event_base_new();
wt->buffer = evbuffer_new();
wt->byte_count = 0;
wt->got_header = 0;
/* send handshake */
ws_handshake_sz = sizeof(ws_handshake)
+ 2*strlen(wt->hi->host) + 500;
ws_handshake = calloc(ws_handshake_sz, 1);
ws_handshake_sz = (size_t)sprintf(ws_handshake, ws_template,
wt->hi->host, wt->hi->port,
wt->hi->host, wt->hi->port);
ret = write(fd, ws_handshake, ws_handshake_sz);
struct event ev_r;
event_set(&ev_r, fd, EV_READ | EV_PERSIST, websocket_read, wt);
event_base_set(wt->base, &ev_r);
event_add(&ev_r, NULL);
/* go! */
event_base_dispatch(wt->base);
event_base_free(wt->base);
free(ws_handshake);
return NULL;
}
void
usage(const char* argv0, char *host_default, short port_default,
int thread_count_default, int messages_default) {
printf("Usage: %s [options]\n"
"Options are:\n"
"\t-h host\t\t(default = \"%s\")\n"
"\t-p port\t\t(default = %d)\n"
"\t-c threads\t(default = %d)\n"
"\t-n count\t(number of messages per thread, default = %d)\n"
"\t-v\t\t(verbose)\n",
argv0, host_default, (int)port_default,
thread_count_default, messages_default);
}
int
main(int argc, char *argv[]) {
struct timespec t0, t1;
int messages_default = 100000;
int thread_count_default = 4;
short port_default = 7379;
char *host_default = "127.0.0.1";
int msg_target = messages_default;
int thread_count = thread_count_default;
int i, opt;
char *colon;
double total = 0, total_bytes = 0;
int verbose = 0;
struct host_info hi = {host_default, port_default};
struct worker_thread *workers;
/* getopt */
while ((opt = getopt(argc, argv, "h:p:c:n:v")) != -1) {
switch (opt) {
case 'h':
colon = strchr(optarg, ':');
if(!colon) {
size_t sz = strlen(optarg);
hi.host = calloc(1 + sz, 1);
strncpy(hi.host, optarg, sz);
} else {
hi.host = calloc(1+colon-optarg, 1);
strncpy(hi.host, optarg, colon-optarg);
hi.port = (short)atol(colon+1);
}
break;
case 'p':
hi.port = (short)atol(optarg);
break;
case 'c':
thread_count = atoi(optarg);
break;
case 'n':
msg_target = atoi(optarg);
break;
case 'v':
verbose = 1;
break;
default: /* '?' */
usage(argv[0], host_default, port_default,
thread_count_default,
messages_default);
exit(EXIT_FAILURE);
}
}
/* run threads */
workers = calloc(sizeof(struct worker_thread), thread_count);
clock_gettime(CLOCK_MONOTONIC, &t0);
for(i = 0; i < thread_count; ++i) {
workers[i].msg_target = msg_target;
workers[i].hi = &hi;
workers[i].verbose = verbose;
pthread_create(&workers[i].thread, NULL,
worker_main, &workers[i]);
}
/* wait for threads to finish */
for(i = 0; i < thread_count; ++i) {
pthread_join(workers[i].thread, NULL);
total += workers[i].msg_received;
total_bytes += workers[i].byte_count;
}
/* timing */
clock_gettime(CLOCK_MONOTONIC, &t1);
float mili0 = t0.tv_sec * 1000 + t0.tv_nsec / 1000000;
float mili1 = t1.tv_sec * 1000 + t1.tv_nsec / 1000000;
if(total != 0) {
printf("Read %ld messages in %0.2f sec: %0.2f msg/sec (%d MB/sec, %d KB/sec)\n",
(long)total,
(mili1-mili0)/1000.0,
1000*total/(mili1-mili0),
(int)(total_bytes / (1000*(mili1-mili0))),
(int)(total_bytes / (mili1-mili0)));
} else {
printf("No message was read.\n");
}
return EXIT_SUCCESS;
}

@ -1,6 +1,7 @@
#include <stdlib.h>
#include "server.h"
#include <stdlib.h>
int
main(int argc, char *argv[]) {

@ -6,6 +6,7 @@
"http_host": "0.0.0.0",
"http_port": 7379,
"threads": 4,
"daemonize": false,

@ -0,0 +1,234 @@
#include "md5/md5.h"
#include "websocket.h"
#include "client.h"
#include "formats/json.h"
#include "cmd.h"
#include "worker.h"
#include "pool.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
static uint32_t
ws_read_key(const char *s) {
uint32_t ret = 0, spaces = 0;
const char *p;
size_t sz;
if(!s) {
return 0;
}
sz = strlen(s);
for(p = s; p < s+sz; ++p) {
if(*p >= '0' && *p <= '9') {
ret *= 10;
ret += (*p) - '0';
} else if (*p == ' ') {
spaces++;
}
}
return htonl(ret / spaces);
}
static int
ws_compute_handshake(struct http_client *c, unsigned char *out) {
char buffer[16];
md5_state_t ctx;
// websocket handshake
uint32_t number_1 = ws_read_key(client_get_header(c, "Sec-WebSocket-Key1"));
uint32_t number_2 = ws_read_key(client_get_header(c, "Sec-WebSocket-Key2"));
if(c->body_sz < 8) { /* we need at least 8 bytes */
return -1;
}
memcpy(buffer, &number_1, sizeof(uint32_t));
memcpy(buffer + sizeof(uint32_t), &number_2, sizeof(uint32_t));
memcpy(buffer + 2 * sizeof(uint32_t), c->body + c->body_sz - 8, 8); /* last 8 bytes */
md5_init(&ctx);
md5_append(&ctx, (const md5_byte_t *)buffer, sizeof(buffer));
md5_finish(&ctx, out);
return 0;
}
int
ws_handshake_reply(struct http_client *c) {
int ret;
unsigned char md5_handshake[16];
char *buffer = NULL, *p;
const char *origin = NULL, *host = NULL;
size_t origin_sz = 0, host_sz = 0, sz;
char template0[] = "HTTP/1.1 101 Websocket Protocol Handshake\r\n"
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Origin: "; /* %s */
char template1[] = "\r\n"
"Sec-WebSocket-Location: ws://"; /* %s%s */
char template2[] = "\r\n"
"Origin: http://"; /* %s */
char template3[] = "\r\n\r\n";
if((origin = client_get_header(c, "Origin"))) {
origin_sz = strlen(origin);
}
if((host = client_get_header(c, "Host"))) {
host_sz = strlen(host);
}
/* need those headers */
if(!origin || !origin_sz || !host || !host_sz || !c->path || !c->path_sz) {
return -1;
}
if(ws_compute_handshake(c, &md5_handshake[0]) != 0) {
printf("failed to compute handshake.\n");
return -1;
}
/* This code uses the WebSocket specification from May 23, 2010.
* The latest copy is available at http://www.whatwg.org/specs/web-socket-protocol/
*/
sz = sizeof(template0)-1 + origin_sz
+ sizeof(template1)-1 + host_sz + c->path_sz
+ sizeof(template2)-1 + host_sz
+ sizeof(template3)-1 + sizeof(md5_handshake);
p = buffer = malloc(sz);
/* Concat all */
/* template0 */
memcpy(p, template0, sizeof(template0)-1);
p += sizeof(template0)-1;
memcpy(p, origin, origin_sz);
p += origin_sz;
/* template1 */
memcpy(p, template1, sizeof(template1)-1);
p += sizeof(template1)-1;
memcpy(p, host, host_sz);
p += host_sz;
memcpy(p, c->path, c->path_sz);
p += c->path_sz;
/* template2 */
memcpy(p, template2, sizeof(template2)-1);
p += sizeof(template2)-1;
memcpy(p, host, host_sz);
p += host_sz;
/* template3 */
memcpy(p, template3, sizeof(template3)-1);
p += sizeof(template3)-1;
memcpy(p, &md5_handshake[0], sizeof(md5_handshake));
ret = write(c->fd, buffer, sz);
free(buffer);
return 0;
}
static int
ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
struct cmd*(*fun_extract)(struct http_client *, const char *, size_t) = NULL;
if(strncmp(c->path, "/.json", 6) == 0) {
fun_extract = json_ws_extract;
}
if(fun_extract) {
struct cmd *cmd = fun_extract(c, frame, frame_len);
if(cmd) {
cmd->is_websocket = 1;
cmd->fd = c->fd;
/* TODO: clean this mess */
redisAsyncContext *ac = (redisAsyncContext*)pool_get_context(c->w->pool);
cmd_send(ac, json_reply, cmd);
return 0;
}
}
return -1;
}
/**
* Process some data just received on the socket.
*/
enum ws_read_action
ws_add_data(struct http_client *c) {
const char *frame_start, *frame_end;
char *tmp;
while(1) {
/* look for frame start */
if(!c->sz || c->buffer[0] != '\x00') {
/* printf("frame start fail\n"); */
return WS_READ_FAIL;
}
/* look for frame end */
int ret;
size_t frame_len;
frame_start = c->buffer;
frame_end = memchr(frame_start, '\xff', c->sz);
if(frame_end == NULL) {
/* continue reading */
return WS_READ_MORE;
}
/* parse and execute frame. */
frame_len = frame_end - frame_start - 1;
ret = ws_execute(c, frame_start + 1, frame_len);
if(ret != 0) {
return WS_READ_FAIL;
}
/* remove frame from buffer */
c->sz -= (2 + frame_len);
tmp = malloc(c->sz);
memcpy(tmp, c->buffer + 2 + frame_len, c->sz);
free(c->buffer);
c->buffer = tmp;
}
}
int
ws_reply(struct cmd *cmd, const char *p, size_t sz) {
int ret;
char *buffer = malloc(sz + 2);
/* create frame */
buffer[0] = '\x00';
memcpy(buffer + 1, p, sz);
buffer[sz + 1] = '\xff';
/* send WS frame */
ret = write(cmd->fd, buffer, sz+2);
free(buffer);
if(ret == (int)sz + 2) {
/* http_client_serve(c); */
return 0;
}
/* printf("WRITE FAIL on fd=%d, ret=%d (%s)\n", c->fd, ret, strerror(errno)); */
return -1;
}

@ -0,0 +1,23 @@
#ifndef WEBSOCKET_H
#define WEBSOCKET_H
#include <stdlib.h>
struct http_client;
struct cmd;
enum ws_read_action {
WS_READ_FAIL,
WS_READ_MORE,
WS_READ_EXEC};
int
ws_handshake_reply(struct http_client *c);
enum ws_read_action
ws_add_data(struct http_client *c);
int
ws_reply(struct cmd *cmd, const char *p, size_t sz);
#endif

@ -0,0 +1,196 @@
#include "worker.h"
#include "client.h"
#include "http.h"
#include "cmd.h"
#include "pool.h"
#include "slog.h"
#include "websocket.h"
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <event.h>
#include <string.h>
struct worker *
worker_new(struct server *s) {
struct worker *w = calloc(1, sizeof(struct worker));
w->s = s;
/* setup communication link */
pipe(w->link);
/* Redis connection pool */
w->pool = pool_new(w, 8); /* FIXME: change the number? use conf? */
return w;
}
void
worker_can_read(int fd, short event, void *p) {
struct http_client *c = p;
int ret, nparsed;
(void)fd;
(void)event;
ret = http_client_read(c);
if(ret <= 0) {
printf("client disconnected\n");
return;
}
if(c->is_websocket) {
/* printf("Got websocket data! (%d bytes)\n", ret); */
ws_add_data(c);
} else {
/* run parser */
nparsed = http_client_execute(c);
if(c->is_websocket) {
/* we need to use the remaining (unparsed) data as the body. */
if(nparsed < ret) {
http_client_set_body(c, c->buffer + nparsed + 1, c->sz - nparsed - 1);
ws_handshake_reply(c);
} else {
c->broken = 1;
}
free(c->buffer);
c->buffer = NULL;
c->sz = 0;
}
}
if(c->broken) { /* terminate client */
/* printf("terminate client\n"); */
http_client_free(c);
} else {
/* printf("start monitoring input again.\n"); */
worker_monitor_input(c);
}
}
void
worker_monitor_input(struct http_client *c) {
event_set(&c->ev, c->fd, EV_READ, worker_can_read, c);
event_base_set(c->w->base, &c->ev);
event_add(&c->ev, NULL);
}
static void
worker_can_accept(int pipefd, short event, void *ptr) {
struct http_client *c;
unsigned long addr;
(void)event;
(void)ptr;
int ret = read(pipefd, &addr, sizeof(addr));
if(ret == sizeof(addr)) {
c = (struct http_client*)addr;
/* create client, monitor fd for input */
worker_monitor_input(c);
}
}
static void
worker_pool_connect(struct worker *w) {
int i;
/* create connections */
for(i = 0; i < w->pool->count; ++i) {
pool_connect(w->pool, 1);
}
}
static void*
worker_main(void *p) {
struct worker *w = p;
struct event ev;
/* setup libevent */
w->base = event_base_new();
/* monitor pipe link */
event_set(&ev, w->link[0], EV_READ | EV_PERSIST, worker_can_accept, w);
event_base_set(w->base, &ev);
event_add(&ev, NULL);
/* connect to Redis */
worker_pool_connect(w);
/* loop */
event_base_dispatch(w->base);
return NULL;
}
void
worker_start(struct worker *w) {
pthread_create(&w->thread, NULL, worker_main, w);
}
/* queue new client to process */
void
worker_add_client(struct worker *w, struct http_client *c) {
/* write into pipe link */
unsigned long addr = (unsigned long)c;
int ret = write(w->link[1], &addr, sizeof(addr));
(void)ret;
/* printf("[for worker %p] write: %lu, c=%p (ret=%d)\n", (void*)w, addr, (void*)c, ret); */
}
/* Called when a client has finished reading input and is ready to be executed. */
void
worker_process_client(struct http_client *c) {
/* printf("worker_process_client\n"); */
/* check that the command can be executed */
struct worker *w = c->w;
int ret = -1;
switch(c->parser.method) {
case HTTP_GET:
if(c->path_sz == 16 && memcmp(c->path, "/crossdomain.xml", 16) == 0) {
http_crossdomain(c);
return;
}
slog(w->s, WEBDIS_DEBUG, c->path, c->path_sz);
ret = cmd_run(c->w, c, 1+c->path, c->path_sz-1, NULL, 0);
break;
case HTTP_POST:
slog(w->s, WEBDIS_DEBUG, c->path, c->path_sz);
ret = cmd_run(c->w, c, c->body, c->body_sz, NULL, 0);
break;
case HTTP_PUT:
slog(w->s, WEBDIS_DEBUG, c->path, c->path_sz);
ret = cmd_run(c->w, c, 1+c->path, c->path_sz-1,
c->body, c->body_sz);
break;
case HTTP_OPTIONS:
http_send_options(c);
default:
slog(w->s, WEBDIS_DEBUG, "405", 3);
http_send_error(c, 405, "Method Not Allowed");
return;
}
if(ret < 0) {
http_send_error(c, 403, "Forbidden");
}
}

@ -0,0 +1,41 @@
#ifndef WORKER_H
#define WORKER_H
#include <pthread.h>
struct http_client;
struct pool;
struct worker {
/* self */
pthread_t thread;
struct event_base *base;
/* connection dispatcher */
struct server *s;
int link[2];
/* Redis connection pool */
struct pool *pool;
};
struct worker *
worker_new(struct server *s);
void
worker_start(struct worker *w);
void
worker_add_client(struct worker *w, struct http_client *c);
void
worker_monitor_input(struct http_client *c);
void
worker_can_read(int fd, short event, void *p);
void
worker_process_client(struct http_client *c);
#endif
Loading…
Cancel
Save