本文介绍如何将其他阿里云产品的数据导入云消息队列 MQTT 版。本文以当前仅支持的云消息队列 RocketMQ 版数据互通为例进行说明。
前提条件
背景信息
本文以公网环境中的Java SDK为例说明如何将云消息队列 RocketMQ 版数据流入至云消息队列 MQTT 版。
此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载。

如上图所示,您部署在公网地域的后端应用数据需要传送至公网环境中的MQTT客户端,后端应用和MQTT客户端均通过Java语言开发。数据从云消息队列 RocketMQ 版流入至云消息队列 MQTT 版是通过创建相应的数据流入规则实现。两个产品的服务端通过各自产品提供的Java SDK分别与各自的客户端实现消息收发。
重要 云消息队列 RocketMQ 版和云消息队列 MQTT 版的Topic不能跨地域使用,因此,本文中所有资源都应在公网地域创建。
网络访问
云消息队列 MQTT 版同时提供了公网接入点和VPC 接入点。
- 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
- VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题云消息队列 MQTT 版产品方概不负责:
- 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
本文以公网接入点为例。云消息队列 MQTT 版与云消息队列 RocketMQ 版的应用场景对比和消息属性映射关系请参见以下文档:
使用流程
后端应用消息发送至MQTT客户端的使用流程如下所示。

步骤一:创建MQTT实例并获取接入点
重要 轻量版实例不支持数据流入规则、数据流出规则及上下线通知规则,如需使用规则功能,请您在创建实例时选择其他类型的实例。
- 登录云消息队列 MQTT 版控制台。
- 在左侧导航栏单击实例列表。
- 在顶部菜单栏选择地域。
- 在实例列表页面左上角单击创建实例。
- 在弹出的付费方式面板中,付费方式固定为包年包月,您无需设置,直接在面板左下角单击确定。
- 在弹出的实例规格面板中,按需选择您需要购买的实例规格,选中微消息队列 MQTT 版(包年包月)服务协议,然后单击立即购买。
- 在订单支付面板,根据提示完成支付。
- 在支付成功页面单击返回控制台。
- 回到云消息队列 MQTT 版控制台,在左侧导航栏单击实例列表,并将地域切换为您所购买的实例所对应的地域。
- 在实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
- 在实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。
步骤二:创建父级Topic
MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释。
- 登录云消息队列 MQTT 版控制台。
- 在左侧导航栏单击实例列表。
- 在顶部菜单栏选择地域。
- 在实例列表中找到目标实例,在其操作列中,选择 。
- 在Topic 管理页面左上角,单击创建 Topic。
- 在创建Topic面板中,输入要创建的Topic名称和描述,然后在左下角单击确定。您可以在Topic 管理页面查看刚创建的Topic。
步骤三:创建Group ID
Group ID详细信息,请参见名词解释。
- 登录云消息队列 MQTT 版控制台。
- 在左侧导航栏单击实例列表。
- 在顶部菜单栏选择地域。
- 在实例列表中找到目标实例,在其操作列中,选择 。
- 在Group 管理页面的左上角,单击创建 Group。
- 在创建Group面板中,输入Group ID,然后在左下角单击确定。您可以在Group 管理页面查看刚创建的Group。
步骤四:创建数据流入规则
规则中填写的参数需与您创建的资源保持一致。
- 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
- 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
- 在创建规则页面完成以下操作。您可以在规则管理的规则列表查看到刚创建的数据流入规则。
步骤五:调用Java SDK收发消息
- 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client。
- 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
- 解压该Demo工程包至您指定的文件夹。
- 在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.5.Final</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
说明 ons-client的版本信息,请参见版本说明。 - 在RocketMQSendMessageToMQ4IoT.java类中,按代码注释说明填写相应参数,主要涉及步骤一至步骤三所创建MQTT资源以及您在云消息队列 RocketMQ 版创建的相应资源,然后执行Main函数运行代码实现消息收发。示例代码如下。说明 在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证。
云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example.demo; import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 阿里云生产环境中,云消息队列 RocketMQ 版除了公网地域,其他地域的实例不允许在本地使用,必须在对应地域的ECS机器上部署使用。 */ public class RocketMQSendMessageToMQ4IoT { public static void main(String[] args) throws Exception { /** * 初始化云消息队列 RocketMQ 版发送客户端,实际业务中一般部署在服务端应用中。 */ Properties properties = new Properties(); /** * 设置云消息队列 RocketMQ 版Group ID,在云消息队列 RocketMQ 版控制台创建。 */ properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX"); /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV")); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV")); /** * 设置TCP接入点,该接入点为云消息队列 RocketMQ 版实例的接入点。进入云消息队列 RocketMQ 版控制台实例详情页面获取。 */ properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX"); /** * 设置云消息队列 RocketMQ 版的Topic,在云消息队列 RocketMQ 版控制台创建。 * 云消息队列 RocketMQ 版和云消息队列 MQTT 版配合使用时,RocketMQ客户端仅操作一级Topic。 */ final String parentTopic = "XXXXX"; Producer producer = ONSFactory.createProducer(properties); producer.start(); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /** * 初始化云消息队列 MQTT 版接收客户端,实际业务中云消息队列 MQTT 版一般部署在移动终端环境。 */ /** * 您在控制台创建的云消息队列 MQTT 版的实例ID。 */ String instanceId = "XXXXX"; /** * 设置接入点,进入云消息队列 MQTT 版控制台实例详情页面获取。 */ String endPoint = "XXXXXX.mqtt.aliyuncs.com"; /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。 * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。 */ String clientId = "GID_XXXX@@@XXXXX"; /** * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任何字符串。 * 需要注意的是,完整的Topic长度不得超过128个字符。 */ final String subTopic = "/testMq4Iot"; final String mq4IotTopic = parentTopic + subTopic; /** * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。 */ final int qosLevel = 0; ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId); final MemoryPersistence memoryPersistence = new MemoryPersistence(); /** * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。 */ final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence); /** * 设置客户端发送超时时间,防止无限阻塞。 */ mqttClient.setTimeToWait(5000); final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { /** * 客户端连接成功后就需要尽快订阅需要的Topic。 */ System.out.println("connect success"); executorService.submit(new Runnable() { @Override public void run() { try { final String topicFilter[] = {mq4IotTopic}; final int[] qos = {qosLevel}; mqttClient.subscribe(topicFilter, qos); } catch (MqttException e) { e.printStackTrace(); } } }); } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { /** * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。 */ System.out.println( "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]); } }); mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions()); for (int i = 0; i < 10; i++) { /** * 使用RocketMQ客户端发消息给MQTT客户端时,Topic指定为一级父Topic,Tag指定为MQ2MQTT。 */ Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes()); /** * 使用RocketMQ客户端发消息给MQTT客户端时,可以通过MqttSecondTopic属性设置MQTT的子级Topic属性。 */ msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic); SendResult result = producer.send(msg); System.out.println(result); /** * 发送P2P消息,设置子级Topic。 */ msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId); result = producer.send(msg); System.out.println(result); } Thread.sleep(Long.MAX_VALUE); } }
结果验证
完成消息收发后,您可在云消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询。