Pub/sub progress...

master
Nicolas Favre-Felix 14 years ago
parent 7f256d6461
commit d8f6460e8e

28
cmd.c

@ -35,11 +35,24 @@ cmd_free(struct cmd *c) {
free(c);
}
struct pubsub_client {
struct server *s;
struct evhttp_request *rq;
};
void __redisAsyncDisconnect(redisAsyncContext *ac);
void on_http_disconnect(struct evhttp_connection *evcon, void *ctx) {
struct server *s = ctx;
printf("SUBSCRIBE DISCONNECT (ac=%p)\n", (void*)s->ac);
redisAsyncDisconnect(s->ac);
struct pubsub_client *ps = ctx;
(void)evcon;
printf("SUBSCRIBE DISCONNECT (ac=%p)\n", (void*)ps->s->ac);
if(ps->s->ac->replies.head) { /* TODO: proper free. */
printf("NULLIFY closure\n");
ps->s->ac->replies.head->privdata = NULL;
}
redisAsyncDisconnect(ps->s->ac);
free(ps);
}
int
@ -131,8 +144,13 @@ cmd_run(struct server *s, struct evhttp_request *rq,
/* check if we have to split the connection */
if(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0) {
s = server_copy(s);
evhttp_connection_set_closecb(rq->evcon, on_http_disconnect, s);
struct pubsub_client *ps;
ps = calloc(1, sizeof(struct pubsub_client));
ps->s = s = server_copy(s);
ps->rq = rq;
evhttp_connection_set_closecb(rq->evcon, on_http_disconnect, ps);
}
if(!slash) {

@ -15,19 +15,30 @@ json_wrap_redis_reply(const struct cmd *cmd, const redisReply *r);
void
json_reply(redisAsyncContext *c, void *r, void *privdata) {
(void)c;
struct evbuffer *body;
redisReply *reply = r;
struct cmd *cmd = privdata;
json_t *j;
char *jstr;
int free_reply = 1;
redisCallback *cb;
if (reply == NULL) {
printf("reply = NULL, BYE.\n");
evhttp_send_reply(cmd->rq, 404, "Not Found", NULL);
return;
}
if(cmd == NULL) {
/* broken connection */
/* reinstall callback just in case. */
cb = calloc(1, sizeof(redisCallback));
cb->fn = json_reply;
cb->privdata = privdata;
__redisPushCallback(&c->replies, cb);
return;
}
/* encode redis reply as JSON */
j = json_wrap_redis_reply(cmd, r);
@ -41,7 +52,6 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) {
evhttp_add_header(cmd->rq->output_headers, "Content-Type", "application/json");
if(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0) {
redisCallback *cb;
free_reply = 0;
/* reinstall callback */
@ -70,7 +80,6 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) {
cmd_free(cmd);
}
evhttp_clear_headers(&cmd->uri_params);
free(json_reply);
}
static json_t *

Loading…
Cancel
Save