Example: Python 3 SDK connection
Use the Python 3 SDK to connect to Alibaba Cloud IoT Platform and receive messages through a server-side subscription.
Prerequisites
You have obtained a consumer group ID and subscribed to the required topic messages.
Manage AMQP consumer groups: You can use the default consumer group (DEFAULT_GROUP) in IoT Platform or create a consumer group.
Configure an AMQP server-side subscription: Subscribe to the required topic messages using a consumer group.
Prepare your development environment
You can use Python 3.0 or later. This example uses Python 3.8.
Download the SDK
This example uses the stomp.py and schedule libraries. For installation instructions, see Installing Packages.
Sample code
The following sample code is based on stomp.py version 7.0.0.
# encoding=utf-8
import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os
def connect_and_subscribe(conn):
# For security, do not hardcode your AccessKey pair in the code.
# This example retrieves the AccessKey pair from environment variables for reference.
accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
consumerGroupId = "${YourConsumerGroupId}"
# iotInstanceId: The ID of the instance.
iotInstanceId = "${YourIotInstanceId}"
clientId = "${YourClientId}"
# The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.
signMethod = "hmacsha1"
timestamp = current_time_millis()
# For information about how to construct the userName parameter, see the documentation for connecting an AMQP client.
# If you transmit data in a binary format, you must add the encode=base64 parameter to userName.
# The server then encodes the message body in Base64 before pushing the message.
# For more information, see the "Binary format" section in this topic.
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId + "|"
signContent = "authId=" + accessKey + "×tamp=" + timestamp
# Calculate the signature. For information about how to construct the password, see the documentation for connecting an AMQP client.
password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
conn.set_listener('', MyListener(conn))
conn.connect(username, password, wait=True)
# Clear previous connection-check tasks and create a new one.
schedule.clear('conn-check')
schedule.every(1).seconds.do(do_check,conn).tag('conn-check')
class MyListener(stomp.ConnectionListener):
def __init__(self, conn):
self.conn = conn
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print('received a message "%s"' % frame.body)
def on_heartbeat_timeout(self):
print('on_heartbeat_timeout')
def on_connected(self, headers):
print("successfully connected")
conn.subscribe(destination='/topic/#', id=1, ack='auto')
print("successfully subscribe")
def on_disconnected(self):
print('disconnected')
connect_and_subscribe(self.conn)
def current_time_millis():
return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
return base64.b64encode(m.digest()).decode("utf-8")
# Check the connection and reconnect if disconnected.
def do_check(conn):
print('check connection, is_connected: %s', conn.is_connected())
if (not conn.is_connected()):
try:
connect_and_subscribe(conn)
except Exception as e:
print('disconnected, ', e)
# A scheduled task to check the connection status.
def connection_check_timer():
while 1:
schedule.run_pending()
time.sleep(10)
# The connection endpoint. For more information, see the documentation for connecting an AMQP client.
# Enter the endpoint directly. Do not add the amqps:// prefix.
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)
try:
connect_and_subscribe(conn)
except Exception as e:
print('connecting failed')
raise e
# Run the scheduled connection check in an asynchronous thread.
thread = threading.Thread(target=connection_check_timer)
thread.start()
Replace the placeholder values in the code with your actual values. For more information about the parameters, see Connect an AMQP client to IoT Platform.
Specify valid parameter values. Otherwise, the AMQP client fails to connect to IoT Platform.
|
Parameter |
Description |
|
accessKey |
Sign in to the IoT Platform console, move the pointer over your profile picture in the upper-right corner, and click AccessKey Management to get the AccessKey ID and AccessKey secret. Note If you use a RAM user, you must grant the user the |
|
accessSecret |
|
|
consumerGroupId |
The ID of the consumer group in the IoT Platform instance. Log on to the IoT Platform console. In the corresponding instance, go to to view your consumer group ID. |
|
iotInstanceId |
The ID of the instance. You can view the ID of the current instance on the Overview tab of the IoT Platform console.
|
|
clientId |
The client ID. You must define this ID. The ID can be up to 64 characters in length. We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address of the server where your AMQP client is located. After the AMQP client is connected and starts, log on to the IoT Platform console. On the Consumer Groups tab of the page for the instance, click View next to the consumer group. The Consumer Group Details page displays this parameter. This helps you identify different clients. |
|
conn |
Establishes a TLS connection between the AMQP client and IoT Platform. For information about the AMQP endpoint that corresponds to |
|
conn.set_ssl |
Sample results
-
Success: If you see log messages similar to the following, the AMQP client is connected to IoT Platform and receiving messages.
successfully connected successfully subscribe check connection, is_connected: %s True check connection, is_connected: %s True -
Failure: If you see log messages similar to the following, the AMQP client failed to connect to IoT Platform.
Use the error log to check your code and network settings. Resolve the issue and run the code again.
Could not connect to host ixxx.com, port 61614 Traceback (most recent call last): File "xxx" self.xxx File "xxx" for r xxx File "xxx" for r xxx socket.gaierror: [Errno 11001] getaddrinfo failed
Binary format
To transmit binary data, use a Base64 encoding parameter because STOMP is a text-based protocol. Without encoding, message bodies may be truncated.
Add the encode=base64 parameter to userName as shown below. The server then Base64-encodes the message body before sending it.
username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
+ ",timestamp=" + timestamp + ",authId=" + accessKey \
+ ",iotInstanceId=" + iotInstanceId \
+ ",consumerGroupId=" + consumerGroupId \
+ ",encode=base64"+"|"
References
For more information about error codes for server-side subscription messages, see Message-related error codes.