To forward device messages to an Advanced Message Queuing Protocol (AMQP) client, you must use an AMQP software development kit (SDK) on your business server to connect the client to IoT Platform. The AMQP client must be connected to and running on your server to receive device messages from IoT Platform. This topic describes how to connect an AMQP client to IoT Platform.
Prerequisites
You have created a consumer group. When you use an AMQP SDK to start an AMQP client that consumes subscribed messages from a consumer group, you must specify the consumer group ID.
Background information
The server-side subscription and data forwarding features can forward device messages to an AMQP client for consumption. You can compare the forwarding solutions, application scenarios, and features to select a solution that meets your business needs. For more information, see:
Limits
The server-side subscription feature of Alibaba Cloud IoT Platform supports only AMQP 1.0.
An AMQP client supports a maximum of 128 connections. A maximum of 64 AMQP clients can consume messages from the same consumer group at the same time.
AMQP SDK overview
Alibaba Cloud IoT Platform provides AMQP SDK sample code in the following languages. For information about the client authentication process, authentication parameter settings, and message processing logic, see the Connection and authentication process, Connection configuration, and Client message receiving logic sections in this topic.
During the connection process, you may encounter message-related error codes. For more information, see Message-related error codes.
Connection and authentication process
The AMQP client establishes a Transmission Control Protocol (TCP) connection with IoT Platform through a three-way handshake and then performs a Transport Layer Security (TLS) handshake for verification.
NoteTo ensure data security, the receiver must use TLS encryption. Unencrypted TCP transmission is not supported.
The AMQP client requests to establish a
Connection.The authentication method is
PLAIN-SASL, which uses a username and password. After IoT Platform authenticates the username and password, aConnectionis established.In addition, according to the AMQP protocol, the AMQP client must include the heartbeat interval in the Open frame when it establishes a connection. This is the idle-time-out parameter in the AMQP protocol. The heartbeat interval is measured in milliseconds and must be between 30,000 and 300,000. If no frames are transmitted over the
Connectionwithin the heartbeat interval, IoT Platform closes the connection. The method for setting the idle-time-out parameter varies based on the SDK. For more information, see the documentation for your specific SDK.The AMQP client sends a request to IoT Platform to establish a
Receiver Link. A Receiver Link is a one-way channel for pushing data from the cloud to the client.After the client successfully establishes a
Connection, it must establish aReceiver Linkwithin 15 seconds. Otherwise, IoT Platform closes the connection.After the
Receiver Linkis established, the client is successfully connected to IoT Platform.NoteOnly one
Connectioncan have a Receiver Link. You cannot create aSender Link. This means that messages can only be pushed from IoT Platform to the client. The client cannot send messages to the cloud.The
Receiver Linkis named differently in different SDKs. For example, it is called MessageConsumer in some SDKs. Configure it based on your specific SDK.
Connection configuration
The following sections describe the endpoint and authentication parameters for connecting an AMQP client to IoT Platform.
Endpoints and ports
For the AMQP endpoints for public and Enterprise instances, see View and configure instance endpoints.
The endpoint is the ${YourHost} parameter in the AMQP SDK sample code.
For Java, .NET, Python 2.7, Node.js, and Go clients, use port 5671.
For Python 3 and PHP clients, use port 61614.
Client authentication parameters
The authentication parameters vary based on the identity that you use to connect the AMQP client to IoT Platform.
If you use an Alibaba Cloud account or a directly authorized RAM user under the account, use the following parameters:
NoteFor a directly authorized RAM user, you must grant the user the permission to use the AMQP server-side subscription feature (iot:sub). Otherwise, the connection fails. For more information about how to grant permissions, see RAM authorization for IoT Platform.
To improve data security, we recommend that you grant specific operation permissions to RAM users using RAM roles. For more information, see the following section.
userName = clientId|iotInstanceId=${iotInstanceId},authMode=aksign,signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171| password = signMethod(stringToSign, accessSecret)If you use a RAM user that is authorized using a RAM role, use the following parameters:
NoteRAM users authorized using a RAM role can be from the current Alibaba Cloud account or from other Alibaba Cloud accounts. For more information about how to authorize a RAM user to use the server-side subscription feature using a RAM role, see Authorize a RAM user within the same account to use server-side subscription and Authorize a RAM user from a different account to use server-side subscription.
userName = clientId|iotInstanceId=${iotInstanceId},authMode=ststoken,securityToken=${SecurityToken},signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171| password = signMethod(stringToSign, accessSecret)Table 1. userName parameter description
Parameter
Required
Description
clientId
Yes
The client ID. You must define this ID. The ID can be up to 64 characters in length. We recommend that you use a unique identifier, such as the UUID, MAC address, or IP address of the server where your AMQP client is located.
After the AMQP client is connected and starts, log on to the IoT Platform console. On the Consumer Groups tab of the page for the instance, click View next to the consumer group. The Consumer Group Details page displays this parameter. This helps you identify different clients.
iotInstanceId
No
The ID of the current IoT Platform instance.On the Instance Overview tab of the IoT Platform console, you can view the instance's ID.
If an ID exists, you must specify this parameter.
If the Instance Overview page or the ID does not exist, you do not need to specify this parameter.
authMode
Yes
The authentication mode.
For an Alibaba Cloud account or a directly authorized RAM user: Use the
aksignmode.For a RAM user that is authorized using a RAM role: Use the
ststokenmode.
securityToken
No
ImportantThis parameter is required only when a RAM user authorized by a RAM role connects to the AMQP client.
The temporary identity credentials (Security Token Service token) that a RAM user uses to assume a RAM role. You can obtain the token by calling the AssumeRole operation. For more information, see AssumeRole.
signMethod
Yes
The signature algorithm. Valid values:
hmacmd5,hmacsha1, andhmacsha256.consumerGroupId
Yes
The ID of the consumer group in the IoT Platform instance.
Log on to the IoT Platform console. In the corresponding instance, go to to view your consumer group ID.
authId
Yes
The authentication information.
For an Alibaba Cloud account or a directly authorized RAM user
The AccessKey ID of the Alibaba Cloud account or the RAM user.
Log on to the IoT Platform console, move the mouse pointer over your profile picture, and then click AccessKey Management to obtain the AccessKey.
For a RAM user that is authorized using a RAM role
The AccessKey ID of the RAM user that assumes the RAM role.
timestamp
Yes
The current time. A UNIX timestamp in milliseconds.
Table 2. password parameter description
Parameter
Required
Description
signMethod
Yes
The signature algorithm. Use the signature algorithm specified in userName to calculate the signature value and convert it to a Base64 string.
stringToSign
Yes
The string to be signed.
Sort the key-value pairs of the parameters to be signed in alphabetical order by key. Concatenate the key-value pairs into a string. Use equal signs (=) to separate keys and values, and ampersands (&) to separate parameters.
For an Alibaba Cloud account or a directly authorized RAM user
The parameters to be signed are
authIdandtimestamp.The string to be signed is:
stringToSign = authId=${accessKey}×tamp=1573489088171.For a RAM user that is authorized using a RAM role
The parameters to be signed are
securityToken,authId, andtimestamp.The string to be signed is:
stringToSign = authId=${accessKey}&securityToken=${SecurityToken}×tamp=1573489088171.
accessSecret
Yes
For an Alibaba Cloud account or a directly authorized RAM user
The AccessKey secret of the Alibaba Cloud account or the RAM user.
Log on to the IoT Platform console, move the mouse pointer over your profile picture, and then click AccessKey Management to obtain the AccessKey.
For a RAM user that is authorized using a RAM role
The AccessKey secret of the RAM user that assumes the RAM role.
Client message receiving logic
After the Receiver Link between the client and IoT Platform is established, IoT Platform can push messages to the AMQP client over this link.
The client can only receive messages that are subscribed to by IoT Platform. To send messages or instructions to devices, you can call the corresponding API operations. For more information, see List of API operations.
Message push
IoT Platform push messages:
Message body: The message payload is in binary format.
Business properties: Properties such as the message topic and message ID are obtained from the Application Properties of the AMQP protocol. The format is
key:value.Key
Meaning
topic
Message topic.
messageId
Message ID.
generateTime
The time when the message was generated.
NoteThe message generation time generateTime cannot be used to determine the message order.
Message acknowledgment
According to the AMQP protocol, the client must send an acknowledgment (also known as a settle) to IoT Platform to confirm that the message was successfully received. AMQP clients typically provide an automatic acknowledgment mode, which is recommended, and a manual acknowledgment mode. For more information, see the documentation for your client.
Message processing
After the AMQP client receives a message, you must develop the business logic to process the message.
Message policy
Real-time messages are pushed directly.
Messages that enter the stacked queue:
Messages may not be consumed in real time due to specific issues. For example, if consumers disconnect from IoT Platform or messages are consumed at a low speed, messages are accumulated in these cases.
After disconnected consumers re-connect to IoT Platform and start to consume messages at a stable speed, IoT Platform pushes the accumulated messages to the consumers.
If consumers fail to consume the pushed messages, the queue in which accumulated messages reside may be blocked. When the messages fail to be consumed, IoT Platform retries to push the accumulated messages to the consumers after approximately 1 minute.
A brief traffic imbalance on the consumer side is normal. The traffic usually recovers within 10 minutes. If your message queries per second (QPS) is high or message processing is resource-intensive, we recommend that you increase the number of consumer clients to ensure sufficient consumption capacity.
During data forwarding, the same message may be sent multiple times to ensure delivery. This continues until the client returns an acknowledgment (ACK) or the message expires. The same message has the same message ID. You can use the message ID to deduplicate messages.
For more information about message limits, see Server-side subscription limits.
You can purge stacked messages in the IoT Platform console. For more information, see View and monitor consumer groups.
Message time series
Messages are not guaranteed to be in order. The order in which you receive messages may not be the order in which they were generated.
Device online and offline messages:
The received messages are not sorted by the time when the device went online or offline. You must sort the messages based on the value of the time parameter.
For example, you receive the following three messages in sequence:
Online:
2018-08-31 10:02:28.195.Offline:
2018-08-31 10:01:28.195.Offline:
2018-08-31 10:03:28.195.
These three messages show that the device first went offline, then went online, and finally went offline again.
For more information about the parameters in the message, see Data format.
Other types of messages:
You must add a serial number to messages at the business layer. Use the serial number in the received message to perform an idempotent check to determine whether the message needs to be processed.