订阅数据示例
更新时间:
通过MQTT订阅资产指标、业务属性变更、告警信息。
MQTT Server
#mqtt服务器地址
iotx.mqtt.broker = tcp://XX:31883
#mqtt服务器用户名
iotx.mqtt.userName = XXX #只可以订阅,没有发布权限
#mqtt服务器密码
iotx.mqtt.password = XXX
#mqtt默认心跳时间
iotx.mqtt.keepAliveInterval = 60
#mqtt默认清除session
iotx.mqtt.cleanSession = true
#mqtt默认qos类型
iotx.mqtt.qos = 0
#mqtt默认不持久化保存
iotx.mqtt.retained = false
注意:登录工业数据应用平台->设置->系统选项中,可查看mqtt对外的服务器地址和用户名密码。
订阅MQTT数据的示例Demo
java版本
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
public class ConnectionOptionWrapper {
/**
* 内部连接参数
*/
private MqttConnectOptions mqttConnectOptions;
/**
* MQ4IOT 实例 ID,购买后控制台获取
*/
private String instanceId;
/**
* MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
* clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
*/
private String clientId;
/**
* 客户端使用的 Token 参数,仅在 Token 鉴权模式下需要设置,Key 为 token 类型,一个客户端最多存在三种类型,R,W,RW,Value 是 token内容。
* 应用需要保证 token 在过期及时更新。否则会导致连接异常。
*/
private Map<String, String> tokenData = new ConcurrentHashMap<String, String>();
public ConnectionOptionWrapper(String userName, String passWord, String clientId) {
this.clientId = clientId;
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(userName);
mqttConnectOptions.setPassword(passWord.toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
}
public MqttConnectOptions getMqttConnectOptions() {
return mqttConnectOptions;
}
}
public static void main(String[] args) throws Exception {
String userName = "usernamexx";
String password = "passwordxxx";
String endPoint = "192.168.1.100"; //服务IP
/**
* MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
* clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
*/
String clientId = "GID@@@device_id_1";
/**
* MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
* 需要注意的是,完整的 topic 长度不得超过128个字符。
*/
final String mq4IotTopic = "/iot/metric/indicator/meta/zxlAssetCode1014/#";
/**
* QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
*/
final int qosLevel = 0;
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(userName, password, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
/ * 是 SSL 加密则设置ssl://endpoint:8883
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":31883", clientId, memoryPersistence);
/**
* 客户端设置好发送超时时间,防止无限阻塞
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
/**
* 客户端连接成功后就需要尽快订阅需要的 topic
*/
System.out.println("connect success");
executorService.submit(() -> {
try {
final String topicFilter[] = {mq4IotTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (MqttException e) {
e.printStackTrace();
}
});
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
/**
* 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
超时时间约定参考限制
* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj */
String payload = new String(mqttMessage.getPayload());
System.out.println(payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Thread.sleep(Long.MAX_VALUE);
}
注意事项
连接mqtt服务器的clientId必须唯一,如果重复会断开前一个相同clientId的连接,导致数据接收不完全。
订阅mqtt服务器在某些情况下可能会瞬时断开重连。应用需要自己做好重连。
当在高并发的情况下(5万QPS以上)订阅mqtt服务器的信息,建议使用多个subscribe,不然可能会有数据丢失的情况。
高并发下可使用$queue/t/1,或$share/{groupId}/t/1,实现负载均衡。
文档内容是否对您有帮助?