全部产品
云市场

自定义MQTT Topic通信

更新时间:2019-05-17 09:39:36

LinkKit SDK 提供了与云端长链接的基础能力接口,用户可以直接使用这些接口完成自定义 Topic 相关的功能。提供的基础能力包括:发布、订阅、取消订阅、RRPC、订阅下行。

上行接口请求

调用上行请求接口,SDK 封装了上行Publish请求、订阅Subscribe和取消订阅unSubscribe等接口。

  1. /**
  2. * 发布
  3. *
  4. * @param request 发布请求
  5. * @param listener 监听器
  6. */
  7. void publish(ARequest request, IConnectSendListener var2);
  8. /**
  9. * 订阅
  10. *
  11. * @param request 订阅请求
  12. * @param listener 监听器
  13. */
  14. void subscribe(ARequest request, IConnectSubscribeListener var2);
  15. /**
  16. * 取消订阅
  17. *
  18. * @param request 取消订阅请求
  19. * @param listener 监听器
  20. */
  21. void unsubscribe(ARequest request, IConnectUnscribeListener var2);

调用示例:
MqttPublishRequest 类路径参见 com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest。

  1. // 发布
  2. MqttPublishRequest request = new MqttPublishRequest();
  3. // topic 用户根据实际场景填写
  4. request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
  5. /**
  6. * 订阅回复的 replyTopic
  7. * 如果业务有相应的响应需求,可以设置 replyTopic,且 isRPC=true
  8. */
  9. // request.replyTopic = request.topic + "_reply";
  10. /**
  11. * isRPC = true; 表示先订阅 replyTopic,然后再发布;
  12. * isRPC = false; 不会订阅回复
  13. */
  14. // request.isRPC = true;
  15. /**
  16. * 设置请求的 qos
  17. */
  18. request.qos = 0;
  19. // 更新标签 仅做测试
  20. // payloadObj 替换成用户需要发布的数据 json String
  21. //示例 属性上报 {"id":"160865432","method":"thing.event.property.post","params":{"LightSwitch":1},"version":"1.0"}
  22. request.payloadObj = "{\"id\":2, \"params\":{\"version\":\"1.0.0\"}}";
  23. LinkKit.getInstance().publish(request, new IConnectSendListener() {
  24. @Override
  25. public void onResponse(ARequest aRequest, AResponse aResponse) {
  26. // publish 结果
  27. ALog.d(TAG, "onResponse " + (aResponse==null?"":aResponse.data));
  28. }
  29. @Override
  30. public void onFailure(ARequest aRequest, AError aError) {
  31. // publish 失败
  32. ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
  33. }
  34. });
  35. // 订阅
  36. MqttSubscribeRequest request = new MqttSubscribeRequest();
  37. // topic 用户根据实际场景填写
  38. request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
  39. request.isSubscribe = true;
  40. LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {
  41. @Override
  42. public void onSuccess() {
  43. // 订阅成功
  44. ALog.d(TAG, "onSuccess ");
  45. }
  46. @Override
  47. public void onFailure(AError aError) {
  48. // 订阅失败
  49. ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
  50. }
  51. });
  52. // 取消订阅
  53. MqttSubscribeRequest request = new MqttSubscribeRequest();
  54. // topic 用户根据实际场景填写
  55. request.topic = "/sys/" + pk + "/" + dn + "/thing/deviceinfo/update";
  56. request.isSubscribe = false;
  57. LinkKit.getInstance().unsubscribe(request, new IConnectUnscribeListener() {
  58. @Override
  59. public void onSuccess() {
  60. // 取消订阅成功
  61. ALog.d(TAG, "onSuccess ");
  62. }
  63. @Override
  64. public void onFailure(AError aError) {
  65. // 取消订阅失败
  66. ALog.d(TAG, "onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));
  67. }
  68. });

下行数据监听

下行数据监听可以通过 RRPC 方式或者注册一个下行数据监听器实现。

  1. /**
  2. * RRPC 接口
  3. * RRPC:先订阅 topic A;云端需要的时候调用设备的服务,通过 topic A下发数据;设备收到数据,回复云端;
  4. * 另外一种方式是:单独调用订阅接口,然后在 registerOnNotifyListener 接收对应 topic 的下行数据,
  5. * 并回复云端;
  6. * @param topic 订阅 topic
  7. * @param listener 监听器
  8. */
  9. void registerResource(AResource var1, IResourceRequestListener var2);
  10. /**
  11. * 注册下行数据监听器,所有已订阅的 topic 下行数据都会在这里返回
  12. *
  13. * @param listener 监听器
  14. */
  15. void registerOnNotifyListener(IConnectNotifyListener listener);
  16. /**
  17. * 取消注册下行监听器
  18. *
  19. * @param listener 监听器
  20. */
  21. void unRegisterOnNotifyListener(IConnectNotifyListener listener);

调用示例:

  1. /**
  2. * 下行数据接收&处理
  3. * 设备连接状态变化
  4. */
  5. private IConnectNotifyListener notifyListener = new IConnectNotifyListener() {
  6. public void onNotify(String connectId, String topic, AMessage aMessage) {
  7. // 云端下行数据通知
  8. }
  9. public void onConnectStateChange(String connectId, ConnectState connectState) {
  10. // 设备连接状态通知
  11. }
  12. public boolean shouldHandle(String connectId, String topic){
  13. return true; // 根据实际场景设置
  14. }
  15. };
  16. /**
  17. * 所有topic的下行数据入口(前提是先订阅了该 topic,才会在这里收到)
  18. * 如果想要在这里收到对应 topic 的下行数据,需要先订阅该 topic
  19. */
  20. public void registerNotifyListener(){
  21. LinkKit.getInstance().registerOnNotifyListener(notifyListener);
  22. }
  23. /**
  24. * 取消注册下行的监听器,该 listener 需要保持和注册的 listener 是同一个对象
  25. */
  26. public void unregisterNotifyListener() {
  27. LinkKit.getInstance().unRegisterOnNotifyListener(notifyListener);
  28. }


RRPC 调用示例:

  1. final CommonResource resource = new CommonResource();
  2. resource.topic = "/ext/rrpc/+/" + productKey + "/" + deviceName + "/get";
  3. resource.replyTopic = resource.topic;
  4. LinkKit.getInstance().registerResource(resource, new IResourceRequestListener() {
  5. @Override
  6. public void onHandleRequest(AResource aResource, ResourceRequest resourceRequest, IResourceResponseListener iResourceResponseListener) {
  7. // 收到云端数据下行
  8. ALog.d(TAG, "onHandleRequest aResource=" + aResource + ", resourceRequest=" + resourceRequest + ", iResourceResponseListener=" + iResourceResponseListener);
  9. // 下行数据解析示例
  10. // String downstreamData = new String((byte[]) resourceRequest.payloadObj);
  11. // 示例 {"id":"269297015","version":"1.0","method":"thing.event.property.post","params":{"lightData":{"vv":12}}}
  12. // 如果数据是json,且包含id字段,格式可以按照如下示例回复,传输数据请根据实际情况定制
  13. // if (aResource instanceof CommonResource) {
  14. // ((CommonResource) aResource).replyTopic = resourceRequest.topic;
  15. // }
  16. // if (iResourceResponseListener != null) {
  17. // AResponse response = new AResponse();
  18. //
  19. // response.data = "{\"id\":\"123\", \"code\":\"200\"" + ",\"data\":{} }";
  20. // iResourceResponseListener.onResponse(aResource, resourceRequest, response);
  21. // }
  22. // 如果不一定是json格式,可以参考如下方式回复
  23. MqttPublishRequest rrpcResponse = new MqttPublishRequest();
  24. rrpcResponse.topic = resourceRequest.topic;
  25. rrpcResponse.payloadObj ="xxx";
  26. LinkKit.getInstance().publish(rrpcResponse,null);
  27. }
  28. @Override
  29. public void onSuccess() {
  30. // 注册资源成功
  31. ALog.d(TAG, "onSuccess ");
  32. }
  33. @Override
  34. public void onFailure(AError aError) {
  35. // 注册资源失败
  36. ALog.d(TAG, "onFailure " + getError(aError));
  37. }
  38. });