本文介绍如何在Kafka消费中使用日志服务SPL语句。
原理
在虚拟Topic中添加SPL语句后,当Kafka消费兼容接口接收到虚拟Topic时,会使用虚拟Topic中已添加的SPL语句处理数据,最终KafkaConsumer获取经过SPL处理过的数据。
kafkaConsumer -> [虚拟Topic] -> 真实Logstore
目前仅支持使用Kafka官方SDK库(Java SDK和基于librdkafka的库)进行消费。
已知第三方库python-kafka在基于SPL进行Kafka消费时存在问题。如果您要使用Python消费,请使用Kafka官方的Python库confluent_kafka。
创建虚拟Topic
编辑config.json文件。
{ "Project" : "Project", "Endpoint" : "Endpoint", "Port" : 10012, "AccessKeyId" : "", "AccessKeySecret" : "", "Topic" : "logstore.0", "Query" : "SPL语句" }
参数
说明
Project
待消费的数据所在的Project。
Endpoint
日志服务的服务入口。更多信息,请参见服务入口。
Port
端口号,包括:
阿里云内网:10011。
公网:10012。
AccessKeyId
阿里云账号AccessKey ID。
建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见RAM自定义授权场景。如何获取AccessKey的具体操作,请参见访问密钥。
AccessKeySecret
阿里云账号AccessKey Secret。
建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见RAM自定义授权场景。如何获取AccessKey的具体操作,请参见访问密钥。
Topic
设置为
${logstore}.标识符
,其中标识符只能为0~31的数字。Query
设置为SPL语句。更多信息,请参见SPL概述。
下载kafka_admin_tool。
重要仅支持在Linux下执行。
wget https://sls-resource.oss-cn-shanghai.aliyuncs.com/tools/kafka_admin_tool
创建虚拟Topic。
./kafka_admin_tool create_virtual_topic -f config.json
查看虚拟Topic
您可以使用如下命令查看虚拟Topic。
./kafka_admin_tool info_virtual_topic -f config.json
常见问题
如何查看消费错误?
目前,遇到消费错误时,KafkaConsumer无法读取到SPL的错误信息,您需要开通日志服务的服务日志,通过详细日志来查看报错信息。更多信息,请参见开通服务日志、详细日志。
您可以在详细日志中执行如下SQL语句查找错误信息。
Logstore: 进行消费的Logstore名字 and Method: PullData and UserAgent: kafka-forwarderrdkafka
如何更新虚拟Topic的SPL语句?
目前,您需要先删除再创建Topic来完成SPL语句的更新。操作步骤如下:
停止虚拟Topic的消费者。
删除虚拟Topic。
./kafka_admin_tool delete_virtual_topic -f config.json
更新config.json文件中的SPL语句。
创建虚拟Topic。
说明重新创建虚拟Topic后,需等待10分钟,SPL语句才生效。
./kafka_admin_tool create_virtual_topic -f config.json
- 本页导读 (1)