Configure AMQP server-side subscription

更新时间:
复制 MD 格式

When you need to obtain data from devices in real time, you may find that cloud APIs are insufficient because they can only retrieve Thing Specification Language (TSL) data and may not provide real-time access. The AMQP server-side subscription feature, which is based on the AMQP 1.0 protocol, allows you to reliably receive device upstream notifications in real time on your business server. This topic describes how to configure AMQP server-side subscription.

How it works

image

An AMQP server-side subscription forwards messages of a specified type from all devices under the same product to one or more consumer groups in real time. Each consumer group contains multiple consumers, which are AMQP clients. When a message is forwarded to a consumer group, a random consumer in the group receives the message. Different consumer groups are distinguished by their consumer group IDs. In the figure above:

  • Subscription 1: Forwards messages from all devices under Product 1 to Consumer Group 1 and Consumer Group 2.

  • Subscription 2: Forwards messages from all devices under Product 2 to Consumer Group 2.

After you configure an AMQP server-side subscription, IoT Platform automatically forwards device messages to the AMQP client. You do not need to subscribe to topics in your code. The AMQP client can receive messages as long as it is online. You cannot use an AMQP server-side subscription to send messages to devices. If you need to send messages to devices, call the APIs for message communication.

Use cases

Business server receives device messages: An AMQP server-side subscription can only forward messages of a specified type from all devices under the same product. You cannot specify individual devices or topics. For more flexible message forwarding:

Prerequisites

  • You have created a product.

  • You have created a consumer group. You can use the default consumer group (DEFAULT_GROUP) in IoT Platform or create a new one.

  • The Thing Specification Language (TSL) topic data reported by the device must be in Alink JSON format to trigger the AMQP server-side subscription. Data from custom topics reported by the device has no format requirements.

Limits

  • After establishing a connection, send an authentication request immediately. If authentication is not successful within 15 seconds, the server closes the connection.

  • The rate limit for a single AMQP client connection is 1,000 TPS. The rate limit for message forwarding TPS is determined by the Message Forwarding TPS specification of your instance. There is no limit on message size. For more limits on AMQP server-side subscriptions, see AMQP server-side subscription limits.

How to configure

Step 1: Create a consumer group

  1. Log on to the IoT Platform console.

  2. Create a consumer group and get the consumer group ID for connecting the AMQP client. Each consumer group can have up to 128 consumers. Each consumer can be configured with only one consumer group ID.

Step 2: Create an AMQP server-side subscription

