Created
December 30, 2010 19:28
-
-
Save thomasluce/760174 to your computer and use it in GitHub Desktop.
gcc -g3 -o queue -I/opt/local/include -lzmq queue.c
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <errno.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <zmq.h> | |
#include <assert.h> | |
char in_connect[1000] = ""; | |
char out_connect[1000] = ""; | |
void *context; | |
void *in_socket; | |
void *out_socket; | |
void push_on_queue(char *message, void *queue) { | |
if(strcmp(out_connect, "") != 0) { | |
zmq_msg_t out_msg; | |
if(zmq_msg_init_size(&out_msg, strlen(message)) == -1) { | |
fprintf(stderr, "Error creating output queue mesage, %d\n", errno); | |
exit(errno); | |
} | |
memcpy(zmq_msg_data(&out_msg), message, strlen(message)); | |
if(zmq_send(queue, &out_msg, 0) == -1) { | |
fprintf(stderr, "Error sending: %d\n", errno); | |
exit(errno); | |
} | |
if(zmq_msg_close(&out_msg) == -1) { | |
fprintf(stderr, "Error closing message: %d\n", errno); | |
exit(errno); | |
} | |
} else { | |
printf("%s", message); | |
fflush(stdout); | |
} | |
} | |
void usage() { | |
printf("Usage: ./queue [-i INPUT] [-o OUTPUT]\nWhere INPUT and OUTPUT are formatted as connection strings (urls)\n"); | |
exit(0); | |
} | |
char *get_line(char line[], size_t s) { | |
if(strcmp(in_connect, "") == 0) { | |
return fgets(line, s, stdin); | |
} else { | |
zmq_msg_t in_msg; | |
if(zmq_msg_init(&in_msg) == -1) { | |
fprintf(stderr, "Error creating input queue mesage, %d\n", errno); | |
exit(errno); | |
} | |
if(zmq_recv(in_socket, &in_msg, 0) == 0) { | |
int size = zmq_msg_size (&in_msg); | |
memcpy (line, zmq_msg_data (&in_msg), size); | |
if(zmq_msg_close(&in_msg) == -1) { | |
fprintf(stderr, "Error closing, %d\n", errno); | |
exit(errno); | |
} | |
line[size] = 0; | |
return (line); | |
} else { | |
if(zmq_msg_close(&in_msg) == -1) { | |
fprintf(stderr, "Error closing, %d\n", errno); | |
exit(errno); | |
} | |
return NULL; | |
} | |
} | |
} | |
int main(int argc, char *argv[]) { | |
int c; | |
while((c = getopt(argc, argv, "i:o:h")) != -1) { | |
switch(c) { | |
case 'i': | |
strcpy(in_connect, optarg); | |
break; | |
case 'o': | |
strcpy(out_connect, optarg); | |
break; | |
case 'h': | |
usage(); | |
break; | |
default: | |
printf("Bad argument: %c\n", c); | |
exit(1); | |
break; | |
} | |
} | |
context = zmq_init(1); | |
if(!context) { | |
fprintf(stderr, "Error creating queue context, %d\n", errno); | |
return errno; | |
} | |
in_socket = zmq_socket(context, ZMQ_PULL); | |
out_socket = zmq_socket(context, ZMQ_PUSH); | |
//Set up input | |
if(strcmp(in_connect, "") != 0) { | |
if(!in_socket) { | |
fprintf(stderr, "Error creating input queue socket, %d\n", errno); | |
return errno; | |
} | |
if(zmq_connect(in_socket, in_connect) == -1) { | |
fprintf(stderr, "Error connecting to input queue, %d\n", errno); | |
return errno; | |
} | |
} | |
// Set up output | |
if(strcmp(out_connect, "") != 0) { | |
if(!out_socket) { | |
fprintf(stderr, "Error creating output queue socket, %d\n", errno); | |
return errno; | |
} | |
if(zmq_bind(out_socket, out_connect) == -1) { | |
fprintf(stderr, "Error binding to output queue, %d, %s\n", errno, out_connect); | |
return errno; | |
} | |
} | |
char item[1000]; | |
while(get_line(item, 1000)) { | |
push_on_queue(item, out_socket); | |
} | |
if(strcmp(out_connect, "") != 0) { | |
zmq_close(&out_socket); | |
} | |
if(strcmp(in_connect, "") != 0) { | |
zmq_close(&in_socket); | |
} | |
zmq_term(&context); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment