Skip to content

Instantly share code, notes, and snippets.

@schigh-ntwrk
Last active December 1, 2021 21:31
Show Gist options
  • Save schigh-ntwrk/92212e125e87f0e7e00796a32541fbbc to your computer and use it in GitHub Desktop.
Save schigh-ntwrk/92212e125e87f0e7e00796a32541fbbc to your computer and use it in GitHub Desktop.
Get burrow consumer lag via ngrok
package main
/*
To run this file:
go run main.go -uri 'https://my.ngrok.uri.from.burrow.container'
*/
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os/signal"
"path"
"strings"
"syscall"
"time"
)
type ConsumersWrapper struct {
Error bool `json:"error"`
Message string `json:"message"`
Consumers []string `json:"consumers"`
Request Request `json:"request"`
}
type Request struct {
URL string `json:"url"`
Host string `json:"host"`
}
type Wrapper struct {
Error bool `json:"error"`
Message string `json:"message"`
Status Status `json:"status"`
Request Request `json:"request"`
}
type Start struct {
Offset int `json:"offset"`
Timestamp int64 `json:"timestamp"`
ObservedAt int64 `json:"observedAt"`
Lag int `json:"lag"`
}
type End struct {
Offset int `json:"offset"`
Timestamp int64 `json:"timestamp"`
ObservedAt int64 `json:"observedAt"`
Lag int `json:"lag"`
}
type Partitions struct {
Topic string `json:"topic"`
Partition int `json:"partition"`
Owner string `json:"owner"`
ClientID string `json:"client_id"`
Status string `json:"status"`
Start Start `json:"start"`
End End `json:"end"`
CurrentLag int `json:"current_lag"`
Complete float64 `json:"complete"`
}
type Maxlag struct {
Topic string `json:"topic"`
Partition int `json:"partition"`
Owner string `json:"owner"`
ClientID string `json:"client_id"`
Status string `json:"status"`
Start Start `json:"start"`
End End `json:"end"`
CurrentLag int `json:"current_lag"`
Complete float64 `json:"complete"`
}
type Status struct {
Cluster string `json:"cluster"`
Group string `json:"group"`
Status string `json:"status"`
Complete float64 `json:"complete"`
Partitions []Partitions `json:"partitions"`
PartitionCount int `json:"partition_count"`
Maxlag Maxlag `json:"maxlag"`
Totallag int `json:"totallag"`
}
const (
consumerURI = "/v3/kafka/prd/consumer"
)
var ngrokURI string
func main() {
flag.StringVar(&ngrokURI, "uri", "", "ngrok uri")
flag.Parse()
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
defer cancel()
client := http.Client{
Timeout: 5 * time.Second,
}
var consumers ConsumersWrapper
{
resp, respErr := client.Get(ngrokURI + consumerURI)
if respErr != nil {
log.Fatalf("send request failed: %v\n", respErr)
}
if jErr := json.NewDecoder(resp.Body).Decode(&consumers); jErr != nil {
_ = resp.Body.Close()
log.Fatalf("decode consumers request failed: %v\n", jErr)
}
}
t := time.NewTicker(500 * time.Millisecond)
go func(ctx context.Context, client *http.Client, consumers []string, t *time.Ticker) {
l := len(consumers)
i := l - 1
for {
select {
case <-ctx.Done():
return
default:
}
<-t.C
if i >= l {
i = 0
}
uri := ngrokURI + path.Join(consumerURI, consumers[i], "lag")
resp, respErr := client.Get(uri)
if respErr != nil {
log.Printf("send request failed: %v\n", respErr)
i++
continue
}
var wrapper Wrapper
if jErr := json.NewDecoder(resp.Body).Decode(&wrapper); jErr != nil {
_ = resp.Body.Close()
log.Printf("decode response failed: %v\n", jErr)
i++
continue
}
_ = resp.Body.Close()
i++
if wrapper.Status.Totallag == 0 {
continue
}
sb := strings.Builder{}
sb.WriteString("GROUP: " + wrapper.Status.Group)
sb.WriteByte('\n')
for j := range wrapper.Status.Partitions {
if wrapper.Status.Partitions[j].CurrentLag == 0 {
continue
}
partition := wrapper.Status.Partitions[j]
startTime := time.Unix(partition.Start.Timestamp/1e3, 0)
endTime := time.Unix(partition.End.Timestamp/1e3, 0)
sb.WriteString(fmt.Sprintf("\tTOPIC: \u001b[34m%-22s\u001B[0m\n", partition.Topic))
sb.WriteString(fmt.Sprintf("\tCLIENT: \u001B[34m%-22s\u001B[0m\n", partition.ClientID))
sb.WriteString("\tSTART:\n")
sb.WriteString(fmt.Sprintf("\t OFFSET: %-22d\n", partition.Start.Offset))
sb.WriteString(fmt.Sprintf("\t TS: %-22s\n", startTime.Format(time.RFC3339)))
sb.WriteString(fmt.Sprintf("\t OBSERVED: %-22s\n", time.Unix(partition.Start.ObservedAt/1e3, 0).Format(time.RFC3339)))
sb.WriteString(fmt.Sprintf("\t LAG: \u001B[31m%-22d\u001B[0m\n", partition.Start.Lag))
sb.WriteString("\tEND:\n")
sb.WriteString(fmt.Sprintf("\t OFFSET: %-22d\n", partition.End.Offset))
sb.WriteString(fmt.Sprintf("\t TS: %-22s\n", endTime.Format(time.RFC3339)))
sb.WriteString(fmt.Sprintf("\t OBSERVED: %-22s\n", time.Unix(partition.End.ObservedAt/1e3, 0).Format(time.RFC3339)))
sb.WriteString(fmt.Sprintf("\t LAG: \u001b[31m%-22d\u001b[0m\n", partition.End.Lag))
sb.WriteString("\tDELTA:\n")
dur := endTime.Sub(startTime)
if partition.End.Lag == partition.Start.Lag {
sb.WriteString(fmt.Sprintf("\t No change in lag (%d) detected in %s\n", partition.End.Lag, dur))
continue
}
diff := partition.End.Lag - partition.Start.Lag
color := "\u001B[31m"
desc := "increased"
if diff < 0 {
diff = diff * -1
color = "\u001B[33m"
desc = "decreased"
}
sb.WriteString(fmt.Sprintf("\t %sLag %s by %d messages (%d -> %d) in %s\u001B[0m\n", color, desc, diff, partition.Start.Lag, partition.End.Lag, dur))
}
sb.WriteString("------------------------------------------------")
fmt.Println(sb.String())
}
}(ctx, &client, consumers.Consumers, t)
<-ctx.Done()
t.Stop()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment