本文介绍如何将其他阿里云产品的数据导入微消息队列 MQTT 版。本文以当前仅支持的消息队列 RocketMQ 版数据互通为例进行说明。

前提条件

  • 安装 IDE。您可以使用 IntelliJ IDEA 或者 Eclipse,本文以 IntelliJ IDEA 为例。
  • 下载安装 JDK
  • 在公网地域,创建消息队列 RocketMQ 版实例、Topic 以及 Group ID。详情请参见创建资源

背景信息

本文以公网环境中的 Java SDK 为例说明如何将消息队列 RocketMQ 版数据流入至微消息队列 MQTT 版

此场景下可使用多语言的第三方开源 SDK 来实现消息收发。详情请参见 SDK 下载

quick_start_data_inflow

如上图所示,您部署在公网地域的后端应用数据需要传送至公网环境中的 MQTT 客户端,后端应用和 MQTT 客户端均通过 Java 语言开发。数据从消息队列 RocketMQ 版流入至微消息队列 MQTT 版是通过创建相应的数据流入规则实现。两个产品的服务端通过各自产品提供的 Java SDK 分别与各自的客户端实现消息收发。

说明 消息队列 RocketMQ 版微消息队列 MQTT 版的 Topic 不能跨地域使用,因此,本文中所有资源都应在公网地域创建。详情请参见 Topic 地域化

网络访问

微消息队列 MQTT 版同时提供了公网接入点经典网接入点VPC 接入点。接入点说明如下:
  • 在物联网和移动互联网的场景中,客户端推荐使用公网接入点接入。
  • 经典网接入点VPC 接入点仅供一些特殊场景使用。因为一般而言,涉及部署在云端服务器上的应用的场景,建议使用服务端消息产品例如消息队列 RocketMQ 版实现。
注意 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的 IP 地址直接连接,因为 IP 地址随时会变化。在以下使用情况中出现的问题微消息队列 MQTT 版产品方概不负责:
  • 客户端不使用域名接入而是使用 IP 地址接入,产品方更新了域名解析导致原有 IP 地址失效。
  • 客户端网络对 IP 地址设置网络防火墙策略,产品方更新了域名解析后新 IP 地址被您的防火墙策略拦截。
本文以公网接入点为例。微消息队列 MQTT 版消息队列 RocketMQ 版的应用场景对比和消息属性映射关系请参见以下文档:

使用流程

后端应用消息发送至 MQTT 客户端的使用流程如下所示。

rocketmq_send_message_to_mqtt

步骤一:创建 MQTT 实例并获取接入点

  1. 登录微消息队列 MQTT 版控制台,在顶部菜单栏选择实例所在地域。
  2. 概览页面的实例列表区域,单击 点击这里创建您的第一个实例
  3. 在售卖页面,按需选择您所需的实例规格,单击立即购买
    您可购买包年包月或按量付费实例,两种实例的计费详情请参见计费说明。此处以按量付费实例为例。select_instance_type
  4. 确认订单页面,选中相应服务协议,单击去支付
    此处以按量付费实例为例。common_buy_mqtt
    支付完成后,即可看到以下界面,说明已购买成功。create_success
  5. 回到微消息队列 MQTT 版控制台,在左侧导航栏,选择实例详情 > 实例信息
  6. 实例信息页签的获取接入点信息 区域,即可看到接入点信息,本文示例以公网接入点为例。

步骤二:创建父级 Topic

MQTT 协议支持多级 Topic,父级 Topic 需在控制台或调用 OpenAPI 创建,子级 Topic 无需创建,Topic 详情请参见名词解释。本文以在控制台创建父级 Topic 为例。

  1. 在左侧导航栏,选择 Topic 管理 > 创建 Topic
  2. 创建 Topic 对话框,输入 Topic 以及描述,单击确认
    您可以在 Topic 管理页面查看刚创建的 Topic。

步骤三:创建 Group ID

Group ID 详情请参见名词解释

  1. 在左侧导航栏,选择 Group 管理 > 创建 Group ID
  2. 创建 Group ID 对话框,输入 Group ID,然后单击确认
    您可以在 Group ID 列表中查看到刚创建的 Group ID。

步骤四:创建数据流入规则

规则中填写的参数需与您创建的资源保持一致。

  1. 在左侧导航栏,选择规则管理 > 创建规则
  2. 创建规则面板,单击数据流入页签。
  3. 创建规则面板,单击数据流入页签。
  4. 数据流入页签,按提示填写以下参数,单击确定
    参数 取值示例 说明
    规则 ID 111111 规则的全局唯一标识,说明如下:
    • 只能包含字母、数字、横划线(-)和下划线(_),至少包含一个字母或数字。
    • 名称长度限制在 3~64 字符之间,长于 64 字符将被自动截取。
    • 创建后无法更新。
    规则描述 migrate from rocketmq 对规则的描述。
    状态 启用 是否启用当前规则,取值说明如下:
    • 启用
    • 禁用
    数据源
    说明 当前仅支持消息队列 RocketMQ 版
    云产品 消息队列 RocketMQ 指定您需将哪个源云产品的数据流入至微消息队列 MQTT 版。
    实例 ID MQ_INST_13801563067*****_BbyOD2jQ 指定的源云产品的实例 ID。
    Topic Topic_test 指定的源云产品的资源键值,即消息队列 RocketMQ 版的 Topic。Topic_test 的数据将流转至目标微消息队列 MQTT 版的 Topic。
    流转目标
    Topic TopicA 指定您需将其他源云产品的数据导入至微消息队列 MQTT 版的哪个目标 Topic。
    您可以在规则管理的规则列表查看到刚创建的数据流入规则。

步骤五:调用 Java SDK 收发消息

  1. 下载第三方的开源 Java SDK。下载地址为 Eclipse Paho Java Client
  2. 下载阿里云 MQTT 的 Java SDK 的 Demo 示例作为您代码开发的参考。下载地址为 mqtt-java-demo
  3. 解压该 Demo 工程包至您指定的文件夹。
  4. 在 Intellij IDEA 中,导入解压后的文件以创建相应的工程,并确认 pom.xml 中已包含以下依赖。
    <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.8.5.Final</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
    </dependencies>
    说明 ons-client 的版本详情请参见版本说明
  5. RocketMQSendMessageToMQ4IoT.java 类中,按代码注释说明填写相应参数,主要涉及步骤一至步骤三所创建的 MQTT 资源以及您在消息队列 RocketMQ 版创建的相应资源,然后执行 Main 函数运行代码实现消息收发。
    示例代码详情请参见 RocketMQSendMessageToMQ4IoT.java

结果验证

完成消息收发后,您可在微消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详情请参见查询消息轨迹

更多信息