MQTT数据流出规则的实现
如果您的云端应用需要使用云消息队列 RocketMQ 版产品的某些功能,例如顺序消息特性、事务消息特性等,您可以通过消息流入或流出规则将云消息队列 MQTT 版和云消息队列 RocketMQ 版数据进行流转。本文介绍如何将云消息队列 MQTT 版的数据导出至其他阿里云产品。
背景信息
云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发。云端SDK使用,请参见云端开发概述。
同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版。
本文以公网环境中的Java SDK为例说明如何将云消息队列 MQTT 版的数据导出至云消息队列 RocketMQ 版。
此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载。
网络访问
- 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
- VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
- 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
前提条件
云消息队列 MQTT 版数据流出规则仅支持云消息队列 RocketMQ 版4.x系列实例。
云消息队列 MQTT 版数据流出规则不能跨地域使用,因此,云消息队列 MQTT 版和云消息队列 RocketMQ 版的资源都必须创建在同一地域。
1.创建数据流出规则
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
在创建规则页面完成以下操作。
配置基本信息。输入规则ID,选择数据流出的规则类型。
配置规则源。选择已经创建好的云消息队列 MQTT 版的Topic。
配置规则目标。选择已经创建好的云消息队列 RocketMQ 版的实例和Topic。
2.准备测试代码
2.1下载示例代码
下载mqtt-java-demo,并解压该Demo工程包至您指定的文件夹。
在解压的Demo工程中找到lmq-java-demo文件夹,将此文件夹导入IntelliJ IDEA,并确认pom.xml中已包含以下依赖。
<dependencies> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk15on</artifactId> <version>1.70</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.5.Final</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
配置访问凭证。
获取AccessKey信息。获取方式,请参见创建AccessKey。
配置环境变量。云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。关于配置环境变量的方法,请参见配置访问凭证。
2.2收发消息代码
在MQ4IoTSendMessageToRocketMQ.java
类中包含了发送MQTT消息和使用RocketMQ消费的代码,按代码注释说明填写云消息队列 MQTT 版和云消息队列 RocketMQ 版资源的参数。示例代码如下。
3.结果验证
执行MQ4IoTSendMessageToRocketMQ.java
类中的Main函数运行代码。可以根据下面操作验证消息的发送和消费情况。
代码验证
如下图所示,MQTT消息已经成功发送,RocketMQ客户端也已经成功消费。
控制台验证
查询消息发送情况。在云消息队列 MQTT 版控制台消息轨迹查询页面,根据Group ID和Device ID查询消息已经成功发送,如下图所示。
查询消息消费情况。在云消息队列 RocketMQ 版控制台消息查询页面,根据Topic查询到消息已经被转发到云消息队列 RocketMQ 版,如下图所示。
单击操作列中的消息轨迹查看,消息已经被消费,如下图所示。
更多信息
- 本页导读 (1)
- 背景信息
- 网络访问
- 前提条件
- 1.创建数据流出规则
- 2.准备测试代码
- 2.1下载示例代码
- 2.2收发消息代码
- 3.结果验证
- 代码验证
- 控制台验证
- 更多信息