全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

获取客户端在线状态

更新时间:2018-05-30 13:54:13

本文介绍如何获取 MQTT 客户端当前在线情况。

应用场景

  • 主业务流程中需要根据客户端是否在线来决定后续运行逻辑;
  • 运维过程需要判断特定客户端当前是否在线;
  • 服务端需要在客户端上线或者下线时触发一些预定义的动作。

基本原理

MQTT 服务端对上述功能需求提供了2种获取方式,分别是同步查询接口以及异步上下线通知。

同步查询接口的方式相对简单,即用户通过开放的接入点地址调用 HTTP 方式的 API 查询特定客户端的在线情况,该方式只能查询单个客户端当前实时状态,适用于对单个客户端的状态判断。

异步上下线通知则是利用消息通知的方式,在客户端上线和下线事件触发时, MQTT 服务器会向后端 MQ 推送一条上下线消息。用户的服务端应用一般部署在阿里云的服务器上,可以通过订阅这条 MQ 消息来获取所有客户端的上下线动作。该方式属于异步感知客户端的状态,且感知到的是上下线事件,而非在线状态,应用需要根据事件发生的时间序列分析出客户端的状态。

注意:同步查询接口是查询当前客户端的实时状态,理论上比异步通知的方式更精确,但只能查询单个客户端状态。上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。

同步查询接口(公测期间)

MQTT 服务端对外通过 HTTP协议方式暴露查询接口,查询接口的使用文档如下:

注意:同步查询接口功能目前处于公测期间,仅限公网测试 region 使用,后续会在其他 region 开放服务。

查询接口

  1. https://mqtt-xxx.mqtt.aliyuncs.com/route/clientId/get

其中:

  • 接口地址复用购买实例的接入点域名。
  • 协议可选 HTTP 或者 HTTPS,调用方式仅限 GET 方法。

参数列表

参数名称 参数类型 说明
accessKey String 当前请求使用的账号的 AK
resource String 需要查询的客户端 clientId,只能查询当前账号拥有的 clientId
timestamp Long 构建当前请求的UNIX 毫秒时间戳,时间戳过期时间为当前真实时间前后5分钟
signature String 对当前请求参数计算得到的签名字符串,服务端用于安全认证

其中:

  • timstamp 参数用于阻止非法过期请求,需要设置为当前真实时间,系统容忍前后5分钟误差区间。
  • signature 为请求参数的签名,将其他参数作为待签名字符串,使用账户 SK 计算得到,用于权限验证和防止请求篡改。具体计算方式参考文档

返回结果

MQTT 服务端接收到查询请求后,会进行接口参数验证,对于合法的查询请求返回当前 客户端的在线连接数。具体返回情况如下表所示:

HTTP 状态码 说明
400 非法请求,可能是接口 URL 不正确,或者参数缺失
403 权限验证失败,可能是签名计算错误,或者没有权限查询对应的 clientId 状态
200 请求处理成功

返回结果中,除 HTTP 状态码,HTTP content 中也会包含对应的返回结果,返回结果为 JSON 格式,具体的字段映射如下:

字段名称 类型 说明
code int 请求处理结果状态码,用于判断当前请求是否成功和失败原因
success boolean true/false,代表查询是否成功
online int 当前客户端维持的 tcp 连接数,可能存在-1,代表查询结果未知,建议保守判断为在线
message string 请求处理结果描述,用于判断异常原因

注意:查询结果中 online 字段为0代表不在线,大于0代表在线,具体数字为在线的 tcp 连接数,应用方应注意该字段大于1可能存在重复连接情况。因服务端数据同步延迟,会有很小概率出现返回-1的情况,代表查询结果未知,建议应用重试,或者基于实际场景做保守判断。

其中 code 错误码说明如下:

code 说明
0 请求处理成功
1 参数缺失,建议检查参数对是否完整
2 签名计算错误,建议检查签名计算过程
3 权限验证错误,建议检查当前账号是否创建过该设备所属的 GroupId
4 请求时间戳过期,建议检查 timestamp 字段是否为最近5分钟内

异步上下线通知

如上文所述,使用异步上下线通知的方式,上下线动作的这个事件会映射到 MQ 的消息中,该消息对应的 Topic 命名规范是:“GroupID”拼接后缀“_MQTT”

例如您需要关注 GID_XXX@@@YYYYY 类型的 MQTT 客户端,那么对应的事件 Topic 就是 GID_XXX_MQTT。具体接入步骤参考如下:

1. 创建事件 Topic

根据上文原理介绍,用户关注哪些 Group ID 分组的设备,就创建对应的事件 Topic,创建 Topic 方法请参考步骤二:创建资源

2. 服务端订阅消息

使用上一步骤中创建的 Topic,即可收到关注的 MQTT 客户端上下线事件。MQ 的接收程序请参考订阅消息

其中,数据格式如下:

事件类型放在 MQ 的 Tag 中,代表是上线还是下线。

MQTag:connect/disconnect/tcpclean

其中:

  • connect 事件代表客户端上线动作。
  • disconnect 事件代表客户端主动断开连接。按照 MQTT 协议,客户端主动断开 TCP 连接之前应该发送 disconnect 报文,服务端在收到 disconnect 报文后触发该类型消息。如果某些客户端 SDK 没有按照协议发送 disconnect 报文,相应地会无法收到该消息。
  • tcpclean 事件代表实际的 TCP 连接断开。无论客户端是否显示发送过 disconnect 报文,只要当前 TCP 连接断开就会触发 tcpclean 事件。

注意: tcpclean 消息代表客户端网络层连接的真实断开。对应的,disconnect 消息仅仅代表客户端是主动发送了下线报文。受限于客户端的实现,有时候客户端异常退出会导致 disconnect 消息并没有正常发送。因此判断客户端下线请使用 tcpclean 事件。

数据内容为 JSON 类型,相关的 Key 如下:

  • ClientId 代表具体设备;
  • time 代表本次事件的时间;
  • eventType 代表事件类型,供 MQTT 客户端区分事件类型;
  • channelId 代表每个 TCP 连接的唯一标识;

示例:

  1. clientIdGID_XXX@@@XXXXX
  2. time:1212121212
  3. eventType:connect/disconnect/tcpclean
  4. channelId:2b9b1281046046faafe5e0b458e4f553

注意:判断客户端当前是否在线不能仅仅根据收到的最后一条消息的状态,而需要结合上下线消息的前后关联来判断。

具体判断规则如下:

  • 同一个 clientId 的客户端,产生上下线事件的先后顺序以时间为准。越大则越新。
  • 同一个 clientId 的客户端,可能存在多次闪断,因此,当收到下线消息时,一定要根据 channelId 字段判断是否是当前的 TCP 连接。简而言之,下线消息只能覆盖相同 channelId 的状态,如果 channelId 不一样,尽管 eventIndex 较新,也不能覆盖。

获取状态的 Demo 如下:

  1. public static void main(String[] args) throws InterruptedException {
  2. /**
  3. * 设置阿里云的 AccessKey,用于鉴权
  4. */
  5. final String acessKey ="XXXXXX";
  6. /**
  7. * 设置阿里云的 SecretKey,用于鉴权
  8. */
  9. final String secretKey ="XXXXXXX";
  10. /**
  11. * 上述步骤中涉及的事件 Topic,需要先在 MQ 控制台里创建
  12. */
  13. final String topic ="GID_XXX_MQTT";
  14. /**
  15. * ConsumerID,需要先在 MQ 控制台里创建
  16. */
  17. final String consumerID ="CID_XXXX";
  18. Properties properties =new Properties();
  19. //PropertyKeyConst.ONSAddr 地址请根据实际情况对应以下几类进行输入:
  20. //公共云生产环境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  21. //公共云公测环境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  22. //杭州金融云环境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  23. //杭州深圳云环境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
  24. //亚太东南1公共云环境(只适用于新加坡 ECS):http://ap-southeastaddr-internal.aliyun.com:8080/rocketmq/nsaddr4broker-internal
  25. properties.put(PropertyKeyConst.ONSAddr,
  26. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");//此处以公共云生产环境为例
  27. properties.put(PropertyKeyConst.ConsumerId, consumerID);
  28. properties.put(PropertyKeyConst.AccessKey, acessKey);
  29. properties.put(PropertyKeyConst.SecretKey, secretKey);
  30. Consumer consumer =ONSFactory.createConsumer(properties);
  31. /**
  32. * 处理收到的事件,根据 Tag 区分是上线还是下线,body 是个 JSON 字符串
  33. */
  34. consumer.subscribe(topic, "*", new MessageListener() {
  35. public Action consume(Message message, ConsumeContext consumeContext) {
  36. String event = message.getTag();
  37. if(event.equals("connect")){
  38. // this is connect event
  39. }else if(event.equals("disconnect")){
  40. // this is client disconnect event
  41. }else if(event.equals("tcpclean")){
  42. // this is tcp disconnect
  43. }
  44. String body = new String(message.getBody());
  45. JSONObject object = JSON.parseObject(body);
  46. String clientId = object.getString("clientId");
  47. long time =object.getLong("time");
  48. return Action.CommitMessage;
  49. }
  50. });
  51. consumer.start();
  52. System.out.println("[Case Normal Consumer Init] Ok");
  53. Thread.sleep(Integer.MAX_VALUE);
  54. consumer.shutdown();
  55. System.exit(0);
  56. }
本文导读目录