MQTT协议规范

MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。物联网平台支持设备使用MQTT协议接入。

支持版本

目前物联网平台支持MQTT标准协议接入,兼容5.0、3.1.1和3.1版本协议,具体的协议请参见MQTT 5.0MQTT 3.1.1MQTT 3.1协议文档。

重要

若需使用MQTT 5.0协议,请先购买企业版实例。

与标准MQTT的区别

  • 支持MQTT的PUB、SUB、PING、PONG、CONNECT、DISCONNECT和UNSUB等报文。

  • 支持clean session。

  • 不支持will、retain msg。

  • 支持QoS 0、QoS 1,不支持QoS 2。

  • 不支持SUB QoS,消息QoS以发送方(PUB)指定为准。

  • 基于原生的MQTT Topic上支持RRPC同步模式,服务器可以同步调用设备并获取设备回执结果。

支持的MQTT 5.0特性

MQTT 5.0协议在之前版本基础上添加了大量全新特性,提高了性能和易用性。更多信息,请参见Appendix C. Summary of new features in MQTT v5.0MQTT 5.0概述

目前,物联网平台支持MQTT 5.0的部分新增特性如下。

支持的特性

使用方法

会话过期

  • 设备建连时设置Clean Start和Session Expiry Interval。

    MqttConnectionOptions options = new MqttConnectionOptions();
    options.setCleanStart(true);
    options.setSessionExpiryInterval(60L);// 单位:秒。
    
    MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
    mqttClient.connect(options);
  • 设备断连时设置Session Expiry Interval。

    MqttProperties mqttProperties = new MqttProperties();
    mqttProperties.setSessionExpiryInterval(60L);// 单位:秒。
    
    MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(host, clientId, new MemoryPersistence());
    mqttAsyncClient.disconnect(30000, null, null, MqttReturnCode.RETURN_CODE_SUCCESS, mqttProperties);

消息过期

设备发送消息时设置Message Expiry Interval:

IntervalString content = "Hello World";
byte[] payload = content.getBytes();

// 创建消息。
MqttMessage message = new MqttMessage(payload);
// 设置消息的服务质量。
message.setQos(1);

MqttProperties mqttProperties = new MqttProperties();

// 设置消息过期时间。
mqttProperties.setMessageExpiryInterval(600L);

message.setProperties(mqttProperties);

// 发布消息。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.publish(topic, message);

订阅选项

支持设置订阅选项:

  • QoS:MQTT消息服务质量等级。支持设置0(QoS 0消息)和1(QoS 1消息)。

  • No Local:客户端是否接收自己发布的消息。

    对于MQTT 3.1.1版本协议,如果客户端订阅了自己发布消息的Topic,则客户端会接收到自己发布的消息。如果使用MQTT 5.0版本协议,此场景下,客户端在订阅Topic时,可以设置此选项为true,客户端将不会接收到自己发布的消息。

    取值:

    • true:不接收。

    • false:接收。

  • Retain As Publish:服务端向客户端转发消息时是否保留其中的RETAIN标识。

    取值:

    • true:如果消息中有RETAIN标识,那么会保留该标识。如果消息中没有RETAIN标识,那么此选项无效。

    • false:无论消息中是否有RETAIN标识,都不会保留该标识。

    重要

    Retain As Publish的设置不会影响保留消息中的RETAIN标识。

  • Retain Handling:指定订阅建立时服务端是否向客户端发送保留消息。

    取值:

    • 0:只要客户端订阅成功,服务端就发送保留消息。

    • 1:客户端订阅成功且该订阅之前不存在,服务端才发送保留消息。

    • 2:即使客户端订阅成功,服务端也不会发送保留消息。

MqttSubscription mqttSubscription = new MqttSubscription("aaa/bbb");

// 设置订阅选项QoS。
mqttSubscription.setQos(1);

// 设置订阅选项No Local。
mqttSubscription.setNoLocal(true);

// 设置订阅选项Retaion As Published。
mqttSubscription.setRetainAsPublished(true);