In the IoT Platform console, create an AMQP subscription and associate it with a consumer group and the corresponding device message types.

  1. On the Overview page, find the instance that you want to manage and click the instance ID or instance name.

  2. In the navigation pane on the left, choose Message Forwarding > Server-side Subscription.

  3. On the Server-side Subscription page, click Create Subscription.

  4. In the Create Subscription dialog box, complete the configuration and click OK.

    Parameter

    Description

    Product

    IoT Platform forwards messages from all devices under this product. You can create only one AMQP server-side subscription for each product.

    Subscription Type

    Select AMQP.

    Consumer Group

    IoT Platform provides a default consumer group.

    Click the Consumer Group list. In the Select Target Consumer Group panel on the right, select one or more consumer groups.

    You can also click Create Consumer Group in the lower-right corner.

    Message Type

    Important
    • For products and devices in open source MQTT hosting and cloud gateway scenarios, only Device Upstream Notification, Device Status Change Notification, and Device Changes Throughout Lifecycle data can be pushed. For more information about message forwarding topics and data formats, see Message communication for the open source MQTT protocol, Message forwarding for the JT/T 808 protocol, Message forwarding for the GB/T 32960 protocol, and SL 651 protocol message examples.

    • Subscribing to a topic that uses a wildcard counts as one topic. For example, if you subscribe to the topic /asde****/+/user/get for a product, it counts as one topic regardless of how many devices are under the product.

    • A consumer group can subscribe to a maximum of 200 topics. Messages from topics that exceed this limit are not forwarded.

    • The data format for custom topics is user-defined. For more information about the messages and data formats of system topics and TSL topics, see Data formats.

    • Device Upstream Notification: Messages from topics in the topic list of all devices under a product for which the Operation Permissions is set to Publish. For more information about topics, see What is a topic?. For example, a product has three topic classes:

      • /${YourProductKey}/${YourDeviceName}/user/get, for which the device has Subscribe permission.

      • /${YourProductKey}/${YourDeviceName}/user/update, for which the device has Publish permission.

      • /${YourProductKey}/${YourDeviceName}/thing/event/property/post, for which the device has Publish permission.

      • The AMQP server-side subscription pushes device messages from topic classes with Publish permission, which are /${YourProductKey}/${YourDeviceName}/user/update and /sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post.

    • TSL Historical Data Reporting: If you subscribe to asynchronous service call response data, the Id in the response message from the device must be the same as the Id in the message sent by IoT Platform to ensure proper data subscription.

    • OTA module version reporting: Forwards a message when a device reports its Over-the-Air (OTA) module version and the version has changed.

      Important

      If you receive OTA module version reporting messages from a device via AMQP, the device must upload the OTA module name module parameter. For more information about OTA upgrades, see OTA upgrade overview.

    • OTA upgrade device status notification: Includes event notifications for upgrade package verification and batch upgrades, such as device upgrade success, failure, cancellation, and progress.

    • OTA upgrade batch status notification: Notification of changes in the status of a device OTA upgrade batch.

    • Device Status Change Notification: Notification sent when the online/offline status of a device under this product changes.

    • Gateway reports discovered sub-devices: The gateway reports information about discovered sub-devices to IoT Platform. This requires support from the application on the gateway. This is a message type specific to gateway products.

    • Device topology change: Messages about the creation and removal of the topological relationship between a sub-device and a gateway. This is a message type specific to gateway products.

    • Device Changes Throughout Lifecycle: Messages about device creation, deletion, disabling, enabling, and more.

    • Device tag change: Messages about tag changes reported by the device.

Step 3: Run the AMQP client

