Link SDK 提供了与云端长链接的基础能力接口,用户可以直接使用这些接口完成自定义 Topic 相关的功能。提供的基础能力包括:发布、订阅、取消订阅、RRPC、订阅下行。

调用上行请求接口,SDK 封装了上行Publish请求、订阅Subscribe和取消订阅unSubscribe等接口。

/**
 * 发布
 *
 * @param request  发布请求
 * @param listener 监听器
 */
void publish(ARequest request, IConnectSendListener var2);

/**
 * 订阅
 *
 * @param request    订阅请求
 * @param listener 监听器
 */
void subscribe(ARequest request, IConnectSubscribeListener var2);

/**
 * 取消订阅
 *
 * @param request 取消订阅请求
 * @param listener 监听器
 */
void unsubscribe(ARequest request, IConnectUnscribeListener var2);
		

调用示例:MqttPublishRequest 类路径参见 com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest。

// 发布
MqttPublishRequest request = new MqttPublishRequest();
// topic 用户根据实际场景填写
request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
/**
 * 订阅回复的 replyTopic
 * 如果业务有相应的响应需求,可以设置 replyTopic,且 isRPC=true
 */
//        request.replyTopic = request.topic + "_reply";
/**
 * isRPC = true; 表示先订阅 replyTopic,然后再发布;
 * isRPC = false; 不会订阅回复
 */
//        request.isRPC = true;
/**
 * 设置请求的 qos
 */
request.qos = 0;
// 更新标签 仅做测试
// payloadObj 替换成用户需要发布的数据 json String
//示例 属性上报 {"id":"160865432","method":"thing.event.property.post","params":{"LightSwitch":1},"version":"1.0"}
request.payloadObj = "{\"id\":2, \"params\":{\"version\":\"1.0.0\"}}";
LinkKit.getInstance().publish(request, new IConnectSendListener() {
    @Override
    public void onResponse(ARequest aRequest, AResponse aResponse) {
        // publish 结果
        ALog.d(TAG, "onResponse " + (aResponse==null?"":aResponse.data));
    }

    @Override
    public void onFailure(ARequest aRequest, AError aError) {
        // publish 失败
        ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
    }
});
// 订阅
MqttSubscribeRequest request = new MqttSubscribeRequest();
// topic 用户根据实际场景填写
request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
request.isSubscribe = true;
LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
    @Override
    public void onSuccess() {
        // 订阅成功
        ALog.d(TAG, "onSuccess ");
    }

    @Override
    public void onFailure(AError aError) {
        // 订阅失败
        ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
    }
});
// 取消订阅
MqttSubscribeRequest request = new MqttSubscribeRequest();
// topic 用户根据实际场景填写
request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
request.isSubscribe = false;
LinkKit.getInstance().unsubscribe(request, new IConnectUnscribeListener() {
    @Override
    public void onSuccess() {
        // 取消订阅成功
        ALog.d(TAG, "onSuccess ");
    }

    @Override
    public void onFailure(AError aError) {
        // 取消订阅失败
        ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
    }
});
		

下行数据监听

下行数据监听可以通过 RRPC 方式或者注册一个下行数据监听器实现。

/**
 * RRPC 接口
 * RRPC:先订阅 topic A;云端需要的时候调用设备的服务,通过 topic A下发数据;设备收到数据,回复云端;
 * 另外一种方式是:单独调用订阅接口,然后在 registerOnNotifyListener 接收对应 topic 的下行数据,
 * 并回复云端;
 * @param topic    订阅 topic
 * @param listener 监听器
 */
void registerResource(AResource var1, IResourceRequestListener var2);

/**
 * 注册下行数据监听器,所有已订阅的 topic 下行数据都会在这里返回
 *
 * @param listener 监听器
 */
void registerOnNotifyListener(IConnectNotifyListener listener);

/**
 * 取消注册下行监听器
 *
 * @param listener 监听器
 */
void unRegisterOnNotifyListener(IConnectNotifyListener listener);
			

调用示例:

/**
 * 下行数据接收&处理
 * 设备连接状态变化
 */
private IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
    public void onNotify(String connectId, String topic, AMessage aMessage) {
        // 云端下行数据通知
    }

    public void onConnectStateChange(String connectId, ConnectState connectState) {
        // 设备连接状态通知
    }
    public boolean shouldHandle(String connectId, String topic){
        return true; // 根据实际场景设置
    }
};

/**
 * 所有topic的下行数据入口(前提是先订阅了该 topic,才会在这里收到)
 * 如果想要在这里收到对应 topic 的下行数据,需要先订阅该 topic
 */
public void registerNotifyListener(){
    LinkKit.getInstance().registerOnNotifyListener(notifyListener);
}

/**
 * 取消注册下行的监听器,该 listener 需要保持和注册的 listener 是同一个对象
 */
public void unregisterNotifyListener() {
    LinkKit.getInstance().unRegisterOnNotifyListener(notifyListener);
}
			

RRPC 调用示例:

final CommonResource resource = new CommonResource();
resource.topic = "/ext/rrpc/+/" + productKey + "/" + deviceName + "/get";
resource.replyTopic = resource.topic;

LinkKit.getInstance().registerResource(resource, new IResourceRequestListener() {
  @Override
  public void onHandleRequest(AResource aResource, ResourceRequest resourceRequest, IResourceResponseListener iResourceResponseListener) {
    // 收到云端数据下行
    ALog.d(TAG, "onHandleRequest aResource=" + aResource + ", resourceRequest=" + resourceRequest + ", iResourceResponseListener=" + iResourceResponseListener);
    // 下行数据解析示例
    //                String downstreamData = new String((byte[]) resourceRequest.payloadObj);
    // 示例 {"id":"269297015","version":"1.0","method":"thing.event.property.post","params":{"lightData":{"vv":12}}}

    // 如果数据是json,且包含id字段,格式可以按照如下示例回复,传输数据请根据实际情况定制
    //                if (aResource instanceof  CommonResource) {
    //                    ((CommonResource) aResource).replyTopic = resourceRequest.topic;
    //                }
    //                if (iResourceResponseListener != null) {
    //                    AResponse response = new AResponse();
    //
    //                    response.data = "{\"id\":\"123\", \"code\":\"200\"" + ",\"data\":{} }";
    //                    iResourceResponseListener.onResponse(aResource, resourceRequest, response);
    //                }
    // 如果不一定是json格式,可以参考如下方式回复
    MqttPublishRequest rrpcResponse = new MqttPublishRequest();
    rrpcResponse.topic = resourceRequest.topic;
    rrpcResponse.payloadObj ="xxx";

    LinkKit.getInstance().publish(rrpcResponse,null);
  }

  @Override
  public void onSuccess() {
    // 注册资源成功
    ALog.d(TAG, "onSuccess ");
  }

  @Override
  public void onFailure(AError aError) {
    // 注册资源失败
    ALog.d(TAG, "onFailure " + getError(aError));
  }
});