From d8f6460e8ea40700f94d13981e4889376e3e6816 Mon Sep 17 00:00:00 2001 From: Nicolas Favre-Felix Date: Wed, 29 Dec 2010 13:50:49 +0100 Subject: [PATCH] Pub/sub progress... --- cmd.c | 28 +++++++++++++++++++++++----- formats/json.c | 15 ++++++++++++--- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/cmd.c b/cmd.c index 9e6f259..fcbc77d 100644 --- a/cmd.c +++ b/cmd.c @@ -35,11 +35,24 @@ cmd_free(struct cmd *c) { free(c); } +struct pubsub_client { + struct server *s; + struct evhttp_request *rq; +}; + void __redisAsyncDisconnect(redisAsyncContext *ac); void on_http_disconnect(struct evhttp_connection *evcon, void *ctx) { - struct server *s = ctx; - printf("SUBSCRIBE DISCONNECT (ac=%p)\n", (void*)s->ac); - redisAsyncDisconnect(s->ac); + struct pubsub_client *ps = ctx; + + (void)evcon; + + printf("SUBSCRIBE DISCONNECT (ac=%p)\n", (void*)ps->s->ac); + if(ps->s->ac->replies.head) { /* TODO: proper free. */ + printf("NULLIFY closure\n"); + ps->s->ac->replies.head->privdata = NULL; + } + redisAsyncDisconnect(ps->s->ac); + free(ps); } int @@ -131,8 +144,13 @@ cmd_run(struct server *s, struct evhttp_request *rq, /* check if we have to split the connection */ if(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0) { - s = server_copy(s); - evhttp_connection_set_closecb(rq->evcon, on_http_disconnect, s); + struct pubsub_client *ps; + ps = calloc(1, sizeof(struct pubsub_client)); + ps->s = s = server_copy(s); + + ps->rq = rq; + + evhttp_connection_set_closecb(rq->evcon, on_http_disconnect, ps); } if(!slash) { diff --git a/formats/json.c b/formats/json.c index ebada55..a04382c 100644 --- a/formats/json.c +++ b/formats/json.c @@ -15,19 +15,30 @@ json_wrap_redis_reply(const struct cmd *cmd, const redisReply *r); void json_reply(redisAsyncContext *c, void *r, void *privdata) { - (void)c; struct evbuffer *body; redisReply *reply = r; struct cmd *cmd = privdata; json_t *j; char *jstr; int free_reply = 1; + redisCallback *cb; if (reply == NULL) { printf("reply = NULL, BYE.\n"); evhttp_send_reply(cmd->rq, 404, "Not Found", NULL); return; } + if(cmd == NULL) { + /* broken connection */ + + /* reinstall callback just in case. */ + cb = calloc(1, sizeof(redisCallback)); + cb->fn = json_reply; + cb->privdata = privdata; + __redisPushCallback(&c->replies, cb); + + return; + } /* encode redis reply as JSON */ j = json_wrap_redis_reply(cmd, r); @@ -41,7 +52,6 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) { evhttp_add_header(cmd->rq->output_headers, "Content-Type", "application/json"); if(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0) { - redisCallback *cb; free_reply = 0; /* reinstall callback */ @@ -70,7 +80,6 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) { cmd_free(cmd); } evhttp_clear_headers(&cmd->uri_params); - free(json_reply); } static json_t *