MQTT客户端的上下线事件将会触发MQTT服务端生成一条通知消息,微消息队列MQTT版支持将该条消息数据导出至其他阿里云产品,并使用MQTT的Java SDK实现MQTT客户端与后端应用收发消息。本文以当前仅支持的消息队列RocketMQ版数据互通为例进行说明。

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK
  • 在公网地域,创建消息队列RocketMQ版实例、Topic以及Group ID。详细操作,请参见创建资源

背景信息

您可以通过配置客户端上下线通知规则来将MQTT客户端的上下线事件通知的数据导出至其他阿里云产品。

本文以公网环境中的Java SDK为例说明微消息队列MQTT版如何将MQTT客户端上下线事件通知的消息发送至后端应用。

此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载

quick_start_client_stats_notify

如上图所示,您的后端应用和MQTT客户端均通过Java语言开发。您部署在公网环境中的MQTT客户端上线或下线时,会触发MQTT服务生成一条事件消息,您可以通过配置客户端上下线通知规则将该通知消息经过消息队列RocketMQ版发送至后端应用。两个产品的服务端通过各自产品提供的Java SDK分别与各自的客户端实现消息收发。

客户端上下线通知功能的更多信息,请参见MQTT客户端上下线事件数据流出

注意 消息队列RocketMQ版微消息队列MQTT版的Topic不能跨地域使用,因此,本文中所有资源都应在公网地域创建。详细信息,请参见Topic地域化

网络访问

微消息队列MQTT版同时提供了公网接入点VPC 接入点。接入点说明如下:
  • 在物联网和移动互联网的场景中,客户端推荐使用公网接入点接入。
  • VPC 接入点仅供一些特殊场景使用。因为一般而言,涉及部署在云端服务器上的应用的场景,建议使用服务端消息产品例如消息队列RocketMQ版实现。
注意 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题微消息队列MQTT版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
本文以公网接入点为例。微消息队列MQTT版消息队列RocketMQ版的应用场景对比和消息属性映射关系请参见以下文档:

使用流程

MQTT客户端上下线通知的消息收发流程如下图所示。

quick_start_client_status_notify_process

步骤一:创建MQTT实例并获取接入点

注意 轻量版实例不支持数据流入规则、数据流出规则及上下线通知规则,如需使用规则功能,请您在创建实例时选择其他类型的实例。
  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 实例列表页面左上角单击创建实例
  5. 在弹出的付费方式面板中,按需选择实例付费方式。
    微消息队列MQTT版支持按量付费包年包月付费模式,两种类型的计费方式请参见计费说明
    • 创建按量付费实例
      1. 付费方式选择按量付费,然后单击确定
      2. 在实例规格面板,按需选择您需要购买的实例规格,单击立即购买按量付费实例
    • 创建包年包月实例
      1. 付费方式选择包年包月,然后确定
      2. 在实例规格面板,按需选择您需要购买的实例规格,单击立即购买购买包年包月
      3. 在订单支付面板,单击订购
    购买成功后,刷新微消息队列MQTT版控制台实例列表页面,可以看到刚才新创建的实例。
  6. 实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
  7. 实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。

步骤二:创建父级Topic

MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,Topic详细信息,请参见名词解释。本文以在控制台创建父级Topic为例。

  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列中,选择更多 > Topic 管理
  5. Topic 管理页面左上角,单击创建 Topic
  6. 在创建Topic面板中,输入要创建的Topic名称描述,然后在左下角单击确定
    您可以在Topic 管理页面查看刚创建的Topic。

步骤三:创建Group ID

Group ID详细信息,请参见名词解释

  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列中,选择更多 > Group 管理
  5. Group 管理页面的左上角,单击创建 Group
  6. 在创建Group面板中,输入Group ID,然后在左下角单击确定
    您可以在Group 管理页面查看刚创建的Group。

步骤四:创建客户端上下线通知规则

规则中填写的参数需与您创建的资源保持一致。

  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列,选择更多 > 规则管理
  5. 规则管理页面左上角,单击创建规则
  6. 创建规则页面完成以下操作。
    1. 配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步
      参数 取值示例 说明
      规则ID 111111 规则的全局唯一标识,说明如下:
      • 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
      • 名称长度限制在3~64字符之间,长于64字符将被自动截取。
      • 创建后无法更新。
      描述 migrate from rocketmq 对规则的描述。
      状态 启用 是否启用当前规则,取值说明如下:
      • 启用
      • 停用
      规则类型 上下线通知 创建的规则类型,取值说明如下:
      • 数据流出:用于将微消息队列MQTT版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出
      • 数据流入:用于将其他阿里云产品的数据导入至微消息队列MQTT版。详细信息,请参见跨云产品数据流入
      • 上下线通知:用于将获取的微消息队列MQTT版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出
    2. 配置规则源配置向导页面,配置数据源,然后单击下一步
      参数 取值示例 说明
      Group ID GID_Client_Status 指定需导出数据的设备组。Group ID的详细信息,请参见名词解释
    3. 配置规则目标配置向导页面,配置数据的流转目标,然后单击创建
      参数 取值示例 说明
      目标服务类型 消息队列 RocketMQ 版 指定您需将微消息队列MQTT版客户端上下线通知流转至哪个目标云产品。
      说明 当前仅支持消息队列RocketMQ版
      RocketMQ 实例 MQ_INST_13801563067*****_BbyOD2jQ 指定目标云产品的实例ID,即消息队列RocketMQ版的实例ID。
      说明 仅支持选择和微消息队列MQTT版实例为同一地域的云产品实例。
      Topic TopicB 指定目标云产品的资源键值,即消息队列RocketMQ版的Topic。微消息队列MQTT版客户端上下线通知信息将流转至TopicB。
    您可以在规则管理的规则列表查看到刚创建的上下线通知规则。

