文档

使用Franz-Kafka-Go实现Kafka的消费

更新时间:

本文介绍如何使用Franz-Kafka-Go实现Kafka消费。

参数说明

参数

描述

示例

kgo.SeedBrokers

初始连接的集群地址,格式为${project}.${endpoint}:${port},请根据实际的Project名称及其所在的Endpoint进行配置。更多信息,请参见服务入口

  • 阿里云VPC内网:端口号为10011。

  • 公网:端口号为10012。

etl-dev.cn-hangzhou.log.aliyuncs.com:10012

其中,etl-dev为Project名称。

kgo.ConsumerGroup

消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。

kafka-test

kgo.SASL

为了保证数据传输的安全性,请使用sasl_ssl。

sasl_ssl

User

日志服务Project名称。

etl-dev

kgo.ConsumeTopics

日志服务Logstore名称。

test

Pass

阿里云账号AccessKey。格式为{access-key-id}#{access-key-secret}。建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见授权。如何获取AccessKey的具体操作,请参见访问密钥

kgo.Dialer

使用sasl必须设置这个参数,否则无法正常消费。

代码示例

package main

import (
	"context"
	"crypto/tls"
	"fmt"
  "os"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/plain"
	"net"
	"time"
)

func main() {
	project         := "etl-dev"
	logstore        := "test"
	endpoint        := "cn-hangzhou.log.aliyuncs.com"
	port            := "10012"
	groupId         := "kafka-test"
  // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
  // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
  // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
	accessKeyID     := os.Getenv("SLS_ACCESS_KEY_ID")	
  accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")

 	//使用SASL必须添加TLS拨号程序
	tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
	seeds := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}
 
	//获取Kgo客户端
	client, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ConsumerGroup(groupId),
		kgo.ConsumeTopics(logstore),
		kgo.SASL(plain.Auth{
			User: project,
			Pass: fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
		}.AsMechanism()),
		kgo.Dialer(tlsDialer.DialContext),
	)

	if err != nil {
		panic(err)
	}
	defer client.Close()
	ctx := context.Background()

	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}

		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			fmt.Println(string(record.Value), "from an iterator!")
		}
	}
}

  • 本页导读 (1)