本文介绍如何快速使用微消息队列MQTT版的Java SDK实现MQTT终端与终端消息的收发。本场景示例不涉及跨云产品的数据互通。

前提条件

  • 安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
  • 下载安装JDK

背景信息

微消息队列MQTT版最简单的使用场景即是MQTT终端和终端交互。此场景下可使用多语言的第三方开源SDK来实现消息收发。目前支持的多语言,请参见SDK下载

本文以公网环境中的Java SDK为例说明如何实现MQTT终端与终端的消息收发。

设备端交互

如上图所示,在物联网和移动互联网场景中,您部署在公网的终端设备采用Java语言开发,需要与阿里云MQTT服务端实现消息收发。那么您需要在您的终端代码中,嵌入Java SDK的相应代码,并在SDK中配置相应参数才能与MQTT服务端通信。

网络访问

微消息队列MQTT版同时提供了公网接入点VPC 接入点。接入点说明如下:
  • 在物联网和移动互联网的场景中,客户端推荐使用公网接入点接入。
  • VPC 接入点仅供一些特殊场景使用。因为一般而言,涉及部署在云端服务器上的应用的场景,建议使用服务端消息产品例如消息队列RocketMQ版实现。
注意 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题微消息队列MQTT版产品方概不负责:
  • 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
  • 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
本文以公网接入点为例。微消息队列MQTT版消息队列RocketMQ版的应用场景对比和消息属性映射关系请参见以下文档:

使用流程

调用MQTT的Java SDK收发消息的流程如下图所示。quick_start_no_cross_product

步骤一:创建实例并获取接入点

  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 实例列表页面左上角单击创建实例
  5. 在弹出的付费方式面板中,按需选择实例付费方式。
    微消息队列MQTT版支持按量付费包年包月付费模式,两种类型的计费方式请参见计费说明
    • 创建按量付费实例
      1. 付费方式选择按量付费,然后单击确定
      2. 在实例规格面板,按需选择您需要购买的实例规格,单击立即购买按量付费实例
    • 创建包年包月实例
      1. 付费方式选择包年包月,然后确定
      2. 在实例规格面板,按需选择您需要购买的实例规格,单击立即购买购买包年包月
      3. 在订单支付面板,单击订购
    购买成功后,刷新微消息队列MQTT版控制台实例列表页面,可以看到刚才新创建的实例。
  6. 实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
  7. 实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。

步骤二:创建父级Topic

MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释

  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列中,选择更多 > Topic 管理
  5. Topic 管理页面左上角,单击创建 Topic
  6. 在创建Topic面板中,输入要创建的Topic名称描述,然后在左下角单击确定
    您可以在Topic 管理页面查看刚创建的Topic。

步骤三:创建Group ID

Group ID详细信息,请参见名词解释

  1. 登录微消息队列MQTT版控制台
  2. 在左侧导航栏单击实例列表
  3. 在顶部菜单栏选择地域。
  4. 在实例列表中找到目标实例,在其操作列中,选择更多 > Group 管理
  5. Group 管理页面的左上角,单击创建 Group
  6. 在创建Group面板中,输入Group ID,然后在左下角单击确定
    您可以在Group 管理页面查看刚创建的Group。

步骤四:调用Java SDK收发消息

  1. 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client
  2. 下载阿里云微消息队列MQTT版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo
  3. 解压该Demo工程包至您指定的文件夹。
  4. 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
  5. MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java类中,按代码注释说明填写相应参数,主要涉及步骤一步骤三所创建的MQTT资源,然后执行Main函数运行代码实现消息收发。
    示例代码如下。
    package com.aliyun.openservices.lmq.example.demo;
    
    import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
        public static void main(String[] args) throws Exception {
            /**
             * 您在控制台创建的微消息队列MQTT版的实例ID。
             */
            String instanceId = "XXXXX";
            /**
             * 设置接入点,进入微消息队列MQTT版控制台实例详情页面获取。
             */
            String endPoint = "XXXXX.mqtt.aliyuncs.com";
            /**
             * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
             */
            String accessKey = "XXXXX";
            /**
             * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
             */
            String secretKey = "XXXXX";
            /**
             * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。
             * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在微消息队列MQTT版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。
             */
            String clientId = "GID_XXXXX@@@XXXXX";
            /**
             * 微消息队列MQTT版消息的一级Topic,需要在控制台创建才能使用。
             * 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
             */
            final String parentTopic = "XXXXX";
            /**
             * 微消息队列MQTT版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任意字符串。
             * 需要注意的是,完整的Topic长度不得超过128个字符。
             */
            final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
            /**
             * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
             */
            final int qosLevel = 0;
            ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
            final MemoryPersistence memoryPersistence = new MemoryPersistence();
            /**
             * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。
             */
            final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
            /**
             * 设置客户端发送超时时间,防止无限阻塞。
             */
            mqttClient.setTimeToWait(5000);
            final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
            mqttClient.setCallback(new MqttCallbackExtended() {
                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    /**
                     * 客户端连接成功后就需要尽快订阅需要的Topic。
                     */
                    System.out.println("connect success");
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                final String topicFilter[] = {mq4IotTopic};
                                final int[] qos = {qosLevel};
                                mqttClient.subscribe(topicFilter, qos);
                            } catch (MqttException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
    
                @Override
                public void connectionLost(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    /**
                     * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                     * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
                     */
                    System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
                }
    
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
                }
            });
            mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
            for (int i = 0; i < 10; i++) {
                MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
                message.setQos(qosLevel);
                /**
                 * 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
                 */
                mqttClient.publish(mq4IotTopic, message);
                /**
                 * 微消息队列MQTT版支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的clientId,则可以直接发送点对点消息。
                 * 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的topic格式规范是 {{parentTopic}}/p2p/{{targetClientId}}。
                 */
                final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
                message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
                message.setQos(qosLevel);
                mqttClient.publish(p2pSendTopic, message);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

结果验证

完成消息收发后,您可在微消息队列MQTT版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询

更多信息