@ -6,6 +6,7 @@
# include "worker.h"
# include "worker.h"
# include "pool.h"
# include "pool.h"
# include "http.h"
# include "http.h"
# include "slog.h"
/* message parsers */
/* message parsers */
# include "formats/json.h"
# include "formats/json.h"
@ -219,13 +220,13 @@ ws_execute(struct http_client *c, const char *frame, size_t frame_len) {
}
}
static struct ws_msg *
static struct ws_msg *
ws_msg_new ( ) {
ws_msg_new ( struct server * s ) {
fprintf( stderr , " ------------ NEW ----------- \n " ) ;
slog( s , WEBDIS_TRACE , " ws_msg_new " , 0 ) ;
return calloc ( 1 , sizeof ( struct ws_msg ) ) ;
return calloc ( 1 , sizeof ( struct ws_msg ) ) ;
}
}
static void
static void
ws_msg_add ( struct ws_msg * m , const char * p , size_t psz , const unsigned char * mask ) {
ws_msg_add ( struct server * s , struct ws_msg * m , const char * p , size_t psz , const unsigned char * mask ) {
/* add data to frame */
/* add data to frame */
size_t i ;
size_t i ;
@ -236,22 +237,35 @@ ws_msg_add(struct ws_msg *m, const char *p, size_t psz, const unsigned char *mas
for ( i = 0 ; i < psz & & mask ; + + i ) {
for ( i = 0 ; i < psz & & mask ; + + i ) {
m - > payload [ m - > payload_sz + i ] = ( unsigned char ) p [ i ] ^ mask [ i % 4 ] ;
m - > payload [ m - > payload_sz + i ] = ( unsigned char ) p [ i ] ^ mask [ i % 4 ] ;
}
}
fprintf ( stderr , " CONTENTS=[%.*s] (%lu) \n " , ( int ) psz , m - > payload , psz ) ;
if ( slog_enabled ( s , WEBDIS_TRACE ) ) {
char format [ ] = " ws_msg_add: %lu bytes, mask_enabled=%c " ;
char mask_enabled = mask ? ' Y ' : ' N ' ;
size_t contents_sz = snprintf ( NULL , 0 , format , psz , mask_enabled ) ;
char * contents_msg = calloc ( contents_sz + 1 , 1 ) ;
if ( contents_msg ) {
snprintf ( contents_msg , contents_sz + 1 , format , psz , mask_enabled ) ;
slog ( s , WEBDIS_TRACE , contents_msg , contents_sz ) ;
free ( contents_msg ) ;
} else {
slog ( s , WEBDIS_ERROR , " Failed allocation in ws_msg_add " , 0 ) ;
}
}
/* save new size */
/* save new size */
m - > payload_sz + = psz ;
m - > payload_sz + = psz ;
}
}
static void
static void
ws_msg_free ( struct ws_msg * * m ) {
ws_msg_free ( struct server * s , struct ws_msg * * m ) {
fprintf ( stderr , " ------------ /FREE ----------- \n " ) ;
slog( s , WEBDIS_TRACE , " ws_msg_free " , 0 ) ;
free ( ( * m ) - > payload ) ;
free ( ( * m ) - > payload ) ;
free ( * m ) ;
free ( * m ) ;
* m = NULL ;
* m = NULL ;
}
}
static enum ws_state
static enum ws_state
ws_parse_data ( const char * frame , size_t sz , struct ws_msg * * msg ) {
ws_parse_data ( struct server * s , const char * frame , size_t sz , struct ws_msg * * msg ) {
int has_mask ;
int has_mask ;
uint64_t len ;
uint64_t len ;
@ -264,15 +278,30 @@ ws_parse_data(const char *frame, size_t sz, struct ws_msg **msg) {
}
}
has_mask = frame [ 1 ] & 0x80 ? 1 : 0 ;
has_mask = frame [ 1 ] & 0x80 ? 1 : 0 ;
fprintf ( stderr , " has_mask=%d \n " , has_mask ) ;
if ( slog_enabled ( s , WEBDIS_TRACE ) ) {
char log_mask [ ] = " ws_parse_data: has_mask=? " ;
log_mask [ sizeof ( log_mask ) - 2 ] = has_mask ? ' Y ' : ' N ' ; /* -1 for \0 and -1 again for last char */
slog ( s , WEBDIS_TRACE , log_mask , sizeof ( log_mask ) - 1 ) ;
}
/* get payload length */
/* get payload length */
len = frame [ 1 ] & 0x7f ; /* remove leftmost bit */
len = frame [ 1 ] & 0x7f ; /* remove leftmost bit */
fprintf ( stderr , " len=%llu \n " , len ) ;
if ( slog_enabled ( s , WEBDIS_TRACE ) ) { /* log length */
char format [ ] = " ws_parse_data: payload length = %llu bytes " ;
size_t contents_sz = snprintf ( NULL , 0 , format , len ) ;
char * contents_msg = calloc ( contents_sz + 1 , 1 ) ;
if ( contents_msg ) {
snprintf ( contents_msg , contents_sz + 1 , format , len ) ;
slog ( s , WEBDIS_TRACE , contents_msg , contents_sz ) ;
free ( contents_msg ) ;
} else {
slog ( s , WEBDIS_ERROR , " Failed allocation in ws_parse_data " , 0 ) ;
}
}
if ( len < = 125 ) { /* data starts right after the mask */
if ( len < = 125 ) { /* data starts right after the mask */
p = frame + 2 + ( has_mask ? 4 : 0 ) ;
p = frame + 2 + ( has_mask ? 4 : 0 ) ;
if ( has_mask ) memcpy ( & mask , frame + 2 , sizeof ( mask ) ) ;
if ( has_mask ) memcpy ( & mask , frame + 2 , sizeof ( mask ) ) ;
if ( has_mask ) fprintf ( stderr , " mask= %02x %02x %02x %02x \n " , mask [ 0 ] , mask [ 1 ] , mask [ 2 ] , mask [ 3 ] ) ;
} else if ( len = = 126 ) {
} else if ( len = = 126 ) {
uint16_t sz16 ;
uint16_t sz16 ;
memcpy ( & sz16 , frame + 2 , sizeof ( uint16_t ) ) ;
memcpy ( & sz16 , frame + 2 , sizeof ( uint16_t ) ) ;
@ -293,15 +322,15 @@ ws_parse_data(const char *frame, size_t sz, struct ws_msg **msg) {
}
}
if ( ! * msg )
if ( ! * msg )
* msg = ws_msg_new ( ) ;
* msg = ws_msg_new ( s ) ;
ws_msg_add ( * msg , p , len , has_mask ? mask : NULL ) ;
ws_msg_add ( s , * msg , p , len , has_mask ? mask : NULL ) ;
( * msg ) - > total_sz + = len + ( p - frame ) ;
( * msg ) - > total_sz + = len + ( p - frame ) ;
if ( frame [ 0 ] & 0x80 ) { /* FIN bit set */
if ( frame [ 0 ] & 0x80 ) { /* FIN bit set */
fprintf( stderr , " FIN bit: SET \n " ) ;
slog( s , WEBDIS_TRACE , " ws_parse_data: FIN bit set " , 0 ) ;
return WS_MSG_COMPLETE ;
return WS_MSG_COMPLETE ;
} else {
} else {
fprintf( stderr , " FIN bit: NOT SET \n " ) ;
slog( s , WEBDIS_TRACE , " ws_parse_data: FIN bit not set " , 0 ) ;
return WS_READING ; /* need more data */
return WS_READING ; /* need more data */
}
}
}
}
@ -315,32 +344,43 @@ ws_add_data(struct http_client *c) {
enum ws_state state ;
enum ws_state state ;
state = ws_parse_data ( c - > buffer, c - > sz , & c - > frame ) ;
state = ws_parse_data ( c - > s, c - > buffer, c - > sz , & c - > frame ) ;
while ( state = = WS_MSG_COMPLETE ) {
while ( state = = WS_MSG_COMPLETE ) {
int ret = ws_execute ( c , c - > frame - > payload , c - > frame - > payload_sz ) ;
int ret = ws_execute ( c , c - > frame - > payload , c - > frame - > payload_sz ) ;
fprintf ( stderr , " ws_execute returned %d \n " , ret ) ;
if ( slog_enabled ( c - > s , WEBDIS_TRACE ) ) {
char format [ ] = " ws_add_data: ws_execute(payload_sz=%lu) returned %d " ;
size_t contents_sz = snprintf ( NULL , 0 , format , c - > frame - > payload_sz , ret ) ;
char * contents_msg = calloc ( contents_sz + 1 , 1 ) ;
if ( contents_msg ) {
snprintf ( contents_msg , contents_sz + 1 , format , c - > frame - > payload_sz , ret ) ;
slog ( c - > s , WEBDIS_TRACE , contents_msg , contents_sz ) ;
free ( contents_msg ) ;
} else {
slog ( c - > s , WEBDIS_ERROR , " Failed allocation in ws_add_data " , 0 ) ;
}
}
/* remove frame from client buffer */
/* remove frame from client buffer */
http_client_remove_data ( c , c - > frame - > total_sz ) ;
http_client_remove_data ( c , c - > frame - > total_sz ) ;
/* free frame and set back to NULL */
/* free frame and set back to NULL */
ws_msg_free ( & c - > frame ) ;
ws_msg_free ( c - > s , & c - > frame ) ;
if ( ret ! = 0 ) {
if ( ret ! = 0 ) {
/* can't process frame. */
/* can't process frame. */
slog ( c - > s , WEBDIS_WARNING , " ws_add_data: ws_execute failed " , 0 ) ;
return WS_ERROR ;
return WS_ERROR ;
}
}
fprintf ( stderr , " Calling ws_parse_data again... \n " ) ;
slog ( c - > s , WEBDIS_TRACE , " ws_add_data: calling ws_parse_data again " , 0 ) ;
state = ws_parse_data ( c - > buffer , c - > sz , & c - > frame ) ;
state = ws_parse_data ( c - > s , c - > buffer , c - > sz , & c - > frame ) ;
fprintf ( stderr , " ws_parse_data returned %d \n " , ( int ) state ) ;
}
}
return state ;
return state ;
}
}
int
int
ws_reply ( struct cmd * cmd , const char * p , size_t sz ) {
ws_reply ( struct cmd * cmd , const char * p , size_t sz ) {
fprintf ( stderr , " ws_reply: '%.*s' (%lu bytes) \n " , ( int ) sz , p , sz ) ;
char * frame = malloc ( sz + 8 ) ; /* create frame by prepending header */
char * frame = malloc ( sz + 8 ) ; /* create frame by prepending header */
size_t frame_sz = 0 ;
size_t frame_sz = 0 ;
struct http_response * r ;
struct http_response * r ;
@ -375,7 +415,7 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) {
/* send WS frame */
/* send WS frame */
r = http_response_init ( cmd - > w , 0 , NULL ) ;
r = http_response_init ( cmd - > w , 0 , NULL ) ;
if ( 1 | | cmd_is_subscribe ( cmd ) ) {
if ( cmd_is_subscribe ( cmd ) ) {
r - > keep_alive = 1 ;
r - > keep_alive = 1 ;
}
}
@ -385,6 +425,20 @@ ws_reply(struct cmd *cmd, const char *p, size_t sz) {
r - > out = frame ;
r - > out = frame ;
r - > out_sz = frame_sz ;
r - > out_sz = frame_sz ;
r - > sent = 0 ;
r - > sent = 0 ;
if ( slog_enabled ( cmd - > w - > s , WEBDIS_TRACE ) ) {
char format [ ] = " ws_reply: response is %lu bytes, frame is %lu " ;
size_t contents_sz = snprintf ( NULL , 0 , format , sz , frame_sz ) ;
char * contents_msg = calloc ( contents_sz + 1 , 1 ) ;
if ( contents_msg ) {
snprintf ( contents_msg , contents_sz + 1 , format , sz , frame_sz ) ;
slog ( cmd - > w - > s , WEBDIS_TRACE , contents_msg , contents_sz ) ;
free ( contents_msg ) ;
} else {
slog ( cmd - > w - > s , WEBDIS_ERROR , " Failed allocation in ws_reply " , 0 ) ;
}
}
http_schedule_write ( cmd - > fd , r ) ;
http_schedule_write ( cmd - > fd , r ) ;
return 0 ;
return 0 ;