|
|
@ -125,6 +125,8 @@ process_message(struct worker_thread *wt, size_t sz) {
|
|
|
|
/* decrement read count, and stop receiving when we reach zero. */
|
|
|
|
/* decrement read count, and stop receiving when we reach zero. */
|
|
|
|
wt->msg_received++;
|
|
|
|
wt->msg_received++;
|
|
|
|
if(wt->msg_received == wt->msg_target) {
|
|
|
|
if(wt->msg_received == wt->msg_target) {
|
|
|
|
|
|
|
|
wt->debug("%s: thread %d has received all %d messages it expected\n",
|
|
|
|
|
|
|
|
__func__, wt->id, wt->msg_received);
|
|
|
|
event_base_loopexit(wt->base, NULL);
|
|
|
|
event_base_loopexit(wt->base, NULL);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -193,7 +195,6 @@ websocket_can_read(int fd, short event, void *ptr) {
|
|
|
|
evbuffer_debug_dump(wt, wt->rbuffer);
|
|
|
|
evbuffer_debug_dump(wt, wt->rbuffer);
|
|
|
|
if (ret == 0) {
|
|
|
|
if (ret == 0) {
|
|
|
|
wt->debug("We didn't read anything from the socket...\n");
|
|
|
|
wt->debug("We didn't read anything from the socket...\n");
|
|
|
|
wait_for_possible_read(wt);
|
|
|
|
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -236,10 +237,12 @@ websocket_can_read(int fd, short event, void *ptr) {
|
|
|
|
evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */
|
|
|
|
evbuffer_drain(wt->rbuffer, (size_t)payload_len); /* remove payload itself */
|
|
|
|
process_message(wt, payload_len);
|
|
|
|
process_message(wt, payload_len);
|
|
|
|
|
|
|
|
|
|
|
|
if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything, let's write again */
|
|
|
|
if (evbuffer_get_length(wt->rbuffer) == 0) { /* consumed everything */
|
|
|
|
|
|
|
|
if (wt->msg_received < wt->msg_target) { /* let's write again */
|
|
|
|
wt->debug("our turn to write again\n");
|
|
|
|
wt->debug("our turn to write again\n");
|
|
|
|
wt->state = WS_RECEIVED_HANDSHAKE;
|
|
|
|
wt->state = WS_RECEIVED_HANDSHAKE;
|
|
|
|
ws_enqueue_frame(wt);
|
|
|
|
ws_enqueue_frame(wt);
|
|
|
|
|
|
|
|
} /* otherwise, we're done */
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
wt->debug("there's still data to consume\n");
|
|
|
|
wt->debug("there's still data to consume\n");
|
|
|
@ -382,11 +385,11 @@ usage(const char* argv0, char *host_default, short port_default,
|
|
|
|
|
|
|
|
|
|
|
|
printf("Usage: %s [options]\n"
|
|
|
|
printf("Usage: %s [options]\n"
|
|
|
|
"Options are:\n"
|
|
|
|
"Options are:\n"
|
|
|
|
"\t-h host\t\t(default = \"%s\")\n"
|
|
|
|
"\t[--host|-h] HOST\t(default = \"%s\")\n"
|
|
|
|
"\t-p port\t\t(default = %d)\n"
|
|
|
|
"\t[--port|-p] PORT\t(default = %d)\n"
|
|
|
|
"\t-t threads\t(default = %d)\n"
|
|
|
|
"\t[--clients|-c] THREADS\t(default = %d)\n"
|
|
|
|
"\t-n count\t(number of messages per thread, default = %d)\n"
|
|
|
|
"\t[--messages|-n] COUNT\t(number of messages per thread, default = %d)\n"
|
|
|
|
"\t-v\t\t(verbose)\n",
|
|
|
|
"\t[--verbose|-v]\t\t(extremely verbose output)\n",
|
|
|
|
argv0, host_default, (int)port_default,
|
|
|
|
argv0, host_default, (int)port_default,
|
|
|
|
thread_count_default, messages_default);
|
|
|
|
thread_count_default, messages_default);
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -417,12 +420,12 @@ main(int argc, char *argv[]) {
|
|
|
|
{"help", no_argument, NULL, '?'},
|
|
|
|
{"help", no_argument, NULL, '?'},
|
|
|
|
{"host", required_argument, NULL, 'h'},
|
|
|
|
{"host", required_argument, NULL, 'h'},
|
|
|
|
{"port", required_argument, NULL, 'p'},
|
|
|
|
{"port", required_argument, NULL, 'p'},
|
|
|
|
{"threads", required_argument, NULL, 't'},
|
|
|
|
{"clients", required_argument, NULL, 'c'},
|
|
|
|
{"messages", required_argument, NULL, 'n'},
|
|
|
|
{"messages", required_argument, NULL, 'n'},
|
|
|
|
{"verbose", no_argument, NULL, 'v'},
|
|
|
|
{"verbose", no_argument, NULL, 'v'},
|
|
|
|
{0, 0, 0, 0}
|
|
|
|
{0, 0, 0, 0}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
while ((opt = getopt_long(argc, argv, "h:p:t:n:vs", long_options, NULL)) != -1) {
|
|
|
|
while ((opt = getopt_long(argc, argv, "h:p:c:n:vs", long_options, NULL)) != -1) {
|
|
|
|
switch (opt) {
|
|
|
|
switch (opt) {
|
|
|
|
case 'h':
|
|
|
|
case 'h':
|
|
|
|
colon = strchr(optarg, ':');
|
|
|
|
colon = strchr(optarg, ':');
|
|
|
@ -441,7 +444,7 @@ main(int argc, char *argv[]) {
|
|
|
|
hi.port = (short)atol(optarg);
|
|
|
|
hi.port = (short)atol(optarg);
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case 't':
|
|
|
|
case 'c':
|
|
|
|
thread_count = atoi(optarg);
|
|
|
|
thread_count = atoi(optarg);
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
@ -457,7 +460,7 @@ main(int argc, char *argv[]) {
|
|
|
|
usage(argv[0], host_default, port_default,
|
|
|
|
usage(argv[0], host_default, port_default,
|
|
|
|
thread_count_default,
|
|
|
|
thread_count_default,
|
|
|
|
messages_default);
|
|
|
|
messages_default);
|
|
|
|
exit(EXIT_FAILURE);
|
|
|
|
exit(EXIT_SUCCESS);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -509,7 +512,7 @@ main(int argc, char *argv[]) {
|
|
|
|
(mili1-mili0)/1000.0,
|
|
|
|
(mili1-mili0)/1000.0,
|
|
|
|
1000*((double)total)/(mili1-mili0),
|
|
|
|
1000*((double)total)/(mili1-mili0),
|
|
|
|
kb_per_sec);
|
|
|
|
kb_per_sec);
|
|
|
|
return EXIT_SUCCESS;
|
|
|
|
return (total == thread_count * msg_target ? EXIT_SUCCESS : EXIT_FAILURE);
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
printf("No message was read.\n");
|
|
|
|
printf("No message was read.\n");
|
|
|
|
return EXIT_FAILURE;
|
|
|
|
return EXIT_FAILURE;
|
|
|
|