配置AMQP服务端订阅

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

当需要实时获取设备上报的数据时,如果使用云端API只能获取物模型数据,而且可能无法实时获取,使用基于AMQP 1.0版协议的AMQP服务端订阅功能,可以在业务服务器实时、可靠地获取设备上报消息。本文为您介绍配置AMQP服务端订阅的操作步骤。

工作原理

image

AMQP服务端订阅会将同一产品所有设备的指定类型消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:

  • 订阅关系1:将产品1所有设备的消息,转发到消费组1和消费组2。

  • 订阅关系2:将产品2所有设备的消息,转发到消费组2。

配置AMQP服务端订阅后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在代码中订阅Topic,只要AMQP客户端在线就可以接收消息。不能通过AMQP服务端订阅向设备下发消息,如果需要请调用消息通信的API

使用场景

业务服务器接收设备消息:AMQP服务端订阅只能转发同一产品所有设备的指定类型消息,不能指定设备或指定Topic。如果需要更灵活地转发设备消息:

  • 指定设备、指定Topic的消息、消息过滤或处理:首先创建AMQP消费组,然后在业务服务器运行AMQP客户端,最后使用云产品流转将设备消息转发到相应的消费组。云产品流转中的解析器,可以对流转的消息进行转换数据格式、处理字符串、组装JSON格式数据、处理二进制数据等操作,更多信息请参见脚本语法。AMQP服务端订阅、云产品流转的应用场景和优缺点对比,请参见数据流转方案对比

  • 将设备消息转发到其他设备数据转发到其他Topic

  • 消费组内所有消费者收到消息:当设备消息被转发到AMQP消费组时,组内随机一个消费者收到消息。如果需要所有消费者收到消息,当消费者数量较少时,可以为每个消费者单独创建一个消费组,当消费者数量较多时,可以使用云产品流转将消息转发到消息队列RocketMQ,然后使用广播消费

前提条件

  • 已创建产品

  • 已创建消费组。您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。

  • 设备上报的物模型Topic数据必须符合Alink JSON格式,才能触发AMQP服务端订阅。设备上报的自定义Topic数据没有格式要求。

使用限制

  • 建立连接之后,需要立刻发送认证请求。如果15秒内没有认证成功,服务器会主动关闭连接。

  • AMQP客户端的一个连接限流1,000 TPS,消息转发TPS限流由实例的消息转发TPS规格决定,消息大小无限制。更多的AMQP服务端订阅限制,请参见AMQP服务端订阅使用限制

如何配置

步骤一:创建消费组

  1. 登录物联网平台控制台

  2. 创建消费组,获取消费组ID用于后续接入AMQP客户端。每个消费组最多包括128个消费者,每个消费者只能配置一个消费组ID。

步骤二:创建AMQP服务端订阅

在物联网平台控制台创建AMQP订阅,关联消费组以及对应的设备消息类型。

  1. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

  2. 在左侧导航栏,选择消息转发 > 服务端订阅

  3. 服务端订阅页面,单击创建订阅

  4. 创建订阅对话框中,完成配置,单击确认

    参数

    说明

    产品

    物联网平台会转发该产品下所有设备的消息。一个产品只能创建一个AMQP服务端订阅。

    订阅类型

    选择AMQP

    消费组

    物联网平台提供默认消费组。

    选择消费组列表,在右侧选择目标消费组面板,可以选择多个消费组,也可以

    单击右下角的创建消费组

    推送消息类型

    重要
    • 开源MQTT托管形态及云网关下产品和设备,仅支持推送设备上报消息设备状态变化通知设备生命周期变更的数据。消息转发Topic和数据格式的详情说明,请参见开源MQTT协议的消息通信说明JT/T 808协议的消息转发说明GB/T 32960协议的消息转发说明SL 651协议消息示例

    • 订阅一个使用通配符的Topic,计为一个Topic。例如订阅产品下的Topic:/asde****/+/user/get,无论产品下有多少设备,都计为一个Topic。

    • 一个消费组最多订阅200个Topic,超出限制的Topic消息不会被转发。

    • 自定义Topic的数据格式由用户定义。系统Topic和物模型Topic的消息说明及数据格式请参见数据格式

    • 设备上报消息:产品下所有设备的Topic列表中,操作权限发布的Topic中的消息,Topic的概念请参见什么是Topic。 例如产品有3个Topic类:

      • /${YourProductKey}/${YourDeviceName}/user/get,设备具有订阅权限。

      • /${YourProductKey}/${YourDeviceName}/user/update,设备具有发布权限。

      • /${YourProductKey}/${YourDeviceName}/thing/event/property/post,设备具有发布权限。

      • AMQP服务端订阅会推送具有发布权限的Topic类中的消息,即/${YourProductKey}/${YourDeviceName}/user/update/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post中的设备消息。

    • 物模型历史数据上报:如果订阅异步服务调用响应数据,设备端返回的响应消息Id必须与物联网平台下发消息的Id相同,才可实现数据正常订阅。

    • OTA模块版本号上报:当设备上报OTA模块版本号,且版本号有变更时进行消息转发。

      重要

      如果通过AMQP接收设备OTA模块版本号上报的消息,设备必须上传OTA模块名module参数,OTA升级的更多说明请参见OTA升级概述

    • OTA升级设备状态通知:包括升级包验证和批量升级时,设备升级成功、失败、取消和进度的事件通知。

    • OTA升级批次状态通知:设备OTA升级批次状态变化通知。

    • 设备状态变化通知:该产品下的设备上下线状态变化时通知的消息。

    • 网关子设备发现上报:网关将发现的子设备信息上报给物联网平台。需要网关上的应用程序支持。网关产品特有消息类型。

    • 设备拓扑关系变更:子设备和网关之间的拓扑关系建立和解除消息。网关产品特有消息类型。

    • 设备生命周期变更:设备创建、删除、禁用、启用等消息。

    • 设备标签变更:设备上报的标签变更消息。

