Create new connection for new separate DBs.

master
Nicolas Favre-Felix 12 years ago
parent 3db6e26ee6
commit 540b1a91bb

12
cmd.c

@ -139,7 +139,6 @@ cmd_run(struct worker *w, struct http_client *client,
const char *p, *cmd_name = uri; const char *p, *cmd_name = uri;
int cmd_len; int cmd_len;
int param_count = 0, cur_param = 1; int param_count = 0, cur_param = 1;
int db_num = w->s->cfg->database;
struct cmd *cmd; struct cmd *cmd;
formatting_fun f_format; formatting_fun f_format;
@ -161,6 +160,7 @@ cmd_run(struct worker *w, struct http_client *client,
cmd = cmd_new(param_count); cmd = cmd_new(param_count);
cmd->fd = client->fd; cmd->fd = client->fd;
cmd->database = w->s->cfg->database;
/* get output formatting function */ /* get output formatting function */
uri_len = cmd_select_format(client, cmd, uri, uri_len, &f_format); uri_len = cmd_select_format(client, cmd, uri, uri_len, &f_format);
@ -174,6 +174,7 @@ cmd_run(struct worker *w, struct http_client *client,
/* detect DB number by checking if first arg is only numbers */ /* detect DB number by checking if first arg is only numbers */
int has_db = 1; int has_db = 1;
int db_num = 0;
for(p = uri; p < slash; ++p) { for(p = uri; p < slash; ++p) {
if(*p < '0' || *p > '9') { if(*p < '0' || *p > '9') {
has_db = 0; has_db = 0;
@ -185,10 +186,12 @@ cmd_run(struct worker *w, struct http_client *client,
/* shift to next arg if a db was set up */ /* shift to next arg if a db was set up */
if(has_db) { if(has_db) {
char *next; char *next;
cmd->database = db_num;
cmd->count--; /* overcounted earlier */
cmd_name = slash + 1; cmd_name = slash + 1;
if((next = memchr(cmd_name, '/', uri_len - (slash - uri)))) { if((next = memchr(cmd_name, '/', uri_len - (slash - uri)))) {
cmd_len = next - uri; cmd_len = next - cmd_name;
} else { } else {
cmd_len = uri_len - (slash - uri + 1); cmd_len = uri_len - (slash - uri + 1);
} }
@ -212,11 +215,14 @@ cmd_run(struct worker *w, struct http_client *client,
if(cmd_is_subscribe(cmd)) { if(cmd_is_subscribe(cmd)) {
/* create a new connection to Redis */ /* create a new connection to Redis */
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, 0); cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0);
/* register with the client, used upon disconnection */ /* register with the client, used upon disconnection */
client->pub_sub = cmd; client->pub_sub = cmd;
cmd->pub_sub_client = client; cmd->pub_sub_client = client;
} else if(cmd->database != w->s->cfg->database) {
/* create a new connection to Redis for custom DBs */
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0);
} else { } else {
/* get a connection from the pool */ /* get a connection from the pool */
cmd->ac = (redisAsyncContext*)pool_get_context(w->pool); cmd->ac = (redisAsyncContext*)pool_get_context(w->pool);

@ -41,6 +41,7 @@ struct cmd {
int started_responding; int started_responding;
int is_websocket; int is_websocket;
int http_version; int http_version;
int database;
struct http_client *pub_sub_client; struct http_client *pub_sub_client;
redisAsyncContext *ac; redisAsyncContext *ac;

@ -57,7 +57,7 @@ pool_can_connect(int fd, short event, void *ptr) {
free(pr); free(pr);
pool_connect(p, 1); pool_connect(p, p->cfg->database, 1);
} }
static void static void
pool_schedule_reconnect(struct pool *p) { pool_schedule_reconnect(struct pool *p) {
@ -104,7 +104,7 @@ pool_on_disconnect(const redisAsyncContext *ac, int status) {
* Create new connection. * Create new connection.
*/ */
redisAsyncContext * redisAsyncContext *
pool_connect(struct pool *p, int attach) { pool_connect(struct pool *p, int db_num, int attach) {
struct redisAsyncContext *ac; struct redisAsyncContext *ac;
if(p->cfg->redis_host[0] == '/') { /* unix socket */ if(p->cfg->redis_host[0] == '/') { /* unix socket */
@ -134,11 +134,11 @@ pool_connect(struct pool *p, int attach) {
redisAsyncSetConnectCallback(ac, pool_on_connect); redisAsyncSetConnectCallback(ac, pool_on_connect);
redisAsyncSetDisconnectCallback(ac, pool_on_disconnect); redisAsyncSetDisconnectCallback(ac, pool_on_disconnect);
if (p->cfg->redis_auth) { /* authenticate. */ if(p->cfg->redis_auth) { /* authenticate. */
redisAsyncCommand(ac, NULL, NULL, "AUTH %s", p->cfg->redis_auth); redisAsyncCommand(ac, NULL, NULL, "AUTH %s", p->cfg->redis_auth);
} }
if (p->cfg->database) { /* change database. */ if(db_num) { /* change database. */
redisAsyncCommand(ac, NULL, NULL, "SELECT %d", p->cfg->database); redisAsyncCommand(ac, NULL, NULL, "SELECT %d", db_num);
} }
return ac; return ac;
} }

@ -22,7 +22,7 @@ struct pool *
pool_new(struct worker *w, int count); pool_new(struct worker *w, int count);
redisAsyncContext * redisAsyncContext *
pool_connect(struct pool *p, int attach); pool_connect(struct pool *p, int db_num, int attach);
const redisAsyncContext * const redisAsyncContext *
pool_get_context(struct pool *p); pool_get_context(struct pool *p);

@ -200,7 +200,7 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
} else if (cmd_is_subscribe(cmd)) { } else if (cmd_is_subscribe(cmd)) {
/* New subscribe command; make new Redis context /* New subscribe command; make new Redis context
* for this client */ * for this client */
cmd->ac = pool_connect(c->w->pool, 0); cmd->ac = pool_connect(c->w->pool, cmd->database, 0);
c->pub_sub = cmd; c->pub_sub = cmd;
cmd->pub_sub_client = c; cmd->pub_sub_client = c;
} else { } else {

@ -130,7 +130,7 @@ worker_pool_connect(struct worker *w) {
int i; int i;
/* create connections */ /* create connections */
for(i = 0; i < w->pool->count; ++i) { for(i = 0; i < w->pool->count; ++i) {
pool_connect(w->pool, 1); pool_connect(w->pool, w->s->cfg->database, 1);
} }
} }

Loading…
Cancel
Save