基于MQTT接入的设备靠心跳保活,但心跳是周期性的、且自动收发和超时重连,这些特性给主动检测设备端是否在线带来了一定难度。本文提供通过消息收发是否正常判定设备是否在线的原理、流程、实现方式。

原理

如果设备可以发送、接收消息,那么该设备的通信是没问题的,并且一定在线。

消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。是设备端检测自己是否在线最通用的一种原理。

该原理的一种特殊实现就是设备端消息的自发自收。

流程

  1. 创建Topic = /yourProductKey/yourDeviceName/user/checkstatus

    Topic可以自定义,但权限必须为发布和订阅。

  2. 设备端订阅上一步创建的Topic 。
  3. 设备端发送消息{"id":123,"version":"1.0","time":1234567890123},请一定使用QoS=0 。

    消息内容可自定义,但建议使用此格式。

    参数说明:

    字段 类型 说明
    id Object 用于验证收发的消息是否是同一个,请自行业务层保证唯一
    version String 版本号固定1.0
    time Long 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
  4. 设备端收到消息上一步发送的消息。

    离线判定逻辑

    • 严格的:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
    • 普通的:发送消息后,5秒内没有收到消息算失败,连续2次失败,判定为离线
    • 宽松的:发送消息后,5秒内没有收到消息算失败,连续3次失败,判定为离线
    说明 您可以根据自己的情况,自定义离线判定逻辑。

实现

为方便体验,本例基于Java SDK Demo开发,实现设备端检测自己是否在线的严格判定逻辑。

Java SDK开发具体细节,请查看相关文档
说明 您可以根据自己的喜好,选择不同的设备端SDK进行开发。

首先,下载Demo工程,添加本类,并填写设备证书信息。设备端代码如下:

import java.io.UnsupportedEncodingException;

import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;

public class CheckDeviceStatusOnDevice {

	// ===================需要用户填写的参数,开始===========================
	// 产品productKey,设备证书参数之一
	private static String productKey = "";
	// 设备名字deviceName,设备证书参数之一
	private static String deviceName = "";
	// 设备密钥deviceSecret,设备证书参数之一
	private static String deviceSecret = "";
	// 消息通信的Topic,需要在控制台定义,权限必须为发布和订阅
	private static String checkStatusTopic = "/" + productKey + "/" + deviceName + "/user/checkstatus";
	// ===================需要用户填写的参数结束===========================

	// 接收到的消息
	private static String subInfo = "";

	public static void main(String[] args) throws InterruptedException {

		CheckDeviceStatusOnDevice device = new CheckDeviceStatusOnDevice();

		// 初始化
		device.init(productKey, deviceName, deviceSecret);

		// 下行数据监听
		device.registerNotifyListener();

		// 订阅Topic
		device.subscribe(checkStatusTopic);

		// 测试设备状态
		System.out.println("we will check device online status now.");
		device.checkStatus();

		// 准备测试设备离线状态,请拔掉网线
		System.out.println("pls close network,we will check device offline status after 60 seconds.");
		for (int i = 0; i < 6; i++) {
			Thread.sleep(10000);
		}
		device.checkStatus();
	}

	/**
	 * 测试设备状态
	 * 
	 * @throws InterruptedException
	 */
	public void checkStatus() throws InterruptedException {

		// -------------------------------------------------------------------
		// 要发送的消息,可以自定义,建议使用当前格式
		// -------------------------------------------------------------------
		// Field   | Tyep   | Desc
		// -------------------------------------------------------------------
		// id      | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
		// -------------------------------------------------------------------
		// version | String | 版本号固定1.0
		// -------------------------------------------------------------------
		// time    | Long   | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
		// -------------------------------------------------------------------
		String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";

		// 发送消息
		publish(checkStatusTopic, payload);

		// 严格的离线判定逻辑:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线
		boolean isTimeout = true;
		for (int i = 0; i < 5; i++) {
			Thread.sleep(1000);
			if (!subInfo.isEmpty()) {
				isTimeout = false;
				break;
			}
		}
		if (!isTimeout && payload.equals(subInfo)) {
			System.out.println("Device is online !!");
		} else {
			System.out.println("Device is offline !!");
		}

		// 置空接收到的消息,方便下一次测试
		subInfo = "";
	}

	/**
	 * 初始化
	 * 
	 * @param pk productKey
	 * @param dn devcieName
	 * @param ds deviceSecret
	 * @throws InterruptedException
	 */
	public void init(String pk, String dn, String ds) throws InterruptedException {

		LinkKitInitParams params = new LinkKitInitParams();

		// 设置 MQTT 初始化参数
		IoTMqttClientConfig config = new IoTMqttClientConfig();
		config.productKey = pk;
		config.deviceName = dn;
		config.deviceSecret = ds;
		params.mqttClientConfig = config;

		// 设置初始化设备证书信息,用户传入
		DeviceInfo deviceInfo = new DeviceInfo();
		deviceInfo.productKey = pk;
		deviceInfo.deviceName = dn;
		deviceInfo.deviceSecret = ds;

		params.deviceInfo = deviceInfo;

		LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
			@Override
			public void onInitDone(InitResult initResult) {
				System.out.println("init success !!");
			}

			@Override
			public void onError(AError aError) {
				System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
						+ aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
			}
		});

		// 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
		Thread.sleep(2000);
	}

	/**
	 * 监听下行数据
	 */
	public void registerNotifyListener() {
		LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
			@Override
			public boolean shouldHandle(String connectId, String topic) {
				// 只处理特定Topic的消息
				if (checkStatusTopic.equals(topic)) {
					return true;
				} else {
					return false;
				}
			}

			@Override
			public void onNotify(String connectId, String topic, AMessage aMessage) {
				// 接收消息
				try {
					subInfo = new String((byte[]) aMessage.getData(), "UTF-8");
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			}

			@Override
			public void onConnectStateChange(String connectId, ConnectState connectState) {
			}
		});
	}

	/**
	 * 发布消息
	 * 
	 * @param topic 发送消息的Topic
	 * @param payload 发送的消息内容
	 */
	public void publish(String topic, String payload) {
		MqttPublishRequest request = new MqttPublishRequest();
		request.topic = topic;
		request.payloadObj = payload;
		request.qos = 0;
		LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
			@Override
			public void onResponse(ARequest aRequest, AResponse aResponse) {
			}

			@Override
			public void onFailure(ARequest aRequest, AError aError) {
			}
		});
	}

	/**
	 * 订阅消息
	 * 
	 * @param topic 订阅消息的Topic
	 */
	public void subscribe(String topic) {
		MqttSubscribeRequest request = new MqttSubscribeRequest();
		request.topic = topic;
		LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
			@Override
			public void onSuccess() {
			}

			@Override
			public void onFailure(AError aError) {
			}
		});
	}

}
说明 检测到设备离线后,尽量不要选择主动重连。

物联网平台规定一个设备1分钟只允许尝试接入平台5次,超过会触发限流,限制设备接入。此时停止接入,等待1分钟即可解除限制。

设备端应注意退避,切勿触发限流。