全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

MQTT 获取离线消息

更新时间:2017-12-15 20:40:10

离线消息使用场景

  • 场景一:客户端本身对离线消息的优先级比较低,只要保证最终能处理就可以。
  • 场景二:客户端对于离线消息需要有限处理,且要求比较实时。

针对场景一,因为 MQTT 默认的工作模式即可支持,客户端上线后,离线消息不会立即推送,是按照固定时间间隔,固定数量的方式推送。这种工作模式的好处是可以降低客户端上线时的压力,优先处理在线消息,离线消息只要保证最终能处理即可。该模式不需要任何设定。

针对场景二,因为客户端需要自己控制离线消息优先处理,所以 MQTT 提供了主动拉的模式来供客户端获取离线消息。即在客户端上线后,客户端自己调用接口来拉取自己所需的指定数量的消息。此模式下,客户端自己控制拉取的时间间隔和条数。

主动拉取离线消息使用说明

具体步骤如下:

  1. 客户端启动后,以控制消息的形式发起拉取消息的指令,设置拉取条数和顺序。
  2. 客户端等待本地处理成功。
  3. 继续拉取下一批消息。

注意事项

  • 客户端如果需要使用主动模式,请务必在连接建立后的第一个心跳周期内发起请求,否则系统会按照自动模式,即按照固定周期推送离线数据。
  • 客户端每次最多拉取消息的数量为30条,客户端发起拉取请求的最大频率限制为5次/秒。
  • 客户端需要自己控制拉取时机,因为消息从发起指令到推送到客户端,到客户端消费完成回应 ACK 都是异步过程。如果客户端拉取过快,很有可能拉到前一批还没有删除的消息,造成重复;或者拉取到重复的消息,因为前一次的消息还没有回复 ACK。

拉取离线消息相关 API

拉取离线消息:

发送 Topic:$SYS/getOfflineMsg

内容:JSONString

内容信息:

名称 类型 说明
pushOrder String "DESC"或者"ASC",分别代表从最新消息拉取还是从最早消息拉取
maxPushNum Integer 一次最多拉取消息的条数,设置范围为1-30,超过会以上限计算

返回值

普通的 PubAck 报文。

示例程序

MQTT 客户端使用 Demo


public void testOfflineMsg() throws Exception {
        String broker = "tcp://XXXX:1883";
        String clientId = "GID_XXX@@@XXXX";
        final String topic = "XXXX/11";
        final String topicFilter[] = {topic};
        final int qos[] = {1};
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            final MqttConnectOptions connOpts = new MqttConnectOptions();
            System.out.println("Connecting to broker: " + broker);
            connOpts.setServerURIs(new String[]{broker});
            connOpts.setCleanSession(false);
            connOpts.setAutomaticReconnect(true);
            /**
             * 客户端长链接需要设置心跳实际,建议100s 以下,超时,服务端会断开连接
             */
            connOpts.setKeepAliveInterval(90);
            sampleClient.setCallback(new MqttCallbackExtended() {
                public void connectComplete(boolean reconnect, String serverURI) {
                    System.out.println("connect success");
                      sampleClient.subscribe(topicFilter, qos);
                }
                public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                    System.out.println("recv Msg from " + topic);
                }
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());
                }
            });
            sampleClient.connect(connOpts);
            JSONObject object = new JSONObject();
            object.put("maxPushNum", 20);
            object.put("pushOrder", "DESC");
            sampleClient.publish("$SYS/getOfflineMsg", new MqttMessage(object.toJSONString().getBytes()));
            Thread.sleep(1000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
本文导读目录