获取指定消费组的消费点

使用GetCheckPoint接口,获取指定消费组消费数据时Shard的消费位点。

前提条件

您已完成以下操作:

参数说明

def get_check_point(self, project, logstore, consumer_group, shard=-1):

请求参数

参数

类型

是否必填

说明

project

String

Project的名称。

logstore

String

Logstore的名称。

consumer_group

String

消费组的名称,在Logstore下唯一。

shard

int

ShardID,shard=-1代表所有shard。

返回参数

请参见GetCheckPoint - 获取指定消费组的消费点

示例代码

from aliyun.log import LogClient
import os


def main():
    # 本示例从环境变量中获取AccessKey ID和AccessKey Secret
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    # 日志服务的服务接入点
    endpoint = "cn-hangzhou.log.aliyuncs.com"

    client = LogClient(endpoint, access_key_id, access_key_secret)

    project = "ali-test-peoject"
    logstore = "test-logstore"
    consumer_group = "consumer_group_test"
    shard = -1

    try:
        response = client.get_check_point(project, logstore, consumer_group, shard)
        response.log_print()
    except Exception as e:
        print(f"An error occurred while creating the consumer group: {e}")


if __name__ == '__main__':
    main()

示例返回结果

ListConsumerGroupCheckPoints:
headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '237', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Mon, 25 Nov 2024 10:09:42 GMT', 'x-log-time': '1732529382', 'x-log-requestid': '67444CE605099546E07A82E8'}
count: 2
consumer_group_check_points: [{'shard': 0, 'checkpoint': 'MTczMjUxNTg0ODM1OTg3******==', 'updateTime': 1732519816521348, 'consumer': 'consumer-group-1-B'}, {'shard': 1, 'checkpoint': 'MTczMjUxNTg0ODM2MzEx******==', 'updateTime': 1732519816330366, 'consumer': 'consumer-group-1-A'}]

Process finished with exit code 0

相关文档