diff --git a/cmd.c b/cmd.c
index c309137..0d401d8 100644
--- a/cmd.c
+++ b/cmd.c
@@ -86,6 +86,7 @@ cmd_setup(struct cmd *cmd, struct http_client *client) {
int i;
cmd->keep_alive = client->keep_alive;
+ cmd->w = client->w; /* keep track of the worker */
for(i = 0; i < client->header_count; ++i) {
if(strcasecmp(client->headers[i].key, "If-None-Match") == 0) {
diff --git a/cmd.h b/cmd.h
index ea293da..c527bf6 100644
--- a/cmd.h
+++ b/cmd.h
@@ -43,6 +43,7 @@ struct cmd {
struct http_client *pub_sub_client;
redisAsyncContext *ac;
+ struct worker *w;
};
struct subscription {
diff --git a/formats/common.c b/formats/common.c
index 3cd2e1a..f9dbcd0 100644
--- a/formats/common.c
+++ b/formats/common.c
@@ -34,13 +34,13 @@ char *etag_new(const char *p, size_t sz) {
void
format_send_error(struct cmd *cmd, short code, const char *msg) {
- struct http_response resp;
+ struct http_response *resp;
if(!cmd->is_websocket && !cmd->pub_sub_client) {
- http_response_init(&resp, code, msg);
- resp.http_version = cmd->http_version;
- http_response_set_keep_alive(&resp, cmd->keep_alive);
- http_response_write(&resp, cmd->fd);
+ resp = http_response_init(cmd->w, code, msg);
+ resp->http_version = cmd->http_version;
+ http_response_set_keep_alive(resp, cmd->keep_alive);
+ http_response_write(resp, cmd->fd);
}
/* for pub/sub, remove command from client */
@@ -55,8 +55,8 @@ void
format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content_type) {
int free_cmd = 1;
- struct http_response resp;
const char *ct = cmd->mime?cmd->mime:content_type;
+ struct http_response *resp;
if(cmd->is_websocket) {
ws_reply(cmd, p, sz);
@@ -70,17 +70,19 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
/* start streaming */
if(cmd->started_responding == 0) {
cmd->started_responding = 1;
- http_response_init(&resp, 200, "OK");
- resp.http_version = cmd->http_version;
+ resp = http_response_init(cmd->w, 200, "OK");
+ resp->http_version = cmd->http_version;
if(cmd->filename) {
- http_response_set_header(&resp, "Content-Disposition", cmd->filename);
+ http_response_set_header(resp, "Content-Disposition", cmd->filename);
}
- http_response_set_header(&resp, "Content-Type", ct);
- http_response_set_keep_alive(&resp, 1);
- http_response_set_header(&resp, "Transfer-Encoding", "chunked");
- http_response_write(&resp, cmd->fd);
+ http_response_set_header(resp, "Content-Type", ct);
+ http_response_set_keep_alive(resp, 1);
+ http_response_set_header(resp, "Transfer-Encoding", "chunked");
+ http_response_write(resp, cmd->fd);
}
- http_response_write_chunk(cmd->fd, p, sz);
+
+ /* Asynchronous chunk write. */
+ http_response_write_chunk(cmd->fd, cmd->w, p, sz);
} else {
/* compute ETag */
@@ -89,19 +91,19 @@ format_send_reply(struct cmd *cmd, const char *p, size_t sz, const char *content
/* check If-None-Match */
if(cmd->if_none_match && strcmp(cmd->if_none_match, etag) == 0) {
/* SAME! send 304. */
- http_response_init(&resp, 304, "Not Modified");
+ resp = http_response_init(cmd->w, 304, "Not Modified");
} else {
- http_response_init(&resp, 200, "OK");
+ resp = http_response_init(cmd->w, 200, "OK");
if(cmd->filename) {
- http_response_set_header(&resp, "Content-Disposition", cmd->filename);
+ http_response_set_header(resp, "Content-Disposition", cmd->filename);
}
- http_response_set_header(&resp, "Content-Type", ct);
- http_response_set_header(&resp, "ETag", etag);
- http_response_set_body(&resp, p, sz);
+ http_response_set_header(resp, "Content-Type", ct);
+ http_response_set_header(resp, "ETag", etag);
+ http_response_set_body(resp, p, sz);
}
- resp.http_version = cmd->http_version;
- http_response_set_keep_alive(&resp, cmd->keep_alive);
- http_response_write(&resp, cmd->fd);
+ resp->http_version = cmd->http_version;
+ http_response_set_keep_alive(resp, cmd->keep_alive);
+ http_response_write(resp, cmd->fd);
free(etag);
}
diff --git a/formats/custom-type.c b/formats/custom-type.c
index 0267872..8b0cdeb 100644
--- a/formats/custom-type.c
+++ b/formats/custom-type.c
@@ -15,7 +15,7 @@ custom_type_reply(redisAsyncContext *c, void *r, void *privdata) {
(void)c;
char int_buffer[50];
int int_len;
- struct http_response resp;
+ struct http_response *resp;
if (reply == NULL) { /* broken Redis link */
format_send_error(cmd, 503, "Service Unavailable");
@@ -41,10 +41,10 @@ custom_type_reply(redisAsyncContext *c, void *r, void *privdata) {
}
/* couldn't make sense of what the client wanted. */
- http_response_init(&resp, 400, "Bad Request");
- http_response_set_header(&resp, "Content-Length", "0");
- http_response_set_keep_alive(&resp, cmd->keep_alive);
- http_response_write(&resp, cmd->fd);
+ resp = http_response_init(cmd->w, 400, "Bad Request");
+ http_response_set_header(resp, "Content-Length", "0");
+ http_response_set_keep_alive(resp, cmd->keep_alive);
+ http_response_write(resp, cmd->fd);
if(!cmd_is_subscribe(cmd)) {
cmd_free(cmd);
diff --git a/http.c b/http.c
index fd3081f..89f7ccc 100644
--- a/http.c
+++ b/http.c
@@ -10,20 +10,23 @@
/* HTTP Response */
-void
-http_response_init(struct http_response *r, int code, const char *msg) {
+struct http_response *
+http_response_init(struct worker *w, int code, const char *msg) {
- /* remove any old data */
- memset(r, 0, sizeof(struct http_response));
+ /* create object */
+ struct http_response *r = calloc(1, sizeof(struct http_response));
r->code = code;
r->msg = msg;
+ r->w = w;
http_response_set_header(r, "Server", "Webdis");
/* Cross-Origin Resource Sharing, CORS. */
http_response_set_header(r, "Allow", "GET,POST,PUT,OPTIONS");
http_response_set_header(r, "Access-Control-Allow-Origin", "*");
+
+ return r;
}
@@ -73,18 +76,73 @@ http_response_set_body(struct http_response *r, const char *body, size_t body_le
r->body_len = body_len;
}
-int
+static void
+http_response_cleanup(struct http_response *r, int fd, int success) {
+
+ int i;
+
+ /* cleanup buffer */
+ free(r->out);
+ if(!r->keep_alive && !success) {
+ /* Close fd is client doesn't support Keep-Alive. */
+ close(fd);
+ }
+
+ /* cleanup response object */
+ for(i = 0; i < r->header_count; ++i) {
+ free(r->headers[i].key);
+ free(r->headers[i].val);
+ }
+ free(r->headers);
+
+ free(r);
+}
+
+static void
+http_can_write(int fd, short event, void *p) {
+
+ int ret;
+ struct http_response *r = p;
+
+ (void)event;
+
+ ret = write(fd, r->out + r->sent, r->out_sz - r->sent);
+
+ if(ret > 0)
+ r->sent += ret;
+
+ if(ret <= 0 || r->out_sz - r->sent == 0) { /* error or done */
+ http_response_cleanup(r, fd, (int)r->out_sz == r->sent ? 1 : 0);
+ } else { /* reschedule write */
+ http_schedule_write(fd, r);
+ }
+}
+
+void
+http_schedule_write(int fd, struct http_response *r) {
+
+ if(r->w) { /* async */
+ event_set(&r->ev, fd, EV_WRITE, http_can_write, r);
+ event_base_set(r->w->base, &r->ev);
+ event_add(&r->ev, NULL);
+ } else { /* blocking */
+ http_can_write(fd, 0, r);
+ }
+
+}
+
+void
http_response_write(struct http_response *r, int fd) {
- char *s = NULL, *p;
- size_t sz = 0;
- int i, ret, keep_alive = 0;
+ char *p;
+ int i, ret;
- sz = sizeof("HTTP/1.x xxx ")-1 + strlen(r->msg) + 2;
- s = calloc(sz + 1, 1);
+ r->keep_alive = 0;
+ r->out_sz = sizeof("HTTP/1.x xxx ")-1 + strlen(r->msg) + 2;
+ r->out = calloc(r->out_sz + 1, 1);
- ret = sprintf(s, "HTTP/1.%d %d %s\r\n", (r->http_version?1:0), r->code, r->msg);
- p = s;
+ ret = sprintf(r->out, "HTTP/1.%d %d %s\r\n", (r->http_version?1:0), r->code, r->msg);
+ p = r->out;
if(r->code == 200 && r->body) {
char content_length[10];
@@ -97,8 +155,8 @@ http_response_write(struct http_response *r, int fd) {
for(i = 0; i < r->header_count; ++i) {
/* "Key: Value\r\n" */
size_t header_sz = r->headers[i].key_sz + 2 + r->headers[i].val_sz + 2;
- s = realloc(s, sz + header_sz);
- p = s + sz;
+ r->out = realloc(r->out, r->out_sz + header_sz);
+ p = r->out + r->out_sz;
/* add key */
memcpy(p, r->headers[i].key, r->headers[i].key_sz);
@@ -116,52 +174,29 @@ http_response_write(struct http_response *r, int fd) {
*(p++) = '\r';
*(p++) = '\n';
- sz += header_sz;
+ r->out_sz += header_sz;
if(strncasecmp("Connection", r->headers[i].key, r->headers[i].key_sz) == 0 &&
strncasecmp("Keep-Alive", r->headers[i].val, r->headers[i].val_sz) == 0) {
- keep_alive = 1;
+ r->keep_alive = 1;
}
}
/* end of headers */
- s = realloc(s, sz + 2);
- memcpy(s + sz, "\r\n", 2);
- sz += 2;
+ r->out = realloc(r->out, r->out_sz + 2);
+ memcpy(r->out + r->out_sz, "\r\n", 2);
+ r->out_sz += 2;
/* append body if there is one. */
if(r->body && r->body_len) {
- s = realloc(s, sz + r->body_len);
- memcpy(s + sz, r->body, r->body_len);
- sz += r->body_len;
+ r->out = realloc(r->out, r->out_sz + r->body_len);
+ memcpy(r->out + r->out_sz, r->body, r->body_len);
+ r->out_sz += r->body_len;
}
/* send buffer to client */
- p = s;
- while(sz) {
- ret = write(fd, p, sz);
- if(ret > 0) { /* block */
- sz -= ret;
- p += ret;
- }
- }
-
-
- /* cleanup buffer */
- free(s);
- if(!keep_alive && (size_t)ret == sz) {
- /* Close fd is client doesn't support Keep-Alive. */
- close(fd);
- }
-
- /* cleanup response object */
- for(i = 0; i < r->header_count; ++i) {
- free(r->headers[i].key);
- free(r->headers[i].val);
- }
- free(r->headers);
-
- return ret == (int)sz ? 0 : 1;
+ r->sent = 0;
+ http_schedule_write(fd, r);
}
static void
@@ -175,20 +210,19 @@ http_response_set_connection_header(struct http_client *c, struct http_response
void
http_crossdomain(struct http_client *c) {
- struct http_response resp;
+ struct http_response *resp = http_response_init(NULL, 200, "OK");
char out[] = "\n"
"\n"
"\n"
"\n"
"\n";
- http_response_init(&resp, 200, "OK");
- resp.http_version = c->http_version;
- http_response_set_connection_header(c, &resp);
- http_response_set_header(&resp, "Content-Type", "application/xml");
- http_response_set_body(&resp, out, sizeof(out)-1);
+ resp->http_version = c->http_version;
+ http_response_set_connection_header(c, resp);
+ http_response_set_header(resp, "Content-Type", "application/xml");
+ http_response_set_body(resp, out, sizeof(out)-1);
- http_response_write(&resp, c->fd);
+ http_response_write(resp, c->fd);
http_client_reset(c);
}
@@ -196,13 +230,12 @@ http_crossdomain(struct http_client *c) {
void
http_send_error(struct http_client *c, short code, const char *msg) {
- struct http_response resp;
- http_response_init(&resp, code, msg);
- resp.http_version = c->http_version;
- http_response_set_connection_header(c, &resp);
- http_response_set_body(&resp, NULL, 0);
+ struct http_response *resp = http_response_init(NULL, code, msg);
+ resp->http_version = c->http_version;
+ http_response_set_connection_header(c, resp);
+ http_response_set_body(resp, NULL, 0);
- http_response_write(&resp, c->fd);
+ http_response_write(resp, c->fd);
http_client_reset(c);
}
@@ -222,15 +255,14 @@ http_response_set_keep_alive(struct http_response *r, int enabled) {
void
http_send_options(struct http_client *c) {
- struct http_response resp;
- http_response_init(&resp, 200, "OK");
- resp.http_version = c->http_version;
- http_response_set_connection_header(c, &resp);
+ struct http_response *resp = http_response_init(NULL, 200, "OK");
+ resp->http_version = c->http_version;
+ http_response_set_connection_header(c, resp);
- http_response_set_header(&resp, "Content-Type", "text/html");
- http_response_set_header(&resp, "Content-Length", "0");
+ http_response_set_header(resp, "Content-Type", "text/html");
+ http_response_set_header(resp, "Content-Length", "0");
- http_response_write(&resp, c->fd);
+ http_response_write(resp, c->fd);
http_client_reset(c);
}
@@ -238,15 +270,27 @@ http_send_options(struct http_client *c) {
* Write HTTP chunk.
*/
void
-http_response_write_chunk(int fd, const char *p, size_t sz) {
+http_response_write_chunk(int fd, struct worker *w, const char *p, size_t sz) {
+
+ char *out, tmp[64];
+ size_t out_sz;
+ int chunk_size;
+ struct http_response *r = http_response_init(w, 0, NULL);
+
+ /* calculate format size */
+ chunk_size = sprintf(tmp, "%x\r\n", (int)sz);
+
+ out_sz = chunk_size + sz + 2;
+ out = malloc(out_sz);
+ memcpy(out, tmp, chunk_size);
+ memcpy(out + chunk_size, p, sz);
+ memcpy(out + chunk_size + sz, "\r\n", 2);
+
- char buf[64];
- int ret, chunk_size;
+ /* send async write */
+ r->out = out;
+ r->out_sz = out_sz;
- chunk_size = sprintf(buf, "%x\r\n", (int)sz);
- ret = write(fd, buf, chunk_size);
- ret = write(fd, p, sz);
- ret = write(fd, "\r\n", 2);
- (void)ret;
+ http_schedule_write(fd, r);
}
diff --git a/http.h b/http.h
index 45014a4..eac1ffe 100644
--- a/http.h
+++ b/http.h
@@ -2,8 +2,10 @@
#define HTTP_H
#include
+#include
struct http_client;
+struct worker;
struct http_header {
char *key;
@@ -15,6 +17,9 @@ struct http_header {
struct http_response {
+
+ struct event ev;
+
short code;
const char *msg;
@@ -24,14 +29,21 @@ struct http_response {
const char *body;
size_t body_len;
+ char *out;
+ size_t out_sz;
+
int chunked;
int http_version;
+ int keep_alive;
+ int sent;
+
+ struct worker *w;
};
/* HTTP response */
-void
-http_response_init(struct http_response *r, int code, const char *msg);
+struct http_response *
+http_response_init(struct worker *w, int code, const char *msg);
void
http_response_set_header(struct http_response *r, const char *k, const char *v);
@@ -39,9 +51,12 @@ http_response_set_header(struct http_response *r, const char *k, const char *v);
void
http_response_set_body(struct http_response *r, const char *body, size_t body_len);
-int
+void
http_response_write(struct http_response *r, int fd);
+void
+http_schedule_write(int fd, struct http_response *r);
+
void
http_crossdomain(struct http_client *c);
@@ -52,7 +67,7 @@ void
http_send_options(struct http_client *c);
void
-http_response_write_chunk(int fd, const char *p, size_t sz);
+http_response_write_chunk(int fd, struct worker *w, const char *p, size_t sz);
void
http_response_set_keep_alive(struct http_response *r, int enabled);