本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
当需要实时获取设备上报的数据时,如果使用云端API只能获取物模型数据,而且可能无法实时获取,使用基于AMQP 1.0版协议的AMQP服务端订阅功能,可以在业务服务器实时、可靠地获取设备上报消息。本文为您介绍配置AMQP服务端订阅的操作步骤。
工作原理
AMQP服务端订阅会将同一产品所有设备的指定类型消息,实时转发到一个或多个消费组中,每个消费组中包括多个消费者即AMQP客户端。每条消息转发到消费组时,消费组中随机一个消费者收到消息,不同消费组通过消费组ID区分。在上图中:
订阅关系1:将产品1所有设备的消息,转发到消费组1和消费组2。
订阅关系2:将产品2所有设备的消息,转发到消费组2。
配置AMQP服务端订阅后,物联网平台会自动将设备消息转发到AMQP客户端,不需要在代码中订阅Topic,只要AMQP客户端在线就可以接收消息。不能通过AMQP服务端订阅向设备下发消息,如果需要请调用消息通信的API。
使用场景
业务服务器接收设备消息:AMQP服务端订阅只能转发同一产品所有设备的指定类型消息,不能指定设备或指定Topic。如果需要更灵活地转发设备消息:
指定设备、指定Topic的消息、消息过滤或处理:首先创建AMQP消费组,然后在业务服务器运行AMQP客户端,最后使用云产品流转将设备消息转发到相应的消费组。云产品流转中的解析器,可以对流转的消息进行转换数据格式、处理字符串、组装JSON格式数据、处理二进制数据等操作,更多信息请参见脚本语法。AMQP服务端订阅、云产品流转的应用场景和优缺点对比,请参见数据流转方案对比。
将设备消息转发到其他设备:数据转发到其他Topic。
消费组内所有消费者收到消息:当设备消息被转发到AMQP消费组时,组内随机一个消费者收到消息。如果需要所有消费者收到消息,当消费者数量较少时,可以为每个消费者单独创建一个消费组,当消费者数量较多时,可以使用云产品流转将消息转发到消息队列RocketMQ,然后使用广播消费。
前提条件
已创建消费组。您可使用物联网平台默认消费组(DEFAULT_GROUP)或创建消费组。
设备上报的物模型Topic数据必须符合Alink JSON格式,才能触发AMQP服务端订阅。设备上报的自定义Topic数据没有格式要求。
使用限制
建立连接之后,需要立刻发送认证请求。如果15秒内没有认证成功,服务器会主动关闭连接。
AMQP客户端的一个连接限流1,000 TPS,消息转发TPS限流由实例的消息转发TPS规格决定,消息大小无限制。更多的AMQP服务端订阅限制,请参见AMQP服务端订阅使用限制。
如何配置
步骤一:创建消费组
步骤二:创建AMQP服务端订阅
在物联网平台控制台创建AMQP订阅,关联消费组以及对应的设备消息类型。
在实例概览页签的全部环境下,找到对应的实例,单击实例卡片。
在左侧导航栏,选择
。在服务端订阅页面,单击创建订阅。
在创建订阅对话框中,完成配置,单击确认。
参数
说明
产品
物联网平台会转发该产品下所有设备的消息。一个产品只能创建一个AMQP服务端订阅。
订阅类型
选择AMQP。
消费组
物联网平台提供默认消费组。
选择消费组列表,在右侧选择目标消费组面板,可以选择多个消费组,也可以
单击右下角的创建消费组。
推送消息类型
重要开源MQTT托管形态及云网关下产品和设备,仅支持推送设备上报消息、设备状态变化通知和设备生命周期变更的数据。消息转发Topic和数据格式的详情说明,请参见开源MQTT协议的消息通信说明、JT/T 808协议的消息转发说明、GB/T 32960协议的消息转发说明、SL 651协议消息示例。
订阅一个使用通配符的Topic,计为一个Topic。例如订阅产品下的Topic:
/asde****/+/user/get
,无论产品下有多少设备,都计为一个Topic。一个消费组最多订阅200个Topic,超出限制的Topic消息不会被转发。
自定义Topic的数据格式由用户定义。系统Topic和物模型Topic的消息说明及数据格式请参见数据格式。
设备上报消息:产品下所有设备的Topic列表中,操作权限为发布的Topic中的消息,Topic的概念请参见什么是Topic。 例如产品有3个Topic类:
/${YourProductKey}/${YourDeviceName}/user/get
,设备具有订阅权限。/${YourProductKey}/${YourDeviceName}/user/update
,设备具有发布权限。/${YourProductKey}/${YourDeviceName}/thing/event/property/post
,设备具有发布权限。AMQP服务端订阅会推送具有发布权限的Topic类中的消息,即
/${YourProductKey}/${YourDeviceName}/user/update
和/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post
中的设备消息。
物模型历史数据上报:如果订阅异步服务调用响应数据,设备端返回的响应消息Id必须与物联网平台下发消息的Id相同,才可实现数据正常订阅。
OTA模块版本号上报:当设备上报OTA模块版本号,且版本号有变更时进行消息转发。
重要如果通过AMQP接收设备OTA模块版本号上报的消息,设备必须上传OTA模块名
module
参数,OTA升级的更多说明请参见OTA升级概述。OTA升级设备状态通知:包括升级包验证和批量升级时,设备升级成功、失败、取消和进度的事件通知。
OTA升级批次状态通知:设备OTA升级批次状态变化通知。
设备状态变化通知:该产品下的设备上下线状态变化时通知的消息。
网关子设备发现上报:网关将发现的子设备信息上报给物联网平台。需要网关上的应用程序支持。网关产品特有消息类型。
设备拓扑关系变更:子设备和网关之间的拓扑关系建立和解除消息。网关产品特有消息类型。
设备生命周期变更:设备创建、删除、禁用、启用等消息。
设备标签变更:设备上报的标签变更消息。
步骤三:运行AMQP客户端
建议使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。
本示例使用Java语言,其他语言的示例请参见AMQP客户端接入说明。
执行以下命令,下载demo文件。
wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
执行以下命令,解压demo文件。
unzip amqp-demo.zip
在
src/main/java/com.aliyun.iotx.demo
目录下AmqpClient.java
文件中,参照下表修改AMQP的接入信息。重要本示例Demo代码中,添加了结束程序的代码(
Thread.sleep(60 * 1000);
),即程序启动成功,运行一分钟后会结束。实际场景中,您可根据需要自行设置运行时间。参数
说明
accessKey
阿里云主账号或RAM用户的AccessKey ID和AccessKey Secret。
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。
accessSecret
consumerGroupId
当前物联网平台对应实例中的消费组ID。
登录物联网平台控制台,在对应实例的
查看您的消费组ID。iotInstanceId
实例ID。您可在物联网平台控制台的实例概览页面,查看当前实例的ID。
若有ID值,必须传入该ID值。
若无实例概览页面或ID值,传入空值,即
iotInstanceId = ""
。
clientId
表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。
AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的 页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。
connectionCount
启动AMQP客户端的连接数,最大不超过128个。用于实时消息推送的扩容。
消费组详情页面会以
${clientId}+"-"+数字
形式,显示连接的客户端。其中数字最小值为0。host
AMQP接入域名。
${YourHost}
对应的AMQP接入域名信息,请参见查看和配置实例终端节点信息(Endpoint)。在
pom.xml
文件中,已添加相关Maven依赖。在amqp-demo
根目录执行以下命令,重新加载Maven变更,构建项目。mvn clean package
在
amqp-demo/target
目录执行以下命令,运行生成的JAR包。java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
运行示例代码后返回如下信息,表示AMQP客户端已接入物联网平台并成功接收消息。
重要只有当AMQP客户端在线时,才能在服务器上收到设备消息。
10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5) 10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6) 10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device1/user/update, messageId = 17315085647050******, generateTime = 1701658786356, content = test
在相应消费组显示在线的AMQP客户端。
amqp-demo
中connectionCount = 4
代表4个客户端。如果AMQP客户端不在线,AMQP服务端订阅消息会堆积,AMQP客户端重新上线后,物联网平台重新推送消息。如果不需要消费堆积的消息,可在AMQP客户端上线前,清空堆积的消息。
所有配置完成,设备上报订阅数据并被AMQP客户端接收后,您可以登录物联网平台控制台,进入对应实例查看消息运行日志。
管理服务端订阅
设置服务端订阅成功后,在服务端订阅页面的订阅列表下,找到已订阅的产品名称,可执行以下操作。
编辑:单击产品对应操作列的编辑,在编辑订阅对话框,修改消费组或推送消息类型。
删除:单击产品对应操作列的删除。单击确认。
警告如果用户在其他业务中使用了该产品订阅的设备数据,可能导致服务不可用或影响用户的业务。请谨慎操作。
相关文档
相关API
API | 描述 |
创建服务端订阅为指定产品关联消费组以及对应设备消息类型。 | |
在已创建的服务端订阅中,更新产品关联的消费组或设备消息类型。 | |
在已创建的服务端订阅中,为指定产品添加一个消费组。 | |
查询某个消费组的状态,包括在线客户端信息、消息消费速率、消息堆积数、最近消息消费时间。 |