服务端检测设备是否在线

本文提供通过使用RRPC来判定设备是否在线的原理、流程、实现方式。

背景信息

基于MQTT接入的设备靠心跳保活,但心跳具有周期性、自动收发包和超时重连等特性,这些特性给主动检测设备端是否在线带来了一定难度。服务端虽提供了GetDeviceStatusBatchGetDeviceState接口查询设备状态,但是调用API获取设备状态是基于会话实现的,会话保活是建立在心跳基础上的。

原理

如果设备可以接收服务端下发的消息并作出响应,则该设备通信正常,设备一定在线。

消息收发是物联网平台的核心能力。因此,这种判定方法不会因为物联网平台架构升级或业务变动而变化,也不会因为设备使用的客户端不同而不同。这是服务端检测设备是否在线最通用的一种原理。

物联网平台提供的RRPC能力就是该原理的一种特殊实现。

流程

  1. 设备端订阅RRPC请求的Topic是/sys/${yourProductKey}/${yourDeviceName}/rrpc/request/+

  2. 服务端调用RRpc接口发送指令,例如{"id":123,"version":"1.0","time":1234567890123}

    消息内容可自定义,但建议使用此格式。

    参数说明如下表。

    字段

    类型

    说明

    id

    Object

    消息ID,用于验证收发的消息是否是同一条。请自行在业务层生成,并保证其唯一性。

    version

    String

    版本号。目前版本号固定为1.0。

    time

    Long

    发送消息的时间戳,可以用于计算消息来回的延时,评估当前的通信质量。

  3. 设备端接收指令并响应RRPC请求。接收的消息格式:{"id":123,"version":"1.0","time":1234567890123}

    离线判定逻辑如下。

    • 严格:发送消息后,5秒内没有收到消息算失败,出现1次失败,判定为离线。

    • 普通:发送消息后,5秒内没有收到消息算失败,连续2次失败,判定为离线。

    • 宽松:发送消息后,5秒内没有收到消息算失败,连续3次失败,判定为离线。

    说明

    您可以根据自己的情况,自定义离线判定逻辑。

实现

为方便体验,本例基于设备端Java SDK Demo服务端Java SDK Demo开发。

说明

您可以根据实际情况,选择不同的设备端SDK服务端SDK进行开发。RRpc接口的参数说明,请参见RRpc

分别下载Demo工程,服务端添加CheckDeviceStatusOnServer类,设备端添加Device类,填写您的阿里云账号AccessKey信息和设备证书信息。

设备端代码如下:

import java.io.UnsupportedEncodingException;

import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;

public class Device {

    // ===================需要用户填写的参数,开始===========================
    // 产品productKey,设备证书参数之一
    private static String productKey = "";
    // 设备名字deviceName,设备证书参数之一
    private static String deviceName = "";
    // 设备密钥deviceSecret,设备证书参数之一
    private static String deviceSecret = "";
    // 消息通信的Topic,无需创建和定义,直接使用即可
    private static String rrpcTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";
    // ===================需要用户填写的参数,结束===========================

    public static void main(String[] args) throws InterruptedException {

        Device device = new Device();

        // 初始化
        device.init(productKey, deviceName, deviceSecret);

        // 下行数据监听
        device.registerNotifyListener();

        // 订阅Topic
        device.subscribe(rrpcTopic);
    }

    /**
     * 初始化
     * 
     * @param pk productKey
     * @param dn deviceName
     * @param ds deviceSecret
     * @throws InterruptedException
     */
    public void init(String pk, String dn, String ds) throws InterruptedException {

        LinkKitInitParams params = new LinkKitInitParams();

        // 设置 MQTT 初始化参数
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        config.productKey = pk;
        config.deviceName = dn;
        config.deviceSecret = ds;
        params.mqttClientConfig = config;

        // 设置初始化设备证书信息,用户传入
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = pk;
        deviceInfo.deviceName = dn;
        deviceInfo.deviceSecret = ds;

        params.deviceInfo = deviceInfo;

        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
            public void onError(AError aError) {
                System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
                        + aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
            }

            public void onInitDone(InitResult initResult) {
                System.out.println("init success !!");
            }
        });

        // 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
        Thread.sleep(2000);
    }

    /**
     * 监听下行数据
     */
    public void registerNotifyListener() {
        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
            @Override
            public boolean shouldHandle(String connectId, String topic) {
                // 只处理特定Topic的消息
                if (topic.contains("/rrpc/request/")) {
                    return true;
                } else {
                    return false;
                }
            }

            @Override
            public void onNotify(String connectId, String topic, AMessage aMessage) {
                // 接收RRPC请求并回复RRPC响应
                try {
                    String response = topic.replace("/request/", "/response/");
                    publish(response, new String((byte[]) aMessage.getData(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onConnectStateChange(String connectId, ConnectState connectState) {
            }
        });
    }

    /**
     * 发布消息
     * 
     * @param topic 发送消息的Topic
     * @param payload 发送的消息内容
     */
    public void publish(String topic, String payload) {
        MqttPublishRequest request = new MqttPublishRequest();
        request.topic = topic;
        request.payloadObj = payload;
        request.qos = 0;
        LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
            @Override
            public void onResponse(ARequest aRequest, AResponse aResponse) {
            }

            @Override
            public void onFailure(ARequest aRequest, AError aError) {
            }
        });
    }

    /**
     * 订阅消息
     * 
     * @param topic 订阅消息的Topic
     */
    public void subscribe(String topic) {
        MqttSubscribeRequest request = new MqttSubscribeRequest();
        request.topic = topic;
        LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
            @Override
            public void onSuccess() {
            }

            @Override
            public void onFailure(AError aError) {
            }
        });
    }

}

服务端代码如下:

import java.io.UnsupportedEncodingException;

import org.apache.commons.codec.binary.Base64;

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.iot.model.v20180120.RRpcRequest;
import com.aliyuncs.iot.model.v20180120.RRpcResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;

public class CheckDeviceStatusOnServer {

    // ===================需要用户填写的参数,开始===========================
    // 用户账号AccessKey
    private static String accessKeyID = "";
    // 用户账号AccesseKeySecret
    private static String accessKeySecret = "";
    // 产品productKey,设备证书参数之一
    private static String productKey = "";
    // 设备名字deviceName,设备证书参数之一
    private static String deviceName = "";
    // 实例ID。
    private static String instanceId = "iot-******02";
    // ===================需要用户填写的参数,结束===========================

    public static void main(String[] args) throws ServerException, ClientException, UnsupportedEncodingException {

        // -------------------------------------------------------------------
        // 要发送的消息,可以自定义,建议使用当前格式
        // -------------------------------------------------------------------
        // Field   | Tyep   | Desc
        // -------------------------------------------------------------------
        // id      | Object | 用于验证收发的消息是否是同一个,请自行业务层保证唯一
        // -------------------------------------------------------------------
        // version | String | 版本号固定1.0
        // -------------------------------------------------------------------
        // time    | Long   | 发送消息的时间戳,可以计算消息来回的延时,评估当前的通信质量
        // -------------------------------------------------------------------
        String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";

        // 构建RRPC请求
        RRpcRequest request = new RRpcRequest();
        request.setProductKey(productKey);
        request.setDeviceName(deviceName);
        request.setIotInstanceId(instanceId);
        request.setRequestBase64Byte(Base64.encodeBase64String(payload.getBytes()));
        request.setTimeout(5000);

        // 获取服务端请求客户端
        DefaultAcsClient client = getClient();

        // 发起RRPC请求
        RRpcResponse response = (RRpcResponse) client.getAcsResponse(request);

        // RRPC响应处理
        // 这个不能看response.getSuccess(),这个仅表明RRPC请求发送成功,不代表设备接收成功和响应成功
        // 需要根据RrpcCode来判定,参考文档https://help.aliyun.com/document_detail/69797.html
        if (response != null && "SUCCESS".equals(response.getRrpcCode())) {
            if (payload.equals(new String(Base64.decodeBase64(response.getPayloadBase64Byte()), "UTF-8"))) {
                System.out.println("Device is online");
            } else {
                System.out.println("Device is offline1");
            }
        } else {
            System.out.println("Device is offline");
        }
    }

    public static DefaultAcsClient getClient() {

        DefaultAcsClient client = null;

        try {
            // 以下代码中cn-shanghai仅为示例,需替换为您物联网平台服务的地域ID。
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKeyID, accessKeySecret);
            DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
            client = new DefaultAcsClient(profile);
        } catch (Exception e) {
            System.out.println("init client failed !! exception:" + e.getMessage());
        }

        return client;
    }
}
说明

需服务端主动触发RRPC调用,检测是否能及时收到设备端响应结果。