Skip to content

Instantly share code, notes, and snippets.

@geekyme
Last active June 12, 2018 17:14
Show Gist options
  • Save geekyme/489f7868db8fe3215b2e9c1989dbb1ab to your computer and use it in GitHub Desktop.
Save geekyme/489f7868db8fe3215b2e9c1989dbb1ab to your computer and use it in GitHub Desktop.
Scale websocket connections horizontally using Kafka
package main
/*
This simple chat program illustrates a horizontally scaled, master-client architecture chat server that uses kafka as a distributed pub-sub system
Each server has:
- one kafka consumer group by port
- a map of connected clients (state maintained by the server whenever clients connect and disconnect)
- one kakfa producer
When a client sends a message through websocket, the producer will write the message into kafka
All consumers (or servers, since we have 1 consumer per server) listening to that topic will receive the message
Server then iterate through its own map of clients to relay the message
This setup ensures that whichever server a client connects to, it will always receive a message sent by other clients
Inspired by http://tech.trello.com/why-we-chose-kafka/
*/
import (
"fmt"
"log"
"net/http"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/gorilla/mux"
"github.com/olahol/melody"
)
func main() {
topic := "chatMessages"
port := os.Getenv("PORT")
if port == "" {
log.Fatal("Supply an environment variable for port. Example: PORT=:8000")
}
r := mux.NewRouter()
m := melody.New()
clients := make(map[*melody.Session]bool)
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": port,
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{topic}, nil)
r.HandleFunc("/chat/{userId}", func(w http.ResponseWriter, r *http.Request) {
m.HandleRequest(w, r)
})
m.HandleConnect(func(s *melody.Session) {
vars := mux.Vars(s.Request)
log.Printf("Client %s connected", vars["userId"])
clients[s] = true
s.Write([]byte("welcome"))
})
m.HandleDisconnect(func(s *melody.Session) {
vars := mux.Vars(s.Request)
log.Printf("Client %s disconnected", vars["userId"])
delete(clients, s)
})
m.HandleMessage(func(s *melody.Session, msg []byte) {
vars := mux.Vars(s.Request)
log.Printf("Client %s sent message", vars["userId"])
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: msg,
}, nil)
})
log.Printf("Server running on port %s", port)
go http.ListenAndServe(port, r)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
for client := range clients {
vars := mux.Vars(client.Request)
log.Printf("Client %s receive message", vars["userId"])
client.Write(msg.Value)
}
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
break
}
}
c.Close()
}
@geekyme
Copy link
Author

geekyme commented Jun 12, 2018

Setup 2 servers

PORT=:8000 go run chat.go
PORT=:9000 go run chat.go

Setup 3 clients on the browser

const c1 = new WebSocket("ws://localhost:8000/chat/1")
const c2 = new WebSocket("ws://localhost:8000/chat/2")
const c3 = new WebSocket("ws://localhost:9000/chat/3")

Send multiple messages from one client

let msg = 10;

while (msg >= 0) {
   c1.send("Counting down..." + msg);
   msg--;
}

All clients will successfully receive the message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment