MQTT上下线通知规则的实现

更新时间:

云消息队列 MQTT 版支持通过创建上下线通知规则,将在线状态通知消息推送至云消息队列 RocketMQ 版,以获取客户端的在线状态。本文为您介绍如何通过创建上下线通知规则来获取客户端的在线状态。

背景信息

在实际业务中服务端需要对客户端的上下线数据进行统计分析,并根据客户端的在线状态推送消息。云消息队列 MQTT 版提供异步上下线事件通知获取客户端在线状态,MQTT客户端的上下线事件将会触发MQTT服务端生成一条通知消息,获取通知消息有以下方式:

  • 通过云端SDK接入云消息队列 MQTT 版服务端获取客户端的在线状态。更多信息,请参见获取MQTT客户端在线状态

  • 通过创建上下线通知规则,将在线状态通知消息推送至云消息队列 RocketMQ 版,然后订阅云消息队列 RocketMQ 版获取客户端的在线状态。

本文以创建上下线通知规则为例,介绍后端应用如何获取客户端在线状态。

quick_start_client_stats_notify

网络访问

云消息队列 MQTT 版同时提供了公网接入点VPC 接入点
  • 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
  • VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK
  • 已创建云消息队列 MQTT 版实例、Topic和Group ID,具体操作,请参见创建资源

  • 已创建云消息队列 RocketMQ 版实例、Topic和Group ID,具体操作,请参见步骤二:创建资源

重要
  • 云消息队列 MQTT 版客户端上下线通知规则仅支持云消息队列 RocketMQ 版4.x系列实例。

  • 云消息队列 MQTT 版客户端上下线通知规则不能跨地域使用,因此,云消息队列 MQTT 版云消息队列 RocketMQ 版的资源都必须创建在同一地域。

1.创建上下线通知规则

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表

  2. 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。

  3. 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则

  4. 创建规则页面完成以下操作。

    1. 配置基本信息。输入规则ID,选择上下线通知的规则类型。

      image

    2. 配置规则源。选择已经创建好的云消息队列 MQTT 版的Group ID。

      image

    3. 配置规则目标。选择已经创建好的云消息队列 RocketMQ 版的实例和Topic。

      image

2.准备测试代码

客户端的上下线操作,上下线通知的消息处理都需要代码完成,本文以Java的Demo示例作为您代码开发的参考。

2.1下载示例代码

  1. 下载mqtt-java-demo,并解压该Demo工程包至您指定的文件夹。

  2. 在解压的Demo工程中找到lmq-java-demo文件夹,将此文件夹导入IntelliJ IDEA,并确认pom.xml中已包含以下依赖。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置访问凭证。

    • 获取AccessKey信息。获取方式,请参见创建AccessKey

    • 配置环境变量。云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENVMQTT_SK_ENV。关于配置环境变量的方法,请参见配置访问凭证

2.2客户端上下线代码

MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java类中,按代码注释说明填写云消息队列 MQTT 版资源的参数。

测试中仅模拟上下线的操作不需要发送消息,可以将其中发送消息的相关代码去掉,示例代码如下。

客户端上下线代码示例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 实例 ID,购买后控制台获取
         */
        String instanceId = "XXXXX";
        /**
         * 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 账号 AccessKey,从账号系统控制台获取
         * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
         * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
         * 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。运行本代码示例之前,请先配置环境变量MQTT_AK_ENV和MQTT_SK_ENV
         * 例如:export MQTT_AK_ENV=<access_key_id>
         *      export MQTT_SK_ENV=<access_key_secret>
         * 需要将<access_key_id>替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
         * clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX";

        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密则设置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        Thread.sleep(Long.MAX_VALUE);
    }
}

2.3上下线通知消息的处理代码

上下线通知消息被推送到云消息队列 RocketMQ 版,消费者订阅消息后需要根据业务需求进行处理。

MQTTClientStatusNoticeProcessDemo.java类中按代码注释说明填写云消息队列 RocketMQ 版资源的参数,示例代码如下。

上下线通知消息的处理代码示例

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。
         */
        Properties properties = new Properties();
        /**
         * 设置消息队列RocketMQ版Group ID,在消息队列RocketMQ版控制台创建。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
         * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
         * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
         * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * 设置TCP接入点,该接入点为消息队列RocketMQ版实例的接入点。进入消息队列RocketMQ版控制台实例详情页面获取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * 使用消息队列RocketMQ版消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本示例以单机内存版实现为例。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 处理上下线通知的逻辑。
     * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。
     * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存储新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 读取当前channel的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根据状态表判断一个clientId是否有活跃的TCP连接。
     * 1.如果没有channel表,则客户端一定不在线。
     * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接,客户端在线。
     * 如果全部的channel都有掉线断开事件则客户端一定不在线。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }
}

3.结果验证

  1. 执行MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java类中的Main函数运行代码模拟客户端上线操作。可以根据下面操作查询客户端状态和消息推送情况。

    说明

    可以通过终止Main函数的执行来模拟客户端的离线操作。

    • 查询客户端状态。在云消息队列 MQTT 版控制台设备状态查询页面,根据Device ID查询客户端此时已是在线状态,如下图所示。

      image

    • 查询事件消息推送情况。在云消息队列 RocketMQ 版控制台消息查询页面,根据Topic查询到上线事件消息已经被推送过来,如下图所示。

      image

  2. 执行MQTTClientStatusNoticeProcessDemo.java类中的Main函数运行代码。此时已经收到上线的事件消息,ClientStatus状态也从false变成true,如下图所示。

    image

更多信息