Example: Python 3 SDK connection

Use the Python 3 SDK to connect to Alibaba Cloud IoT Platform and receive messages through a server-side subscription.

Prerequisites

You have obtained a consumer group ID and subscribed to the required topic messages.

Prepare your development environment

You can use Python 3.0 or later. This example uses Python 3.8.

Download the SDK

This example uses the stomp.py and schedule libraries. For installation instructions, see Installing Packages.
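
Because the sample code targets a specific stomp.py release, it can help to confirm which versions are installed before running it. The following is a minimal sketch, assuming Python 3.8 or later (for importlib.metadata) and assuming the distribution names match the PyPI project names; installed_version is a hypothetical helper, not part of either library.

```python
from importlib import metadata  # standard library in Python 3.8 and later


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Assumption: the distributions are registered under these names.
    for name in ("stomp.py", "schedule"):
        print(name, installed_version(name) or "not installed")
```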

Sample code

The following sample code is based on stomp.py version 7.0.0.

# encoding=utf-8
import time
import sys
import hashlib
import hmac
import base64
import stomp
import ssl
import schedule
import threading
import os
def connect_and_subscribe(conn):
    # For security, do not hardcode your AccessKey pair in the code.
    # This example retrieves the AccessKey pair from environment variables for reference.
    accessKey = os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID']
    accessSecret = os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    consumerGroupId = "${YourConsumerGroupId}"
    # iotInstanceId: The ID of the instance.
    iotInstanceId = "${YourIotInstanceId}"
    clientId = "${YourClientId}"
    # The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.
    signMethod = "hmacsha1"
    timestamp = current_time_millis()
    # For information about how to construct the userName parameter, see the documentation for connecting an AMQP client.
    # If you transmit data in a binary format, you must add the encode=base64 parameter to userName. 
    # The server then encodes the message body in Base64 before pushing the message. 
    # For more information, see the "Binary format" section in this topic.
    username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId + "|"
    signContent = "authId=" + accessKey + "&timestamp=" + timestamp
    # Calculate the signature. For information about how to construct the password, see the documentation for connecting an AMQP client.
    password = do_sign(accessSecret.encode("utf-8"), signContent.encode("utf-8"))
    conn.set_listener('', MyListener(conn))
    conn.connect(username, password, wait=True)
    # Clear previous connection-check tasks and create a new one.
    schedule.clear('conn-check')
    schedule.every(1).seconds.do(do_check,conn).tag('conn-check')
class MyListener(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn
    def on_error(self, frame):
        print('received an error "%s"' % frame.body)
    def on_message(self, frame):
        print('received a message "%s"' % frame.body)
    def on_heartbeat_timeout(self):
        print('on_heartbeat_timeout')
    def on_connected(self, frame):
        print("successfully connected")
        # Use the connection passed to the listener instead of the module-level variable.
        self.conn.subscribe(destination='/topic/#', id=1, ack='auto')
        print("successfully subscribe")
    def on_disconnected(self):
        print('disconnected')
        connect_and_subscribe(self.conn)
def current_time_millis():
    return str(int(round(time.time() * 1000)))
def do_sign(secret, sign_content):
    m = hmac.new(secret, sign_content, digestmod=hashlib.sha1)
    return base64.b64encode(m.digest()).decode("utf-8")
# Check the connection and reconnect if disconnected.
def do_check(conn):
    print('check connection, is_connected: %s', conn.is_connected())
    if (not conn.is_connected()):
        try:
            connect_and_subscribe(conn)
        except Exception as e:
            print('disconnected, ', e)
# A scheduled task to check the connection status.
def connection_check_timer():
    while 1:
        schedule.run_pending()
        time.sleep(10)
# The connection endpoint. For more information, see the documentation for connecting an AMQP client.
# Enter the endpoint directly. Do not add the amqps:// prefix.
conn = stomp.Connection([('${YourHost}', 61614)], heartbeats=(0,300))
conn.set_ssl(for_hosts=[('${YourHost}', 61614)], ssl_version=ssl.PROTOCOL_TLS)
try:
    connect_and_subscribe(conn)
except Exception as e:
    print('connecting failed')
    raise e
# Run the scheduled connection check in an asynchronous thread.
thread = threading.Thread(target=connection_check_timer)
thread.start()

Replace the placeholder values in the code with your actual values. For more information about the parameters, see Connect an AMQP client to IoT Platform.

Important

Specify valid parameter values. Otherwise, the AMQP client fails to connect to IoT Platform.
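
The sample sets signMethod to hmacsha1 and hardcodes hashlib.sha1 in do_sign, so changing one without the other produces an invalid signature. The following is a minimal sketch of a signing helper that derives the digest from the signMethod value; do_sign_with_method and _DIGESTS are hypothetical names, and the mapping assumes the three values listed in the sample comment correspond to the standard MD5, SHA-1, and SHA-256 digests.

```python
import base64
import hashlib
import hmac

# Assumed mapping from the signMethod values in the sample to hashlib digests.
_DIGESTS = {
    "hmacmd5": hashlib.md5,
    "hmacsha1": hashlib.sha1,
    "hmacsha256": hashlib.sha256,
}


def do_sign_with_method(secret, sign_content, sign_method):
    """Return the Base64-encoded HMAC of sign_content for the given signMethod."""
    try:
        digestmod = _DIGESTS[sign_method.lower()]
    except KeyError:
        raise ValueError("unsupported signMethod: %r" % sign_method) from None
    mac = hmac.new(secret, sign_content, digestmod=digestmod)
    return base64.b64encode(mac.digest()).decode("utf-8")
```

With this helper, the password line in connect_and_subscribe could pass signMethod as a third argument, so the value sent in userName and the digest used for the signature stay in sync.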

Parameter

Description

accessKey, accessSecret

Sign in to the IoT Platform console, move the pointer over your profile picture in the upper-right corner, and click AccessKey Management to get the AccessKey ID and AccessKey secret.

Note: If you use a RAM user, you must grant the user the AliyunIOTFullAccess permission. Otherwise, the connection fails. For more information about how to grant permissions, see Access IoT Platform as a RAM user.

consumerGroupId

The ID of the consumer group in the IoT Platform instance.

Log on to the IoT Platform console. In the corresponding instance, go to Message Forwarding > Server-side Subscription > Consumer Group List to view your consumer group ID.

iotInstanceId

The ID of the instance. You can view the ID of the current instance on the Overview tab of the IoT Platform console.

  • If an instance ID is available, you must specify it in the parameter.

  • If the Overview tab is not displayed or no instance ID is available, set this parameter to an empty string, for example, iotInstanceId = "".

clientId

The client ID. You must define this ID. The ID can be up to 64 characters in length. We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address of the server where your AMQP client is located.

After the AMQP client starts and connects, log on to the IoT Platform console. On the Consumer Groups tab of the Message Forwarding > Server-side Subscription page for the instance, click View next to the consumer group. The Consumer Group Details page displays this parameter, which helps you tell different clients apart.

conn, conn.set_ssl

Establishes a TLS connection between the AMQP client and IoT Platform.

For information about the AMQP endpoint that corresponds to ${YourHost}, see View and manage instance endpoints.
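
The clientId description above asks for a self-defined, unique identifier of at most 64 characters. The following is a minimal sketch of one way to generate such a value from a random UUID; make_client_id and its prefix argument are hypothetical, and stripping everything except letters, digits, underscores, and hyphens from the prefix is a conservative assumption made because the userName string uses "|" and "," as delimiters.

```python
import re
import uuid

MAX_CLIENT_ID_LENGTH = 64  # limit stated in the clientId description


def make_client_id(prefix="amqp"):
    """Build a unique client ID of at most 64 characters."""
    # Assumption: keep only characters that cannot clash with the
    # "|" and "," delimiters used in the userName string.
    safe_prefix = re.sub(r"[^A-Za-z0-9_-]", "", prefix)
    unique = uuid.uuid4().hex  # 32 hexadecimal characters
    room = MAX_CLIENT_ID_LENGTH - len(unique) - 1  # space left for the prefix
    return safe_prefix[:room] + "-" + unique
```

A readable prefix such as a host name makes the client easier to recognize on the Consumer Group Details page, while the random suffix keeps IDs distinct across restarts.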

Sample results

  • Success: If you see log messages similar to the following, the AMQP client is connected to IoT Platform and receiving messages.

    successfully connected
    successfully subscribe
    check connection, is_connected: %s True
    check connection, is_connected: %s True
  • Failure: If you see log messages similar to the following, the AMQP client failed to connect to IoT Platform.

    Use the error log to check your code and network settings. Resolve the issue and run the code again.

    Could not connect to host ixxx.com, port 61614
    Traceback (most recent call last):
      File "xxx"
        self.xxx
      File "xxx"
        for r xxx
      File "xxx"
        for r xxx
    socket.gaierror: [Errno 11001] getaddrinfo failed
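
The socket.gaierror in the failure log above means the host name could not be resolved, which typically points to a wrong endpoint value (for example, one that still contains the amqps:// prefix) or a DNS problem. The following is a minimal sketch of a pre-flight check that can be run before connecting; can_resolve is a hypothetical helper, and the default port is the one used in the sample code.

```python
import socket


def can_resolve(host, port=61614):
    """Return True if the host name resolves to at least one address."""
    try:
        socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError):
        # gaierror: the resolver could not find the host.
        # UnicodeError: the host string is not a valid host name.
        return False
    return True
```

If can_resolve returns False for your endpoint, check the ${YourHost} value and the DNS settings of the server before looking at credentials or signatures.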

Binary format

To transmit binary data, use a Base64 encoding parameter because STOMP is a text-based protocol. Without encoding, message bodies may be truncated.

Add the encode=base64 parameter to userName as shown below. The server then Base64-encodes the message body before sending it.

username = clientId + "|authMode=aksign" + ",signMethod=" + signMethod \
                    + ",timestamp=" + timestamp + ",authId=" + accessKey \
                    + ",iotInstanceId=" + iotInstanceId \
                    + ",consumerGroupId=" + consumerGroupId \
                    + ",encode=base64" + "|"
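
When encode=base64 is set, the message body that reaches the listener is Base64 text, so the client has to decode it to recover the original bytes. The following is a minimal sketch under that assumption; build_username and decode_body are hypothetical helpers that follow the userName layout used in this topic, and decode_body assumes the body arrives as a string, as it does in the sample's on_message.

```python
import base64


def build_username(client_id, sign_method, timestamp, access_key,
                   iot_instance_id, consumer_group_id, encode_base64=False):
    """Assemble the userName string in the layout used by the sample code."""
    fields = [
        "authMode=aksign",
        "signMethod=" + sign_method,
        "timestamp=" + timestamp,
        "authId=" + access_key,
        "iotInstanceId=" + iot_instance_id,
        "consumerGroupId=" + consumer_group_id,
    ]
    if encode_base64:
        # Ask the server to Base64-encode message bodies before pushing them.
        fields.append("encode=base64")
    return client_id + "|" + ",".join(fields) + "|"


def decode_body(body):
    """Decode a Base64 message body back into the original bytes."""
    return base64.b64decode(body)
```

In the listener, on_message could then call decode_body(frame.body) instead of printing the body directly.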

References

For more information about error codes for server-side subscription messages, see Message-related error codes.