您可通过调用同步查询接口或使用异步上下线通知的方式获取云消息队列 MQTT 版客户端(下文简称为客户端)当前在线情况。

应用场景

云消息队列 MQTT 版需结合后端存储消息队列产品,如云消息队列 RocketMQ 版等使用,来和部署在云端服务器上的应用(下文简称为业务应用)完成业务流程。

获取客户端在线状态主要的应用场景如下:
  • 主业务流程中需要根据客户端是否在线来决定后续运行逻辑;
  • 运维过程需要判断特定客户端当前是否在线;
  • 业务应用需要在客户端上线或者下线时触发一些预定义的动作。

基本原理

云消息队列 MQTT 版服务器(下文简称为MQTT服务器)使用异步上下线通知方式获取客户端在线状态。

该方式使用消息通知,在客户端上线和下线事件触发时,MQTT服务器会向后端存储消息队列推送一条上下线消息。业务应用一般部署在阿里云的服务器上,业务应用通过向后端存储消息队列订阅这条消息来获取所有客户端的上下线动作。

该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,云端应用需要根据事件发生的时间序列分析出客户端的状态。

异步上下线通知

如上文所述,使用异步上下线通知的方式,上下线事件会映射到后端存储消息队列中。

下文以云消息队列 RocketMQ 版作为后端存储消息队列的情况为例说明。

操作步骤

  1. 创建上下线事件对应的Topic。

    您需关注哪些Group ID分组的设备,就在云消息队列 MQTT 版控制台创建对应的Topic。创建Topic的步骤请参见MQTT快速入门

    例如您需要关注Group ID为GID_XXX类型的所有客户端,那么这类客户端对应的Client ID和Topic分别是GID_XXX@@@YYYYYGID_XXX_MQTT

    其中:

    • GID_XXX是在云消息队列 MQTT 版控制台上创建的Group ID。
    • YYYYY是设备ID,与Group ID以“<GroupID>@@@<DeviceID>” 模式构成Client ID。
    • _MQTT是该类事件通知消息的Topic命名所必需的固定后缀。

    更多信息请参见名词解释

  2. 业务应用订阅该类通知消息。

    使用步骤1中创建的Topic,即可收到关注的客户端的上下线事件。云消息队列 RocketMQ 版的接收程序请参见订阅消息。示例代码详细信息,请参见MQTTClientStatusNoticeProcessDemo.java

    事件类型放在云消息队列 RocketMQ 版的Tag中,代表上线或下线。数据格式如下:

    MQ Tag:connect/disconnect/tcpclean

    其中:

    • connect事件代表客户端上线动作。
    • disconnect事件代表客户端主动断开连接。按照MQTT协议,客户端主动断开TCP连接之前应该发送disconnect 报文,MQTT服务器在收到disconnect 报文后触发该类型消息。如果某些客户端SDK没有按照协议发送disconnect 报文,MQTT服务器相应无法收到该消息。
    • tcpclean事件代表实际的TCP连接断开。无论客户端是否显示发送过disconnect 报文,只要当前TCP连接断开就会触发tcpclean事件。
    说明

    tcpclean消息代表客户端网络层连接的真实断开。对应的,disconnect消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致disconnect消息并没有正常发送。因此判断客户端下线请使用tcpclean事件。

    数据内容为JSON类型,相关的Key说明如下:

    • clientId代表具体设备;
    • time代表本次事件的时间;
    • eventType代表事件类型,供客户端区分事件类型;
    • channelId代表每个TCP连接的唯一标识;
    • clientIp代表客户端使用的公网出口IP地址。

    示例:

    clientId:GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.XX.XX:133XX     

    判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

    具体判断规则如下:

    • 同一个clientId的客户端,产生上下线事件的先后顺序以时间为准,基本原则为时间戳越大则越新。
    • 同一个clientId的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据channelId字段判断是否是当前的TCP连接。简而言之,下线消息只能覆盖channelId相同的下线消息,如果下线消息的channelId不一样,尽管time较新,也不能覆盖。一个channelId代表一个TCP连接,只会存在一个connect事件和一个close事件。
package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。
         */
        Properties properties = new Properties();
        /**
         * 设置RocketMQ客户端的Group ID,注意此处的groupId和MQTT实例中的groupId是两个概念,请按照各自产品的说明申请填写。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * 账号AccessKey ID,从账号系统控制台获取。
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * 账号AccessKey Secret,从账号系统控制台获取,仅在Signature鉴权模式下需要设置。
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * 设置TCP接入域名。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * 使用RocketMQ消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic,请遵循控制台文档提前创建。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本Demo以单机内存版实现做演示。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理上下线通知的逻辑。
     * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。
     * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存储新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 读取当前channel的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根据状态表判断一个clientId是否有活跃的TCP连接。
     * 1.如果没有channel表,则一定不在线。
     * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接在线。
     * 如果全部的channel都有掉线断开事件则一定是不在线。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}