Created
June 21, 2015 11:26
-
-
Save zerodivisi0n/155353f54f231c6a337e to your computer and use it in GitHub Desktop.
ZeroMQ Getting Started
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Based on http://nichol.as/zeromq-an-introduction | |
package main | |
import ( | |
"fmt" | |
zmq "github.com/pebbe/zmq4" | |
"log" | |
"math/rand" | |
"os" | |
"strings" | |
"time" | |
) | |
// defaultEndpoint is the single ZeroMQ endpoint shared by every mode in this
// program; each handler either binds or connects a socket to it.
const (
	defaultEndpoint = "ipc://zmq_learn"
)
func req() { | |
sock, err := zmq.NewSocket(zmq.REQ) | |
if err != nil { | |
log.Fatalf("NewSocket(): %v", err) | |
} | |
sock.Connect(defaultEndpoint) | |
defer sock.Close() | |
for i := 0; i < 10; i++ { | |
sock.Send(fmt.Sprintf("Message %d", i), 0) | |
// Important: Recv is required, overwise server shutdown with error: | |
// Invalid argument (src/stream_engine.cpp:143) | |
rep, err := sock.Recv(0) | |
if err != nil { | |
log.Printf("Recv(): %v", err) | |
} | |
log.Printf("Reply: %s", rep) | |
} | |
} | |
func rep() { | |
sock, err := zmq.NewSocket(zmq.REP) | |
if err != nil { | |
log.Fatalf("NewSocket(): %v", err) | |
} | |
sock.Bind(defaultEndpoint) | |
defer sock.Close() | |
for { | |
msg, err := sock.Recv(0) | |
if err != nil { | |
log.Printf("Recv(): %v", err) | |
break | |
} | |
log.Printf("Message: %s", msg) | |
// Important: Send is required, overwise socket get error on next Recv: | |
// Operation cannot be accomplished in current state | |
sock.Send(msg, 0) // echo server | |
} | |
} | |
func pub() { | |
// Important: shutdown server if there is already PUB socket exists | |
sock, err := zmq.NewSocket(zmq.PUB) | |
if err != nil { | |
log.Fatalf("NewSocket(): %v", err) | |
} | |
sock.Bind(defaultEndpoint) | |
defer sock.Close() | |
countries := []string{"netherlands", "brazil", "germany", "portugal"} | |
events := []string{"yellow card", "red card", "goal", "corner", "foul"} | |
for { | |
msg := countries[rand.Intn(len(countries))] + " " + events[rand.Intn(len(events))] | |
log.Printf("Sending %s", msg) | |
sock.Send(msg, 0) | |
time.Sleep(100 * time.Millisecond) | |
} | |
} | |
func sub() { | |
sock, err := zmq.NewSocket(zmq.SUB) | |
if err != nil { | |
log.Fatalf("NewSocket(): %v", err) | |
} | |
sock.Connect(defaultEndpoint) | |
defer sock.Close() | |
sock.SetSubscribe("netherlands") | |
sock.SetSubscribe("germany") | |
for { | |
msg, err := sock.Recv(0) | |
if err != nil { | |
log.Printf("Recv(): %v", err) | |
break | |
} | |
log.Printf("Receive message: %s", msg) | |
} | |
} | |
func push() { | |
sock, err := zmq.NewSocket(zmq.PUSH) | |
if err != nil { | |
log.Fatalf("NewSocket(): %v", err) | |
} | |
// Note: Could me Connect or Bind | |
sock.Connect(defaultEndpoint) | |
defer sock.Close() | |
for i := 0; i < 10; i++ { | |
msg := fmt.Sprintf("Message %d", i) | |
sock.Send(msg, 0) | |
log.Printf("Send message %s", msg) | |
} | |
} | |
func pull() { | |
sock, err := zmq.NewSocket(zmq.PULL) | |
if err != nil { | |
log.Fatalf("NewSocket(): %v", err) | |
} | |
// Note: Could be Connect or Bind, but Bind return error for unknown reasons: | |
// Invalid argument (src/stream_engine.cpp:143) | |
sock.Bind(defaultEndpoint) | |
defer sock.Close() | |
for { | |
msg, err := sock.Recv(0) | |
if err != nil { | |
log.Printf("Recv(): %v", err) | |
break | |
} | |
log.Printf("Receive message: %s", msg) | |
} | |
} | |
var handlers = map[string]func(){ | |
// request/reply pattern | |
"req": req, | |
"rep": rep, | |
// publish/subscribe pattern | |
"pub": pub, | |
"sub": sub, | |
// pipelining pattern | |
"push": push, | |
"pull": pull, | |
} | |
func main() { | |
modes := make([]string, 0, len(handlers)) | |
for k := range handlers { | |
modes = append(modes, k) | |
} | |
if len(os.Args) < 2 { | |
log.Printf("Usage: %s %s", os.Args[0], strings.Join(modes, "|")) | |
os.Exit(1) | |
} | |
h := handlers[os.Args[1]] | |
if h == nil { | |
log.Printf("Invalid mode %s. Supported modes: %s", os.Args[0], strings.Join(modes, "|")) | |
os.Exit(1) | |
} | |
h() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment