Last active
June 12, 2018 17:14
-
-
Save geekyme/489f7868db8fe3215b2e9c1989dbb1ab to your computer and use it in GitHub Desktop.
Scale websocket connections horizontally using Kafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
/*
This simple chat program illustrates a horizontally scaled, master-client chat server architecture that uses Kafka as a distributed pub-sub system.
Each server has:
- one Kafka consumer, in its own consumer group named after the server's port
- a map of connected clients (state maintained by the server whenever clients connect and disconnect)
- one Kafka producer
When a client sends a message through its websocket, the producer writes the message to Kafka.
All consumers (i.e. all servers, since there is one consumer per server) subscribed to that topic receive the message.
Each server then iterates through its own map of clients to relay the message.
This setup ensures that, whichever server a client is connected to, it always receives the messages sent by other clients.
Inspired by http://tech.trello.com/why-we-chose-kafka/
*/
import ( | |
"fmt" | |
"log" | |
"net/http" | |
"os" | |
"github.com/confluentinc/confluent-kafka-go/kafka" | |
"github.com/gorilla/mux" | |
"github.com/olahol/melody" | |
) | |
func main() { | |
topic := "chatMessages" | |
port := os.Getenv("PORT") | |
if port == "" { | |
log.Fatal("Supply an environment variable for port. Example: PORT=:8000") | |
} | |
r := mux.NewRouter() | |
m := melody.New() | |
clients := make(map[*melody.Session]bool) | |
c, err := kafka.NewConsumer(&kafka.ConfigMap{ | |
"bootstrap.servers": "localhost", | |
"group.id": port, | |
"auto.offset.reset": "earliest", | |
}) | |
if err != nil { | |
panic(err) | |
} | |
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"}) | |
if err != nil { | |
panic(err) | |
} | |
c.SubscribeTopics([]string{topic}, nil) | |
r.HandleFunc("/chat/{userId}", func(w http.ResponseWriter, r *http.Request) { | |
m.HandleRequest(w, r) | |
}) | |
m.HandleConnect(func(s *melody.Session) { | |
vars := mux.Vars(s.Request) | |
log.Printf("Client %s connected", vars["userId"]) | |
clients[s] = true | |
s.Write([]byte("welcome")) | |
}) | |
m.HandleDisconnect(func(s *melody.Session) { | |
vars := mux.Vars(s.Request) | |
log.Printf("Client %s disconnected", vars["userId"]) | |
delete(clients, s) | |
}) | |
m.HandleMessage(func(s *melody.Session, msg []byte) { | |
vars := mux.Vars(s.Request) | |
log.Printf("Client %s sent message", vars["userId"]) | |
p.Produce(&kafka.Message{ | |
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, | |
Value: msg, | |
}, nil) | |
}) | |
log.Printf("Server running on port %s", port) | |
go http.ListenAndServe(port, r) | |
for { | |
msg, err := c.ReadMessage(-1) | |
if err == nil { | |
for client := range clients { | |
vars := mux.Vars(client.Request) | |
log.Printf("Client %s receive message", vars["userId"]) | |
client.Write(msg.Value) | |
} | |
} else { | |
fmt.Printf("Consumer error: %v (%v)\n", err, msg) | |
break | |
} | |
} | |
c.Close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Setup 2 servers
PORT=:8000 go run chat.go
PORT=:9000 go run chat.go
Setup 3 clients on the browser
const c1 = new WebSocket("ws://localhost:8000/chat/1")
const c2 = new WebSocket("ws://localhost:8000/chat/2")
const c3 = new WebSocket("ws://localhost:9000/chat/3")
Send multiple messages from one client
All clients will successfully receive the message.