本次最佳实践通过结合Java代码的形式对MQTT的使用和场景化做出示例。
背景信息
最佳实践概述
本次最佳实践通过结合Java代码的形式使用MQTT,详细描述从构建Maven工程到向MQTT发送消息的过程,因本次最佳实践使用阿里云中间件产品MQTT,所以只需在阿里云提供的MQTT控制台创建MQTT服务器即可,无需下载MQTT安装包安装MQTT服务器,实践中描述了使用MQTT需要在构建Maven应用中导入的依赖包及获取与MQTT连接需要配置的参数值,获取与MQTT连接的两种模式。本文还详细描述了MQTT消息发送者同步发送消息的示例代码、消息存储到文件、指导消费者多线程消费消息及打印消费消息结果日志。
应用场景中描述了在阿里云环境下创建MQTT服务器及控制台为MQTT提供的功能使用详述,最重要的优势是可以与RocketMQ结合使用。并且简单描述了加油站场景下RocketMQ与MQTT配合使用的设计原理及原理对应下的Java代码实现。
最佳实践价值
MQTT原本是为解决高并发的场景设计的产品,但MQTT协议却是为大量计算能力有限,且工作在低带宽网络中的远程传感器和控制设备提供通讯服务,因这种原因使其在中小企业中运用的变得极其有价值,同时阿里云将MQTT配合RocketMQ一起使用,使MQTT使用场景变得更广泛。本次最佳实践结合将Java代码来使用MQTT,以供您参考。
MQTT协议原理
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为主题(Topic)和负载(payload)两部分:
- Topic:可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)。
- Payload:可以理解为消息的内容,是指订阅者具体要使用的内容。
微消息队列MQTT收发消息流程如下图所示:
生产者发送消息代码详述
您可以参照以下步骤对代码进行改造:
- 同步发送原理 。
同步发送是指消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式。
- 创建一个Maven工程,然后在工程pom.xml文件导入下面依赖包。
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
- 连接MQTT的参数值。
// MQ4IOT 实例 ID,购买后控制台获取
String
instanceId = "post-cn-0pp1******";
//接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。
String endPoint = "post-cn-********-internal.mqtt.aliyuncs.com";
//账号 accesskey,从账号系统控制台获取
String accessKey = "********";
//账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
String secretKey = "********";
// MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
String clientId = "GID_pdsa_lhh_mqtt";
- Signature鉴权模式下,与MQTT建立连接代码。
public ConnectionOptionWrapper(String instanceId, String accessKey, String secretKey,
String clientId) throws NoSuchAlgorithmException, InvalidKeyException {
this.instanceId = instanceId;
this.accessKey = accessKey;
this.secretKey = secretKey;
this.clientId = clientId;
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("Signature|" + accessKey + "|" + instanceId);
mqttConnectOptions.setPassword(Tools.macSignature(clientId, secretKey).toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
}
Signature鉴权模式下,计算签名代码如下:
public static String macSignature(String text,
String secretKey) throws InvalidKeyException, NoSuchAlgorithmException {
Charset charset = Charset.forName("UTF-8");
String algorithm = "HmacSHA1";
Mac mac = Mac.getInstance(algorithm);
mac.init(new SecretKeySpec(secretKey.getBytes(charset), algorithm));
byte[] bytes = mac.doFinal(text.getBytes(charset));
return new String(Base64.encodeBase64(bytes), charset);
}
Token鉴权模式下,与MQTT建立连接:
public ConnectionOptionWrapper(String instanceId, String accessKey, String clientId,
Map<String, String> tokenData) {
this.instanceId = instanceId;
this.accessKey = accessKey;
this.clientId = clientId;
if (tokenData != null) {
this.tokenData.putAll(tokenData);
}
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName("Token|" + accessKey + "|" + instanceId);
StringBuilder builder = new StringBuilder();
for (Map.Entry<String, String> entry : tokenData.entrySet()) {
builder.append(entry.getKey()).append("|").append(entry.getValue()).append("|");
}
if (builder.length() > 0) {
builder.setLength(builder.length() - 1);
}
mqttConnectOptions.setPassword(builder.toString().toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
}
- MQTT同步原理发送示例代码。
//MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。
final String parentTopic = "pdsa_lhh_mqtt";
//MQ4IOT支持子级 topic,用来做自定义的过滤
final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
//QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey,
secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
//客户端使用的协议和端口必须匹配
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
//客户端设置好发送超时时间,防止无限阻塞
mqttClient.setTimeToWait(5000);
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 100; i++) {
MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
message.setQos(qosLevel);
//发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则
mqttClient.publish(mq4IotTopic, message);
}
消费者订阅消息代码详述
您可以参照以下步骤对代码进行改造:
- 在构建好的Maven工程中,在pom.xml文件导入以下依赖包。
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.48</version>
</dependency>
- 消息者订阅消息示例代码。
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
//客户端连接成功后就需要尽快订阅需要的 topic
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
});
- 消息订阅结果处理。
@Override
public void
connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
//消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg
succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
创建MQTT实例
您可以按照以下步骤创建MQTT实例。
说明
- 您必须已经开通母产品消息队列 MQ 的服务。
- 您必须已经拥有了阿里云 AccessKey(AK)。
- 登录MQTT控制台。
- 在左侧导航栏单击概览。
- 在实例列表页面,单击新建实例,创建完MQTT实例详情如下图所示。
- 在左侧导航栏单击消息存储。
- 在Topic管理页面,选择您刚创建的微消息队列 MQTT 版实例,单击新建Topic。
- 在创建Topic对话框,输入Topic名称、选择该Topic用于存储和收发的消息类型、输入备注信息,然后单击确认。
- 在左侧导航栏,单击Group管理。
- 在Group管理页面,选择您刚创建的微消息队列 MQTT 版实例,单击新建Group ID。
- 在创建Group ID对话框中,输入需要创建的Group ID,然后单击确定。
连接查询
如需了解微消息队列MQTT版客户端的连接情况,您可以指定Topic、Group ID 以及 Client ID查询客户端的当前在线数、在指定时间段内的在线数以及连接信息和订阅关系。
- 登录微消息队列MQTT版控制台。
- 在顶部菜单栏选择要查询的地域。
- 在左侧导航栏,单击连接查询。
- 在连接查询页面,单击目标实例,单击按Topic查询页签。
- 在父级Topic文本框输入要查询的父级Topic,在子级Topic文本框输入要查询的子级Topic,单击查询。
消息查询
您可以按以下步骤在微消息队列MQTT版的控制台查询所有Topic的收发消息的数量和TPS。
说明
此查询支持多维度筛选,以及自定义时间范围查询。
您可以通过以下任一控制台路径进入查询页面:
- 在控制台左侧导航栏,选择资源报表。
- 在控制台左侧导航栏,选择消息存储。然后在需要查询的 Topic 的操作列单击资源报表。
- 在控制台左侧导航栏,选择资源报表。
- 在资源报表页面,根据需要查看的信息类型选择相应页签。
- 生产者:查看某Topic在一个时间段内从消息生产者处接收到的消息的总量或者TPS。
- 消费者:查看某Topic在一个时间段内投递给消费者的消息的总量或TPS。
- 输入查询的筛选的条件,包括父级 Topic、子级 Topic、QoS 类型、消息类型和时间范围。
- 单击查询。
在消息生产总量、消息消费总量、消息生产TPS和消息消费TPS的图标页面右上方单击详情,还可以快捷查看最近3小时、最近12小时、最近1天的相关统计数据,或者重新自定义时间范围。
生产者消息生产总量、消息生产TPS查询结果及查询时间范围内最大值、平均值、最小值统计如下:
签名校验
微消息队列 MQTT 版控制台提供了签名计算工具,您可以按以下步骤比对验证自己的签名计算是否正确。
说明 该功能主要用于签名验证,验证过程将在本地浏览器完成,无网络传输,不与服务端交互。
- 登录微消息队列MQTT版控制台。
- 在左侧导航栏中,单击签名校验。
- 在签名校验页面,输入AccessKeyId、AccessKeySecret 以及 Client ID。
- 单击计算签名。
MQTT应用场景
以下内容对MQTT应用于加油站场景进行原理图和代码的示例。
加油站上报下行数据的架构,MQTT结合MQ使用如下图所示:
消息队列RocketMQ客户端发消息到MQTT客户端,然后MQTT订阅完整的消息,上述设计原理对应的示例代码如下:
public void rocketMQToMQTT() throws Exception {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
properties.put(PropertyKeyConst.AccessKey, "XXXX");
properties.put(PropertyKeyConst.SecretKey, "XXXX");
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.cn-******.mq-internet.aliyuncs.com");
final String parentTopic = "XXXXX";
Properties initProperties = new Properties();
Producer producer = ONSFactory.createProducer(initProperties);
producer.start();
String instanceId = "XXXXX";
String endPoint = "XXXXXX.mqtt.aliyuncs.com";
String accessKey = "XXXX";
String secretKey = "XXXX";
String clientId = "GID_XXXX@@@XXXXX";
//MQ4IOT支持子级 topic,用来做自定义的过滤
final String subTopic = "/testMq4Iot";
final String mq4IotTopic = parentTopic + subTopic;
//QoS参数代表传输质量,可选0,1,2
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
//客户端设置好发送超时时间,防止无限阻塞
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
executorService.submit(new Runnable() {
@Override
public void run() {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println(
"receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
for (int i = 0; i < 10; i++) {
Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
SendResult result = producer.send(msg);
System.out.println(result);
}
Thread.sleep(Long.MAX_VALUE);
}