本文介绍如何使用Confluent-Kafka-Go实现Kafka消费。
参数说明
参数 | 描述 | 示例 |
bootstrap.servers | 初始连接的集群地址,格式为
| etl-dev.cn-hangzhou.log.aliyuncs.com:10012 其中,etl-dev为Project名称。 |
sasl.mechanism | 必须使用PLAIN。 | PLAIN |
security.protocol | 为了保证数据传输的安全性,请使用sasl_ssl。 | sasl_ssl |
sasl.username | 日志服务Project名称。 | etl-dev |
sasl.password | 阿里云账号AccessKey。格式为 | 无 |
topic | 日志服务Logstore名称。 | test |
group.id | 消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。 | kafka-test |
enable.auto.commit | 是否自动提交消费点位,建议设置为true。 | true |
auto.commit.interval.ms | 自动提交消费点位的间隔时间,建议设置为30000,单位为ms。 | 30000 |
max.poll.interval.ms | 消费组在消费者发起加入组请求后,等待所有消费者加入的时间间隔。 在这个时间间隔内加入组的消费者为消费组的成员,进行分区分配,各个消费者按分配的分区开发消费数据,如果在这个时间内还有消费者没有加入消费组,则会触发消费组再平衡操作,再平衡期间不会消费数据,会导致消费延迟。建议设置max.poll.interval.ms为130000,单位为ms,保证所有消费者都能加入消费组。 重要 使用confluent这个库时需要保证max.poll.interval.ms值大于session.timeout.ms,否则无法正常消费。 | 130000 |
session.timeout.ms | 心跳最大超时时间。 在该时间如果消费者没有发送心跳请求,则视为该消费者发生异常,触发消费组再平衡操作。建议设置session.timeout.ms值为120000,单位为ms。 | 120000 |
heartbeat.interval.ms | 规定客户端和服务端之间心跳检测间隔时间。 heartbeat.interval.ms越小,客户端和服务端之间的心跳检测越频繁,但也会导致更多的网络流量。建议设置为5000,单位为ms。 | 5000 |
auto.offset.reset | 消费起始点位,常用的值为latest和earliest,默认为latest。
| latest |
代码示例
package main
import (
"fmt"
"os"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
endpoint := "cn-hangzhou.log.aliyuncs.com"
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
// 此处以把AccessKey和AccessKeySecret保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
// 强烈建议不要把AccessKey和AccessKeySecret保存到代码里,会存在密钥泄漏风险。
accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
project := "project"
logstore := "logstore"
port := "10012"
// 获取Kafka消费者实例
consumer := getKafkaConsumer(project, endpoint, port, accessKeyID, accessKeySecret)
consumer.SubscribeTopics([]string{logstore}, nil)
for {
msg, err := consumer.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
consumer.Close()
}
func getKafkaConsumer(project string, endpoint string, port string, accessKeyID string, accessKeySecret string) *kafka.Consumer {
var kafkaConf = &kafka.ConfigMap{
"bootstrap.servers": fmt.Sprintf("%s.%s:%s", project, endpoint, port),
"sasl.mechanism": "PLAIN",
"security.protocol": "sasl_ssl",
"sasl.username": project,
"sasl.password": fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
"group.id": "kafka-test",
"enable.auto.commit": "true",
"auto.commit.interval.ms": 30000,
"session.timeout.ms": 120000,
"auto.offset.reset": "earliest",
"max.poll.interval.ms": 130000,
"heartbeat.interval.ms": 5000,
}
consumer, err := kafka.NewConsumer(kafkaConf)
if err != nil {
panic(err)
}
fmt.Print("init kafka consumer success\n")
return consumer
}
- 本页导读 (1)