设置Clean Start和Session Expiry Interval。 |
- 设备建连时设置Clean Start和Session Expiry Interval。
MqttConnectionOptions options = new MqttConnectionOptions();
options.setCleanStart(true);
options.setSessionExpiryInterval(60L);// 单位:秒。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);
- 设备断连时设置Session Expiry Interval。
MqttProperties mqttProperties = new MqttProperties();
mqttProperties.setSessionExpiryInterval(60L);// 单位:秒。
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(host, clientId, new MemoryPersistence());
mqttAsyncClient.disconnect(30000, null, null, MqttReturnCode.RETURN_CODE_SUCCESS, mqttProperties);
|
设置Message Expiry Interval。 |
设备发送消息时设置Message Expiry Interval:IntervalString content = "Hello World";
byte[] payload = content.getBytes();
// 创建消息。
MqttMessage message = new MqttMessage(payload);
// 设置消息的服务质量。
message.setQos(1);
MqttProperties mqttProperties = new MqttProperties();
// 设置消息过期时间。
mqttProperties.setMessageExpiryInterval(600L);
message.setProperties(mqttProperties);
// 发布消息。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.publish(topic, message);
|
设置订阅选项。 |
支持设置订阅选项:
MqttSubscription mqttSubscription = new MqttSubscription("aaa/bbb");
// 设置订阅选项QoS。
mqttSubscription.setQos(1);
// 设置订阅选项No Local。
mqttSubscription.setNoLocal(true);
// 设置订阅选项Retaion As Published。
mqttSubscription.setRetainAsPublished(true);
// 设置订阅选项Retain Handling。
mqttSubscription.setRetainHandling(1);
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.subscribe(new MqttSubscription[]{mqttSubscription});
|
设置保留消息。 |
// 创建保留消息。
String content = "Hello World";
byte[] payload = content.getBytes();
MqttMessage message = new MqttMessage(payload);
// 设置消息为保留消息。
message.setRetained(true);
// 发布消息。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.publish(topic, message);
|
设置遗嘱消息。 |
// 创建遗嘱消息。
String content = "Will Message";
byte[] payload = content.getBytes();
MqttMessage message = new MqttMessage(payload);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(USERNAME);
options.setPassword(PASSWORD.getBytes());
// 设置遗嘱消息。
options.setWill(topic, message);
// 设置遗嘱延迟。
MqttProperties willMessageProperties = new MqttProperties();
willMessageProperties.setWillDelayInterval(60L);
options.setWillMessageProperties(willMessageProperties);
// 建立连接。
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);
|
设置客户端和服务端发送报文的最大长度,直接过滤冗长的消息。 |
MqttConnectionOptions connOpts = new MqttConnectionOptions();
connOpts.setMaximumPacketSize(1024L);
|
设置QoS 1消息限流值,单位为条/秒。 |
MqttConnectionOptions connOpts = new MqttConnectionOptions();
connOpts.setReceiveMaximum(5);
|
设置UserProperty属性列表,每个属性由Key和Value组成,用于传输额外的属性数据。 |
MqttProperties properties = new MqttProperties();
List<UserProperty> userPropertys = new ArrayList<>();
userPropertys.add(new UserProperty("key1","value1"));
properties.setUserProperties(userPropertys);
设备使用MQTT 5.0协议成功接入物联网平台后,可在云端运行日志内容中,查看到上报的UserProperty数据。
重要 最多可添加20个属性。属性中Key值不允许以下划线(_)开头,Key和Value最大总长度不超过128个字符。
|
新增响应主题(ResponseTopic)和相关数据(CorrelationData),类似HTTP请求响应的模式,实现双方通信。 |
例如:请求方为设备,接收方为您的业务服务器,您可通过AMQP订阅或规则流转后,从消息的属性列表中解析出ResponseTopic和CorrelationData,然后调用Pub接口,将响应发送给设备。 MqttProperties properties = new MqttProperties();
properties.setCorrelationData("requestId12345".getBytes());
properties.setResponseTopic("/" + productKey + "/" + deviceName + "/user/get");
重要
- 解析出的CorrelationData, 需要通过Base64解码,才能还原成设备上报的byte数组类型数据。
- ResponseTopic和CorrelationData的最大长度都不能超过128个字符。
|
增加更多返回码,便于设备快速定位请求状态及问题。 |
更多信息,请参见错误排查。 |
将消息通信Topic缩小为整型数值,来减小MQTT报文,节约网络带宽资源。 |
不涉及。 |
共享订阅。 |
共享订阅的Topic格式为:$share/${ShareName}/${filter} 。
示例:
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password);
MqttClient mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
mqttClient.connect(options);
mqttClient.subscribe("$share/testGroup/user/post", 1);
|