Last active
October 29, 2015 09:31
-
-
Save hintjens/501abc0f8eddc0428ce4 to your computer and use it in GitHub Desktop.
Test case for issue 1608
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Test case for #1608 | |
#include <czmq.h> | |
#define BATCH_SIZE 5000 | |
static void | |
sink_actor (zsock_t *pipe, void *args) | |
{ | |
int latency [BATCH_SIZE]; | |
int count = 0; | |
zsock_signal (pipe, 0); | |
bool terminated = false; | |
while (!terminated && !zsys_interrupted) { | |
char *command; | |
int64_t sent_usecs; | |
zsock_recv (pipe, "s8", &command, &sent_usecs); | |
if (streq (command, "$TERM")) | |
terminated = true; | |
else | |
if (streq (command, "DATA")) { | |
assert (count < BATCH_SIZE); | |
latency [count++] = (int) (zclock_usecs () - sent_usecs); | |
} | |
else | |
if (streq (command, "REPORT")) { | |
assert (count == BATCH_SIZE); | |
int index; | |
for (index = 0; index < BATCH_SIZE; index++) | |
zstr_sendf (pipe, "%d", latency [index]); | |
} | |
free (command); | |
} | |
} | |
static void | |
s_collect_latencies (char *label, zactor_t *actor, FILE *stream) | |
{ | |
fprintf (stream, "%s", label); | |
zstr_send (actor, "REPORT"); | |
int index; | |
for (index = 0; index < BATCH_SIZE; index++) { | |
char *latency = zstr_recv (actor); | |
fprintf (stream, ", %s", latency); | |
free (latency); | |
} | |
fprintf (stream, "\n"); | |
} | |
int main (void) | |
{ | |
// Remove HWM on pipes so we don't block on sending | |
zsys_set_pipehwm (0); | |
FILE *results = fopen ("results.csv", "w"); | |
assert (results); | |
printf ("Test 1: send batch immediately...\n"); | |
zactor_t *sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
int count; | |
for (count = 0; count < BATCH_SIZE; count++) | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
s_collect_latencies ("send", sink, results); | |
zactor_destroy (&sink); | |
printf ("Test 2: sleep 1 second, then send messages...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
sleep (1); | |
for (count = 0; count < BATCH_SIZE; count++) | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
s_collect_latencies ("sleep-send", sink, results); | |
zactor_destroy (&sink); | |
printf ("Test 3: poll 1 second, then send messages...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
zpoller_t *poller = zpoller_new (NULL); | |
zpoller_wait (poller, 1000); | |
for (count = 0; count < BATCH_SIZE; count++) | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
s_collect_latencies ("poll-send", sink, results); | |
zpoller_destroy (&poller); | |
zactor_destroy (&sink); | |
printf ("Test 4: send batch with sleep delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zclock_sleep (1); // One msec | |
} | |
s_collect_latencies ("sendsleep", sink, results); | |
zactor_destroy (&sink); | |
printf ("Test 5: send batch with poll delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
poller = zpoller_new (NULL); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zpoller_wait (poller, 1); | |
} | |
s_collect_latencies ("sendpoll", sink, results); | |
zpoller_destroy (&poller); | |
zactor_destroy (&sink); | |
printf ("Test 6: sleep, then send batch with sleep delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
sleep (1); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zclock_sleep (1); // One msec | |
} | |
s_collect_latencies ("sleep-sendsleep", sink, results); | |
zactor_destroy (&sink); | |
printf ("Test 7: delay, then send batch with poll delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
poller = zpoller_new (NULL); | |
zpoller_wait (poller, BATCH_SIZE); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zpoller_wait (poller, 1); | |
} | |
s_collect_latencies ("poll-sendpoll", sink, results); | |
zpoller_destroy (&poller); | |
zactor_destroy (&sink); | |
fclose (results); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Test case for #1608 | |
// This sends with varying pauses between messages | |
#include <czmq.h> | |
#define BATCH_SIZE 1000 | |
static void | |
sink_actor (zsock_t *pipe, void *args) | |
{ | |
int latency [BATCH_SIZE]; | |
int count = 0; | |
zsock_signal (pipe, 0); | |
bool terminated = false; | |
while (!terminated && !zsys_interrupted) { | |
char *command; | |
int64_t sent_usecs; | |
zsock_recv (pipe, "s8", &command, &sent_usecs); | |
if (streq (command, "$TERM")) | |
terminated = true; | |
else | |
if (streq (command, "DATA")) { | |
assert (count < BATCH_SIZE); | |
latency [count++] = (int) (zclock_usecs () - sent_usecs); | |
} | |
else | |
if (streq (command, "REPORT")) { | |
assert (count == BATCH_SIZE); | |
int index; | |
for (index = 0; index < BATCH_SIZE; index++) | |
zstr_sendf (pipe, "%d", latency [index]); | |
} | |
free (command); | |
} | |
} | |
static void | |
s_collect_latencies (char *label, zactor_t *actor, FILE *stream) | |
{ | |
fprintf (stream, "%s", label); | |
zstr_send (actor, "REPORT"); | |
int index; | |
for (index = 0; index < BATCH_SIZE; index++) { | |
char *latency = zstr_recv (actor); | |
fprintf (stream, ", %s", latency); | |
free (latency); | |
} | |
fprintf (stream, "\n"); | |
} | |
int main (void) | |
{ | |
// Remove HWM on pipes so we don't block on sending | |
zsys_set_pipehwm (0); | |
FILE *results = fopen ("results.csv", "w"); | |
assert (results); | |
zactor_t *sink; | |
int count; | |
printf ("Send batch with 1 msec delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zclock_sleep (1); | |
} | |
s_collect_latencies ("delay1", sink, results); | |
zactor_destroy (&sink); | |
printf ("Send batch with 8 msec delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zclock_sleep (8); | |
} | |
s_collect_latencies ("delay8", sink, results); | |
zactor_destroy (&sink); | |
printf ("Send batch with 32 msec delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zclock_sleep (64); | |
} | |
s_collect_latencies ("delay32", sink, results); | |
zactor_destroy (&sink); | |
printf ("Send batch with 64 msec delay...\n"); | |
sink = zactor_new (sink_actor, NULL); | |
assert (sink); | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8", "DATA", zclock_usecs ()); | |
zclock_sleep (64); | |
} | |
s_collect_latencies ("delay64", sink, results); | |
zactor_destroy (&sink); | |
fclose (results); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Test case for #1608 | |
// This sends with varying message sizes | |
#include <czmq.h> | |
#define BATCH_SIZE 1000 | |
#define SEND_DELAY 10 // msec | |
static void | |
sink_actor (zsock_t *pipe, void *args) | |
{ | |
int latency [BATCH_SIZE]; | |
int count = 0; | |
zsock_signal (pipe, 0); | |
bool terminated = false; | |
while (!terminated && !zsys_interrupted) { | |
char *command; | |
int64_t sent_usecs; | |
zsock_recv (pipe, "s8", &command, &sent_usecs); | |
if (streq (command, "$TERM")) | |
terminated = true; | |
else | |
if (streq (command, "DATA")) { | |
assert (count < BATCH_SIZE); | |
latency [count++] = (int) (zclock_usecs () - sent_usecs); | |
} | |
else | |
if (streq (command, "REPORT")) { | |
assert (count == BATCH_SIZE); | |
int index; | |
for (index = 0; index < BATCH_SIZE; index++) | |
zstr_sendf (pipe, "%d", latency [index]); | |
} | |
free (command); | |
} | |
} | |
static void | |
run_testcase (char *label, int size, FILE *stream) | |
{ | |
// Remove HWM on pipes so we don't block on sending | |
zsys_set_pipehwm (0); | |
printf ("Test payload size=%d: ", size); | |
zactor_t *sink = zactor_new (sink_actor, NULL); | |
char *payload = malloc (size); | |
assert (sink); | |
int count; | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (sink, "s8s", "DATA", zclock_usecs (), payload); | |
zclock_sleep (SEND_DELAY); | |
if (count % 100 == 0) { | |
printf ("."); | |
fflush (stdout); | |
} | |
} | |
printf ("\n"); | |
fprintf (stream, "%s", label); | |
zstr_send (sink, "REPORT"); | |
int index; | |
for (index = 0; index < BATCH_SIZE; index++) { | |
char *latency = zstr_recv (sink); | |
fprintf (stream, ", %s", latency); | |
free (latency); | |
} | |
fprintf (stream, "\n"); | |
free (payload); | |
zactor_destroy (&sink); | |
} | |
int main (void) | |
{ | |
FILE *results = fopen ("results.csv", "w"); | |
assert (results); | |
run_testcase ("data0", 0, results); | |
run_testcase ("data10", 10, results); | |
run_testcase ("data100", 100, results); | |
run_testcase ("data1K", 1000, results); | |
run_testcase ("data10K", 10000, results); | |
fclose (results); | |
return 0; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Test case for #1608 | |
// This uses CLIENT and SERVER sockets | |
#include <czmq.h> | |
#define BATCH_SIZE 1000 | |
#define SEND_DELAY 10 // msec | |
static void | |
sink_actor (zsock_t *pipe, void *args) | |
{ | |
int latency [BATCH_SIZE]; | |
zsock_t *server = zsock_new_server ((char *) args); | |
zsock_signal (pipe, 0); | |
bool terminated = false; | |
while (!terminated && !zsys_interrupted) { | |
char *command = zstr_recv (pipe); | |
if (streq (command, "$TERM")) | |
terminated = true; | |
else | |
if (streq (command, "START")) { | |
int count; | |
for (count = 0; count < BATCH_SIZE; count++) { | |
int64_t sent_usecs; | |
zsock_recv (server, "8", &sent_usecs); | |
latency [count++] = (int) (zclock_usecs () - sent_usecs); | |
} | |
} | |
else | |
if (streq (command, "REPORT")) { | |
FILE *results = fopen ("results.csv", "a"); | |
char *label = zstr_recv (pipe); | |
fprintf (results, "%s", label); | |
free (label); | |
int count; | |
for (count = 0; count < BATCH_SIZE; count++) | |
fprintf (results, ",%d", latency [count]); | |
fprintf (results, "\n"); | |
fclose (results); | |
} | |
free (command); | |
} | |
zsock_destroy (&server); | |
} | |
static void | |
run_testcase (char *label, char *endpoint) | |
{ | |
zactor_t *sink = zactor_new (sink_actor, endpoint); | |
assert (sink); | |
zsock_t *client = zsock_new_client (endpoint); | |
assert (client); | |
zstr_send (sink, "START"); | |
int count; | |
for (count = 0; count < BATCH_SIZE; count++) { | |
zsock_send (client, "8", zclock_usecs ()); | |
zclock_sleep (SEND_DELAY); | |
if (count % 100 == 0) { | |
printf ("."); | |
fflush (stdout); | |
} | |
} | |
printf ("\n"); | |
zstr_send (sink, "REPORT"); | |
zstr_send (sink, label); | |
zactor_destroy (&sink); | |
zsock_destroy (&client); | |
} | |
int main (void) | |
{ | |
// Don't block on sending test data | |
zsys_set_sndhwm (0); | |
zsys_set_rcvhwm (0); | |
remove ("results.csv"); | |
run_testcase ("TCP", "tcp://127.0.0.1:10001"); | |
run_testcase ("IPC", "ipc://@/testcase4"); | |
run_testcase ("inproc", "inproc://testcase4"); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment