调用ConsumerGroupUpdateCheckPoint接口,查询指定消费组消费数据时Shard的checkpoint。
前提条件
您已完成以下操作:
参数说明
def update_check_point(self, project, logstore, consumer_group, shard, check_point,
consumer='', force_success=True):
请求参数
参数 | 类型 | 是否必填 | 说明 |
project | String | 是 | Project的名称。 |
logstore | String | 是 | Logstore的名称。 |
consumer_group | String | 是 | 消费组的名称。 |
shard | int | 是 | Shard的ID。 |
check_point | String | 是 | 消费组消费数据时Shard的checkpoint,请参见获取指定消费组的消费点。 |
consumer | String | 否 | 消费者名称。 |
force_success | bool | 否 | 是否强制更新。如果不指定consumer,则必须置为True。
|
返回参数
示例代码
from aliyun.log import LogClient
import os
# 本示例从环境变量中获取AccessKey ID和AccessKey Secret
access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 日志服务的服务接入点
endpoint = "cn-hangzhou.log.aliyuncs.com"
client = LogClient(endpoint, access_key_id, access_key_secret)
# Project名称
project = "project-1"
# Logstore 名称
logstore = "logstore-1"
# consumer_group 名称
consumer_group = "consumer_group_test"
# shard的id
shard = 0
# check_point的名称
check_point = "check_point_1"
# 消费者名称
consumer = ''
# 消费者名称
force_success = True
try:
# 创建指定 Project 的 Consumer Group
response = client.update_check_point(project, logstore, consumer_group, shard, check_point,
consumer, force_success)
response.log_print()
except Exception as e:
print(f"An error occurred while creating the consumer group: {e}")
示例返回结果
header: {'Server': 'AliyunSLS', 'Content-Length': '0', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Fri, 25 Oct 2024 02:11:00 GMT', 'x-log-time': '1729822260', 'x-log-requestid': '671AFE342B225A2F8FD03D85'}
相关文档
该文章对您有帮助吗?