您可以使用规则引擎,将物联网平台数据转发到消息队列(Kafka)中存储,从而实现消息从设备、物联网平台、Kafka到应用服务器之间的全链路高可靠传输能力。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。

前提条件

  • 已创建数据源DataSource,并添加物模型数据上报Topic。具体步骤,请参见添加待流转的数据源
  • 已创建消息队列(Kafka)实例和用于接收数据的Topic。Kafka使用方法,请参见Kafka快速入门
    重要 Kafka实例所在地域必须与物联网平台服务的当前实例所在地域一致。
  • 当前阿里云账号已添加白名单权限,支持将数据转发到消息队列Kafka。您可提交工单申请开通白名单权限。

背景信息

转发的数据目的配置完成后,会自动完成以下配置,实现设备数据通过物联网平台的规则引擎转发到消息队列(Kafka)。

  • 物联网平台占用Kafka实例所在虚拟交换机的2个IP地址。
  • 在Kafka实例所在的VPC网络下创建托管安全组,安全组名称默认以sg-nsm-开头。

创建数据目的

  1. 登录物联网平台控制台
  2. 实例概览页面,选择目标环境,找到对应的实例,单击实例ID或备注名称。
    重要 目前仅开通企业版实例服务的地域下,执行此步骤。其他地域,请跳过此步骤。地域及实例的支持说明,请参见实例概述
    实例概览
  3. 在左侧导航栏,选择消息转发 > 云产品流转
  4. 可选:云产品流转页面,单击右上角体验新版,进入新版功能页面。
    说明 如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。
  5. 单击数据目的页签,然后单击创建数据目的
  6. 创建数据目的对话框,输入数据目的名称,例如DataPurpose,按照以下参数说明,完成配置,然后单击确定
    参数描述
    选择操作选择发送数据到消息队列(Kafka)中
    角色授权物联网平台将数据写入Kafka。

    如您还未创建相关角色,单击创建RAM角色,跳转到RAM控制台,创建角色和授权策略,请参见创建RAM角色

    地域固定为您物联网平台实例所在地域。
    实例选择Kafka实例。

    您可以单击创建实例,跳转到消息队列控制台,创建Kafka实例。具体操作,请参见创建实例

    Topic选择用于接收物联网平台数据的Kafka Topic。

    您可以单击创建Topic,跳转到消息队列控制台,创建Kafka Topic。具体操作,请参见创建Topic

配置并启动解析器

  1. 创建解析器,例如DataParser。具体操作,请参见创建解析器
  2. 解析器详情页面,关联数据源。
    1. 在配置向导的数据源下,单击关联数据源
    2. 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定
  3. 解析器详情页面,关联数据目的。
    1. 单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的
    2. 在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的DataPurpose,单击确定
    3. 在数据目的列表,查看并保存数据目的ID,例如为1000
      后续解析脚本中,需使用此处的数据目的ID
  4. 解析器详情页面,单击解析器
  5. 在脚本输入框,输入解析脚本。脚本编辑方法,请参见脚本示例
    函数参数说明,请参见函数列表
    //通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。
    var data = payload("json");
    //直接流转物模型上报数据。
    writeKafka(1000, data, "调试");
  6. 单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。
    参数示例如下:调试示例

    运行结果如下,表示脚本执行成功。

    action:
        transmit to kafka[destinationId=1000], data:{"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
    variables:
        data : {"deviceType":"CustomCategory","iotId":"JCp9u***","requestId":"1626948228247","checkFailedData":{},"productKey":"a1o***","gmtCreate":1626948134445,"deviceName":"Device1","items":{"Temperature":{"time":1626948134319,"value":38},"Humidity":{"time":1626948134319,"value":25}}}
  7. 单击发布
  8. 回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。
  9. 消息队列Kafka版控制台对应实例的Topic详情页面,查询流转的消息。
    消息