数据转发到消息队列Kafka

您可以使用规则引擎,将物联网平台数据转发到消息队列(Kafka)中存储,从而实现消息从设备、物联网平台、Kafka到应用服务器之间的全链路高可靠传输能力。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。

使用限制

  • 不支持 serverless kafka。

前提条件

  • 已添加待转发的设备Topic数据源。例如:创建数据源DataSource,添加指定设备的物模型数据上报Topic。具体步骤,请参见添加待流转的数据源

  • 已创建消息队列(Kafka)实例和用于接收数据的Topic。Kafka使用方法,请参见Kafka快速入门

    重要

    Kafka实例所在地域必须与物联网平台服务的当前实例所在地域一致。

  • 当前阿里云账号已添加白名单权限,支持将数据转发到消息队列Kafka。

背景信息

转发的数据目的配置完成后,会自动完成以下配置,实现设备数据通过物联网平台的规则引擎转发到消息队列(Kafka)。

  • 物联网平台占用Kafka实例所在虚拟交换机的2IP地址。

  • Kafka实例所在的VPC网络下创建托管安全组,安全组名称默认以sg-nsm-开头。

创建数据目的

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,选择消息转发 > 云产品流转

  4. 云产品流转页面,单击右上角体验新版,进入新版功能页面。

    说明

    如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。

  5. 单击数据目的页签,然后单击创建数据目的
  6. 创建数据目的对话框,输入数据目的名称,例如DataPurpose,按照以下参数说明,完成配置,然后单击确定

    参数

    描述

    选择操作

    选择发送数据到消息队列(Kafka)中

    角色

    授权物联网平台将数据写入Kafka。

    如您还未创建相关角色,单击创建RAM角色,跳转到RAM控制台,创建角色和授权策略,请参见创建RAM角色

    地域

    固定为您物联网平台实例所在地域。

    实例

    选择Kafka实例。

    您可以单击创建实例,跳转到消息队列控制台,创建Kafka实例。具体操作,请参见创建实例

    Topic

    选择用于接收物联网平台数据的Kafka Topic。

    您可以单击创建Topic,跳转到消息队列控制台,创建Kafka Topic。具体操作,请参见创建Topic

配置并启动解析器

  1. 创建解析器,例如DataParser。具体操作,请参见步骤一:创建解析器
  2. 解析器详情页面,关联数据源。
    1. 在配置向导的数据源下,单击关联数据源
    2. 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定
  3. 解析器详情页面,关联数据目的。
    1. 单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的
    2. 在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的DataPurpose,单击确定
    3. 在数据目的列表,查看并保存数据目的ID,例如为1000
      后续解析脚本中,需使用此处的数据目的ID
  4. 解析器详情页面,单击解析器
  5. 在脚本输入框,输入解析脚本。

    函数参数说明,请参见函数列表

    //通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。
    var data = payload("json");
    //直接流转物模型上报数据。
    writeKafka(1000, data, "调试");
  6. 单击调试,根据页面提示,选择产品和设备,输入TopicPayload数据,验证脚本可执行。

    参数示例如下:调试示例

    运行结果如下,表示脚本执行成功。

    action:
        transmit to kafka[destinationId=1000], data:{"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
    variables:
        data : {"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
  7. 单击发布

  8. 回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。
  9. 云消息队列 Kafka 版控制台对应实例的Topic详情页面,查询流转的消息。
    消息