Update hiredis.

master
Nicolas Favre-Felix 13 years ago
parent e9da83d268
commit 28ec18582a

@ -1,5 +1,6 @@
# Hiredis Makefile # Hiredis Makefile
# Copyright (C) 2010 Salvatore Sanfilippo <antirez at gmail dot com> # Copyright (C) 2010-2011 Salvatore Sanfilippo <antirez at gmail dot com>
# Copyright (C) 2010-2011 Pieter Noordhuis <pcnoordhuis at gmail dot com>
# This file is released under the BSD license, see the COPYING file # This file is released under the BSD license, see the COPYING file
OBJ=net.o hiredis.o sds.o async.o OBJ=net.o hiredis.o sds.o async.o
@ -9,61 +10,37 @@ LIBNAME=libhiredis
HIREDIS_MAJOR=0 HIREDIS_MAJOR=0
HIREDIS_MINOR=10 HIREDIS_MINOR=10
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') # Fallback to gcc when $CC is not in $PATH.
CC:=$(shell sh -c 'type $(CC) >/dev/null 2>/dev/null && echo $(CC) || echo gcc')
OPTIMIZATION?=-O3 OPTIMIZATION?=-O3
WARNINGS=-Wall -W -Wstrict-prototypes -Wwrite-strings
DEBUG?= -g -ggdb
REAL_CFLAGS=$(OPTIMIZATION) -fPIC $(CFLAGS) $(WARNINGS) $(DEBUG)
REAL_LDFLAGS=$(LDFLAGS)
DYLIBSUFFIX=so
STLIBSUFFIX=a
DYLIB_MINOR_NAME=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR).$(HIREDIS_MINOR)
DYLIB_MAJOR_NAME=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR)
DYLIBNAME=$(LIBNAME).$(DYLIBSUFFIX)
DYLIB_MAKE_CMD=$(CC) -shared -Wl,-soname,$(DYLIB_MINOR_NAME) -o $(DYLIBNAME) $(LDFLAGS)
STLIBNAME=$(LIBNAME).$(STLIBSUFFIX)
STLIB_MAKE_CMD=ar rcs $(STLIBNAME)
# Platform-specific overrides
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
ifeq ($(uname_S),SunOS) ifeq ($(uname_S),SunOS)
CFLAGS?=$(OPTIMIZATION) -fPIC -Wall -W -D__EXTENSIONS__ -D_XPG6 $(ARCH) $(PROF) REAL_LDFLAGS+= -ldl -lnsl -lsocket
CCLINK?=-ldl -lnsl -lsocket -lm -lpthread DYLIB_MAKE_CMD=$(CC) -G -o $(DYLIBNAME) -h $(DYLIB_MINOR_NAME) $(LDFLAGS)
LDFLAGS?=-L.
DYLIBSUFFIX=so
STLIBSUFFIX=a
DYLIB_MINOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR).$(HIREDIS_MINOR)
DYLIB_MAJOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR)
DYLIBNAME?=$(LIBNAME).$(DYLIBSUFFIX)
DYLIB_MAKE_CMD?=$(CC) -G -o $(DYLIBNAME) -h $(DYLIB_MINOR_NAME)
STLIBNAME?=$(LIBNAME).$(STLIBSUFFIX)
STLIB_MAKE_CMD?=ar rcs $(STLIBNAME)
INSTALL= cp -r INSTALL= cp -r
else endif
ifeq ($(uname_S),Darwin) ifeq ($(uname_S),Darwin)
CFLAGS?=$(OPTIMIZATION) -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings $(ARCH) $(PROF)
CCLINK?=-lm -pthread
LDFLAGS?=-L.
OBJARCH?=-arch i386 -arch x86_64
DYLIBSUFFIX=dylib DYLIBSUFFIX=dylib
STLIBSUFFIX=a DYLIB_MINOR_NAME=$(LIBNAME).$(HIREDIS_MAJOR).$(HIREDIS_MINOR).$(DYLIBSUFFIX)
DYLIB_MINOR_NAME?=$(LIBNAME).$(HIREDIS_MAJOR).$(HIREDIS_MINOR).$(DYLIBSUFFIX) DYLIB_MAJOR_NAME=$(LIBNAME).$(HIREDIS_MAJOR).$(DYLIBSUFFIX)
DYLIB_MAJOR_NAME?=$(LIBNAME).$(HIREDIS_MAJOR).$(DYLIBSUFFIX) DYLIB_MAKE_CMD=$(CC) -shared -Wl,-install_name,$(DYLIB_MINOR_NAME) -o $(DYLIBNAME) $(LDFLAGS)
DYLIBNAME?=$(LIBNAME).$(DYLIBSUFFIX)
DYLIB_MAKE_CMD?=libtool -dynamic -o $(DYLIBNAME) -install_name $(DYLIB_MINOR_NAME) -lm $(DEBUG) -
STLIBNAME?=$(LIBNAME).$(STLIBSUFFIX)
STLIB_MAKE_CMD?=libtool -static -o $(STLIBNAME) -
INSTALL= cp -a
else
CFLAGS?=$(OPTIMIZATION) -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings $(ARCH) $(PROF)
CCLINK?=-lm -pthread
LDFLAGS?=-L.
DYLIBSUFFIX=so
STLIBSUFFIX=a
DYLIB_MINOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR).$(HIREDIS_MINOR)
DYLIB_MAJOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR)
DYLIBNAME?=$(LIBNAME).$(DYLIBSUFFIX)
DYLIB_MAKE_CMD?=gcc -shared -Wl,-soname,$(DYLIB_MINOR_NAME) -o $(DYLIBNAME)
STLIBNAME?=$(LIBNAME).$(STLIBSUFFIX)
STLIB_MAKE_CMD?=ar rcs $(STLIBNAME)
INSTALL= cp -a
endif
endif endif
CCOPT= $(CFLAGS) $(CCLINK)
DEBUG?= -g -ggdb
PREFIX?=/usr/local
INCLUDE_PATH?=include/hiredis
LIBRARY_PATH?=lib
INSTALL_INCLUDE_PATH= $(PREFIX)/$(INCLUDE_PATH)
INSTALL_LIBRARY_PATH= $(PREFIX)/$(LIBRARY_PATH)
all: $(DYLIBNAME) $(BINS) all: $(DYLIBNAME) $(BINS)
# Deps (use make dep to generate this) # Deps (use make dep to generate this)
@ -85,10 +62,10 @@ static: $(STLIBNAME)
# Binaries: # Binaries:
hiredis-example-libevent: example-libevent.c adapters/libevent.h $(STLIBNAME) hiredis-example-libevent: example-libevent.c adapters/libevent.h $(STLIBNAME)
$(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -levent example-libevent.c $(STLIBNAME) $(CC) -o $@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -levent example-libevent.c $(STLIBNAME)
hiredis-example-libev: example-libev.c adapters/libev.h $(STLIBNAME) hiredis-example-libev: example-libev.c adapters/libev.h $(STLIBNAME)
$(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -lev example-libev.c $(STLIBNAME) $(CC) -o $@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -lev example-libev.c $(STLIBNAME)
ifndef AE_DIR ifndef AE_DIR
hiredis-example-ae: hiredis-example-ae:
@ -96,11 +73,11 @@ hiredis-example-ae:
@false @false
else else
hiredis-example-ae: example-ae.c adapters/ae.h $(STLIBNAME) hiredis-example-ae: example-ae.c adapters/ae.h $(STLIBNAME)
$(CC) -o $@ $(CCOPT) $(DEBUG) -I$(AE_DIR) $(LDFLAGS) $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o example-ae.c $(STLIBNAME) $(CC) -o $@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I$(AE_DIR) $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o example-ae.c $(STLIBNAME)
endif endif
hiredis-%: %.o $(STLIBNAME) hiredis-%: %.o $(STLIBNAME)
$(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) $< $(STLIBNAME) $(CC) -o $@ $(REAL_LDFLAGS) $< $(STLIBNAME)
test: hiredis-test test: hiredis-test
./hiredis-test ./hiredis-test
@ -118,7 +95,7 @@ check: hiredis-test
kill `cat /tmp/hiredis-test-redis.pid` kill `cat /tmp/hiredis-test-redis.pid`
.c.o: .c.o:
$(CC) -std=c99 -pedantic -c $(CFLAGS) $(OBJARCH) $(DEBUG) $(COMPILE_TIME) $< $(CC) -std=c99 -pedantic -c $(REAL_CFLAGS) $<
clean: clean:
rm -rf $(DYLIBNAME) $(STLIBNAME) $(BINS) hiredis-example* *.o *.gcda *.gcno *.gcov rm -rf $(DYLIBNAME) $(STLIBNAME) $(BINS) hiredis-example* *.o *.gcda *.gcno *.gcov
@ -126,6 +103,19 @@ clean:
dep: dep:
$(CC) -MM *.c $(CC) -MM *.c
# Installation related variables and target
PREFIX?=/usr/local
INCLUDE_PATH?=include/hiredis
LIBRARY_PATH?=lib
INSTALL_INCLUDE_PATH= $(PREFIX)/$(INCLUDE_PATH)
INSTALL_LIBRARY_PATH= $(PREFIX)/$(LIBRARY_PATH)
ifeq ($(uname_S),SunOS)
INSTALL?= cp -r
endif
INSTALL?= cp -a
install: $(DYLIBNAME) $(STLIBNAME) install: $(DYLIBNAME) $(STLIBNAME)
mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_LIBRARY_PATH) mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_LIBRARY_PATH)
$(INSTALL) hiredis.h async.h adapters $(INSTALL_INCLUDE_PATH) $(INSTALL) hiredis.h async.h adapters $(INSTALL_INCLUDE_PATH)
@ -134,18 +124,25 @@ install: $(DYLIBNAME) $(STLIBNAME)
cd $(INSTALL_LIBRARY_PATH) && ln -sf $(DYLIB_MAJOR_NAME) $(DYLIBNAME) cd $(INSTALL_LIBRARY_PATH) && ln -sf $(DYLIB_MAJOR_NAME) $(DYLIBNAME)
$(INSTALL) $(STLIBNAME) $(INSTALL_LIBRARY_PATH) $(INSTALL) $(STLIBNAME) $(INSTALL_LIBRARY_PATH)
32bit: 32bit:
@echo "" @echo ""
@echo "WARNING: if it fails under Linux you probably need to install libc6-dev-i386" @echo "WARNING: if this fails under Linux you probably need to install libc6-dev-i386"
@echo "" @echo ""
$(MAKE) ARCH="-m32" $(MAKE) CFLAGS="-m32" LDFLAGS="-m32"
gprof: gprof:
$(MAKE) PROF="-pg" $(MAKE) CFLAGS="-pg" LDFLAGS="-pg"
gcov: gcov:
$(MAKE) PROF="-fprofile-arcs -ftest-coverage" $(MAKE) CFLAGS="-fprofile-arcs -ftest-coverage" LDFLAGS="-fprofile-arcs"
coverage: gcov
make check
mkdir -p tmp/lcov
lcov -d . -c -o tmp/lcov/hiredis.info
genhtml --legend -o tmp/lcov/report tmp/lcov/hiredis.info
noopt: noopt:
$(MAKE) OPTIMIZATION="" $(MAKE) OPTIMIZATION=""
.PHONY: all test check clean dep install 32bit gprof gcov noopt

