Connect a client to IoT Platform by using the SDK for Python 2.7
Use the SDK for Python 2.7 to connect an AMQP client to Alibaba Cloud IoT Platform and receive messages through server-side subscription.
Prerequisites
You have obtained a consumer group ID and subscribed to the required topic messages.
Manage AMQP consumer groups: You can use the default consumer group (DEFAULT_GROUP) in IoT Platform or create a consumer group.
Configure an AMQP server-side subscription: Subscribe to the required topic messages using a consumer group.
Development environment
This example uses Python 2.7.
Download the SDK
We recommend the Apache Qpid Proton 0.29.0 library, which provides a Python API. To download the library and view its documentation, visit Qpid Proton 0.29.0.
Install Qpid Proton. For more information, see Installing Qpid Proton.
After you install Qpid Proton, run the following Python command to verify that the SSL library is available:
import proton;print('%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')
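The one-line check above assumes that `import proton` succeeds. As a minimal sketch, the following script also reports a missing installation instead of stopping with an ImportError. The function name `check_proton_ssl` is hypothetical and not part of Qpid Proton; only `proton.SSL.present()` comes from the command above.

```python
def check_proton_ssl():
    """Return a short status string describing the local Qpid Proton setup."""
    try:
        import proton
    except ImportError:
        # Qpid Proton is not installed for this interpreter.
        return 'proton NOT INSTALLED'
    # Same call as in the one-line check.
    return 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE'


if __name__ == '__main__':
    print(check_proton_ssl())
```

If the script reports anything other than "SSL present", the amqps:// connection in the sample code is unlikely to succeed.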
Sample code
# encoding=utf-8
import sys
import logging
import time
from proton.handlers import MessagingHandler
from proton.reactor import Container
import hashlib
import hmac
import base64
import os

reload(sys)
sys.setdefaultencoding('utf-8')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
console_handler = logging.StreamHandler(sys.stdout)


def current_time_millis():
    return str(int(round(time.time() * 1000)))


def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest())


class AmqpClient(MessagingHandler):
    def __init__(self):
        super(AmqpClient, self).__init__()

    def on_start(self, event):
        # The endpoint used to connect to IoT Platform. For details, see the AMQP connection guide.
        url = "amqps://${YourHost}:5671"
        # Hard-coding an AccessKey pair in your project code poses a security risk. If the code
        # is leaked, your AccessKey pair is exposed, which compromises all resources in your account.
        # The following code reads the AccessKey pair from environment variables as a best practice.
        # This is for reference only.
        accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
        accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        consumerGroupId = "${YourConsumerGroupId}"
        clientId = "${YourClientId}"
        # iotInstanceId: The ID of the IoT Platform instance.
        iotInstanceId = "${YourIotInstanceId}"
        # The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.
        # do_sign() uses hashlib.sha1, so change its digestmod if you change this value.
        signMethod = "hmacsha1"
        timestamp = current_time_millis()
        # For instructions on how to construct the userName parameter, see the AMQP connection guide.
        userName = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                   + ",timestamp=" + timestamp + ",authId=" + accessKey \
                   + ",iotInstanceId=" + iotInstanceId + ",consumerGroupId=" + consumerGroupId + "|"
        signContent = "authId=" + accessKey + "&timestamp=" + timestamp
        # Calculate the signature. For instructions on how to construct the password, see the AMQP connection guide.
        passWord = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
        conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60)
        self.receiver = event.container.create_receiver(conn)

    # Called when the connection is successfully established.
    def on_connection_opened(self, event):
        logger.info("Connection established, remoteUrl: %s", event.connection.hostname)

    # Called when the connection is closed.
    def on_connection_closed(self, event):
        logger.info("Connection closed: %s", self)

    # Called when the remote peer closes the connection due to an error.
    def on_connection_error(self, event):
        logger.info("Connection error")

    # Called when an AMQP connection error occurs, such as an authentication or socket error.
    def on_transport_error(self, event):
        if event.transport.condition:
            if event.transport.condition.info:
                logger.error("%s: %s: %s" % (
                    event.transport.condition.name, event.transport.condition.description,
                    event.transport.condition.info))
            else:
                logger.error("%s: %s" % (event.transport.condition.name, event.transport.condition.description))
        else:
            logger.error("Unspecified transport error")

    # Called when a message is received.
    def on_message(self, event):
        message = event.message
        content = message.body.decode('utf-8')
        topic = message.properties.get("topic")
        message_id = message.properties.get("messageId")
        print("receive message: message_id=%s, topic=%s, content=%s" % (message_id, topic, content))
        event.receiver.flow(1)


Container(AmqpClient()).run()
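The on_message handler prints the decoded body as a plain string. For property-post topics, the sample payload under Sample results is a JSON object whose items field maps each property name to a value and a timestamp. As a minimal sketch under that assumption, the following helper extracts the property values. The function name `extract_property_values` is hypothetical, and messages on other topics may have a different shape.

```python
import json


def extract_property_values(content):
    """Map each property name in a property-post payload to its value."""
    payload = json.loads(content)
    # Payloads without an "items" field yield an empty result.
    items = payload.get("items", {})
    return dict((name, item.get("value")) for name, item in items.items())


# Trimmed version of the sample payload shown under "Sample results".
sample = ('{"deviceType":"CustomCategory","gmtCreate":1613635594038,'
          '"items":{"Temperature":{"value":24,"time":1613635594036},'
          '"Humidity":{"value":26,"time":1613635594036}}}')
values = extract_property_values(sample)
print(sorted(values.items()))
```

Inside on_message, such a helper could be called with the decoded content string before the message is handed to application logic.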
Configure the parameters in the preceding code as described in the following table. For more information about other parameters, see Connect an AMQP client to IoT Platform.
Make sure that you specify valid parameter values. Otherwise, the AMQP client fails to connect to IoT Platform.
| Parameter | Description |
| --- | --- |
| url | The endpoint that the AMQP client uses to connect to IoT Platform. Format: amqps://${YourHost}:5671, as in the sample code. For more information about the endpoint, see Connect an AMQP client to IoT Platform. |
| accessKey, accessSecret | The AccessKey ID and AccessKey secret. To obtain them, log on to the IoT Platform console, hover over the profile picture in the upper-right corner, and then click AccessKey Management. The sample code reads both values from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. Note: If you use a Resource Access Management (RAM) user, you must attach the AliyunIOTFullAccess policy to the RAM user to grant permissions for managing IoT Platform resources. Otherwise, the connection fails. For more information, see Access IoT Platform as a RAM user. |
| consumerGroupId | The ID of the consumer group in the IoT Platform instance. Log on to the IoT Platform console. In the corresponding instance, go to the Consumer Groups tab to view your consumer group ID. |
| iotInstanceId | The ID of the IoT Platform instance. You can view the instance ID on the Overview tab in the IoT Platform console. |
| clientId | A client ID that you define, up to 64 characters in length. We recommend a unique identifier, such as the UUID, MAC address, or IP address of the server on which the AMQP client runs. After the client connects, you can find this ID in the IoT Platform console: on the Consumer Groups tab of the instance, click View next to the consumer group. The Consumer Group Details page lists the client ID, which helps you tell clients apart. |
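The parameters in the table are combined into the AMQP userName and password in on_start, where the signed string has the form authId=...&timestamp=.... As a minimal sketch, the following helper isolates that construction so it can be checked without a network connection. The names `build_credentials` and `DIGESTS` are hypothetical, and mapping the three documented signMethod values to hashlib.md5, hashlib.sha1, and hashlib.sha256 is an assumption based on their names. The code is written to run on both Python 2.7 and Python 3.

```python
import base64
import hashlib
import hmac
import time
import uuid

# Assumed mapping from the documented signMethod values to hashlib digests.
DIGESTS = {
    "hmacmd5": hashlib.md5,
    "hmacsha1": hashlib.sha1,
    "hmacsha256": hashlib.sha256,
}


def build_credentials(access_key, access_secret, client_id, consumer_group_id,
                      iot_instance_id, sign_method="hmacsha1", timestamp=None):
    """Return (userName, password) built the same way as in on_start."""
    if sign_method not in DIGESTS:
        raise ValueError("unsupported signMethod: %s" % sign_method)
    if timestamp is None:
        timestamp = str(int(round(time.time() * 1000)))
    user_name = (client_id + "|authMode=aksign"
                 + ",signMethod=" + sign_method
                 + ",timestamp=" + timestamp
                 + ",authId=" + access_key
                 + ",iotInstanceId=" + iot_instance_id
                 + ",consumerGroupId=" + consumer_group_id + "|")
    sign_content = "authId=" + access_key + "&timestamp=" + timestamp
    digest = hmac.new(access_secret.encode("utf-8"),
                      sign_content.encode("utf-8"),
                      digestmod=DIGESTS[sign_method]).digest()
    # Decode so that the password is text on both Python 2 and Python 3.
    password = base64.b64encode(digest).decode("ascii")
    return user_name, password


# Placeholder values for illustration only. Real credentials should come from
# environment variables, as in the sample code. A random UUID is one way to
# obtain a unique clientId within the 64-character limit.
user_name, password = build_credentials(
    "demo-access-key", "demo-access-secret", uuid.uuid4().hex,
    "DEFAULT_GROUP", "demo-instance-id", timestamp="1613635594038")
print(user_name)
```

Selecting the digest from signMethod keeps the userName and the signature consistent, whereas the sample code requires do_sign to be edited by hand when signMethod changes.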
Sample results
- Success: Log output similar to the following indicates that the AMQP client has connected to IoT Platform and is receiving messages.

  202x-xx-xx xx:xx:xx,490 - __main__ - INFO - Connection established, remoteUrl: xxx.amqp.iothub.aliyuncs.com
  receive message: message_id=13xxx, topic=/xxx/thing/event/property/post, content={"deviceType":"CustomCategory","iotId":"xxx","requestId":"1xxx"}
  receive message: message_id=13xxx, topic=/xxx/thing/event/property/post, content={"deviceType":"CustomCategory","iotId":"xxx","requestId":"1xxx"}

  | Parameter | Example | Description |
  | --- | --- | --- |
  | message_id | 2**************7 | The ID of the message. |
  | topic | /***********/******/thing/event/property/post | The topic used to submit device properties. |
  | content | {"deviceType":"CustomCategory","iotId":"qPi*","requestId":"161*","checkFailedData":{},"productKey":"g4*","gmtCreate":1613635594038,"deviceName":"de*","items":{"Temperature":{"value":24,"time":1613635594036},"Humidity":{"value":26,"time":1613635594036}}} | The content of the message. |
- Failure: Output similar to the following indicates that the AMQP client failed to connect to IoT Platform. Use the error log to check your code and network settings, resolve the issue, and then run the code again.

  2021-02-18 16:19:40,993 - proton - ERROR - Couldn't connect: 10053
  2021-02-18 16:19:40,996 - __main__ - ERROR - proton.pythonio: Connection error: 10053
  2021-02-18 16:19:40,996 - proton - INFO - Disconnected, reconnecting...
  Traceback (most recent call last):
    File "xxx", line 87, in <module>
      Container(AmqpClient()).run()
    File "xxx", line 181, in run
      while self.process(): pass
    File "xxx", line 240, in process
      event.dispatch(self._global_handler)
    File "xxx", line 135, in dispatch
      _dispatch(handler, type.method, self)
    File "xxx", line 117, in _dispatch
      handler.on_unhandled(method, *args)
    File "xxx", line 665, in on_unhandled
      event.dispatch(self.base)
    File "xxx", line 135, in dispatch
      _dispatch(handler, type.method, self)
    File "xxx", line 115, in _dispatch
      m(*args)
    File "xxx", line 892, in on_connection_bound
      addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
  socket.gaierror: [Errno 11001] getaddrinfo failed
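The failure output above ends in socket.gaierror, which Python raises when socket.getaddrinfo cannot resolve a host name. A likely cause is a wrong ${YourHost} value or a DNS problem on the machine. As a minimal sketch, the following pre-check resolves the endpoint host before the container starts, using the same getaddrinfo arguments that appear in the traceback. The helper name `can_resolve` is hypothetical and not part of Qpid Proton.

```python
import socket


def can_resolve(host, port=5671):
    """Return True if the host name resolves, False on a resolution failure."""
    try:
        socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        return True
    except socket.gaierror:
        return False
```

Calling `can_resolve` with the host part of the url parameter before `Container(AmqpClient()).run()` separates name-resolution problems from authentication or TLS errors. A successful result only shows that the name resolves, not that port 5671 is reachable.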
References
For more information about error codes for server-side subscription messages, see Message-related error codes.