通过消费组(ConsumerGroup)消费日志数据有显著优点,您无需关注日志服务的实现细节和消费者之间的负载均衡、Failover等,只需关注业务逻辑。本文通过代码示例介绍如何创建、修改、查询、删除消费组等。

前提条件

  • 已开通日志服务。更多信息,请参见开通日志服务
  • 已创建并获取AccessKey。更多信息,请参见访问密钥

    阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。RAM用户需具备操作日志服务资源的权限。具体操作,请参见为RAM用户授权

  • 已安装Python开发环境。更多信息,请参见Python官网

    日志服务Python SDK支持Pypy2、Pypy3、Python2.6、Python2.7、Python3.3、Python3.4、Python3.5、Python3.6、Python3.7、Python3.8和Python3.9版本。您可以执行python -V命令检查您已安装的Python版本。

  • 已安装日志服务Python SDK。具体操作,请参见安装Java SDK
  • 已创建项目Project和日志库Logstore,并已写入日志到Logstore。具体操作,请参见创建Project示例代码创建Logstore示例代码

注意事项

本示例以华东1(杭州)的公网Endpoint为例,其公网Endpoint为https://cn-hangzhou.log.aliyuncs.com。如果您希望通过与Project同地域的其他阿里云产品访问日志服务,请使用格式为https://cn-hangzhou-intranet.log.aliyuncs.com的私网Endpoint。关于日志服务支持的地域与Endpoint的对应关系,请参见服务入口

创建消费组示例代码

以下代码用于创建名为ali-test-consumergroup的消费组。

from aliyun.log import LogClient

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。
accessKeyId = 'yourAccessKeyId'
accessKey = 'yourAccessKeySecret'
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)

# Project名称。
project_name = "ali-test-project"

# Logstore名称。
logstore_name = "ali-test-logstore"

# 消费组名称。
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to create consumergroup")
    res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)
    print("create consumergroup success ")

    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())

预期结果如下:

ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup

修改消费组示例代码

以下代码用于修改名为ali-test-consumergroup的消费组信息。

from aliyun.log import LogClient

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。
accessKeyId = 'yourAccessKeyId'
accessKey = 'yourAccessKeySecret'
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)

# Project名称。
project_name = "ali-test-project"

# Logstore名称。
logstore_name = "ali-test-logstore"

# 消费组名称。
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to update consumergroup")
    # 修改消费组超时时间为350秒。
    res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
    print("update consumergroup success ")

    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())

预期结果如下:

ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350

查询所有消费组示例代码

以下代码用于查询指定Logstore的所有消费组。

from aliyun.log import LogClient

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。
accessKeyId = 'yourAccessKeyId'
accessKey = 'yourAccessKeySecret'
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)

# Project名称。
project_name = "ali-test-project"

# Logstore名称。
logstore_name = "ali-test-logstore"

if __name__ == '__main__':
    print("ready to list consumergroup")
    # 查询指定Logstore的所有消费组。
    res = client.list_consumer_group(project_name, logstore_name)
    for r in res.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())
        print("The consumergroup order is:%s" % r.is_in_order())
    print("list consumergroup success ")

预期结果如下:

ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success

删除消费组示例代码

以下代码用于删除目标Project下的消费组。

from aliyun.log import LogClient

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。
accessKeyId = 'yourAccessKeyId'
accessKey = 'yourAccessKeySecret'
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)

# Project名称。
project_name = "ali-test-project"

# Logstore名称。
logstore_name = "ali-test-logstore"

# 消费组名称。
consumergroup_name = "ali-test-consumergroup2"

if __name__ == '__main__':
    print("ready to delete consumergroup")
    # 删除指定消费组。
    res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
    print("delete consumergroup success ")

预期结果如下:

ready to delete consumergroup
delete consumergroup success

获取消费组CheckPoint示例代码

以下代码用于获取指定消费组的CheckPoint。

from aliyun.log import LogClient, ListConsumerGroupResponse

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。
accessKeyId = 'yourAccessKeyId'
accessKey = 'yourAccessKeySecret'
endpoint = "cn-hangzhou.log.aliyuncs.com"
# 创建日志服务Client。
client = LogClient(endpoint, accessKeyId, accessKey)

# Project名称。
project_name = "ali-test-project"

# Logstore名称。
logstore_name = "ali-test-logstore"

# 消费组名称。
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to get CheckPoint")
    # 获取指定消费组中Shard的CheckPoint。
    res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
    print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
    print("list CheckPoint success in shard_0")

预期结果如下:

ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0

相关文档