Merge branch 'master' into pubsub

master
Nicolas Favre-Felix 14 years ago
commit 2f5454ca1a

@ -0,0 +1,24 @@
Copyright (c) 2010-2011, Nicolas Favre-Felix
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -1,9 +1,9 @@
OUT=turnip
OUT=webdis
HIREDIS_OBJ=hiredis/hiredis.o hiredis/sds.o hiredis/net.o hiredis/async.o
JANSSON_OBJ=jansson/src/dump.o jansson/src/error.o jansson/src/hashtable.o jansson/src/load.o jansson/src/strbuffer.o jansson/src/utf.o jansson/src/value.o jansson/src/variadic.o
FORMAT_OBJS=formats/json.o formats/raw.o
OBJS=turnip.o conf.o $(FORMAT_OBJS) cmd.o server.o $(HIREDIS_OBJ) $(JANSSON_OBJ)
CFLAGS=-O0 -ggdb -Wall -Wextra -I. -Ijansson/src
OBJS=webdis.o conf.o $(FORMAT_OBJS) cmd.o server.o $(HIREDIS_OBJ) $(JANSSON_OBJ)
CFLAGS=-O3 -Wall -Wextra -I. -Ijansson/src
LDFLAGS=-levent
all: $(OUT) Makefile

@ -1,10 +1,10 @@
# About
A very simple prototype providing an HTTP interface to Redis. It uses [hiredis](https://github.com/antirez/hiredis) and [jansson](https://github.com/akheron/jansson).
A very simple web server providing an HTTP interface to Redis. It uses [hiredis](https://github.com/antirez/hiredis), [jansson](https://github.com/akheron/jansson) and libevent.
<pre>
make clean all
./turnip &
./webdis &
curl http://127.0.0.1:7379/SET/hello/world
→ {"SET":[true,"OK"]}
curl http://127.0.0.1:7379/GET/hello
@ -19,28 +19,31 @@ curl -d "GET/hello" http://127.0.0.1:7379/
* GET and POST are supported.
* JSON output by default, optional JSONP parameter.
* Raw Redis 2.0 protocol output with `?format=raw`
* HTTP 1.1 pipelining (45 kqps on a desktop Linux machine.)
* HTTP 1.1 pipelining (50,000 http requests per second on a desktop Linux machine.)
* Connects to Redis using a TCP or UNIX socket.
* Restricted commands by IP range (CIDR subnet + mask), returning 403 errors.
* Possible Redis authentication in the config file.
# Ideas, TODO...
* Add meta-data info per key (MIME type in a second key, for instance).
* Support PUT, DELETE, HEAD?
* Support pub/sub.
* Disable MULTI/EXEC/DISCARD/WATCH.
* Add logging.
* Support PUT, DELETE, HEAD, OPTIONS? How? For which commands?
* Support pub/sub (waiting for HiRedis ticket \#17 in order to add this.)
* MULTI/EXEC/DISCARD/WATCH are disabled at the moment; find a way to use them.
* Drop privileges on startup.
* Add logs.
* Support POST of raw Redis protocol data, and execute the whole thing. This could be useful for MULTI/EXEC transactions.
* Enrich config file:
* Provide timeout (this needs to be added to hiredis first.)
* Restrict commands by IP range
* Get config file path from command line.
* Change config file to JSON format? That would be convenient.
* Send your ideas using the github tracker or on twitter [@yowgi](http://twitter.com/yowgi).
* Multi-server support, using consistent hashing.
* Send your ideas using the github tracker, on twitter [@yowgi](http://twitter.com/yowgi) or by mail to n.favrefelix@gmail.com.
# HTTP error codes
* Unknown HTTP verb: 405 Method Not Allowed
* Redis is unreachable: 503 Service Unavailable
* Could also be used:
* Timeout on the redis side: 503 Service Unavailable
* Timeout on the redis side: 503 Service Unavailable (this isn't supported by HiRedis yet).
* Missing key: 404 Not Found
* Unauthorized command (disabled in config file): 403 Forbidden
# Command format
The URI `/COMMAND/arg0/arg1/.../argN` executes the command on Redis and returns the response to the client. GET and POST are supported:

56
cmd.c

@ -1,5 +1,6 @@
#include "cmd.h"
#include "server.h"
#include "conf.h"
#include "formats/json.h"
#include "formats/raw.h"
@ -7,6 +8,8 @@
#include <stdlib.h>
#include <string.h>
#include <hiredis/hiredis.h>
#include <netinet/in.h>
#include <arpa/inet.h>
struct cmd *
cmd_new(struct evhttp_request *rq, int count) {
@ -39,7 +42,49 @@ void on_http_disconnect(struct evhttp_connection *evcon, void *ctx) {
redisAsyncDisconnect(s->ac);
}
void
int
cmd_authorized(struct conf *cfg, struct evhttp_request *rq, const char *verb, size_t verb_len) {
char *always_off[] = {"MULTI", "EXEC", "WATCH", "DISCARD"};
struct disabled_command *dc;
unsigned int i;
char *client_ip;
u_short client_port;
in_addr_t client_addr;
/* some commands are always disabled, regardless of the config file. */
for(i = 0; i < sizeof(always_off) / sizeof(always_off[0]); ++i) {
if(strncasecmp(always_off[i], verb, verb_len) == 0) {
return 0;
}
}
/* find client's address */
evhttp_connection_get_peer(rq->evcon, &client_ip, &client_port);
client_addr = ntohl(inet_addr(client_ip));
for(dc = cfg->disabled; dc; dc = dc->next) {
/* CIDR test */
if((client_addr & dc->mask) != (dc->subnet & dc->mask)) {
continue;
}
/* matched an ip */
for(i = 0; i < dc->count; ++i) {
if(strncasecmp(dc->commands[i], verb, verb_len) == 0) {
return 0;
}
}
}
return 1;
}
int
cmd_run(struct server *s, struct evhttp_request *rq,
const char *uri, size_t uri_len) {
@ -78,6 +123,12 @@ cmd_run(struct server *s, struct evhttp_request *rq,
cmd->argv[0] = uri;
cmd->argv_len[0] = cmd_len;
/* check that the client is able to run this command */
if(!cmd_authorized(s->cfg, rq, cmd->argv[0], cmd->argv_len[0])) {
return -1;
}
/* check if we have to split the connection */
if(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0) {
s = server_copy(s);
@ -86,7 +137,7 @@ cmd_run(struct server *s, struct evhttp_request *rq,
if(!slash) {
redisAsyncCommandArgv(s->ac, fun, cmd, 1, cmd->argv, cmd->argv_len);
return;
return 0;
}
p = slash + 1;
while(p < uri + uri_len) {
@ -110,6 +161,7 @@ cmd_run(struct server *s, struct evhttp_request *rq,
}
redisAsyncCommandArgv(s->ac, fun, cmd, param_count, cmd->argv, cmd->argv_len);
return 0;
}

@ -30,7 +30,7 @@ cmd_new(struct evhttp_request *rq, int count);
void
cmd_free(struct cmd *c);
void
int
cmd_run(struct server *s, struct evhttp_request *rq,
const char *uri, size_t uri_len);

144
conf.c

@ -2,73 +2,137 @@
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <arpa/inet.h>
#include <jansson.h>
#include "conf.h"
static char *
skipspaces(char *p) {
while(isspace(*p)) p++;
return p;
}
static struct disabled_command *
conf_disable_commands(json_t *jtab);
struct conf *
conf_read(const char *filename) {
json_t *j;
json_error_t error;
struct conf *conf;
void *kv;
FILE *f = fopen(filename, "r");
if(!f) {
return NULL;
}
/* defaults */
conf = calloc(1, sizeof(struct conf));
conf->redis_host = strdup("127.0.0.1");
conf->redis_port = 6379;
conf->http_host = strdup("0.0.0.0");
conf->http_port = 7379;
while(!feof(f)) {
char buffer[100], *ret;
memset(buffer, 0, sizeof(buffer));
if(!(ret = fgets(buffer, sizeof(buffer)-1, f))) {
break;
}
if(*ret == '#') { /* comments */
continue;
j = json_load_file(filename, 0, &error);
if(!j) {
fprintf(stderr, "Error: %s (line %d)\n", error.text, error.line);
return conf;
}
for(kv = json_object_iter(j); kv; kv = json_object_iter_next(j, kv)) {
json_t *jtmp = json_object_iter_value(kv);
if(strcmp(json_object_iter_key(kv), "redis_host") == 0 && json_typeof(jtmp) == JSON_STRING) {
free(conf->redis_host);
conf->redis_host = strdup(json_string_value(jtmp));
} else if(strcmp(json_object_iter_key(kv), "redis_port") == 0 && json_typeof(jtmp) == JSON_INTEGER) {
conf->redis_port = (short)json_integer_value(jtmp);
} else if(strcmp(json_object_iter_key(kv), "redis_auth") == 0 && json_typeof(jtmp) == JSON_STRING) {
conf->redis_auth = strdup(json_string_value(jtmp));
} else if(strcmp(json_object_iter_key(kv), "http_host") == 0 && json_typeof(jtmp) == JSON_STRING) {
free(conf->http_host);
conf->http_host = strdup(json_string_value(jtmp));
} else if(strcmp(json_object_iter_key(kv), "http_port") == 0 && json_typeof(jtmp) == JSON_INTEGER) {
conf->http_port = (short)json_integer_value(jtmp);
} else if(strcmp(json_object_iter_key(kv), "disable") == 0 && json_typeof(jtmp) == JSON_OBJECT) {
conf->disabled = conf_disable_commands(jtmp);
}
}
json_decref(j);
return conf;
}
struct disabled_command *
conf_disable_commands(json_t *jtab) {
struct disabled_command *root = NULL;
void *kv;
for(kv = json_object_iter(jtab); kv; kv = json_object_iter_next(jtab, kv)) {
unsigned int i, cur, n;
char *p, *ip;
const char *s;
in_addr_t mask, subnet;
unsigned short mask_bits = 0;
struct disabled_command *dc;
json_t *val = json_object_iter_value(kv);
if(*ret != 0) {
ret[strlen(ret)-1] = 0; /* remove new line */
if(json_typeof(val) != JSON_ARRAY) {
continue; /* TODO: report error? */
}
if(strncmp(ret, "redis_host", 10) == 0) {
conf->redis_host = strdup(skipspaces(ret + 11));
} else if(strncmp(ret, "redis_port", 10) == 0) {
conf->redis_port = (short)atoi(skipspaces(ret + 10));
} else if(strncmp(ret, "http_host", 10) == 0) {
conf->http_host = strdup(skipspaces(ret + 11));
} else if(strncmp(ret, "http_port", 9) == 0) {
conf->http_port = (short)atoi(skipspaces(ret + 10));
/* parse key in format "ip/mask" */
s = json_object_iter_key(kv);
p = strchr(s, '/');
if(!p) {
ip = strdup(s);
} else {
ip = calloc((size_t)(p - s + 1), 1);
memcpy(ip, s, (size_t)(p - s));
mask_bits = (unsigned short)atoi(p+1);
}
mask = (mask_bits == 0 ? 0 : (0xffffffff << (32 - mask_bits)));
subnet = ntohl(inet_addr(ip)) & mask;
/* count strings in the array */
n = 0;
for(i = 0; i < json_array_size(val); ++i) {
json_t *jelem = json_array_get(val, (size_t)i);
if(json_typeof(jelem) == JSON_STRING) {
n++;
}
}
}
fclose(f);
/* default values */
if(!conf->redis_host) {
conf->redis_host = strdup("127.0.0.1");
}
if(!conf->http_host) {
conf->http_host = strdup("0.0.0.0");
/* allocate block */
dc = calloc(1, sizeof(struct disabled_command));
dc->commands = calloc((size_t)n, sizeof(char*));
dc->subnet = subnet;
dc->mask = mask;
dc->count = n;
dc->next = root;
root = dc;
/* add all disabled commands */
for(i = 0, cur = 0; i < json_array_size(val); ++i) {
json_t *jelem = json_array_get(val, i);
if(json_typeof(jelem) == JSON_STRING) {
size_t sz;
s = json_string_value(jelem);
sz = strlen(s);
dc->commands[cur] = calloc(1 + sz, 1);
memcpy(dc->commands[cur], s, sz);
cur++;
}
}
}
return conf;
return root;
}
void
conf_free(struct conf *conf) {
free(conf->redis_host);
free(conf->redis_auth);
free(conf->http_host);
free(conf);

@ -1,14 +1,29 @@
#ifndef CONF_H
#define CONF_H
#include <netinet/in.h>
struct disabled_command {
in_addr_t subnet;
in_addr_t mask;
unsigned int count;
char **commands;
struct disabled_command *next;
};
struct conf {
char *redis_host;
short redis_port;
char *redis_auth;
char *http_host;
short http_port;
struct disabled_command *disabled;
};
struct conf *

@ -64,11 +64,13 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) {
/* cleanup */
evbuffer_free(body);
json_decref(j);
free(jstr);
if(free_reply) {
freeReplyObject(r);
cmd_free(cmd);
}
free(jstr);
evhttp_clear_headers(&cmd->uri_params);
free(json_reply);
}
static json_t *

@ -13,12 +13,14 @@ raw_wrap(const redisReply *r, size_t *sz);
void
raw_reply(redisAsyncContext *c, void *r, void *privdata) {
(void)c;
struct evbuffer *body;
redisReply *reply = r;
struct cmd *cmd = privdata;
char *raw_out;
size_t sz;
(void)c;
evhttp_clear_headers(&cmd->uri_params);
if (reply == NULL) {
evhttp_send_reply(cmd->rq, 404, "Not Found", NULL);
@ -124,7 +126,7 @@ raw_wrap(const redisReply *r, size_t *sz) {
*sz = 1 + integer_length(r->len) + 1 + r->len + 1;
p = ret = malloc(*sz);
p += sprintf(p, "$%d\n", r->len);
memcpy(p, r->str, *sz - 1);
memcpy(p, r->str, *sz - 1 - (p-ret));
memcpy(ret + *sz - 1, "\n", 1);
return ret;

@ -0,0 +1,34 @@
/*
* Copyright (c) 2010 Petri Lehtinen <petri@digip.org>
*
* Jansson is free software; you can redistribute it and/or modify
* it under the terms of the MIT license. See LICENSE for details.
*
*
* This file specifies a part of the site-specific configuration for
* Jansson, namely those things that affect the public API in
* jansson.h.
*
* The configure script copies this file to jansson_config.h and
* replaces @var@ substitutions by values that fit your system. If you
* cannot run the configure script, you can do the value substitution
* by hand.
*/
#ifndef JANSSON_CONFIG_H
#define JANSSON_CONFIG_H
/* If your compiler supports the inline keyword in C, JSON_INLINE is
defined to `inline', otherwise empty. In C++, the inline is always
supported. */
#ifdef __cplusplus
#define JSON_INLINE inline
#else
#define JSON_INLINE inline
#endif
/* If your compiler supports the `long long` type,
JSON_INTEGER_IS_LONG_LONG is defined to 1, otherwise to 0. */
#define JSON_INTEGER_IS_LONG_LONG 1
#endif

@ -9,19 +9,20 @@
static void
connectCallback(const redisAsyncContext *c) {
((void)c);
printf("connected...\n");
}
static void
disconnectCallback(const redisAsyncContext *c, int status) {
struct server *s = c->data;
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
fprintf(stderr, "Error: %s\n", c->errstr);
}
printf("disconnected, schedule reconnect.\n");
s->ac = NULL;
turnip_connect(s);
/* wait 10 msec and reconnect */
s->tv_reconnect.tv_sec = 0;
s->tv_reconnect.tv_usec = 100000;
webdis_connect(s);
}
static void
@ -33,10 +34,9 @@ on_timer_reconnect(int fd, short event, void *ctx) {
if(s->ac) {
redisLibeventCleanup(s->ac->_adapter_data);
redisFree((redisContext*)s->ac);
}
/* TODO: free AC. */
if(s->cfg->redis_host[0] == '/') { /* unix socket */
s->ac = redisAsyncConnectUnix(s->cfg->redis_host);
} else {
@ -46,23 +46,23 @@ on_timer_reconnect(int fd, short event, void *ctx) {
s->ac->data = s;
if(s->ac->err) {
/* Let *c leak for now... */
printf("Error: %s\n", s->ac->errstr);
fprintf(stderr, "Error: %s\n", s->ac->errstr);
}
redisLibeventAttach(s->ac, s->base);
redisAsyncSetConnectCallback(s->ac, connectCallback);
redisAsyncSetDisconnectCallback(s->ac, disconnectCallback);
if (s->cfg->redis_auth) { /* authenticate. */
redisAsyncCommand(s->ac, NULL, NULL, "AUTH %s", s->cfg->redis_auth);
}
}
void
turnip_connect(struct server *s) {
webdis_connect(struct server *s) {
/* schedule reconnect */
evtimer_set(&s->ev_reconnect, on_timer_reconnect, s);
event_base_set(s->base, &s->ev_reconnect);
s->tv_reconnect.tv_sec = 1;
s->tv_reconnect.tv_usec = 0;
evtimer_add(&s->ev_reconnect, &s->tv_reconnect);
}

@ -1,5 +1,5 @@
#ifndef TURNIP_H
#define TURNIP_H
#ifndef SERVER_H
#define SERVER_H
#include <hiredis/async.h>
#include <time.h>
@ -17,7 +17,7 @@ struct server {
};
void
turnip_connect(struct server *s);
webdis_connect(struct server *s);
struct server *
server_copy(const struct server *s);

@ -1,5 +0,0 @@
redis_host 127.0.0.1
redis_port 6379
http_host 0.0.0.0
http_port 7379

@ -19,6 +19,7 @@ on_request(struct evhttp_request *rq, void *ctx) {
const char *uri = evhttp_request_uri(rq);
struct server *s = ctx;
int ret;
if(!s->ac) { /* redis is unavailable */
printf("503\n");
@ -26,15 +27,14 @@ on_request(struct evhttp_request *rq, void *ctx) {
return;
}
/* check that the command can be executed */
switch(rq->type) {
case EVHTTP_REQ_GET:
cmd_run(s, rq, 1+uri, strlen(uri)-1);
ret = cmd_run(s, rq, 1+uri, strlen(uri)-1);
break;
case EVHTTP_REQ_POST:
cmd_run(s, rq,
ret = cmd_run(s, rq,
(const char*)EVBUFFER_DATA(rq->input_buffer),
EVBUFFER_LENGTH(rq->input_buffer));
break;
@ -44,18 +44,24 @@ on_request(struct evhttp_request *rq, void *ctx) {
evhttp_send_reply(rq, 405, "Method Not Allowed", NULL);
return;
}
if(ret < 0) {
evhttp_send_reply(rq, 403, "Forbidden", NULL);
}
}
int
main(int argc, char *argv[]) {
(void)argc;
(void)argv;
struct server *s = calloc(1, sizeof(struct server));
s->base = event_base_new();
struct evhttp *http = evhttp_new(s->base);
s->cfg = conf_read("turnip.conf");
if(argc > 1) {
s->cfg = conf_read(argv[1]);
} else {
s->cfg = conf_read("webdis.json");
}
/* ignore sigpipe */
#ifdef SIGPIPE
@ -67,7 +73,7 @@ main(int argc, char *argv[]) {
evhttp_set_gencb(http, on_request, s);
/* attach hiredis to libevent base */
turnip_connect(s);
webdis_connect(s);
/* loop */
event_base_dispatch(s->base);

@ -0,0 +1,12 @@
{
"redis_host": "127.0.0.1",
"redis_port": 6379,
"redis_auth": null,
"http_host": "0.0.0.0",
"http_port": 7379,
"disable": {
"0.0.0.0/0": ["DEBUG", "FLUSHDB", "FLUSHALL"]
}
}
Loading…
Cancel
Save