Fix pub/sub. Valgrind is happy.

master
Nicolas Favre-Felix 14 years ago
parent e914952435
commit 6026811c02

@ -31,18 +31,17 @@ curl -d "GET/hello" http://127.0.0.1:7379/
* Custom Content-Type using a pre-defined file extension, or with `?type=some/thing`. * Custom Content-Type using a pre-defined file extension, or with `?type=some/thing`.
* URL-encoded parameters for binary data or slashes. For instance, `%2f` is decoded as `/` but not used as a command separator. * URL-encoded parameters for binary data or slashes. For instance, `%2f` is decoded as `/` but not used as a command separator.
* Logs, with a configurable verbosity. * Logs, with a configurable verbosity.
* Cross-origin XHR, if compiled with libevent2 (for `OPTIONS` support). * Cross-origin requests, usable with XMLHttpRequest2 (Cross-Origin Resource Sharing - CORS).
* File upload with PUT, if compiled with libevent2 (for `PUT` support). * File upload with PUT.
# Ideas, TODO... # Ideas, TODO...
* Add better support for PUT, DELETE, HEAD, OPTIONS? How? For which commands? * Add better support for PUT, DELETE, HEAD, OPTIONS? How? For which commands?
* Switch from evhttp to raw libevent + the http-parser library from node.js for clean disconnection support on SUBSCRIBE commands.
* MULTI/EXEC/DISCARD/WATCH are disabled at the moment; find a way to use them. * MULTI/EXEC/DISCARD/WATCH are disabled at the moment; find a way to use them.
* Support POST of raw Redis protocol data, and execute the whole thing. This could be useful for MULTI/EXEC transactions. * Support POST of raw Redis protocol data, and execute the whole thing. This could be useful for MULTI/EXEC transactions.
* Enrich config file: * Enrich config file:
* Provide timeout (maybe for some commands only?). What should the response be? 504 Gateway Timeout? 503 Service Unavailable? * Provide timeout (maybe for some commands only?). What should the response be? 504 Gateway Timeout? 503 Service Unavailable?
* Multi-server support, using consistent hashing. * Multi-server support, using consistent hashing.
* Add WebSocket support (with which protocol?). This will only be possible after the switch from evhttp to http-parser. * Add WebSocket support (with which protocol?).
* Send your ideas using the github tracker, on twitter [@yowgi](http://twitter.com/yowgi) or by mail to n.favrefelix@gmail.com. * Send your ideas using the github tracker, on twitter [@yowgi](http://twitter.com/yowgi) or by mail to n.favrefelix@gmail.com.
# HTTP error codes # HTTP error codes

39
cmd.c

@ -39,24 +39,6 @@ cmd_free(struct cmd *c) {
free(c); free(c);
} }
/**
* Detect disconnection of a pub/sub client. We need to clean up the command.
*/
void on_http_disconnect(struct evhttp_connection *evcon, void *ctx) {
struct pubsub_client *ps = ctx;
(void)evcon;
/* clean up redis object */
redisAsyncFree(ps->s->ac);
/* clean up command object */
if(ps->cmd) {
cmd_free(ps->cmd);
}
free(ps);
}
/* taken from libevent */ /* taken from libevent */
static char * static char *
decode_uri(const char *uri, size_t length, size_t *out_len, int always_decode_plus) { decode_uri(const char *uri, size_t length, size_t *out_len, int always_decode_plus) {
@ -136,18 +118,13 @@ cmd_run(struct server *s, struct http_client *client,
return -1; return -1;
} }
/* FIXME:check if we have to split the connection */ /* check if we have to split the connection */
/*
if(cmd_is_subscribe(cmd)) { if(cmd_is_subscribe(cmd)) {
struct pubsub_client *ps;
ps = calloc(1, sizeof(struct pubsub_client)); client->sub = malloc(sizeof(struct subscription));
ps->s = s = server_copy(s); client->sub->s = s = server_copy(s);
ps->cmd = cmd; client->sub->cmd = cmd;
ps->rq = rq;
evhttp_connection_set_closecb(rq->evcon, on_http_disconnect, ps);
} }
*/
/* no args (e.g. INFO command) */ /* no args (e.g. INFO command) */
if(!slash) { if(!slash) {
@ -247,7 +224,7 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
} }
} }
/* FIXME:the user can force it with ?type=some/thing */ /* the user can force it with ?type=some/thing */
if(client->qs_type.s) { if(client->qs_type.s) {
*f_format = custom_type_reply; *f_format = custom_type_reply;
cmd->mime = strdup(client->qs_type.s); cmd->mime = strdup(client->qs_type.s);
@ -260,6 +237,12 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
int int
cmd_is_subscribe(struct cmd *cmd) { cmd_is_subscribe(struct cmd *cmd) {
/*
if(cmd->started_responding) {
return 1;
}
*/
if(cmd->count >= 1 && if(cmd->count >= 1 &&
(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0 || (strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0 ||
strncasecmp(cmd->argv[0], "PSUBSCRIBE", cmd->argv_len[0]) == 0)) { strncasecmp(cmd->argv[0], "PSUBSCRIBE", cmd->argv_len[0]) == 0)) {

@ -15,7 +15,6 @@ struct cmd;
typedef void (*formatting_fun)(redisAsyncContext *, void *, void *); typedef void (*formatting_fun)(redisAsyncContext *, void *, void *);
struct cmd { struct cmd {
int count; int count;
const char **argv; const char **argv;
size_t *argv_len; size_t *argv_len;
@ -28,10 +27,9 @@ struct cmd {
int mime_free; int mime_free;
}; };
struct pubsub_client { struct subscription {
struct server *s; struct server *s;
struct cmd *cmd; struct cmd *cmd;
struct evhttp_request *rq;
}; };
struct cmd * struct cmd *

@ -33,7 +33,6 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
int free_cmd = 1; int free_cmd = 1;
if(cmd_is_subscribe(cmd)) { if(cmd_is_subscribe(cmd)) {
free_cmd = 0; free_cmd = 0;
@ -42,18 +41,15 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
const char *ct = cmd->mime?cmd->mime:content_type; const char *ct = cmd->mime?cmd->mime:content_type;
cmd->started_responding = 1; cmd->started_responding = 1;
http_set_header(&cmd->client->out_content_type, ct, strlen(ct)); http_set_header(&cmd->client->out_content_type, ct, strlen(ct));
/*FIXME: http_send_reply_start(cmd->client, 200, "OK");
evhttp_send_reply_start(cmd->rq, 200, "OK");
*/
} }
/*FIXME: evhttp_send_reply_chunk(cmd->rq, body); */ http_send_reply_chunk(cmd->client, p, sz);
} else { } else {
/* compute ETag */ /* compute ETag */
char *etag = etag_new(p, sz); char *etag = etag_new(p, sz);
const char *if_none_match = cmd->client->header_if_none_match.s; const char *if_none_match = cmd->client->header_if_none_match.s;
/* FIXME */
#if 1
/* check If-None-Match */ /* check If-None-Match */
if(if_none_match && strncmp(if_none_match, etag, cmd->client->header_if_none_match.sz) == 0) { if(if_none_match && strncmp(if_none_match, etag, cmd->client->header_if_none_match.sz) == 0) {
/* SAME! send 304. */ /* SAME! send 304. */
@ -64,12 +60,11 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
http_set_header(&cmd->client->out_etag, etag, strlen(etag)); http_set_header(&cmd->client->out_etag, etag, strlen(etag));
http_send_reply(cmd->client, 200, "OK", p, sz); http_send_reply(cmd->client, 200, "OK", p, sz);
} }
#endif
free(etag); free(etag);
} }
/* cleanup */ /* cleanup */
if(free_cmd) { if(free_cmd) {
/*FIXME: evhttp_clear_headers(&cmd->uri_params); */
cmd_free(cmd); cmd_free(cmd);
} }
} }

@ -17,7 +17,6 @@ custom_type_reply(redisAsyncContext *c, void *r, void *privdata) {
int int_len; int int_len;
if(reply == NULL) { if(reply == NULL) {
http_send_reply(cmd->client, 404, "Not Found", NULL, 0);
return; return;
} }

@ -25,7 +25,6 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) {
} }
if (reply == NULL) { if (reply == NULL) {
http_send_reply(cmd->client, 404, "Not Found", NULL, 0);
return; return;
} }

@ -20,7 +20,6 @@ raw_reply(redisAsyncContext *c, void *r, void *privdata) {
(void)c; (void)c;
if (reply == NULL) { if (reply == NULL) {
http_send_reply(cmd->client, 404, "Not Found", NULL, 0);
return; return;
} }

@ -61,6 +61,10 @@ http_client_read(int fd, short event, void *ctx) {
static void static void
http_client_cleanup(struct http_client *c) { http_client_cleanup(struct http_client *c) {
if(c->sub) {
return; /* we need to keep those. */
}
free(c->path.s); free(c->path.s);
memset(&c->path, 0, sizeof(str_t)); memset(&c->path, 0, sizeof(str_t));
@ -100,6 +104,18 @@ http_client_free(struct http_client *c) {
event_del(&c->ev); event_del(&c->ev);
close(c->fd); close(c->fd);
if(c->sub) {
/* clean up redis object */
redisAsyncFree(c->sub->s->ac);
/* clean up command object */
if(c->sub->cmd) {
cmd_free(c->sub->cmd);
}
free(c->sub);
c->sub = NULL;
}
http_client_cleanup(c); http_client_cleanup(c);
free(c); free(c);
} }
@ -128,7 +144,7 @@ http_client_keep_alive(struct http_client *c) {
void void
http_client_reset(struct http_client *c) { http_client_reset(struct http_client *c) {
if(!http_client_keep_alive(c)) { if(!http_client_keep_alive(c) && !c->sub) {
http_client_free(c); http_client_free(c);
return; return;
} }
@ -285,29 +301,65 @@ http_send_reply(struct http_client *c, short code, const char *msg,
http_response_init(c, &resp, code, msg); http_response_init(c, &resp, code, msg);
sprintf(content_length, "%zd", body_len); sprintf(content_length, "%zd", body_len);
http_response_set_header(&resp, "Content-Length", content_length); if(!c->sub) {
http_response_set_header(&resp, "Content-Length", content_length);
}
if(body_len) { if(body_len) {
http_response_set_header(&resp, "Content-Type", ct); http_response_set_header(&resp, "Content-Type", ct);
} }
if(c->sub) {
http_response_set_header(&resp, "Transfer-Encoding", "chunked");
}
if(code == 200 && c->out_etag.s) { if(code == 200 && c->out_etag.s) {
http_response_set_header(&resp, "ETag", c->out_etag.s); http_response_set_header(&resp, "ETag", c->out_etag.s);
} }
http_response_set_body(&resp, body, body_len); http_response_set_body(&resp, body, body_len);
/* flush response in the socket */
if(http_response_send(&resp, c->fd)) { if(http_response_send(&resp, c->fd)) {
http_client_free(c); http_client_free(c);
} else { } else {
if(code == 200 && http_client_keep_alive(c)) { if(c->sub) { /* don't free the client, but monitor fd. */
http_client_serve(c);
return;
} else if(code == 200 && http_client_keep_alive(c)) { /* reset client */
http_client_reset(c); http_client_reset(c);
http_client_serve(c); http_client_serve(c);
} else { } else {
http_client_free(c); http_client_free(c); /* error or HTTP < 1.1: close */
} }
} }
} }
/* Transfer-encoding: chunked */
void
http_send_reply_start(struct http_client *c, short code, const char *msg) {
http_send_reply(c, code, msg, NULL, 0);
}
void
http_send_reply_chunk(struct http_client *c, const char *p, size_t sz) {
char buf[64];
int ret;
ret = sprintf(buf, "%x\r\n", (int)sz);
write(c->fd, buf, ret);
write(c->fd, p, sz);
write(c->fd, "\r\n", 2);
}
void
http_send_reply_end(struct http_client *c) {
http_send_reply_chunk(c, "", 0);
http_client_free(c);
}
void void
http_set_header(str_t *h, const char *p, size_t sz) { http_set_header(str_t *h, const char *p, size_t sz) {

@ -35,9 +35,12 @@ struct http_client {
str_t out_content_type; str_t out_content_type;
str_t out_etag; str_t out_etag;
/* query string */
str_t qs_type; str_t qs_type;
str_t qs_jsonp; str_t qs_jsonp;
struct subscription *sub;
/* private, used in HTTP parser */ /* private, used in HTTP parser */
str_t last_header_name; str_t last_header_name;
}; };
@ -91,6 +94,16 @@ void
http_send_reply(struct http_client *c, short code, const char *msg, http_send_reply(struct http_client *c, short code, const char *msg,
const char *body, size_t body_len); const char *body, size_t body_len);
/* Transfer-encoding: chunked */
void
http_send_reply_start(struct http_client *c, short code, const char *msg);
void
http_send_reply_chunk(struct http_client *c, const char *p, size_t sz);
void
http_send_reply_end(struct http_client *c);
void void
http_send_error(struct http_client *c, short code, const char *msg); http_send_error(struct http_client *c, short code, const char *msg);
@ -107,5 +120,4 @@ http_response_set_body(struct http_response *r, const char *body, size_t body_le
int int
http_response_send(struct http_response *r, int fd); http_response_send(struct http_response *r, int fd);
#endif #endif

Loading…
Cancel
Save