通过RocketMQ客户端消费设备消息

在物联网应用场景中,如果需要将设备上报至物联网平台的数据转发至您业务服务器的应用中进行消费(例如过滤、分析、存储等),可以使用物联网平台提供的云产品流转服务,例如通过云消息队列RocketMQ版(以下简称RocketMQ)实现设备数据流转并消费。RocketMQ提供多语言客户端SDK实现消息收发,本文介绍如何使用RocketMQ的Java SDK来接收物联网平台的设备消息。

场景说明

假设某产品下设备上报到物联网平台的自定义Topic数据为地理位置(RID)和消息(CusMsg),需要将RID=cn-shanghai的设备消息CusMsg流转到业务服务器中使用。如果当前产品下设备与物联网平台之间的上下行消息量大于1,000 QPS,推荐使用RocketMQ通过Tag标签过滤功能,获取筛选的设备消息。

数据流转消费流程如下:

image
  1. 在业务服务器中使用RocketMQ SDK注册一个消费者,用于接收物联网平台转发到RocketMQ中的消息。

  2. 先在物联网平台配置设备消息转发到RocketMQ,为设备消息设置属性和标签,用于后续消费者消费时指定过滤条件。然后将设备接入物联网平台并上报数据,数据会流转到RocketMQ。

  3. RocketMQ SDK注册的消费者获取消息时会触发服务端的动态过滤计算,RocketMQ根据该消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给该消费者,实现消费者所属业务服务器接收设备上报至物联网平台的消息。

前提条件

背景信息

物联网平台提供服务端订阅和云产品流转服务,均可实现设备数据流转。您可对比物联网平台支持的流转方案及应用场景,选择您业务需要的合适方案。具体内容,请参见数据流转方案对比

本文方案优势:

  • 设备使用MQTT协议接入物联网平台,数据传输链路支持TLS加密,保障数据不被篡改。MQTT协议说明,请参见MQTT协议规范

  • 通过RocketMQ削峰填谷,缓冲消息,减轻服务器同时接收大量设备消息的压力。RocketMQ产品优势和应用场景,请参见云消息队列RocketMQ版产品介绍

使用前必读

本文操作步骤以普通用户权限为例。如果您在操作过程中涉及到管理员权限才能执行的操作,可尝试使用sudo命令执行。

步骤一:调用RocketMQ SDK消费消息

说明
  • RocketMQ实例所在地域必须与物联网平台实例所在地域保持一致。本示例以华东2(上海)地域为例进行介绍。

  • 物联网平台支持将设备数据流转到RocketMQ 4.x和5.x版本实例的Topic中。本示例以数据流转到RocketMQ 5.x版本为例进行介绍。

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏单击实例列表

  2. 创建实例:选择地域为华东2(上海),选择实例版本5.0系列。具体规格含义和取值,请参见产品选型

    image.png
  3. 获取实例接入点

    image.png
  4. 创建Topic:添加Topic为iot_to_mq,配置如下图所示。

    image.png
  5. 创建ConsumerGroup:添加消息组为GID_iot,配置如下图所示。

    image.png
  6. 调用SDK消费消息:使用RocketMQ的Java SDK注册一个消费者来消费Topic消息。

    本示例使用PushConsumer消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过RocketMQ的SDK完成。

    示例代码如下:

    1. 在IDEA中创建一个Java工程。

    2. pom.xml文件中添加以下依赖引入Java依赖库。

      <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client-java</artifactId>
          <version>5.0.4</version>
      </dependency>
    3. 在已创建的Java工程中,创建用户服务器订阅普通消息程序并运行。

      说明

      实际开发场景中,需替换代码中endpointstopicconsumerGroup的值为上述步骤中获取和配置的真实值。

      import org.apache.rocketmq.client.apis.*;
      import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
      import org.apache.rocketmq.client.apis.consumer.FilterExpression;
      import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
      import org.apache.rocketmq.client.apis.consumer.PushConsumer;
      import org.apache.rocketmq.shaded.org.slf4j.Logger;
      import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      import java.util.Collections;
      
      public class PushConsumerExample {
          private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
      
          private PushConsumerExample() {
          }
      
          public static void main(String[] args) throws ClientException, IOException, InterruptedException {
           /**
               * 实例接入点,从控制台实例详情页的接入点页签中获取。
               * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
               * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
               */
              String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
              //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
              String topic = "iot_to_mq";
              //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
              String consumerGroup = "GID_iot";
              final ClientServiceProvider provider = ClientServiceProvider.loadService();
              ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
          /**
               * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
               * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
               */
              //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
              ClientConfiguration clientConfiguration = builder.build();
              //订阅消息的过滤规则,表示订阅所有Tag的消息。
              String tag = "cn-shanghai";
              FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
              //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
              PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                      .setClientConfiguration(clientConfiguration)
                      //设置消费者分组。
                      .setConsumerGroup(consumerGroup)
                      //设置预绑定的订阅关系。
                      .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                      //设置消费监听器。
                      .setMessageListener(messageView -> {
                          // 处理消息并返回消费结果。
                          //LOGGER.info("Consume message={}", messageView);
                          System.out.println("Consume Message: " + messageView);
                          return ConsumeResult.SUCCESS;
                      })
                      .build();
              Thread.sleep(Long.MAX_VALUE);
              //如果不需要再使用PushConsumer,可关闭该进程。
              //pushConsumer.close();
          }
      }                                                 

