使用GetCheckPoint接口,获取指定消费组消费数据时Shard的消费位点。
前提条件
您已完成以下操作:
参数说明
def get_check_point(self, project, logstore, consumer_group, shard=-1):
请求参数
参数 | 类型 | 是否必填 | 说明 |
project | String | 是 | Project的名称。 |
logstore | String | 是 | Logstore的名称。 |
consumer_group | String | 是 | 消费组的名称,在Logstore下唯一。 |
shard | int | 否 | Shard的ID, |
返回参数
示例代码
from aliyun.log import LogClient
import os
def main():
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret
access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, access_key_id, access_key_secret)
project = "ali-test-peoject"
logstore = "test-logstore"
consumer_group = "consumer_group_test"
shard = -1
try:
response = client.get_check_point(project, logstore, consumer_group, shard)
response.log_print()
except Exception as e:
print(f"An error occurred while creating the consumer group: {e}")
if __name__ == '__main__':
main()
示例返回结果
ListConsumerGroupCheckPoints:
headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '237', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Mon, 25 Nov 2024 10:09:42 GMT', 'x-log-time': '1732529382', 'x-log-requestid': '67444CE605099546E07A82E8'}
count: 2
consumer_group_check_points: [{'shard': 0, 'checkpoint': 'MTczMjUxNTg0ODM1OTg3******==', 'updateTime': 1732519816521348, 'consumer': 'consumer-group-1-B'}, {'shard': 1, 'checkpoint': 'MTczMjUxNTg0ODM2MzEx******==', 'updateTime': 1732519816330366, 'consumer': 'consumer-group-1-A'}]
Process finished with exit code 0
相关文档
文档内容是否对您有帮助?