文档

数据转发到消息队列RocketMQ

更新时间:

如果当前产品下设备与物联网平台之间的上下行消息量大于1,000 QPS,需要将设备上报至物联网平台的数据转发至您业务服务器的应用中进行消费(例如过滤、分析、存储等),可以使用云产品流转功能将物联网平台数据转发到消息队列(RocketMQ)中消费,以实现消息从设备、物联网平台、RocketMQ到应用服务器之间的全链路高可靠传输能力。本文以物模型数据上报Topic为例,介绍流转消息数据的完整流程。

工作原理

image

云产品流转可将设备消息转发到RocketMQ SDK客户端消费。上图工作流程:

  1. 业务服务器:使用RocketMQ SDK注册一个消费者,用于接收物联网平台转发到RocketMQ中的消息。

  2. 物联网平台:配置云产品流转将设备消息转发到RocketMQ。

    1. 数据源:支持的Topic类型消息,请参见数据格式(非云网关产品和设备)自定义Topic(MQTT云网关)消息转发Topic(GB/T 32960云网关)消息转发Topic(JT/T 808云网关)消息转发Topic(SL 651云网关)

    2. 数据目的:RocketMQ中接收设备消息的Topic,需要先在云消息队列RocketMQ版控制台创建。具体操作,请参见创建Topic

    3. 解析器脚本:配置通过数据流转函数writeMq(destinationId, payload, tag)将设备数据转发到RocketMQ的Topic中。

      函数说明,请参见流转数据到数据目的函数

      您可为设备消息设置属性和标签,用于后续消费者消费时指定过滤条件。

  3. 云消息队列RocketMQ版:RocketMQ SDK注册的消费者获取消息时会触发服务端的动态过滤计算,RocketMQ根据该消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给该消费者,实现消费者所属业务服务器接收设备上报至物联网平台的消息。

应用场景

业务服务器接收设备消息:云产品流转可以灵活地转发设备消息到消息队列RocketMQ中消费。

  • 对设备数据进行复杂或精细化处理的海量设备场景。

  • 设备消息量大于1,000 QPS的场景。

使用限制

  • 物联网平台实例及所在地域支持将数据转发到消息队列RocketMQ。支持的地域详细信息,请参见各地域功能说明

  • 目前,RocketMQ实例所在地域必须与当前物联网平台实例所在地域保持一致。存量的跨地域实例下数据流转配置不受影响,仍可继续正常流转数据。

  • 仅支持将数据流转到RocketMQ 4.x或5.x版本实例的Topic中。如果是RocketMQ 5.x版本实例的Topic,该Topic的消息类型必须为普通消息

  • 目前,新版和旧版云产品流转功能均支持将数据流转到消息队列RocketMQ。旧版云产品流转使用示例,请参见数据转发到消息队列RocketM(旧版)

前提条件

  • 已添加待转发的设备Topic数据源。例如:创建数据源DataSource,添加指定设备的物模型数据上报Topic。具体步骤,请参见添加待流转的数据源

  • 已创建消息队列(RocketMQ)实例和用于接收数据的Topic和ConsumerGroup。具体操作,请参见创建资源

  • 已在业务服务器中调用RocketMQ SDK用来消费消息。具体操作,请参见调用SDK消费消息

背景信息

当使用RocketMQ 5.0时,转发的数据目的配置完成后,会自动完成以下配置,实现设备数据通过物联网平台的规则引擎转发到消息队列RocketMQ。

  • 物联网平台占用RocketMQ实例所在虚拟交换机的2个IP地址。

  • 在RocketMQ实例所在的VPC网络下创建托管安全组,安全组名称默认以sg-nsm-开头。

步骤一:创建数据目的

  1. 登录物联网平台控制台

  2. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  3. 在左侧导航栏,选择消息转发 > 云产品流转

  4. 云产品流转页面,单击右上角体验新版,进入新版功能页面。

    说明

    如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。

  5. 单击数据目的页签,然后单击创建数据目的

  6. 创建数据目的对话框,输入数据目的名称,例如DataPurpose,按照以下参数说明,完成配置,然后单击确定

    数据流转到MQ

    参数

    说明

    选择操作

    选择发送数据到消息队列(RocketMQ)中

    地域

    选择RocketMQ所在地域。

    实例

    选择RocketMQ实例。

    您可以单击创建实例,跳转到消息队列控制台,创建RocketMQ实例,请参见消息队列文档

    Topic

    选择用于接收物联网平台数据的RocketMQ Topic。

    您可以单击创建Topic,跳转到消息队列控制台,创建RocketMQ Topic。

    授权

    授权物联网平台将数据写入RocketMQ。

    如您还未创建相关角色,单击创建RAM角色,跳转到RAM控制台,创建角色和授权策略,请参见创建RAM角色

步骤二:配置并启动解析器

  1. 创建解析器,例如DataParser。具体操作,请参见步骤一:创建解析器

  2. 解析器详情页面,关联数据源。

    1. 在配置向导的数据源下,单击关联数据源

    2. 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源DataSource,单击确定

  3. 解析器详情页面,关联数据目的。

    1. 单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的

    2. 在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的DataPurpose,单击确定

    3. 在数据目的列表,查看并保存数据目的ID,例如为1000

      后续解析脚本中,需使用此处的数据目的ID

  4. 解析器详情页面,单击解析器

  5. 在脚本输入框,输入解析脚本。

    函数参数说明,请参见函数列表

    //通过payload函数,获取设备上报的消息内容,并按照JSON格式转换。
    var data = payload("json");
    //直接流转物模型上报数据。
    writeMq(1000, data, "调试");
  6. 单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。

    参数示例如下:调试示例

    运行结果如下,表示脚本执行成功。

    调试结果

  7. 单击发布

  8. 回到云产品流转页面的解析器页签,单击解析器DataParser对应的启动按钮,启动解析器。

  9. RocketMQ控制台,查看是否成功接收到消息。

步骤三:查看设备消息转发的运行日志

  1. 物联网平台控制台,单击目标企业版实例,查看设备状态和消息转发日志。具体操作,请参见云端运行日志

  2. 在业务服务器中已运行的订阅RocketMQ资源的终端,查看订阅和消费消息的日志。

  3. 云消息队列RocketMQ版控制台,在实例详情页面,查看消费者接收到消息的具体内容和消息轨迹。具体操作,请参见查询消息轨迹

操作样例

通过RocketMQ客户端消费设备消息

常见问题