全部产品
云市场

自定义MQTT Topic通信

更新时间:2018-11-29 14:35:48

Android SDK 提供了与云端长链接的基础能力接口,用户可以直接使用这些接口完成自定义 Topic 相关的功能。提供的基础能力包括:发布、订阅、取消订阅、RRPC、订阅下行。如果不想使用物模型,可以通过这部分接口实现云端数据的上下行。

上行接口请求

调用上行请求接口,SDK封装了上行发布、订阅和取消订阅等接口。

  1. /**
  2. * 发布
  3. *
  4. * @param request 发布请求
  5. * @param listener 监听器
  6. */
  7. void publish(ARequest request, IConnectSendListener listener);
  8. /**
  9. * 订阅
  10. *
  11. * @param request 订阅请求
  12. * @param listener 监听器
  13. */
  14. void subscribe(ARequest request, IConnectSubscribeListener listener);
  15. /**
  16. * 取消订阅
  17. *
  18. * @param request 取消订阅请求
  19. * @param listener 监听器
  20. */
  21. void unsubscribe(ARequest request, IConnectUnscribeListener listener);

调用示例:QoS 设置可以参照该示例进行设置。

  1. // 发布
  2. MqttPublishRequest request = new MqttPublishRequest();
  3. request.isRPC = false;
  4. // topic 替换成用户自己需要发布的 topic
  5. request.topic = topic;
  6. // 设置 qos
  7. request.qos = 0;
  8. // data 替换成用户需要发布的数据
  9. request.payloadObj = data;
  10. LinkKit.getInstance().publish(request, new IConnectSendListener() {
  11. @Override
  12. public void onResponse(ARequest aRequest, AResponse aResponse) {
  13. // 发布成功
  14. }
  15. @Override
  16. public void onFailure(ARequest aRequest, AError aError) {
  17. // 发布失败
  18. }
  19. });
  20. // 订阅
  21. MqttSubscribeRequest subscribeRequest = new MqttSubscribeRequest();
  22. // subTopic 替换成用户自己需要订阅的 topic
  23. subscribeRequest.topic = subTopic;
  24. subscribeRequest.isSubscribe = true;
  25. LinkKit.getInstance().subscribe(subscribeRequest, new IConnectSubscribeListener() {
  26. @Override
  27. public void onSuccess() {
  28. // 订阅成功
  29. }
  30. @Override
  31. public void onFailure(AError aError) {
  32. // 订阅失败
  33. }
  34. });
  35. // 取消订阅
  36. MqttSubscribeRequest unsubRequest = new MqttSubscribeRequest();
  37. // unSubTopic 替换成用户自己需要取消订阅的 topic
  38. unsubRequest.topic = unSubTopic;
  39. unsubRequest.isSubscribe = false;
  40. LinkKit.getInstance().unsubscribe(unsubRequest, new IConnectUnscribeListener() {
  41. @Override
  42. public void onSuccess() {
  43. // 取消订阅成功
  44. }
  45. @Override
  46. public void onFailure(AError aError) {
  47. // 取消订阅失败
  48. }
  49. });

下行数据监听

下行数据监听可以通过 RRPC 方式或者注册一个下行数据监听器实现。

  1. /**
  2. * RRPC 接口
  3. *
  4. * @param request RRPC 请求
  5. * @param listener 监听器
  6. */
  7. void subscribeRRPC(ARequest request, IConnectRrpcListener listener);
  8. /**
  9. * 注册下行数据监听器
  10. *
  11. * @param listener 监听器
  12. */
  13. void registerOnPushListener(IConnectNotifyListener listener);
  14. /**
  15. * 取消注册下行监听器
  16. *
  17. * @param listener 监听器
  18. */
  19. void unRegisterOnPushListener(IConnectNotifyListener listener);

调用示例(数据监听):

  1. // 下行数据监听
  2. IConnectNotifyListener onPushListener = new IConnectNotifyListener() {
  3. @Override
  4. public void onNotify(String connectId, String topic, AMessage aMessage) {
  5. // 下行数据通知
  6. }
  7. @Override
  8. public boolean shouldHandle(String connectId, String topic) {
  9. return true; // 是否需要处理 该 topic
  10. }
  11. @Override
  12. public void onConnectStateChange(String connectId, ConnectState connectState) {
  13. // 连接状态变化
  14. }
  15. };
  16. // 注册
  17. LinkKit.getInstance().registerOnPushListener(onPushListener);
  18. // 取消注册
  19. LinkKit.getInstance().unRegisterOnPushListener(onPushListener);

调用示例(RRPC):

  1. final MqttRrpcRegisterRequest registerRequest = new MqttRrpcRegisterRequest();
  2. // rrpcTopic 替换成用户自己自定义的 RRPC topic
  3. registerRequest.topic = rrpcTopic;
  4. // rrpcReplyTopic 替换成用户自己定义的RRPC 响应 topic
  5. registerRequest.replyTopic = rrpcReplyTopic;
  6. // 根据需要填写,一般不填
  7. // registerRequest.payloadObj = payload;
  8. // 先订阅 rrpcTopic
  9. // 云端发布消息到 rrpcTopic
  10. // 收到下行数据 回复云端(rrpcReplyTopic) 具体可参考 Demo 同步服务调用
  11. LinkKit.getInstance().subscribeRRPC(registerRequest, new IConnectRrpcListener() {
  12. @Override
  13. public void onSubscribeSuccess(ARequest aRequest) {
  14. // 订阅成功
  15. }
  16. @Override
  17. public void onSubscribeFailed(ARequest aRequest, AError aError) {
  18. // 订阅失败
  19. }
  20. @Override
  21. public void onReceived(ARequest aRequest, IConnectRrpcHandle iConnectRrpcHandle) {
  22. // 收到云端下行
  23. // 响应获取成功
  24. if (iConnectRrpcHandle != null){
  25. AResponse aResponse = new AResponse();
  26. // 仅供参考,具体返回云端的数据用户根据实际场景添加到data结构体
  27. aResponse.data = "{\"id\":\"" + 123 + "\", \"code\":\"200\"" + ",\"data\":{} }";
  28. iConnectRrpcHandle.onRrpcResponse(registerRequest.replyTopic, aResponse);
  29. }
  30. }
  31. @Override
  32. public void onResponseSuccess(ARequest aRequest) {
  33. // RRPC 响应成功
  34. }
  35. @Override
  36. public void onResponseFailed(ARequest aRequest, AError aError) {
  37. // RRPC 响应失败
  38. }
  39. });