@ -286,7 +286,8 @@ is being disconnected per user-request, no new commands may be added to the outp
returned on calls to the `redisAsyncCommand` family. returned on calls to the `redisAsyncCommand` family.
If the reply for a command with a `NULL` callback is read, it is immediately free'd. When the callback If the reply for a command with a `NULL` callback is read, it is immediately free'd. When the callback
for a command is non-`NULL`, it is responsible for cleaning up the reply. for a command is non-`NULL`, the memory is free'd immediately following the callback: the reply is only
valid for the duration of the callback.
All pending callbacks are called with a `NULL` reply when the context encountered an error. All pending callbacks are called with a `NULL` reply when the context encountered an error.

@ -1,3 +1,5 @@
#ifndef __HIREDIS_AE_H__
#define __HIREDIS_AE_H__
#include <sys/types.h> #include <sys/types.h>
#include <ae.h> #include <ae.h>
#include "../hiredis.h" #include "../hiredis.h"
@ -10,21 +12,21 @@ typedef struct redisAeEvents {
int reading, writing; int reading, writing;
} redisAeEvents; } redisAeEvents;
void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask); ((void)el); ((void)fd); ((void)mask);
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleRead(e->context); redisAsyncHandleRead(e->context);
} }
void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask); ((void)el); ((void)fd); ((void)mask);
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
redisAsyncHandleWrite(e->context); redisAsyncHandleWrite(e->context);
} }
void redisAeAddRead(void *privdata) { static void redisAeAddRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop; aeEventLoop *loop = e->loop;
if (!e->reading) { if (!e->reading) {
@ -33,7 +35,7 @@ void redisAeAddRead(void *privdata) {
} }
} }
void redisAeDelRead(void *privdata) { static void redisAeDelRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop; aeEventLoop *loop = e->loop;
if (e->reading) { if (e->reading) {
@ -42,7 +44,7 @@ void redisAeDelRead(void *privdata) {
} }
} }
void redisAeAddWrite(void *privdata) { static void redisAeAddWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop; aeEventLoop *loop = e->loop;
if (!e->writing) { if (!e->writing) {
@ -51,7 +53,7 @@ void redisAeAddWrite(void *privdata) {
} }
} }
void redisAeDelWrite(void *privdata) { static void redisAeDelWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop; aeEventLoop *loop = e->loop;
if (e->writing) { if (e->writing) {
@ -60,14 +62,14 @@ void redisAeDelWrite(void *privdata) {
} }
} }
void redisAeCleanup(void *privdata) { static void redisAeCleanup(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata; redisAeEvents *e = (redisAeEvents*)privdata;
redisAeDelRead(privdata); redisAeDelRead(privdata);
redisAeDelWrite(privdata); redisAeDelWrite(privdata);
free(e); free(e);
} }
int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisAeEvents *e; redisAeEvents *e;
@ -92,4 +94,4 @@ int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
return REDIS_OK; return REDIS_OK;
} }
#endif

