数据转发到消息队列RocketMQ

如果当前产品下设备与物联网平台之间的上下行消息量大于1,000 QPS,需要将设备上报至物联网平台的数据转发至您业务服务器的应用中进行消费(例如过滤、分析、存储等),可以使用云产品流转功能将物联网平台数据转发到消息队列(RocketMQ)中进行消费,以实现消息从设备、物联网平台、RocketMQ到应用服务器之间的全链路高可靠传输能力。本文介绍使用云产品流转功能将设备数据转发到RocketMQ中消费的完整流程。

工作原理

image

云产品流转可将设备消息转发到RocketMQ SDK客户端消费。

  1. 业务服务器:使用RocketMQ SDK注册一个消费者,用于接收物联网平台转发到RocketMQ中的消息。

  2. 物联网平台:配置云产品流转将设备消息转发到RocketMQ。

    • 数据源Topic:支持的Topic类型消息,请参见数据格式

    • SQL表达式:编写SQL表达式来解析和处理设备上报的JSON数据。SQL表达式的语法说明,请参见SQL表达式

      说明

      二进制格式的数据不做解析,直接透传。

    • 数据目的:RocketMQ中接收设备消息的Topic,需要先在云消息队列RocketMQ版控制台创建。具体操作,请参见创建Topic

      您可为设备消息设置标签,用于后续消费者消费时指定过滤条件。

  3. 云消息队列RocketMQ版:RocketMQ SDK注册的消费者获取消息时会触发服务端的动态过滤计算,RocketMQ根据该消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给该消费者,实现消费者所属业务服务器接收设备上报至物联网平台的消息。

使用限制

  • 物联网平台实例及所在地域支持将数据转发到消息队列RocketMQ。支持的地域详细信息,请参见各地域功能说明

  • 目前,RocketMQ实例所在地域必须与当前物联网平台实例所在地域保持一致。存量的跨地域实例下数据流转配置不受影响,仍可继续正常流转数据。

  • 仅支持将数据流转到RocketMQ 4.x或5.x版本实例的Topic中。如果是RocketMQ 5.x版本实例的Topic,该Topic的消息类型必须为普通消息

  • 目前,新版和旧版云产品流转功能均支持将数据流转到消息队列RocketMQ。新版云产品流转使用示例,请参见数据转发到消息队列RocketMQ(新版)

  • 云网关产品和设备,及MQTT型实例下产品和设备,不支持使用云产品流转(旧版)功能。如果流转对应产品和设备的数据,需要使用云产品流转(新版)功能。具体内容,请参见云产品流转(新版)

前提条件

  • 已创建消息队列(RocketMQ)实例和用于接收数据的Topic。RocketMQ使用方法,请参见RocketMQ快速入门

  • 已创建数据转发规则和编写处理数据的SQL,请参见设置数据流转规则

  • 已在业务服务器中调用RocketMQ SDK用来消费消息。具体操作,请参见调用SDK消费消息

背景信息

当使用RocketMQ 5.0时,转发的数据目的配置完成后,会自动完成以下配置,实现设备数据通过物联网平台的规则引擎转发到消息队列RocketMQ。

  • 物联网平台占用RocketMQ实例所在虚拟交换机的2个IP地址。

  • 在RocketMQ实例所在的VPC网络下创建托管安全组,安全组名称默认以sg-nsm-开头。

配置数据转发目的

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,选择消息转发 > 云产品流转

  4. 单击规则对应的查看,进入数据流转规则页面。

    重要

    若当前页面为云产品流转新版页面,需先单击右上角返回旧版,再单击目标规则对应的查看

  5. 单击转发数据一栏对应的添加操作

  6. 添加操作对话框中,选择操作为发送数据到消息队列(RocketMQ)中。按照界面提示,设置其他信息,单击确认

    参数

    说明

    选择操作

    选择发送数据到消息队列(RocketMQ)中

    地域

    选择RocketMQ所在地域。

    实例

    选择RocketMQ实例。

    您可以单击创建实例,跳转到消息队列控制台,创建RocketMQ实例,请参见消息队列文档

    Topic

    选择用于接收物联网平台数据的RocketMQ Topic。

    您可以单击创建Topic,跳转到消息队列控制台,创建RocketMQ Topic。

    Tag

    (可选)设置标签。

    设置标签后,所有通过该操作流转到RocketMQ对应Topic里的消息都会携带该标签。您可以在RocketMQ消息消费端,通过标签进行消息过滤。

    标签长度限制为128字节,可以输入常量或变量。变量格式为${key},代表SQL处理后的JSON数据中key对应的value值。

    授权

    授权物联网平台将数据写入RocketMQ。

    如您还未创建相关角色,单击创建RAM角色,跳转到RAM控制台,创建角色和授权策略,请参见创建RAM角色

  7. 回到云产品流转页,单击规则对应的启动按钮启动规则。

  8. 测试。

    向规则SQL中定义的Topic发布一条消息,然后查看设备消息转发的运行日志。

查看设备消息转发的运行日志

  1. 物联网平台控制台,单击目标企业版实例,查看设备状态和消息转发日志。具体操作,请参见云端运行日志

  2. 在业务服务器中已运行的订阅RocketMQ资源的终端,查看订阅和消费消息的日志。

  3. 云消息队列RocketMQ版控制台,在实例详情页面,查看消费者接收到消息的具体内容和消息轨迹。具体操作,请参见查询消息轨迹

相关文档

通过RocketMQ客户端消费设备消息

常见问题