Last active
December 1, 2021 21:31
-
-
Save schigh-ntwrk/92212e125e87f0e7e00796a32541fbbc to your computer and use it in GitHub Desktop.
Get burrow consumer lag via ngrok
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
/* | |
To run this file: | |
go run main.go -uri 'https://my.ngrok.uri.from.burrow.container' | |
*/ | |
import ( | |
"context" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"log" | |
"net/http" | |
"os/signal" | |
"path" | |
"strings" | |
"syscall" | |
"time" | |
) | |
type ConsumersWrapper struct { | |
Error bool `json:"error"` | |
Message string `json:"message"` | |
Consumers []string `json:"consumers"` | |
Request Request `json:"request"` | |
} | |
type Request struct { | |
URL string `json:"url"` | |
Host string `json:"host"` | |
} | |
type Wrapper struct { | |
Error bool `json:"error"` | |
Message string `json:"message"` | |
Status Status `json:"status"` | |
Request Request `json:"request"` | |
} | |
type Start struct { | |
Offset int `json:"offset"` | |
Timestamp int64 `json:"timestamp"` | |
ObservedAt int64 `json:"observedAt"` | |
Lag int `json:"lag"` | |
} | |
type End struct { | |
Offset int `json:"offset"` | |
Timestamp int64 `json:"timestamp"` | |
ObservedAt int64 `json:"observedAt"` | |
Lag int `json:"lag"` | |
} | |
type Partitions struct { | |
Topic string `json:"topic"` | |
Partition int `json:"partition"` | |
Owner string `json:"owner"` | |
ClientID string `json:"client_id"` | |
Status string `json:"status"` | |
Start Start `json:"start"` | |
End End `json:"end"` | |
CurrentLag int `json:"current_lag"` | |
Complete float64 `json:"complete"` | |
} | |
type Maxlag struct { | |
Topic string `json:"topic"` | |
Partition int `json:"partition"` | |
Owner string `json:"owner"` | |
ClientID string `json:"client_id"` | |
Status string `json:"status"` | |
Start Start `json:"start"` | |
End End `json:"end"` | |
CurrentLag int `json:"current_lag"` | |
Complete float64 `json:"complete"` | |
} | |
type Status struct { | |
Cluster string `json:"cluster"` | |
Group string `json:"group"` | |
Status string `json:"status"` | |
Complete float64 `json:"complete"` | |
Partitions []Partitions `json:"partitions"` | |
PartitionCount int `json:"partition_count"` | |
Maxlag Maxlag `json:"maxlag"` | |
Totallag int `json:"totallag"` | |
} | |
const ( | |
consumerURI = "/v3/kafka/prd/consumer" | |
) | |
var ngrokURI string | |
func main() { | |
flag.StringVar(&ngrokURI, "uri", "", "ngrok uri") | |
flag.Parse() | |
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) | |
defer cancel() | |
client := http.Client{ | |
Timeout: 5 * time.Second, | |
} | |
var consumers ConsumersWrapper | |
{ | |
resp, respErr := client.Get(ngrokURI + consumerURI) | |
if respErr != nil { | |
log.Fatalf("send request failed: %v\n", respErr) | |
} | |
if jErr := json.NewDecoder(resp.Body).Decode(&consumers); jErr != nil { | |
_ = resp.Body.Close() | |
log.Fatalf("decode consumers request failed: %v\n", jErr) | |
} | |
} | |
t := time.NewTicker(500 * time.Millisecond) | |
go func(ctx context.Context, client *http.Client, consumers []string, t *time.Ticker) { | |
l := len(consumers) | |
i := l - 1 | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
} | |
<-t.C | |
if i >= l { | |
i = 0 | |
} | |
uri := ngrokURI + path.Join(consumerURI, consumers[i], "lag") | |
resp, respErr := client.Get(uri) | |
if respErr != nil { | |
log.Printf("send request failed: %v\n", respErr) | |
i++ | |
continue | |
} | |
var wrapper Wrapper | |
if jErr := json.NewDecoder(resp.Body).Decode(&wrapper); jErr != nil { | |
_ = resp.Body.Close() | |
log.Printf("decode response failed: %v\n", jErr) | |
i++ | |
continue | |
} | |
_ = resp.Body.Close() | |
i++ | |
if wrapper.Status.Totallag == 0 { | |
continue | |
} | |
sb := strings.Builder{} | |
sb.WriteString("GROUP: " + wrapper.Status.Group) | |
sb.WriteByte('\n') | |
for j := range wrapper.Status.Partitions { | |
if wrapper.Status.Partitions[j].CurrentLag == 0 { | |
continue | |
} | |
partition := wrapper.Status.Partitions[j] | |
startTime := time.Unix(partition.Start.Timestamp/1e3, 0) | |
endTime := time.Unix(partition.End.Timestamp/1e3, 0) | |
sb.WriteString(fmt.Sprintf("\tTOPIC: \u001b[34m%-22s\u001B[0m\n", partition.Topic)) | |
sb.WriteString(fmt.Sprintf("\tCLIENT: \u001B[34m%-22s\u001B[0m\n", partition.ClientID)) | |
sb.WriteString("\tSTART:\n") | |
sb.WriteString(fmt.Sprintf("\t OFFSET: %-22d\n", partition.Start.Offset)) | |
sb.WriteString(fmt.Sprintf("\t TS: %-22s\n", startTime.Format(time.RFC3339))) | |
sb.WriteString(fmt.Sprintf("\t OBSERVED: %-22s\n", time.Unix(partition.Start.ObservedAt/1e3, 0).Format(time.RFC3339))) | |
sb.WriteString(fmt.Sprintf("\t LAG: \u001B[31m%-22d\u001B[0m\n", partition.Start.Lag)) | |
sb.WriteString("\tEND:\n") | |
sb.WriteString(fmt.Sprintf("\t OFFSET: %-22d\n", partition.End.Offset)) | |
sb.WriteString(fmt.Sprintf("\t TS: %-22s\n", endTime.Format(time.RFC3339))) | |
sb.WriteString(fmt.Sprintf("\t OBSERVED: %-22s\n", time.Unix(partition.End.ObservedAt/1e3, 0).Format(time.RFC3339))) | |
sb.WriteString(fmt.Sprintf("\t LAG: \u001b[31m%-22d\u001b[0m\n", partition.End.Lag)) | |
sb.WriteString("\tDELTA:\n") | |
dur := endTime.Sub(startTime) | |
if partition.End.Lag == partition.Start.Lag { | |
sb.WriteString(fmt.Sprintf("\t No change in lag (%d) detected in %s\n", partition.End.Lag, dur)) | |
continue | |
} | |
diff := partition.End.Lag - partition.Start.Lag | |
color := "\u001B[31m" | |
desc := "increased" | |
if diff < 0 { | |
diff = diff * -1 | |
color = "\u001B[33m" | |
desc = "decreased" | |
} | |
sb.WriteString(fmt.Sprintf("\t %sLag %s by %d messages (%d -> %d) in %s\u001B[0m\n", color, desc, diff, partition.Start.Lag, partition.End.Lag, dur)) | |
} | |
sb.WriteString("------------------------------------------------") | |
fmt.Println(sb.String()) | |
} | |
}(ctx, &client, consumers.Consumers, t) | |
<-ctx.Done() | |
t.Stop() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment