Persistent cmd for WS, write buffer for responses

1. Only HTTP-based pub-sub clients were re-using a cmd object, but
   WS clients were not. This led to the commands sent by a WS client
   to be processed out of order, just queued to Redis but with no
   guarantee that they would be de-queued from the event loop in the
   same order. This change attaches a permanent cmd object (with its
   associated Redis context) to WS clients just like pub-sub clients
   do.

2. WS responses are also no longer sent out of order, but added to a
   write buffer that is scheduled for writing as long as there is still
   some data left to send. This replaces the use of http_response which
   contained extra fields (headers, HTTP response) that were duplicated
   without ever being sent out.
master
Jessie Murray 3 years ago
parent 052458e876
commit b98116abc8
No known key found for this signature in database
GPG Key ID: E7E4D57EDDA744C5

@ -286,15 +286,15 @@ http_client_read(struct http_client *c) {
if(ret <= 0) { if(ret <= 0) {
/* broken link, free buffer and client object */ /* broken link, free buffer and client object */
/* disconnect pub/sub client if there is one. */ /* disconnect pub/sub or WS client if there is one. */
if(c->pub_sub && c->pub_sub->ac) { if(c->self_cmd && c->self_cmd->ac) {
struct cmd *cmd = c->pub_sub; struct cmd *cmd = c->self_cmd;
/* disconnect from all channels */ /* disconnect from all channels */
redisAsyncDisconnect(c->pub_sub->ac); redisAsyncDisconnect(c->self_cmd->ac);
// c->pub_sub might be already cleared by an event handler in redisAsyncDisconnect // c->self_cmd might be already cleared by an event handler in redisAsyncDisconnect
cmd->ac = NULL; cmd->ac = NULL;
c->pub_sub = NULL; c->self_cmd = NULL;
/* delete command object */ /* delete command object */
cmd_free(cmd); cmd_free(cmd);

@ -61,9 +61,12 @@ struct http_client {
char *separator; /* list separator for raw lists */ char *separator; /* list separator for raw lists */
char *filename; /* content-disposition */ char *filename; /* content-disposition */
struct cmd *pub_sub; struct cmd *self_cmd;
struct ws_msg *frame; /* websocket frame */ struct ws_msg *frame; /* websocket frame (containing *received* data) */
struct event ws_wev; /* websocket write event */
struct evbuffer *ws_wbuf; /* write buffer for websocket responses */
int ws_scheduled_write; /* whether we are already scheduled to send out WS data */
}; };
struct http_client * struct http_client *

@ -22,11 +22,12 @@
#include <ctype.h> #include <ctype.h>
struct cmd * struct cmd *
cmd_new(int count) { cmd_new(struct http_client *client, int count) {
struct cmd *c = calloc(1, sizeof(struct cmd)); struct cmd *c = calloc(1, sizeof(struct cmd));
c->count = count; c->count = count;
c->http_client = client;
c->argv = calloc(count, sizeof(char*)); c->argv = calloc(count, sizeof(char*));
c->argv_len = calloc(count, sizeof(size_t)); c->argv_len = calloc(count, sizeof(size_t));
@ -164,7 +165,7 @@ cmd_run(struct worker *w, struct http_client *client,
return CMD_PARAM_ERROR; return CMD_PARAM_ERROR;
} }
cmd = cmd_new(param_count); cmd = cmd_new(client, param_count);
cmd->fd = client->fd; cmd->fd = client->fd;
cmd->database = w->s->cfg->database; cmd->database = w->s->cfg->database;
@ -224,7 +225,7 @@ cmd_run(struct worker *w, struct http_client *client,
cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0); cmd->ac = (redisAsyncContext*)pool_connect(w->pool, cmd->database, 0);
/* register with the client, used upon disconnection */ /* register with the client, used upon disconnection */
client->pub_sub = cmd; client->self_cmd = cmd;
cmd->pub_sub_client = client; cmd->pub_sub_client = client;
} else if(cmd->database != w->s->cfg->database) { } else if(cmd->database != w->s->cfg->database) {
/* create a new connection to Redis for custom DBs */ /* create a new connection to Redis for custom DBs */
@ -276,7 +277,7 @@ cmd_run(struct worker *w, struct http_client *client,
} }
/* failed to find a suitable connection to Redis. */ /* failed to find a suitable connection to Redis. */
cmd_free(cmd); cmd_free(cmd);
client->pub_sub = NULL; client->self_cmd = NULL;
return CMD_REDIS_UNAVAIL; return CMD_REDIS_UNAVAIL;
} }
@ -370,6 +371,9 @@ cmd_select_format(struct http_client *client, struct cmd *cmd,
int int
cmd_is_subscribe(struct cmd *cmd) { cmd_is_subscribe(struct cmd *cmd) {
if(cmd->pub_sub_client) { /* persistent command */
return 1;
}
if(cmd->count >= 1 && cmd->argv[0] && if(cmd->count >= 1 && cmd->argv[0] &&
(strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0 || (strncasecmp(cmd->argv[0], "SUBSCRIBE", cmd->argv_len[0]) == 0 ||
strncasecmp(cmd->argv[0], "PSUBSCRIBE", cmd->argv_len[0]) == 0)) { strncasecmp(cmd->argv[0], "PSUBSCRIBE", cmd->argv_len[0]) == 0)) {

@ -43,6 +43,7 @@ struct cmd {
int http_version; int http_version;
int database; int database;
struct http_client *http_client;
struct http_client *pub_sub_client; struct http_client *pub_sub_client;
redisAsyncContext *ac; redisAsyncContext *ac;
struct worker *w; struct worker *w;
@ -54,7 +55,7 @@ struct subscription {
}; };
struct cmd * struct cmd *
cmd_new(int count); cmd_new(struct http_client *c, int count);
void void
cmd_free(struct cmd *c); cmd_free(struct cmd *c);

@ -48,7 +48,7 @@ format_send_error(struct cmd *cmd, short code, const char *msg) {
/* for pub/sub, remove command from client */ /* for pub/sub, remove command from client */
if(cmd->pub_sub_client) { if(cmd->pub_sub_client) {
cmd->pub_sub_client->pub_sub = NULL; cmd->pub_sub_client->self_cmd = NULL;
} else { } else {
cmd_free(cmd); cmd_free(cmd);
} }
@ -62,7 +62,7 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
struct http_response *resp; struct http_response *resp;
if(cmd->is_websocket) { if(cmd->is_websocket) {
ws_reply(cmd, p, sz); ws_frame_and_send_response(cmd, p, sz);
/* If it's a subscribe command, there'll be more responses */ /* If it's a subscribe command, there'll be more responses */
if(!cmd_is_subscribe(cmd)) if(!cmd_is_subscribe(cmd))

@ -522,7 +522,7 @@ json_ws_extract(struct http_client *c, const char *p, size_t sz) {
} }
/* create command and add args */ /* create command and add args */
cmd = cmd_new(argc); cmd = cmd_new(c, argc);
for(i = 0, cur = 0; i < json_array_size(j); ++i) { for(i = 0, cur = 0; i < json_array_size(j); ++i) {
json_t *jelem = json_array_get(j, i); json_t *jelem = json_array_get(j, i);
char *tmp; char *tmp;

@ -62,7 +62,7 @@ raw_ws_extract(struct http_client *c, const char *p, size_t sz) {
} }
/* create cmd object */ /* create cmd object */
cmd = cmd_new(reply->elements); cmd = cmd_new(c, reply->elements);
for(i = 0; i < reply->elements; ++i) { for(i = 0; i < reply->elements; ++i) {
redisReply *ri = reply->element[i]; redisReply *ri = reply->element[i];

@ -19,6 +19,9 @@
#include <errno.h> #include <errno.h>
#include <sys/param.h> #include <sys/param.h>
static void
ws_schedule_write(struct http_client *c);
/** /**
* This code uses the WebSocket specification from RFC 6455. * This code uses the WebSocket specification from RFC 6455.
* A copy is available at http://www.rfc-editor.org/rfc/rfc6455.txt * A copy is available at http://www.rfc-editor.org/rfc/rfc6455.txt
@ -171,15 +174,22 @@ ws_handshake_reply(struct http_client *c) {
memcpy(p, template_end, sizeof(template_end)-1); memcpy(p, template_end, sizeof(template_end)-1);
p += sizeof(template_end)-1; p += sizeof(template_end)-1;
/* build HTTP response object by hand, since we have the full response already */ /* create buffer that will hold data to send out */
struct http_response *r = http_response_init_with_buffer(c->w, buffer, sz, 1); c->ws_wbuf = evbuffer_new();
if(!r) { if(!c->ws_wbuf) {
slog(c->s, WEBDIS_ERROR, "Failed to allocate response for WS handshake", 0); slog(c->s, WEBDIS_ERROR, "Failed to allocate response for WS handshake", 0);
free(buffer); free(buffer);
return -1; return -1;
} }
http_schedule_write(c->fd, r); /* will free buffer and response once sent */ int add_ret = evbuffer_add(c->ws_wbuf, buffer, sz);
if(add_ret < 0) {
slog(c->s, WEBDIS_ERROR, "Failed to add response for WS handshake", 0);
free(buffer);
return -1;
}
ws_schedule_write(c); /* will free buffer and response once sent */
return 0; return 0;
} }
@ -210,20 +220,17 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
cmd_setup(cmd, c); cmd_setup(cmd, c);
cmd->is_websocket = 1; cmd->is_websocket = 1;
if (c->pub_sub != NULL) { if (c->self_cmd != NULL) {
/* This client already has its own connection /* This client already has its own connection
* to Redis due to a subscription; use it from * to Redis from a previous command; use it from
* now on. */ * now on. */
cmd->ac = c->pub_sub->ac; cmd->ac = c->self_cmd->ac;
} else if (cmd_is_subscribe(cmd)) { } else {
/* New subscribe command; make new Redis context /* First WS command; make new Redis context
* for this client */ * for this client */
cmd->ac = pool_connect(c->w->pool, cmd->database, 0); cmd->ac = pool_connect(c->w->pool, cmd->database, 0);
c->pub_sub = cmd; c->self_cmd = cmd;
cmd->pub_sub_client = c; cmd->pub_sub_client = c;
} else {
/* get Redis connection from pool */
cmd->ac = (redisAsyncContext*)pool_get_context(c->w->pool);
} }
/* send it off */ /* send it off */
@ -348,11 +355,10 @@ ws_add_data(struct http_client *c) {
} }
int int
ws_reply(struct cmd *cmd, const char *p, size_t sz) { ws_frame_and_send_response(struct cmd *cmd, const char *p, size_t sz) {
char *frame = malloc(sz + 8); /* create frame by prepending header */ char *frame = malloc(sz + 8); /* create frame by prepending header */
size_t frame_sz = 0; size_t frame_sz = 0;
struct http_response *r;
if (frame == NULL) if (frame == NULL)
return -1; return -1;
@ -383,15 +389,50 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) {
} }
/* mark as keep alive, otherwise we'll close the connection after the first reply */ /* mark as keep alive, otherwise we'll close the connection after the first reply */
r = http_response_init_with_buffer(cmd->w, frame, frame_sz, 1); int add_ret = evbuffer_add(cmd->http_client->ws_wbuf, frame, frame_sz);
if (r == NULL) { if (add_ret < 0) {
free(frame); free(frame);
slog(cmd->w->s, WEBDIS_ERROR, "Failed response allocation in ws_reply", 0); slog(cmd->w->s, WEBDIS_ERROR, "Failed response allocation in ws_frame_and_send_response", 0);
return -1; return -1;
} }
/* send WS frame */ /* send WS frame */
http_schedule_write(cmd->fd, r); ws_schedule_write(cmd->http_client);
return 0; return 0;
} }
static void
ws_can_write(int fd, short event, void *p) {
int ret;
struct http_client *c = p;
(void)event;
c->ws_scheduled_write = 0;
/* send pending data */
ret = evbuffer_write(c->ws_wbuf, fd);
if(ret < 0) {
close(fd);
} else if(ret > 0 && evbuffer_get_length(c->ws_wbuf) > 0) { /* more data to send */
ws_schedule_write(c);
}
}
static void
ws_schedule_write(struct http_client *c) {
if(c->ws_scheduled_write) {
return;
}
event_set(&c->ws_wev, c->fd, EV_WRITE, ws_can_write, c);
event_base_set(c->w->base, &c->ws_wev);
int ret = event_add(&c->ws_wev, NULL);
if(ret == 0) {
c->ws_scheduled_write = 1;
} else { /* could not schedule write */
slog(c->w->s, WEBDIS_ERROR, "Could not schedule WS write", 0);
}
}

@ -25,6 +25,6 @@ enum ws_state
ws_add_data(struct http_client *c); ws_add_data(struct http_client *c);
int int
ws_reply(struct cmd *cmd, const char *p, size_t sz); ws_frame_and_send_response(struct cmd *cmd, const char *p, size_t sz);
#endif #endif

Loading…
Cancel
Save