Golang📌Common Packages📌kafka-go.txt
"github.com/segmentio/kafka-go"用于连接和操作kafka
"github.com/segmentio/kafka-go/sasl/scram"提供了sasl认证方法
func Mechanism(algo Algorithm, username, password string) (sasl.Mechanism, error)
algo支持SHA256和SHA512
========== ========== ========== ========== ==========
Client is a high-level API for interacting with Kafka brokers. It is safe for concurrent use, but its configuration must not be changed after first use.
type Client struct {
    Addr      net.Addr      // Kafka cluster address; may also be set per request, in which case the request's address takes precedence
    Timeout   time.Duration // 0 means no timeout
    Transport RoundTripper  // kafka.DefaultTransport is used when nil
}
func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error)
func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error)
func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error)
func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error)
func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error)
Topics in production should be created through an admin platform, with partition count, replication factor, cleanup policy, etc. made explicit.
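All of these methods follow the same request/response pattern. As a quick illustration, a minimal sketch of Client.Metadata that lists topics and their partition counts (the function name and topic are illustrative; the client is assumed to be configured as in the example further below):
func ExampleMetadata(client *kafka.Client) {
    meta, err := client.Metadata(context.Background(), &kafka.MetadataRequest{
        Topics: []string{"example_topic"}, // nil/empty requests metadata for all topics
    })
    if err != nil {
        log.Fatal("client.Metadata.error: ", err)
    }
    for _, t := range meta.Topics {
        log.Printf("topic=%s partitions=%d", t.Name, len(t.Partitions))
    }
}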
========== ========== ========== ========== ==========
Writer is safe for concurrent use; its configuration should not be modified after first use.
type Writer struct {
    Addr            net.Addr      // use kafka.TCP(address ...string)
    Topic           string
    Balancer        Balancer      // defaults to round-robin; can be set to &kafka.Hash{}
    MaxAttempts     int           // the default is to try at most 10 times
    WriteBackoffMin time.Duration // Default: 100ms
    WriteBackoffMax time.Duration // Default: 1s
    BatchSize       int           // Default: 100 messages
    BatchBytes      int64         // Default: 1048576
    BatchTimeout    time.Duration // the default is to flush at least every second
    ReadTimeout     time.Duration // Default: 10s
    WriteTimeout    time.Duration // Default: 10s
    RequiredAcks    RequiredAcks  // Defaults to RequireNone; can be set to RequireOne or RequireAll
    Transport       RoundTripper  // DefaultTransport is used when nil
}
Topic names the topic that messages will be produced to; it is mutually exclusive with setting Topic on individual Messages.
If Topic is set here, no Message being sent may set its own Topic.
Conversely, if Topic is not set here, every Message must specify one.
Programs producing to a single topic can set Topic here; with multiple topics, leave it unset and specify the topic per message, as in the sketch below.
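A minimal sketch of both modes (broker address, topic names, and the function name are placeholders):
func ExampleWriterTopicModes() {
    // Single-topic mode: Topic is set on the Writer, so messages must not set it.
    single := &kafka.Writer{
        Addr:  kafka.TCP("127.0.0.1:9092"),
        Topic: "example_topic",
    }
    _ = single.WriteMessages(context.Background(),
        kafka.Message{Value: []byte("hello")}, // Message.Topic must stay empty
    )
    single.Close()

    // Multi-topic mode: Topic is left empty on the Writer, so every message must set it.
    multi := &kafka.Writer{Addr: kafka.TCP("127.0.0.1:9092")}
    _ = multi.WriteMessages(context.Background(),
        kafka.Message{Topic: "topic_a", Value: []byte("a")},
        kafka.Message{Topic: "topic_b", Value: []byte("b")},
    )
    multi.Close()
}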
========== ========== ========== ========== ==========
func NewReader(config ReaderConfig) *Reader
type ReaderConfig struct {
    Brokers                []string
    GroupID                string          // mutually exclusive with Partition; when set, combine with Topic (single topic) or GroupTopics (multiple topics)
    GroupTopics            []string
    Topic                  string
    Partition              int             // must be used together with Topic
    Dialer                 *Dialer
    QueueCapacity          int             // capacity of the internal message queue, Default: 100
    MinBytes               int             // minimum number of bytes to fetch per request, Default: 1
    MaxBytes               int             // maximum number of bytes to fetch per request; must be large enough for the largest single message, Default: 1MB
    MaxWait                time.Duration   // maximum amount of time to wait for new data, Default: 10s
    ReadBatchTimeout       time.Duration   // timeout for reading messages out of an internal batch, Default: 10s
    ReadLagInterval        time.Duration   // how often consumer lag is updated; set to a negative value to disable lag reporting
    GroupBalancers         []GroupBalancer // Default: [Range, RoundRobin], only used when GroupID is set
    HeartbeatInterval      time.Duration   // Default: 3s, only used when GroupID is set
    CommitInterval         time.Duration   // Default: 0 (commits are synchronous), only used when GroupID is set
    PartitionWatchInterval time.Duration   // Default: 5s, only used when GroupID and WatchPartitionChanges are set
    WatchPartitionChanges  bool
    SessionTimeout         time.Duration   // Default: 30s, only used when GroupID is set
    RebalanceTimeout       time.Duration   // Default: 30s, only used when GroupID is set
    JoinGroupBackoff       time.Duration   // Default: 5s
    RetentionTime          time.Duration   // Default: -1, only used when GroupID is set
    StartOffset            int64           // Default: FirstOffset, only used when GroupID is set
    ReadBackoffMin         time.Duration   // Default: 100ms
    ReadBackoffMax         time.Duration   // Default: 1s
    MaxAttempts            int             // the default is to try 3 times
}
Reads and returns the next message; the call blocks until a message becomes available or an error occurs.
func (r *Reader) ReadMessage(ctx context.Context) (Message, error)
func (r *Reader) FetchMessage(ctx context.Context) (Message, error)
When using a consumer group, FetchMessage does not commit offsets automatically; they must be committed manually via CommitMessages.
ReadMessage commits offsets automatically (it calls FetchMessage, then CommitMessages, then returns the Message).
For finer control over when offsets are committed, use FetchMessage together with CommitMessages, as the NewConsumer example below does.
Commits the offsets of the given messages.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error
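A minimal sketch of the auto-commit path via ReadMessage (broker address, topic, group ID, and the function name are placeholders):
func ExampleReadMessage() {
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"127.0.0.1:9092"},
        GroupID: "example_group",
        Topic:   "example_topic",
    })
    defer r.Close()
    for {
        msg, err := r.ReadMessage(context.Background()) // offset is committed before this returns
        if err != nil {
            log.Println("ReadMessage.error: ", err)
            return
        }
        log.Printf("key=%s value=%s", msg.Key, msg.Value)
    }
}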
========== ========== Client usage example ========== ==========
func ExampleClient(brokers []string, username, password string) {
    client := &kafka.Client{
        Addr:    kafka.TCP(brokers...),
        Timeout: 10 * time.Second,
    }
    if username != "" {
        sm, err := scram.Mechanism(scram.SHA512, username, password)
        if err != nil {
            log.Fatal("scram.Mechanism.error: ", err)
        }
        client.Transport = &kafka.Transport{
            TLS:  &tls.Config{},
            SASL: sm,
        }
    }
    createTopicResp, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
        Addr: nil, // used if set; otherwise client.Addr is used
        Topics: []kafka.TopicConfig{
            {
                Topic:             "example_topic",
                NumPartitions:     4,
                ReplicationFactor: 1,
            },
            {
                Topic:             "test_topic",
                NumPartitions:     4,
                ReplicationFactor: 1,
            },
        },
        ValidateOnly: false, // when true, only validates without creating
    })
    if err != nil {
        log.Println("client.CreateTopics.error: ", err)
    }
    log.Printf("client.CreateTopics.response: %+v\n", createTopicResp)
    deleteTopicResp, err := client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{
        Addr:   nil, // used if set; otherwise client.Addr is used
        Topics: []string{"example_topic", "test_topic"},
    })
    if err != nil {
        log.Println("client.DeleteTopics.error: ", err)
    }
    log.Printf("client.DeleteTopics.response: %+v\n", deleteTopicResp)
}
========== ========== Common producer/consumer helpers ========== ==========
package gkafka

import (
    "context"
    "crypto/tls"
    "log"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/scram"
)

type Config struct {
    Brokers            []string
    Username, Password string
}
// NewProducer creates a general-purpose producer; in most scenarios prefer a single Writer handling multiple topics.
func NewProducer(cfg *Config) *kafka.Writer {
    w := &kafka.Writer{
        Addr:         kafka.TCP(cfg.Brokers...),
        Balancer:     &kafka.Hash{},
        RequiredAcks: kafka.RequireOne,
    }
    if cfg.Username != "" {
        sm, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
        if err != nil {
            log.Fatal("scram.Mechanism.error: ", err)
        }
        w.Transport = &kafka.Transport{
            TLS:  &tls.Config{},
            SASL: sm,
        }
    }
    return w
}
type Consumer struct {
    stop context.CancelFunc
    wg   sync.WaitGroup
}
// NewConsumer starts `concurrency` consumers that can run in parallel and supports graceful shutdown;
// across all instances of the same topic+group, total concurrency should not exceed the partition count.
func NewConsumer(cfg *Config, topic, groupID string, concurrency int, handler func(msg *kafka.Message)) *Consumer {
    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
    }
    if cfg.Username != "" {
        sm, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
        if err != nil {
            log.Fatal("scram.Mechanism.error: ", err)
        }
        dialer.TLS = &tls.Config{}
        dialer.SASLMechanism = sm
    }
    ctx, cancel := context.WithCancel(context.Background())
    c := &Consumer{stop: cancel}
    for range concurrency { // integer range requires Go 1.22+
        c.wg.Add(1)
        go func() {
            defer c.wg.Done()
            r := kafka.NewReader(kafka.ReaderConfig{
                Brokers: cfg.Brokers,
                GroupID: groupID,
                Topic:   topic,
                Dialer:  dialer,
            })
            for {
                select {
                case <-ctx.Done():
                    r.Close()
                    return
                default:
                    msg, err := r.FetchMessage(ctx)
                    if err == nil {
                        handler(&msg)
                        if groupID != "" {
                            // use a fresh context so the commit is not aborted once ctx is cancelled
                            if err := r.CommitMessages(context.Background(), msg); err != nil {
                                log.Println("CommitMessages.error: ", err)
                            }
                        }
                    }
                }
            }
        }()
    }
    return c
}
// Stop cancels all consumers and waits for them to exit.
func (c *Consumer) Stop() {
    c.stop()
    c.wg.Wait()
}
// MapToHeaders converts a string map into a slice of kafka.Header.
func MapToHeaders(m map[string]string) []kafka.Header {
    headers := make([]kafka.Header, 0, len(m))
    for k, v := range m {
        headers = append(headers, kafka.Header{
            Key:   k,
            Value: []byte(v),
        })
    }
    return headers
}

// HeadersToMap converts a slice of kafka.Header back into a string map.
func HeadersToMap(headers []kafka.Header) map[string]string {
    m := make(map[string]string, len(headers))
    for _, h := range headers {
        m[h.Key] = string(h.Value)
    }
    return m
}
========== ========== ========== ========== ==========
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/segmentio/kafka-go"

    "example.com/yourmodule/gkafka" // hypothetical import path for the gkafka package above
)

const topic = "example_topic"

func main() {
    producer := gkafka.NewProducer(&gkafka.Config{Brokers: []string{"127.0.0.1:9092"}})
    err := producer.WriteMessages(context.Background(),
        kafka.Message{
            Topic:   topic,
            Key:     []byte("key1"),
            Value:   []byte("content_1"),
            Headers: gkafka.MapToHeaders(map[string]string{"kkk": "vvv1"}),
        },
        kafka.Message{
            Topic:   topic,
            Key:     []byte("key2"),
            Value:   []byte("content_2"),
            Headers: gkafka.MapToHeaders(map[string]string{"kkk": "vvv2"}),
        },
        kafka.Message{
            Topic:   topic,
            Key:     []byte("key3"),
            Value:   []byte("content_3"),
            Headers: gkafka.MapToHeaders(map[string]string{"kkk": "vvv3"}),
        },
    )
    if err != nil {
        fmt.Println(err)
    }
    producer.Close()
    consumer := gkafka.NewConsumer(&gkafka.Config{Brokers: []string{"127.0.0.1:9092"}},
        topic, "example_group", 2, Handle)
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
    <-quit
    consumer.Stop()
}
func Handle(msg *kafka.Message) {
    fmt.Println(
        "Key:", string(msg.Key),
        "Time:", msg.Time,
        "Headers:", gkafka.HeadersToMap(msg.Headers),
        "Value:", string(msg.Value),
    )
}