Important
  • Use the AMQP SDK access demo provided by Alibaba Cloud IoT Platform. Alibaba Cloud does not provide technical support for self-developed AMQP SDKs.

  • This example uses Java. For examples in other languages, see AMQP client connection guide.

  1. Run the following command to download the demo file.

    wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
  2. Run the following command to unzip the demo file.

    unzip amqp-demo.zip
  3. In the AmqpClient.java file in the src/main/java/com.aliyun.iotx.demo directory, modify the AMQP connection information according to the following table.

    Important

    The demo code in this example includes code to terminate the program (Thread.sleep(60 * 1000);). This means the program runs for one minute after a successful start and then stops. In a real-world scenario, you can set the runtime as needed.

    Parameter

    Description

    accessKey

    The AccessKey ID and AccessKey Secret of your Alibaba Cloud account or RAM user.

    Log on to the IoT Platform console, move the pointer over your profile picture, and then click AccessKey Management to obtain the AccessKey ID and AccessKey Secret.

    accessSecret

    consumerGroupId

    The ID of the consumer group in your IoT Platform instance.

    Log on to the IoT Platform console. In the corresponding instance, choose Message Forwarding > Server-side Subscription > Consumer Group List to view your consumer group ID.

    iotInstanceId

    The instance ID. You can view the ID of the current instance on the Instance Overview page of the IoT Platform console.

    • If there is an ID value, you must pass this ID.

    • If there is no Instance Overview page or ID value, pass an empty value, such as iotInstanceId = "".

    clientId

    The client ID. This is user-defined and cannot exceed 64 characters. We recommend using a unique identifier such as the UUID, MAC address, or IP address of the server where your AMQP client is located.

    After the AMQP client is connected and started, log on to the IoT Platform console. In the corresponding instance, choose Message Forwarding > Server-side Subscription > Consumer Group List. Click View for the consumer group. The Consumer Group Details page will display this parameter, helping you identify and distinguish different clients.

    connectionCount

    The number of connections to start for the AMQP client. The maximum is 128. This is used to scale out real-time message pushing.

    The Consumer Group Details page displays connected clients in the format ${clientId}+"-"+number. The minimum value for the number is 0.

    host

    The AMQP access domain name.

    For the AMQP access domain name corresponding to ${YourHost}, see View and configure instance endpoints.

  4. The relevant Maven dependencies have been added to the pom.xml file. In the amqp-demo root directory, run the following command to reload the Maven changes and build the project.

    mvn clean package
  5. In the amqp-demo/target directory, run the following command to run the generated JAR package.

    java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  6. After running the sample code, the following information is returned, indicating that the AMQP client has connected to IoT Platform and is successfully receiving messages.

    Important

    Device messages can be received on the server only when the AMQP client is online.

    10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 
    10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5)
    10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705073664, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6)
    10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message,
     topic = /g18******/device1/user/update,
     messageId = 17315085647050******,
     generateTime = 1701658786356,
     content = test

    The online AMQP clients are displayed in the corresponding consumer group. In the amqp-demo, connectionCount = 4 represents four clients.

    On the consumer group details page, click the Consumer Group Status tab. In the Online Client List, you can confirm that the four AMQP clients are online.

  7. If an AMQP client is offline, messages for the AMQP server-side subscription accumulate. When the client comes back online, IoT Platform re-pushes the messages. If you do not need to consume the accumulated messages, clear the accumulated messages before the AMQP client goes online.

  8. After all configurations are complete and the device reports subscribed data that is received by the AMQP client, you can log on to the IoT Platform console and go to the corresponding instance to view message logs.

    • On the Monitoring > Log Service > Cloud-side Logs tab, view the logs for data reported by the device, data forwarded by IoT Platform to the AMQP client, and ACK messages returned by the AMQP client. For specific operations, see Query cloud-side logs.

    • On the Message Forwarding > Server-side Subscription > Consumer Group List tab, click View in the Actions column for the target consumer group. On the consumer group details page, view the message consumption rate, message accumulation, consumption logs, and more. For specific operations, see View and monitor consumer groups.

Manage server-side subscriptions

After you successfully set up a server-side subscription, find the subscribed product in the subscription list on the Server-side Subscription page. You can perform the following operations.

  • Edit: Click Edit in the Actions column for the product. In the Edit Subscription dialog box, modify the Consumer Group or Message Type.

  • Delete: Click Delete in the Actions column for the product. Click OK.

    Warning

    If other services use the device data from this product subscription, deleting it may cause service unavailability or affect your business. Proceed with caution.

Related documents

Related APIs

API

Description

CreateSubscribeRelation

Creates a server-side subscription to associate a product with a consumer group and corresponding device message types.

UpdateSubscribeRelation

Updates the consumer group or device message types associated with a product in an existing server-side subscription.

CreateConsumerGroupSubscribeRelation

Adds a consumer group to a specified product in an existing server-side subscription.

QueryConsumerGroupStatus

Queries the status of a consumer group, including online client information, message consumption rate, message accumulation, and the time of the last message consumption.

好的,我已理解您的角色和所有规则。作为一名追求极致简洁的文案本地化专家,我将严格按照您的工作流和核心约束,仅对 、 和 进行压缩和规范化,并保持其余所有内容绝对不变。 经过逐行分析,我发现此份 HTML 文档的英文草稿质量已经非常高,其标题和表头的处理方式完全符合您设定的简洁化、标准化规则。 - : 均已概括核心内容,无冗余修饰词,符合 。 - : 均已使用 , , 等标准术语,符合 和 。 - : 文档中未出现需要优化的 Tab 或折叠面板。 因此,本次处理无需对英文草稿进行任何修改。 : html