Golang📌Common Packages📌kafka-go.txt
"github.com/segmentio/kafka-go"用于连接和操作kafka

"github.com/segmentio/kafka-go/sasl/scram"提供了sasl认证方法
func Mechanism(algo Algorithm, username, password string) (sasl.Mechanism, error)
algo支持SHA256和SHA512

========== ========== ========== ========== ==========

Client is a high-level API for interacting with Kafka brokers. It is safe for concurrent use, and its configuration must not be changed after first use.

type Client struct {
	Addr net.Addr // Kafka cluster address; can also be set per request, which takes precedence
	Timeout time.Duration // zero means no timeout
	Transport RoundTripper // nil means kafka.DefaultTransport is used
}

func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error)
func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) 
func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error)
func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error)
func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) 

In production, topics should be created through an admin platform (with explicit partition count, replication factor, cleanup policy, etc.).
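
Metadata can be used to inspect the cluster. A minimal sketch listing topics and their partition counts (the helper name listTopics is illustrative; it assumes a client built as in the example further below):

func listTopics(client *kafka.Client) {
	resp, err := client.Metadata(context.Background(), &kafka.MetadataRequest{
		Addr: nil, // takes precedence if set; otherwise client.Addr is used
	})
	if err != nil {
		log.Fatal("client.Metadata.error: ", err)
	}
	for _, t := range resp.Topics {
		log.Printf("topic=%s partitions=%d\n", t.Name, len(t.Partitions))
	}
}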

========== ========== ========== ========== ==========

Writer is safe for concurrent use; its configuration should not be modified after first use.

type Writer struct {
	Addr net.Addr // use kafka.TCP(address ...string)
	Topic string
	Balancer Balancer // default round-robin; can be set to &kafka.Hash{}
	MaxAttempts int // The default is to try at most 10 times.
	WriteBackoffMin time.Duration // Default: 100ms
	WriteBackoffMax time.Duration // Default: 1s
	BatchSize int // default 100 messages.
	BatchBytes int64 // default 1048576
	BatchTimeout time.Duration // The default is to flush at least every second.
	ReadTimeout time.Duration // Defaults to 10 seconds.
	WriteTimeout time.Duration // Defaults to 10 seconds.
	RequiredAcks RequiredAcks // Defaults to RequireNone. Can be set to RequireOne or RequireAll.
	Transport RoundTripper // If nil, DefaultTransport is used.
}
Topic names the topic that messages will be produced to; it is mutually exclusive with setting Topic on each Message.
If Topic is set here, none of the outgoing Messages may set their own Topic;
conversely, if it is not set here, every Message must specify its Topic.
Single-topic programs can set the topic here; multi-topic programs should leave it unset and specify the topic when producing each message (see the sketch below).
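
A minimal single-topic sketch (the broker address and topic name are placeholders); because Topic is fixed on the Writer, the Messages carry no Topic:

w := &kafka.Writer{
	Addr:  kafka.TCP("127.0.0.1:9092"),
	Topic: "example_topic", // fixed here, so the Messages below must not set Topic
}
err := w.WriteMessages(context.Background(),
	kafka.Message{Key: []byte("k1"), Value: []byte("v1")},
	kafka.Message{Key: []byte("k2"), Value: []byte("v2")},
)
if err != nil {
	log.Println("WriteMessages.error: ", err)
}
w.Close()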

========== ========== ========== ========== ==========

func NewReader(config ReaderConfig) *Reader

type ReaderConfig struct {
	Brokers []string
	GroupID string // mutually exclusive with Partition; when set, use with Topic (single topic) or GroupTopics (multiple topics)
	GroupTopics []string
	Topic string
	Partition int // must be used together with Topic
	Dialer *Dialer
	QueueCapacity int // capacity of the internal message buffer, defaults to 100
	MinBytes int // minimum number of bytes to fetch per request, Default: 1
	MaxBytes int // maximum number of bytes to fetch per request; must be large enough for the largest single message, Default: 1MB
	MaxWait time.Duration // maximum time to wait for new data before a fetch returns, Default: 10s
	ReadBatchTimeout time.Duration // timeout for reading a message from the internal batch, Default: 10s
	ReadLagInterval time.Duration // how often consumer lag is updated; set to a negative value to disable lag reporting
	GroupBalancers []GroupBalancer // Default: [Range, RoundRobin], Only used when GroupID is set
	HeartbeatInterval time.Duration // Default: 3s, Only used when GroupID is set
	CommitInterval time.Duration // Default: 0, Only used when GroupID is set
	PartitionWatchInterval time.Duration // Default: 5s, Only used when GroupID is set and WatchPartitionChanges is set.
	WatchPartitionChanges bool
	SessionTimeout time.Duration // Default: 30s, Only used when GroupID is set
	RebalanceTimeout time.Duration // Default: 30s, Only used when GroupID is set
	JoinGroupBackoff time.Duration // Default: 5s
	RetentionTime time.Duration // Default: -1, Only used when GroupID is set
	StartOffset int64 //Default: FirstOffset, Only used when GroupID is set
	ReadBackoffMin time.Duration // Default: 100ms
	ReadBackoffMax time.Duration // Default: 1s
	MaxAttempts int // The default is to try 3 times.
}
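
When GroupID is empty, the Reader consumes a single partition directly. A minimal sketch (broker address and topic are placeholders):

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:   []string{"127.0.0.1:9092"},
	Topic:     "example_topic",
	Partition: 0,
})
r.SetOffset(kafka.FirstOffset) // SetOffset is only valid when GroupID is not set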

Reads and returns the next message; the call blocks until a message is available or an error occurs.
func (r *Reader) ReadMessage(ctx context.Context) (Message, error)
func (r *Reader) FetchMessage(ctx context.Context) (Message, error)
When a consumer group is used, FetchMessage does not commit offsets automatically; commit them manually via CommitMessages.
ReadMessage commits offsets automatically (it calls FetchMessage, then CommitMessages, then returns the Message).
For finer control over when offsets are committed, combine FetchMessage with CommitMessages, as sketched below.

Commits the offsets of the given messages.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error
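
A minimal at-least-once sketch of that combination (handle stands in for the business logic):

for {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		break // e.g. ctx was canceled or the reader was closed
	}
	handle(m) // process first, commit after, so a crash re-delivers the message
	if err := r.CommitMessages(ctx, m); err != nil {
		log.Println("CommitMessages.error: ", err)
	}
}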

========== ========== Client usage example ========== ==========

func ExampleClient(brokers []string, username, password string) {
	client := &kafka.Client{
		Addr:    kafka.TCP(brokers...),
		Timeout: 10 * time.Second,
	}
	if username != "" {
		sm, err := scram.Mechanism(scram.SHA512, username, password)
		if err != nil {
			log.Fatal("scram.Mechanism.error: ", err)
		}
		client.Transport = &kafka.Transport{
			TLS:  &tls.Config{},
			SASL: sm,
		}
	}

	createTopicResp, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
		Addr: nil, // takes precedence if set; otherwise client.Addr is used
		Topics: []kafka.TopicConfig{
			{
				Topic:             "example_topic",
				NumPartitions:     4,
				ReplicationFactor: 1,
			},
			{
				Topic:             "test_topic",
				NumPartitions:     4,
				ReplicationFactor: 1,
			},
		},
		ValidateOnly: false, // validate only, without actually creating
	})
	if err != nil {
		log.Println("client.CreateTopics.error: ", err)
	}
	log.Printf("client.CreateTopics.response: %+v\n", createTopicResp)

	deleteTopicResp, err := client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{
		Addr:   nil, // takes precedence if set; otherwise client.Addr is used
		Topics: []string{"example_topic", "test_topic"},
	})
	if err != nil {
		log.Println("client.DeleteTopics.error: ", err)
	}
	log.Printf("client.DeleteTopics.response: %+v\n", deleteTopicResp)
}

========== ========== Common producer/consumer helpers ========== ==========

package gkafka

import (
	"context"
	"crypto/tls"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
	"log"
	"sync"
	"time"
)

type Config struct {
	Brokers            []string
	Username, Password string
}

// NewProducer creates a general-purpose producer; in most scenarios prefer a single Writer that handles multiple topics
func NewProducer(cfg *Config) *kafka.Writer {
	w := &kafka.Writer{
		Addr:         kafka.TCP(cfg.Brokers...),
		Balancer:     &kafka.Hash{},
		RequiredAcks: kafka.RequireOne,
	}
	if cfg.Username != "" {
		sm, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
		if err != nil {
			log.Fatal("error: ", err)
		}
		w.Transport = &kafka.Transport{
			TLS:  &tls.Config{},
			SASL: sm,
		}
	}
	return w
}

type Consumer struct {
	stop context.CancelFunc
	wg   sync.WaitGroup
}

// NewConsumer starts multiple consumers concurrently with graceful-shutdown support; for the same topic+group, total concurrency across instances should be <= the number of partitions
func NewConsumer(cfg *Config, topic, groupID string, concurrency int, handler func(msg *kafka.Message)) *Consumer {
	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
	}
	if cfg.Username != "" {
		sm, err := scram.Mechanism(scram.SHA512, cfg.Username, cfg.Password)
		if err != nil {
			log.Fatal("error: ", err)
		}
		dialer.TLS = &tls.Config{}
		dialer.SASLMechanism = sm
	}
	ctx, cancel := context.WithCancel(context.Background())
	c := &Consumer{stop: cancel}
	for range concurrency {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			r := kafka.NewReader(kafka.ReaderConfig{
				Brokers: cfg.Brokers,
				GroupID: groupID,
				Topic:   topic,
				Dialer:  dialer,
			})
			for {
				select {
				case <-ctx.Done():
					r.Close()
					return
				default:
					msg, err := r.FetchMessage(ctx)
					if err == nil {
						handler(&msg)
						if groupID != "" {
						r.CommitMessages(context.Background(), msg) // use a fresh context so the commit does not fail after ctx is canceled
						}
					}
				}
			}
		}()
	}
	return c
}

func (c *Consumer) Stop() {
	c.stop()
	c.wg.Wait()
}

func MapToHeaders(m map[string]string) []kafka.Header {
	headers := make([]kafka.Header, 0, len(m))
	for k, v := range m {
		headers = append(headers, kafka.Header{
			Key:   k,
			Value: []byte(v),
		})
	}
	return headers
}

func HeadersToMap(headers []kafka.Header) map[string]string {
	m := make(map[string]string, len(headers))
	for _, h := range headers {
		m[h.Key] = string(h.Value)
	}
	return m
}

========== ========== ========== ========== ==========

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/segmentio/kafka-go"

	"example.com/yourmodule/gkafka" // placeholder import path for the gkafka package above
)

const topic = "example_topic"

func main() {
	producer := gkafka.NewProducer(&gkafka.Config{Brokers: []string{"127.0.0.1:9092"}})
	err := producer.WriteMessages(context.Background(),
		kafka.Message{
			Topic:   topic,
			Key:     []byte("key1"),
			Value:   []byte("content_1"),
			Headers: gkafka.MapToHeaders(map[string]string{"kkk": "vvv1"}),
		},
		kafka.Message{
			Topic:   topic,
			Key:     []byte("key2"),
			Value:   []byte("content_2"),
			Headers: gkafka.MapToHeaders(map[string]string{"kkk": "vvv2"}),
		},
		kafka.Message{
			Topic:   topic,
			Key:     []byte("key3"),
			Value:   []byte("content_3"),
			Headers: gkafka.MapToHeaders(map[string]string{"kkk": "vvv3"}),
		},
	)
	if err != nil {
		fmt.Println(err)
	}
	producer.Close()

	consumer := gkafka.NewConsumer(&gkafka.Config{Brokers: []string{"127.0.0.1:9092"}},
		topic, "example_group", 2, Handle)
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-quit
	consumer.Stop()
}

func Handle(msg *kafka.Message) {
	fmt.Println(
		"Key:", string(msg.Key),
		"Time:", msg.Time,
		"Headers:", gkafka.HeadersToMap(msg.Headers),
		"Value:", string(msg.Value),
	)
}