@ -1,3 +1,6 @@
#ifndef __HIREDIS_LIBEV_H__
#define __HIREDIS_LIBEV_H__
#include <stdlib.h>
#include <sys/types.h> #include <sys/types.h>
#include <ev.h> #include <ev.h>
#include "../hiredis.h" #include "../hiredis.h"
@ -10,7 +13,7 @@ typedef struct redisLibevEvents {
ev_io rev, wev; ev_io rev, wev;
} redisLibevEvents; } redisLibevEvents;
void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) { static void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) {
#if EV_MULTIPLICITY #if EV_MULTIPLICITY
((void)loop); ((void)loop);
#endif #endif
@ -20,7 +23,7 @@ void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) {
redisAsyncHandleRead(e->context); redisAsyncHandleRead(e->context);
} }
void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { static void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) {
#if EV_MULTIPLICITY #if EV_MULTIPLICITY
((void)loop); ((void)loop);
#endif #endif
@ -30,7 +33,7 @@ void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) {
redisAsyncHandleWrite(e->context); redisAsyncHandleWrite(e->context);
} }
void redisLibevAddRead(void *privdata) { static void redisLibevAddRead(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevEvents *e = (redisLibevEvents*)privdata;
struct ev_loop *loop = e->loop; struct ev_loop *loop = e->loop;
((void)loop); ((void)loop);
@ -40,7 +43,7 @@ void redisLibevAddRead(void *privdata) {
} }
} }
void redisLibevDelRead(void *privdata) { static void redisLibevDelRead(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevEvents *e = (redisLibevEvents*)privdata;
struct ev_loop *loop = e->loop; struct ev_loop *loop = e->loop;
((void)loop); ((void)loop);
@ -50,7 +53,7 @@ void redisLibevDelRead(void *privdata) {
} }
} }
void redisLibevAddWrite(void *privdata) { static void redisLibevAddWrite(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevEvents *e = (redisLibevEvents*)privdata;
struct ev_loop *loop = e->loop; struct ev_loop *loop = e->loop;
((void)loop); ((void)loop);
@ -60,7 +63,7 @@ void redisLibevAddWrite(void *privdata) {
} }
} }
void redisLibevDelWrite(void *privdata) { static void redisLibevDelWrite(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevEvents *e = (redisLibevEvents*)privdata;
struct ev_loop *loop = e->loop; struct ev_loop *loop = e->loop;
((void)loop); ((void)loop);
@ -70,14 +73,14 @@ void redisLibevDelWrite(void *privdata) {
} }
} }
void redisLibevCleanup(void *privdata) { static void redisLibevCleanup(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevEvents *e = (redisLibevEvents*)privdata;
redisLibevDelRead(privdata); redisLibevDelRead(privdata);
redisLibevDelWrite(privdata); redisLibevDelWrite(privdata);
free(e); free(e);
} }
int redisLibevAttach(EV_P_ redisAsyncContext *ac) { static int redisLibevAttach(EV_P_ redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisLibevEvents *e; redisLibevEvents *e;
@ -111,3 +114,4 @@ int redisLibevAttach(EV_P_ redisAsyncContext *ac) {
return REDIS_OK; return REDIS_OK;
} }
#endif

@ -1,3 +1,5 @@
#ifndef __HIREDIS_LIBEVENT_H__
#define __HIREDIS_LIBEVENT_H__
#include <event.h> #include <event.h>
#include "../hiredis.h" #include "../hiredis.h"
#include "../async.h" #include "../async.h"
@ -7,46 +9,46 @@ typedef struct redisLibeventEvents {
struct event rev, wev; struct event rev, wev;
} redisLibeventEvents; } redisLibeventEvents;
void redisLibeventReadEvent(int fd, short event, void *arg) { static void redisLibeventReadEvent(int fd, short event, void *arg) {
((void)fd); ((void)event); ((void)fd); ((void)event);
redisLibeventEvents *e = (redisLibeventEvents*)arg; redisLibeventEvents *e = (redisLibeventEvents*)arg;
redisAsyncHandleRead(e->context); redisAsyncHandleRead(e->context);
} }
void redisLibeventWriteEvent(int fd, short event, void *arg) { static void redisLibeventWriteEvent(int fd, short event, void *arg) {
((void)fd); ((void)event); ((void)fd); ((void)event);
redisLibeventEvents *e = (redisLibeventEvents*)arg; redisLibeventEvents *e = (redisLibeventEvents*)arg;
redisAsyncHandleWrite(e->context); redisAsyncHandleWrite(e->context);
} }
void redisLibeventAddRead(void *privdata) { static void redisLibeventAddRead(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata; redisLibeventEvents *e = (redisLibeventEvents*)privdata;
event_add(&e->rev,NULL); event_add(&e->rev,NULL);
} }
void redisLibeventDelRead(void *privdata) { static void redisLibeventDelRead(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata; redisLibeventEvents *e = (redisLibeventEvents*)privdata;
event_del(&e->rev); event_del(&e->rev);
} }
void redisLibeventAddWrite(void *privdata) { static void redisLibeventAddWrite(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata; redisLibeventEvents *e = (redisLibeventEvents*)privdata;
event_add(&e->wev,NULL); event_add(&e->wev,NULL);
} }
void redisLibeventDelWrite(void *privdata) { static void redisLibeventDelWrite(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata; redisLibeventEvents *e = (redisLibeventEvents*)privdata;
event_del(&e->wev); event_del(&e->wev);
} }
void redisLibeventCleanup(void *privdata) { static void redisLibeventCleanup(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata; redisLibeventEvents *e = (redisLibeventEvents*)privdata;
event_del(&e->rev); event_del(&e->rev);
event_del(&e->wev); event_del(&e->wev);
free(e); free(e);
} }
int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
redisLibeventEvents *e; redisLibeventEvents *e;
@ -73,3 +75,4 @@ int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
event_base_set(base,&e->wev); event_base_set(base,&e->wev);
return REDIS_OK; return REDIS_OK;
} }
#endif

