文档

订阅数据示例

更新时间:
一键部署

通过MQTT订阅资产指标、业务属性变更、告警信息。

MQTT Server

#mqtt服务器地址
iotx.mqtt.broker = tcp://XX:31883  

#mqtt服务器用户名
iotx.mqtt.userName = XXX #只可以订阅,没有发布权限

#mqtt服务器密码
iotx.mqtt.password = XXX  

#mqtt默认心跳时间
iotx.mqtt.keepAliveInterval = 60

#mqtt默认清除session
iotx.mqtt.cleanSession = true

#mqtt默认qos类型
iotx.mqtt.qos = 0

#mqtt默认不持久化保存
iotx.mqtt.retained = false

注意:登录工业数据应用平台->设置->系统选项中,可查看mqtt对外的服务器地址和用户名密码。

image.png

订阅MQTT数据的示例Demo

  • java版本

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.5</version>
</dependency>
public class ConnectionOptionWrapper {
    /**
     * 内部连接参数
     */
    private MqttConnectOptions mqttConnectOptions;
    /**
     * MQ4IOT 实例 ID,购买后控制台获取
     */
    private String instanceId;
    /**
     * MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
     * clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
     */
    private String clientId;
    /**
     * 客户端使用的 Token 参数,仅在 Token 鉴权模式下需要设置,Key 为 token 类型,一个客户端最多存在三种类型,R,W,RW,Value 是 token内容。
     * 应用需要保证 token 在过期及时更新。否则会导致连接异常。
     */
    private Map<String, String> tokenData = new ConcurrentHashMap<String, String>();

    public ConnectionOptionWrapper(String userName, String passWord, String clientId) {
        this.clientId = clientId;
        mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(userName);
        mqttConnectOptions.setPassword(passWord.toCharArray());
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
        mqttConnectOptions.setConnectionTimeout(5000);
    }

    public MqttConnectOptions getMqttConnectOptions() {
        return mqttConnectOptions;
    }
}

public static void main(String[] args) throws Exception {

        String userName = "usernamexx";
        String password = "passwordxxx";
        String endPoint = "192.168.1.100"; //服务IP
        /**
         * MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
         * clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。
         */
        String clientId = "GID@@@device_id_1";
        /**
         * MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         * 需要注意的是,完整的 topic 长度不得超过128个字符。
         */
        final String mq4IotTopic = "/iot/metric/indicator/meta/zxlAssetCode1014/#";
        /**
         * QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(userName, password, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
        / * 是 SSL 加密则设置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":31883", clientId, memoryPersistence);
        /**
         * 客户端设置好发送超时时间,防止无限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的 topic
                 */
                System.out.println("connect success");
                executorService.submit(() -> {
                    try {
                        final String topicFilter[] = {mq4IotTopic};
                        final int[] qos = {qosLevel};
                        mqttClient.subscribe(topicFilter, qos);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
 超时时间约定参考限制
                 * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj                */
                String payload = new String(mqttMessage.getPayload());
                System.out.println(payload);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        Thread.sleep(Long.MAX_VALUE);
    }

注意事项

  1. 连接mqtt服务器的clientId必须唯一,如果重复会断开前一个相同clientId的连接,导致数据接收不完全。

  2. 订阅mqtt服务器在某些情况下可能会瞬时断开重连。应用需要自己做好重连。

  3. 当在高并发的情况下(5万QPS以上)订阅mqtt服务器的信息,建议使用多个subscribe,不然可能会有数据丢失的情况。

  4. 高并发下可使用$queue/t/1,或$share/{groupId}/t/1,实现负载均衡。