文档

使用Sarama-Kafka-Go实现Kafka消费

更新时间:

本文介绍如何使用Sarama-Kafka-Go实现Kafka消费。

参数说明

参数

描述

示例

brokers

初始连接的集群地址,格式为${project}.${endpoint}:${port},请根据实际的Project名称及其所在的Endpoint进行配置。更多信息,请参见服务入口

  • 阿里云VPC内网:端口号为10011。

  • 公网:端口号为10012。

etl-dev.cn-hangzhou.log.aliyuncs.com:10012

其中,etl-dev为Project名称。

version

Kafka版本。

2.1.0

groupId

消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。

kafka-test

topics

日志服务Logstore名称。

test

conf.Net.SASL.User

日志服务Project名称。

etl-dev

conf.Net.SASL.Password

阿里云账号AccessKey。格式为{access-key-id}#{access-key-secret}。建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见授权。如何获取AccessKey的具体操作,请参见访问密钥

conf.Net.SASL.Mechanism

必须使用PLAIN。

PLAIN

conf.Consumer.Fetch.Min

一次请求中拉取信息的最小字节数,默认为1,单位为s。

1

conf.Consumer.Fetch.Default

一次请求中从集群拉取信息的最大字节数。

1024 * 1024

conf.Consumer.Retry.Backoff

读取分区失败后重试之前需要等待的时间,默认为2,单位为s。

2

conf.Consumer.MaxWaitTime

表示broker在Consumer.Fetch.Min字节可用之前等待的最长时间,默认为250,单位为ms。建议取值范围为[100 ms, 500 ms]

250

conf.Consumer.MaxProcessingTime

消费者期望一条消息处理的最长时间,单位为ms。

200

conf.Consumer.Offsets.AutoCommit.Enable

指定是否自动提交更新后的偏移量。true为启用,默认为true。

true

conf.Consumer.Offsets.AutoCommit.Interval

提交更新后的偏移量的频率。当conf.Consumer.Offsets.AutoCommit.Enable为false时,该参数无效,默认为1,单位为s。

1

conf.Consumer.Offsets.Initial

用于指定消费者在启动时从哪个偏移量开始消费消息。常用值为OffsetNewest和OffsetOldest,默认为OffsetNewest。

  • OffsetNewest:表示使用最新的偏移量,即从最新消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,消费新产生的数据。

  • OffsetOldest:表示使用最早的偏移量,从最早的消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,从头开始消费。

OffsetNewest

conf.Consumer.Offsets.Retry.Max

提交请求失败时,最多重试次数,默认为3次。

3

代码示例

package main

// SIGUSR1 toggle the pause/resume consumption
import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
)

func main() {

	endpoint  := "cn-beijing.log.aliyuncs.com"
	port      := "10012"
	version   := "2.1.0"
	project   := "test-project"                 
	topics    := "your sls logstore"                       
	// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
	// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
	// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
  accessId  := os.Getenv("SLS_ACCESS_KEY_ID") 
  accessKey := os.Getenv("SLS_ACCESS_KEY_SECRET")     
	group     := "test-groupId"                

	keepRunning := true
	log.Println("Starting a new Sarama consumer")

	version, err := sarama.ParseKafkaVersion(version)
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	/**
 	 * 构建一个新的Sarama配置。
	 * 在初始化消费者/生产者之前,必须定义Kafka集群版本。
	 */
	brokers := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}

	conf := sarama.NewConfig()
	conf.Version = version

	conf.Net.TLS.Enable = true
	conf.Net.SASL.Enable = true
	conf.Net.SASL.User = project
	conf.Net.SASL.Password = fmt.Sprintf("%s#%s", accessId, accessKey)
	conf.Net.SASL.Mechanism = "PLAIN"

	conf.Consumer.Fetch.Min = 1
	conf.Consumer.Fetch.Default = 1024 * 1024
	conf.Consumer.Retry.Backoff = 2 * time.Second
	conf.Consumer.MaxWaitTime = 250 * time.Millisecond
	conf.Consumer.MaxProcessingTime = 100 * time.Millisecond
	conf.Consumer.Return.Errors = false
	conf.Consumer.Offsets.AutoCommit.Enable = true
	conf.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.Consumer.Offsets.Retry.Max = 3

	/**
 	 * 设置一个新的Sarama消费者组
 	 */
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(brokers, group, conf)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	consumptionIsPaused := false
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// `Consume`应该在一个无限循环内调用,当服务器端重新平衡时,消费者会话将需要重新创建以获取新的声明
			if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			// 检查上下文是否被取消,表示消费者应该停止
      if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // 等待消费者设置完成
	log.Println("Sarama consumer up and running!...")

	sigusr1 := make(chan os.Signal, 1)
	signal.Notify(sigusr1, syscall.SIGUSR1)

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	for keepRunning {
		select {
		case <-ctx.Done():
			log.Println("terminating: context cancelled")
			keepRunning = false
		case <-sigterm:
			log.Println("terminating: via signal")
			keepRunning = false
		case <-sigusr1:
			toggleConsumptionFlow(client, &consumptionIsPaused)
		}
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
	if *isPaused {
		client.ResumeAll()
		log.Println("Resuming consumption")
	} else {
		client.PauseAll()
		log.Println("Pausing consumption")
	}

	*isPaused = !*isPaused
}

// Consumer表示Sarama消费者组消费者
type Consumer struct {
	ready chan bool
}

// Setup在新会话开始时运行,在ConsumeClaim之前
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// 将消费者标记为已准备好
	close(consumer.ready)
	return nil
}

// Cleanup在会话结束时运行,一旦所有ConsumeClaim goroutine退出
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim必须启动ConsumerGroupClaim的Messages()的消费者循环
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// 注意:
	// 不要把下面的代码移到goroutine里。
	// ConsumeClaim本身在goroutine中调用,参见:https://github.com/Shopify/sarama/blob/main/consumer_group.go#L27-L29
	for {
		select {
		case message := <-claim.Messages():
			realUnixTimeSeconds := message.Timestamp.Unix()
			if realUnixTimeSeconds < 2000000 {
				realUnixTimeSeconds = message.Timestamp.UnixMicro() / 1000
			}

			log.Printf("Message claimed: value = %s, timestamp = %d, topic = %s", string(message.Value), realUnixTimeSeconds, message.Topic)
			session.MarkMessage(message, "")

		// 当session.Context()完成时应返回。
		// 如果不这样做,当kafka重新平衡时,会引发ErrRebalanceInProgress或read tcp <ip>:<port>: i/o timeout错误。参见:https://github.com/Shopify/sarama/issues/1192
		case <-session.Context().Done():
			return nil
		}
	}
}
  • 本页导读 (1)
文档反馈