Link SDK 提供了与云端长链接的基础能力接口,用户可以直接使用这些接口完成自定义 Topic 相关的功能。提供的基础能力包括:发布、订阅、取消订阅、RRPC、订阅下行。
调用上行请求接口,SDK 封装了上行Publish请求、订阅Subscribe和取消订阅unSubscribe等接口。
/**
* 发布
*
* @param request 发布请求
* @param listener 监听器
*/
void publish(ARequest request, IConnectSendListener var2);
/**
* 订阅
*
* @param request 订阅请求
* @param listener 监听器
*/
void subscribe(ARequest request, IConnectSubscribeListener var2);
/**
* 取消订阅
*
* @param request 取消订阅请求
* @param listener 监听器
*/
void unsubscribe(ARequest request, IConnectUnscribeListener var2);
调用示例:MqttPublishRequest 类路径参见 com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest。
// 发布
MqttPublishRequest request = new MqttPublishRequest();
// topic 用户根据实际场景填写
request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
/**
* 订阅回复的 replyTopic
* 如果业务有相应的响应需求,可以设置 replyTopic,且 isRPC=true
*/
// request.replyTopic = request.topic + "_reply";
/**
* isRPC = true; 表示先订阅 replyTopic,然后再发布;
* isRPC = false; 不会订阅回复
*/
// request.isRPC = true;
/**
* 设置请求的 qos
*/
request.qos = 0;
// 更新标签 仅做测试
// payloadObj 替换成用户需要发布的数据 json String
//示例 属性上报 {"id":"160865432","method":"thing.event.property.post","params":{"LightSwitch":1},"version":"1.0"}
request.payloadObj = "{\"id\":2, \"params\":{\"version\":\"1.0.0\"}}";
LinkKit.getInstance().publish(request, new IConnectSendListener() {
@Override
public void onResponse(ARequest aRequest, AResponse aResponse) {
// publish 结果
ALog.d(TAG, "onResponse " + (aResponse==null?"":aResponse.data));
}
@Override
public void onFailure(ARequest aRequest, AError aError) {
// publish 失败
ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
}
});
// 订阅
MqttSubscribeRequest request = new MqttSubscribeRequest();
// topic 用户根据实际场景填写
request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
request.isSubscribe = true;
LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
// 订阅成功
ALog.d(TAG, "onSuccess ");
}
@Override
public void onFailure(AError aError) {
// 订阅失败
ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
}
});
// 取消订阅
MqttSubscribeRequest request = new MqttSubscribeRequest();
// topic 用户根据实际场景填写
request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
request.isSubscribe = false;
LinkKit.getInstance().unsubscribe(request, new IConnectUnscribeListener() {
@Override
public void onSuccess() {
// 取消订阅成功
ALog.d(TAG, "onSuccess ");
}
@Override
public void onFailure(AError aError) {
// 取消订阅失败
ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
}
});
下行数据监听
下行数据监听可以通过 RRPC 方式或者注册一个下行数据监听器实现。
/**
* RRPC 接口
* RRPC:先订阅 topic A;云端需要的时候调用设备的服务,通过 topic A下发数据;设备收到数据,回复云端;
* 另外一种方式是:单独调用订阅接口,然后在 registerOnNotifyListener 接收对应 topic 的下行数据,
* 并回复云端;
* @param topic 订阅 topic
* @param listener 监听器
*/
void registerResource(AResource var1, IResourceRequestListener var2);
/**
* 注册下行数据监听器,所有已订阅的 topic 下行数据都会在这里返回
*
* @param listener 监听器
*/
void registerOnNotifyListener(IConnectNotifyListener listener);
/**
* 取消注册下行监听器
*
* @param listener 监听器
*/
void unRegisterOnNotifyListener(IConnectNotifyListener listener);
调用示例:
/**
* 下行数据接收&处理
* 设备连接状态变化
*/
private IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 云端下行数据通知
}
public void onConnectStateChange(String connectId, ConnectState connectState) {
// 设备连接状态通知
}
public boolean shouldHandle(String connectId, String topic){
return true; // 根据实际场景设置
}
};
/**
* 所有topic的下行数据入口(前提是先订阅了该 topic,才会在这里收到)
* 如果想要在这里收到对应 topic 的下行数据,需要先订阅该 topic
*/
public void registerNotifyListener(){
LinkKit.getInstance().registerOnNotifyListener(notifyListener);
}
/**
* 取消注册下行的监听器,该 listener 需要保持和注册的 listener 是同一个对象
*/
public void unregisterNotifyListener() {
LinkKit.getInstance().unRegisterOnNotifyListener(notifyListener);
}
RRPC 调用示例:
final CommonResource resource = new CommonResource();
resource.topic = "/ext/rrpc/+/" + productKey + "/" + deviceName + "/get";
resource.replyTopic = resource.topic;
LinkKit.getInstance().registerResource(resource, new IResourceRequestListener() {
@Override
public void onHandleRequest(AResource aResource, ResourceRequest resourceRequest, IResourceResponseListener iResourceResponseListener) {
// 收到云端数据下行
ALog.d(TAG, "onHandleRequest aResource=" + aResource + ", resourceRequest=" + resourceRequest + ", iResourceResponseListener=" + iResourceResponseListener);
// 下行数据解析示例
// String downstreamData = new String((byte[]) resourceRequest.payloadObj);
// 示例 {"id":"269297015","version":"1.0","method":"thing.event.property.post","params":{"lightData":{"vv":12}}}
// 如果数据是json,且包含id字段,格式可以按照如下示例回复,传输数据请根据实际情况定制
// if (aResource instanceof CommonResource) {
// ((CommonResource) aResource).replyTopic = resourceRequest.topic;
// }
// if (iResourceResponseListener != null) {
// AResponse response = new AResponse();
//
// response.data = "{\"id\":\"123\", \"code\":\"200\"" + ",\"data\":{} }";
// iResourceResponseListener.onResponse(aResource, resourceRequest, response);
// }
// 如果不一定是json格式,可以参考如下方式回复
MqttPublishRequest rrpcResponse = new MqttPublishRequest();
rrpcResponse.topic = resourceRequest.topic;
rrpcResponse.payloadObj ="xxx";
LinkKit.getInstance().publish(rrpcResponse,null);
}
@Override
public void onSuccess() {
// 注册资源成功
ALog.d(TAG, "onSuccess ");
}
@Override
public void onFailure(AError aError) {
// 注册资源失败
ALog.d(TAG, "onFailure " + getError(aError));
}
});