MQTT和RocketMQ进行数据互通(跨产品数据流入)
如果您的云端应用需要使用云消息队列 RocketMQ 版产品的某些功能,例如顺序消息特性、事务消息特性等,您可以通过消息流入或流出规则将云消息队列 MQTT 版和云消息队列 RocketMQ 版数据进行流转。本文介绍如何将云消息队列 RocketMQ 版产品的数据导入云消息队列 MQTT 版。
背景信息
云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发。云端SDK使用,请参见云端开发概述。
同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版。
本文以公网环境中的Java SDK为例说明如何将云消息队列 RocketMQ 版数据流入至云消息队列 MQTT 版。
云消息队列 RocketMQ 版和云消息队列 MQTT 版的Topic不能跨地域使用,因此,本文中所有资源都应在公网地域创建。
网络访问
- 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
- VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
- 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
使用流程
后端应用消息发送至MQTT客户端的使用流程如下所示。
前提条件
步骤一:创建MQTT实例并获取接入点
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在页面左上角单击创建实例。
在弹出的付费方式面板中,选择按需选择包年包月或Serverless 按量付费,然后单击确定。
在弹出的商品购买页中,选择实例类型和各计费项的规格,然后单击立即购买。
云消息队列 MQTT 版支持的实例类型及功能差异,请参见实例类型。
在确认订单页面,根据提示完成支付。
在支付成功页面单击返回控制台。
- 回到云消息队列 MQTT 版控制台,在左侧导航栏单击实例列表,并将地域切换为您所购买的实例所对应的地域。
- 在实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
- 在实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。
步骤二:创建父级Topic
MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释。
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击Topic 管理,然后在页面左上角,单击创建 Topic。
- 在创建Topic面板中,输入要创建的Topic名称和描述,然后在左下角单击确定。
步骤三:创建Group ID
Group ID详细信息,请参见名词解释。
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击Group 管理,然后在页面左上角单击创建 Group。
- 在创建Group面板中,输入Group ID,然后在左下角单击确定。
步骤四:创建数据流入规则
规则中填写的参数需与您创建的资源保持一致。
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
- 在创建规则页面完成以下操作。
- 在配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步。
参数 取值示例 说明 规则ID 111111 规则的全局唯一标识,说明如下: - 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
- 名称长度限制在3~64字符之间,长于64字符将被自动截取。
- 创建后无法更新。
描述 migrate from rocketmq 对规则的描述。 状态 启用 是否启用当前规则,取值说明如下: - 启用
- 停用
规则类型 数据流入 创建的规则类型,取值说明如下: - 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出。
- 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入。
- 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出。
- 在配置规则源配置向导页面,配置数据源,然后单击下一步。
参数 取值示例 说明 源服务类型 消息队列 RocketMQ 版 指定您需将哪个源云产品的数据流入至云消息队列 MQTT 版。 说明 当前仅支持云消息队列 RocketMQ 版。RocketMQ 实例 MQ_INST_13801563067*****_BbyOD2jQ 指定源云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。 说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。Topic TopicA 指定源云产品的资源键值,即云消息队列 RocketMQ 版的Topic。TopicA的消息将流转至目标云消息队列 MQTT 版的Topic。 - 在配置规则目标配置向导页面,配置数据的流转目标,然后单击创建。
参数 取值示例 说明 Topic TopicB 指定您需要将其他源云产品的数据导入至云消息队列 MQTT 版的哪个目标Topic。
您可以在规则管理的规则列表查看到刚创建的数据流入规则。 - 在配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步。
步骤五:调用Java SDK收发消息
- 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client。
- 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
- 解压该Demo工程包至您指定的文件夹。
在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.5.Final</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
说明 ons-client的版本信息,请参见版本说明。在RocketMQSendMessageToMQ4IoT.java类中,按代码注释说明填写相应参数,主要涉及步骤一至步骤三所创建MQTT资源以及您在云消息队列 RocketMQ 版创建的相应资源,然后执行Main函数运行代码实现消息收发。
示例代码如下。
说明在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证。
云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example.demo; import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class RocketMQSendMessageToMQ4IoT { public static void main(String[] args) throws Exception { /** * 初始化云消息队列 RocketMQ 版发送客户端,实际业务中一般部署在服务端应用中。 */ Properties properties = new Properties(); /** * 设置云消息队列 RocketMQ 版Group ID,在云消息队列 RocketMQ 版控制台创建。 */ properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX"); /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV")); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV")); /** * 设置TCP接入点,该接入点为云消息队列 RocketMQ 版实例的接入点。进入云消息队列 RocketMQ 版控制台实例详情页面获取。 */ properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX"); /** * 设置云消息队列 RocketMQ 版的Topic,在云消息队列 RocketMQ 版控制台创建。 * 云消息队列 RocketMQ 版和云消息队列 MQTT 版配合使用时,RocketMQ客户端仅操作一级Topic。 */ final String parentTopic = "XXXXX"; Producer producer = ONSFactory.createProducer(properties); producer.start(); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /** * 初始化云消息队列 MQTT 版接收客户端,实际业务中云消息队列 MQTT 版一般部署在移动终端环境。 */ /** * 您在控制台创建的云消息队列 MQTT 版的实例ID。 */ String instanceId = "XXXXX"; /** * 设置接入点,进入云消息队列 MQTT 版控制台实例详情页面获取。 */ String endPoint = "XXXXXX.mqtt.aliyuncs.com"; /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致连接异常断开。 * clientId由两部分组成,格式为GroupID@@@DeviceID,其中GroupID在云消息队列 MQTT 版控制台创建,DeviceID由业务方自己设置,clientId总长度不得超过64个字符。 */ String clientId = "GID_XXXX@@@XXXXX"; /** * 云消息队列 MQTT 版支持子级Topic,用来做自定义的过滤,此处为示例,可以填写任何字符串。 * 需要注意的是,完整的Topic长度不得超过128个字符。 */ final String subTopic = "/testMq4Iot"; final String mq4IotTopic = parentTopic + subTopic; /** * QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。 */ final int qosLevel = 0; ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId); final MemoryPersistence memoryPersistence = new MemoryPersistence(); /** * 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是SSL加密则设置ssl://endpoint:8883。 */ final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence); /** * 设置客户端发送超时时间,防止无限阻塞。 */ mqttClient.setTimeToWait(5000); final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectComplete(boolean reconnect, String serverURI) { /** * 客户端连接成功后就需要尽快订阅需要的Topic。 */ System.out.println("connect success"); executorService.submit(new Runnable() { @Override public void run() { try { final String topicFilter[] = {mq4IotTopic}; final int[] qos = {qosLevel}; mqttClient.subscribe(topicFilter, qos); } catch (MqttException e) { e.printStackTrace(); } } }); } @Override public void connectionLost(Throwable throwable) { throwable.printStackTrace(); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { /** * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。 */ System.out.println( "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]); } }); mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions()); for (int i = 0; i < 10; i++) { /** * 使用RocketMQ客户端发消息给MQTT客户端时,Topic指定为一级父Topic,Tag指定为MQ2MQTT。 */ Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes()); /** * 使用RocketMQ客户端发消息给MQTT客户端时,可以通过MqttSecondTopic属性设置MQTT的子级Topic属性。 */ msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic); SendResult result = producer.send(msg); System.out.println(result); /** * 发送P2P消息,设置子级Topic。 */ msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId); result = producer.send(msg); System.out.println(result); } Thread.sleep(Long.MAX_VALUE); } }
结果验证
完成消息收发后,您可在云消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询。