@ -29,14 +29,34 @@
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "fmacros.h"
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <strings.h> #include <strings.h>
#include <assert.h> #include <assert.h>
#include <ctype.h> #include <ctype.h>
#include <errno.h>
#include "async.h" #include "async.h"
#include "net.h"
#include "dict.c" #include "dict.c"
#include "sds.h" #include "sds.h"
#define _EL_ADD_READ(ctx) do { \
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
} while(0)
#define _EL_DEL_READ(ctx) do { \
if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
} while(0)
#define _EL_ADD_WRITE(ctx) do { \
if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
} while(0)
#define _EL_DEL_WRITE(ctx) do { \
if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
} while(0)
#define _EL_CLEANUP(ctx) do { \
if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
} while(0);
/* Forward declaration of function in hiredis.c */ /* Forward declaration of function in hiredis.c */
void __redisAppendCommand(redisContext *c, char *cmd, size_t len); void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
@ -142,7 +162,7 @@ int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn
/* The common way to detect an established connection is to wait for /* The common way to detect an established connection is to wait for
* the first write event to be fired. This assumes the related event * the first write event to be fired. This assumes the related event
* library functions are already set. */ * library functions are already set. */
if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); _EL_ADD_WRITE(ac);
return REDIS_OK; return REDIS_OK;
} }
return REDIS_ERR; return REDIS_ERR;
@ -230,7 +250,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
dictRelease(ac->sub.patterns); dictRelease(ac->sub.patterns);
/* Signal event lib to clean up */ /* Signal event lib to clean up */
if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data); _EL_CLEANUP(ac);
/* Execute disconnect callback. When redisAsyncFree() initiated destroying /* Execute disconnect callback. When redisAsyncFree() initiated destroying
* this context, the status will always be REDIS_OK. */ * this context, the status will always be REDIS_OK. */
@ -346,7 +366,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
while((status = redisGetReply(c,&reply)) == REDIS_OK) { while((status = redisGetReply(c,&reply)) == REDIS_OK) {
if (reply == NULL) { if (reply == NULL) {
/* When the connection is being disconnected and there are /* When the connection is being disconnected and there are
* no more replies, this is the cue to really disconnect. */ * no more replies, this is the cue to really disconnect. */
if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) { if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
__redisAsyncDisconnect(ac); __redisAsyncDisconnect(ac);
@ -397,23 +417,53 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
} }
} }
/* Disconnect when there was an error reading the reply */ /* Disconnect when there was an error reading the reply */
if (status != REDIS_OK) if (status != REDIS_OK)
__redisAsyncDisconnect(ac); __redisAsyncDisconnect(ac);
} }
/* Internal helper function to detect socket status the first time a read or
* write event fires. When connecting was not succesful, the connect callback
* is called with a REDIS_ERR status and the context is free'd. */
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
if (redisCheckSocketError(c,c->fd) == REDIS_ERR) {
/* Try again later when connect(2) is still in progress. */
if (errno == EINPROGRESS)
return REDIS_OK;
if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
__redisAsyncDisconnect(ac);
return REDIS_ERR;
}
/* Mark context as connected. */
c->flags |= REDIS_CONNECTED;
if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
return REDIS_OK;
}
/* This function should be called when the socket is readable. /* This function should be called when the socket is readable.
* It processes all replies that can be read and executes their callbacks. * It processes all replies that can be read and executes their callbacks.
*/ */
void redisAsyncHandleRead(redisAsyncContext *ac) { void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
if (__redisAsyncHandleConnect(ac) != REDIS_OK)
return;
/* Try again later when the context is still not connected. */
if (!(c->flags & REDIS_CONNECTED))
return;
}
if (redisBufferRead(c) == REDIS_ERR) { if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac); __redisAsyncDisconnect(ac);
} else { } else {
/* Always re-schedule reads */ /* Always re-schedule reads */
if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); _EL_ADD_READ(ac);
redisProcessCallbacks(ac); redisProcessCallbacks(ac);
} }
} }
@ -422,24 +472,26 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c); redisContext *c = &(ac->c);
int done = 0; int done = 0;
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
if (__redisAsyncHandleConnect(ac) != REDIS_OK)
return;
/* Try again later when the context is still not connected. */
if (!(c->flags & REDIS_CONNECTED))
return;
}
if (redisBufferWrite(c,&done) == REDIS_ERR) { if (redisBufferWrite(c,&done) == REDIS_ERR) {
__redisAsyncDisconnect(ac); __redisAsyncDisconnect(ac);
} else { } else {
/* Continue writing when not done, stop writing otherwise */ /* Continue writing when not done, stop writing otherwise */
if (!done) { if (!done)
if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); _EL_ADD_WRITE(ac);
} else { else
if (ac->ev.delWrite) ac->ev.delWrite(ac->ev.data); _EL_DEL_WRITE(ac);
}
/* Always schedule reads after writes */ /* Always schedule reads after writes */
if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); _EL_ADD_READ(ac);
/* Fire onConnect when this is the first write event. */
if (!(c->flags & REDIS_CONNECTED)) {
c->flags |= REDIS_CONNECTED;
if (ac->onConnect) ac->onConnect(ac);
}
} }
} }
@ -517,7 +569,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
__redisAppendCommand(c,cmd,len); __redisAppendCommand(c,cmd,len);
/* Always schedule a write when the write buffer is non-empty */ /* Always schedule a write when the write buffer is non-empty */
if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); _EL_ADD_WRITE(ac);
return REDIS_OK; return REDIS_OK;
} }

@ -55,7 +55,7 @@ typedef struct redisCallbackList {
/* Connection callback prototypes */ /* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(const struct redisAsyncContext*); typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
/* Context for an async connection to Redis */ /* Context for an async connection to Redis */
typedef struct redisAsyncContext { typedef struct redisAsyncContext {

@ -18,17 +18,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c); redisAsyncDisconnect(c);
} }
void connectCallback(const redisAsyncContext *c) { void connectCallback(const redisAsyncContext *c, int status) {
((void)c); if (status != REDIS_OK) {
printf("connected...\n"); printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
} }
void disconnectCallback(const redisAsyncContext *c, int status) { void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) { if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr); printf("Error: %s\n", c->errstr);
return;
} }
printf("disconnected...\n"); printf("Disconnected...\n");
aeStop(loop);
} }
int main (int argc, char **argv) { int main (int argc, char **argv) {

@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c); redisAsyncDisconnect(c);
} }
void connectCallback(const redisAsyncContext *c) { void connectCallback(const redisAsyncContext *c, int status) {
((void)c); if (status != REDIS_OK) {
printf("connected...\n"); printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
} }
void disconnectCallback(const redisAsyncContext *c, int status) { void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) { if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr); printf("Error: %s\n", c->errstr);
return;
} }
printf("disconnected...\n"); printf("Disconnected...\n");
} }
int main (int argc, char **argv) { int main (int argc, char **argv) {

@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c); redisAsyncDisconnect(c);
} }
void connectCallback(const redisAsyncContext *c) { void connectCallback(const redisAsyncContext *c, int status) {
((void)c); if (status != REDIS_OK) {
printf("connected...\n"); printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
} }
void disconnectCallback(const redisAsyncContext *c, int status) { void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) { if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr); printf("Error: %s\n", c->errstr);
return;
} }
printf("disconnected...\n"); printf("Disconnected...\n");
} }
int main (int argc, char **argv) { int main (int argc, char **argv) {

@ -1,12 +1,14 @@
#ifndef __HIREDIS_FMACRO_H #ifndef __HIREDIS_FMACRO_H
#define __HIREDIS_FMACRO_H #define __HIREDIS_FMACRO_H
#ifndef _BSD_SOURCE #if !defined(_BSD_SOURCE)
#define _BSD_SOURCE #define _BSD_SOURCE
#endif #endif
#ifdef __linux__ #if defined(__sun__)
#define _XOPEN_SOURCE 700 #define _POSIX_C_SOURCE 200112L
#elif defined(__linux__)
#define _XOPEN_SOURCE 600
#else #else
#define _XOPEN_SOURCE #define _XOPEN_SOURCE
#endif #endif

@ -358,12 +358,7 @@ static int processLineItem(redisReader *r) {
char *p; char *p;
int len; int len;
cur->poff = (r->pos-r->roff)-1;
cur->coff = cur->poff+1;
if ((p = readLine(r,&len)) != NULL) { if ((p = readLine(r,&len)) != NULL) {
cur->plen = 1+len+2; /* include \r\n */
cur->clen = len;
if (cur->type == REDIS_REPLY_INTEGER) { if (cur->type == REDIS_REPLY_INTEGER) {
if (r->fn && r->fn->createInteger) if (r->fn && r->fn->createInteger)
obj = r->fn->createInteger(cur,readLongLong(p)); obj = r->fn->createInteger(cur,readLongLong(p));
@ -402,13 +397,10 @@ static int processBulkItem(redisReader *r) {
p = r->buf+r->pos; p = r->buf+r->pos;
s = seekNewline(p,r->len-r->pos); s = seekNewline(p,r->len-r->pos);
if (s != NULL) { if (s != NULL) {
p = r->buf+r->pos;
bytelen = s-(r->buf+r->pos)+2; /* include \r\n */ bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
cur->poff = (r->pos-r->roff)-1;
cur->plen = bytelen+1;
cur->coff = cur->poff+1+bytelen;
cur->clen = 0;
len = readLongLong(p); len = readLongLong(p);
if (len < 0) { if (len < 0) {
/* The nil object can always be created. */ /* The nil object can always be created. */
if (r->fn && r->fn->createNil) if (r->fn && r->fn->createNil)
@ -420,8 +412,6 @@ static int processBulkItem(redisReader *r) {
/* Only continue when the buffer contains the entire bulk item. */ /* Only continue when the buffer contains the entire bulk item. */
bytelen += len+2; /* include \r\n */ bytelen += len+2; /* include \r\n */
if (r->pos+bytelen <= r->len) { if (r->pos+bytelen <= r->len) {
cur->plen += len+2;
cur->clen = len;
if (r->fn && r->fn->createString) if (r->fn && r->fn->createString)
obj = r->fn->createString(cur,s+2,len); obj = r->fn->createString(cur,s+2,len);
else else
@ -456,19 +446,14 @@ static int processMultiBulkItem(redisReader *r) {
long elements; long elements;
int root = 0; int root = 0;
/* Set error for nested multi bulks with depth > 1 */ /* Set error for nested multi bulks with depth > 2 */
if (r->ridx == 2) { if (r->ridx == 3) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL, __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"No support for nested multi bulk replies with depth > 1"); "No support for nested multi bulk replies with depth > 2");
return REDIS_ERR; return REDIS_ERR;
} }
cur->poff = (r->pos-r->roff)-1;
cur->coff = 0;
if ((p = readLine(r,NULL)) != NULL) { if ((p = readLine(r,NULL)) != NULL) {
cur->plen = (r->pos-r->roff)-cur->poff; /* includes \r\n */
cur->clen = 0;
elements = readLongLong(p); elements = readLongLong(p);
root = (r->ridx == 0); root = (r->ridx == 0);
@ -605,7 +590,7 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
/* Copy the provided buffer. */ /* Copy the provided buffer. */
if (buf != NULL && len >= 1) { if (buf != NULL && len >= 1) {
/* Destroy buffer when it is empty and is quite large. */ /* Destroy internal buffer when it is empty and is quite large. */
if (r->len == 0 && sdsavail(r->buf) > 16*1024) { if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
sdsfree(r->buf); sdsfree(r->buf);
r->buf = sdsempty(); r->buf = sdsempty();
@ -615,15 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
assert(r->buf != NULL); assert(r->buf != NULL);
} }
/* Discard consumed part of the buffer when the offset for the reply
* that is currently being read is high enough. */
if (r->roff >= 1024) {
r->buf = sdsrange(r->buf,r->roff,-1);
r->pos -= r->roff;
r->roff = 0;
r->len = sdslen(r->buf);
}
newbuf = sdscatlen(r->buf,buf,len); newbuf = sdscatlen(r->buf,buf,len);
if (newbuf == NULL) { if (newbuf == NULL) {
__redisReaderSetErrorOOM(r); __redisReaderSetErrorOOM(r);
@ -659,7 +635,6 @@ int redisReaderGetReply(redisReader *r, void **reply) {
r->rstack[0].parent = NULL; r->rstack[0].parent = NULL;
r->rstack[0].privdata = r->privdata; r->rstack[0].privdata = r->privdata;
r->ridx = 0; r->ridx = 0;
r->roff = r->pos; /* Start offset in buffer. */
} }
/* Process items in reply. */ /* Process items in reply. */
@ -671,6 +646,14 @@ int redisReaderGetReply(redisReader *r, void **reply) {
if (r->err) if (r->err)
return REDIS_ERR; return REDIS_ERR;
/* Discard part of the buffer when we've consumed at least 1k, to avoid
* doing unnecessary calls to memmove() in sds.c. */
if (r->pos >= 1024) {
r->buf = sdsrange(r->buf,r->pos,-1);
r->pos = 0;
r->len = sdslen(r->buf);
}
/* Emit a reply when there is one. */ /* Emit a reply when there is one. */
if (r->ridx == -1) { if (r->ridx == -1) {
if (reply != NULL) if (reply != NULL)
@ -680,17 +663,6 @@ int redisReaderGetReply(redisReader *r, void **reply) {
return REDIS_OK; return REDIS_OK;
} }
const char *redisReaderGetRaw(redisReader *r, size_t *len) {
/* ridx == -1: No or a full reply has been read. */
/* pos > roff: Buffer position is larger than start offset, meaning
* the buffer has not yet been truncated. */
if (r->ridx == -1 && r->pos > r->roff) {
if (len) *len = (r->pos-r->roff);
return r->buf+r->roff;
}
return NULL;
}
/* Calculate the number of bytes needed to represent an integer as string. */ /* Calculate the number of bytes needed to represent an integer as string. */
static int intlen(int i) { static int intlen(int i) {
int len = 0; int len = 0;
@ -777,6 +749,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
default: default:
/* Try to detect printf format */ /* Try to detect printf format */
{ {
static const char intfmts[] = "diouxX";
char _format[16]; char _format[16];
const char *_p = c+1; const char *_p = c+1;
size_t _l = 0; size_t _l = 0;
@ -798,33 +771,79 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
while (*_p != '\0' && isdigit(*_p)) _p++; while (*_p != '\0' && isdigit(*_p)) _p++;
} }
/* Modifiers */ /* Copy va_list before consuming with va_arg */
if (*_p != '\0') { va_copy(_cpy,ap);
if (*_p == 'h' || *_p == 'l') {
/* Allow a single repetition for these modifiers */ /* Integer conversion (without modifiers) */
if (_p[0] == _p[1]) _p++; if (strchr(intfmts,*_p) != NULL) {
_p++; va_arg(ap,int);
goto fmt_valid;
}
/* Double conversion (without modifiers) */
if (strchr("eEfFgGaA",*_p) != NULL) {
va_arg(ap,double);
goto fmt_valid;
}
/* Size: char */
if (_p[0] == 'h' && _p[1] == 'h') {
_p += 2;
if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
va_arg(ap,int); /* char gets promoted to int */
goto fmt_valid;
}
goto fmt_invalid;
}
/* Size: short */
if (_p[0] == 'h') {
_p += 1;
if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
va_arg(ap,int); /* short gets promoted to int */
goto fmt_valid;
} }
goto fmt_invalid;
} }
/* Conversion specifier */ /* Size: long long */
if (*_p != '\0' && strchr("diouxXeEfFgGaA",*_p) != NULL) { if (_p[0] == 'l' && _p[1] == 'l') {
_l = (_p+1)-c; _p += 2;
if (_l < sizeof(_format)-2) { if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
memcpy(_format,c,_l); va_arg(ap,long long);
_format[_l] = '\0'; goto fmt_valid;
va_copy(_cpy,ap);
newarg = sdscatvprintf(curarg,_format,_cpy);
va_end(_cpy);
/* Update current position (note: outer blocks
* increment c twice so compensate here) */
c = _p-1;
} }
goto fmt_invalid;
}
/* Size: long */
if (_p[0] == 'l') {
_p += 1;
if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
va_arg(ap,long);
goto fmt_valid;
}
goto fmt_invalid;
}
fmt_invalid:
va_end(_cpy);
goto err;
fmt_valid:
_l = (_p+1)-c;
if (_l < sizeof(_format)-2) {
memcpy(_format,c,_l);
_format[_l] = '\0';
newarg = sdscatvprintf(curarg,_format,_cpy);
/* Update current position (note: outer blocks
* increment c twice so compensate here) */
c = _p-1;
} }
/* Consume and discard vararg */ va_end(_cpy);
va_arg(ap,void); break;
} }
} }
@ -1079,10 +1098,10 @@ int redisBufferRead(redisContext *c) {
* *
* Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
* succesfully written to the socket. When the buffer is empty after the * succesfully written to the socket. When the buffer is empty after the
* write operation, "wdone" is set to 1 (if given). * write operation, "done" is set to 1 (if given).
* *
* Returns REDIS_ERR if an error occured trying to write and sets * Returns REDIS_ERR if an error occured trying to write and sets
* c->error to hold the appropriate error string. * c->errstr to hold the appropriate error string.
*/ */
int redisBufferWrite(redisContext *c, int *done) { int redisBufferWrite(redisContext *c, int *done) {
int nwritten; int nwritten;
@ -1133,7 +1152,6 @@ int redisGetReply(redisContext *c, void **reply) {
/* For the blocking context, flush output buffer and read reply */ /* For the blocking context, flush output buffer and read reply */
if (aux == NULL && c->flags & REDIS_BLOCK) { if (aux == NULL && c->flags & REDIS_BLOCK) {
printf("BLOCKING CONTEXT\n");
/* Write until done */ /* Write until done */
do { do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR) if (redisBufferWrite(c,&wdone) == REDIS_ERR)

@ -37,7 +37,7 @@
#define HIREDIS_MAJOR 0 #define HIREDIS_MAJOR 0
#define HIREDIS_MINOR 10 #define HIREDIS_MINOR 10
#define HIREDIS_PATCH 0 #define HIREDIS_PATCH 1
#define REDIS_ERR -1 #define REDIS_ERR -1
#define REDIS_OK 0 #define REDIS_OK 0
@ -98,11 +98,6 @@ typedef struct redisReply {
} redisReply; } redisReply;
typedef struct redisReadTask { typedef struct redisReadTask {
size_t poff; /* Protocol offset */
size_t plen; /* Protocol length */
size_t coff; /* Content offset */
size_t clen; /* Content length */
int type; int type;
int elements; /* number of elements in multibulk container */ int elements; /* number of elements in multibulk container */
int idx; /* index in parent (array) object */ int idx; /* index in parent (array) object */
@ -127,9 +122,8 @@ typedef struct redisReader {
char *buf; /* Read buffer */ char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */ size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */ size_t len; /* Buffer length */
size_t roff; /* Reply offset */
redisReadTask rstack[3]; redisReadTask rstack[4];
int ridx; /* Index of current read task */ int ridx; /* Index of current read task */
void *reply; /* Temporary reply pointer */ void *reply; /* Temporary reply pointer */
@ -142,7 +136,6 @@ redisReader *redisReaderCreate(void);
void redisReaderFree(redisReader *r); void redisReaderFree(redisReader *r);
int redisReaderFeed(redisReader *r, const char *buf, size_t len); int redisReaderFeed(redisReader *r, const char *buf, size_t len);
int redisReaderGetReply(redisReader *r, void **reply); int redisReaderGetReply(redisReader *r, void **reply);
const char *redisReaderGetRaw(redisReader *r, size_t *len);
/* Backwards compatibility, can be removed on big version bump. */ /* Backwards compatibility, can be removed on big version bump. */
#define redisReplyReaderCreate redisReaderCreate #define redisReplyReaderCreate redisReaderCreate

@ -45,6 +45,8 @@
#include <errno.h> #include <errno.h>
#include <stdarg.h> #include <stdarg.h>
#include <stdio.h> #include <stdio.h>
#include <poll.h>
#include <limits.h>
#include "net.h" #include "net.h"
#include "sds.h" #include "sds.h"
@ -62,16 +64,24 @@ static void __redisSetErrorFromErrno(redisContext *c, int type, const char *pref
__redisSetError(c,type,buf); __redisSetError(c,type,buf);
} }
static int redisSetReuseAddr(redisContext *c, int fd) {
int on = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
return REDIS_ERR;
}
return REDIS_OK;
}
static int redisCreateSocket(redisContext *c, int type) { static int redisCreateSocket(redisContext *c, int type) {
int s, on = 1; int s;
if ((s = socket(type, SOCK_STREAM, 0)) == -1) { if ((s = socket(type, SOCK_STREAM, 0)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
return REDIS_ERR; return REDIS_ERR;
} }
if (type == AF_INET) { if (type == AF_INET) {
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) { if (redisSetReuseAddr(c,s) == REDIS_ERR) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(s);
return REDIS_ERR; return REDIS_ERR;
} }
} }
@ -113,50 +123,46 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) {
return REDIS_OK; return REDIS_OK;
} }
#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) { static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
struct timeval to; struct pollfd wfd[1];
struct timeval *toptr = NULL; long msec;
fd_set wfd;
int err; msec = -1;
socklen_t errlen; wfd[0].fd = fd;
wfd[0].events = POLLOUT;
/* Only use timeout when not NULL. */ /* Only use timeout when not NULL. */
if (timeout != NULL) { if (timeout != NULL) {
to = *timeout; if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
toptr = &to; close(fd);
return REDIS_ERR;
}
msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
if (msec < 0 || msec > INT_MAX) {
msec = INT_MAX;
}
} }
if (errno == EINPROGRESS) { if (errno == EINPROGRESS) {
FD_ZERO(&wfd); int res;
FD_SET(fd, &wfd);
if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) { if ((res = poll(wfd, 1, msec)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)"); __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
close(fd); close(fd);
return REDIS_ERR; return REDIS_ERR;
} } else if (res == 0) {
if (!FD_ISSET(fd, &wfd)) {
errno = ETIMEDOUT; errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd); close(fd);
return REDIS_ERR; return REDIS_ERR;
} }
err = 0; if (redisCheckSocketError(c, fd) != REDIS_OK)
errlen = sizeof(err);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)");
close(fd);
return REDIS_ERR;
}
if (err) {
errno = err;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
return REDIS_ERR; return REDIS_ERR;
}
return REDIS_OK; return REDIS_OK;
} }
@ -166,6 +172,26 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *
return REDIS_ERR; return REDIS_ERR;
} }
int redisCheckSocketError(redisContext *c, int fd) {
int err = 0;
socklen_t errlen = sizeof(err);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)");
close(fd);
return REDIS_ERR;
}
if (err) {
errno = err;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
return REDIS_ERR;
}
return REDIS_OK;
}
int redisContextSetTimeout(redisContext *c, struct timeval tv) { int redisContextSetTimeout(redisContext *c, struct timeval tv) {
if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) { if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_RCVTIMEO)"); __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_RCVTIMEO)");
@ -179,50 +205,59 @@ int redisContextSetTimeout(redisContext *c, struct timeval tv) {
} }
int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout) { int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout) {
int s; int s, rv;
char _port[6]; /* strlen("65535"); */
struct addrinfo hints, *servinfo, *p;
int blocking = (c->flags & REDIS_BLOCK); int blocking = (c->flags & REDIS_BLOCK);
struct sockaddr_in sa;
if ((s = redisCreateSocket(c,AF_INET)) < 0) snprintf(_port, 6, "%d", port);
return REDIS_ERR; memset(&hints,0,sizeof(hints));
if (redisSetBlocking(c,s,0) != REDIS_OK) hints.ai_family = AF_INET;
return REDIS_ERR; hints.ai_socktype = SOCK_STREAM;
sa.sin_family = AF_INET; if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
sa.sin_port = htons(port); __redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv));
if (inet_aton(addr, &sa.sin_addr) == 0) { return REDIS_ERR;
struct hostent *he;
he = gethostbyname(addr);
if (he == NULL) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't resolve: %s", addr);
__redisSetError(c,REDIS_ERR_OTHER,buf);
close(s);
return REDIS_ERR;
}
memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
} }
for (p = servinfo; p != NULL; p = p->ai_next) {
if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) { if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
if (errno == EINPROGRESS && !blocking) { continue;
/* This is ok. */
} else { if (redisSetBlocking(c,s,0) != REDIS_OK)
if (redisContextWaitReady(c,s,timeout) != REDIS_OK) goto error;
return REDIS_ERR; if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
if (errno == EHOSTUNREACH) {
close(s);
continue;
} else if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else {
if (redisContextWaitReady(c,s,timeout) != REDIS_OK)
goto error;
}
} }
if (blocking && redisSetBlocking(c,s,1) != REDIS_OK)
goto error;
if (redisSetTcpNoDelay(c,s) != REDIS_OK)
goto error;
c->fd = s;
c->flags |= REDIS_CONNECTED;
rv = REDIS_OK;
goto end;
}
if (p == NULL) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't create socket: %s",strerror(errno));
__redisSetError(c,REDIS_ERR_OTHER,buf);
goto error;
} }
/* Reset socket to be blocking after connect(2). */ error:
if (blocking && redisSetBlocking(c,s,1) != REDIS_OK) rv = REDIS_ERR;
return REDIS_ERR; end:
freeaddrinfo(servinfo);
if (redisSetTcpNoDelay(c,s) != REDIS_OK) return rv; // Need to return REDIS_OK if alright
return REDIS_ERR;
c->fd = s;
c->flags |= REDIS_CONNECTED;
return REDIS_OK;
} }
int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout) { int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout) {

@ -39,6 +39,7 @@
#define AF_LOCAL AF_UNIX #define AF_LOCAL AF_UNIX
#endif #endif
int redisCheckSocketError(redisContext *c, int fd);
int redisContextSetTimeout(redisContext *c, struct timeval tv); int redisContextSetTimeout(redisContext *c, struct timeval tv);
int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout); int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout);
int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout); int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout);

