本次最佳实践通过结合JAVA代码的形式对MQTT的使用和场景化做出示例。

背景信息

最佳实践概述

本次最佳实践通过结合JAVA代码的形式使用MQTT,详细描述从构建maven工程到向MQTT发送消息的过程,因本次最佳实践使用阿里云中间件产品MQTT,所以只需在阿里云提供的MQTT控制台创建MQTT服务器即可,无需下载MQTT安装包安装MQTT服务器,实践中描述了使用MQTT需要在构建maven应用中导入的依赖包及获取与MQTT连接需要配置的参数值,获取与MQTT连接的两种模式。本文还详细描述了MQTT消息发送者同步发送消息的示例代码、消息存储到文件、指导消费者多线程消费消息及打印消费消息结果日志。

应用场景中描述了在阿里云环境下创建MQTT服务器及控制台为MQTT提供的功能使用详述,最重要的优势是可以与RocketMQ结合使用。并且简单描述了加油站场景下RocketMQ与MQTT配合使用的设计原理及原理对应下的JAVA代码实现。

最佳实践价值

MQTT原本是为解决高并发的场景设计的产品,但MQTT协议却是为大量计算能力有限,且工作在低带宽网络中的远程传感器和控制设备提供通讯服务,因这种原因使其在中小企业中运用的变得极其有价值,同时阿里云将MQTT配合RocketMQ一起使用,使MQTT使用场景变得更广泛。本次最佳实践结合将JAVA代码来使用MQTT,以供您参考。

MQTT协议原理

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为主题(Topic)和负载(payload)两部分:
  1. Topic:可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)。
  2. Payload:可以理解为消息的内容,是指订阅者具体要使用的内容。
图 1. MQTT原理图
MQTT原理图
微消息队列MQTT收发消息流程如下图所示: 收发消息图

生产者发送消息代码详述

您可以参照以下步骤对代码进行改造:

  1. 同步发送原理 。
    同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
  2. 创建一个maven工程,然后在工程pom.xml文件导入下面依赖包。
    <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
             <version>1.10</version>
    </dependency>
    <dependency>
             <groupId>org.eclipse.paho</groupId>
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
             <version>1.2.2</version>
    </dependency>
    <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
             <version>4.5.2</version>
    </dependency>
    <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.48</version>
    </dependency>
  3. 连接MQTT的参数值。
    // MQ4IOT 实例 ID,购买后控制台获取
    String
    instanceId = "post-cn-0pp1******";
    //接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
    String endPoint = "post-cn-********-internal.mqtt.aliyuncs.com";
    //账号 accesskey,从账号系统控制台获取
    String accessKey = "LTAI4Fsc4cSFqHmr********";
    //账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
    String secretKey = "zQPWn2U4MHuid81l1l0xiV********";
    // MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
    String clientId = "GID_pdsa_lhh_mqtt";
  4. Signature鉴权模式下,与MQTT建立连接代码。
    public ConnectionOptionWrapper(String instanceId, String accessKey, String secretKey,
             String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
             this.instanceId = instanceId;
             this.accessKey = accessKey;
             this.secretKey = secretKey;
             this.clientId = clientId;
             mqttConnectOptions = new MqttConnectOptions();
             mqttConnectOptions.setUserName("Signature|" + accessKey + "|" + instanceId);
             mqttConnectOptions.setPassword(Tools.macSignature(clientId, secretKey).toCharArray());
             mqttConnectOptions.setCleanSession(true);
             mqttConnectOptions.setKeepAliveInterval(90);
             mqttConnectOptions.setAutomaticReconnect(true);
             mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
             mqttConnectOptions.setConnectionTimeout(5000);
    
    }

    Signature鉴权模式下,计算签名代码如下:

    public static String macSignature(String text,
             String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
             Charset charset = Charset.forName("UTF-8");
             String algorithm = "HmacSHA1";
             Mac mac = Mac.getInstance(algorithm);
             mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
             byte[] bytes = mac.doFinal(text.getBytes(charset));
             return new String(Base64.encodeBase64(bytes), charset);
    }

    Token鉴权模式下,与MQTT建立连接:

    public ConnectionOptionWrapper(String instanceId, String accessKey, String clientId,
             Map<String, String> tokenData) {
             this.instanceId = instanceId;
             this.accessKey = accessKey;
             this.clientId = clientId;
             if (tokenData != null) {
                       this.tokenData.putAll(tokenData);
             }
             mqttConnectOptions = new MqttConnectOptions();
             mqttConnectOptions.setUserName("Token|" + accessKey + "|" + instanceId);
             StringBuilder builder = new StringBuilder();
             for (Map.Entry<String, String> entry : tokenData.entrySet()) {
                       builder.append(entry.getKey()).append("|").append(entry.getValue()).append("|");
             }
             if (builder.length() > 0) {
                       builder.setLength(builder.length() - 1);
    
             }
             mqttConnectOptions.setPassword(builder.toString().toCharArray());
             mqttConnectOptions.setCleanSession(true);
             mqttConnectOptions.setKeepAliveInterval(90);
             mqttConnectOptions.setAutomaticReconnect(true);
             mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
             mqttConnectOptions.setConnectionTimeout(5000);
    }
  5. MQTT同步原理发送示例代码。
    //MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。
    final String parentTopic = "pdsa_lhh_mqtt";
    //MQ4IOT支持子级 topic,用来做自定义的过滤
    final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
    //QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置
    final int qosLevel = 0;
    ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey,
    secretKey, clientId);
    final MemoryPersistence memoryPersistence = new MemoryPersistence();
    //客户端使用的协议和端口必须匹配
    final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
    //客户端设置好发送超时时间,防止无限阻塞
    mqttClient.setTimeToWait(5000);
    
    mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
    for (int i = 0; i < 100; i++) {
             MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
             message.setQos(qosLevel);
             //发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则
             mqttClient.publish(mq4IotTopic, message);
    }

消费者订阅消息代码详述

您可以参照以下步骤对代码进行改造:

  1. 在构建好的maven工程中,在pom.xml文件导入以下依赖包。
    <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
             <version>1.10</version>
    </dependency>
    <dependency>
             <groupId>org.eclipse.paho</groupId>
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
             <version>1.2.2</version>
    </dependency>
    <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
             <version>4.5.2</version>
    </dependency>
    <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.48</version>
    </dependency>
  2. 消息者订阅消息示例代码。
    final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    mqttClient.setCallback(new MqttCallbackExtended() {
             @Override
             public void connectComplete(boolean reconnect, String serverURI) {
                       //客户端连接成功后就需要尽快订阅需要的 topic
                       System.out.println("connect success");
                       executorService.submit(new Runnable() {
                                @Override
                                public void run() {
                                         try {
                                                   final String topicFilter[] = {mq4IotTopic};
                                                   final int[] qos = {qosLevel};
                                                   mqttClient.subscribe(topicFilter, qos);
                                         } catch (MqttException e) {
                                                   e.printStackTrace();
                                         }
                                }
                       });
             }
    });
  3. 消息订阅结果处理。
    @Override
    public void
    connectionLost(Throwable throwable) {
             throwable.printStackTrace();
    }
    
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
             //消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
             System.out.println(
                       "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
    } 
    
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
             System.out.println("send msg
    succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
    
    }

创建MQTT实例

您可以按照以下步骤创建MQTT实例。

说明
  • 您必须已经开通母产品消息队列 MQ 的服务。
  • 您必须已经拥有了阿里云 AccessKey(AK)。
  1. 登录MQTT控制台。
  2. 在左侧导航栏单击概览
  3. 实例列表页面,单击新建实例,创建完MQTT实例详情如下图所示。
    新建实例
  4. 在左侧导航栏单击消息存储
  5. Topic管理页面,选择您刚创建的微消息队列 MQTT 版实例,单击新建Topic
  6. 创建Topic对话框,输入Topic名称、选择该Topic用于存储和收发的消息类型、输入备注信息,然后单击确认
    创建topic
  7. 在左侧导航栏,单击Group管理
  8. Group管理页面,选择您刚创建的微消息队列 MQTT 版实例,单击新建Group ID
  9. 创建Group ID对话框中,输入需要创建的Group ID,然后单击确定
    新建group ID

