文档

基于SPL的Kafka消费

更新时间:

本文介绍如何在Kafka消费中使用日志服务SPL语句。

原理

在虚拟Topic中添加SPL语句后,当Kafka消费兼容接口接收到虚拟Topic时,会使用虚拟Topic中已添加的SPL语句处理数据,最终KafkaConsumer获取经过SPL处理过的数据。

kafkaConsumer -> [虚拟Topic] -> 真实Logstore
重要
  • 目前仅支持使用Kafka官方SDK库(Java SDK和基于librdkafka的库)进行消费。

  • 已知第三方库python-kafka在基于SPL进行Kafka消费时存在问题。如果您要使用Python消费,请使用Kafka官方的Python库confluent_kafka。

创建虚拟Topic

  1. 编辑config.json文件。

    {
        "Project" : "Project",
        "Endpoint" : "Endpoint",
        "Port" : 10012,
        "AccessKeyId" : "",
        "AccessKeySecret" : "",
        "Topic" : "logstore.0",
        "Query" : "SPL语句"
    }

    参数

    说明

    Project

    待消费的数据所在的Project。

    Endpoint

    日志服务的服务入口。更多信息,请参见服务入口

    Port

    端口号,包括:

    • 阿里云内网:10011。

    • 公网:10012。

    AccessKeyId

    阿里云账号AccessKey ID。

    建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见RAM自定义授权场景。如何获取AccessKey的具体操作,请参见访问密钥

    AccessKeySecret

    阿里云账号AccessKey Secret。

    建议您使用具备日志服务Project写入权限的RAM用户的AccessKey。授予RAM用户向指定Project写入数据权限的具体操作,请参见RAM自定义授权场景。如何获取AccessKey的具体操作,请参见访问密钥

    Topic

    设置为${logstore}.标识符 ,其中标识符只能为0~31的数字。

    Query

    设置为SPL语句。更多信息,请参见SPL概述

  2. 下载kafka_admin_tool。

    重要

    仅支持在Linux下执行。

    wget https://sls-resource.oss-cn-shanghai.aliyuncs.com/tools/kafka_admin_tool
  3. 创建虚拟Topic。

    ./kafka_admin_tool create_virtual_topic -f config.json

查看虚拟Topic

您可以使用如下命令查看虚拟Topic。

./kafka_admin_tool info_virtual_topic -f config.json

常见问题

  • 如何查看消费错误?

    目前,遇到消费错误时,KafkaConsumer无法读取到SPL的错误信息,您需要开通日志服务的服务日志,通过详细日志来查看报错信息。更多信息,请参见开通服务日志详细日志

    您可以在详细日志中执行如下SQL语句查找错误信息。

    Logstore: 进行消费的Logstore名字  and Method: PullData and UserAgent: kafka-forwarderrdkafka 
  • 如何更新虚拟Topic的SPL语句?

    目前,您需要先删除再创建Topic来完成SPL语句的更新。操作步骤如下:

    1. 停止虚拟Topic的消费者。

    2. 删除虚拟Topic。

      ./kafka_admin_tool delete_virtual_topic -f config.json
    3. 更新config.json文件中的SPL语句。

    4. 创建虚拟Topic。

      说明

      重新创建虚拟Topic后,需等待10分钟,SPL语句才生效。

      ./kafka_admin_tool create_virtual_topic -f config.json
  • 本页导读 (1)
文档反馈