文档

使用Confluent-Kafka-Go实现Kafka消费

更新时间:

本文介绍如何使用Confluent-Kafka-Go实现Kafka消费。

参数说明

参数

描述

示例

bootstrap.servers

初始连接的集群地址,格式为${project}.${endpoint}:${port},请根据实际的Project名称及其所在的Endpoint进行配置。更多信息,请参见服务入口

  • 阿里云VPC内网:端口号为10011。

  • 公网:端口号为10012。

etl-dev.cn-hangzhou.log.aliyuncs.com:10012

其中,etl-dev为Project名称。

sasl.mechanism

必须使用PLAIN。

PLAIN

security.protocol

为了保证数据传输的安全性,请使用sasl_ssl。

sasl_ssl

sasl.username

日志服务Project名称。

etl-dev

sasl.password

阿里云账号AccessKey。格式为{access-key-id}#{access-key-secret}。建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见授权。如何获取AccessKey的具体操作,请参见访问密钥

topic

日志服务Logstore名称。

test

group.id

消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。

kafka-test

enable.auto.commit

是否自动提交消费点位,建议设置为true。

true

auto.commit.interval.ms

自动提交消费点位的间隔时间,建议设置为30000,单位为ms。

30000

max.poll.interval.ms

消费组在消费者发起加入组请求后,等待所有消费者加入的时间间隔。

在这个时间间隔内加入组的消费者为消费组的成员,进行分区分配,各个消费者按分配的分区开发消费数据,如果在这个时间内还有消费者没有加入消费组,则会触发消费组再平衡操作,再平衡期间不会消费数据,会导致消费延迟。建议设置max.poll.interval.ms为130000,单位为ms,保证所有消费者都能加入消费组。

重要

使用confluent这个库时需要保证max.poll.interval.ms值大于session.timeout.ms,否则无法正常消费。

130000

session.timeout.ms

心跳最大超时时间。

在该时间如果消费者没有发送心跳请求,则视为该消费者发生异常,触发消费组再平衡操作。建议设置session.timeout.ms值为120000,单位为ms。

120000

heartbeat.interval.ms

规定客户端和服务端之间心跳检测间隔时间。

heartbeat.interval.ms越小,客户端和服务端之间的心跳检测越频繁,但也会导致更多的网络流量。建议设置为5000,单位为ms。

5000

auto.offset.reset

消费起始点位,常用的值为latest和earliest,默认为latest。

  • earliest:表示使用最早的偏移量,从最早的消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,从头开始消费。

  • latest:表示使用最新的偏移量,即从最新消息开始读取。当有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,消费新产生的数据。

latest

代码示例

package main

import (
	"fmt"
  "os"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	endpoint := "cn-hangzhou.log.aliyuncs.com"
  // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
  // 此处以把AccessKey和AccessKeySecret保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
  // 强烈建议不要把AccessKey和AccessKeySecret保存到代码里,会存在密钥泄漏风险。
	accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
	accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
	project := "project"
	logstore := "logstore"
	port := "10012"

  // 获取Kafka消费者实例
	consumer := getKafkaConsumer(project, endpoint, port, accessKeyID, accessKeySecret)
	consumer.SubscribeTopics([]string{logstore}, nil)
	for {
		msg, err := consumer.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}
	consumer.Close()
}

func getKafkaConsumer(project string, endpoint string, port string, accessKeyID string, accessKeySecret string) *kafka.Consumer {
	var kafkaConf = &kafka.ConfigMap{
		"bootstrap.servers":       fmt.Sprintf("%s.%s:%s", project, endpoint, port),
		"sasl.mechanism":          "PLAIN",
		"security.protocol":       "sasl_ssl",
		"sasl.username":           project,
		"sasl.password":           fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
		"group.id":                "kafka-test",
		"enable.auto.commit":      "true",
		"auto.commit.interval.ms": 30000,
		"session.timeout.ms":      120000,
		"auto.offset.reset":       "earliest",
		"max.poll.interval.ms":    130000,
		"heartbeat.interval.ms":   5000,
	}
	consumer, err := kafka.NewConsumer(kafkaConf)
	if err != nil {
		panic(err)
	}
	fmt.Print("init kafka consumer success\n")
	return consumer
}

  • 本页导读 (1)