Skip to content

Instantly share code, notes, and snippets.

@zerodivisi0n
Created June 21, 2015 11:26
Show Gist options
  • Save zerodivisi0n/155353f54f231c6a337e to your computer and use it in GitHub Desktop.
Save zerodivisi0n/155353f54f231c6a337e to your computer and use it in GitHub Desktop.
ZeroMQ Getting Started
// Based on http://nichol.as/zeromq-an-introduction
package main
import (
"fmt"
zmq "github.com/pebbe/zmq4"
"log"
"math/rand"
"os"
"strings"
"time"
)
const defaultEndpoint = "ipc://zmq_learn"
func req() {
sock, err := zmq.NewSocket(zmq.REQ)
if err != nil {
log.Fatalf("NewSocket(): %v", err)
}
sock.Connect(defaultEndpoint)
defer sock.Close()
for i := 0; i < 10; i++ {
sock.Send(fmt.Sprintf("Message %d", i), 0)
// Important: Recv is required, overwise server shutdown with error:
// Invalid argument (src/stream_engine.cpp:143)
rep, err := sock.Recv(0)
if err != nil {
log.Printf("Recv(): %v", err)
}
log.Printf("Reply: %s", rep)
}
}
func rep() {
sock, err := zmq.NewSocket(zmq.REP)
if err != nil {
log.Fatalf("NewSocket(): %v", err)
}
sock.Bind(defaultEndpoint)
defer sock.Close()
for {
msg, err := sock.Recv(0)
if err != nil {
log.Printf("Recv(): %v", err)
break
}
log.Printf("Message: %s", msg)
// Important: Send is required, overwise socket get error on next Recv:
// Operation cannot be accomplished in current state
sock.Send(msg, 0) // echo server
}
}
func pub() {
// Important: shutdown server if there is already PUB socket exists
sock, err := zmq.NewSocket(zmq.PUB)
if err != nil {
log.Fatalf("NewSocket(): %v", err)
}
sock.Bind(defaultEndpoint)
defer sock.Close()
countries := []string{"netherlands", "brazil", "germany", "portugal"}
events := []string{"yellow card", "red card", "goal", "corner", "foul"}
for {
msg := countries[rand.Intn(len(countries))] + " " + events[rand.Intn(len(events))]
log.Printf("Sending %s", msg)
sock.Send(msg, 0)
time.Sleep(100 * time.Millisecond)
}
}
func sub() {
sock, err := zmq.NewSocket(zmq.SUB)
if err != nil {
log.Fatalf("NewSocket(): %v", err)
}
sock.Connect(defaultEndpoint)
defer sock.Close()
sock.SetSubscribe("netherlands")
sock.SetSubscribe("germany")
for {
msg, err := sock.Recv(0)
if err != nil {
log.Printf("Recv(): %v", err)
break
}
log.Printf("Receive message: %s", msg)
}
}
func push() {
sock, err := zmq.NewSocket(zmq.PUSH)
if err != nil {
log.Fatalf("NewSocket(): %v", err)
}
// Note: Could me Connect or Bind
sock.Connect(defaultEndpoint)
defer sock.Close()
for i := 0; i < 10; i++ {
msg := fmt.Sprintf("Message %d", i)
sock.Send(msg, 0)
log.Printf("Send message %s", msg)
}
}
func pull() {
sock, err := zmq.NewSocket(zmq.PULL)
if err != nil {
log.Fatalf("NewSocket(): %v", err)
}
// Note: Could be Connect or Bind, but Bind return error for unknown reasons:
// Invalid argument (src/stream_engine.cpp:143)
sock.Bind(defaultEndpoint)
defer sock.Close()
for {
msg, err := sock.Recv(0)
if err != nil {
log.Printf("Recv(): %v", err)
break
}
log.Printf("Receive message: %s", msg)
}
}
var handlers = map[string]func(){
// request/reply pattern
"req": req,
"rep": rep,
// publish/subscribe pattern
"pub": pub,
"sub": sub,
// pipelining pattern
"push": push,
"pull": pull,
}
func main() {
modes := make([]string, 0, len(handlers))
for k := range handlers {
modes = append(modes, k)
}
if len(os.Args) < 2 {
log.Printf("Usage: %s %s", os.Args[0], strings.Join(modes, "|"))
os.Exit(1)
}
h := handlers[os.Args[1]]
if h == nil {
log.Printf("Invalid mode %s. Supported modes: %s", os.Args[0], strings.Join(modes, "|"))
os.Exit(1)
}
h()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment