全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

获取客户端上下线事件

更新时间:2017-12-15 20:40:10

本文介绍如何在服务端获取 MQTT 客户端的上下线动作等状态信息。

应用场景

  • 服务端需要关注特定的 ClientID 什么时候上线,什么时候下线;
  • 服务端需要根据特定 ClientID 的在线状态做逻辑切换;
  • 服务端需要在客户端上线或者下线时触发一些预定义的动作。

基本原理

MQTT 服务端在客户端上线和下线事件触发时,会向后端 MQ 推送一条上下线消息。用户的服务端应用一般部署在阿里云的服务器上,可以通过订阅这条 MQ 消息来获取所有客户端的上下线动作。

其中,上下线动作的这个事件对应的 Topic 命名规范是:GroupID+后缀_MQTT

例如您需要关注 GroupID_AAA@@@XXXX 类型的 MQTT 客户端,那么对应的事件 Topic 就是 GroupID_AAA_MQTT。

实现步骤

1. 申请事件 Topic

根据上文原理介绍,用户关注哪些 GroupID 分组的设备,就申请对应的事件 Topic,申请 Topic 方法请参考步骤二:申请资源

2. 服务端订阅消息

使用上一步骤中申请的 Topic,即可收到关注的 MQTT 客户端上下线事件。MQ 的接收程序请参考订阅消息

其中,数据格式如下:

事件类型放在 MQ 的 Tag 中,代表是上线还是下线。

MQTag:connect/disconnect/tcpclean

其中:

  • connect 事件代表客户端上线动作。
  • disconnect 事件代表客户端主动断开连接。按照 MQTT 协议,客户端主动断开 TCP 连接之前应该发送 disconnect 报文,服务端在收到 disconnect 报文后触发该类型消息。如果某些客户端 SDK 没有按照协议发送 disconnect 报文,相应地会无法收到该消息。
  • tcpclean 事件代表实际的 TCP 连接断开。无论客户端是否显示发送过 disconnect 报文,只要当前 TCP 连接断开就会触发 tcpclean 事件。

注意: tcpclean 消息代表客户端网络层连接的真实断开。对应的,disconnect 消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致 disconnect 消息并没有正常发送。因此判断客户端下线请使用 tcpclean 事件。

数据内容为 JSON 类型,相关的 Key 如下:

  • ClientId 代表具体设备;
  • time 代表本次事件的时间;
  • eventType 代表事件类型,供 MQTT 客户端区分事件类型;
  • channelId 代表每个 TCP 连接的唯一标识;
  • eventIndex 代表事件产生的递增顺序。

示例:

clientId:GID_XXX@@@XXXXX
time:1212121212
eventType:connect/disconnect/tcpclean
channelId:2b9b1281046046faafe5e0b458e4f553
eventIndex:1493772130197

注意:判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

具体判断规则如下:

  • 同一个 clientId 的客户端,产生上下线事件的先后顺序以 eventIndex 为准。越大则越新,不能以时间来做判断(粒度太大)。
  • 同一个 clientId 的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据 channelId 字段判断是否是当前的 TCP 连接。简而言之,下线消息只能覆盖相同 channelId 的状态,如果 channelId 不一样,尽管 eventIndex 较新,也不能覆盖。

获取状态的 Demo 如下:

public static void main(String[] args) throws InterruptedException {
        /**
         * 设置阿里云的 AccessKey,用于鉴权
         */
        final String acessKey ="XXXXXX";
        /**
         * 设置阿里云的 SecretKey,用于鉴权
         */
        final String secretKey ="XXXXXXX";
        /**
         *  上述步骤中涉及的事件 Topic,需要先在 MQ 控制台里申请
         */
        final String topic ="GID_XXX_MQTT";
        /**
         * ConsumerID,需要先在 MQ 控制台里申请
         */
        final String consumerID ="CID_XXXX";
        Properties properties =new Properties();
          //PropertyKeyConst.ONSAddr 地址请根据实际情况对应以下几类进行输入:
        //公共云生产环境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
        //公共云公测环境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
        //杭州金融云环境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
        //杭州深圳云环境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
        //亚太东南1公共云环境(只适用于新加坡 ECS):http://ap-southeastaddr-internal.aliyun.com:8080/rocketmq/nsaddr4broker-internal
        properties.put(PropertyKeyConst.ONSAddr,
          "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此处以公共云生产环境为例
        properties.put(PropertyKeyConst.ConsumerId, consumerID);
        properties.put(PropertyKeyConst.AccessKey, acessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        Consumer consumer =ONSFactory.createConsumer(properties);
        /**
         * 处理收到的事件,根据 Tag 区分是上线还是下线,body 是个 JSON 字符串
         */
        consumer.subscribe(topic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                String event = message.message.getTag();
                  String event = message.getTag();
                if(event.equals("connect")){
                    // this is connect event
                }else if(event.equals("disconnect")){
                    // this is client disconnect event
                }eles if(event.equals("tcpclean")){
                    // this is tcp disconnect
                }
                String body = new String(message.getBody());
                JSONObject object = JSON.parseObject(body);
                String clientId = object.getString("clientId");
                long time  =object.getLong("time");
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("[Case Normal Consumer Init]   Ok");
        Thread.sleep(Integer.MAX_VALUE);
        consumer.shutdown();
        System.exit(0);
    }
本文导读目录