您可通过配置微消息队列 MQTT 版的客户端上下线通知规则,将获取的 MQTT 客户端上下线事件数据导出至其他阿里云产品。该方法为异步上下线通知。本文介绍客户端上下线通知的原理、应用场景以及微消息队列 MQTT 版与其他阿里云产品的资源映射关系。

基本原理

在客户端上线和下线事件触发时,MQTT 服务器会根据您配置的客户端上下线通知规则,向后端其他云产品推送一条上下线消息。业务应用一般部署在阿里云的服务器上,业务应用通过向后端云产品订阅这条消息来获取所有客户端的上下线动作。

该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,云端应用需要根据事件发生的时间序列分析出客户端的状态。

异步上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。

注意 当前支持的其他后端云产品仅有消息队列 RocketMQ 版

应用场景

客户端上下线通知主要的应用场景为业务应用需要在客户端上线或者下线时触发一些预定义的动作。

例如,客户端在线状态聚合。此场景中,MQTT 客户端产生上下线等状态变更,微消息队列 MQTT 版会根据您配置的客户端状态通知规则,将状态变更封装后转发到消息队列 RocketMQ 版消息的方式来实现客户端状态数据的聚合和统计。

说明 针对其他场景,推荐您使用同步查询接口来获取客户端在线状态。详情请参见获取 MQTT 客户端在线状态

资源映射方式

同一个微消息队列 MQTT 版 Group ID 下的所有客户端的状态变更通知都会转发到您配置的同一个其他阿里云产品的资源里。

表 1. 映射关系
MQTT 资源 其他阿里云产品 其他阿里云产品资源 数据包定义
MQTT Group ID 消息队列 RocketMQ 版 消息队列 RocketMQ 版的 Topic MQTT 与 RocketMQ 的消息结构映射

操作流程

如上文所述,如果使用异步上下线通知的方式,您需创建客户端上下线事件通知规则,将上下线通知的消息导出至后端云产品中。下文以使用消息队列 RocketMQ 版后端云产品为例进行说明。

  1. 创建上下线通知规则。

    您需关注哪些 Group ID 分组的设备,就在微消息队列 MQTT 版控制台创建规则时,选定相应的 Group ID。创建规则的步骤请参见创建上下线通知规则

  2. 业务应用订阅该类通知消息。

    通过步骤 1 中创建的规则,即可收到关注的客户端的上下线事件。消息队列 RocketMQ 版的接收程序请参见订阅消息。示例代码详情请参见 MQTTClientStatusNoticeProcessDemo.java

    事件类型放在消息队列 RocketMQ 版的 Tag 中,代表上线或下线。数据格式如下:

    MQ Tag:connect/disconnect/tcpclean

    其中:

    • connect 事件代表客户端上线动作。
    • disconnect 事件代表客户端主动断开连接。按照 MQTT 协议,客户端主动断开 TCP 连接之前应该发送 disconnect 报文,MQTT 服务器在收到 disconnect 报文后触发该类型消息。如果某些客户端 SDK 没有按照协议发送 disconnect 报文,MQTT 服务器相应无法收到该消息。
    • tcpclean 事件代表实际的 TCP 连接断开。无论客户端是否显示发送过 disconnect 报文,只要当前 TCP 连接断开就会触发 tcpclean 事件。
    说明

    tcpclean 消息代表客户端网络层连接的真实断开。对应的,disconnect 消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致 disconnect 消息并没有正常发送。因此判断客户端下线请使用 tcpclean 事件。

    数据内容为 JSON 类型,相关的 Key 说明如下:

    • clientId 代表具体设备。
    • time 代表本次事件的时间。
    • eventType 代表事件类型,供客户端区分事件类型。
    • channelId 代表每个 TCP 连接的唯一标识。
    • clientIp 代表客户端使用的公网出口 IP 地址。

    示例如下:

    clientId:GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.X.X:133XX     

    判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

    具体判断规则如下:

    • 同一个 clientId 的客户端,产生上下线事件的先后顺序以时间为准,基本原则为时间戳越大则越新。
    • 同一个 clientId 的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据 channelId 字段判断是否是当前的 TCP 连接。简而言之,下线消息只能覆盖 channelId 相同的下线消息,如果下线消息的 channelId 不一样,尽管 time 较新,也不能覆盖。一个 channelId 代表一个 TCP 连接,只会存在一个 connect 事件和一个 close 事件。
package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息队列 RocketMQ 版接收客户端,实际业务中一般部署在服务端应用中。
         */
        Properties properties = new Properties();
        /**
         * 设置 RocketMQ 客户端的 Group ID,注意此处的 groupId 和 MQTT 实例中的 groupId 是两个概念,请按照各自产品的说明申请填写。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * 账号 AccessKeyId,从账号系统控制台获取。
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * 账号 AccessKeySecret,从账号系统控制台获取,仅在 Signature 鉴权模式下需要设置。
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * 设置 TCP 接入域名
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * 使用 RocketMQ 消费端来处理 MQTT 客户端的上下线通知时,订阅的 Topic 为上下线通知 Topic,请遵循控制台文档提前创建。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客户端状态数据,实际生产环境中建议使用数据库或者 Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本 Demo 以单机内存版实现做演示。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此处仅处理客户端是否在线,因此只需要关注 connect 事件和 tcpclean 事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理上下线通知的逻辑。处理状态机参见文档:https://help.aliyun.com/document_detail/50069.html。
     * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者 Redis 等外部共享存储来维护。
     * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存储新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 读取当前 channel 的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上线和下线事件都已经收到,则当前 channel 已经掉线,可以清理掉这个 channel 的数据。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根据状态表判断一个 clientId 是否有活跃的 TCP 连接。
     * 1.如果没有 channel 表,则一定不在线。
     * 2.如果 channel 表非空,检查一下 channel 数据中是否仅包含上线事件,如果有则代表有活跃连接,在线。
     * 如果全部的 channel 都有掉线断开事件则一定是不在线。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

更多信息

如需了解控制台上的操作,请参见上下线通知规则管理