MQTT客户端上下线事件数据流出

更新时间:

您可通过配置云消息队列 MQTT 版的客户端上下线通知规则,将获取的MQTT客户端上下线事件数据导出至其他阿里云产品。该方法为异步上下线通知。本文介绍客户端上下线通知的原理、应用场景、使用限制以及云消息队列 MQTT 版与其他阿里云产品的资源映射关系。

基本原理

在客户端上线和下线事件触发时,MQTT服务器会根据您配置的客户端上下线通知规则,向后端其他云产品推送一条上下线消息。业务应用一般部署在阿里云的服务器上,业务应用通过向后端云产品订阅这条消息来获取所有客户端的上下线动作。上下线事件流出

该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,云端应用需要根据事件发生的时间序列分析出客户端的状态。

异步上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。

应用场景

客户端上下线通知主要的应用场景为业务应用需要在客户端上线或者下线时触发一些预定义的动作。

例如,客户端在线状态聚合。此场景中,MQTT客户端产生上下线等状态变更,云消息队列 MQTT 版会根据您配置的客户端状态通知规则,将状态变更封装后转发到云消息队列 RocketMQ 版消息的方式来实现客户端状态数据的聚合和统计。

说明

针对其他场景,推荐您使用同步查询接口来获取客户端在线状态。详细信息,请参见获取MQTT客户端在线状态

使用限制

限制项

限制值

说明

单实例规则数量

100

如果默认限制不满足,请联系云消息队列 MQTT 版技术支持,钉钉群号:35228338。

规则去重限制

同一个内部资源同种规则只能创建一个规则。

例如一个Group ID只能创建一个上下线通知规则,一个MQTT Topic只能创建一个数据流入规则和一个数据流出规则。

地域限制

不支持跨地域创建规则,规则的数据源和数据目标所属的实例必须处于同一地域。

例如,创建数据流出规则,数据源云消息队列 MQTT 版实例属于华东1(杭州)地域,则数据目标云消息队列 RocketMQ 版只能选择华东1(杭州)地域的实例。

云消息队列 MQTT 版实例版本

仅新版本的实例支持。

新购的云消息队列 MQTT 版实例默认为新版本实例,旧版实例已不支持购买。

云消息队列 RocketMQ 版实例版本

仅4.0系列实例支持

云消息队列 MQTT 版云消息队列 RocketMQ 版通过消息流入或消息流出规则进行数据互通时,云消息队列 RocketMQ 版仅4.0系列实例支持消息流入或流出规则,5.0系列实例不支持。

资源映射方式

同一个云消息队列 MQTT 版Group ID下的所有客户端的状态变更通知都会转发到您配置的同一个其他阿里云产品的资源里。

表 1. 映射关系

MQTT资源

其他阿里云产品

其他阿里云产品资源

数据包定义

MQTT Group ID

云消息队列 RocketMQ 版

云消息队列 RocketMQ 版的Topic

MQTT和RocketMQ的消息结构映射

操作流程

如上文所述,如果使用异步上下线通知的方式,您需创建客户端上下线事件通知规则,将上下线通知的消息导出至后端云产品中。下文以使用云消息队列 RocketMQ 版后端云产品为例进行说明。

  1. 创建上下线通知规则。

    您需关注哪些Group ID分组的设备,就在云消息队列 MQTT 版控制台创建规则时,选定相应的Group ID。创建规则的详细步骤,请参见创建上下线通知规则

  2. 业务应用订阅该类通知消息。

    通过步骤1中创建的规则,即可收到关注的客户端的上下线事件。云消息队列 RocketMQ 版的接收程序,请参见订阅消息。示例代码详细信息,请参见MQTTClientStatusNoticeProcessDemo.java

    事件类型放在云消息队列 RocketMQ 版的Tag中,代表上线或下线。数据格式如下:

    MQ Tag:connect/disconnect/tcpclean

    其中:

    • connect事件代表客户端上线动作。

    • disconnect事件代表客户端主动断开连接。按照MQTT协议,客户端主动断开TCP连接之前应该发送disconnect 报文,MQTT服务器在收到disconnect 报文后触发该类型消息。如果某些客户端SDK没有按照协议发送disconnect 报文,MQTT服务器相应无法收到该消息。

    • tcpclean事件代表实际的TCP连接断开。无论客户端是否显示发送过disconnect 报文,只要当前TCP连接断开就会触发tcpclean事件。

    说明

    tcpclean消息代表客户端网络层连接的真实断开。对应的,disconnect消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致disconnect消息并没有正常发送。因此判断客户端下线请使用tcpclean事件。

    数据内容为JSON类型,相关的Key说明如下:

    • clientId代表具体设备。

    • time代表本次事件的时间。

    • eventType代表事件类型,供客户端区分事件类型。

    • channelId代表每个TCP连接的唯一标识。

    • clientIp代表客户端使用的公网出口IP地址。

    • certificateChainSn代表客户端设备的证书链的SN序列号。格式为{设备证书SN,CA证书SN}。

      仅当客户端接入时使用证书认证功能,才包含该参数。更多信息,请参见客户端证书认证

    示例如下:

    clientId:GID_XXX@@@YYYYY
    time:1212121212
    eventType:connect/disconnect/tcpclean
    channelId:2b9b1281046046faafe5e0b458e4XXXX
    clientIp:192.168.XX.XX:133XX  
    certificateChainSn:1254685.....2458655,154689536.....25655

    判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

    具体判断规则如下:

    • 同一个clientId的客户端,产生上下线事件的先后顺序以时间为准,基本原则为时间戳越大则越新。

    • 同一个clientId的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据channelId字段判断是否是当前的TCP连接。简而言之,下线消息只能覆盖channelId相同的下线消息,如果下线消息的channelId不一样,尽管time较新,也不能覆盖。一个channelId代表一个TCP连接,只会存在一个connect事件和一个close事件。

异步上下线通知Java示例代码

package com.aliyun.openservices.lmq.example.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。
         */
        Properties properties = new Properties();
        /**
         * 设置RocketMQ客户端的Group ID,注意此处的groupId和MQTT实例中的groupId是两个概念,请按照各自产品的说明申请填写。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * 账号AccessKey ID,从控制台获取。
         */
        properties.put(PropertyKeyConst.AccessKey, "XXXX");
        /**
         * 账号AccessKey Secret,从控制台获取,仅在Signature鉴权模式下需要设置。
         */
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        /**
         * 设置TCP接入域名。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
        /**
         * 使用RocketMQ消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic,请遵循控制台文档提前创建。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本Demo以单机内存版实现做演示。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXxXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理上下线通知的逻辑。
     * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。
     * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存储新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 读取当前channel的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根据状态表判断一个clientId是否有活跃的TCP连接。
     * 1.如果没有channel表,则一定不在线。
     * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接在线。
     * 如果全部的channel都有掉线断开事件则一定是不在线。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }

}

更多信息

如需了解控制台上的操作,请参见上下线通知规则管理