您可通过调用同步查询接口或使用异步上下线通知的方式获取微消息队列 MQTT 版客户端(下文简称为客户端)当前在线情况。

微消息队列 MQTT 版需结合后端存储消息队列产品,如消息队列 RocketMQ 版等使用,来和部署在云端服务器上的应用(下文简称为业务应用)完成业务流程。

获取客户端在线状态这个功能主要的应用场景如下:

  • 主业务流程中需要根据客户端是否在线来决定后续运行逻辑;
  • 运维过程需要判断特定客户端当前是否在线;
  • 业务应用需要在客户端上线或者下线时触发一些预定义的动作。

基本原理

微消息队列 MQTT 版服务器(下文简称为 MQTT 服务器)提供以下方式获取客户端在线状态:
  • 调用同步查询 SDK(敬请期待)

    该方式相对简单,后续将提供 SDK 帮助您查询某个特定客户端的当前实时状态,适用于对单个客户端的状态判断。

    说明 不再开放此前处于公测期间的、以 HTTP/HTTPS 方式调用的 OpenAPI。
  • 异步上下线通知

    该方式使用消息通知,在客户端上线和下线事件触发时,MQTT 服务器会向后端存储消息队列推送一条上下线消息。业务应用一般部署在阿里云的服务器上,业务应用通过向后端存储消息队列订阅这条消息来获取所有客户端的上下线动作。

    该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,云端应用需要根据事件发生的时间序列分析出客户端的状态。

两种查询方式的区别如下:
  • 同步查询接口是查询当前客户端的实时状态,理论上比异步通知的方式更精确,但只能查询单个客户端状态。
  • 异步上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。

异步上下线通知

如上文所述,如果使用异步上下线通知的方式,上下线事件会映射到后端存储消息队列中。

下文以消息队列 RocketMQ 版作为后端存储消息队列的情况为例说明。

操作步骤

  1. 创建上下线事件对应的 Topic。

    您需关注哪些 Group ID 分组的设备,就在微消息队列 MQTT 版控制台创建对应的 Topic。创建 Topic 的步骤请参见 MQTT 快速入门

    例如您需要关注 Group ID 为 GID_XXX 类型的所有客户端,那么这类客户端对应的 Client ID 和 Topic 分别是 GID_XXX@@@YYYYYGID_XXX_MQTT

    其中:

    • GID_XXX 是在微消息队列 MQTT 版控制台上创建的 Group ID。
    • YYYYY 是设备 ID,与 Group ID 以“<GroupID>@@@<DeviceID>” 模式构成 Client ID。
    • _MQTT 是该类事件通知消息的 Topic 命名所必需的固定后缀。

    更多信息请参见名词解释

  2. 业务应用订阅该类通知消息。

    使用步骤 1 中创建的 Topic,即可收到关注的客户端的上下线事件。消息队列 RocketMQ 版的接收程序请参见订阅消息。示例代码详情请参见 MQTTClientStatusNoticeProcessDemo.java

    事件类型放在消息队列 RocketMQ 版的 Tag 中,代表上线或下线。数据格式如下:

    MQ Tag:connect/disconnect/tcpclean

    其中:

    • connect 事件代表客户端上线动作。
    • disconnect 事件代表客户端主动断开连接。按照 MQTT 协议,客户端主动断开 TCP 连接之前应该发送 disconnect 报文,MQTT 服务器在收到 disconnect 报文后触发该类型消息。如果某些客户端 SDK 没有按照协议发送 disconnect 报文,MQTT 服务器相应无法收到该消息。
    • tcpclean 事件代表实际的 TCP 连接断开。无论客户端是否显示发送过 disconnect 报文,只要当前 TCP 连接断开就会触发 tcpclean 事件。
    说明

    tcpclean 消息代表客户端网络层连接的真实断开。对应的,disconnect 消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致 disconnect 消息并没有正常发送。因此判断客户端下线请使用 tcpclean 事件。

    数据内容为 JSON 类型,相关的 Key 说明如下:

    • clientId 代表具体设备;
    • time 代表本次事件的时间;
    • eventType 代表事件类型,供客户端区分事件类型;
    • channelId 代表每个 TCP 连接的唯一标识;
    • clientIp 代表客户端使用的公网出口 IP 地址。

    示例:

    clientId:GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.X.X:133XX     

    判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

    具体判断规则如下:

    • 同一个 clientId 的客户端,产生上下线事件的先后顺序以时间为准,基本原则为时间戳越大则越新。
    • 同一个 clientId 的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据 channelId 字段判断是否是当前的 TCP 连接。简而言之,下线消息只能覆盖 channelId 相同的下线消息,如果下线消息的 channelId 不一样,尽管 time 较新,也不能覆盖。一个 channelId 代表一个 TCP 连接,只会存在一个 connect 事件和一个 close 事件。
package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息队列 RocketMQ 版接收客户端,实际业务中一般部署在服务端应用中。
         */
        Properties properties = new Properties();
        /**
         * 设置 RocketMQ 客户端的 Group ID,注意此处的 groupId 和 MQTT 实例中的 groupId 是两个概念,请按照各自产品的说明申请填写。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * 账号 AccessKeyId,从账号系统控制台获取。
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * 账号 AccessKeySecret,从账号系统控制台获取,仅在 Signature 鉴权模式下需要设置。
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * 设置 TCP 接入域名
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * 使用 RocketMQ 消费端来处理 MQTT 客户端的上下线通知时,订阅的 Topic 为上下线通知 Topic,请遵循控制台文档提前创建。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客户端状态数据,实际生产环境中建议使用数据库或者 Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本 Demo 以单机内存版实现做演示。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此处仅处理客户端是否在线,因此只需要关注 connect 事件和 tcpclean 事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理上下线通知的逻辑。处理状态机参见文档:https://help.aliyun.com/document_detail/50069.html。
     * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者 Redis 等外部共享存储来维护。
     * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存储新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 读取当前 channel 的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上线和下线事件都已经收到,则当前 channel 已经掉线,可以清理掉这个 channel 的数据。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根据状态表判断一个 clientId 是否有活跃的 TCP 连接。
     * 1.如果没有 channel 表,则一定不在线。
     * 2.如果 channel 表非空,检查一下 channel 数据中是否仅包含上线事件,如果有则代表有活跃连接,在线。
     * 如果全部的 channel 都有掉线断开事件则一定是不在线。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}