Created
March 23, 2023 13:44
-
-
Save ripienaar/736b1940c341ad2bc99454779d97d1fb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"github.com/choria-io/fisk" | |
"github.com/nats-io/jsm.go/natscontext" | |
"github.com/nats-io/nats.go" | |
"os" | |
"os/signal" | |
"strconv" | |
"strings" | |
"syscall" | |
"time" | |
) | |
// Command line options. They are bound to flags and arguments in main and
// read by the command actions.
var (
	nctx     string        // value of the --context flag
	subject  string        // subject argument shared by publish and subscribe
	interval time.Duration // value of the publisher's --interval flag
)

// Process lifecycle state. Both are assigned in main; cancel is invoked by
// interruptWatcher when a termination signal arrives.
var (
	ctx    context.Context
	cancel context.CancelFunc
)
func main() { | |
app := fisk.New("ct", "NATS connection latency test") | |
app.Flag("context", "NATS Context to connect with").StringVar(&nctx) | |
publisher := app.Command("publish", "Start a publisher").Alias("pub").Action(runPublisher) | |
publisher.Arg("subject", "Subject to test on").Required().StringVar(&subject) | |
publisher.Flag("interval", "Publish interval").Short('i').Default("1s").DurationVar(&interval) | |
subscriber := app.Commandf("subscribe", "Start a subscriber").Alias("sub").Action(runSubscriber) | |
subscriber.Arg("subject", "Subject to test on").Required().StringVar(&subject) | |
ctx, cancel = context.WithCancel(context.Background()) | |
go interruptWatcher() | |
app.MustParseWithUsage(os.Args[1:]) | |
} | |
func connectNats() (*nats.Conn, error) { | |
return natscontext.Connect(nctx) | |
} | |
func runPublisher(_ *fisk.ParseContext) error { | |
nc, err := connectNats() | |
if err != nil { | |
return err | |
} | |
var ( | |
errs int | |
corruptions int | |
symmetryHist []float64 | |
rttHist []time.Duration | |
platHist []time.Duration | |
) | |
ticker := time.NewTicker(interval) | |
for { | |
select { | |
case <-ticker.C: | |
start := time.Now() | |
to, cancel := context.WithTimeout(ctx, 2*time.Second) | |
res, err := nc.RequestWithContext(to, subject, []byte(fmt.Sprintf("%d", start.UnixNano()))) | |
cancel() | |
if err != nil { | |
fmt.Printf("Error publishing: %v\n", err) | |
errs++ | |
continue | |
} | |
parts := strings.Split(string(res.Data), " ") | |
if len(parts) != 2 { | |
fmt.Printf("Invalid response %q\n", res.Data) | |
corruptions++ | |
continue | |
} | |
ns, err := strconv.Atoi(parts[1]) | |
if err != nil { | |
fmt.Printf("Invalid response: %q: %v", res.Data, err) | |
corruptions++ | |
continue | |
} | |
since := time.Since(start) | |
pubRtt := time.Duration(ns) | |
symmetry := float64(since) / float64(pubRtt) | |
if len(rttHist) == 51 { | |
rttHist = rttHist[1:] | |
} | |
if len(platHist) == 51 { | |
platHist = platHist[1:] | |
} | |
if len(symmetryHist) == 51 { | |
symmetryHist = symmetryHist[1:] | |
} | |
rttHist = append(rttHist, since) | |
platHist = append(platHist, pubRtt) | |
symmetryHist = append(symmetryHist, symmetry) | |
fmt.Printf("%v roundtrip time: %v (%v) publish latency: %v (%v) rtt symmetry: %0.3f (%0.3f)\n", | |
start.Format(time.RFC3339), | |
since.Round(time.Microsecond), avgDuration(rttHist).Round(time.Microsecond), | |
pubRtt.Round(time.Microsecond), avgDuration(platHist).Round(time.Microsecond), | |
symmetry, avgFloats(symmetryHist)) | |
case <-ctx.Done(): | |
return nil | |
} | |
} | |
} | |
// avgFloats returns the arithmetic mean of d.
//
// An empty or nil slice yields 0 rather than the NaN that dividing zero by
// zero would produce.
func avgFloats(d []float64) float64 {
	if len(d) == 0 {
		return 0
	}

	var sum float64
	for _, v := range d {
		sum += v
	}

	return sum / float64(len(d))
}
// avgDuration returns the arithmetic mean of d, truncated toward zero to a
// whole number of nanoseconds.
//
// An empty or nil slice yields 0 instead of panicking with an integer
// division by zero.
func avgDuration(d []time.Duration) time.Duration {
	if len(d) == 0 {
		return 0
	}

	var sum time.Duration
	for _, v := range d {
		sum += v
	}

	// Divide in time.Duration (int64) so the sum is not truncated on
	// platforms where int is 32 bits wide.
	return sum / time.Duration(len(d))
}
func runSubscriber(_ *fisk.ParseContext) error { | |
nc, err := connectNats() | |
if err != nil { | |
return err | |
} | |
_, err = nc.Subscribe(subject, func(msg *nats.Msg) { | |
ns, err := strconv.Atoi(string(msg.Data)) | |
if err != nil { | |
fmt.Printf("Invalid request: %q: %v", msg.Data, err) | |
return | |
} | |
plat := time.Since(time.Unix(0, int64(ns))) | |
fmt.Printf("%v Received a message with publish latency: %v\n", time.Now().Format(time.RFC3339), plat.Round(time.Microsecond)) | |
err = msg.Respond([]byte(fmt.Sprintf("%d %d", time.Now().UnixNano(), plat))) | |
if err != nil { | |
fmt.Printf("Reply failed: %v", err) | |
return | |
} | |
}) | |
if err != nil { | |
return err | |
} | |
<-ctx.Done() | |
return nil | |
} | |
func interruptWatcher() { | |
sigs := make(chan os.Signal, 1) | |
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) | |
for { | |
select { | |
case sig := <-sigs: | |
switch sig { | |
case syscall.SIGINT, syscall.SIGTERM: | |
cancel() | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment