本文介绍如何使用Sarama-Kafka-Go实现Kafka消费。
参数说明
参数 | 描述 | 示例 |
brokers | 初始连接的集群地址,格式为
| etl-dev.cn-hangzhou.log.aliyuncs.com:10012 其中,etl-dev为Project名称。 |
version | Kafka版本。 | 2.1.0 |
groupId | 消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。 | kafka-test |
topics | 日志服务Logstore名称。 | test |
conf.Net.SASL.User | 日志服务Project名称。 | etl-dev |
conf.Net.SASL.Password | 阿里云账号AccessKey。格式为 | 无 |
conf.Net.SASL.Mechanism | 必须使用PLAIN。 | PLAIN |
conf.Consumer.Fetch.Min | 一次请求中拉取信息的最小字节数,默认为1,单位为s。 | 1 |
conf.Consumer.Fetch.Default | 一次请求中从集群拉取信息的最大字节数。 | 1024 * 1024 |
conf.Consumer.Retry.Backoff | 读取分区失败后重试之前需要等待的时间,默认为2,单位为s。 | 2 |
conf.Consumer.MaxWaitTime | 表示broker在Consumer.Fetch.Min字节可用之前等待的最长时间,默认为250,单位为ms。建议取值范围为[100 ms, 500 ms] | 250 |
conf.Consumer.MaxProcessingTime | 消费者期望一条消息处理的最长时间,单位为ms。 | 200 |
conf.Consumer.Offsets.AutoCommit.Enable | 指定是否自动提交更新后的偏移量。true为启用,默认为true。 | true |
conf.Consumer.Offsets.AutoCommit.Interval | 提交更新后的偏移量的频率。当conf.Consumer.Offsets.AutoCommit.Enable为false时,该参数无效,默认为1,单位为s。 | 1 |
conf.Consumer.Offsets.Initial | 用于指定消费者在启动时从哪个偏移量开始消费消息。常用值为OffsetNewest和OffsetOldest,默认为OffsetNewest。
| OffsetNewest |
conf.Consumer.Offsets.Retry.Max | 提交请求失败时,最多重试次数,默认为3次。 | 3 |
代码示例
package main
// SIGUSR1 toggle the pause/resume consumption
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
)
func main() {
endpoint := "cn-beijing.log.aliyuncs.com"
port := "10012"
version := "2.1.0"
project := "test-project"
topics := "your sls logstore"
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
accessId := os.Getenv("SLS_ACCESS_KEY_ID")
accessKey := os.Getenv("SLS_ACCESS_KEY_SECRET")
group := "test-groupId"
keepRunning := true
log.Println("Starting a new Sarama consumer")
version, err := sarama.ParseKafkaVersion(version)
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}
/**
* 构建一个新的Sarama配置。
* 在初始化消费者/生产者之前,必须定义Kafka集群版本。
*/
brokers := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}
conf := sarama.NewConfig()
conf.Version = version
conf.Net.TLS.Enable = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = project
conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
conf.Net.SASL.Mechanism = "PLAIN"
conf.Consumer.Fetch.Min = 1
conf.Consumer.Fetch.Default = 1024 * 1024
conf.Consumer.Retry.Backoff = 2 * time.Second
conf.Consumer.MaxWaitTime = 250 * time.Millisecond
conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
conf.Consumer.Return.Errors = false
conf.Consumer.Offsets.AutoCommit.Enable = true
conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Offsets.Retry.Max = 3
/**
* 设置一个新的Sarama消费者组
*/
consumer := Consumer{
ready: make(chan bool),
}
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(brokers, group, conf)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}
consumptionIsPaused := false
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume`应该在一个无限循环内调用,当服务器端重新平衡时,消费者会话将需要重新创建以获取新的声明
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
// 检查上下文是否被取消,表示消费者应该停止
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready // 等待消费者设置完成
log.Println("Sarama consumer up and running!...")
sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
case <-sigusr1:
toggleConsumptionFlow(client, &consumptionIsPaused)
}
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
if *isPaused {
client.ResumeAll()
log.Println("Resuming consumption")
} else {
client.PauseAll()
log.Println("Pausing consumption")
}
*isPaused = !*isPaused
}
// Consumer表示Sarama消费者组消费者
type Consumer struct {
ready chan bool
}
// Setup在新会话开始时运行,在ConsumeClaim之前
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// 将消费者标记为已准备好
close(consumer.ready)
return nil
}
// Cleanup在会话结束时运行,一旦所有ConsumeClaim goroutine退出
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim必须启动ConsumerGroupClaim的Messages()的消费者循环
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 注意:
// 不要把下面的代码移到goroutine里。
// ConsumeClaim本身在goroutine中调用,参见:https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message := <-claim.Messages():
realUnixTimeSeconds := message.Timestamp.Unix()
if realUnixTimeSeconds < 2000000 {
realUnixTimeSeconds = message.Timestamp.UnixMicro() / 1000
}
log.Printf("Message claimed: value = %s, timestamp = %d, topic = %s", string(message.Value), realUnixTimeSeconds, message.Topic)
session.MarkMessage(message, "")
// 当session.Context()完成时应返回。
// 如果不这样做,当kafka重新平衡时,会引发ErrRebalanceInProgress或read tcp <ip>:<port>: i/o timeout错误。参见:https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
- 本页导读 (1)