步骤二:在物联网平台接入设备并配置消息转发

  1. 创建产品和设备。

    1. 登录物联网平台控制台

    2. 在顶部菜单栏选择地域:华东2(上海)

    3. 实例概览页签的全部环境下,找到对应的实例,单击实例卡片。

    4. 在左侧导航栏选择设备管理 > 产品,单击创建产品,配置参数,单击确认。具体说明,请参见创建产品

      本示例中,产品名称为MQ_test所属品类为自定义品类节点类型为直连设备,其他参数使用默认值。

    5. 单击查看产品详情,在产品详情页面,单击Topic类列表 > 自定义Topic,然后单击自定义Topic类,定义一个具有发布权限的Topic类,用于设备上报数据。具体说明,请参见添加自定义Topic类

      本示例中,定义的Topic类:/${YourProductKey}/${YourDeviceName}/user/data

    6. 在左侧导航栏选择设备管理 > 设备,单击添加设备,为产品MQ_test创建设备。具体说明,请参见单个创建设备

      本示例中,创建了一个名称为MQdevice的设备。创建成功后,复制设备证书并保存。

  2. 配置数据转发到RocketMQ。

    1. 在左侧导航栏选择消息转发 > 云产品流转

    2. 云产品流转页面,单击右上角体验新版,进入新版功能页面。

      说明

      如果您已执行过此操作,再次进入云产品流转页面,会直接进入新版功能页面。

    3. 添加设备MQdevice的自定义Topic为待流转的数据源。

      1. 单击数据源页签,然后单击创建数据源,输入数据源名称(例如:MQdata)和描述,单击确定

      2. 数据源详情页面,单击添加Topic,在弹出对话框中,选择需要处理的消息Topic:/${YourProductKey}/${YourDeviceName}/user/data,然后单击确定

      image.png
    4. 配置数据转发的数据目的。

      1. 单击数据目的页签,然后单击创建数据目的

      2. 创建数据目的对话框,配置数据目的到已创建RocketMQ实例下的目标Topic:iot_to_mq,单击确定。具体说明,请参见数据转发到云消息队列RocketMQ版

      image.png
    5. 创建解析器:创建解析器:DataParser

    6. 在解析器详情页面,关联数据源和数据目的。

      1. 在配置向导的数据源下,单击关联数据源

      2. 在弹出的对话框中,单击数据源下拉列表,选择已创建的数据源MQdata,单击确定

      3. 单击配置向导的数据目的,然后单击数据目的列表右上方的关联数据目的

      4. 在弹出的对话框中,单击数据目的下拉列表,选择已创建的数据目的ToRocketMQ,单击确定

        在数据目的列表,查看并保存数据目的ID,例如为1000

    7. 配置解析器脚本。

      1. 单击配置向导的解析脚本,然后单击编辑草稿

      2. 在脚本输入框,输入解析脚本。脚本编辑方法,请参见脚本示例

        本示例使用函数writeMq(destinationId, payload, tag)流转数据。函数参数说明,请参见函数列表脚本代码如下:

        // 设备上报数据内容,json格式
        var data = payload("json");
        
        var tag= data.RID;
        var msg= data.CusMsg;
        
        // 流转MQ
        writeMq(1000, msg, tag);
      3. 单击调试,根据页面提示,选择产品和设备,输入Topic和Payload数据,验证脚本可执行。

        参数示例如下:

        image.png

        运行结果如下,表示脚本执行成功。

        image.png
      4. 单击发布

    8. 回到云产品流转页面的解析器页签,单击解析器DataParser对应操作列的启动,启动解析器。

  3. 设备接入物联网平台并上报数据。

    1. 获取设备端C Link SDK。

      1. 获取C Link SDK,然后重命名为LinkSDK.zip

        本示例以流转设备上报的自定义Topic消息为例,定制SDK时,无需选择高级功能

      2. 登录ECS实例。登录方式,请参见连接方式概述

      3. 上传LinkSDK.zip文件到ECS实例

      4. 执行以下命令,安装unzip。

        yum update
        yum install zip
      5. 进入LinkSDK.zip文件所在目录后,执行以下命令,解压文件。

        unzip LinkSDK.zip

      LinkSDK文件夹的内容说明,请参见文件说明表。本示例使用LinkSDK文件下非云网关设备接入的Demo文件:./mqtt_basic_demo.c

    2. 配置设备样例程序并运行。

      本示例仅说明修改的代码内容。设备程序开发的详细说明,请参见MQTT接入使用

      1. 打开/LinkSDK/demos路径下的mqtt_basic_demo.c文件,配置设备接入认证的参数。

        /* TODO: 替换为自己设备的设备证书 */
        char *product_key       = "k0******";
        char *device_name       = "MQdevice";
        char *device_secret     = "8c684ef*************";
        
        ......
        
        char  *mqtt_host = "iot-cn-******.mqtt.iothub.aliyuncs.com";

        参数

        示例

        说明

        product_key

        k0******

        设备认证信息。即完成添加设备后,您保存至本地的设备证书。

        您也可以在物联网平台的设备详情页查看设备的认证信息。

        device_name

        MQdevice

        device_secret

        8c684ef*************

        mqtt_host

        iot-cn-******.mqtt.iothub.aliyuncs.com

        设备的MQTT接入域名。实例详情页面单击右上方的查看开发配置,在开发配置面板查看接入域名。

        实例的更多详细说明,请参见管理实例终端节点

      2. 取消以下代码的注释符号,调用aiot_mqtt_pub,向指定的Topic:/k0******/MQdevice/user/data发送消息。

        /* MQTT 发布消息功能示例, 请根据自己的业务需求进行使用 */
            {
                char *pub_topic = "/k0******/MQdevice/user/data";
                char *pub_payload = "{\"CusMsg\": \"Custom message forwarding test\",\"RID\": \"cn-shanghai\"}";
        
                res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0);
                if (res < 0) {
                    printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
                    return -1;
                }
            } 
      3. 保存mqtt_basic_demo.c文件后,在SDK根目录/LinkSDK下,执行以下命令,完成设备样例程序的编译。

        make clean
        make

        生成的样例程序data-basic-demo存放在./output目录下。

      4. 执行以下命令,运行样例程序。

        ./output/data-basic-demo

        返回如下日志,设备接入并发布消息成功:

        [1698060995.066][LK-0313] MQTT user calls aiot_mqtt_connect api, connect
        [1698060995.066][LK-032A] mqtt host: iot-******.mqtt.iothub.aliyuncs.com
        [1698060995.066][LK-0317] user name: MQdevice&k09******
        establish tcp connection with server(host='iot-0******.mqtt.iothub.aliyuncs.com', port=[443])
        success to establish tcp, fd=3
        local port: 33626
        [1698060995.066][LK-1000] establish mbedtls connection with server(host='iot-0******.mqtt.iothub.aliyuncs.com', port=[443])
        [1698060995.122][LK-1000] success to establish mbedtls connection, (cost 45376 bytes in total, max used 48344 bytes)
        [1698060995.155][LK-0313] MQTT connect success in 93 ms
        AIOT_MQTTEVT_CONNECT
        [1698060995.155][LK-0309] pub: /k09j9******/MQdevice/user/data
        
        [LK-030A] > 7B 22 43 75 73 4D 73 67  22 3A 20 22 43 75 73 74 | {"CusMsg": "Cust
        [LK-030A] > 6F 6D 20 6D 65 73 73 61  67 65 20 66 6F 72 77 61 | om message forwa
        [LK-030A] > 72 64 69 6E 67 20 74 65  73 74 22 2C 22 52 49 44 | rding test","RID
        [LK-030A] > 22 3A 20 22 63 6E 2D 73  68 61 6E 67 68 61 69 22 | ": "cn-shanghai"
        [LK-030A] > 7D                                               | }               
        
        heartbeat response

步骤三:查看设备消息转发的运行日志

  1. 返回物联网平台控制台,单击目标企业版实例,查看设备状态和消息转发日志。

    1. 左侧导航栏选择设备管理 > 设备,找到目标设备MQdevice,查看设备状态。设备状态显示为在线,则表示设备与物联网平台成功连接。

      image.png
    2. 左侧导航栏选择监控运维 > 日志服务,选择目标产品MQ_test,查看消息转发日志。

      您可复制消息内容中的MQ msgId,执行下一步查看转发到RocketMQ的设备消息和消费消息状态。

      image.png
  2. 返回已运行的订阅RocketMQ资源的终端,查看订阅和消费消息的日志。

    image.png
  3. 返回云消息队列RocketMQ版控制台,在实例详情页面,查看消费者接收到消息的具体内容和消息轨迹。

    1. 单击消息查询,在Message ID输入框中输入已复制的MQ msgId值,单击查询

      image.png
    2. 单击消息对应操作列的详情,可在右侧消息详情面板,查看到具体的消息体

      image.png
    3. 单击消息详情面板的关闭,返回消息列表,单击操作列的消息轨迹,可查看到设备消息消费状态。

      image.png