diff --git a/src/client.c b/src/client.c index 3e8eda4..c991895 100644 --- a/src/client.c +++ b/src/client.c @@ -286,15 +286,15 @@ http_client_read(struct http_client *c) { if(ret <= 0) { /* broken link, free buffer and client object */ - /* disconnect pub/sub client if there is one. */ - if(c->pub_sub && c->pub_sub->ac) { - struct cmd *cmd = c->pub_sub; + /* disconnect pub/sub or WS client if there is one. */ + if(c->self_cmd && c->self_cmd->ac) { + struct cmd *cmd = c->self_cmd; /* disconnect from all channels */ - redisAsyncDisconnect(c->pub_sub->ac); - // c->pub_sub might be already cleared by an event handler in redisAsyncDisconnect + redisAsyncDisconnect(c->self_cmd->ac); + // c->self_cmd might be already cleared by an event handler in redisAsyncDisconnect cmd->ac = NULL; - c->pub_sub = NULL; + c->self_cmd = NULL; /* delete command object */ cmd_free(cmd); diff --git a/src/client.h b/src/client.h index 6b38992..ae571fe 100644 --- a/src/client.h +++ b/src/client.h @@ -61,9 +61,12 @@ struct http_client { char *separator; /* list separator for raw lists */ char *filename; /* content-disposition */ - struct cmd *pub_sub; + struct cmd *self_cmd; - struct ws_msg *frame; /* websocket frame */ + struct ws_msg *frame; /* websocket frame (containing *received* data) */ + struct event ws_wev; /* websocket write event */ + struct evbuffer *ws_wbuf; /* write buffer for websocket responses */ + int ws_scheduled_write; /* whether we are already scheduled to send out WS data */ }; struct http_client * diff --git a/src/cmd.c b/src/cmd.c index 290df87..03a2821 100644 --- a/src/cmd.c +++ b/src/cmd.c @@ -22,11 +22,12 @@ #include struct cmd * -cmd_new(int count) { +cmd_new(struct http_client *client, int count) { struct cmd *c = calloc(1, sizeof(struct cmd)); c->count = count; + c->http_client = client; c->argv = calloc(count, sizeof(char*)); c->argv_len = calloc(count, sizeof(size_t)); @@ -164,7 +165,7 @@ cmd_run(struct worker *w, struct http_client *client, return CMD_PARAM_ERROR; } - cmd = cmd_new(param_count); + cmd = cmd_new(client, param_count); cmd->fd = client->fd; cmd->database = w->s->cfg->database; @@ -224,7 +225,7 @@ cmd_run(struct worker *w, struct http_client *client, cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0); /* register with the client, used upon disconnection */ - client->pub_sub = cmd; + client->self_cmd = cmd; cmd->pub_sub_client = client; } else if(cmd->database != w->s->cfg->database) { /* create a new connection to Redis for custom DBs */ @@ -276,7 +277,7 @@ cmd_run(struct worker *w, struct http_client *client, } /* failed to find a suitable connection to Redis. */ cmd_free(cmd); - client->pub_sub = NULL; + client->self_cmd = NULL; return CMD_REDIS_UNAVAIL; } @@ -370,6 +371,9 @@ cmd_select_format(struct http_client *client, struct cmd *cmd, int cmd_is_subscribe(struct cmd *cmd) { + if(cmd->pub_sub_client) { /* persistent command */ + return 1; + } if(cmd->count >= 1 && cmd->argv[0] && (strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0 || strncasecmp(cmd->argv[0], "PSUBSCRIBE", cmd->argv_len[0]) == 0)) { diff --git a/src/cmd.h b/src/cmd.h index e216bde..196a732 100644 --- a/src/cmd.h +++ b/src/cmd.h @@ -43,6 +43,7 @@ struct cmd { int http_version; int database; + struct http_client *http_client; struct http_client *pub_sub_client; redisAsyncContext *ac; struct worker *w; @@ -54,7 +55,7 @@ struct subscription { }; struct cmd * -cmd_new(int count); +cmd_new(struct http_client *c, int count); void cmd_free(struct cmd *c); diff --git a/src/formats/common.c b/src/formats/common.c index 296b822..7842935 100644 --- a/src/formats/common.c +++ b/src/formats/common.c @@ -48,7 +48,7 @@ format_send_error(struct cmd *cmd, short code, const char *msg) { /* for pub/sub, remove command from client */ if(cmd->pub_sub_client) { - cmd->pub_sub_client->pub_sub = NULL; + cmd->pub_sub_client->self_cmd = NULL; } else { cmd_free(cmd); } @@ -62,7 +62,7 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content struct http_response *resp; if(cmd->is_websocket) { - ws_reply(cmd, p, sz); + ws_frame_and_send_response(cmd, p, sz); /* If it's a subscribe command, there'll be more responses */ if(!cmd_is_subscribe(cmd)) diff --git a/src/formats/json.c b/src/formats/json.c index 006bb1f..0cbcb52 100644 --- a/src/formats/json.c +++ b/src/formats/json.c @@ -522,7 +522,7 @@ json_ws_extract(struct http_client *c, const char *p, size_t sz) { } /* create command and add args */ - cmd = cmd_new(argc); + cmd = cmd_new(c, argc); for(i = 0, cur = 0; i < json_array_size(j); ++i) { json_t *jelem = json_array_get(j, i); char *tmp; diff --git a/src/formats/raw.c b/src/formats/raw.c index 20c3927..7caff97 100644 --- a/src/formats/raw.c +++ b/src/formats/raw.c @@ -62,7 +62,7 @@ raw_ws_extract(struct http_client *c, const char *p, size_t sz) { } /* create cmd object */ - cmd = cmd_new(reply->elements); + cmd = cmd_new(c, reply->elements); for(i = 0; i < reply->elements; ++i) { redisReply *ri = reply->element[i]; diff --git a/src/websocket.c b/src/websocket.c index e0891fd..0bd337b 100644 --- a/src/websocket.c +++ b/src/websocket.c @@ -19,6 +19,9 @@ #include #include +static void +ws_schedule_write(struct http_client *c); + /** * This code uses the WebSocket specification from RFC 6455. * A copy is available at http://www.rfc-editor.org/rfc/rfc6455.txt @@ -171,15 +174,22 @@ ws_handshake_reply(struct http_client *c) { memcpy(p, template_end, sizeof(template_end)-1); p += sizeof(template_end)-1; - /* build HTTP response object by hand, since we have the full response already */ - struct http_response *r = http_response_init_with_buffer(c->w, buffer, sz, 1); - if(!r) { + /* create buffer that will hold data to send out */ + c->ws_wbuf = evbuffer_new(); + if(!c->ws_wbuf) { slog(c->s, WEBDIS_ERROR, "Failed to allocate response for WS handshake", 0); free(buffer); return -1; } - http_schedule_write(c->fd, r); /* will free buffer and response once sent */ + int add_ret = evbuffer_add(c->ws_wbuf, buffer, sz); + if(add_ret < 0) { + slog(c->s, WEBDIS_ERROR, "Failed to add response for WS handshake", 0); + free(buffer); + return -1; + } + + ws_schedule_write(c); /* will free buffer and response once sent */ return 0; } @@ -210,20 +220,17 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) { cmd_setup(cmd, c); cmd->is_websocket = 1; - if (c->pub_sub != NULL) { + if (c->self_cmd != NULL) { /* This client already has its own connection - * to Redis due to a subscription; use it from + * to Redis from a previous command; use it from * now on. */ - cmd->ac = c->pub_sub->ac; - } else if (cmd_is_subscribe(cmd)) { - /* New subscribe command; make new Redis context + cmd->ac = c->self_cmd->ac; + } else { + /* First WS command; make new Redis context * for this client */ cmd->ac = pool_connect(c->w->pool, cmd->database, 0); - c->pub_sub = cmd; + c->self_cmd = cmd; cmd->pub_sub_client = c; - } else { - /* get Redis connection from pool */ - cmd->ac = (redisAsyncContext*)pool_get_context(c->w->pool); } /* send it off */ @@ -348,11 +355,10 @@ ws_add_data(struct http_client *c) { } int -ws_reply(struct cmd *cmd, const char *p, size_t sz) { +ws_frame_and_send_response(struct cmd *cmd, const char *p, size_t sz) { char *frame = malloc(sz + 8); /* create frame by prepending header */ size_t frame_sz = 0; - struct http_response *r; if (frame == NULL) return -1; @@ -383,15 +389,50 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) { } /* mark as keep alive, otherwise we'll close the connection after the first reply */ - r = http_response_init_with_buffer(cmd->w, frame, frame_sz, 1); - if (r == NULL) { + int add_ret = evbuffer_add(cmd->http_client->ws_wbuf, frame, frame_sz); + if (add_ret < 0) { free(frame); - slog(cmd->w->s, WEBDIS_ERROR, "Failed response allocation in ws_reply", 0); + slog(cmd->w->s, WEBDIS_ERROR, "Failed response allocation in ws_frame_and_send_response", 0); return -1; } /* send WS frame */ - http_schedule_write(cmd->fd, r); + ws_schedule_write(cmd->http_client); return 0; } + +static void +ws_can_write(int fd, short event, void *p) { + + int ret; + struct http_client *c = p; + (void)event; + + c->ws_scheduled_write = 0; + + /* send pending data */ + ret = evbuffer_write(c->ws_wbuf, fd); + + if(ret < 0) { + close(fd); + } else if(ret > 0 && evbuffer_get_length(c->ws_wbuf) > 0) { /* more data to send */ + ws_schedule_write(c); + } +} + +static void +ws_schedule_write(struct http_client *c) { + + if(c->ws_scheduled_write) { + return; + } + event_set(&c->ws_wev, c->fd, EV_WRITE, ws_can_write, c); + event_base_set(c->w->base, &c->ws_wev); + int ret = event_add(&c->ws_wev, NULL); + if(ret == 0) { + c->ws_scheduled_write = 1; + } else { /* could not schedule write */ + slog(c->w->s, WEBDIS_ERROR, "Could not schedule WS write", 0); + } +} diff --git a/src/websocket.h b/src/websocket.h index dc9d764..09886ca 100644 --- a/src/websocket.h +++ b/src/websocket.h @@ -25,6 +25,6 @@ enum ws_state ws_add_data(struct http_client *c); int -ws_reply(struct cmd *cmd, const char *p, size_t sz); +ws_frame_and_send_response(struct cmd *cmd, const char *p, size_t sz); #endif