全部产品

Consumer 操作接口

更新时间:2019-04-18 19:59:56

Consumer 相关的操作接口有以下三个:

  • GetConsumerList

  • GetConsumerProgress

  • CreateConsumerGroup

在使用 GetInstanceList 获取您购买地域(Region)下的实例信息后,您可以:

  • 使用 GetConsumerList 接口获取某实例下消费组(Consumer Group)列表。

  • 使用 GetConsumerProgress 接口查询某个消费组的消费进度。

  • 使用 CreateConsumerGroup 接口创建消费组。

注意:所有请求都需要设置此实例所在的地域,而且必须使用 GetInstanceList 返回值里的 instance 的地域。

GetConsumerList

根据实例 ID 获取该实例下的消费组列表。

请求参数列表

名称 类型 是否必须 描述
InstanceId String 实例 ID,可使用 GetInstanceList 获取
RegionId String 此实例所在地域,可使用 GetInstanceList 获取

返回参数列表

名称 类型 描述
RequestId String 请求的唯一标识ID
Code Integer 返回码,返回“200”为成功
Message String 描述信息
ConsumerList Array 消费组列表信息

ConsumerList 数据结构列表

名称 类型 描述
RegionId String 地域的 ID
InstanceId Long 实例 ID
ConsumerId String 消费组 ID

使用示例

该示例的接入地域是“华北 2”,任务是查询某个实例 ID下的消费组列表。

  1. public static void main(String[] args) {
  2. //构建 client
  3. IAcsClient iAcsClient = buildAcsClient();
  4. //构造获取 consumerList 信息 request
  5. GetConsumerListRequest request = new GetConsumerListRequest();
  6. request.setAcceptFormat(FormatType.JSON);
  7. //必要参数实例 ID
  8. request.setInstanceId("alikafka_pre-xxxxxx");
  9. //必要参数,此实例所在地域,必须使用 GetInstanceList 返回值的 instance 的地域
  10. request.setRegionId("cn-xxxxxx");
  11. //获取返回值
  12. try {
  13. GetConsumerListResponse response = iAcsClient.getAcsResponse(request);
  14. if (200 == response.getCode()) {
  15. List<ConsumerListItem> consumerList = response.getConsumerList();
  16. if (CollectionUtils.isNotEmpty(consumerList)) {
  17. for (ConsumerListItem item : consumerList) {
  18. String consumerId = item.getConsumerId();
  19. System.out.println(consumerId);
  20. //......
  21. }
  22. }
  23. } else {
  24. //log warn
  25. }
  26. } catch (ClientException e) {
  27. //log error
  28. }
  29. }
  30. private static IAcsClient buildAcsClient() {
  31. //产品 code
  32. String productName = "alikafka";
  33. //用户 AccessKeyId/AccessKeySecret
  34. String accessKey = "xxxxxx";
  35. String secretKey = "xxxxxx";
  36. //设置接入点相关参数通常 regionId 值和 endPointName 值相等,接入点也是用对应地域的 domain
  37. String regionId = "cn-beijing";
  38. String endPointName = "cn-beijing";
  39. String domain = "alikafka.cn-beijing.aliyuncs.com";
  40. try {
  41. DefaultProfile.addEndpoint(endPointName, regionId, productName, domain);
  42. } catch (ClientException e) {
  43. //log error
  44. }
  45. //构造 client
  46. IClientProfile profile = DefaultProfile.getProfile(regionId, accessKey, secretKey);
  47. return new DefaultAcsClient(profile);
  48. }

GetConsumerProgress

根据实例 ID 和消费组 ID 查询消费组的消费进度信息。

请求参数列表

名称 类型 是否必须 描述
RegionId String 此实例所在地域,可使用 GetInstanceList 的 Response 获取
InstanceId String 实例 ID,可使用 GetInstanceList 获取
ConsumerId String 消费组 ID,可使用 GetConsumerList获取

返回参数列表

名称 类型 描述
RequestId String 请求的唯一标识ID
Code Integer 返回码,返回“200”为成功
Message String 描述信息
ConsumerProgress Struct ConsumerProgress 的数据结构

ConsumerProgress 的数据结构列表

名称 类型 描述
TotalDiff Long 此消费组未消费的消息总量,即堆积量
LastTimestamp Long 此消费组最后被消费的消息的产生时间
TopicList Array 此消费组对应的每个 Topic 的消费进度列表

TopicList 的数据结构列表

名称 类型 描述
Topic String Topic 名称
TotalDiff Integer 该 Topic 的未消费消息总量,即堆积量
LastTimestamp Long 该 Topic 最后被消费的消息的产生时间
OffsetList Array 该 Topic 的消费位点信息

OffsetList 的数据结构列表

名称 类型 描述
Partition Integer 分区的 ID
BrokerOffset Long 最大位点
ConsumerOffset Long 消费位点
LastTimestamp Long 该分区最后被消费的消息的产生时间

使用示例

该示例的接入地域是“华北2”,任务是查询某个实例下的某个消费组的消费进度。

  1. public static void main(String[] args) {
  2. //构建 client
  3. IAcsClient iAcsClient = buildAcsClient();
  4. //构造获取 consumer 消费进度的 request
  5. GetConsumerProgressRequest request = new GetConsumerProgressRequest();
  6. request.setAcceptFormat(FormatType.JSON);
  7. //必要参数,此实例所在地域,必须使用 GetInstanceList 返回值的 instance 的地域
  8. request.setRegionId("cn-xxxxxx");
  9. //必要参数实例 ID
  10. request.setInstanceId("alikafka_pre-xxxxxx");
  11. //必要参数 consumerId
  12. request.setConsumerId("CID_xxxxxx");
  13. //获取返回值
  14. try {
  15. GetConsumerProgressResponse response = iAcsClient.getAcsResponse(request);
  16. if (200 == response.getCode()) {
  17. GetConsumerProgressResponse.ConsumerProgress consumerProgress = response.getConsumerProgress();
  18. long totalDiff = consumerProgress.getTotalDiff();
  19. System.out.println(totalDiff);
  20. for (GetConsumerProgressResponse.ConsumerProgress.TopicListItem item : consumerProgress.getTopicList()) {
  21. System.out.println(item.getTopic());;
  22. item.getTotalDiff();
  23. List<OffsetListItem> offsetListItems = item.getOffsetList();
  24. for (OffsetListItem offsetListItem : offsetListItems) {
  25. long brokerOffset = offsetListItem.getBrokerOffset();
  26. long consumerOffset = offsetListItem.getConsumerOffset();
  27. System.out.println("brokerOffset="+brokerOffset);
  28. System.out.println("consumerOffset="+consumerOffset);
  29. //......
  30. }
  31. }
  32. } else {
  33. //log warn
  34. }
  35. } catch (ClientException e) {
  36. //log error
  37. }
  38. }
  39. private static IAcsClient buildAcsClient() {
  40. //产品 code
  41. String productName = "alikafka";
  42. //用户 AccessKeyId/AccessKeySecret
  43. String accessKey = "xxxxxx";
  44. String secretKey = "xxxxxx";
  45. //设置接入点相关参数通常 regionId 值和 endPointName 值相等,接入点也是用对应地域的 domain
  46. String regionId = "cn-beijing";
  47. String endPointName = "cn-beijing";
  48. String domain = "alikafka.cn-beijing.aliyuncs.com";
  49. try {
  50. DefaultProfile.addEndpoint(endPointName, regionId, productName, domain);
  51. } catch (ClientException e) {
  52. //log error
  53. }
  54. //构造 client
  55. IClientProfile profile = DefaultProfile.getProfile(regionId, accessKey, secretKey);
  56. return new DefaultAcsClient(profile);
  57. }

CreateConsumerGroup

根据实例 ID 创建消费组的接口。

使用限制

该接口有以下使用限制:

  • 接口调用受白名单限制,请提交工单申请将用户 ID 加入白名单。
  • 单用户请求频率限制为 1 QPS。
  • 每个实例下最多可创建 100 个消费组。

请求参数列表

名称 类型 是否必须 描述
InstanceId String 实例 ID,可使用 GetInstanceList 获取
RegionId String 此实例所在地域,可使用 GetInstanceList 获取
ConsumerId String 创建的消费组 ID,限制 64 个字符

返回参数列表

名称 类型 描述
RequestId String 请求的唯一标识 ID
Code Integer 返回码,返回“200”为成功
Success Boolean 成功与否
Message String 描述信息

使用示例

该示例的接入地域是“华北 2”,任务是创建在某个实例下创建一个消费组。

  1. public static void main(String[] args) {
  2. //构建 client
  3. IAcsClient iAcsClient = buildAcsClient();
  4. //构造创建 consumerGroup 的 request
  5. CreateConsumerGroupRequest request = new CreateConsumerGroupRequest();
  6. request.setAcceptFormat(FormatType.JSON);
  7. //必要参数,实例 ID
  8. request.setInstanceId("alikafka_XXXXXXXXXXXXX");
  9. //必要参数,实例所在地域
  10. request.setRegionId("cn-beijing");
  11. //必要参数 consumerGroupID 64 个字符以内
  12. request.setConsumerId("alikafka_for_test_XXXXXX");
  13. //获取返回值
  14. try {
  15. CreateConsumerGroupResponse response = iAcsClient.getAcsResponse(request);
  16. //log requestId for trace
  17. String requestId = response.getRequestId();
  18. if (200 == response.getCode()) {
  19. if (response.getSuccess()) {
  20. //log success
  21. } else {
  22. //log warn
  23. }
  24. } else {
  25. //log warn
  26. }
  27. } catch (ClientException e) {
  28. //log error
  29. }
  30. }
  31. private static IAcsClient buildAcsClient() {
  32. //产品 code
  33. String productName = "alikafka";
  34. //用户 AccessKeyId/AccessKeySecret
  35. String accessKey = "xxxxxx";
  36. String secretKey = "xxxxxx";
  37. //设置接入点相关参数通常 regionId 值和 endPointName 值相等,接入点也是用对应地域的 domain
  38. String regionId = "cn-beijing";
  39. String endPointName = "cn-beijing";
  40. String domain = "alikafka.cn-beijing.aliyuncs.com";
  41. try {
  42. DefaultProfile.addEndpoint(endPointName, regionId, productName, domain);
  43. } catch (ClientException e) {
  44. //log error
  45. }
  46. //构造 client
  47. IClientProfile profile = DefaultProfile.getProfile(regionId, accessKey, secretKey);
  48. return new DefaultAcsClient(profile);
  49. }