diff --git a/Makefile b/Makefile index 5dae2d4..98c0c62 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,28 @@ OUT=webdis -HIREDIS_OBJ=hiredis/hiredis.o hiredis/sds.o hiredis/net.o hiredis/async.o hiredis/dict.o -JANSSON_OBJ=jansson/src/dump.o jansson/src/error.o jansson/src/hashtable.o jansson/src/load.o jansson/src/strbuffer.o jansson/src/utf.o jansson/src/value.o jansson/src/variadic.o +HIREDIS_OBJ=hiredis/hiredis.o hiredis/sds.o hiredis/net.o hiredis/async.o +JANSSON_OBJ=jansson/src/dump.o jansson/src/error.o jansson/src/hashtable.o jansson/src/load.o jansson/src/strbuffer.o jansson/src/utf.o jansson/src/value.o jansson/src/variadic.o FORMAT_OBJS=formats/json.o formats/raw.o formats/common.o formats/custom-type.o formats/bson.o HTTP_PARSER_OBJS=http-parser/http_parser.o -DEPS=$(FORMAT_OBJS) $(HIREDIS_OBJ) $(JANSSON_OBJ) $(HTTP_PARSER_OBJS) -OBJS=webdis.o cmd.o worker.o slog.o server.o libb64/cencode.o acl.o md5/md5.o sha1/sha1.o http.o client.o websocket.o pool.o conf.o $(DEPS) CFLAGS=-O3 -Wall -Wextra -I. -Ijansson/src -Ihttp-parser LDFLAGS=-levent -pthread +# check for MessagePack +MSGPACK_LIB=$(shell ls /usr/lib/libmsgpack.so) +ifneq ($(strip $(MSGPACK_LIB)),) + FORMAT_OBJS += formats/msgpack.o + CFLAGS += -DMSGPACK=1 + LDFLAGS += -lmsgpack +else + CFLAGS += -DMSGPACK=0 +endif + + +DEPS=$(FORMAT_OBJS) $(HIREDIS_OBJ) $(JANSSON_OBJ) $(HTTP_PARSER_OBJS) +OBJS=webdis.o cmd.o worker.o slog.o server.o libb64/cencode.o acl.o md5/md5.o sha1/sha1.o http.o client.o websocket.o pool.o conf.o $(DEPS) + + + PREFIX ?= /usr/local CONFDIR ?= $(DESTDIR)/etc diff --git a/README.markdown b/README.markdown index 3ed4081..38b6def 100644 --- a/README.markdown +++ b/README.markdown @@ -24,6 +24,7 @@ curl -d "GET/hello" http://127.0.0.1:7379/ * JSON output by default, optional JSONP parameter (`?jsonp=myFunction` or `?callback=myFunction`). * Raw Redis 2.0 protocol output with `.raw` suffix * BSON support for compact responses and MongoDB compatibility. +* MessagePack output with `.msg` suffix * HTTP 1.1 pipelining (70,000 http requests per second on a desktop Linux machine.) * Multi-threaded server, configurable number of worker threads. * WebSocket support (Currently using the “hixie-76” specification). @@ -173,6 +174,7 @@ Several content-types are available: * `.json` for `application/json` (this is the default Content-Type). * `.bson` for `application/bson`. See [http://bsonspec.org/](http://bsonspec.org/) for the specs. +* `.msg` for `application/x-msgpack`. See [http://msgpack.org/](http://msgpack.org/) for the specs. * `.txt` for `text/plain` * `.html` for `text/html` * `xhtml` for `application/xhtml+xml` diff --git a/cmd.c b/cmd.c index 0d401d8..1679e9f 100644 --- a/cmd.c +++ b/cmd.c @@ -10,6 +10,9 @@ #include "formats/json.h" #include "formats/bson.h" #include "formats/raw.h" +#ifdef MSGPACK +#include "formats/msgpack.h" +#endif #include "formats/custom-type.h" #include @@ -265,6 +268,10 @@ cmd_select_format(struct http_client *client, struct cmd *cmd, {.s = "raw", .sz = 3, .f = raw_reply, .ct = "binary/octet-stream"}, {.s = "bson", .sz = 4, .f = bson_reply, .ct = "application/bson"}, +#ifdef MSGPACK + {.s = "msg", .sz = 3, .f = msgpack_reply, .ct = "application/x-msgpack"}, +#endif + {.s = "bin", .sz = 3, .f = custom_type_reply, .ct = "binary/octet-stream"}, {.s = "txt", .sz = 3, .f = custom_type_reply, .ct = "text/plain"}, {.s = "html", .sz = 4, .f = custom_type_reply, .ct = "text/html"}, diff --git a/formats/msgpack.c b/formats/msgpack.c new file mode 100644 index 0000000..dc1ab0c --- /dev/null +++ b/formats/msgpack.c @@ -0,0 +1,236 @@ +#include "msgpack.h" +#include "common.h" +#include "cmd.h" +#include "http.h" +#include "client.h" + +#include +#include +#include + +struct msg_out { + char *p; + size_t sz; +}; + +static void +msgpack_wrap_redis_reply(const struct cmd *cmd, struct msg_out *, const redisReply *r); + +void +msgpack_reply(redisAsyncContext *c, void *r, void *privdata) { + + redisReply *reply = r; + struct cmd *cmd = privdata; + struct msg_out out; + (void)c; + + if(cmd == NULL) { + /* broken connection */ + return; + } + + if (reply == NULL) { /* broken Redis link */ + format_send_error(cmd, 503, "Service Unavailable"); + return; + } + + /* prepare data structure for output */ + out.p = NULL; + out.sz = 0; + + /* encode redis reply */ + msgpack_wrap_redis_reply(cmd, &out, r); + + /* send reply */ + format_send_reply(cmd, out.p, out.sz, "application/x-msgpack"); + + /* cleanup */ + free(out.p); +} + +static int +on_msgpack_write(void *data, const char *s, unsigned int sz) { + + struct msg_out *out = data; + + out->p = realloc(out->p, out->sz + sz); + memcpy(out->p + out->sz, s, sz); + out->sz += sz; + + return sz; +} + +/** + * Parse info message and return object. + */ +void +msg_info_reply(msgpack_packer* pk, const char *s, size_t sz) { + + const char *p = s; + unsigned int count = 0; + + /* TODO: handle new format */ + + /* count number of lines */ + while(p < s + sz) { + p = strchr(p, '\r'); + if(!p) break; + + p++; + count++; + } + + /* create msgpack object */ + msgpack_pack_map(pk, count); + + p = s; + while(p < s + sz) { + char *key, *val, *nl, *colon; + size_t key_sz, val_sz; + + /* find key */ + colon = strchr(p, ':'); + if(!colon) { + break; + } + key_sz = colon - p; + key = calloc(key_sz + 1, 1); + memcpy(key, p, key_sz); + p = colon + 1; + + /* find value */ + nl = strchr(p, '\r'); + if(!nl) { + free(key); + break; + } + val_sz = nl - p; + val = calloc(val_sz + 1, 1); + memcpy(val, p, val_sz); + p = nl + 1; + if(*p == '\n') p++; + + /* add to object */ + msgpack_pack_raw(pk, key_sz); + msgpack_pack_raw_body(pk, key, key_sz); + msgpack_pack_raw(pk, val_sz); + msgpack_pack_raw_body(pk, val, val_sz); + + free(key); + free(val); + } +} + +static void +msg_hgetall_reply(msgpack_packer* pk, const redisReply *r) { + + /* zip keys and values together in a msgpack object */ + + unsigned int i; + + if(r->elements % 2 != 0) { + return; + } + + msgpack_pack_map(pk, r->elements / 2); + for(i = 0; i < r->elements; i += 2) { + redisReply *k = r->element[i], *v = r->element[i+1]; + + /* keys and values need to be strings */ + if(k->type != REDIS_REPLY_STRING || v->type != REDIS_REPLY_STRING) { + return; + } + + /* key */ + msgpack_pack_raw(pk, k->len); + msgpack_pack_raw_body(pk, k->str, k->len); + + /* value */ + msgpack_pack_raw(pk, v->len); + msgpack_pack_raw_body(pk, v->str, v->len); + } +} + +static void +msgpack_wrap_redis_reply(const struct cmd *cmd, struct msg_out *out, const redisReply *r) { + + unsigned int i; + msgpack_packer* pk = msgpack_packer_new(out, on_msgpack_write); + + /* copy verb, as jansson only takes a char* but not its length. */ + char *verb = ""; + size_t verb_sz = 0; + if(cmd->count) { + verb_sz = cmd->argv_len[0]; + verb = cmd->argv[0]; + } + + /* Create map object */ + msgpack_pack_map(pk, 1); + + /* The single element is the verb */ + msgpack_pack_raw(pk, verb_sz); + msgpack_pack_raw_body(pk, verb, verb_sz); + + switch(r->type) { + case REDIS_REPLY_STATUS: + case REDIS_REPLY_ERROR: + msgpack_pack_array(pk, 2); + + /* first element: book */ + if(r->type == REDIS_REPLY_ERROR) + msgpack_pack_false(pk); + else + msgpack_pack_true(pk); + + /* second element: message */ + msgpack_pack_raw(pk, r->len); + msgpack_pack_raw_body(pk, r->str, r->len); + break; + + case REDIS_REPLY_STRING: + if(verb_sz ==4 && strncasecmp(verb, "INFO", 4) == 0) { + msg_info_reply(pk, r->str, r->len); + } else { + msgpack_pack_raw(pk, r->len); + msgpack_pack_raw_body(pk, r->str, r->len); + } + break; + + case REDIS_REPLY_INTEGER: + msgpack_pack_int(pk, r->integer); + break; + + case REDIS_REPLY_ARRAY: + if(verb_sz == 7 && strncasecmp(verb, "HGETALL", 7) == 0) { + msg_hgetall_reply(pk, r); + break; + } + + msgpack_pack_array(pk, r->elements); + + for(i = 0; i < r->elements; ++i) { + redisReply *e = r->element[i]; + switch(e->type) { + case REDIS_REPLY_STRING: + msgpack_pack_raw(pk, e->len); + msgpack_pack_raw_body(pk, e->str, e->len); + break; + case REDIS_REPLY_INTEGER: + msgpack_pack_int(pk, e->integer); + break; + default: + msgpack_pack_nil(pk); + break; + } + } + + break; + + default: + msgpack_pack_nil(pk); + break; + } + + msgpack_packer_free(pk); +} diff --git a/formats/msgpack.h b/formats/msgpack.h new file mode 100644 index 0000000..796caec --- /dev/null +++ b/formats/msgpack.h @@ -0,0 +1,11 @@ +#ifndef MSGPACK_H +#define MSGPACK_H + +#include +#include +#include + +void +msgpack_reply(redisAsyncContext *c, void *r, void *privdata); + +#endif diff --git a/tests/basic.py b/tests/basic.py index c85f5a2..1d55d2d 100755 --- a/tests/basic.py +++ b/tests/basic.py @@ -5,6 +5,10 @@ try: import bson except: bson = None +try: + import msgpack +except: + msgpack = None host = '127.0.0.1' @@ -177,6 +181,60 @@ class TestBSon(TestWebdis): self.assertTrue(obj[0]['UNKNOWN'][0] == False) self.assertTrue(isinstance(obj[0]['UNKNOWN'][1], bson.Binary)) +def need_msgpack(fn): + def wrapper(self): + if msgpack: + fn(self) + return wrapper + +class TestMsgPack(TestWebdis): + + @need_msgpack + def test_set(self): + "success type (+OK)" + self.query('DEL/hello') + f = self.query('SET/hello/world.msg') + self.assertTrue(f.headers.getheader('Content-Type') == 'application/x-msgpack') + obj = msgpack.loads(f.read()) + self.assertTrue(obj == {'SET': (True, 'OK')}) + + @need_msgpack + def test_get(self): + "string type" + self.query('SET/hello/world') + f = self.query('GET/hello.msg') + obj = msgpack.loads(f.read()) + self.assertTrue(obj == {'GET': 'world'}) + + @need_msgpack + def test_incr(self): + "integer type" + self.query('DEL/hello') + f = self.query('INCR/hello.msg') + obj = msgpack.loads(f.read()) + self.assertTrue(obj == {'INCR': 1}) + + @need_msgpack + def test_list(self): + "list type" + self.query('DEL/hello') + self.query('RPUSH/hello/abc') + self.query('RPUSH/hello/def') + f = self.query('LRANGE/hello/0/-1.msg') + obj = msgpack.loads(f.read()) + self.assertTrue(obj == {'LRANGE': ('abc', 'def')}) + + @need_msgpack + def test_error(self): + "error return type" + f = self.query('UNKNOWN/COMMAND.msg') + obj = msgpack.loads(f.read()) + self.assertTrue('UNKNOWN' in obj) + self.assertTrue(isinstance(obj, dict)) + self.assertTrue(isinstance(obj['UNKNOWN'], tuple)) + self.assertTrue(obj['UNKNOWN'][0] == False) + self.assertTrue(isinstance(obj['UNKNOWN'][1], str)) + class TestETag(TestWebdis): def test_etag_match(self): @@ -201,7 +259,7 @@ class TestBadRequest(TestWebdis): self.query('DEL/hello') self.query('LPUSH/hello/world') # "hello" is a list. try: - f = self.query('LRANGE/hello/world.txt') # let's try a range query on it (valid) but as text (invalid) + f = self.query('LRANGE/hello/0/-1.txt') # let's try a range query on it (valid) but as text (invalid) except urllib2.HTTPError as e: self.assertTrue(e.code == 400) return