In IoT applications, you may need to forward data reported by devices to an application on your business server for processing, such as filtering, analysis, or storage. You can use the Data Forwarding feature of IoT Platform to route device data to services such as ApsaraMQ for RocketMQ for consumption. ApsaraMQ for RocketMQ provides SDKs in multiple languages for sending and receiving messages. This topic describes how to use the Java SDK for ApsaraMQ for RocketMQ to receive device messages from IoT Platform.
Use cases
Suppose a device under a product reports data containing a location ID (RID) and a custom message (CusMsg) to a custom topic in IoT Platform. You need to forward the device message CusMsg to your business server when RID=cn-shanghai. If the message volume for this product exceeds 1,000 QPS, we recommend using the tag filtering feature of ApsaraMQ for RocketMQ to retrieve the specified device messages.
-
On your business server, use the ApsaraMQ for RocketMQ SDK to register a consumer. This consumer receives messages forwarded from IoT Platform.
-
In IoT Platform, configure data forwarding to ApsaraMQ for RocketMQ. Set properties and tags for device messages, which the consumer will use as filter conditions. Then, connect your device to IoT Platform to report data, which is then forwarded to ApsaraMQ for RocketMQ.
-
When the consumer retrieves messages, the ApsaraMQ for RocketMQ SDK triggers server-side dynamic filtering. ApsaraMQ for RocketMQ matches messages against the filter expression provided by the consumer and delivers only the eligible messages. This allows your business server to receive specific messages that devices report to IoT Platform.
Prerequisites
-
You have an active Alibaba Cloud account.
-
IoT Platform is activated.
NoteAfter activation, try IoT Platform for free. Alibaba Cloud IoT Platform products for free trial. Billing overview.
-
Purchase an Enterprise Edition instance. This topic uses an Enterprise Edition instance in the China (Shanghai) region.
-
You have activated ApsaraMQ for RocketMQ.
If you have not activated the service, go to the ApsaraMQ for RocketMQ product page to activate the service.
-
You have prepared a development environment.
-
This topic uses the Java SDK to demonstrate how to consume messages with ApsaraMQ for RocketMQ.
-
Operating system: Windows 10 (64-bit)
You can use IntelliJ IDEA or Eclipse. This topic uses IntelliJ IDEA Ultimate as an example.
-
-
This topic uses an ECS instance that runs Alibaba Cloud Linux, which provides a C language compilation and runtime environment by default. The device C Link SDK demo provided by IoT Platform is used to simulate a device that connects to IoT Platform and reports data. For information about how to purchase an ECS instance, see Create an instance.
-
Background
IoT Platform provides server-side subscriptions and Data Forwarding, both of which can forward device data. You can compare the available data forwarding solutions and their use cases to choose the one that best suits your business needs. For more information, see Comparison of data forwarding solutions.
Benefits of this solution:
-
Devices connect to IoT Platform by using the Message Queuing Telemetry Transport (MQTT) protocol, and the data transmission link is encrypted with Transport Layer Security (TLS) to prevent tampering. For more information about the MQTT protocol, see MQTT protocol specifications.
-
ApsaraMQ for RocketMQ smooths traffic spikes by buffering messages and reduces the pressure on your server from receiving a large number of device messages simultaneously. For more information about the benefits and use cases of ApsaraMQ for RocketMQ, see the ApsaraMQ for RocketMQ product page.
Before you begin
The steps in this topic are demonstrated using a regular user. If an operation requires administrative permissions, run the command with sudo.
Step 1: Consume messages with RocketMQ SDK
-
Your ApsaraMQ for RocketMQ and IoT Platform instances must be in the same region.
-
IoT Platform supports forwarding device data to topics in ApsaraMQ for RocketMQ 4.x and 5.x instances. This topic uses a 5.x instance as an example.
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances.
-
Create an instance: Select China (Shanghai) as the region and 5.0 Series as the Instance Version. For more information about the specifications and their values, see Product Selection.
Select Professional Edition for Primary Series, Cluster High-Availability Edition (Recommended for Production) for Sub-series, and rmq.s2.2xlarge for Compute Specification for Message Sending and Receiving. Configure the VPC ID and VSwitch ID. Set public access to Enabled, Public Network Billing Method to Fixed Bandwidth, and public bandwidth to 1 Mbps.
-
On the Endpoints page, when the Client Protocol is TCP, you can obtain two types of endpoints: a VPC endpoint (for example,
rmq-c<InstanceID>.cn-shanghai.rmq.aliyuncs.com:8080) and a public endpoint. For public endpoints, you can configure network access control by clicking Edit Whitelist. -
Create a topic named iot_to_mq.
Select Normal Message as the message type and enter Forward IoT device reported data to a business server for the Description.
-
Create a Consumer Group named GID_iot.
Select Concurrent Delivery for Delivery Order and enter
IoT device message subscriptionfor Description. -
Consume messages by using an SDK. Use the Java SDK for ApsaraMQ for RocketMQ to register a consumer that consumes topic messages.
This topic uses the PushConsumer type, where message consumption is handled only by a message listener that processes business logic and returns a consumption result. The ApsaraMQ for RocketMQ SDK handles message retrieval, consumption status submission, and retries.
The following is the sample code:
-
Create a Java project in your IDE.
-
Add the following dependency to the
pom.xmlfile to import the Java library.<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.4</version> </dependency> -
In the created Java project, create and run a program for your server to subscribe to normal messages.
NoteIn a real-world development scenario, replace the values of
endpoints,topic, andconsumerGroupin the code with the actual values you obtained and configured in the previous steps.import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { /** * The instance endpoint, which can be obtained from the Endpoints tab of the instance details page in the console. * If you are accessing from an Alibaba Cloud ECS instance, we recommend using the VPC endpoint. * If you are accessing from a local machine over the public network or from an on-premises IDC, you can use the public endpoint. To use a public endpoint, you must enable public access for the instance. */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; // The destination topic to which you want to subscribe. The topic must be created in the console in advance. Otherwise, an error is returned. String topic = "iot_to_mq"; // The consumer group for the consumer. The group must be created in the console in advance. Otherwise, an error is returned. String consumerGroup = "GID_iot"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * If you use a public endpoint, you must also set the username and password for the instance in the configuration. * You can obtain the username and password from the instance details page in the console. * If you are accessing from an Alibaba Cloud ECS instance over the internal network, you do not need to set these credentials. The server automatically obtains them based on the VPC information. */ //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration clientConfiguration = builder.build(); // The filter rule for message subscription, which indicates that messages with the specified tag are subscribed. String tag = "cn-shanghai"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); // Initialize PushConsumer and bind the consumer group, communication parameters, and subscription. PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // Set the consumer group. .setConsumerGroup(consumerGroup) // Set the pre-bound subscription. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // Set the message listener. .setMessageListener(messageView -> { // Process the message and return the consumption result. //LOGGER.info("Consume message={}", messageView); System.out.println("Consume Message: " + messageView); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); // If you no longer need the PushConsumer, you can close it. //pushConsumer.close(); } }
-
Step 2: Connect a device and configure forwarding
-
Create a product and a device.
-
Log on to the IoT Platform console.
-
In the top menu bar, select the region: China (Shanghai).
-
On the Overview page, under All environment, find your instance and click the instance card.
-
In the left navigation pane, choose , click Create Product, configure the parameters, and then click OK. For more information, see Create a product.
In this example, the product name is MQ_test, the category is Custom Category, the Node Type is Directly Connected Device, and the default values are used for the other parameters.
-
Click View Product Details. On the product details page, choose , and then click Define Topic Category to define a custom topic with Publish permissions for the device to report data. For more information, see Add a custom topic category.
In this topic, the defined topic category is:
/${YourProductKey}/${YourDeviceName}/user/data. -
In the left navigation pane, choose , and click Add Device to create a device for the MQ_test product. For more information, see Create a single device.
In this topic, a device named MQdevice is created. After creation, copy and save the device certificate.
-
-
Configure data forwarding to ApsaraMQ for RocketMQ.
-
In the left navigation pane, choose .
-
On the Data Forwarding page, click Go to New Version in the upper-right corner.
NoteIf you have performed this operation before, you are automatically redirected to the new version of the Data Forwarding page.
-
Add the custom topic of the device MQdevice as the data source.
-
Click the Data Sources tab, and then click Create Data Source. Enter a name for the data source (for example, MQdata) and a description, then click OK.
-
On the Data Source Details page, click Add Topic. In the dialog box that appears, select the topic to process:
/${YourProductKey}/${YourDeviceName}/user/data, and then click OK.
-
-
Configure the data destination for data forwarding.
-
Click the Data Destinations tab, and then click Create Data Destination.
-
In the Create Data Destination dialog box, configure the data destination to the target topic iot_to_mq in the created ApsaraMQ for RocketMQ instance, and then click OK. For more information, see Forward data to ApsaraMQ for RocketMQ.
In this topic, Data Destination Name is set to
ToRocketMQ, Action is set to Send data to ApsaraMQ for RocketMQ, region is set to China (Shanghai), instance is set toIoT-forward-data-test, topic is set toiot_to_mq, and Authorization is set toAliyunIOTAccessingMQRole. -
-
Create a parser named
DataParser. -
On the parser details page, associate the data source and data destination.
-
In the Configuration Wizard, under Data Source, click Associate Data Source.
-
In the dialog box that appears, click the Data source drop-down list, select MQdata, and click OK.
-
Click Data Destination in the configuration wizard, and then click Associate Data Destination in the upper-right corner of the Data Destination list.
-
In the pop-up dialog box, click the Data Destination dropdown list, select ToRocketMQ, and click OK.
In the data destination list, view and save the data destination ID, for example, 1000.
-
-
Configure the parser script.
-
Click Parser Script in the configuration wizard, and then click Edit Draft.
-
In the script editor, enter the parser script.
This topic uses the
writeMq(destinationId, payload, tag)function to forward data. For more information about the function parameters, see Function list. The script is as follows:// The data reported by the device, in JSON format. var data = payload("json"); var tag= data.RID; var msg= data.CusMsg; // Forward to MQ. writeMq(1000, msg, tag); -
Click Debug. Follow the on-screen instructions to select a product and device, and enter a topic and payload data to verify the script.
Select
MQ_testfor Product,MQdevicefor Device, enter/${ProductKey}/MQdevice/user/datafor Topic, and enter{"CusMsg": "Custom message forwarding test", "RID":"cn-shanghai"}for Payload.The following result indicates that the script executed successfully.
action: transmit to MQ[destinationId=1000], data:Custom messag variables: msg : Custom message forwarding test data : {"CusMsg":"Custom message forwarding test","RID":"cn-shanghai"} tag : cn-shanghai -
Click Publish.
-
-
Return to the Parser tab on the Data Forwarding page. In the Actions column for the DataParser parser, click Start to start the parser.
-
-
Connect the device to IoT Platform and report data.
-
Obtain the device-side C Link SDK.
-
Obtain the C-language Link SDK and rename it to
LinkSDK.zip.In this example of forwarding messages that a device reports to a custom topic, you do not need to select Advanced Features when you customize the SDK.
Log on to the ECS instance. For information about logon methods, see Select a method to connect to an ECS instance.
-
Run the following commands to install unzip:
yum update yum install zip -
Go to the directory where the
LinkSDK.zipfile is located and run the following command to decompress the file:unzip LinkSDK.zip
For more information about the contents of the LinkSDK folder, see the file description table. This topic uses the demo file for connecting devices that are not cloud gateways:
./mqtt_basic_demo.c. -
-
Configure and run the device sample program.
This section only explains the code modifications. For detailed instructions on developing the device program, see Use MQTT to connect to IoT Platform.
-
Open the
mqtt_basic_demo.cfile in the/LinkSDK/demosdirectory and configure the device connection and authentication parameters./* TODO: Replace with your own device certificate information */ char *product_key = "k0******"; char *device_name = "MQdevice"; char *device_secret = "8c684ef*************"; ...... char *mqtt_host = "iot-cn-******.mqtt.iothub.aliyuncs.com";Parameter
Example
Description
product_key
k0******Device authentication information. The device certificate that you saved locally after adding the device.
You can also view the device's authentication information on the Device Details page in the IoT Platform console.
device_name
MQdevicedevice_secret
8c684ef*************mqtt_host
iot-cn-******.mqtt.iothub.aliyuncs.comThe MQTT endpoint for the device. On the Instance Details page, click View Development Configurations in the upper-right corner to view the endpoint in the Development Configurations panel.
For more information about instances, see View and configure instance endpoints.
-
Uncomment the following code block to call aiot_mqtt_pub and send a message to the specified topic:
/k0******/MQdevice/user/data./* Sample code for the MQTT message publishing feature. Use it based on your business requirements. */ { char *pub_topic = "/k0******/MQdevice/user/data"; char *pub_payload = "{\"CusMsg\": \"Custom message forwarding test\",\"RID\": \"cn-shanghai\"}"; res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); return -1; } } -
After saving the
mqtt_basic_demo.cfile, run the following commands in the SDK root directory/LinkSDKto compile the device sample program.make clean makeThe compilation process stores the generated sample program
data-basic-demoin the./outputdirectory. -
Run the following command to run the sample program.
./output/data-basic-demoThe following log indicates that the device has successfully connected and published a message:
[1698060995.066][LK-0313] MQTT user calls aiot_mqtt_connect api, connect [1698060995.066][LK-032A] mqtt host: iot-******.mqtt.iothub.aliyuncs.com [1698060995.066][LK-0317] user name: MQdevice&k09****** establish tcp connection with server(host='iot-0******.mqtt.iothub.aliyuncs.com', port=[443]) success to establish tcp, fd=3 local port: 33626 [1698060995.066][LK-1000] establish mbedtls connection with server(host='iot-0******.mqtt.iothub.aliyuncs.com', port=[443]) [1698060995.122][LK-1000] success to establish mbedtls connection, (cost 45376 bytes in total, max used 48344 bytes) [1698060995.155][LK-0313] MQTT connect success in 93 ms AIOT_MQTTEVT_CONNECT [1698060995.155][LK-0309] pub: /k09j9******/MQdevice/user/data [LK-030A] > 7B 22 43 75 73 4D 73 67 22 3A 20 22 43 75 73 74 | {"CusMsg": "Cust [LK-030A] > 6F 6D 20 6D 65 73 73 61 67 65 20 66 6F 72 77 61 | om message forwa [LK-030A] > 72 64 69 6E 67 20 74 65 73 74 22 2C 22 52 49 44 | rding test","RID [LK-030A] > 22 3A 20 22 63 6E 2D 73 68 61 6E 67 68 61 69 22 | ": "cn-shanghai" [LK-030A] > 7D | } heartbeat response
-
-
Step 3: View message forwarding logs
-
Return to the IoT Platform console, click your Enterprise Edition instance, and check the device status and message forwarding logs.
-
In the left navigation bar, select . Find the target device MQdevice and view its status. If the device status is Online, the device is successfully connected to the IoT Platform.
-
In the left navigation pane, choose , select the target product MQ_test, and view the message forwarding logs.
You can copy the MQ msgId from the message content and proceed to the next step to view the device message that is forwarded to RocketMQ and its consumption status.
A log entry with a Business Type of Data Forwarding indicates that the data was successfully forwarded to ApsaraMQ for RocketMQ. Click the Details link in the Message Content column to see the full JSON in a pop-up window, which includes the
MQ msgIdvalue.
-
-
Return to the terminal where the ApsaraMQ for RocketMQ subscription client is running to view the message subscription and consumption logs.
Consume Message: MessageViewImpl{messageId=xxx, topic=iot_to_mq, bornHost=iotx-data-xxx, bornTimestamp=xxx, endpoints=dns:rmq-cn-xxx.cn-shanghai.rmq.aliyuncs.com:8081, deliveryAttempt=1, tag=cn-shanghai, keys=[0], messageGroup=null, deliveryTimestamp=null, properties={}} -
Return to the ApsaraMQ for RocketMQ console. On the Instance Details page, view the specific content of the received message and the message trace.
-
Click Message Query, enter the copied MQ msgId value in the Message ID input box, and click Query.
Select iot_to_mq for the topic. A message record appears in the query results, confirming that the device message has been successfully forwarded to ApsaraMQ for RocketMQ.
-
Click Details in the Actions column for the message to view the Message Body in the Message Details panel on the right.
The Message Tags (Tags) is
cn-shanghai, and the Message Body isCustom message forwarding test. This confirms that the IoT message has been successfully forwarded to ApsaraMQ for RocketMQ. -
Click Close on the Message Details panel, and in the message list, click Message Trace in the Actions column to view the consumption status of the device message.
The message trace shows the message delivery path: the producer sent the message successfully, the message type is Normal Message, the topic is
iot_to_mq, and the tag iscn-shanghai. The Consumer GroupGID_iothad a total of one delivery attempt, which was successful.
-