MQTT和RocketMQ进行数据互通(客户端上下线通知)
MQTT客户端的上下线事件将会触发MQTT服务端生成一条通知消息,云消息队列 MQTT 版支持将该条消息数据导出至其他阿里云产品,并使用MQTT的Java SDK实现MQTT客户端与后端应用收发消息。本文以当前仅支持的云消息队列 RocketMQ 版数据互通为例进行说明。
背景信息
云消息队列 MQTT 版支持云端SDK,云上应用可直接通过云端SDK接入云消息队列 MQTT 版服务端进行消息收发并获取客户端的上下线状态。更多信息,请参见获取MQTT客户端在线状态。
同时云消息队列 MQTT 版支持和其他云产品进行互通,当前支持的云产品有云消息队列 RocketMQ 版。如果您的云端应用需要使用云消息队列 RocketMQ 版产品的某些功能,例如顺序消息特性、事务消息特性等,您可以通过消息流入或流出规则将云消息队列 MQTT 版和云消息队列 RocketMQ 版数据进行流转。
本文以公网环境中的Java SDK为例说明云消息队列 MQTT 版如何将MQTT客户端上下线事件通知的消息发送至后端应用。
此场景下可使用多语言的第三方开源SDK来实现消息收发。更多信息,请参见SDK下载。
网络访问
- 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
- VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
- 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
使用流程
MQTT客户端上下线通知的消息收发流程如下图所示。
前提条件
步骤一:创建MQTT实例并获取接入点
- 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
- 在顶部菜单栏选择目标地域,然后在页面左上角单击创建实例。
- 在弹出的付费方式面板中,付费方式固定为包年包月,您无需设置,直接在面板左下角单击确定。
- 在弹出的实例规格面板中,按需选择您需要购买的实例规格,选中微消息队列 MQTT 版(包年包月)服务协议,然后单击立即购买。
- 在订单支付面板,根据提示完成支付。
- 在支付成功页面单击返回控制台。
- 回到云消息队列 MQTT 版控制台,在左侧导航栏单击实例列表,并将地域切换为您所购买的实例所对应的地域。
- 在实例列表页面中,单击您所购买实例的名称或在其操作列单击详情,进入实例详情页面。
- 在实例详情页面单击接入点页签,即可看到实例的接入点信息,本示例以公网接入点为例。
步骤二:创建父级Topic
MQTT协议支持多级Topic,父级Topic需在控制台创建,子级Topic无需创建,使用时直接在代码中设置即可。命名格式为:父级Topic和各子级Topic间均使用正斜线(/)隔开,<父级Topic名称>/<二级Topic名称>/<三级Topic名称>,例如,SendMessage/demo/producer。父级Topic和子级Topic总长度不能超过64个字符。Topic详细信息,请参见名词解释。
- 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
- 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击Topic 管理,然后在页面左上角,单击创建 Topic。
- 在创建Topic面板中,输入要创建的Topic名称和描述,然后在左下角单击确定。
步骤三:创建Group ID
Group ID详细信息,请参见名词解释。
- 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
- 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击Group 管理,然后在页面左上角单击创建 Group。
- 在创建Group面板中,输入Group ID,然后在左下角单击确定。
步骤四:创建客户端上下线通知规则
规则中填写的参数需与您创建的资源保持一致。
- 登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
- 在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
- 在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
- 在创建规则页面完成以下操作。
- 在配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步。
参数 取值示例 说明 规则ID 111111 规则的全局唯一标识,说明如下: - 只能包含字母、数字、短划线(-)和下划线(_),至少包含一个字母或数字。
- 名称长度限制在3~64字符之间,长于64字符将被自动截取。
- 创建后无法更新。
描述 migrate from rocketmq 对规则的描述。 状态 启用 是否启用当前规则,取值说明如下: - 启用
- 停用
规则类型 上下线通知 创建的规则类型,取值说明如下: - 数据流出:用于将云消息队列 MQTT 版的数据导出至其他阿里云产品。详细信息,请参见跨云产品的数据流出。
- 数据流入:用于将其他阿里云产品的数据导入至云消息队列 MQTT 版。详细信息,请参见跨云产品数据流入。
- 上下线通知:用于将获取的云消息队列 MQTT 版客户端上下线事件数据导出至其他阿里云产品。详细信息,请参见MQTT客户端上下线事件数据流出。
- 在配置规则源配置向导页面,配置数据源,然后单击下一步。
参数 取值示例 说明 Group ID GID_Client_Status 指定需导出数据的设备组。Group ID的详细信息,请参见名词解释。 - 在配置规则目标配置向导页面,配置数据的流转目标,然后单击创建。
参数 取值示例 说明 目标服务类型 消息队列 RocketMQ 版 指定您需将云消息队列 MQTT 版客户端上下线通知流转至哪个目标云产品。 说明 当前仅支持云消息队列 RocketMQ 版。RocketMQ 实例 MQ_INST_13801563067*****_BbyOD2jQ 指定目标云产品的实例ID,即云消息队列 RocketMQ 版的实例ID。 说明 仅支持选择和云消息队列 MQTT 版实例为同一地域的云产品实例。Topic TopicB 指定目标云产品的资源键值,即云消息队列 RocketMQ 版的Topic。云消息队列 MQTT 版客户端上下线通知信息将流转至TopicB。
您可以在规则管理的规则列表查看到刚创建的上下线通知规则。 - 在配置基本信息配置向导页面,填写规则的基本信息,然后单击下一步。
步骤五:调用Java SDK收发消息
- 下载第三方的开源Java SDK。下载地址为Eclipse Paho Java Client。
- 下载阿里云云消息队列 MQTT 版的Java SDK的Demo示例作为您代码开发的参考。下载地址为mqtt-java-demo。
- 解压该Demo工程包至您指定的文件夹。
在IntelliJ IDEA中,导入解压后的文件以创建相应的工程,并确认pom.xml中已包含以下依赖。
<dependencies> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.5.Final</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
说明 ons-client的版本信息,请参见版本说明。在MQTTClientStatusNoticeProcessDemo.java类中,按代码注释说明填写相应参数,主要涉及步骤一至步骤三所创建MQTT资源以及您在云消息队列 RocketMQ 版创建的相应资源,然后执行Main函数运行代码实现消息收发。
示例代码如下。
说明 在使用示例代码前,需要配置环境变量,通过环境变量读取访问凭证。关于配置环境变量的方法,请参见配置访问凭证。云消息队列 MQTT 版的AccessKey ID和AccessKey Secret的环境变量名称分别为MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example.demo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Map; import java.util.Properties; import java.util.Set; public class MQTTClientStatusNoticeProcessDemo { public static void main(String[] args) { /** * 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。 */ Properties properties = new Properties(); /** * 设置消息队列RocketMQ版Group ID,在消息队列RocketMQ版控制台创建。 */ properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX"); /** * AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 * 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 * 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 * 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。 */ properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV")); /** * AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。 */ properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV")); /** * 设置TCP接入点,该接入点为消息队列RocketMQ版实例的接入点。进入消息队列RocketMQ版控制台实例详情页面获取。 */ properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX"); /** * 使用消息队列RocketMQ版消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic。 */ final String parentTopic = "GID_XXXX_MQTT"; /** * 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本示例以单机内存版实现为例。 */ MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl(); Consumer consumer = ONSFactory.createConsumer(properties); /** * 此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。 */ consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore)); consumer.start(); String clientId = "GID_XXXXX@@@XXXXX"; while (true) { System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore)); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 处理上下线通知的逻辑。 * 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。 * 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。 */ static class MqttClientStatusNoticeListener implements MessageListener { private MqttClientStatusStore mqttClientStatusStore; public MqttClientStatusNoticeListener( MqttClientStatusStore mqttClientStatusStore) { this.mqttClientStatusStore = mqttClientStatusStore; } @Override public Action consume(Message message, ConsumeContext context) { try { JSONObject msgBody = JSON.parseObject(new String(message.getBody())); System.out.println(msgBody); String eventType = msgBody.getString("eventType"); String clientId = msgBody.getString("clientId"); String channelId = msgBody.getString("channelId"); ClientStatusEvent event = new ClientStatusEvent(); event.setChannelId(channelId); event.setClientIp(msgBody.getString("clientIp")); event.setEventType(eventType); event.setTime(msgBody.getLong("time")); /** * 首先存储新的事件。 */ mqttClientStatusStore.addEvent(clientId, channelId, eventType, event); /** * 读取当前channel的事件列表。 */ Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId); if (events == null || events.isEmpty()) { return Action.CommitMessage; } /** * 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。 */ boolean findOnlineEvent = false; boolean findOfflineEvent = false; for (ClientStatusEvent clientStatusEvent : events) { if (clientStatusEvent.isOnlineEvent()) { findOnlineEvent = true; } else { findOfflineEvent = true; } } if (findOnlineEvent && findOfflineEvent) { mqttClientStatusStore.deleteEvent(clientId, channelId); } return Action.CommitMessage; } catch (Throwable e) { e.printStackTrace(); } return Action.ReconsumeLater; } } /** * 根据状态表判断一个clientId是否有活跃的TCP连接。 * 1.如果没有channel表,则客户端一定不在线。 * 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接,客户端在线。 * 如果全部的channel都有掉线断开事件则客户端一定不在线。 * * @param clientId * @param mqttClientStatusStore * @return */ public static boolean checkClientOnline(String clientId, MqttClientStatusStore mqttClientStatusStore) { Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId); if (channelMap == null) { return false; } for (Set<ClientStatusEvent> events : channelMap.values()) { boolean findOnlineEvent = false; boolean findOfflineEvent = false; for (ClientStatusEvent event : events) { if (event.isOnlineEvent()) { findOnlineEvent = true; } else { findOfflineEvent = true; } } if (findOnlineEvent & !findOfflineEvent) { return true; } } return false; } }
结果验证
完成消息收发后,您可在云消息队列 MQTT 版控制台查询轨迹以验证消息是否发送并接收成功。详细信息,请参见消息轨迹查询。