Disconnect broken SUBSCRIBE clients.

master
Nicolas Favre-Felix 14 years ago
parent 21c4413d0d
commit b01cb75db7

@ -4,11 +4,14 @@
#include "server.h"
#include "worker.h"
#include "websocket.h"
#include "cmd.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
static int
http_client_on_url(struct http_parser *p, const char *at, size_t sz) {
@ -245,7 +248,14 @@ http_client_read(struct http_client *c) {
ret = read(c->fd, buffer, sizeof(buffer));
if(ret <= 0) {
/* broken link, free buffer and client object */
/* disconnect pub/sub client if there is one. */
if(c->pub_sub) {
redisAsyncDisconnect(c->pub_sub->ac);
}
close(c->fd);
http_client_free(c);
return -1;
}

@ -7,6 +7,7 @@
struct http_header;
struct server;
struct cmd;
typedef enum {
LAST_CB_NONE = 0,
@ -49,6 +50,8 @@ struct http_client {
char *type; /* forced output content-type */
char *jsonp; /* jsonp wrapper */
struct cmd *pub_sub;
};
struct http_client *

20
cmd.c

@ -126,7 +126,6 @@ cmd_run(struct worker *w, struct http_client *client,
int param_count = 0, cur_param = 1;
struct cmd *cmd;
redisAsyncContext *ac = NULL;
formatting_fun f_format;
/* count arguments */
@ -174,19 +173,22 @@ cmd_run(struct worker *w, struct http_client *client,
if(cmd_is_subscribe(cmd)) {
/* create a new connection to Redis */
ac = (redisAsyncContext*)pool_connect(w->pool, 0);
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, 0);
/* register with the client, used upon disconnection */
client->pub_sub = cmd;
} else {
/* get a connection from the pool */
ac = (redisAsyncContext*)pool_get_context(w->pool);
cmd->ac = (redisAsyncContext*)pool_get_context(w->pool);
}
/* no args (e.g. INFO command) */
if(!slash) {
if(!ac) {
if(!cmd->ac) {
cmd_free(cmd);
return CMD_REDIS_UNAVAIL;
}
redisAsyncCommandArgv(ac, f_format, cmd, 1,
redisAsyncCommandArgv(cmd->ac, f_format, cmd, 1,
(const char **)cmd->argv, cmd->argv_len);
return CMD_SENT;
}
@ -216,8 +218,8 @@ cmd_run(struct worker *w, struct http_client *client,
}
/* send it off! */
if(ac) {
cmd_send(ac, f_format, cmd);
if(cmd->ac) {
cmd_send(cmd, f_format);
return CMD_SENT;
}
/* failed to find a suitable connection to Redis. */
@ -226,8 +228,8 @@ cmd_run(struct worker *w, struct http_client *client,
}
void
cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd) {
redisAsyncCommandArgv(ac, f_format, cmd, cmd->count,
cmd_send(struct cmd *cmd, formatting_fun f_format) {
redisAsyncCommandArgv(cmd->ac, f_format, cmd, cmd->count,
(const char **)cmd->argv, cmd->argv_len);
}

@ -37,6 +37,8 @@ struct cmd {
int started_responding;
int is_websocket;
int http_version;
redisAsyncContext *ac;
};
struct subscription {
@ -63,7 +65,7 @@ int
cmd_is_subscribe(struct cmd *cmd);
void
cmd_send(redisAsyncContext *ac, formatting_fun f_format, struct cmd *cmd);
cmd_send(struct cmd *cmd, formatting_fun f_format);
void
cmd_setup(struct cmd *cmd, struct http_client *client);

@ -173,10 +173,10 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
cmd->is_websocket = 1;
/* get Redis connection from pool */
redisAsyncContext *ac = (redisAsyncContext*)pool_get_context(c->w->pool);
cmd->ac = (redisAsyncContext*)pool_get_context(c->w->pool);
/* send it off */
cmd_send(ac, fun_reply, cmd);
cmd_send(cmd, fun_reply);
return 0;
}

Loading…
Cancel
Save