Connect an AMQP client


To forward device messages to an Advanced Message Queuing Protocol (AMQP) client, use an AMQP software development kit (SDK) on your business server to connect the client to IoT Platform. The client receives device messages only while it is running on your server and connected to IoT Platform. This topic describes how to connect an AMQP client to IoT Platform.

Prerequisites

You have created a consumer group. When you use an AMQP SDK to start an AMQP client that consumes subscribed messages from a consumer group, you must specify the consumer group ID.

Background information

The server-side subscription and data forwarding features can forward device messages to an AMQP client for consumption. You can compare the forwarding solutions, application scenarios, and features to select a solution that meets your business needs. For more information, see:

Limits

  • The server-side subscription feature of Alibaba Cloud IoT Platform supports only AMQP 1.0.

  • An AMQP client supports a maximum of 128 connections. A maximum of 64 AMQP clients can consume messages from the same consumer group at the same time.

AMQP SDK overview

Alibaba Cloud IoT Platform provides AMQP SDK sample code in the following languages. For information about the client authentication process, authentication parameter settings, and message processing logic, see the Connection and authentication process, Connection configuration, and Client message receiving logic sections in this topic.

During the connection process, you may encounter message-related error codes. For more information, see Message-related error codes.

Connection and authentication process

  1. The AMQP client establishes a Transmission Control Protocol (TCP) connection with IoT Platform through a three-way handshake and then performs a Transport Layer Security (TLS) handshake for verification.

    Note

    To ensure data security, the receiver must use TLS encryption. Unencrypted TCP transmission is not supported.

  2. The AMQP client requests to establish a Connection.

    The authentication method is SASL PLAIN, which uses a username and password. After IoT Platform authenticates the username and password, a Connection is established.

    In addition, according to the AMQP protocol, the AMQP client must include the heartbeat interval in the Open frame when it establishes a connection. This is the idle-time-out parameter in the AMQP protocol. The heartbeat interval is measured in milliseconds and must be between 30,000 and 300,000. If no frames are transmitted over the Connection within the heartbeat interval, IoT Platform closes the connection. The method for setting the idle-time-out parameter varies based on the SDK. For more information, see the documentation for your specific SDK.

  3. The AMQP client sends a request to IoT Platform to establish a Receiver Link. A Receiver Link is a one-way channel for pushing data from the cloud to the client.

    After the client successfully establishes a Connection, it must establish a Receiver Link within 15 seconds. Otherwise, IoT Platform closes the connection.

    After the Receiver Link is established, the client is successfully connected to IoT Platform.

    Note
    • A Connection can have only one Receiver Link, and you cannot create a Sender Link. This means that messages can only be pushed from IoT Platform to the client. The client cannot send messages to the cloud.

    • The Receiver Link is named differently in different SDKs. For example, it is called MessageConsumer in some SDKs. Configure it based on your specific SDK.
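The timing constraints in the steps above can be checked before a client connects. The following is a minimal sketch and not part of any AMQP SDK: the constant names and the `check_idle_timeout` function are hypothetical, the bounds come from the text, and treating both bounds as inclusive is an assumption.

```python
# Bounds taken from the text above; the names are illustrative, not SDK identifiers.
MIN_IDLE_TIMEOUT_MS = 30_000
MAX_IDLE_TIMEOUT_MS = 300_000
RECEIVER_LINK_DEADLINE_S = 15  # time allowed to establish a Receiver Link after the Connection


def check_idle_timeout(idle_timeout_ms: int) -> int:
    """Return the value unchanged if it is an allowed heartbeat interval, else raise."""
    # Assumption: both bounds are inclusive.
    if not MIN_IDLE_TIMEOUT_MS <= idle_timeout_ms <= MAX_IDLE_TIMEOUT_MS:
        raise ValueError(
            f"idle-time-out must be between {MIN_IDLE_TIMEOUT_MS} and "
            f"{MAX_IDLE_TIMEOUT_MS} ms, got {idle_timeout_ms}"
        )
    return idle_timeout_ms
```

The validated value would then be passed to whatever option your SDK uses for the idle-time-out parameter.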

Connection configuration

The following sections describe the endpoint and authentication parameters for connecting an AMQP client to IoT Platform.

Endpoints and ports

For the AMQP endpoints for public and Enterprise instances, see View and configure instance endpoints.

Note

The endpoint is the ${YourHost} parameter in the AMQP SDK sample code.

  • For Java, .NET, Python 2.7, Node.js, and Go clients, use port 5671.

  • For Python 3 and PHP clients, use port 61614.
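The port rule above can be captured in a small lookup so that the client language decides the port in one place. This is only a sketch: the dictionary keys such as `dotnet` and `python2` and the function name `amqp_port` are labels chosen for this example, while the port numbers come from the list above.

```python
# Port per client language, as listed above. The keys are illustrative labels.
AMQP_PORTS = {
    "java": 5671,
    "dotnet": 5671,
    "python2": 5671,
    "nodejs": 5671,
    "go": 5671,
    "python3": 61614,
    "php": 61614,
}


def amqp_port(client_language: str) -> int:
    """Return the port for a client language, or raise if it is not listed."""
    key = client_language.strip().lower()
    try:
        return AMQP_PORTS[key]
    except KeyError:
        raise ValueError(f"no port listed for client language {client_language!r}") from None
```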

Client authentication parameters

The authentication parameters vary based on the identity that you use to connect the AMQP client to IoT Platform.

  • If you use an Alibaba Cloud account or a directly authorized RAM user under the account, use the following parameters:

    Note

    For a directly authorized RAM user, you must grant the user the permission to use the AMQP server-side subscription feature (iot:sub). Otherwise, the connection fails. For more information about how to grant permissions, see RAM authorization for IoT Platform.

    To improve data security, we recommend that you grant specific operation permissions to RAM users using RAM roles. For more information, see the following section.

    userName = clientId|iotInstanceId=${iotInstanceId},authMode=aksign,signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171|
    password = signMethod(stringToSign, accessSecret)
  • If you use a RAM user that is authorized using a RAM role, use the following parameters:

    Note

    RAM users authorized using a RAM role can be from the current Alibaba Cloud account or from other Alibaba Cloud accounts. For more information about how to authorize a RAM user to use the server-side subscription feature using a RAM role, see Authorize a RAM user within the same account to use server-side subscription and Authorize a RAM user from a different account to use server-side subscription.

    userName = clientId|iotInstanceId=${iotInstanceId},authMode=ststoken,securityToken=${SecurityToken},signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171|
    password = signMethod(stringToSign, accessSecret)

    Table 1. userName parameter description

    Parameter: clientId
    Required: Yes
    Description: The client ID. You must define this ID. The ID can be up to 64 characters in length. We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address of the server where your AMQP client is located.

      After the AMQP client is connected and starts, log on to the IoT Platform console. On the Consumer Groups tab of the Message Forwarding > Server-side Subscription page for the instance, click View next to the consumer group. The Consumer Group Details page displays this parameter, which helps you identify different clients.

    Parameter: iotInstanceId
    Required: No
    Description: The ID of the current IoT Platform instance. You can view the instance ID on the Instance Overview tab of the IoT Platform console.

      • If an ID exists, you must specify this parameter.

      • If the Instance Overview page or the ID does not exist, you do not need to specify this parameter.

    Parameter: authMode
    Required: Yes
    Description: The authentication mode.

      • For an Alibaba Cloud account or a directly authorized RAM user: Use the aksign mode.

      • For a RAM user that is authorized using a RAM role: Use the ststoken mode.

    Parameter: securityToken
    Required: No
    Description: The temporary identity credentials (Security Token Service token) that a RAM user uses to assume a RAM role. You can obtain the token by calling the AssumeRole operation. For more information, see AssumeRole.

      Important

      This parameter is required only when a RAM user that is authorized using a RAM role connects the AMQP client to IoT Platform.

    Parameter: signMethod
    Required: Yes
    Description: The signature algorithm. Valid values: hmacmd5, hmacsha1, and hmacsha256.

    Parameter: consumerGroupId
    Required: Yes
    Description: The ID of the consumer group in the IoT Platform instance.

      Log on to the IoT Platform console. In the corresponding instance, go to Message Forwarding > Server-side Subscription > Consumer Group List to view your consumer group ID.

    Parameter: authId
    Required: Yes
    Description: The authentication information.

      • For an Alibaba Cloud account or a directly authorized RAM user

        The AccessKey ID of the Alibaba Cloud account or the RAM user.

        Log on to the IoT Platform console, move the mouse pointer over your profile picture, and then click AccessKey Management to obtain the AccessKey.

      • For a RAM user that is authorized using a RAM role

        The AccessKey ID of the RAM user that assumes the RAM role.

    Parameter: timestamp
    Required: Yes
    Description: The current time, as a UNIX timestamp in milliseconds.

    Table 2. password parameter description

    Parameter: signMethod
    Required: Yes
    Description: The signature algorithm. Use the signature algorithm specified in userName to calculate the signature value and convert it to a Base64 string.

    Parameter: stringToSign
    Required: Yes
    Description: The string to be signed.

      Sort the key-value pairs of the parameters to be signed in alphabetical order by key. Concatenate the key-value pairs into a string. Use equal signs (=) to separate keys and values, and ampersands (&) to separate parameters.

      • For an Alibaba Cloud account or a directly authorized RAM user

        The parameters to be signed are authId and timestamp.

        The string to be signed is: stringToSign = authId=${accessKey}&timestamp=1573489088171.

      • For a RAM user that is authorized using a RAM role

        The parameters to be signed are securityToken, authId, and timestamp.

        The string to be signed is: stringToSign = authId=${accessKey}&securityToken=${SecurityToken}&timestamp=1573489088171.

    Parameter: accessSecret
    Required: Yes
    Description:

      • For an Alibaba Cloud account or a directly authorized RAM user

        The AccessKey secret of the Alibaba Cloud account or the RAM user.

        Log on to the IoT Platform console, move the mouse pointer over your profile picture, and then click AccessKey Management to obtain the AccessKey.

      • For a RAM user that is authorized using a RAM role

        The AccessKey secret of the RAM user that assumes the RAM role.
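The userName and password rules above can be sketched in a few lines of Python using only the standard library. This is an illustration of one reading of the tables, not the official SDK code: the function names are hypothetical, the signature is assumed to be an HMAC keyed with the AccessKey secret over stringToSign, and leaving out the iotInstanceId pair when no instance ID exists is also an assumption.

```python
import base64
import hashlib
import hmac
import time

# Maps the signMethod values listed in Table 1 to hashlib constructors.
DIGESTS = {"hmacmd5": hashlib.md5, "hmacsha1": hashlib.sha1, "hmacsha256": hashlib.sha256}


def string_to_sign(params: dict) -> str:
    """Sort the pairs by key, then join them as key=value separated by '&'."""
    return "&".join(f"{key}={params[key]}" for key in sorted(params))


def sign(text: str, access_secret: str, sign_method: str = "hmacsha1") -> str:
    """HMAC the text with the AccessKey secret and Base64-encode the digest."""
    mac = hmac.new(access_secret.encode("utf-8"), text.encode("utf-8"), DIGESTS[sign_method])
    return base64.b64encode(mac.digest()).decode("ascii")


def build_credentials(client_id, consumer_group_id, access_key, access_secret,
                      iot_instance_id=None, security_token=None,
                      sign_method="hmacsha1", timestamp_ms=None):
    """Return (userName, password); ststoken mode is used when a token is given."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)  # UNIX timestamp in milliseconds
    fields = []
    if iot_instance_id:  # assumption: the pair is omitted when no instance ID exists
        fields.append(("iotInstanceId", iot_instance_id))
    fields.append(("authMode", "ststoken" if security_token else "aksign"))
    signed = {"authId": access_key, "timestamp": timestamp_ms}
    if security_token:
        fields.append(("securityToken", security_token))
        signed["securityToken"] = security_token
    fields += [("signMethod", sign_method), ("consumerGroupId", consumer_group_id),
               ("authId", access_key), ("timestamp", timestamp_ms)]
    user_name = client_id + "|" + ",".join(f"{k}={v}" for k, v in fields) + "|"
    return user_name, sign(string_to_sign(signed), access_secret, sign_method)
```

For example, `build_credentials("my-client", "my-group", access_key, access_secret, iot_instance_id="my-instance")` returns the pair to pass as the SASL username and password, where every argument value shown here is a placeholder. The same timestamp is used in userName and in stringToSign so that the two stay consistent.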

Client message receiving logic

After the Receiver Link between the client and IoT Platform is established, IoT Platform can push messages to the AMQP client over this link.

Note

The client can only receive messages that are subscribed to by IoT Platform. To send messages or instructions to devices, you can call the corresponding API operations. For more information, see List of API operations.

Message push

IoT Platform pushes messages that consist of the following parts:

  • Message body: The message payload is in binary format.

  • Business properties: Properties such as the message topic and message ID are obtained from the Application Properties of the AMQP protocol. The format is key:value.

    Key           Meaning
    topic         Message topic.
    messageId     Message ID.
    generateTime  The time when the message was generated.

    Note

    The message generation time generateTime cannot be used to determine the message order.
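One way to work with the two parts of a pushed message is to combine them into a single record as soon as they arrive. The sketch below assumes that your SDK hands you the Application Properties as a dictionary-like object and the body as bytes. The `PushedMessage` class and the `from_amqp_parts` function are hypothetical names, and the type of generateTime is left open because the text does not state it.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class PushedMessage:
    topic: str
    message_id: str
    generate_time: Any  # kept as delivered; its type is not stated in the text
    payload: bytes      # binary message body


def from_amqp_parts(application_properties: Mapping[str, Any], body: bytes) -> PushedMessage:
    """Combine the Application Properties and the binary body into one record."""
    return PushedMessage(
        topic=application_properties["topic"],
        message_id=application_properties["messageId"],
        generate_time=application_properties.get("generateTime"),
        payload=bytes(body),
    )
```

Keeping the payload as bytes leaves decoding to your business logic, which knows the data format of each topic.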

Message acknowledgment

According to the AMQP protocol, the client must send an acknowledgment (also known as a settle) to IoT Platform to confirm that the message was successfully received. AMQP clients typically provide an automatic acknowledgment mode, which is recommended, and a manual acknowledgment mode. For more information, see the documentation for your client.

Message processing

After the AMQP client receives a message, you must develop the business logic to process the message.

Message policy

  • Real-time messages are pushed directly.

  • Messages that enter the stacked queue:

    Messages may not be consumed in real time, for example, when consumers disconnect from IoT Platform or consume messages slowly. In these cases, the messages accumulate in the queue.

    • After disconnected consumers reconnect to IoT Platform and consume messages at a stable speed, IoT Platform pushes the accumulated messages to the consumers.

    • If consumers fail to consume the pushed messages, the queue that holds the accumulated messages may be blocked. IoT Platform retries pushing the accumulated messages to the consumers after approximately 1 minute.

Note
  • A brief traffic imbalance on the consumer side is normal. The traffic usually recovers within 10 minutes. If the message rate in queries per second (QPS) is high or message processing is resource-intensive, we recommend that you add consumer clients to ensure sufficient consumption capacity.

  • During data forwarding, the same message may be sent multiple times to ensure delivery. This continues until the client returns an acknowledgment (ACK) or the message expires. The same message has the same message ID. You can use the message ID to deduplicate messages.

  • For more information about message limits, see Server-side subscription limits.

  • You can purge stacked messages in the IoT Platform console. For more information, see View and monitor consumer groups.
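Because the same message can be delivered more than once, a consumer can remember recent message IDs and skip repeats. The following is a minimal in-memory sketch under stated assumptions: the `MessageDeduplicator` class and its default capacity are hypothetical, and a deployment with several consumer clients would need a shared store instead of a per-process cache.

```python
from collections import OrderedDict


class MessageDeduplicator:
    """Remembers the most recent message IDs and reports repeats."""

    def __init__(self, capacity: int = 10_000):
        self._capacity = capacity
        self._seen = OrderedDict()

    def is_duplicate(self, message_id: str) -> bool:
        """Return True if the ID was seen recently; otherwise record it."""
        if message_id in self._seen:
            self._seen.move_to_end(message_id)  # mark as recently seen
            return True
        self._seen[message_id] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the least recently seen ID
        return False
```

A handler would call `is_duplicate(message_id)` first and run the business logic only when it returns False. The bounded capacity keeps memory use fixed at the cost of forgetting very old IDs.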

Message time series

Note

Messages are not guaranteed to be in order. The order in which you receive messages may not be the order in which they were generated.

  • Device online and offline messages:

    The received messages are not sorted by the time when the device went online or offline. You must sort the messages based on the value of the time parameter.

    For example, you receive the following three messages in sequence:

    1. Online: 2018-08-31 10:02:28.195.

    2. Offline: 2018-08-31 10:01:28.195.

    3. Offline: 2018-08-31 10:03:28.195.

    These three messages show that the device first went offline, then went online, and finally went offline again.

    For more information about the parameters in the message, see Data format.

  • Other types of messages:

    You must add a serial number to messages at the business layer. Use the serial number in the received message to perform an idempotent check to determine whether the message needs to be processed.
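Both cases above come down to comparing an ordering value before a message is applied: the time parameter for online and offline messages, or a business-layer serial number for other messages. The sketch below uses the three example messages from this section. The `LatestWins` class, the `device-1` key, and the time format string are assumptions made for illustration.

```python
from datetime import datetime

# Assumption: the time parameter uses the format shown in the example above.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"


def parse_time(value: str) -> datetime:
    return datetime.strptime(value, TIME_FORMAT)


class LatestWins:
    """Accepts an update only if its ordering value is newer than the last accepted one."""

    def __init__(self):
        self._latest = {}

    def accept(self, key, sequence) -> bool:
        last = self._latest.get(key)
        if last is not None and sequence <= last:
            return False  # stale or repeated update: skip it
        self._latest[key] = sequence
        return True


received = [
    ("online", "2018-08-31 10:02:28.195"),
    ("offline", "2018-08-31 10:01:28.195"),
    ("offline", "2018-08-31 10:03:28.195"),
]

# Sorting by time recovers the order in which the events happened.
in_order = [status for status, when in sorted(received, key=lambda m: parse_time(m[1]))]

# When messages are processed in arrival order, the stale 10:01 message is skipped.
tracker = LatestWins()
applied = [status for status, when in received if tracker.accept("device-1", parse_time(when))]
```

Here `in_order` is offline, online, offline, and `applied` is online, offline, so the device ends in the offline state either way. The same `accept` call works with integer serial numbers in place of timestamps.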