背景信息
在实际业务中服务端需要对客户端的上下线数据进行统计分析,并根据客户端的在线状态推送消息。云消息队列 MQTT 版提供异步上下线事件通知获取客户端在线状态,MQTT客户端的上下线事件将会触发MQTT服务端生成一条通知消息,获取通知消息有以下方式:
本文以创建上下线通知规则为例,介绍后端应用如何获取客户端在线状态。
网络访问
云消息队列 MQTT 版同时提供了
公网接入点和
VPC 接入点。
- 公网接入点为本地公网环境访问的IP地址,一般用于物联网和移动互联网场景中;
- VPC 接入点为云上私网访问的IP地址,一般用于云端应用接入云消息队列 MQTT 版。
重要 客户端使用接入点连接服务时务必使用域名接入,不得直接使用域名背后的IP地址直接连接,因为IP地址随时会变化。在以下使用情况中出现的问题
云消息队列 MQTT 版产品方概不负责:
- 客户端不使用域名接入而是使用IP地址接入,产品方更新了域名解析导致原有IP地址失效。
- 客户端网络对IP地址设置网络防火墙策略,产品方更新了域名解析后新IP地址被您的防火墙策略拦截。
1.创建上下线通知规则
登录云消息队列 MQTT 版控制台,并在左侧导航栏单击实例列表。
在顶部菜单栏选择目标地域,然后在实例列表中单击实例名称进入实例详情页面。
在左侧导航栏单击规则管理,然后在页面左上角,单击创建规则。
在创建规则页面完成以下操作。
配置基本信息。输入规则ID,选择上下线通知的规则类型。
配置规则源。选择已经创建好的云消息队列 MQTT 版的Group ID。
配置规则目标。选择已经创建好的云消息队列 RocketMQ 版的实例和Topic。
2.准备测试代码
客户端的上下线操作,上下线通知的消息处理都需要代码完成,本文以Java的Demo示例作为您代码开发的参考。
2.1下载示例代码
下载mqtt-java-demo,并解压该Demo工程包至您指定的文件夹。
在解压的Demo工程中找到lmq-java-demo文件夹,将此文件夹导入IntelliJ IDEA,并确认pom.xml中已包含以下依赖。
<dependencies>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.70</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.5.Final</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-onsmqtt</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.0</version>
</dependency>
</dependencies>
配置访问凭证。
2.2客户端上下线代码
在MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java
类中,按代码注释说明填写云消息队列 MQTT 版资源的参数。
测试中仅模拟上下线的操作不需要发送消息,可以将其中发送消息的相关代码去掉,示例代码如下。
客户端上下线代码示例
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT 实例 ID,购买后控制台获取
*/
String instanceId = "XXXXX";
/**
* 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* 账号 AccessKey,从账号系统控制台获取
* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
* 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。运行本代码示例之前,请先配置环境变量MQTT_AK_ENV和MQTT_SK_ENV
* 例如:export MQTT_AK_ENV=<access_key_id>
* export MQTT_SK_ENV=<access_key_secret>
* 需要将<access_key_id>替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
* clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
*/
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密则设置ssl://endpoint:8883
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 客户端设置好发送超时时间,防止无限阻塞
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Thread.sleep(Long.MAX_VALUE);
}
}
2.3上下线通知消息的处理代码
上下线通知消息被推送到云消息队列 RocketMQ 版,消费者订阅消息后需要根据业务需求进行处理。
在MQTTClientStatusNoticeProcessDemo.java
类中按代码注释说明填写云消息队列 RocketMQ 版资源的参数,示例代码如下。
上下线通知消息的处理代码示例
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化消息队列RocketMQ版接收客户端,实际业务中一般部署在服务端应用中。
*/
Properties properties = new Properties();
/**
* 设置消息队列RocketMQ版Group ID,在消息队列RocketMQ版控制台创建。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
* 本示例以将AccessKey 和 AccessKeySecret 保存在环境变量为例说明。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。仅在签名鉴权模式下需要设置。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* 设置TCP接入点,该接入点为消息队列RocketMQ版实例的接入点。进入消息队列RocketMQ版控制台实例详情页面获取。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* 使用消息队列RocketMQ版消费端来处理MQTT客户端的上下线通知时,订阅的Topic为上下线通知Topic。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 客户端状态数据,实际生产环境中建议使用数据库或者Redis等外部持久化存储来保存该信息,避免应用重启丢失状态,本示例以单机内存版实现为例。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此处仅处理客户端是否在线,因此只需要关注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 处理上下线通知的逻辑。
* 实际部署过程中,消费上下线通知的应用可能部署多台机器,因此客户端在线状态的数据可以使用数据库或者Redis等外部共享存储来维护。
* 其次需要单独做消息幂等处理,以免重复接收消息导致状态机判断错误。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先存储新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 读取当前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表里上线和下线事件都已经收到,则当前channel已经掉线,可以清理掉这个channel的数据。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根据状态表判断一个clientId是否有活跃的TCP连接。
* 1.如果没有channel表,则客户端一定不在线。
* 2.如果channel表非空,检查一下channel数据中是否仅包含上线事件,如果有则代表有活跃连接,客户端在线。
* 如果全部的channel都有掉线断开事件则客户端一定不在线。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
3.结果验证
执行MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java
类中的Main函数运行代码模拟客户端上线操作。可以根据下面操作查询客户端状态和消息推送情况。
说明 可以通过终止Main函数的执行来模拟客户端的离线操作。
执行MQTTClientStatusNoticeProcessDemo.java
类中的Main函数运行代码。此时已经收到上线的事件消息,ClientStatus
状态也从false
变成true
,如下图所示。