步骤三:运行AMQP客户端

重要
  • 建议使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。

  • 本示例使用Java语言,其他语言的示例请参见AMQP客户端接入说明

  1. 执行以下命令,下载demo文件。

    wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
  2. 执行以下命令,解压demo文件。

    unzip amqp-demo.zip
  3. src/main/java/com.aliyun.iotx.demo目录下AmqpClient.java文件中,参照下表修改AMQP的接入信息。

    重要

    本示例Demo代码中,添加了结束程序的代码(Thread.sleep(60 * 1000);),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。

    参数

    说明

    accessKey

    阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。

    登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

    accessSecret

    consumerGroupId

    当前物联网平台对应实例中的消费组ID。

    登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。

    iotInstanceId

    实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。

    • 若有ID值,必须传入该ID值。

    • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

    clientId

    表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

    AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表页签,单击消费组对应的查看消费组详情页面将显示该参数,方便您识别区分不同的客户端。

    connectionCount

    启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。

    消费组详情页面会以${clientId}+"-"+数字形式,显示连接的客户端。其中数字最小值为0。

    host

    AMQP接入域名。

    ${YourHost}对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)

  4. pom.xml文件中,已添加相关Maven依赖。在amqp-demo根目录执行以下命令,重新加载Maven变更,构建项目。

    mvn clean package
  5. amqp-demo/target目录执行以下命令,运行生成的JAR包。

    java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  6. 运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。

    重要

    只有当AMQP客户端在线时,才能在服务器上收到设备消息。

    10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 
    10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5)
    10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6)
    10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message,
     topic = /g18******/device1/user/update,
     messageId = 17315085647050******,
     generateTime = 1701658786356,
     content = test

    在相应消费组显示在线的AMQP客户端。amqp-democonnectionCount = 4代表4个客户端。

    image.png

  7. 如果AMQP客户端不在线,AMQP服务端订阅消息会堆积,AMQP客户端重新上线后,物联网平台重新推送消息。如果不需要消费堆积的消息,可在AMQP客户端上线前,清空堆积的消息

  8. 所有配置完成,设备上报订阅数据并被AMQP客户端接收后,您可以登录物联网平台控制台,进入对应实例查看消息运行日志。

    • 监控运维 > 日志服务 > 云端运行日志页签,查看设备上报数据、物联网平台转发数据到AMQP客户端和AMQP客户端返回ACK的日志记录。具体操作,请参见查询云端运行日志

    • 消息转发 > 服务端订阅 > 消费组列表页签,单击目标消费组右侧操作列的查看,在消费组详情页面,查看消息消费速率、消息堆积量、消费日志等。具体操作,请参见查看和监控消费组

管理服务端订阅

设置服务端订阅成功后,在服务端订阅页面的订阅列表下,找到已订阅的产品名称,可执行以下操作。

  • 编辑:单击产品对应操作列的编辑,在编辑订阅对话框,修改消费组推送消息类型

  • 删除:单击产品对应操作列的删除。单击确认

    警告

    如果用户在其他业务中使用了该产品订阅的设备数据,可能导致服务不可用或影响用户的业务。请谨慎操作。

相关文档

相关API

API

描述

CreateSubscribeRelation

创建服务端订阅为指定产品关联消费组以及对应设备消息类型。

UpdateSubscribeRelation

在已创建的服务端订阅中,更新产品关联的消费组或设备消息类型。

CreateConsumerGroupSubscribeRelation

在已创建的服务端订阅中,为指定产品添加一个消费组。

QueryConsumerGroupStatus

查询某个消费组的状态,包括在线客户端信息、消息消费速率、消息堆积数、最近消息消费时间。