本文介绍如何使用Franz-Kafka-Go实现Kafka消费。
参数说明
参数 | 描述 | 示例 |
kgo.SeedBrokers | 初始连接的集群地址,格式为
| etl-dev.cn-hangzhou.log.aliyuncs.com:10012 其中,etl-dev为Project名称。 |
kgo.ConsumerGroup | 消费组ID,用于指定消费者组的标识符。通过配置消费组ID,将消费组内的消费者分组,可以实现消费者组内的负载均衡,实现数据的处理和分发。 | kafka-test |
kgo.SASL | 为了保证数据传输的安全性,请使用sasl_ssl。 | sasl_ssl |
User | 日志服务Project名称。 | etl-dev |
kgo.ConsumeTopics | 日志服务Logstore名称。 | test |
Pass | 阿里云账号AccessKey。格式为 | 无 |
kgo.Dialer | 使用sasl必须设置这个参数,否则无法正常消费。 | 无 |
代码示例
package main
import (
"context"
"crypto/tls"
"fmt"
"os"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"net"
"time"
)
func main() {
project := "etl-dev"
logstore := "test"
endpoint := "cn-hangzhou.log.aliyuncs.com"
port := "10012"
groupId := "kafka-test"
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您可以根据业务需要,保存到配置文件里。
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
accessKeyID := os.Getenv("SLS_ACCESS_KEY_ID")
accessKeySecret := os.Getenv("SLS_ACCESS_KEY_SECRET")
//使用SASL必须添加TLS拨号程序
tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
seeds := []string{fmt.Sprintf("%s.%s:%s", project, endpoint, port)}
//获取Kgo客户端
client, err := kgo.NewClient(
kgo.SeedBrokers(seeds...),
kgo.ConsumerGroup(groupId),
kgo.ConsumeTopics(logstore),
kgo.SASL(plain.Auth{
User: project,
Pass: fmt.Sprintf("%s#%s", accessKeyID, accessKeySecret),
}.AsMechanism()),
kgo.Dialer(tlsDialer.DialContext),
)
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
for {
fetches := client.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
panic(fmt.Sprint(errs))
}
iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
fmt.Println(string(record.Value), "from an iterator!")
}
}
}
反馈
- 本页导读 (1)