MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。物联网平台支持设备使用MQTT协议接入。
与标准MQTT的区别
支持MQTT的PUB、SUB、PING、PONG、CONNECT、DISCONNECT和UNSUB等报文。
支持clean session。
不支持will、retain msg。
支持QoS 0、QoS 1,不支持QoS 2。
不支持SUB QoS,消息QoS以发送方(PUB)指定为准。
基于原生的MQTT Topic上支持RRPC同步模式,服务器可以同步调用设备并获取设备回执结果。
支持的MQTT 5.0特性
MQTT 5.0协议在之前版本基础上添加了大量全新特性,提高了性能和易用性。更多信息,请参见Appendix C. Summary of new features in MQTT v5.0和MQTT 5.0概述。
目前,物联网平台支持MQTT 5.0的部分新增特性如下。
支持的特性 | 使用方法 |
会话过期 | 设备建连时设置Clean Start和Session Expiry Interval。 MqttConnectionOptions options = new MqttConnectionOptions();
options.setCleanStart(true);
options.setSessionExpiryInterval(60L);// 单位:秒。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);
设备断连时设置Session Expiry Interval。 MqttProperties mqttProperties = new MqttProperties();
mqttProperties.setSessionExpiryInterval(60L);// 单位:秒。
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(host, clientId, new MemoryPersistence());
mqttAsyncClient.disconnect(30000, null, null, MqttReturnCode.RETURN_CODE_SUCCESS, mqttProperties);
|
消息过期 | 设备发送消息时设置Message Expiry Interval: IntervalString content = "Hello World";
byte[] payload = content.getBytes();
// 创建消息。
MqttMessage message = new MqttMessage(payload);
// 设置消息的服务质量。
message.setQos(1);
MqttProperties mqttProperties = new MqttProperties();
// 设置消息过期时间。
mqttProperties.setMessageExpiryInterval(600L);
message.setProperties(mqttProperties);
// 发布消息。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.publish(topic, message);
|
订阅选项 | 支持设置订阅选项: QoS:MQTT消息服务质量等级。支持设置0(QoS 0消息)和1(QoS 1消息)。 No Local:客户端是否接收自己发布的消息。 对于MQTT 3.1.1版本协议,如果客户端订阅了自己发布消息的Topic,则客户端会接收到自己发布的消息。如果使用MQTT 5.0版本协议,此场景下,客户端在订阅Topic时,可以设置此选项为true ,客户端将不会接收到自己发布的消息。 Retain As Publish:服务端向客户端转发消息时是否保留其中的RETAIN标识。
重要 Retain As Publish的设置不会影响保留消息中的RETAIN标识。 Retain Handling:指定订阅建立时服务端是否向客户端发送保留消息。
MqttSubscription mqttSubscription = new MqttSubscription("aaa/bbb");
// 设置订阅选项QoS。
mqttSubscription.setQos(1);
// 设置订阅选项No Local。
mqttSubscription.setNoLocal(true);
// 设置订阅选项Retaion As Published。
mqttSubscription.setRetainAsPublished(true);
// 设置订阅选项Retain Handling。
mqttSubscription.setRetainHandling(1);
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.subscribe(new MqttSubscription[]{mqttSubscription});
|
保留消息 | // 创建保留消息。
String content = "Hello World";
byte[] payload = content.getBytes();
MqttMessage message = new MqttMessage(payload);
// 设置消息为保留消息。
message.setRetained(true);
// 发布消息。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.publish(topic, message);
|
遗嘱消息 | // 创建遗嘱消息。
String content = "Will Message";
byte[] payload = content.getBytes();
MqttMessage message = new MqttMessage(payload);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(USERNAME);
options.setPassword(PASSWORD.getBytes());
// 设置遗嘱消息。
options.setWill(topic, message);
// 设置遗嘱延迟。
MqttProperties willMessageProperties = new MqttProperties();
willMessageProperties.setWillDelayInterval(60L);
options.setWillMessageProperties(willMessageProperties);
// 建立连接。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);
|
建连协商 | MqttConnectionOptions connOpts = new MqttConnectionOptions();
connOpts.setMaximumPacketSize(1024L);
|
用户属性 | MqttProperties properties = new MqttProperties();
List<UserProperty> userPropertys = new ArrayList<>();
userPropertys.add(new UserProperty("key1","value1"));
properties.setUserProperties(userPropertys);
设备使用MQTT 5.0协议成功接入物联网平台后,可在云端运行日志内容中,查看到上报的UserProperty数据。
重要 最多可添加20个属性。属性中Key值不允许以下划线(_)开头,Key和Value最大总长度不超过128个字符。 |
请求与响应模式 | 例如:请求方为设备,接收方为您的业务服务器,您可通过AMQP订阅或规则流转后,从消息的属性列表中解析出ResponseTopic和CorrelationData,然后调用Pub接口,将响应发送给设备。 MqttProperties properties = new MqttProperties();
properties.setCorrelationData("requestId12345".getBytes());
properties.setResponseTopic("/" + productKey + "/" + deviceName + "/user/get");
|
错误码增强 | 更多信息,请参见错误排查。 |
主题别名 | 不涉及。 |
共享订阅 | 共享订阅的Topic格式为:$share/${ShareName}/${filter} 。 $share :固定值。共享订阅Topic必须以$share 开头。
${ShareName} :一个只包含字母、数字和下划线(_)的字符串。
订阅会话通过使用相同的${ShareName} 表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个会话。 ${filter} :非共享订阅中的主题过滤器,支持字母、数字和下划线(_)。
示例: MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password);
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);
mqttClient.subscribe("$share/testGroup/user/post", 1);
|
Topic规范
Topic定义及分类,请查看什么是Topic。
系统默认通信类Topic可前往控制台设备详情页查看,功能类Topic可前往具体功能文档页查看。
使用限制
设备身份注册成功后,针对同一设备身份信息,只可选择一种通信协议接入物联网平台,不可多种类型通信协议同时混用。
使用说明
物联网平台提供各类设备端SDK,支持设备使用MQTT协议接入物联网平台。获取SDK方法,请参见使用设备端SDK接入。
设备基于MQTT协议通过不同方式接入物联网平台进行通信的配置说明,请参见: