More progress on evbuffer-based WS client test

master
Jessie Murray 3 years ago
parent c46b85ab7a
commit 0a27fc7e10
No known key found for this signature in database
GPG Key ID: E7E4D57EDDA744C5

@ -98,6 +98,7 @@ http_response_cleanup(struct http_response *r, int fd, int success) {
free(r->out);
if(!r->keep_alive || !success) {
/* Close fd is client doesn't support Keep-Alive. */
fprintf(stderr, "http_response_cleanup: keep_alive=%d, success=%d -> closing\n", r->keep_alive, success);
close(fd);
}
@ -124,6 +125,8 @@ http_can_write(int fd, short event, void *p) {
if(ret > 0)
r->sent += ret;
fprintf(stderr, "http_can_write: ret=%d, r->out_sz=%lu, r->sent=%d\n",
ret, r->out_sz, r->sent);
if(ret <= 0 || r->out_sz - r->sent == 0) { /* error or done */
http_response_cleanup(r, fd, (int)r->out_sz == r->sent ? 1 : 0);
} else { /* reschedule write */

@ -220,6 +220,7 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
static struct ws_msg *
ws_msg_new() {
fprintf(stderr, "------------ NEW -----------\n");
return calloc(1, sizeof(struct ws_msg));
}
@ -235,6 +236,7 @@ ws_msg_add(struct ws_msg *m, const char *p, size_t psz, const unsigned char *mas
for(i = 0; i < psz && mask; ++i) {
m->payload[m->payload_sz + i] = (unsigned char)p[i] ^ mask[i%4];
}
fprintf(stderr, "CONTENTS=[%.*s] (%lu)\n", (int)psz, m->payload, psz);
/* save new size */
m->payload_sz += psz;
@ -242,7 +244,7 @@ ws_msg_add(struct ws_msg *m, const char *p, size_t psz, const unsigned char *mas
static void
ws_msg_free(struct ws_msg **m) {
fprintf(stderr, "------------ /FREE -----------\n");
free((*m)->payload);
free(*m);
*m = NULL;
@ -262,12 +264,15 @@ ws_parse_data(const char *frame, size_t sz, struct ws_msg **msg) {
}
has_mask = frame[1] & 0x80 ? 1:0;
fprintf(stderr, "has_mask=%d\n", has_mask);
/* get payload length */
len = frame[1] & 0x7f; /* remove leftmost bit */
fprintf(stderr, "len=%llu\n", len);
if(len <= 125) { /* data starts right after the mask */
p = frame + 2 + (has_mask ? 4 : 0);
if(has_mask) memcpy(&mask, frame + 2, sizeof(mask));
if (has_mask) fprintf(stderr, "mask= %02x %02x %02x %02x\n", mask[0], mask[1], mask[2], mask[3]);
} else if(len == 126) {
uint16_t sz16;
memcpy(&sz16, frame + 2, sizeof(uint16_t));
@ -293,8 +298,10 @@ ws_parse_data(const char *frame, size_t sz, struct ws_msg **msg) {
(*msg)->total_sz += len + (p - frame);
if(frame[0] & 0x80) { /* FIN bit set */
fprintf(stderr, "FIN bit: SET\n");
return WS_MSG_COMPLETE;
} else {
fprintf(stderr, "FIN bit: NOT SET\n");
return WS_READING; /* need more data */
}
}
@ -312,6 +319,7 @@ ws_add_data(struct http_client *c) {
while(state == WS_MSG_COMPLETE) {
int ret = ws_execute(c, c->frame->payload, c->frame->payload_sz);
fprintf(stderr, "ws_execute returned %d\n", ret);
/* remove frame from client buffer */
http_client_remove_data(c, c->frame->total_sz);
@ -323,14 +331,16 @@ ws_add_data(struct http_client *c) {
/* can't process frame. */
return WS_ERROR;
}
fprintf(stderr, "Calling ws_parse_data again...\n");
state = ws_parse_data(c->buffer, c->sz, &c->frame);
fprintf(stderr, "ws_parse_data returned %d\n", (int)state);
}
return state;
}
int
ws_reply(struct cmd *cmd, const char *p, size_t sz) {
fprintf(stderr, "ws_reply: '%.*s' (%lu bytes)\n", (int)sz, p, sz);
char *frame = malloc(sz + 8); /* create frame by prepending header */
size_t frame_sz = 0;
struct http_response *r;
@ -365,7 +375,7 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) {
/* send WS frame */
r = http_response_init(cmd->w, 0, NULL);
if (cmd_is_subscribe(cmd)) {
if (1 || cmd_is_subscribe(cmd)) {
r->keep_alive = 1;
}

@ -35,7 +35,7 @@ worker_new(struct server *s) {
void
worker_can_read(int fd, short event, void *p) {
fprintf(stderr, "worker_can_read\n");
struct http_client *c = p;
int ret, nparsed;
@ -87,9 +87,11 @@ worker_can_read(int fd, short event, void *p) {
}
if(c->broken) { /* terminate client */
fprintf(stderr, "c->broken: http_client_free()\n");
http_client_free(c);
} else {
/* start monitoring input again */
fprintf(stderr, "worker_monitor_input()\n");
worker_monitor_input(c);
}
}

@ -1,11 +1,11 @@
OUT=websocket pubsub
CFLAGS=-O3 -Wall -Wextra
LDFLAGS=-levent -lpthread
CFLAGS=-O0 -g -Wall -Wextra -I../src/http-parser/
LDFLAGS=-g -levent -lpthread
all: $(OUT) Makefile
websocket: websocket.o
$(CC) -o $@ $< $(LDFLAGS)
websocket: websocket.o ../src/http-parser/http_parser.o
$(CC) -o $@ $^ $(LDFLAGS)
pubsub: pubsub.o
$(CC) -o $@ $< $(LDFLAGS)

@ -8,11 +8,15 @@
#include <pthread.h>
#include <arpa/inet.h>
#include <errno.h>
#include <ctype.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <event.h>
#include <http_parser.h>
#define DEBUG_LOGS 0
struct host_info {
char *host;
@ -22,7 +26,8 @@ struct host_info {
enum worker_state {
WS_INITIAL,
WS_SENT_HANDSHAKE,
WS_RECEIVED_RESPONSE,
WS_RECEIVED_HANDSHAKE,
WS_SENT_FRAME,
WS_COMPLETE
};
@ -47,13 +52,49 @@ struct worker_thread {
int fd;
struct event ev_r;
struct event ev_w;
http_parser parser;
http_parser_settings settings;
};
void
hex_dump(char *p, size_t sz) {
#if DEBUG_LOGS
printf("hex dump of %p (%ld bytes)\n", p, sz);
for (char *cur = p; cur < p + sz; cur += 16) {
char letters[16] = {0};
int limit = (cur + 16) > p + sz ? (sz % 16) : 16;
printf("%08lx ", cur - p); /* address */
for (int i = 0; i < limit; i++) {
printf("%02x ", (unsigned int)(cur[i] & 0xff));
letters[i] = isprint(cur[i]) ? cur[i] : '.';
}
for (int i = limit; i < 16; i++) { /* pad on last line */
printf(" ");
}
printf(" %.*s\n", limit, letters);
}
#endif
}
void
evbuffer_debug_dump(struct evbuffer *buffer) {
size_t sz = evbuffer_get_length(buffer);
char *data = malloc(sz);
evbuffer_remove(buffer, data, sz);
hex_dump(data, sz);
evbuffer_prepend(buffer, data, sz);
free(data);
}
static void
wait_for_possible_read(struct worker_thread *wt);
static void
wait_for_possible_write(struct worker_thread *wt);
static void
ws_enqueue_frame(struct worker_thread *wt);
void
process_message(struct worker_thread *wt, size_t sz) {
@ -79,26 +120,51 @@ void
websocket_can_write(int fd, short event, void *ptr) {
int ret;
struct worker_thread *wt = ptr;
#if DEBUG_LOGS
printf("%s (wt=%p, fd=%d)\n", __func__, wt, fd);
#endif
if(event != EV_WRITE) {
return;
}
switch (wt->state)
{
case WS_INITIAL: /* still sending initial HTTP request */
case WS_INITIAL: { /* still sending initial HTTP request */
ret = evbuffer_write(wt->wbuffer, fd);
#if DEBUG_LOGS
printf("evbuffer_write returned %d\n", ret);
printf("evbuffer_get_length returned %d\n", evbuffer_get_length(wt->wbuffer));
#endif
if (evbuffer_get_length(wt->wbuffer) != 0) { /* not all written */
wait_for_possible_write(wt);
return;
}
/* otherwise, we've sent the full request, time to read the response */
wt->state = WS_SENT_HANDSHAKE;
#if DEBUG_LOGS
printf("state=WS_SENT_HANDSHAKE\n");
#endif
wait_for_possible_read(wt);
return;
}
case WS_RECEIVED_HANDSHAKE: { /* ready to send a frame */
#if DEBUG_LOGS
printf("About to send data for WS frame, %lu in buffer\n", evbuffer_get_length(wt->wbuffer));
#endif
evbuffer_write(wt->wbuffer, fd);
size_t write_remains = evbuffer_get_length(wt->wbuffer);
#if DEBUG_LOGS
printf("Sent data for WS frame, still %lu left to write\n", write_remains);
#endif
if (write_remains == 0) { /* ready to read response */
wt->state = WS_SENT_FRAME;
wt->msg_sent++;
wait_for_possible_read(wt);
} else { /* not finished writing */
wait_for_possible_write(wt);
}
return;
}
default:
break;
}
@ -125,7 +191,9 @@ websocket_can_read(int fd, short event, void *ptr) {
int ret, success = 1;
struct worker_thread *wt = ptr;
#if DEBUG_LOGS
printf("%s (wt=%p)\n", __func__, wt);
#endif
if(event != EV_READ) {
return;
@ -133,8 +201,113 @@ websocket_can_read(int fd, short event, void *ptr) {
/* read message */
ret = evbuffer_read(wt->rbuffer, fd, 65536);
printf("evbuffer_read() returned %d\n", ret);
#if DEBUG_LOGS
printf("evbuffer_read() returned %d; wt->state=%d. wt->rbuffer:\n", ret, wt->state);
#endif
evbuffer_debug_dump(wt->rbuffer);
if (ret == 0) {
#if DEBUG_LOGS
printf("We didn't read anything from the socket...\n");
#endif
wait_for_possible_read(wt);
return;
}
while(1) {
switch (wt->state) {
case WS_SENT_HANDSHAKE: { /* waiting for handshake response */
size_t avail_sz = evbuffer_get_length(wt->rbuffer);
char *tmp = calloc(avail_sz, 1);
#if DEBUG_LOGS
printf("avail_sz from rbuffer = %lu\n", avail_sz);
#endif
evbuffer_remove(wt->rbuffer, tmp, avail_sz); /* copy into `tmp` */
#if DEBUG_LOGS
printf("Giving %lu bytes to http-parser\n", avail_sz);
#endif
int nparsed = http_parser_execute(&wt->parser, &wt->settings, tmp, avail_sz);
#if DEBUG_LOGS
printf("http-parser returned %d\n", nparsed);
#endif
if (nparsed != (int)avail_sz) { // put back what we didn't read
#if DEBUG_LOGS
printf("re-attach (prepend) %lu bytes\n", avail_sz - nparsed);
#endif
evbuffer_prepend(wt->rbuffer, tmp + nparsed, avail_sz - nparsed);
}
free(tmp);
if (wt->state == WS_SENT_HANDSHAKE && /* haven't encountered end of response yet */
wt->parser.upgrade && nparsed != (int)avail_sz) {
#if DEBUG_LOGS
printf("UPGRADE *and* we have some data left\n");
#endif
continue;
} else if (wt->state == WS_RECEIVED_HANDSHAKE) { /* we have the full response */
evbuffer_drain(wt->rbuffer, evbuffer_get_length(wt->rbuffer));
}
}
return;
case WS_SENT_FRAME: { /* waiting for frame response */
#if DEBUG_LOGS
printf("We're in WS_SENT_FRAME, just read a frame response. wt->rbuffer:\n");
#endif
evbuffer_debug_dump(wt->rbuffer);
uint8_t flag_opcodes, payload_len;
if (evbuffer_get_length(wt->rbuffer) < 2) { /* not enough data */
wait_for_possible_read(wt);
return;
}
evbuffer_remove(wt->rbuffer, &flag_opcodes, 1); /* remove flags & opcode */
evbuffer_remove(wt->rbuffer, &payload_len, 1); /* remove length */
evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */
process_message(wt, payload_len);
if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything, let's write again */
#if DEBUG_LOGS
printf("our turn to write again\n");
#endif
wt->state = WS_RECEIVED_HANDSHAKE;
ws_enqueue_frame(wt);
return;
} else {
#if DEBUG_LOGS
printf("there's still data to consume\n");
#endif
continue;
}
#if 0
struct evbuffer_ptr sof = evbuffer_search(wt->rbuffer, "\x00", 1, NULL);
struct evbuffer_ptr eof = evbuffer_search(wt->rbuffer, "\xff", 1, NULL);
if (evbuffer_get_length(wt->rbuffer) >= 1 && sof.pos != 0) {
printf("ERROR: length=%lu, sof at pos %ld\n", evbuffer_get_length(wt->rbuffer), sof.pos);
}
if (eof.pos == -1) { /* not there yet */
printf("Couldn't find the end-of-frame marker, need to read more\n");
wait_for_possible_read(wt);
} else { /* we have a frame */
size_t bounded_frame_sz = eof.pos + 1;
char *bounded_frame = calloc(bounded_frame_sz, 1);
evbuffer_remove(wt->rbuffer, bounded_frame, bounded_frame_sz);
process_message(wt, bounded_frame_sz);
printf("Received frame (%lu bytes total):\n", bounded_frame_sz);
hex_dump(bounded_frame, bounded_frame_sz);
free(bounded_frame);
if (evbuffer_get_length(wt->rbuffer) > 0) { /* we may have more frames to process */
continue;
} else { /* our turn to send a frame */
printf("Add frame to write buffer\n");
ws_enqueue_frame(wt);
}
}
#endif
}
return;
default:
return;
}
}
#if 0
pos = packet;
@ -198,7 +371,9 @@ websocket_can_read(int fd, short event, void *ptr) {
static void
wait_for_possible_read(struct worker_thread *wt) {
#if DEBUG_LOGS
printf("%s (wt=%p)\n", __func__, wt);
#endif
event_set(&wt->ev_r, wt->fd, EV_READ, websocket_can_read, wt);
event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL);
@ -206,12 +381,72 @@ wait_for_possible_read(struct worker_thread *wt) {
static void
wait_for_possible_write(struct worker_thread *wt) {
#if DEBUG_LOGS
printf("%s (wt=%p)\n", __func__, wt);
#endif
event_set(&wt->ev_r, wt->fd, EV_WRITE, websocket_can_write, wt);
event_base_set(wt->base, &wt->ev_r);
event_add(&wt->ev_r, NULL);
}
static int
ws_on_headers_complete(http_parser *p) {
struct worker_thread *wt = p->data;
#if DEBUG_LOGS
printf("%s (wt=%p)\n", __func__, wt);
#endif
// TODO
return 0;
}
static void
ws_enqueue_frame_for_command(struct worker_thread *wt, char *cmd, size_t sz) {
unsigned char mask[4];
for (int i = 0; i < 4; i++) {
mask[i] = rand() & 0xff;
}
uint8_t len = (uint8_t)(sz); /* (1 << 7) | length. */
len |= (1 << 7); /* set masking bit ON */
for (int i = 0; i < sz; i++) {
cmd[i] = (cmd[i] ^ mask[i%4]) & 0xff;
}
/* 0x81 = 10000001b: FIN bit (only one message in the frame), text frame */
evbuffer_add(wt->wbuffer, "\x81", 1);
evbuffer_add(wt->wbuffer, &len, 1);
evbuffer_add(wt->wbuffer, mask, 4);
evbuffer_add(wt->wbuffer, cmd, sz);
}
static void
ws_enqueue_frame(struct worker_thread *wt) {
#if 0
char set_command[] = "[\"SET\",\"key\",\"value\"]";
ws_enqueue_frame_for_command(wt, set_command, sizeof(set_command) - 1);
char get_command[] = "[\"GET\",\"key\"]";
ws_enqueue_frame_for_command(wt, get_command, sizeof(get_command) - 1);
#endif
char ping_command[] = "[\"PING\"]";
ws_enqueue_frame_for_command(wt, ping_command, sizeof(ping_command) - 1);
wait_for_possible_write(wt);
}
static int
ws_on_message_complete(http_parser *p) {
struct worker_thread *wt = p->data;
#if DEBUG_LOGS
printf("%s (wt=%p)\n", __func__, wt);
#endif
// we've received the full HTTP response now, so we're ready to send frames
wt->state = WS_RECEIVED_HANDSHAKE;
ws_enqueue_frame(wt); /* add frame to buffer and register interest in writing */
return 0;
}
void*
worker_main(void *ptr) {
@ -253,6 +488,13 @@ worker_main(void *ptr) {
wt->byte_count = 0;
wt->got_header = 0;
/* initialize HTTP parser, to parse the server response */
memset(&wt->settings, 0, sizeof(http_parser_settings));
wt->settings.on_headers_complete = ws_on_headers_complete;
wt->settings.on_message_complete = ws_on_message_complete;
http_parser_init(&wt->parser, HTTP_RESPONSE);
wt->parser.data = wt;
/* build handshake buffer */
/*
ws_handshake_sz = sizeof(ws_handshake)
@ -270,7 +512,7 @@ worker_main(void *ptr) {
event_base_dispatch(wt->base);
printf("event_base_dispatch returned\n");
event_base_free(wt->base);
free(ws_handshake);
// free(ws_handshake);
return NULL;
}

Loading…
Cancel
Save