AMQP客户端接入说明

如果需要将设备消息转发至AMQP客户端进行消费,您需在业务服务器中使用AMQP SDK开启客户端连接物联网平台。AMQP客户端接入成功并在业务服务器运行,才能接收物联网平台的设备消息。本文介绍AMQP客户端接入物联网平台的方法。

前提条件

已创建消费组:使用AMQP SDK开启AMQP客户端消费消费组内订阅的消息,需要指定消费组ID。

背景信息

服务端订阅和云产品流转功能就可将设备消息流转至AMQP客户端消费。您可对比流转方案、应用场景及功能优势,根据业务需求选择合适的流转方案。具体内容,请参见:

使用限制

  • 阿里云物联网平台服务端订阅仅支持AMQP 1.0版的协议标准。AMQP协议标准的详细介绍,请参见AMQP协议标准

  • 一个AMQP客户端的连接数最大为128个。最多可开启64AMQP客户端同时消费同一个消费组。

AMQP SDK说明

重要

建议您使用阿里云物联网平台提供的AMQP SDK接入示例。对于您自研的AMQP SDK,阿里云不提供后续技术支持服务。

阿里云物联网平台提供以下语言的AMQP SDK示例代码供您使用。AMQP客户端认证过程、认证参数配置和消息处理逻辑,请参见本文的连接认证过程连接配置说明客户端接收消息逻辑

接入过程中,您可能会遇到消息相关的错误码。更多信息,请参见消息相关错误码

连接认证过程

  1. AMQP客户端与物联网平台经过三次握手建立TCP连接,然后进行TLS握手校验。

    说明

    为了保障安全,接收方必须使用TLS加密,不支持非加密的TCP传输。

  2. AMQP客户端请求建立Connection

    连接认证方式为PLAIN-SASL,可以理解为用户名(userName)和密码(password)认证。物联网平台的云端认证userNamepassword通过后,建立Connection

    此外,根据AMQP协议,AMQP客户端建连时,还需在Open帧中携带心跳时间,即AMQP协议的idle-time-out参数。心跳时间单位为毫秒,取值范围为30,000~300,000。如果超过心跳时间,Connection上没有任何帧通信,物联网平台将关闭连接。SDK不同,idle-time-out参数设置方法不同。具体设置方法,请参见各语言SDK示例文档。

  3. AMQP客户端向物联网平台的云端发起请求,建立Receiver Link(即云端向客户端推送数据的单向通道)。

    客户端建立Connection成功后,需在15秒内完成Receiver Link的建立,否则物联网平台会关闭连接。

    建立Receiver Link后,客户端成功接入物联网平台。

    说明
    • 一个Connection上只能创建一个Receiver Link,不支持创建Sender Link,即只能由物联网平台的云端向客户端推送消息,客户端不能向云端发送消息。

    • Receiver Link在不同SDK中名称不同,例如在有的SDK上称为MessageConsumer,请根据具体SDK设置。

连接配置说明

AMQP客户端接入物联网平台的连接地址和连接认证参数说明如下:

接入域名和端口

公共实例和企业版实例中,AMQP的接入域名,请参见管理实例终端节点

说明

接入域名为AMQP SDK示例代码中的${YourHost}

  • 对于Java、.NET、Python 2.7、Node.js、Go客户端:端口号为5671。

  • 对于Python3、PHP客户端:端口号为61614。

客户端身份认证参数

不同身份账号使用AMQP SDKAMQP客户端接入物联网平台,配置的认证参数有区别。

  • 如果是当前物联网平台所属阿里云主账号或其下直接授权的RAM用户,认证参数如下:

    说明

    对于直接授权的RAM用户,需要给该RAM用户授予操作AMQP服务端订阅功能的权限(iot:sub),否则将会连接失败。授权方法,请参见IoT授权映射表

    为提升物联网平台数据安全,推荐通过RAM角色授予RAM用户指定的操作权限。具体说明,请参见下文。

    userName = clientId|iotInstanceId=${iotInstanceId},authMode=aksign,signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171|
    password = signMethod(stringToSign, accessSecret)
  • 如果是通过RAM角色授权的RAM用户,认证参数如下:

    说明

    通过RAM角色授权的RAM用户除了当前物联网平台所属阿里云主账号下的RAM用户,还支持跨账号(其他阿里云主账号)下的RAM用户。关于如何通过RAM角色授权RAM用户操作物联网平台服务端订阅功能,请参见本账号RAM用户授权服务端订阅跨账号RAM用户授权服务端订阅

    userName = clientId|iotInstanceId=${iotInstanceId},authMode=ststoken,securityToken=${SecurityToken},signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171|
    password = signMethod(stringToSign, accessSecret)

    表 1. userName参数说明

    参数

    是否必传

    说明

    clientId

    表示客户端ID,需您自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

    AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表页签,单击消费组对应的查看消费组详情页面将显示该参数,方便您识别区分不同的客户端。

    iotInstanceId

    当前物联网平台实例的ID。您可在物联网平台控制台的实例概览页签,查看当前实例的ID

    • 若有ID值,必须传入该ID值。

    • 若无实例概览页签或ID值,则无需传入。

    authMode

    鉴权模式。

    • 当前物联网平台所属阿里云主账号或其下直接授权的RAM用户:使用aksign模式。

    • 通过RAM角色授权的RAM用户:使用ststoken模式。

    securityToken

    重要

    仅通过RAM角色授权的RAM用户接入AMQP客户端时,需配置此参数。

    RAM用户扮演RAM角色的临时身份凭证((STS Token)),可以通过调用AssumeRole接口获取,具体内容,请参见AssumeRole

    signMethod

    签名算法。可选:hmacmd5hmacsha1hmacsha256

    consumerGroupId

    当前物联网平台对应实例中的消费组ID。

    登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。

    authId

    认证信息。

    • 对于当前物联网平台所属阿里云主账号或其下直接授权的RAM用户

      分别对应取值为阿里云主账号的AccessKey ID,或RAM用户的AccessKey ID。

      登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey。

    • 对于通过RAM角色授权的RAM用户

      取值为扮演RAM角色的RAM用户的AccessKey ID。

    timestamp

    当前时间。Long类型的毫秒值时间戳。

    表 2. password参数说明

    参数

    是否必传

    说明

    signMethod

    签名算法。请使用userName中指定的签名算法计算签名值,并转为base64字符串。

    stringToSign

    待签名的字符串。

    将需要签名的参数的键值对按照首字母字典排序,并在键值间添加等号(=);参数间添加与号(&),拼接成待签名的字符串。

    • 对于当前物联网平台所属阿里云主账号或其下直接授权的RAM用户

      需要签名的参数为:authIdtimestamp

      待签名的字符串为:stringToSign = authId=${accessKey}&timestamp=1573489088171

    • 对于通过RAM角色授权的RAM用户

      需要签名的参数为:securityTokenauthIdtimestamp

      待签名的字符串为:stringToSign = authId=${accessKey}&securityToken=${SecurityToken}&timestamp=1573489088171

    accessSecret

    • 对于当前物联网平台所属阿里云主账号或其下直接授权的RAM用户

      分别对应取值为阿里云主账号的AccessKey Secret,或RAM用户的AccessKey Secret。

      登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey。

    • 对于通过RAM角色授权的RAM用户

      取值为扮演RAM角色的RAM用户的AccessKey Secret。

客户端接收消息逻辑

客户端和物联网平台云端之间的Receiver Link建连成功后,云端就可以在这条Link上向AMQP客户端推送消息。

说明

客户端仅支持接收物联网平台已经订阅的消息,要向设备发送消息或指令,可根据需要,调用对应的API。更多信息,请参见API列表

消息推送

物联网平台推送的消息:

  • 消息体:消息的payload为二进制格式。

  • 消息的业务属性:如消息TopicMessage ID等,需要从AMQP协议的Application Properties中获取。格式为key:value

    Key

    含义

    topic

    消息Topic。

    messageId

    消息ID。

    generateTime

    消息生成时间。

    说明

    消息生成时间generateTime不能作为判断消息顺序的依据。

消息回执

按照AMQP协议的定义,客户端需要给物联网平台的云端回执(AMQP协议上一般称为settle),通知云端消息已经被成功接收。AMQP客户端通常会提供自动回执模式(推荐)和手动回执模式。具体请参考相应的客户端的使用说明。

消息处理

AMQP客户端接收到消息后,业务层处理该消息的逻辑和方法,由您自行完成开发。

消息策略

  • 实时消息直接推送。

  • 进入堆积列队的消息:

    由于消费客户端离线、消息消费慢等原因,消息不能实时消费,而进入堆积队列。

    • 消费客户端重新上线并恢复稳定消费能力后,物联网平台重试推送堆积消息。

    • 如果客户端对重试推送的消息消费失败,可能导致堆积队列阻塞。按大约一分钟间隔,物联网平台向客户端再次重试推送。

说明
  • 消费端存在短暂的流量不均衡,属于正常现象。一般能在10分钟内恢复。如果您的消息QPS较高或消息处理较耗费资源,建议增加消费端的数量,保持消费能力冗余。

  • 数据流转时,为确保消息送达,同一条消息可能重复发送,直到客户端返回ACK或消息过期。同一条消息的消息ID相同,您可根据消息ID去重。

  • 关于消息限制的更多信息,请参见服务端订阅使用限制

  • 您可以在物联网平台控制台清除堆积消息。具体操作,请参见查看和监控消费组

消息时序

说明

消息不保序,即接收到消息的时间顺序不一定是消息实际产生的时间顺序。

  • 设备上下线消息:

    收到消息的顺序不是实际设备上下线时间排序。设备上下线顺序需按照time具体值排序。

    例如,您依次收到3条消息:

    1. 上线:2018-08-31 10:02:28.195

    2. 下线:2018-08-31 10:01:28.195

    3. 下线:2018-08-31 10:03:28.195

    3条消息展示了,设备先下线,再上线,最后下线的过程。

    关于消息中参数的更多信息,请参见数据格式

  • 其他类型的消息:

    您需要在业务层,给消息增加序列号。根据接收到消息中的序列号,幂等判断消息是否需要处理。

常见问题