全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

获取客户端上下线事件

更新时间:2017-09-07 15:46:27

本文介绍如何在服务端获取 MQTT 客户端的上下线动作等状态信息。

应用场景

  • 服务端需要关注特定的 ClientID 什么时候上线,什么时候下线;
  • 服务端需要根据特定 ClientID 的在线状态做逻辑切换;
  • 服务端需要在客户端上线或者下线时触发一些预定义的动作。

基本原理

MQTT 服务端在客户端上线和下线事件触发时,会向后端 MQ 推送一条上下线消息。用户的服务端应用一般部署在阿里云的服务器上,可以通过订阅这条 MQ 消息来获取所有客户端的上下线动作。

其中,上下线动作的这个事件对应的 Topic 命名规范是:GroupID+后缀_MQTT

例如您需要关注 GroupID_AAA@@@XXXX 类型的 MQTT 客户端,那么对应的事件 Topic 就是 GroupID_AAA_MQTT。

实现步骤

1. 申请事件 Topic

根据上文原理介绍,用户关注哪些 GroupID 分组的设备,就申请对应的事件 Topic,申请 Topic 方法请参考步骤二:申请资源

2. 服务端订阅消息

使用上一步骤中申请的 Topic,即可收到关注的 MQTT 客户端上下线事件。MQ 的接收程序请参考订阅消息

其中,数据格式如下:

事件类型放在 MQ 的 Tag 中,代表是上线还是下线。

MQTag:connect/disconnect/tcpclean

其中:

  • connect 事件代表客户端上线动作。
  • disconnect 事件代表客户端主动断开连接。按照 MQTT 协议,客户端主动断开 TCP 连接之前应该发送 disconnect 报文,服务端在收到 disconnect 报文后触发该类型消息。如果某些客户端 SDK 没有按照协议发送 disconnect 报文,相应地会无法收到该消息。
  • tcpclean 事件代表实际的 TCP 连接断开。无论客户端是否显示发送过 disconnect 报文,只要当前 TCP 连接断开就会触发 tcpclean 事件。

注意: tcpclean 消息代表客户端网络层连接的真实断开。对应的,disconnect 消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致 disconnect 消息并没有正常发送。因此判断客户端下线请使用 tcpclean 事件。

数据内容为 JSON 类型,相关的 Key 如下:

  • ClientId 代表具体设备;
  • time 代表本次事件的时间;
  • eventType 代表事件类型,供 MQTT 客户端区分事件类型;
  • channelId 代表每个 TCP 连接的唯一标识;
  • eventIndex 代表事件产生的递增顺序。

示例:

  1. clientIdGID_XXX@@@XXXXX
  2. time:1212121212
  3. eventType:connect/disconnect/tcpclean
  4. channelId:2b9b1281046046faafe5e0b458e4f553
  5. eventIndex:1493772130197

注意:判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

具体判断规则如下:

  • 同一个 clientId 的客户端,产生上下线事件的先后顺序以 eventIndex 为准。越大则越新,不能以时间来做判断(粒度太大)。
  • 同一个 clientId 的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据 channelId 字段判断是否是当前的 TCP 连接。简而言之,下线消息只能覆盖相同 channelId 的状态,如果channelId 不一样,尽管 eventIndex 较新,也不能覆盖。

获取状态的 Demo 如下:

  1. public static void main(String[] args) throws InterruptedException {
  2. /**
  3. * 设置阿里云的AccessKey,用于鉴权
  4. */
  5. final String acessKey ="XXXXXX";
  6. /**
  7. * 设置阿里云的SecretKey,用于鉴权
  8. */
  9. final String secretKey ="XXXXXXX";
  10. /**
  11. * 上述步骤中涉及的事件Topic,需要先在MQ控制台里申请
  12. */
  13. final String topic ="GID_XXX_MQTT";
  14. /**
  15. * ConsumerID,需要先在MQ控制台里申请
  16. */
  17. final String consumerID ="CID_XXXX";
  18. Properties properties =new Properties();
  19. //PropertyKeyConst.ONSAddr地址请根据实际情况对应以下几类进行输入:
  20. //公共云生产环境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  21. //公共云公测环境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  22. //杭州金融云环境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  23. //杭州深圳云环境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
  24. //亚太东南1公共云环境(只适用于新加坡ECS):http://ap-southeastaddr-internal.aliyun.com:8080/rocketmq/nsaddr4broker-internal
  25. properties.put(PropertyKeyConst.ONSAddr,
  26. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此处以公共云生产环境为例
  27. properties.put(PropertyKeyConst.ConsumerId, consumerID);
  28. properties.put(PropertyKeyConst.AccessKey, acessKey);
  29. properties.put(PropertyKeyConst.SecretKey, secretKey);
  30. Consumer consumer =ONSFactory.createConsumer(properties);
  31. /**
  32. * 处理收到的事件,根据Tag区分是上线还是下线,body是个JSON字符串
  33. */
  34. consumer.subscribe(topic, "*", new MessageListener() {
  35. public Action consume(Message message, ConsumeContext consumeContext) {
  36. String event = message.message.getTag();
  37. String event = message.getTag();
  38. if(event.equals("connect")){
  39. // this is connect event
  40. }else if(event.equals("disconnect")){
  41. // this is client disconnect event
  42. }eles if(event.equals("tcpclean")){
  43. // this is tcp disconnect
  44. }
  45. String body = new String(message.getBody());
  46. JSONObject object = JSON.parseObject(body);
  47. String clientId = object.getString("clientId");
  48. long time =object.getLong("time");
  49. return Action.CommitMessage;
  50. }
  51. });
  52. consumer.start();
  53. System.out.println("[Case Normal Consumer Init] Ok");
  54. Thread.sleep(Integer.MAX_VALUE);
  55. consumer.shutdown();
  56. System.exit(0);
  57. }
本文导读目录