When you need to obtain data from devices in real time, you may find that cloud APIs are insufficient because they can only retrieve Thing Specification Language (TSL) data and may not provide real-time access. The AMQP server-side subscription feature, which is based on the AMQP 1.0 protocol, allows you to reliably receive device upstream notifications in real time on your business server. This topic describes how to configure AMQP server-side subscription.
How it works
An AMQP server-side subscription forwards messages of a specified type from all devices under the same product to one or more consumer groups in real time. Each consumer group contains multiple consumers, which are AMQP clients. When a message is forwarded to a consumer group, a random consumer in the group receives the message. Different consumer groups are distinguished by their consumer group IDs. In the figure above:
- Subscription 1: Forwards messages from all devices under Product 1 to Consumer Group 1 and Consumer Group 2.
- Subscription 2: Forwards messages from all devices under Product 2 to Consumer Group 2.
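The forwarding rule described above can be modeled in a few lines. The following Java sketch is a toy simulation, not IoT Platform code: the class SubscriptionModel, its method names, and the product, group, and consumer names are all hypothetical. It illustrates that a message from a product reaches every consumer group in the subscription, and that one randomly chosen consumer in each group receives it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Toy model of subscription fan-out (hypothetical, for illustration only). */
final class SubscriptionModel {
    // product -> consumer group IDs that the subscription forwards to
    private final Map<String, List<String>> subscriptions = new LinkedHashMap<>();
    // consumer group ID -> online consumers (AMQP clients)
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final Random random;

    SubscriptionModel(Random random) {
        this.random = random;
    }

    void addGroup(String groupId, List<String> consumers) {
        groups.put(groupId, new ArrayList<>(consumers));
    }

    void subscribe(String product, List<String> groupIds) {
        subscriptions.put(product, new ArrayList<>(groupIds));
    }

    /** Returns, for one device message, the consumer picked in each target group. */
    Map<String, String> dispatch(String product) {
        Map<String, String> receivers = new LinkedHashMap<>();
        for (String groupId : subscriptions.getOrDefault(product, List.of())) {
            List<String> consumers = groups.getOrDefault(groupId, List.of());
            if (consumers.isEmpty()) {
                continue; // no online consumer in this group
            }
            // One random consumer per group receives the message.
            receivers.put(groupId, consumers.get(random.nextInt(consumers.size())));
        }
        return receivers;
    }

    public static void main(String[] args) {
        SubscriptionModel model = new SubscriptionModel(new Random());
        model.addGroup("group-1", List.of("consumer-a", "consumer-b"));
        model.addGroup("group-2", List.of("consumer-c"));
        model.subscribe("product-1", List.of("group-1", "group-2"));
        model.subscribe("product-2", List.of("group-2"));
        System.out.println(model.dispatch("product-1"));
        System.out.println(model.dispatch("product-2"));
    }
}
```

Running main several times shows the group-1 receiver varying between the two consumers, while group-2 always delivers to its only consumer.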
After you configure an AMQP server-side subscription, IoT Platform automatically forwards device messages to the AMQP client. You do not need to subscribe to topics in your code. The AMQP client can receive messages as long as it is online. You cannot use an AMQP server-side subscription to send messages to devices. If you need to send messages to devices, call the APIs for message communication.
Use cases
Business server receives device messages: An AMQP server-side subscription can only forward messages of a specified type from all devices under the same product. You cannot specify individual devices or topics. For more flexible message forwarding:
- Forwarding messages from specific devices or topics, or filtering and processing messages: First, create an AMQP consumer group. Then, run an AMQP client on your business server. Finally, use Data Forwarding to forward device messages to the corresponding consumer group. The parser in Data Forwarding can convert data formats, process strings, assemble JSON data, and handle binary data. For more information, see Script syntax. For a comparison of the use cases, advantages, and disadvantages of AMQP server-side subscription and Data Forwarding, see Comparison of data forwarding solutions.
- Forwarding device messages to other devices: Forward data to other topics.
- All consumers in a consumer group receive messages: When a device message is forwarded to an AMQP consumer group, a random consumer in the group receives it. If you need all consumers to receive the message and the number of consumers is small, create a separate consumer group for each consumer. If there are many consumers, use Data Forwarding to forward the message to Message Queue for RocketMQ, and then use broadcast consumption.
Prerequisites
- You have created a consumer group. You can use the default consumer group (DEFAULT_GROUP) in IoT Platform or create a new one.
- The Thing Specification Language (TSL) topic data reported by the device must be in Alink JSON format to trigger the AMQP server-side subscription. Data from custom topics reported by the device has no format requirements.
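For reference, a TSL property report in Alink JSON typically has the shape sketched below. This is a minimal illustration based on the general Alink request layout (id, version, params, method), which this topic does not spell out. The class name AlinkPropertyPost, the property name temperature, and the sample values are hypothetical, so check the Alink protocol documentation for the authoritative field list.

```java
/** Builds an Alink-style property report by hand (illustrative sketch only). */
final class AlinkPropertyPost {

    /**
     * @param id         message ID chosen by the device
     * @param paramsJson the reported properties, already serialized as a JSON object
     */
    static String payload(String id, String paramsJson) {
        return "{\"id\":\"" + id + "\","
                + "\"version\":\"1.0\","
                + "\"params\":" + paramsJson + ","
                + "\"method\":\"thing.event.property.post\"}";
    }

    public static void main(String[] args) {
        // Hypothetical property "temperature"; real property names come from the product's TSL.
        System.out.println(payload("123", "{\"temperature\":23.5}"));
    }
}
```

A production client would normally build this payload with a JSON library rather than string concatenation, so that values are escaped correctly.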
Limits
- After establishing a connection, send an authentication request immediately. If authentication is not successful within 15 seconds, the server closes the connection.
- The rate limit for a single AMQP client connection is 1,000 TPS. The rate limit for message forwarding TPS is determined by the Message Forwarding TPS specification of your instance. There is no limit on message size. For more limits on AMQP server-side subscriptions, see AMQP server-side subscription limits.
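Because authentication must succeed within 15 seconds of connecting, it helps to compute the credentials before opening the connection. The sketch below assumes the AccessKey signing scheme (authMode=aksign with HMAC-SHA1) that the Java demo is understood to use. This format is not stated in this topic, so verify it against the AMQP client connection guide before relying on it. The class name AmqpAuthSketch and all sample values are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Sketch of AMQP credential construction under an assumed aksign/HMAC-SHA1 scheme. */
final class AmqpAuthSketch {

    /** Assumed user name layout: clientId, then auth parameters between two '|' characters. */
    static String userName(String clientId, String accessKey, String iotInstanceId,
                           String consumerGroupId, long timestamp) {
        return clientId + "|authMode=aksign,signMethod=hmacsha1"
                + ",timestamp=" + timestamp
                + ",authId=" + accessKey
                + ",iotInstanceId=" + iotInstanceId
                + ",consumerGroupId=" + consumerGroupId
                + "|";
    }

    /** Assumed password: Base64 of HMAC-SHA1 over "authId=...&timestamp=...", keyed by the secret. */
    static String password(String accessKey, String accessSecret, long timestamp) {
        String signContent = "authId=" + accessKey + "&timestamp=" + timestamp;
        try {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(accessSecret.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
            byte[] digest = mac.doFinal(signContent.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(digest);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA1 is unavailable or the key is invalid", e);
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Placeholder values; never hard-code a real AccessKey pair.
        System.out.println(userName("client-1", "yourAccessKey", "", "DEFAULT_GROUP", now));
        System.out.println(password("yourAccessKey", "yourAccessSecret", now));
    }
}
```

Computing both values up front keeps the time between opening the connection and sending the authentication request short.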
How to configure
Step 1: Create a consumer group
- Log on to the IoT Platform console.
- Create a consumer group and get the consumer group ID for connecting the AMQP client. Each consumer group can have up to 128 consumers. Each consumer can be configured with only one consumer group ID.
Step 2: Create an AMQP server-side subscription
In the IoT Platform console, create an AMQP subscription and associate it with a consumer group and the corresponding device message types.
- On the Overview page, find the instance that you want to manage and click the instance ID or instance name.
- In the navigation pane on the left, go to the Server-side Subscription page.
- On the Server-side Subscription page, click Create Subscription.
- In the Create Subscription dialog box, complete the configuration and click OK.
| Parameter | Description |
| --- | --- |
| Product | IoT Platform forwards messages from all devices under this product. You can create only one AMQP server-side subscription for each product. |
| Subscription Type | Select AMQP. |
| Consumer Group | IoT Platform provides a default consumer group. Click the Consumer Group list. In the Select Target Consumer Group panel on the right, select one or more consumer groups. You can also click Create Consumer Group in the lower-right corner. |
| Message Type | The device message types to push. The notes and the available message types follow. |

Message Type
Important:
- For products and devices in open source MQTT hosting and cloud gateway scenarios, only Device Upstream Notification, Device Status Change Notification, and Device Changes Throughout Lifecycle data can be pushed. For more information about message forwarding topics and data formats, see Message communication for the open source MQTT protocol, Message forwarding for the JT/T 808 protocol, Message forwarding for the GB/T 32960 protocol, and SL 651 protocol message examples.
- Subscribing to a topic that uses a wildcard counts as one topic. For example, if you subscribe to the topic /asde****/+/user/get for a product, it counts as one topic regardless of how many devices are under the product.
- A consumer group can subscribe to a maximum of 200 topics. Messages from topics that exceed this limit are not forwarded.
- The data format for custom topics is user-defined. For more information about the messages and data formats of system topics and TSL topics, see Data formats.
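To see why a wildcard subscription counts as a single topic, consider how one filter covers every device. The following Java sketch is illustrative only: the class TopicFilter and the product key a1Example are hypothetical, and it handles just the single-level wildcard (+) used in the example above, not any other wildcard the platform may support.

```java
/** Minimal matcher for topic filters that use the single-level wildcard "+". */
final class TopicFilter {

    /** Returns true if the topic matches the filter level by level. */
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        if (filterLevels.length != topicLevels.length) {
            return false; // "+" stands for exactly one level
        }
        for (int i = 0; i < filterLevels.length; i++) {
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String filter = "/a1Example/+/user/get"; // one subscribed topic
        // The same filter covers any number of devices under the product.
        System.out.println(matches(filter, "/a1Example/device1/user/get"));
        System.out.println(matches(filter, "/a1Example/device2/user/get"));
        System.out.println(matches(filter, "/a1Example/device1/user/update"));
    }
}
```

In this model the filter is stored once, which mirrors the rule that it consumes one slot of the 200-topic quota no matter how many device topics it matches.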
- Device Upstream Notification: Messages from topics in the topic list of all devices under a product for which Operation Permissions is set to Publish. For more information about topics, see What is a topic?. For example, a product has three topic classes:
  - /${YourProductKey}/${YourDeviceName}/user/get, for which the device has Subscribe permission.
  - /${YourProductKey}/${YourDeviceName}/user/update, for which the device has Publish permission.
  - /sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post, for which the device has Publish permission.

  The AMQP server-side subscription pushes device messages from the topic classes with Publish permission, which are /${YourProductKey}/${YourDeviceName}/user/update and /sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post.
- TSL Historical Data Reporting: If you subscribe to asynchronous service call response data, the Id in the response message from the device must be the same as the Id in the message sent by IoT Platform to ensure proper data subscription.
- OTA module version reporting: Forwards a message when a device reports its Over-the-Air (OTA) module version and the version has changed.
  Important: If you receive OTA module version reporting messages from a device via AMQP, the device must upload the OTA module name parameter (module). For more information about OTA upgrades, see OTA upgrade overview.
- OTA upgrade device status notification: Includes event notifications for upgrade package verification and batch upgrades, such as device upgrade success, failure, cancellation, and progress.
- OTA upgrade batch status notification: Notification of changes in the status of a device OTA upgrade batch.
- Device Status Change Notification: Notification sent when the online/offline status of a device under this product changes.
- Gateway reports discovered sub-devices: The gateway reports information about discovered sub-devices to IoT Platform. This requires support from the application on the gateway. This is a message type specific to gateway products.
- Device topology change: Messages about the creation and removal of the topological relationship between a sub-device and a gateway. This is a message type specific to gateway products.
- Device Changes Throughout Lifecycle: Messages about device creation, deletion, disabling, enabling, and more.
- Device tag change: Messages about tag changes reported by the device.
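The Device Upstream Notification rule described above amounts to a filter on the Operation Permissions of each topic class. The following Java sketch is a hypothetical illustration, not part of any SDK: the class UpstreamTopicSelector and its types are invented for this example, and the topic classes are the three from the example in the list.

```java
import java.util.ArrayList;
import java.util.List;

/** Selects the topic classes whose messages a Device Upstream Notification subscription pushes. */
final class UpstreamTopicSelector {

    enum Permission { PUBLISH, SUBSCRIBE }

    /** A topic class and the permission that the device holds on it. */
    static final class TopicClass {
        final String pattern;
        final Permission permission;

        TopicClass(String pattern, Permission permission) {
            this.pattern = pattern;
            this.permission = permission;
        }
    }

    /** Keeps only topic classes on which the device has Publish permission. */
    static List<String> forwardedPatterns(List<TopicClass> topicClasses) {
        List<String> forwarded = new ArrayList<>();
        for (TopicClass topicClass : topicClasses) {
            if (topicClass.permission == Permission.PUBLISH) {
                forwarded.add(topicClass.pattern);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<TopicClass> topicClasses = List.of(
                new TopicClass("/${YourProductKey}/${YourDeviceName}/user/get", Permission.SUBSCRIBE),
                new TopicClass("/${YourProductKey}/${YourDeviceName}/user/update", Permission.PUBLISH),
                new TopicClass("/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post",
                        Permission.PUBLISH));
        // Prints the two Publish topic classes, one per line.
        forwardedPatterns(topicClasses).forEach(System.out::println);
    }
}
```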
Step 3: Run the AMQP client
- Use the AMQP SDK access demo provided by Alibaba Cloud IoT Platform. Alibaba Cloud does not provide technical support for self-developed AMQP SDKs.
- This example uses Java. For examples in other languages, see AMQP client connection guide.
- Run the following command to download the demo file.
  wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
- Run the following command to unzip the demo file.
  unzip amqp-demo.zip
- In the AmqpClient.java file in the src/main/java/com.aliyun.iotx.demo directory, modify the AMQP connection information according to the following table.

  Important: The demo code in this example includes code to terminate the program (Thread.sleep(60 * 1000);). This means the program runs for one minute after a successful start and then stops. In a real-world scenario, you can set the runtime as needed.

| Parameter | Description |
| --- | --- |
| accessKey | The AccessKey ID of your Alibaba Cloud account or RAM user. To obtain the AccessKey ID and AccessKey Secret, log on to the IoT Platform console, move the pointer over your profile picture, and then click AccessKey Management. |
| accessSecret | The AccessKey Secret of your Alibaba Cloud account or RAM user. Obtain it in the same way as the AccessKey ID. |
| consumerGroupId | The ID of the consumer group in your IoT Platform instance. Log on to the IoT Platform console and, in the corresponding instance, open the consumer group list to view your consumer group ID. |
| iotInstanceId | The instance ID. You can view the ID of the current instance on the Instance Overview page of the IoT Platform console. If there is an ID value, you must pass this ID. If there is no Instance Overview page or ID value, pass an empty value, such as iotInstanceId = "". |
| clientId | The client ID. This is user-defined and cannot exceed 64 characters. We recommend using a unique identifier such as the UUID, MAC address, or IP address of the server where your AMQP client is located. After the AMQP client is connected and started, log on to the IoT Platform console, open the consumer group list in the corresponding instance, and click View for the consumer group. The Consumer Group Details page displays this parameter, which helps you identify and distinguish different clients. |
| connectionCount | The number of connections to start for the AMQP client. The maximum is 128. This is used to scale out real-time message pushing. The Consumer Group Details page displays connected clients in the format ${clientId}+"-"+number. The minimum value for the number is 0. |
| host | The AMQP access domain name. For the AMQP access domain name corresponding to ${YourHost}, see View and configure instance endpoints. |
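The limits on clientId and connectionCount in the table above can be checked before the client starts. The following Java sketch is a hypothetical helper, not part of the demo: the class AmqpClientConfigCheck and its methods are invented for illustration, and rejecting an empty clientId or a connectionCount below 1 is an assumption rather than a documented rule. It also derives the client names that the Consumer Group Details page is described as showing.

```java
import java.util.ArrayList;
import java.util.List;

/** Pre-flight checks for the documented clientId and connectionCount limits. */
final class AmqpClientConfigCheck {
    static final int MAX_CLIENT_ID_LENGTH = 64;  // documented limit
    static final int MAX_CONNECTION_COUNT = 128; // documented limit

    static void validate(String clientId, int connectionCount) {
        // Assumption: an empty clientId is treated as invalid.
        if (clientId == null || clientId.isEmpty() || clientId.length() > MAX_CLIENT_ID_LENGTH) {
            throw new IllegalArgumentException(
                    "clientId must contain 1 to " + MAX_CLIENT_ID_LENGTH + " characters");
        }
        // Assumption: at least one connection is required.
        if (connectionCount < 1 || connectionCount > MAX_CONNECTION_COUNT) {
            throw new IllegalArgumentException(
                    "connectionCount must be between 1 and " + MAX_CONNECTION_COUNT);
        }
    }

    /** Names in the ${clientId}+"-"+number format, with numbering that starts at 0. */
    static List<String> displayedClientNames(String clientId, int connectionCount) {
        validate(clientId, connectionCount);
        List<String> names = new ArrayList<>();
        for (int i = 0; i < connectionCount; i++) {
            names.add(clientId + "-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints [server-01-0, server-01-1, server-01-2, server-01-3]
        System.out.println(displayedClientNames("server-01", 4));
    }
}
```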
- The relevant Maven dependencies have been added to the pom.xml file. In the amqp-demo root directory, run the following command to reload the Maven changes and build the project.
  mvn clean package
- In the amqp-demo/target directory, run the following command to run the generated JAR package.
  java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
- After running the sample code, the following information is returned, indicating that the AMQP client has connected to IoT Platform and is successfully receiving messages.

  Important: Device messages can be received on the server only when the AMQP client is online.

  10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s
  10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
  10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5)
  10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
  10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6)
  10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
  10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device1/user/update, messageId = 17315085647050******, generateTime = 1701658786356, content = test

  The online AMQP clients are displayed in the corresponding consumer group. In the amqp-demo, connectionCount = 4 represents four clients. On the consumer group details page, click the Consumer Group Status tab. In the Online Client List, you can confirm that the four AMQP clients are online.
- If an AMQP client is offline, messages for the AMQP server-side subscription accumulate. When the client comes back online, IoT Platform re-pushes the messages. If you do not need to consume the accumulated messages, clear the accumulated messages before the AMQP client goes online.
- After all configurations are complete and the device reports subscribed data that is received by the AMQP client, you can log on to the IoT Platform console and go to the corresponding instance to view message logs.
  - In the cloud-side logs, view the logs for data reported by the device, data forwarded by IoT Platform to the AMQP client, and ACK messages returned by the AMQP client. For specific operations, see Query cloud-side logs.
  - In the consumer group list, click View in the Actions column for the target consumer group. On the consumer group details page, view the message consumption rate, message accumulation, consumption logs, and more. For specific operations, see View and monitor consumer groups.
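On the client side, the receive message line that the demo prints in the sample output above can be turned into structured fields for your own log checks. The following Java sketch assumes the log line keeps exactly the layout shown in that output. The class ReceiveLogParser and its nested type are hypothetical and are not part of the demo.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Extracts topic, messageId, generateTime, and content from the demo's "receive message" log line. */
final class ReceiveLogParser {
    // Layout assumed from the sample output: "receive message, topic = ..., messageId = ..., ..."
    private static final Pattern RECEIVE_LINE = Pattern.compile(
            "receive message, topic = ([^,\\s]+), messageId = ([^,\\s]+), "
                    + "generateTime = (\\d+), content = (.*)$");

    static final class ReceivedMessage {
        final String topic;
        final String messageId;
        final long generateTime; // epoch milliseconds, as printed by the demo
        final String content;

        ReceivedMessage(String topic, String messageId, long generateTime, String content) {
            this.topic = topic;
            this.messageId = messageId;
            this.generateTime = generateTime;
            this.content = content;
        }
    }

    /** Returns the parsed message, or null if the line is not a "receive message" line. */
    static ReceivedMessage parse(String logLine) {
        Matcher matcher = RECEIVE_LINE.matcher(logLine);
        if (!matcher.find()) {
            return null;
        }
        return new ReceivedMessage(matcher.group(1), matcher.group(2),
                Long.parseLong(matcher.group(3)), matcher.group(4));
    }

    public static void main(String[] args) {
        ReceivedMessage message = parse("10:59:46.452 [pool-1-thread-1] INFO "
                + "com.aliyun.iotx.demo.AmqpClient - receive message, "
                + "topic = /g18******/device1/user/update, messageId = 17315085647050******, "
                + "generateTime = 1701658786356, content = test");
        System.out.println(message.topic + " -> " + message.content);
    }
}
```

Comparing generateTime with the local receive time gives a rough view of end-to-end delay, provided that the clocks involved are synchronized.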
Manage server-side subscriptions
After you successfully set up a server-side subscription, find the subscribed product in the subscription list on the Server-side Subscription page. You can perform the following operations.
- Edit: Click Edit in the Actions column for the product. In the Edit Subscription dialog box, modify the Consumer Group or Message Type.
- Delete: Click Delete in the Actions column for the product. Click OK.
  Warning: If other services use the device data from this product subscription, deleting it may cause service unavailability or affect your business. Proceed with caution.
Related documents
Related APIs
| API | Description |
| --- | --- |
| | Creates a server-side subscription to associate a product with a consumer group and corresponding device message types. |
| | Updates the consumer group or device message types associated with a product in an existing server-side subscription. |
| | Adds a consumer group to a specified product in an existing server-side subscription. |
| | Queries the status of a consumer group, including online client information, message consumption rate, message accumulation, and the time of the last message consumption. |