背景信息

RRPC是指用户云端通过云端API发起一个RRPC调用,该调用将同步返回设备的响应。

设备端会收到一个同步请求的topic, 格式如 /ext/rrpc/{messageId}/{rrpc_topic} 或者 /sys/{pk}/{dn}/rrpc/request/${msgId},设备端接收到该消息后进行处理,并将处理结果publish到 /ext/rrpc/{messageId}/{rrpc_topic} 或者 /ext/rrpc/${msgId}/${topic}

消息通信API

通过调用云端该RRPC接口,触发向设备发起RRPC同步请求。

设备管理API

通过调用云端该同步服务接口,或者在物联网平台定义设备的服务为同步服务,并触发同步服务调用,即可向设备发起RRPC同步请求。

接口调用

在完成整个RRPC调用链路的过程中,设备端需要先确保SDK初始化成功,即MQTT建联成功。

RRPC监听和回复

云端RRPC请求可以在下行监听的位置接收到,具体接收到的topic由调用方指定。

// 这个是全局的下行监听设置接口
LinkKit.getInstance().registerOnPushListener(notifyListener);
        /**
     * 下行监听器,云端 MQTT 下行数据都会通过这里回调
     */
private static IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
    /**
         * onNotify 会触发的前提是 shouldHandle 没有指定不处理这个topic
         * @param connectId 连接类型,这里判断是否长链 connectId == ConnectSDK.getInstance().getPersistentConnectId()
         * @param topic 下行的topic
         * @param aMessage 下行的数据内容
         */
    @Override
    public void onNotify(String connectId, String topic, AMessage aMessage) {
        String data = new String((byte[]) aMessage.data);
        // 服务端返回数据示例  data = {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}
    }
    /**
         * @param connectId 连接类型,这里判断是否长链 connectId == ConnectSDK.getInstance().getPersistentConnectId()
         * @param topic 下行topic
         * @return 是否要处理这个topic,如果为true,则会回调到onNotify;如果为false,onNotify不会回调这个topic相关的数据。建议默认为true。
         */
    @Override
    public boolean shouldHandle(String connectId, String topic) {
        return true;
    }
    /**
         * @param connectId 连接类型,这里判断是否长链 connectId == ConnectSDK.getInstance().getPersistentConnectId()
         * @param connectState {@link ConnectState}
         *     CONNECTED, 连接成功
         *     DISCONNECTED, 已断链
         *     CONNECTING, 连接中
         *     CONNECTFAIL; 连接失败
         */
    @Override
    public void onConnectStateChange(String connectId, ConnectState connectState) {
        Log.d(TAG, "onConnectStateChange() called with: connectId = [" + connectId + "], connectState = [" + connectState + "]");
    }
};

调用示例

设备端

根据调用者需要的数据,RRPC响应将返回相应的数据内容,以下示例默认返回空的data。

// 可使用全局的下行监听
LinkKit.getInstance().registerOnPushListener(notifyListener);
private IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
    @Override
    public void onNotify(String connectId, String topic, AMessage aMessage) {
            if (CONNECT_ID.equals(connectId) && !TextUtils.isEmpty(topic) &&
                    topic.startsWith("/ext/rrpc/")) {
            //示例 topic=/ext/rrpc/1138654706478941696//a1ExY4afKY1/testDevice/user/get
            //ALog.d(TAG, "receice Message=" + new String((byte[]) aMessage.data));
            // 服务端返回数据示例  {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}
            MqttPublishRequest request = new MqttPublishRequest();
            request.isRPC = false;
            request.topic = topic;
            String[] array = topic.split("/");
            String resId = array[3];
            request.msgId = resId;
            // TODO 用户根据实际情况填写 仅做参考
            request.payloadObj = "{\"id\":\"" + resId + "\", \"code\":\"200\"" + ",\"data\":{} }";
            LinkKit.getInstance().publish(request, new IConnectSendListener() {
                @Override
                public void onResponse(ARequest aRequest, AResponse aResponse) {
                                        // 响应成功
                }
                @Override
                public void onFailure(ARequest aRequest, AError aError) {
                                        // 响应失败
                }
                    });
        }
    }
    @Override
    public boolean shouldHandle(String connectId, String topic) {
        return true;
    }
    @Override
    public void onConnectStateChange(String connectId, ConnectState connectState) {
        Log.d(TAG, "onConnectStateChange() called with: connectId = [" + connectId + "], connectState = [" + connectState + "]");
    }
};

云端

云端触发RRPC调用代码参考:调用自定义topic RRPC