全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

MQTT 获取离线消息

更新时间:2017-09-21 17:37:20

离线消息使用场景

  • 场景一:客户端本身对离线消息的优先级比较低,只要保证最终能处理就可以。
  • 场景二:客户端对于离线消息需要有限处理,且要求比较实时。

针对场景一,因为 MQTT 默认的工作模式即可支持,客户端上线后,离线消息不会立即推送,是按照固定时间间隔,固定数量的方式推送。这种工作模式的好处是可以降低客户端上线时的压力,优先处理在线消息,离线消息只要保证最终能处理即可。该模式不需要任何设定。

针对场景二,因为客户端需要自己控制离线消息优先处理,所以 MQTT 提供了主动拉的模式来供客户端获取离线消息。即在客户端上线后,客户端自己调用接口来拉取自己所需的指定数量的消息。此模式下,客户端自己控制拉取的时间间隔和条数。

主动拉取离线消息使用说明

具体步骤如下:

  1. 客户端启动后,以控制消息的形式发起拉取消息的指令,设置拉取条数和顺序。
  2. 客户端等待本地处理成功。
  3. 继续拉取下一批消息。

注意事项

  • 客户端如果需要使用主动模式,请务必在连接建立后的第一个心跳周期内发起请求,否则系统会按照自动模式,即按照固定周期推送离线数据。
  • 客户端每次最多拉取消息的数量为30条,客户端发起拉取请求的最大频率限制为5次/秒。
  • 客户端需要自己控制拉取时机,因为消息从发起指令到推送到客户端,到客户端消费完成回应ACK都是异步过程。如果客户端拉取过快,很有可能拉到前一批还没有删除的消息,造成重复;或者拉取到重复的消息,因为前一次的消息还没有回复ACK。

拉取离线消息相关API

拉取离线消息:

发送 Topic:$SYS/getOfflineMsg

内容:JSONString

内容信息:

名称 类型 说明
maxPushNum String “DESC”或者”ASC”,分别代表从最新消息拉取还是从最早消息拉取
pushOrder Integer 一次最多拉取消息的条数,设置范围为1-30,超过会以上限计算

返回值

普通的 PubAck 报文。

示例程序

MQTT客户端使用 Demo

  1. public void testOfflineMsg() throws Exception {
  2. String broker = "tcp://XXXX:1883";
  3. String clientId = "GID_XXX@@@XXXX";
  4. final String topic = "XXXX/11";
  5. final String topicFilter[] = {topic};
  6. final int qos[] = {1};
  7. MemoryPersistence persistence = new MemoryPersistence();
  8. try {
  9. final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  10. final MqttConnectOptions connOpts = new MqttConnectOptions();
  11. System.out.println("Connecting to broker: " + broker);
  12. connOpts.setServerURIs(new String[]{broker});
  13. connOpts.setCleanSession(false);
  14. connOpts.setAutomaticReconnect(true);
  15. /**
  16. * 客户端长链接需要设置心跳实际,建议100s以下,超时,服务端会断开连接
  17. */
  18. connOpts.setKeepAliveInterval(90);
  19. sampleClient.setCallback(new MqttCallbackExtended() {
  20. public void connectComplete(boolean reconnect, String serverURI) {
  21. System.out.println("connect success");
  22. sampleClient.subscribe(topicFilter, qos);
  23. }
  24. public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  25. System.out.println("recv Msg from " + topic);
  26. }
  27. public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  28. System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
  29. }
  30. });
  31. sampleClient.connect(connOpts);
  32. JSONObject object = new JSONObject();
  33. object.put("maxPushNum", 20);
  34. object.put("pushOrder", "DESC");
  35. sampleClient.publish("$SYS/getOfflineMsg", new MqttMessage(object.toJSONString().getBytes()));
  36. Thread.sleep(1000000);
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
本文导读目录