步骤五:调用Java SDK收发消息

  1. 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client
  2. 下载阿里云微消息队列MQTT版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo
  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    说明 ons-client的版本信息,请参见版本说明
  5. MQTTClientStatusNoticeProcessDemo.java类中,按代码注释说明填写相应参数,主要涉及步骤一步骤三所创建MQTT资源以及您在消息队列RocketMQ版创建的相应资源,然后执行Main函数运行代码实现消息收发。
    示例代码如下。
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    
    public class MQTTClientStatusNoticeProcessDemo {
        public static void main(String[] args) {
            /**
             * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。
             */
            Properties properties = new Properties();
            /**
             * 设置消息队列RocketMQ版Group ID,在消息队列RocketMQ版控制台创建。
             */
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             */
            properties.put(PropertyKeyConst.AccessKey, "XXXX");
             /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            properties.put(PropertyKeyConst.SecretKey, "XXXX");
            /**
             * 设置TCP接入点,该接入点为消息队列RocketMQ版实例的接入点。进入消息队列RocketMQ版控制台实例详情页面获取。
             */
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
            /**
             * 使用消息队列RocketMQ版消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic。
             */
            final String parentTopic = "GID_XXXX_MQTT";
            /**
             * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本示例以单机内存版实现为例。
             */
            MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
            Consumer consumer = ONSFactory.createConsumer(properties);
            /**
             *  此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。
             */
            consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
            consumer.start();
            String clientId = "GID_XXXXX@@@XXXXX";
            while (true) {
                System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 处理上下线通知的逻辑。
         * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。
         * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
         */
        static class MqttClientStatusNoticeListener implements MessageListener {
            private MqttClientStatusStore mqttClientStatusStore;
    
            public MqttClientStatusNoticeListener(
                MqttClientStatusStore mqttClientStatusStore) {
                this.mqttClientStatusStore = mqttClientStatusStore;
            }
    
            @Override
            public Action consume(Message message, ConsumeContext context) {
                try {
                    JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                    System.out.println(msgBody);
                    String eventType = msgBody.getString("eventType");
                    String clientId = msgBody.getString("clientId");
                    String channelId = msgBody.getString("channelId");
                    ClientStatusEvent event = new ClientStatusEvent();
                    event.setChannelId(channelId);
                    event.setClientIp(msgBody.getString("clientIp"));
                    event.setEventType(eventType);
                    event.setTime(msgBody.getLong("time"));
                    /**
                     * 首先存储新的事件。
                     */
                    mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                    /**
                     * 读取当前channel的事件列表。
                     */
                    Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                    if (events == null || events.isEmpty()) {
                        return Action.CommitMessage;
                    }
                    /**
                     * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。
                     */
                    boolean findOnlineEvent = false;
                    boolean findOfflineEvent = false;
                    for (ClientStatusEvent clientStatusEvent : events) {
                        if (clientStatusEvent.isOnlineEvent()) {
                            findOnlineEvent = true;
                        } else {
                            findOfflineEvent = true;
                        }
                    }
                    if (findOnlineEvent && findOfflineEvent) {
                        mqttClientStatusStore.deleteEvent(clientId, channelId);
                    }
                    return Action.CommitMessage;
                } catch (Throwable e) {
                    e.printStackTrace();
                }
                return Action.ReconsumeLater;
            }
        }
    
        /**
         * 根据状态表判断一个clientId是否有活跃的TCP连接。
         * 1.如果没有channel表,则客户端一定不在线。
         * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接,客户端在线。
         * 如果全部的channel都有掉线断开事件则客户端一定不在线。
         *
         * @param clientId
         * @param mqttClientStatusStore
         * @return
         */
        public static boolean checkClientOnline(String clientId,
            MqttClientStatusStore mqttClientStatusStore) {
            Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
            if (channelMap == null) {
                return false;
            }
            for (Set<ClientStatusEvent> events : channelMap.values()) {
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent event : events) {
                    if (event.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent & !findOfflineEvent) {
                    return true;
                }
            }
            return false;
        }
    
    }

结果验证

完成消息收发后,您可在微消息队列MQTT版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询

更多信息