如果需要将设备消息转发至AMQP客户端进行消费,您需在业务服务器中使用AMQP SDK开启客户端连接物联网平台。AMQP客户端接入成功并在业务服务器运行,才能接收物联网平台的设备消息。本文介绍AMQP客户端接入物联网平台的方法。
前提条件
已创建消费组:使用AMQP SDK开启AMQP客户端消费消费组内订阅的消息,需要指定消费组ID。
背景信息
服务端订阅和云产品流转功能就可将设备消息流转至AMQP客户端消费。您可对比流转方案、应用场景及功能优势,根据业务需求选择合适的流转方案。具体内容,请参见:
使用限制
阿里云物联网平台服务端订阅仅支持AMQP 1.0版的协议标准。AMQP协议标准的详细介绍,请参见AMQP协议标准。
一个AMQP客户端的连接数最大为128个。最多可开启64个AMQP客户端同时消费同一个消费组。
AMQP SDK说明
建议您使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。
阿里云物联网平台提供以下语言的AMQP SDK示例代码供您使用。AMQP客户端认证过程、认证参数配置和消息处理逻辑,请参见本文的连接认证过程、连接配置说明和客户端接收消息逻辑。
接入过程中,您可能会遇到消息相关的错误码。更多信息,请参见消息相关错误码。
连接认证过程
AMQP客户端与物联网平台经过三次握手建立TCP连接,然后进行TLS握手校验。
说明为了保障安全,接收方必须使用TLS加密,不支持非加密的TCP传输。
AMQP客户端请求建立
Connection
。连接认证方式为
PLAIN-SASL
,可以理解为用户名(userName)和密码(password)认证。物联网平台的云端认证userName和password通过后,建立Connection
。此外,根据AMQP协议,AMQP客户端建连时,还需在Open帧中携带心跳时间,即AMQP协议的idle-time-out参数。心跳时间单位为毫秒,取值范围为30,000~300,000。如果超过心跳时间,
Connection
上没有任何帧通信,物联网平台将关闭连接。SDK不同,idle-time-out参数设置方法不同。具体设置方法,请参见各语言SDK示例文档。AMQP客户端向物联网平台的云端发起请求,建立
Receiver Link
(即云端向客户端推送数据的单向通道)。客户端建立
Connection
成功后,需在15秒内完成Receiver Link
的建立,否则物联网平台会关闭连接。建立
Receiver Link
后,客户端成功接入物联网平台。说明一个
Connection
上只能创建一个Receiver Link,不支持创建Sender Link
,即只能由物联网平台的云端向客户端推送消息,客户端不能向云端发送消息。Receiver Link
在不同SDK中名称不同,例如在有的SDK上称为MessageConsumer,请根据具体SDK设置。
连接配置说明
AMQP客户端接入物联网平台的连接地址和连接认证参数说明如下:
接入域名和端口
公共实例和企业版实例中,AMQP的接入域名,请参见管理实例终端节点。
接入域名为AMQP SDK示例代码中的${YourHost}
。
对于Java、.NET、Python 2.7、Node.js、Go客户端:端口号为5671。
对于Python3、PHP客户端:端口号为61614。
客户端身份认证参数
不同身份账号使用AMQP SDK将AMQP客户端接入物联网平台,配置的认证参数有区别。
如果是当前物联网平台所属阿里云主账号或其下直接授权的RAM用户,认证参数如下:
说明对于直接授权的RAM用户,需要给该RAM用户授予操作AMQP服务端订阅功能的权限(iot:sub),否则将会连接失败。授权方法,请参见IoT授权映射表。
为提升物联网平台数据安全,推荐通过RAM角色授予RAM用户指定的操作权限。具体说明,请参见下文。
userName = clientId|iotInstanceId=${iotInstanceId},authMode=aksign,signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171| password = signMethod(stringToSign, accessSecret)
如果是通过RAM角色授权的RAM用户,认证参数如下:
说明通过RAM角色授权的RAM用户除了当前物联网平台所属阿里云主账号下的RAM用户,还支持跨账号(其他阿里云主账号)下的RAM用户。关于如何通过RAM角色授权RAM用户操作物联网平台服务端订阅功能,请参见本账号RAM用户授权服务端订阅和跨账号RAM用户授权服务端订阅。
userName = clientId|iotInstanceId=${iotInstanceId},authMode=ststoken,securityToken=${SecurityToken},signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171| password = signMethod(stringToSign, accessSecret)
表 1. userName参数说明
参数
是否必传
说明
clientId
是
表示客户端ID,需您自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。
AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的 页签,单击消费组对应的查看,消费组详情页面将显示该参数,方便您识别区分不同的客户端。
iotInstanceId
否
当前物联网平台实例的ID。您可在物联网平台控制台的实例概览页签,查看当前实例的ID。
若有ID值,必须传入该ID值。
若无实例概览页签或ID值,则无需传入。
authMode
是
鉴权模式。
当前物联网平台所属阿里云主账号或其下直接授权的RAM用户:使用
aksign
模式。通过RAM角色授权的RAM用户:使用
ststoken
模式。
securityToken
否
重要仅通过RAM角色授权的RAM用户接入AMQP客户端时,需配置此参数。
RAM用户扮演RAM角色的临时身份凭证((STS Token)),可以通过调用AssumeRole接口获取,具体内容,请参见AssumeRole。
signMethod
是
签名算法。可选:
hmacmd5
、hmacsha1
和hmacsha256
。consumerGroupId
是
当前物联网平台对应实例中的消费组ID。
登录物联网平台控制台,在对应实例的
查看您的消费组ID。authId
是
认证信息。
对于当前物联网平台所属阿里云主账号或其下直接授权的RAM用户
分别对应取值为阿里云主账号的AccessKey ID,或RAM用户的AccessKey ID。
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey。
对于通过RAM角色授权的RAM用户
取值为扮演RAM角色的RAM用户的AccessKey ID。
timestamp
是
当前时间。Long类型的毫秒值时间戳。
表 2. password参数说明 参数
是否必传
说明
signMethod
是
签名算法。请使用userName中指定的签名算法计算签名值,并转为base64字符串。
stringToSign
是
待签名的字符串。
将需要签名的参数的键值对按照首字母字典排序,并在键值间添加等号(=);参数间添加与号(&),拼接成待签名的字符串。
对于当前物联网平台所属阿里云主账号或其下直接授权的RAM用户
需要签名的参数为:
authId
和timestamp
。待签名的字符串为:
stringToSign = authId=${accessKey}×tamp=1573489088171
。对于通过RAM角色授权的RAM用户
需要签名的参数为:
securityToken
、authId
和timestamp
。待签名的字符串为:
stringToSign = authId=${accessKey}&securityToken=${SecurityToken}×tamp=1573489088171
。
accessSecret
是
对于当前物联网平台所属阿里云主账号或其下直接授权的RAM用户
分别对应取值为阿里云主账号的AccessKey Secret,或RAM用户的AccessKey Secret。
登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey。
对于通过RAM角色授权的RAM用户
取值为扮演RAM角色的RAM用户的AccessKey Secret。
客户端接收消息逻辑
客户端和物联网平台云端之间的Receiver Link建连成功后,云端就可以在这条Link上向AMQP客户端推送消息。
客户端仅支持接收物联网平台已经订阅的消息,要向设备发送消息或指令,可根据需要,调用对应的API。更多信息,请参见API列表。
消息推送
物联网平台推送的消息:
消息体:消息的payload为二进制格式。
消息的业务属性:如消息Topic和Message ID等,需要从AMQP协议的Application Properties中获取。格式为
key:value
。Key
含义
topic
消息Topic。
messageId
消息ID。
generateTime
消息生成时间。
说明消息生成时间generateTime不能作为判断消息顺序的依据。
消息回执
按照AMQP协议的定义,客户端需要给物联网平台的云端回执(AMQP协议上一般称为settle),通知云端消息已经被成功接收。AMQP客户端通常会提供自动回执模式(推荐)和手动回执模式。具体请参考相应的客户端的使用说明。
消息处理
AMQP客户端接收到消息后,业务层处理该消息的逻辑和方法,由您自行完成开发。
消息策略
实时消息直接推送。
进入堆积列队的消息:
由于消费客户端离线、消息消费慢等原因,消息不能实时消费,而进入堆积队列。
消费客户端重新上线并恢复稳定消费能力后,物联网平台重试推送堆积消息。
如果客户端对重试推送的消息消费失败,可能导致堆积队列阻塞。按大约一分钟间隔,物联网平台向客户端再次重试推送。
消息时序
消息不保序,即接收到消息的时间顺序不一定是消息实际产生的时间顺序。
设备上下线消息:
收到消息的顺序不是实际设备上下线时间排序。设备上下线顺序需按照time具体值排序。
例如,您依次收到3条消息:
上线:
2018-08-31 10:02:28.195
。下线:
2018-08-31 10:01:28.195
。下线:
2018-08-31 10:03:28.195
。
这3条消息展示了,设备先下线,再上线,最后下线的过程。
关于消息中参数的更多信息,请参见数据格式。
其他类型的消息:
您需要在业务层,给消息增加序列号。根据接收到消息中的序列号,幂等判断消息是否需要处理。