PUB/SUB seems to work without crashing...

master
Nicolas Favre-Felix 14 years ago
parent d8f6460e8e
commit 240f05ea34

@ -51,7 +51,7 @@ void on_http_disconnect(struct evhttp_connection *evcon, void *ctx) {
printf("NULLIFY closure\n"); printf("NULLIFY closure\n");
ps->s->ac->replies.head->privdata = NULL; ps->s->ac->replies.head->privdata = NULL;
} }
redisAsyncDisconnect(ps->s->ac); redisAsyncFree(ps->s->ac);
free(ps); free(ps);
} }

@ -23,20 +23,14 @@ json_reply(redisAsyncContext *c, void *r, void *privdata) {
int free_reply = 1; int free_reply = 1;
redisCallback *cb; redisCallback *cb;
if (reply == NULL) {
printf("reply = NULL, BYE.\n");
evhttp_send_reply(cmd->rq, 404, "Not Found", NULL);
return;
}
if(cmd == NULL) { if(cmd == NULL) {
/* broken connection */ /* broken connection */
return;
}
/* reinstall callback just in case. */ if (reply == NULL) {
cb = calloc(1, sizeof(redisCallback)); printf("reply = NULL, BYE.\n");
cb->fn = json_reply; evhttp_send_reply(cmd->rq, 404, "Not Found", NULL);
cb->privdata = privdata;
__redisPushCallback(&c->replies, cb);
return; return;
} }

@ -96,6 +96,11 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
if (ac->onConnect == NULL) { if (ac->onConnect == NULL) {
ac->onConnect = fn; ac->onConnect = fn;
/* The common way to detect an established connection is to wait for
* the first write event to be fired. This assumes the related event
* library functions are already set. */
if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data);
return REDIS_OK; return REDIS_OK;
} }
return REDIS_ERR; return REDIS_ERR;
@ -146,53 +151,82 @@ static int __redisShiftCallback(redisCallbackList *list, redisCallback *target)
return REDIS_ERR; return REDIS_ERR;
} }
/* Tries to do a clean disconnect from Redis, meaning it stops new commands /* Helper function to free the context. */
* from being issued, but tries to flush the output buffer and execute static void __redisAsyncFree(redisAsyncContext *ac) {
* callbacks for all remaining replies.
*
* This functions is generally called from within a callback, so the
* processCallbacks function will pick up the flag when there are no
* more replies. */
void redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_DISCONNECTING;
}
/* Helper function to make the disconnect happen and clean up. */
void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb; redisCallback cb;
int status;
/* Make sure error is accessible if there is any */
__redisAsyncCopyError(ac);
status = (ac->err == 0) ? REDIS_OK : REDIS_ERR;
if (status == REDIS_OK) {
/* When the connection is cleanly disconnected, there should not
* be pending callbacks. */
assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
} else {
/* Callbacks should not be able to issue new commands. */
c->flags |= REDIS_DISCONNECTING;
/* Execute pending callbacks with NULL reply. */ /* Execute pending callbacks with NULL reply. */
while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) {
if (cb.fn != NULL) if (cb.fn != NULL) {
c->flags |= REDIS_IN_CALLBACK;
cb.fn(ac,NULL,cb.privdata); cb.fn(ac,NULL,cb.privdata);
c->flags &= ~REDIS_IN_CALLBACK;
} }
} }
/* Signal event lib to clean up */ /* Signal event lib to clean up */
if (ac->evCleanup) ac->evCleanup(ac->_adapter_data); if (ac->evCleanup) ac->evCleanup(ac->_adapter_data);
/* Execute callback with proper status */ /* Execute disconnect callback. When redisAsyncFree() initiated destroying
if (ac->onDisconnect) ac->onDisconnect(ac,status); * this context, the status will always be REDIS_OK. */
if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
if (c->flags & REDIS_FREEING) {
ac->onDisconnect(ac,REDIS_OK);
} else {
ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
}
}
/* Cleanup self */ /* Cleanup self */
redisFree(c); redisFree(c);
} }
/* Free the async context. When this function is called from a callback,
* control needs to be returned to redisProcessCallbacks() before actual
* free'ing. To do so, a flag is set on the context which is picked up by
* redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
void redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_FREEING;
if (!(c->flags & REDIS_IN_CALLBACK))
__redisAsyncFree(ac);
}
/* Helper function to make the disconnect happen and clean up. */
static void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* Make sure error is accessible if there is any */
__redisAsyncCopyError(ac);
if (ac->err == 0) {
/* For clean disconnects, there should be no pending callbacks. */
assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR);
} else {
/* Disconnection is caused by an error, make sure that pending
* callbacks cannot call new commands. */
c->flags |= REDIS_DISCONNECTING;
}
/* For non-clean disconnects, __redisAsyncFree() will execute pending
* callbacks with a NULL-reply. */
__redisAsyncFree(ac);
}
/* Tries to do a clean disconnect from Redis, meaning it stops new commands
* from being issued, but tries to flush the output buffer and execute
* callbacks for all remaining replies. When this function is called from a
* callback, there might be more replies and we can safely defer disconnecting
* to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
* when there are no pending callbacks. */
void redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_DISCONNECTING;
if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
__redisAsyncDisconnect(ac);
}
void redisProcessCallbacks(redisAsyncContext *ac) { void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb; redisCallback cb;
@ -216,7 +250,15 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
/* Shift callback and execute it */ /* Shift callback and execute it */
assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK); assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK);
if (cb.fn != NULL) { if (cb.fn != NULL) {
c->flags |= REDIS_IN_CALLBACK;
cb.fn(ac,reply,cb.privdata); cb.fn(ac,reply,cb.privdata);
c->flags &= ~REDIS_IN_CALLBACK;
/* Proceed with free'ing when redisAsyncFree() was called. */
if (c->flags & REDIS_FREEING) {
__redisAsyncFree(ac);
return;
}
} else { } else {
c->fn->freeObject(reply); c->fn->freeObject(reply);
} }
@ -276,8 +318,8 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisCallback cb; redisCallback cb;
/* Don't accept new commands when the connection is lazily closed. */ /* Don't accept new commands when the connection is about to be closed. */
if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
__redisAppendCommand(c,cmd,len); __redisAppendCommand(c,cmd,len);
/* Store callback */ /* Store callback */

@ -95,6 +95,7 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncDisconnect(redisAsyncContext *ac);
void redisAsyncFree(redisAsyncContext *ac);
/* Handle read/write events */ /* Handle read/write events */
void redisAsyncHandleRead(redisAsyncContext *ac); void redisAsyncHandleRead(redisAsyncContext *ac);

@ -809,8 +809,7 @@ static redisContext *redisContextInit() {
} }
void redisFree(redisContext *c) { void redisFree(redisContext *c) {
/* Disconnect before free'ing if not yet disconnected. */ if (c->fd > 0)
if (c->flags & REDIS_CONNECTED)
close(c->fd); close(c->fd);
if (c->errstr != NULL) if (c->errstr != NULL)
sdsfree(c->errstr); sdsfree(c->errstr);

@ -64,6 +64,13 @@
* should be terminated once all replies have been read. */ * should be terminated once all replies have been read. */
#define REDIS_DISCONNECTING 0x4 #define REDIS_DISCONNECTING 0x4
/* Flag specific to the async API which means that the context should be clean
* up as soon as possible. */
#define REDIS_FREEING 0x8
/* Flag that is set when an async callback is executed. */
#define REDIS_IN_CALLBACK 0x10
#define REDIS_REPLY_STRING 1 #define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3 #define REDIS_REPLY_INTEGER 3

Loading…
Cancel
Save