// 设置订阅选项Retain Handling。
mqttSubscription.setRetainHandling(1);

MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.subscribe(new MqttSubscription[]{mqttSubscription});

保留消息

// 创建保留消息。
String content = "Hello World";
byte[] payload = content.getBytes();
MqttMessage message = new MqttMessage(payload);
// 设置消息为保留消息。
message.setRetained(true);

// 发布消息。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.publish(topic, message);

遗嘱消息

// 创建遗嘱消息。
String content = "Will Message";
byte[] payload = content.getBytes();
MqttMessage message = new MqttMessage(payload);

MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(USERNAME);
options.setPassword(PASSWORD.getBytes());

// 设置遗嘱消息。
options.setWill(topic, message);

// 设置遗嘱延迟。
MqttProperties willMessageProperties = new MqttProperties();
willMessageProperties.setWillDelayInterval(60L);
options.setWillMessageProperties(willMessageProperties);

// 建立连接。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);

建连协商

MqttConnectionOptions connOpts = new MqttConnectionOptions();
connOpts.setMaximumPacketSize(1024L);

用户属性

MqttProperties properties = new MqttProperties();
List<UserProperty> userPropertys = new ArrayList<>();
userPropertys.add(new UserProperty("key1","value1"));
properties.setUserProperties(userPropertys);

设备使用MQTT 5.0协议成功接入物联网平台后,可在云端运行日志内容中,查看到上报的UserProperty数据。

重要

最多可添加20个属性。属性中Key值不允许以下划线(_)开头,Key和Value最大总长度不超过128个字符。

请求与响应模式

例如:请求方为设备,接收方为您的业务服务器,您可通过AMQP订阅或规则流转后,从消息的属性列表中解析出ResponseTopic和CorrelationData,然后调用Pub接口,将响应发送给设备。

MqttProperties properties = new MqttProperties();
properties.setCorrelationData("requestId12345".getBytes());
properties.setResponseTopic("/" + productKey + "/" + deviceName + "/user/get");
重要
  • 解析出的CorrelationData, 需要通过Base64解码,才能还原成设备上报的byte数组类型数据。

  • ResponseTopic和CorrelationData的最大长度都不能超过128个字符。

错误码增强

更多信息,请参见错误排查

主题别名

不涉及。

共享订阅

共享订阅的Topic格式为:$share/${ShareName}/${filter}

  • $share:固定值。共享订阅Topic必须以$share开头。

  • ${ShareName}:一个只包含字母、数字和下划线(_)的字符串。

    订阅会话通过使用相同的${ShareName}表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个会话。

  • ${filter}:非共享订阅中的主题过滤器,支持字母、数字和下划线(_)。

示例:

MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password);

MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);

mqttClient.subscribe("$share/testGroup/user/post", 1);

安全等级

  • TLS直连模式(通道加密):安全级别高。

    重要
    • 支持TLS协议1.0、1.1、1.2和1.3版本,强烈建议您的设备使用TLS 1.2、1.3加密。因TLS 1.0、1.1版本较老,可能有安全风险。

    • 设备端Link SDK已配置V1.2和V1.3版本的TLS协议,您无需自行配置。

  • TCP直连模式(通道不加密):不安全,功能即将下线,请勿使用。

    重要

    使用TCP直连模式造成的数据泄露风险和后果,需要由您自行承担。

Topic规范

Topic定义及分类,请查看什么是Topic

系统默认通信类Topic可前往控制台设备详情页查看,功能类Topic可前往具体功能文档页查看。

使用限制

设备身份注册成功后,针对同一设备身份信息,只可选择一种通信协议接入物联网平台,不可多种类型通信协议同时混用。

使用说明

物联网平台提供各类设备端SDK,支持设备使用MQTT协议接入物联网平台。获取SDK方法,请参见使用设备端SDK接入

设备基于MQTT协议通过不同方式接入物联网平台进行通信的配置说明,请参见: