使用ListConsumerGroup接口查询指定Logstore名称下的所有消费组。
前提条件
您已完成以下操作:
参数说明
def list_consumer_group(self, project, logstore):
请求参数
参数 | 类型 | 是否必填 | 说明 |
project | String | 是 | Project的名称。 |
logstore | String | 是 | Logstore的名称。 |
返回参数
示例代码
from aliyun.log import LogClient
import os
def main():
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret
access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, access_key_id, access_key_secret)
project = "ali-test-peoject"
logstore = "test-logstore"
try:
# 创建指定 Project 的 Consumer Group
response = client.list_consumer_group(project, logstore)
response.log_print()
except Exception as e:
print(f"An error occurred while creating the consumer group: {e}")
if __name__ == '__main__':
main()
示例返回结果
ListConsumerGroupResponse:
headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '59', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Thu, 24 Oct 2024 11:34:05 GMT', 'x-log-time': '1729769645', 'x-log-requestid': '671A30AD19073AFD9D*****'}
count: 1
consumer_groups: [{'name': 'consumer_group_test', 'timeout': 30, 'order': False}]
相关文档
文档内容是否对您有帮助?