diff --git a/hiredis/async.c b/hiredis/async.c index f83e2f5..83d88ec 100644 --- a/hiredis/async.c +++ b/hiredis/async.c @@ -102,7 +102,12 @@ static dictType callbackDict = { }; static redisAsyncContext *redisAsyncInitialize(redisContext *c) { - redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); + redisAsyncContext *ac; + + ac = realloc(c,sizeof(redisAsyncContext)); + if (ac == NULL) + return NULL; + c = &(ac->c); /* The regular connect functions will always set the flag REDIS_CONNECTED. @@ -142,15 +147,32 @@ static void __redisAsyncCopyError(redisAsyncContext *ac) { } redisAsyncContext *redisAsyncConnect(const char *ip, int port) { - redisContext *c = redisConnectNonBlock(ip,port); - redisAsyncContext *ac = redisAsyncInitialize(c); + redisContext *c; + redisAsyncContext *ac; + + c = redisConnectNonBlock(ip,port); + if (c == NULL) + return NULL; + + ac = redisAsyncInitialize(c); + if (ac == NULL) { + redisFree(c); + return NULL; + } + __redisAsyncCopyError(ac); return ac; } redisAsyncContext *redisAsyncConnectUnix(const char *path) { - redisContext *c = redisConnectUnixNonBlock(path); - redisAsyncContext *ac = redisAsyncInitialize(c); + redisContext *c; + redisAsyncContext *ac; + + c = redisConnectUnixNonBlock(path); + if (c == NULL) + return NULL; + + ac = redisAsyncInitialize(c); __redisAsyncCopyError(ac); return ac; } @@ -182,6 +204,9 @@ static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { /* Copy callback from stack to heap */ cb = malloc(sizeof(*cb)); + if (cb == NULL) + return REDIS_ERR_OOM; + if (source != NULL) { memcpy(cb,source,sizeof(*cb)); cb->next = NULL; @@ -372,6 +397,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } + + /* If monitor mode, repush callback */ + if(c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ @@ -381,22 +411,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular callbacks will * get a reply before pub/sub messages arrive. */ if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { - /* A spontaneous reply in a not-subscribed context can only be the - * error reply that is sent when a new connection exceeds the - * maximum number of allowed connections on the server side. This - * is seen as an error instead of a regular reply because the - * server closes the connection after sending it. To prevent the - * error from being overwritten by an EOF error the connection is - * closed here. See issue #43. */ - if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) { + /* + * A spontaneous reply in a not-subscribed context can be the error + * reply that is sent when a new connection exceeds the maximum + * number of allowed connections on the server side. + * + * This is seen as an error instead of a regular reply because the + * server closes the connection after sending it. + * + * To prevent the error from being overwritten by an EOF error the + * connection is closed here. See issue #43. + * + * Another possibility is that the server is loading its dataset. + * In this case we also want to close the connection, and have the + * user wait until the server is ready to take our request. + */ + if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { c->err = REDIS_ERR_OTHER; snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed. */ - assert(c->flags & REDIS_SUBSCRIBED); - __redisGetSubscribeCallback(ac,reply,&cb); + /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ + assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); + if(c->flags & REDIS_SUBSCRIBED) + __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) { @@ -557,6 +596,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ + } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + __redisPushCallback(&ac->replies,&cb); } else { if (c->flags & REDIS_SUBSCRIBED) /* This will likely result in an error reply, but it needs to be diff --git a/hiredis/example.c b/hiredis/example.c index 90ff9ed..d9d7271 100644 --- a/hiredis/example.c +++ b/hiredis/example.c @@ -10,9 +10,14 @@ int main(void) { redisReply *reply; struct timeval timeout = { 1, 500000 }; // 1.5 seconds - c = redisConnectWithTimeout((char*)"127.0.0.2", 6379, timeout); - if (c->err) { - printf("Connection error: %s\n", c->errstr); + c = redisConnectWithTimeout((char*)"127.0.0.1", 6379, timeout); + if (c == NULL || c->err) { + if (c) { + printf("Connection error: %s\n", c->errstr); + redisFree(c); + } else { + printf("Connection error: can't allocate redis context\n"); + } exit(1); } diff --git a/hiredis/hiredis.c b/hiredis/hiredis.c index 1a57adb..958f58b 100644 --- a/hiredis/hiredis.c +++ b/hiredis/hiredis.c @@ -446,10 +446,10 @@ static int processMultiBulkItem(redisReader *r) { long elements; int root = 0; - /* Set error for nested multi bulks with depth > 2 */ - if (r->ridx == 3) { + /* Set error for nested multi bulks with depth > 7 */ + if (r->ridx == 8) { __redisReaderSetError(r,REDIS_ERR_PROTOCOL, - "No support for nested multi bulk replies with depth > 2"); + "No support for nested multi bulk replies with depth > 7"); return REDIS_ERR; } @@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) { r->errstr[0] = '\0'; r->fn = &defaultFunctions; r->buf = sdsempty(); + r->maxbuf = REDIS_READER_MAX_BUF; if (r->buf == NULL) { free(r); return NULL; @@ -591,7 +592,7 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* Copy the provided buffer. */ if (buf != NULL && len >= 1) { /* Destroy internal buffer when it is empty and is quite large. */ - if (r->len == 0 && sdsavail(r->buf) > 16*1024) { + if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) { sdsfree(r->buf); r->buf = sdsempty(); r->pos = 0; @@ -1013,42 +1014,72 @@ void redisFree(redisContext *c) { * context will be set to the return value of the error function. * When no set of reply functions is given, the default set will be used. */ redisContext *redisConnect(const char *ip, int port) { - redisContext *c = redisContextInit(); + redisContext *c; + + c = redisContextInit(); + if (c == NULL) + return NULL; + c->flags |= REDIS_BLOCK; redisContextConnectTcp(c,ip,port,NULL); return c; } redisContext *redisConnectWithTimeout(const char *ip, int port, struct timeval tv) { - redisContext *c = redisContextInit(); + redisContext *c; + + c = redisContextInit(); + if (c == NULL) + return NULL; + c->flags |= REDIS_BLOCK; redisContextConnectTcp(c,ip,port,&tv); return c; } redisContext *redisConnectNonBlock(const char *ip, int port) { - redisContext *c = redisContextInit(); + redisContext *c; + + c = redisContextInit(); + if (c == NULL) + return NULL; + c->flags &= ~REDIS_BLOCK; redisContextConnectTcp(c,ip,port,NULL); return c; } redisContext *redisConnectUnix(const char *path) { - redisContext *c = redisContextInit(); + redisContext *c; + + c = redisContextInit(); + if (c == NULL) + return NULL; + c->flags |= REDIS_BLOCK; redisContextConnectUnix(c,path,NULL); return c; } redisContext *redisConnectUnixWithTimeout(const char *path, struct timeval tv) { - redisContext *c = redisContextInit(); + redisContext *c; + + c = redisContextInit(); + if (c == NULL) + return NULL; + c->flags |= REDIS_BLOCK; redisContextConnectUnix(c,path,&tv); return c; } redisContext *redisConnectUnixNonBlock(const char *path) { - redisContext *c = redisContextInit(); + redisContext *c; + + c = redisContextInit(); + if (c == NULL) + return NULL; + c->flags &= ~REDIS_BLOCK; redisContextConnectUnix(c,path,NULL); return c; @@ -1067,7 +1098,7 @@ int redisSetTimeout(redisContext *c, struct timeval tv) { * After this function is called, you may use redisContextReadReply to * see if there is a reply available. */ int redisBufferRead(redisContext *c) { - char buf[2048]; + char buf[1024*16]; int nread; /* Return early when the context has seen an error. */ @@ -1076,7 +1107,7 @@ int redisBufferRead(redisContext *c) { nread = read(c->fd,buf,sizeof(buf)); if (nread == -1) { - if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) { + if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { /* Try again later */ } else { __redisSetError(c,REDIS_ERR_IO,NULL); @@ -1113,7 +1144,7 @@ int redisBufferWrite(redisContext *c, int *done) { if (sdslen(c->obuf) > 0) { nwritten = write(c->fd,c->obuf,sdslen(c->obuf)); if (nwritten == -1) { - if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) { + if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) { /* Try again later */ } else { __redisSetError(c,REDIS_ERR_IO,NULL); diff --git a/hiredis/hiredis.h b/hiredis/hiredis.h index 8358375..aadcf35 100644 --- a/hiredis/hiredis.h +++ b/hiredis/hiredis.h @@ -36,8 +36,8 @@ #include /* for struct timeval */ #define HIREDIS_MAJOR 0 -#define HIREDIS_MINOR 10 -#define HIREDIS_PATCH 1 +#define HIREDIS_MINOR 11 +#define HIREDIS_PATCH 0 #define REDIS_ERR -1 #define REDIS_OK 0 @@ -76,6 +76,9 @@ /* Flag that is set when the async context has one or more subscriptions. */ #define REDIS_SUBSCRIBED 0x20 +/* Flag that is set when monitor mode is active */ +#define REDIS_MONITORING 0x40 + #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 @@ -83,6 +86,8 @@ #define REDIS_REPLY_STATUS 5 #define REDIS_REPLY_ERROR 6 +#define REDIS_READER_MAX_BUF (1024*16) /* Default max unused reader buffer. */ + #ifdef __cplusplus extern "C" { #endif @@ -122,8 +127,9 @@ typedef struct redisReader { char *buf; /* Read buffer */ size_t pos; /* Buffer cursor */ size_t len; /* Buffer length */ + size_t maxbuf; /* Max length of unused buffer */ - redisReadTask rstack[4]; + redisReadTask rstack[9]; int ridx; /* Index of current read task */ void *reply; /* Temporary reply pointer */ diff --git a/hiredis/test.c b/hiredis/test.c index 5945b65..6786003 100644 --- a/hiredis/test.c +++ b/hiredis/test.c @@ -88,7 +88,10 @@ static redisContext *connect(struct config config) { assert(NULL); } - if (c->err) { + if (c == NULL) { + printf("Connection error: can't allocate redis context\n"); + exit(1); + } else if (c->err) { printf("Connection error: %s\n", c->errstr); exit(1); } @@ -204,6 +207,7 @@ static void test_reply_reader(void) { redisReader *reader; void *reply; int ret; + int i; test("Error handling in reply parser: "); reader = redisReaderCreate(); @@ -225,12 +229,13 @@ static void test_reply_reader(void) { strcasecmp(reader->errstr,"Protocol error, got \"@\" as reply type byte") == 0); redisReaderFree(reader); - test("Set error on nested multi bulks with depth > 2: "); + test("Set error on nested multi bulks with depth > 7: "); reader = redisReaderCreate(); - redisReaderFeed(reader,(char*)"*1\r\n",4); - redisReaderFeed(reader,(char*)"*1\r\n",4); - redisReaderFeed(reader,(char*)"*1\r\n",4); - redisReaderFeed(reader,(char*)"*1\r\n",4); + + for (i = 0; i < 9; i++) { + redisReaderFeed(reader,(char*)"*1\r\n",4); + } + ret = redisReaderGetReply(reader,NULL); test_cond(ret == REDIS_ERR && strncasecmp(reader->errstr,"No support for",14) == 0);