From 5b82660a774380994b5273008c3dbdf657273e50 Mon Sep 17 00:00:00 2001 From: Nicolas Favre-Felix Date: Fri, 24 Dec 2010 12:16:40 +0100 Subject: [PATCH] Scheduled reconnect. --- turnip.c | 92 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 58 insertions(+), 34 deletions(-) diff --git a/turnip.c b/turnip.c index 6e7613b..5cb4180 100644 --- a/turnip.c +++ b/turnip.c @@ -13,12 +13,49 @@ #include "conf.h" #include "cmd.h" +#include "turnip.h" -static struct conf *cfg; -redisAsyncContext *__redis_context = NULL; -struct event_base *__base; +struct turnip *__t = NULL; + +static void +connectCallback(const redisAsyncContext *c); + +static void +disconnectCallback(const redisAsyncContext *c, int status); -static void reconnect(); + +static void +reconnect(); + +static void +on_timer_reconnect(int fd, short event, void *ctx) { + + (void)fd; + (void)event; + (void)ctx; + + if(__t->ac) { + redisLibeventCleanup(__t->ac->_adapter_data); + } + + /* TODO: free AC. */ + + if(__t->cfg->redis_host[0] == '/') { /* unix socket */ + __t->ac = redisAsyncConnectUnix(__t->cfg->redis_host); + } else { + __t->ac = redisAsyncConnect(__t->cfg->redis_host, __t->cfg->redis_port); + } + if(__t->ac->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", __t->ac->errstr); + } + + + redisLibeventAttach(__t->ac, __t->base); + redisAsyncSetConnectCallback(__t->ac, connectCallback); + redisAsyncSetDisconnectCallback(__t->ac, disconnectCallback); + +} static void connectCallback(const redisAsyncContext *c) { @@ -31,34 +68,20 @@ disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); } - printf("disconnected...\n"); - __redis_context = NULL; + printf("disconnected, schedule reconnect.\n"); + __t->ac = NULL; - /* TODO: schedule reconnect instead to avoid opening too many sockets */ reconnect(); } static void reconnect() { - - if(__redis_context) { - redisLibeventCleanup(__redis_context->_adapter_data); - } - - if(cfg->redis_host[0] == '/') { /* unix socket */ - __redis_context = redisAsyncConnectUnix(cfg->redis_host); - } else { - __redis_context = redisAsyncConnect(cfg->redis_host, cfg->redis_port); - } - if(__redis_context->err) { - /* Let *c leak for now... */ - printf("Error: %s\n", __redis_context->errstr); - } - - - redisLibeventAttach(__redis_context, __base); - redisAsyncSetConnectCallback(__redis_context, connectCallback); - redisAsyncSetDisconnectCallback(__redis_context, disconnectCallback); + /* schedule reconnect */ + evtimer_set(&__t->ev_reconnect, on_timer_reconnect, NULL); + event_base_set(__t->base, &__t->ev_reconnect); + __t->tv_reconnect.tv_sec = 1; + __t->tv_reconnect.tv_usec = 0; + evtimer_add(&__t->ev_reconnect, &__t->tv_reconnect); } void @@ -67,18 +90,18 @@ on_request(struct evhttp_request *rq, void *ctx) { const char *uri = evhttp_request_uri(rq); (void)ctx; - if(!__redis_context) { /* redis is unavailable */ + if(!__t->ac) { /* redis is unavailable */ evhttp_send_reply(rq, 503, "Service Unavailable", NULL); return; } switch(rq->type) { case EVHTTP_REQ_GET: - cmd_run(__redis_context, rq, 1+uri, strlen(uri)-1); + cmd_run(__t->ac, rq, 1+uri, strlen(uri)-1); break; case EVHTTP_REQ_POST: - cmd_run(__redis_context, rq, + cmd_run(__t->ac, rq, (const char*)EVBUFFER_DATA(rq->input_buffer), EVBUFFER_LENGTH(rq->input_buffer)); break; @@ -94,10 +117,11 @@ main(int argc, char *argv[]) { (void)argc; (void)argv; - __base = event_base_new(); - struct evhttp *http = evhttp_new(__base); + __t = calloc(1, sizeof(struct turnip)); + __t->base = event_base_new(); + struct evhttp *http = evhttp_new(__t->base); - cfg = conf_read("turnip.conf"); + __t->cfg = conf_read("turnip.conf"); /* ignore sigpipe */ #ifdef SIGPIPE @@ -105,14 +129,14 @@ main(int argc, char *argv[]) { #endif /* start http server */ - evhttp_bind_socket(http, cfg->http_host, cfg->http_port); + evhttp_bind_socket(http, __t->cfg->http_host, __t->cfg->http_port); evhttp_set_gencb(http, on_request, NULL); /* attach hiredis to libevent base */ reconnect(); /* loop */ - event_base_dispatch(__base); + event_base_dispatch(__t->base); return EXIT_SUCCESS; }