Created
November 29, 2021 15:16
-
-
Save perbu/4328d9533d6e0a4fc77c9689ca5ab74e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
/*
This is an example of using the paho.golang library (which supports MQTT v5) in such a way that
the connect call blocks until the connection goes down.
I use something like this because I need to generate a TLS certificate before each connect.
*/
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"github.com/eclipse/paho.golang/paho" | |
"log" | |
"net" | |
"sync" | |
"time" | |
) | |
type msgChan chan *paho.Publish | |
func blockingConnect(ctx context.Context, subMsgChan msgChan, pubMsgChan msgChan, topic, addr string) error { | |
log.Println("Connecting") | |
errChan := make(chan error) | |
conn, err := net.Dial("tcp", addr) | |
if err != nil { | |
return fmt.Errorf("failed to connect: %s", err) | |
} | |
client := paho.NewClient( | |
paho.ClientConfig{ | |
OnClientError: func(e error) { | |
errChan <- e | |
}, | |
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) { | |
subMsgChan <- m | |
}), | |
// Wandering if we should set the | |
// OnServerDisconnect callback. | |
Conn: conn, | |
}) | |
cp := &paho.Connect{ | |
KeepAlive: 30, | |
ClientID: "paho", | |
CleanStart: true, | |
} | |
ca, err := client.Connect(ctx, cp) | |
if err != nil { | |
return fmt.Errorf("mqtt connect error: %s", err) | |
} | |
if ca.ReasonCode != 0 { | |
return fmt.Errorf("mqtt connect failure: %d - %s", ca.ReasonCode, ca.Properties.ReasonString) | |
} | |
// goroutine that reads the pubMsgChan and publishes the messages | |
go func() { | |
for ctx.Err() == nil { | |
select { | |
case <-ctx.Done(): // abort waiting for the channel | |
log.Println("ctx cancelled in blockingConnect publisher") | |
case msg := <-pubMsgChan: | |
log.Println("publisher got a message to publish") | |
ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Second) | |
_, err := client.Publish(ctxTimeout, msg) | |
if err != nil { | |
log.Printf("publish error: %s\n", err) | |
} else { | |
log.Println("publish successful.") | |
} | |
timeoutCancel() | |
} | |
} | |
log.Println("pub loop aborting: ", ctx.Err()) | |
}() | |
// goroutine that monitors the context and shuts down if it is cancelled. | |
go func() { | |
<-ctx.Done() | |
log.Println("context cancelled, disconnecting") | |
if client != nil { | |
d := &paho.Disconnect{ReasonCode: 0} | |
client.Disconnect(d) | |
} | |
}() | |
sa, err := client.Subscribe(ctx, &paho.Subscribe{ | |
Subscriptions: map[string]paho.SubscribeOptions{ | |
topic: {QoS: byte(1)}, | |
}, | |
}) | |
if err != nil { | |
return fmt.Errorf("mqtt subscribe error: %s", err) | |
} | |
if sa.Reasons[0] != byte(1) { | |
return fmt.Errorf("failed to subscribe to %s : %d", topic, sa.Reasons[0]) | |
} | |
log.Printf("Subscribed to %s", topic) | |
return <-errChan | |
} | |
func main() { | |
const addr = "localhost:1883" | |
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) | |
pubMsgChan := make(msgChan, 0) | |
subMsgChan := make(msgChan, 0) | |
wg := sync.WaitGroup{} | |
// Set up the goroutine which handles the connection. | |
wg.Add(1) | |
go func() { | |
for ctx.Err() == nil { | |
err := blockingConnect(ctx, subMsgChan, pubMsgChan, "test/+", addr) | |
log.Println("Err: ", err) | |
time.Sleep(time.Second) | |
} | |
wg.Done() | |
}() | |
// produces mqtt messages. | |
wg.Add(1) | |
go func() { | |
counter := 0 | |
for ctx.Err() == nil { | |
payload, _ := json.Marshal(counter) | |
msg := paho.Publish{ | |
PacketID: 0, | |
QoS: 1, | |
Retain: false, | |
Topic: "pub/pub", | |
Properties: nil, | |
Payload: payload, | |
} | |
log.Println("Message prepared") | |
counter++ | |
select { | |
case pubMsgChan <- &msg: | |
log.Println("Message sent", string(payload)) | |
case <-ctx.Done(): | |
log.Println("context cancelled. Aborting writer.") | |
} | |
time.Sleep(time.Second) | |
} | |
wg.Done() | |
}() | |
// Set up the consumer. | |
wg.Add(1) | |
go func() { | |
for ctx.Err() == nil { | |
select { | |
case <-ctx.Done(): | |
log.Println("context cancelled. Aborting reader.") | |
case m := <-subMsgChan: | |
log.Println("Received message:", string(m.Payload)) | |
} | |
} | |
wg.Done() | |
}() | |
log.Println("Waiting for goroutines to exit") | |
wg.Wait() | |
cancel() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment