Python 2.7 SDK接入示例

本文介绍使用Python 2.7 SDK接入阿里云物联网平台,接收服务端订阅消息的示例。

前提条件

已获取消费组ID,并订阅Topic消息。

准备开发环境

本示例所使用的开发环境为Python 2.7版。

下载SDK

Python语言的AMQP SDK,推荐使用Apache Qpid Proton 0.29.0,该库中已封装了Python API。请访问Qpid Proton 0.29.0下载库和查看使用说明。

安装Proton。安装操作指导,请参见Installing Qpid Proton

安装完成后,通过以下Python命令查看SSL库是否安装成功。

import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')

代码示例

# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64
import os

reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)


def current_time_millis():
    return str(int(round(time.time() * 1000)))


def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest())


class AmqpClient(MessagingHandler):
    def __init__(self):
        super(AmqpClient, self).__init__()

    def on_start(self, event):
        # 接入域名,请参见AMQP客户端接入说明文档。
        url = "amqps://${YourHost}:5671"
        # 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
        accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
        accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        consumerGroupId = "${YourConsumerGroupId}"
        clientId = "${YourClientId}"
        # iotInstanceId:实例ID。
        iotInstanceId = "${YourIotInstanceId}"
        # 签名方法:支持hmacmd5,hmacsha1和hmacsha256。
        signMethod = "hmacsha1"
        timestamp = current_time_millis()
        # userName组装方法,请参见AMQP客户端接入说明文档。
        userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                        + ",timestamp=" + timestamp + ",authId=" + accessKey \
                        + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
        signContent = "authId=" + accessKey + "&timestamp=" + timestamp
        # 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
        passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
        conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
        self.receiver = event.container.create_receiver(conn)

    # 当连接成功建立时被调用。
    def on_connection_opened(self, event):
        logger.info("Connection established, remoteUrl: %s", event.connection.hostname)

    # 当连接关闭时被调用。
    def on_connection_closed(self, event):
        logger.info("Connection closed: %s", self)

    # 当远端因错误而关闭连接时被调用。
    def on_connection_error(self, event):
        logger.info("Connection error")

    # 当建立AMQP连接错误时被调用,包括身份验证错误和套接字错误。
    def on_transport_error(self, event):
        if event.transport.condition:
            if event.transport.condition.info:
                logger.error("%s: %s: %s" % (
                    event.transport.condition.name, event.transport.condition.description,
                    event.transport.condition.info))
            else:
                logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
        else:
            logging.error("Unspecified transport error")

    # 当收到消息时被调用。
    def on_message(self, event):
        message = event.message
        content = message.body.decode('utf-8')
        topic = message.properties.get("topic")
        message_id = message.properties.get("messageId")
        print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
        event.receiver.flow(1)


Container(AmqpClient()).run()

您需按照如下表格中的参数说明,修改代码中的参数值。更多参数说明,请参见AMQP客户端接入说明

重要

请确保参数值输入正确,否则AMQP客户端接入会失败。

参数

说明

url

AMQP客户端接入物联网平台的连接地址。格式:"amqps://${YourHost}:5671"

${YourHost}对应的AMQP接入域名信息,请参见管理实例终端节点

accessKey

登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

说明

如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见RAM用户访问

accessSecret

consumerGroupId

当前物联网平台对应实例中的消费组ID。

登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表查看您的消费组ID。

iotInstanceId

实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。

  • 若有ID值,必须传入该ID值。

  • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ""

clientId

表示客户端ID,需您自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。

AMQP客户端接入并启动成功后,登录物联网平台控制台,在对应实例的消息转发 > 服务端订阅 > 消费组列表页签,单击消费组对应的查看消费组详情页面将显示该参数,方便您识别区分不同的客户端。

运行结果示例

  • 成功:返回类似如下日志信息,表示AMQP客户端已接入物联网平台并成功接收消息。成功

    参数

    示例

    说明

    message_id

    2**************7

    消息的ID。

    topic

    /***********/******/thing/event/property/post

    设备属性上报的Topic。

    content

    {"deviceType":"CustomCategory","iotId":"qPi***","requestId":"161***","checkFailedData":{},"productKey":"g4***","gmtCreate":1613635594038,"deviceName":"de***","items":{"Temperature":{"value":24,"time":1613635594036},"Humidity":{"value":26,"time":1613635594036}}}

    消息的内容。

  • 失败:返回类似如下日志信息,表示AMQP客户端连接物联网平台失败。

    您可根据日志提示,检查代码或网络环境,然后修正问题,重新运行代码。

    失败

相关文档

服务端订阅消息相关错误码,请参见消息相关错误码