连接查询

如需了解微消息队列MQTT版客户端的连接情况,您可以指定Topic、Group ID 以及 Client ID查询客户端的当前在线数、在指定时间段内的在线数以及连接信息和订阅关系。

  1. 登录微消息队列MQTT版控制台。
  2. 在顶部菜单栏选择要查询的地域。
  3. 在左侧导航栏,单击连接查询
  4. 连接查询页面,单击目标实例,单击按Topic查询页签。
  5. 在父级Topic文本框输入要查询的父级Topic,在子级Topic文本框输入要查询的子级Topic,单击查询
    topic查询

消息查询

您可以按以下步骤在微消息队列MQTT版的控制台查询所有Topic的收发消息的数量和TPS。

说明

此查询支持多维度筛选,以及自定义时间范围查询。

您可以通过以下任一控制台路径进入查询页面:
  • 在控制台左侧导航栏,选择资源报表
  • 在控制台左侧导航栏,选择消息存储。然后在需要查询的 Topic 的操作列单击资源报表
  1. 在控制台左侧导航栏,选择资源报表
  2. 资源报表页面,根据需要查看的信息类型选择相应页签。
    • 生产者:查看某Topic在一个时间段内从消息生产者处接收到的消息的总量或者TPS。
    • 消费者:查看某Topic在一个时间段内投递给消费者的消息的总量或TPS。
  3. 输入查询的筛选的条件,包括父级 Topic、子级 Topic、QoS 类型、消息类型和时间范围。
  4. 单击查询
    在消息生产总量、消息消费总量、消息生产TPS和消息消费TPS的图标页面右上方单击详情,还可以快捷查看最近3小时、最近12小时、最近1天的相关统计数据,或者重新自定义时间范围。 资源报表

    生产者消息生产总量、消息生产TPS查询结果及查询时间范围内最大值、平均值、最小值统计如下:

    资源报表 消费总量

签名校验

微消息队列 MQTT 版控制台提供了签名计算工具,您可以按以下步骤比对验证自己的签名计算是否正确。

说明 该功能主要用于签名验证,验证过程将在本地浏览器完成,无网络传输,不与服务端交互。
  1. 登录微消息队列MQTT版控制台。
  2. 在左侧导航栏中,单击签名校验
  3. 签名校验页面,输入AccessKeyId、AccessKeySecret 以及 Client ID。
  4. 单击计算签名
    签名校验

MQTT应用场景

以下内容对MQTT应用于加油站场景进行原理图和代码的示例。

加油站上报下行数据的架构,MQTT结合MQ使用如下图所示:

应用场景

消息队列RocketMQ客户端发消息到MQTT客户端,然后MQTT订阅完整的消息,上述设计原理对应的示例代码如下:

public void rocketMQToMQTT() throws Exception {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");       
        properties.put(PropertyKeyConst.AccessKey, "XXXX");   
        properties.put(PropertyKeyConst.SecretKey, "XXXX");
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.cn-******.mq-internet.aliyuncs.com");
        final String parentTopic = "XXXXX";
        Properties initProperties = new Properties();
        Producer producer = ONSFactory.createProducer(initProperties);
        producer.start();
 
        String instanceId = "XXXXX";
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        String accessKey = "XXXX";
        String secretKey = "XXXX";
        String clientId = "GID_XXXX@@@XXXXX";
                   //MQ4IOT支持子级 topic,用来做自定义的过滤
        final String subTopic = "/testMq4Iot";
        final String mq4IotTopic = parentTopic + subTopic;
                   //QoS参数代表传输质量,可选0,1,2
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
                   //客户端设置好发送超时时间,防止无限阻塞
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
               System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }
 
            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }
 
            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());          
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
            SendResult result = producer.send(msg);
            System.out.println(result);
        }
        Thread.sleep(Long.MAX_VALUE);
}