全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

查询消费状态

更新时间:2017-12-15 20:40:47

本接口限企业铂金版客户专用,请前往铂金版购买页面查看详情。

描述

OnsConsumerStatus 接口可以查询指定订阅组的详细状态数据,包含订阅关系检查,消费 TPS 统计,负载均衡状态,消费端连接等。

使用场景

消费者状态查询接口用于定位订阅组的详细状态,一般用于在粗略判断消费堆积以及客户端在线情况后,需要排查消费异常的原因时调用.可以判断出指定消费组(CID)的订阅关系是否一致,负载均衡是否正常,以及在线客户端的 Jstack 信息等。

注意:消费者状态查询接口需要调用后端大量接口完成数据聚合,因此查询速度慢,不建议频繁调用。

请求参数列表

名称 类型 是否必须 描述
OnsRegionId String 当前查询 MQ 所在区域,可以通过 ONSRegionList 方法获取
OnsPlatform String 该请求来源,默认是从 POP 平台
PreventCache Long 用于 CSRF 校验,设置为系统当前时间即可
ConsumerId String 需要查询的消费端 CID
Detail Boolean 是否查询详细信息
NeedJstack Boolean 是否打印 JStack 信息

返回参数列表

名称 类型 描述
RequestId String 为公共参数,每个请求独一无二
HelpUrl String 帮助链接
Data ConsumerStatusDo 查询结果

ConsumerStatusDo 数据结构

成员 类型 描述
Online Boolean 是否在线
TotalDiff Long 集群总的消费堆积
ConsumeTps Long 总消费 TPS
LastTimestamp Long 最后更新时间
DelayTime Long 延迟时间
ConsumeModel Long 消费模型
SubscriptionSame Boolean 订阅关系是否一致
RebalanceOK Boolean 客户端 Rebalance 是否正常
ConnectionSet List(ConnectionDo) 该集群当前在线客户端信息
DetailInTopicList List(DetailInTopicDo) 各个 Topic 的消费情况
ConsumerConnectionInfoList List(ConsumerConnectionInfoDo) 该集群在线客户端详细信息,包含 Jstack、消费 RT 时间等信息

ConnectionDo 数据结构

成员 类型 描述
ClientId String 消费实例的 ID
ClientAddr String 该消费实例的地址和端口
Language String 消费端语言
Version String 消费端版本

DetailInTopicDo 数据结构

成员 类型 描述
Topic String Topic 名称
TotalDiff Long 该 Topic 消费总堆积数
LastTimestamp Long 最后更新时间
DelayTime Long 延迟时间

ConsumerConnectionInfoDo 数据结构

成员 类型 描述
ClientId String 消费实例的 ID
Connection String 连接信息
Language String 客户端语言
Version String 客户端版本号
ConsumeModel String 消费类型,集群和广播两种
ConsumeType String 从何处消费
ThreadCount Integer 消费线程数
StartTimeStamp Long 开始时间
LastTimeStamp Long 最后更新时间
SubscriptionSet List(SubscriptionData) 订阅关系集合
RunningDataList List(ConsumerRunningDataDo) 实时状态统计
Jstack List(ThreadTrackDo) Jstack 堆栈信息

SubscriptionData 数据结构

成员 类型 描述
Topic String 订阅的 Topic 名称
SubString String 订阅该 Topic 的子类别 Tag 表达式
SubVersion Long 订阅关系版本号,为自增 long 型
TagsSet List(String) 订阅的 Tag 集合

ConsumerRunningDataDo 数据结构

成员 类型 描述
ConsumerId String 订阅方的 ConsumerId 名称
Topic String 订阅的 Topic 名称
Rt Float 消费 RT 时间,单位 ms
OkTps Float 消费消息成功的 TPS 统计
FailedTps Float 消费消息失败的 TPS 统计
FailedCountPerHour Long 每小时内消费失败的消息数统计

ThreadTrackDo 数据结构

成员 类型 描述
Thread String 线程名称
TrackList List(String) Jstack 堆栈信息字符串

相关 API

  • OnsConsumerAccumulate:查询消费堆积
  • OnsConsumerConnection:查询消费方客户端连接

使用示例


   public static void main(String []args) {
            String regionId = "cn-hangzhou";
            String accessKey = "XXXXXXXXXXXXXXXXX";
            String secretKey = "XXXXXXXXXXXXXXXXX";
            String endPointName ="cn-hangzhou";
            String productName ="Ons";
            String domain ="ons.cn-hangzhou.aliyuncs.com";

            /**
            *根据自己需要访问的区域选择 Region,并设置对应的接入点
            */
            try {
                DefaultProfile.addEndpoint(endPointName,regionId,productName,domain);
            } catch (ClientException e) {
                e.printStackTrace();
            }
            IClientProfile profile= DefaultProfile.getProfile(regionId,accessKey,secretKey);
            IAcsClient iAcsClient= new DefaultAcsClient(profile);
          OnsConsumerStatusRequest request = new OnsConsumerStatusRequest();
            /**
            *ONSRegionId 是指你需要 API 访问 MQ 哪个区域的资源.
            *该值必须要根据 OnsRegionList 方法获取的列表来选择和配置,因为 OnsRegionId 是变动的,不能够写固定值
            */
          request.setOnsRegionId("daily");
          request.setPreventCache(System.currentTimeMillis());
          request.setAcceptFormat(FormatType.JSON);
          request.setConsumerId("CID_Mingduan");
          request.setDetail(true);
          request.setNeedJstack(false);
          try {
              OnsConsumerStatusResponse response=iAcsClient.getAcsResponse(request);
              OnsConsumerStatusResponse.Data data =response.getData();
              List<OnsConsumerStatusResponse.Data.ConnectionDo> connectionDoList = data.getConnectionSet();
              List<OnsConsumerStatusResponse.Data.DetailInTopicDo> detailInTopicDoList =data.getDetailInTopicList();
              List<OnsConsumerStatusResponse.Data.ConsumerConnectionInfoDo> consumerConnectionInfoDoList =data.getConsumerConnectionInfoList();
              System.out.print(data.getOnline()+"  "+data.getTotalDiff()+"  "+data.getConsumeTps()+"  "+
              data.getLastTimestamp()+"  "+data.getDelayTime()+"  "+data.getConsumeModel()+
                      "  "+data.getSubscriptionSame()+"  "+data.getRebalanceOK());
          } catch (ServerException e) {
              e.printStackTrace();
          } catch (ClientException e) {
              e.printStackTrace();
          }
      }
本文导读目录