Skip to content

Instantly share code, notes, and snippets.

@wk989898
Created December 26, 2024 09:46
Show Gist options
  • Save wk989898/e0bb44605832ba86d71d017ae1e459a1 to your computer and use it in GitHub Desktop.
Save wk989898/e0bb44605832ba86d71d017ae1e459a1 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/IBM/sarama"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Layout of the test topic; main passes these to the admin client's
// CreateTopic call.
var (
	PartitionNum      = int32(3)
	ReplicationFactor = int16(1)
)

// Per-message settings used by main: the partition handed to AsyncSend, and
// the schema/table names whose addresses are handed to common.NewMsg. They
// are left at their zero values (partition 0, empty names).
var (
	partition     int32
	schema, table string
)
// initlog redirects sarama's package-level logger to the pingcap global zap
// logger, tagged with component=sarama and written at info level.
//
// If the standard-library adapter cannot be created, the error is printed and
// sarama.Logger is left as it was. Assigning the failed result would store a
// nil *log.Logger inside the interface value, which is not the same as a nil
// interface and is never a usable logger.
func initlog() {
	logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), zapcore.InfoLevel)
	if err != nil {
		fmt.Println("err", err)
		return
	}
	sarama.Logger = logger
}
// main creates a randomly named topic on the broker at 127.0.0.1:9092 and
// sends 200 small canal-json row messages to it through the sarama-backed
// async producer, pausing 30µs between sends.
//
// Any setup or send failure is printed and ends the run immediately; later
// steps would otherwise operate on values that were never initialised.
func main() {
	initlog()

	topic := fmt.Sprintf("test-%d", rand.Int())

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context on every exit path, including normal completion.
	// A CancelFunc may be called more than once, so the extra calls below
	// are harmless.
	defer cancel()

	o := kafka.NewOptions()
	o.BrokerEndpoints = []string{"127.0.0.1:9092"}
	o.ClientID = "sarama-test"

	f, err := kafka.NewSaramaFactory(o, model.DefaultChangeFeedID("sarama-test"))
	if err != nil {
		fmt.Println("err", err)
		return
	}

	admin, err := f.AdminClient(ctx)
	if err != nil {
		fmt.Println("err", err)
		return
	}
	defer admin.Close()

	err = admin.CreateTopic(ctx, &kafka.TopicDetail{
		Name:              topic,
		NumPartitions:     PartitionNum,
		ReplicationFactor: ReplicationFactor,
	}, false)
	if err != nil {
		// Without the topic there is nothing meaningful to send to.
		fmt.Println("err", err)
		return
	}

	async, err := f.AsyncProducer(ctx, make(chan error, 1))
	if err != nil {
		fmt.Println("err", err)
		return
	}
	// Single owner of producer shutdown: cancel first, then close, in the
	// same order the failure paths used. Deferred functions run last-in
	// first-out, so this runs before admin.Close and the outer cancel.
	defer func() {
		cancel()
		async.Close()
	}()

	go func() {
		if err := async.AsyncRunCallback(ctx); err != nil {
			fmt.Println("err", err)
			// Only signal the failure. Closing the producer is left to
			// main's deferred cleanup so Close is not called from two
			// goroutines.
			cancel()
		}
	}()

	for i := 0; i < 200; i++ {
		val := fmt.Sprintf("val: %d", i)
		msg := common.NewMsg(config.ProtocolCanalJSON, nil, []byte(val), 0, model.MessageTypeRow, &schema, &table)
		msg.HandleKey = []string{val}
		if err := async.AsyncSend(ctx, topic, partition, msg); err != nil {
			fmt.Println("AsyncSend err", err)
			return
		}
		time.Sleep(time.Microsecond * 30)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment