功能介绍

RRPC是指客户云端通过云端API发起一个 RRPC 调用,该调用将同步返回设备的响应。设备端会收到一个同步请求的 topic, 格式如 /ext/rrpc/{messageId}/{rrpc_topic} 或者 /sys/{pk}/{dn}/rrpc/request/${msgId}, 设备端接收到该消息后,进行处理,并将处理结果 publish 到 /ext/rrpc/{messageId}/{rrpc_topic} 或者 /ext/rrpc/${msgId}/${topic}。需要注意的是设备端首先需要订阅对应的下行topic,才能收到云端的RRPC同步调用。

消息通信API - RRPC

通过调用云端该 RRPC 接口,触发向设备发起 RRPC 同步请求。

设备管理API - InvokeThingsService

通过调用云端该 同步服务 接口或者在物联网平台定义设备的服务为同步服务并触发同步服务调用,即可向设备发起RRPC同步请求。

接口调用

在完成整个RRPC调用链路的过程中,设备端需要去订阅RRPC的topic,并在云端请求的时候,立即响应云端的请求。以下调用流程需要先确保SDK初始化成功,即MQTT建联成功。

RRPC订阅

${topic} 这部分替换成用户自定义的topic,如/a1pMraiYxxx/test/user/get,其中a1pMraiYxxx为具体产品productKey,test为deviceName。系统RRPC无需开发者订阅,在处理云端下行调用的时候系统 RRPC 调用 的 topic 为/sys/${pk}/${dn}/rrpc/request/${requestId}。

MqttSubscribeRequest subscribeRequest = new MqttSubscribeRequest();
subscribeRequest.isSubscribe = true;
// ${topic} 替换成调用方的自定义 topic
subscribeRequest.topic = "/ext/rrpc/+/${topic}";
LinkKit.getInstance().subscribe(subscribeRequest, new IConnectSubscribeListener() {
    @Override
    public void onSuccess() {
        // 订阅成功
    }

    @Override
    public void onFailure(AError aError) {
        // 订阅失败
    }
);
			

RRPC监听和回复

云端RRPC请求可以在下行监听的地方接收到,具体接收到的topic由调用方指定。

// 这个是全局的下行监听设置接口
LinkKit.getInstance().registerOnPushListener(notifyListener);

	/**
     * 下行监听器,云端 MQTT 下行数据都会通过这里回调
     */
private static IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
    /**
         * onNotify 会触发的前提是 shouldHandle 没有指定不处理这个topic
         * @param connectId 连接类型,这里判断是否长链 connectId == ConnectSDK.getInstance().getPersistentConnectId()
         * @param topic 下行的topic
         * @param aMessage 下行的数据内容
         */
    @Override
    public void onNotify(String connectId, String topic, AMessage aMessage) {
        String data = new String((byte[]) aMessage.data);
        // 服务端返回数据示例  data = {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}
    }

    /**
         * @param connectId 连接类型,这里判断是否长链 connectId == ConnectSDK.getInstance().getPersistentConnectId()
         * @param topic 下行topic
         * @return 是否要处理这个topic,如果为true,则会回调到onNotify;如果为false,onNotify不会回调这个topic相关的数据。建议默认为true。
         */
    @Override
    public boolean shouldHandle(String connectId, String topic) {
        return true;
    }

    /**
         * @param connectId 连接类型,这里判断是否长链 connectId == ConnectSDK.getInstance().getPersistentConnectId()
         * @param connectState {@link ConnectState}
         *     CONNECTED, 连接成功
         *     DISCONNECTED, 已断链
         *     CONNECTING, 连接中
         *     CONNECTFAIL; 连接失败
         */
    @Override
    public void onConnectStateChange(String connectId, ConnectState connectState) {
        Log.d(TAG, "onConnectStateChange() called with: connectId = [" + connectId + "], connectState = [" + connectState + "]");
    }
};
			

调用示例

设备端

订阅自定义topic "/a1ExY4afKY1/testDevice/user/get",并监听云端下行。RRPC 响应的数据内容根据调用者需要的数据来进行返回,以下示例默认返回空的data。


MqttSubscribeRequest subscribeRequest = new MqttSubscribeRequest();
subscribeRequest.isSubscribe = true;
subscribeRequest.topic = "/a1ExY4afKY1/testDevice/user/get";
LinkKit.getInstance().subscribe(subscribeRequest, new IConnectSubscribeListener() {
    @Override
    public void onSuccess() {
        // 订阅成功
    }

    @Override
    public void onFailure(AError aError) {
        // 订阅失败
    }
);

// 可使用全局的下行监听
LinkKit.getInstance().registerOnPushListener(notifyListener);

private IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
    @Override
    public void onNotify(String connectId, String topic, AMessage aMessage) {
    	if (CONNECT_ID.equals(connectId) && !TextUtils.isEmpty(topic) &&
                    topic.startsWith("/ext/rrpc/")) {
            //示例 topic=/ext/rrpc/1138654706478941696//a1ExY4afKY1/testDevice/user/get
            //ALog.d(TAG, "receice Message=" + new String((byte[]) aMessage.data));
            // 服务端返回数据示例  {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}
            MqttPublishRequest request = new MqttPublishRequest();
            request.isRPC = false;
            request.topic = topic;
            String[] array = topic.split("/");
            String resId = array[3];
            request.msgId = resId;
            // TODO 用户根据实际情况填写 仅做参考
            request.payloadObj = "{\"id\":\"" + resId + "\", \"code\":\"200\"" + ",\"data\":{} }";
            LinkKit.getInstance().publish(request, new IConnectSendListener() {
                @Override
                public void onResponse(ARequest aRequest, AResponse aResponse) {
					// 响应成功
                }

                @Override
                public void onFailure(ARequest aRequest, AError aError) {
					// 响应失败
                }
    		});
        }
    }
    @Override
    public boolean shouldHandle(String connectId, String topic) {
        return true;
    }

    @Override
    public void onConnectStateChange(String connectId, ConnectState connectState) {
        Log.d(TAG, "onConnectStateChange() called with: connectId = [" + connectId + "], connectState = [" + connectState + "]");
    }
};
			

云端

云端触发RRPC调用代码如下,参考:调用自定义topic RRPC