@ -1,3 +1,4 @@
#include "fmacros.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -31,7 +32,7 @@ struct config {
/* The following lines make up our testing "framework" :) */ /* The following lines make up our testing "framework" :) */
static int tests = 0, fails = 0; static int tests = 0, fails = 0;
#define test(_s) { printf("#%02d ", ++tests); printf(_s); } #define test(_s) { printf("#%02d ", ++tests); printf(_s); }
#define test_cond(_c) if(_c) printf("PASSED\n"); else {printf("FAILED\n"); fails++;} #define test_cond(_c) if(_c) printf("\033[0;32mPASSED\033[0;0m\n"); else {printf("\033[0;31mFAILED\033[0;0m\n"); fails++;}
static long long usec(void) { static long long usec(void) {
struct timeval tv; struct timeval tv;
@ -77,7 +78,7 @@ static void disconnect(redisContext *c) {
} }
static redisContext *connect(struct config config) { static redisContext *connect(struct config config) {
redisContext *c; redisContext *c = NULL;
if (config.type == CONN_TCP) { if (config.type == CONN_TCP) {
c = redisConnect(config.tcp.host, config.tcp.port); c = redisConnect(config.tcp.host, config.tcp.port);
@ -141,29 +142,43 @@ static void test_format_commands(void) {
len == 4+4+(3+2)+4+(1+2)+4+(1+2)); len == 4+4+(3+2)+4+(1+2)+4+(1+2));
free(cmd); free(cmd);
test("Format command with printf-delegation (long long): "); /* Vararg width depends on the type. These tests make sure that the
len = redisFormatCommand(&cmd,"key:%08lld",1234ll); * width is correctly determined using the format and subsequent varargs
test_cond(strncmp(cmd,"*1\r\n$12\r\nkey:00001234\r\n",len) == 0 && * can correctly be interpolated. */
len == 4+5+(12+2)); #define INTEGER_WIDTH_TEST(fmt, type) do { \
free(cmd); type value = 123; \
test("Format command with printf-delegation (" #type "): "); \
test("Format command with printf-delegation (float): "); len = redisFormatCommand(&cmd,"key:%08" fmt " str:%s", value, "hello"); \
len = redisFormatCommand(&cmd,"v:%06.1f",12.34f); test_cond(strncmp(cmd,"*2\r\n$12\r\nkey:00000123\r\n$9\r\nstr:hello\r\n",len) == 0 && \
test_cond(strncmp(cmd,"*1\r\n$8\r\nv:0012.3\r\n",len) == 0 && len == 4+5+(12+2)+4+(9+2)); \
len == 4+4+(8+2)); free(cmd); \
free(cmd); } while(0)
test("Format command with printf-delegation and extra interpolation: "); #define FLOAT_WIDTH_TEST(type) do { \
len = redisFormatCommand(&cmd,"key:%d %b",1234,"foo",3); type value = 123.0; \
test_cond(strncmp(cmd,"*2\r\n$8\r\nkey:1234\r\n$3\r\nfoo\r\n",len) == 0 && test("Format command with printf-delegation (" #type "): "); \
len == 4+4+(8+2)+4+(3+2)); len = redisFormatCommand(&cmd,"key:%08.3f str:%s", value, "hello"); \
free(cmd); test_cond(strncmp(cmd,"*2\r\n$12\r\nkey:0123.000\r\n$9\r\nstr:hello\r\n",len) == 0 && \
len == 4+5+(12+2)+4+(9+2)); \
test("Format command with wrong printf format and extra interpolation: "); free(cmd); \
len = redisFormatCommand(&cmd,"key:%08p %b",1234,"foo",3); } while(0)
test_cond(strncmp(cmd,"*2\r\n$6\r\nkey:8p\r\n$3\r\nfoo\r\n",len) == 0 &&
len == 4+4+(6+2)+4+(3+2)); INTEGER_WIDTH_TEST("d", int);
free(cmd); INTEGER_WIDTH_TEST("hhd", char);
INTEGER_WIDTH_TEST("hd", short);
INTEGER_WIDTH_TEST("ld", long);
INTEGER_WIDTH_TEST("lld", long long);
INTEGER_WIDTH_TEST("u", unsigned int);
INTEGER_WIDTH_TEST("hhu", unsigned char);
INTEGER_WIDTH_TEST("hu", unsigned short);
INTEGER_WIDTH_TEST("lu", unsigned long);
INTEGER_WIDTH_TEST("llu", unsigned long long);
FLOAT_WIDTH_TEST(float);
FLOAT_WIDTH_TEST(double);
test("Format command with invalid printf format: ");
len = redisFormatCommand(&cmd,"key:%08p %b",(void*)1234,"foo",3);
test_cond(len == -1);
const char *argv[3]; const char *argv[3];
argv[0] = "SET"; argv[0] = "SET";
@ -210,11 +225,12 @@ static void test_reply_reader(void) {
strcasecmp(reader->errstr,"Protocol error, got \"@\" as reply type byte") == 0); strcasecmp(reader->errstr,"Protocol error, got \"@\" as reply type byte") == 0);
redisReaderFree(reader); redisReaderFree(reader);
test("Set error on nested multi bulks with depth > 1: "); test("Set error on nested multi bulks with depth > 2: ");
reader = redisReaderCreate(); reader = redisReaderCreate();
redisReaderFeed(reader,(char*)"*1\r\n",4); redisReaderFeed(reader,(char*)"*1\r\n",4);
redisReaderFeed(reader,(char*)"*1\r\n",4); redisReaderFeed(reader,(char*)"*1\r\n",4);
redisReaderFeed(reader,(char*)"*1\r\n",4); redisReaderFeed(reader,(char*)"*1\r\n",4);
redisReaderFeed(reader,(char*)"*1\r\n",4);
ret = redisReaderGetReply(reader,NULL); ret = redisReaderGetReply(reader,NULL);
test_cond(ret == REDIS_ERR && test_cond(ret == REDIS_ERR &&
strncasecmp(reader->errstr,"No support for",14) == 0); strncasecmp(reader->errstr,"No support for",14) == 0);
@ -261,105 +277,14 @@ static void test_reply_reader(void) {
redisReaderFree(reader); redisReaderFree(reader);
} }
static void *test_create_string(const redisReadTask *task, char *str, size_t len) {
redisReader *r = (redisReader*)task->privdata;
const char *roff = r->buf+r->roff;
((void)str); ((void)len);
assert(task->plen > 0);
assert(task->clen > 0);
switch(task->type) {
case REDIS_REPLY_STATUS:
assert(strncmp("+status\r\n", roff+task->poff, task->plen) == 0);
assert(strncmp("status", roff+task->coff, task->clen) == 0);
break;
case REDIS_REPLY_ERROR:
assert(strncmp("-error\r\n", roff+task->poff, task->plen) == 0);
assert(strncmp("error", roff+task->coff, task->clen) == 0);
break;
case REDIS_REPLY_STRING: /* bulk */
assert(strncmp("$4\r\nbulk\r\n", roff+task->poff, task->plen) == 0);
assert(strncmp("bulk", roff+task->coff, task->clen) == 0);
break;
default:
assert(NULL);
}
return (void*)1;
}
static void *test_create_array(const redisReadTask *task, int len) {
redisReader *r = (redisReader*)task->privdata;
const char *roff = r->buf+r->roff;
((void)len);
assert(task->plen > 0);
assert(task->clen == 0);
assert(strncmp("*5\r\n", roff+task->poff, task->plen) == 0);
return (void*)1;
}
static void *test_create_integer(const redisReadTask *task, long long value) {
redisReader *r = (redisReader*)task->privdata;
const char *roff = r->buf+r->roff;
((void)value);
assert(task->plen > 0);
assert(task->clen > 0);
assert(strncmp(":1234\r\n", roff+task->poff, task->plen) == 0);
assert(strncmp("1234", roff+task->coff, task->clen) == 0);
return (void*)1;
}
static void *test_create_nil(const redisReadTask *task) {
redisReader *r = (redisReader*)task->privdata;
const char *roff = r->buf+r->roff;
assert(task->plen > 0);
assert(task->clen == 0);
assert(strncmp("$-1\r\n", roff+task->poff, task->plen) == 0);
return (void*)1;
}
static redisReplyObjectFunctions test_reader_fn = {
test_create_string,
test_create_array,
test_create_integer,
test_create_nil,
NULL
};
static void test_reader_functions(void) {
redisReader *reader;
const char *input;
int ret;
void *obj;
input =
"*5\r\n"
"$-1\r\n"
":1234\r\n"
"+status\r\n"
"-error\r\n"
"$4\r\nbulk\r\n";
test("Custom object functions in reply reader: ");
reader = redisReaderCreate();
reader->fn = &test_reader_fn;
reader->privdata = reader;
redisReaderFeed(reader,input,strlen(input));
ret = redisReaderGetReply(reader,&obj);
test_cond(ret == REDIS_OK && obj == (void*)1);
redisReaderFree(reader);
}
static void test_blocking_connection_errors(void) { static void test_blocking_connection_errors(void) {
redisContext *c; redisContext *c;
test("Returns error when host cannot be resolved: "); test("Returns error when host cannot be resolved: ");
c = redisConnect((char*)"idontexist.local", 6379); c = redisConnect((char*)"idontexist.local", 6379);
test_cond(c->err == REDIS_ERR_OTHER && test_cond(c->err == REDIS_ERR_OTHER &&
strcmp(c->errstr,"Can't resolve: idontexist.local") == 0); (strcmp(c->errstr,"Name or service not known") == 0 ||
strcmp(c->errstr,"Can't resolve: idontexist.local") == 0));
redisFree(c); redisFree(c);
test("Returns error when the port is not open: "); test("Returns error when the port is not open: ");
@ -455,6 +380,7 @@ static void test_blocking_connection(struct config config) {
static void test_blocking_io_errors(struct config config) { static void test_blocking_io_errors(struct config config) {
redisContext *c; redisContext *c;
redisReply *reply; redisReply *reply;
void *_reply;
int major, minor; int major, minor;
/* Connect to target given by config. */ /* Connect to target given by config. */
@ -478,7 +404,7 @@ static void test_blocking_io_errors(struct config config) {
/* > 2.0 returns OK on QUIT and read() should be issued once more /* > 2.0 returns OK on QUIT and read() should be issued once more
* to know the descriptor is at EOF. */ * to know the descriptor is at EOF. */
test_cond(strcasecmp(reply->str,"OK") == 0 && test_cond(strcasecmp(reply->str,"OK") == 0 &&
redisGetReply(c,(void**)&reply) == REDIS_ERR); redisGetReply(c,&_reply) == REDIS_ERR);
freeReplyObject(reply); freeReplyObject(reply);
} else { } else {
test_cond(reply == NULL); test_cond(reply == NULL);
@ -497,7 +423,7 @@ static void test_blocking_io_errors(struct config config) {
test("Returns I/O error on socket timeout: "); test("Returns I/O error on socket timeout: ");
struct timeval tv = { 0, 1000 }; struct timeval tv = { 0, 1000 };
assert(redisSetTimeout(c,tv) == REDIS_OK); assert(redisSetTimeout(c,tv) == REDIS_OK);
test_cond(redisGetReply(c,(void**)&reply) == REDIS_ERR && test_cond(redisGetReply(c,&_reply) == REDIS_ERR &&
c->err == REDIS_ERR_IO && errno == EAGAIN); c->err == REDIS_ERR_IO && errno == EAGAIN);
redisFree(c); redisFree(c);
} }
@ -704,7 +630,6 @@ int main(int argc, char **argv) {
test_format_commands(); test_format_commands();
test_reply_reader(); test_reply_reader();
test_reader_functions();
test_blocking_connection_errors(); test_blocking_connection_errors();
printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);

@ -22,11 +22,11 @@ pool_new(struct worker *w, int count) {
} }
static void static void
pool_on_connect(const redisAsyncContext *ac) { pool_on_connect(const redisAsyncContext *ac, int status) {
struct pool *p = ac->data; struct pool *p = ac->data;
int i = 0; int i = 0;
if(!p || ac->err) { if(!p || status == REDIS_ERR || ac->err) {
return; return;
} }
/* connected to redis! */ /* connected to redis! */

Loading…
Cancel
Save