全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

查询消费堆积

更新时间:2018-03-08 21:59:21

本接口限企业铂金版客户专用,请前往铂金版购买页查看详情。

描述

OnsConsumerAccumulate 接口用于查询指定Consumer ID的消费堆积情况,可以获取到Consumer ID当前未消费的消息数以及大概的延迟时间。

使用场景

消费堆积查询一般在生产环境中需要关注Consumer ID消费进度时使用,用于粗略判断消息消费情况和延迟情况。

请求参数列表

名称 类型 是否必须 描述
OnsRegionId String 当前查询 MQ 所在区域,可以通过 ONSRegionList 方法获取
OnsPlatform String 该请求来源,默认是从 POP 平台
PreventCache Long 用于 CSRF 校验,设置为系统当前时间即可
ConsumerId String 需要查询的消费端 CID
Detail Boolean 是否查询详细信息,默认为否

返回参数列表

名称 类型 描述
RequestId String 为公共参数,每个请求独一无二
HelpUrl String 帮助链接
Data Data 指定 Consumer 的消费堆积情况

Data 数据结构

成员 类型 描述
OnLine Boolean 当前消费者是否在线
TotalDiff Long 当前 Consumer ID 订阅的所有 Topic 的消息总堆积数
ConsumeTps Float 当前消费 TPS
LastTimestamp Long 最后更新时刻
DelayTime Long 延迟时间
DetailInTopicList List(OnsConsumerAccumulateResponse.Data.DetailInTopicDo) 各个 Topic 具体情况

DetailInTopicList 数据结构

成员 类型 描述
Topic String Topic 名称
TotalDiff Long 当前 Topic 的消费堆积
LastTimestamp Long 最后更新时刻
DelayTime Long 当前 Topic 的消费延迟时间

相关 API

  • OnsConsumerStatus:消费者状态详情查询
  • OnsConsumerConnection:消费者连接查询
  • OnsResetOffset:消费清理堆积

使用示例

  1. public static void main(String []args) {
  2. String regionId = "cn-hangzhou";
  3. String accessKey = "XXXXXXXXXXXXXXXXX";
  4. String secretKey = "XXXXXXXXXXXXXXXXX";
  5. String endPointName ="cn-hangzhou";
  6. String productName ="Ons";
  7. String domain ="ons.cn-hangzhou.aliyuncs.com";
  8. /**
  9. *根据自己需要访问的区域选择 Region,并设置对应的接入点
  10. */
  11. try {
  12. DefaultProfile.addEndpoint(endPointNameregionIdproductNamedomain);
  13. } catch (ClientException e) {
  14. e.printStackTrace();
  15. }
  16. IClientProfile profile= DefaultProfile.getProfile(regionIdaccessKeysecretKey);
  17. IAcsClient iAcsClient= new DefaultAcsClient(profile);
  18. OnsConsumerAccumulateRequest request = new OnsConsumerAccumulateRequest();
  19. // request.setCluster("taobaodaily");
  20. /**
  21. *ONSRegionId 是指你需要 API 访问 MQ 哪个区域的资源.
  22. *该值必须要根据 OnsRegionList 方法获取的列表来选择和配置,因为 OnsRegionId 是变动的,不能够写固定值
  23. */
  24. request.setOnsRegionId("daily");
  25. request.setPreventCache(System.currentTimeMillis());
  26. request.setAcceptFormat(FormatType.JSON);
  27. request.setDetail(true);
  28. request.setConsumerId("RTDSQ_1013_GROUP");
  29. try {
  30. OnsConsumerAccumulateResponse response=iAcsClient.getAcsResponse(request);
  31. OnsConsumerAccumulateResponse.Data data =response.getData();
  32. System.out.println(data.getOnline()+" "+data.getTotalDiff()+" "+data.getConsumeTps()+" "
  33. +data.getDelayTime()+" "+data.getLastTimestamp());
  34. for (OnsConsumerAccumulateResponse.Data.DetailInTopicDo detailInTopicDo:data.getDetailInTopicList()){
  35. System.out.println(detailInTopicDo.getTopic()+" "
  36. +detailInTopicDo.getTotalDiff()+" "
  37. +detailInTopicDo.getLastTimestamp()+" "+detailInTopicDo.getDelayTime());
  38. }
  39. } catch (ServerException e) {
  40. e.printStackTrace();
  41. } catch (ClientException e) {
  42. e.printStackTrace();
  43. }
  44. }
本文导读目录