Use Simple Log Service SDK for Python to manage consumer groups

Consumer groups simplify log data consumption. You can focus on your business logic instead of the implementation details of Simple Log Service, load balancing between consumers, and failovers. This topic provides code examples that show you how to create, modify, query, and delete consumer groups, and how to obtain the checkpoint of a consumer group.

Prerequisites

Precautions

The examples in this topic use the public Simple Log Service endpoint for the China (Hangzhou) region: https://cn-hangzhou.log.aliyuncs.com.

If you want to access Simple Log Service from other Alibaba Cloud services that reside in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com.

For more information about the supported regions and endpoints of Simple Log Service, see Endpoints.
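The two endpoints above differ only in the -intranet suffix. The following sketch builds either form from a region ID. The helper name build_endpoint is hypothetical, and the sketch assumes that other regions follow the same naming pattern as China (Hangzhou); check the endpoint list for your region before you rely on it.

```python
def build_endpoint(region_id: str, internal: bool = False) -> str:
    """Return a Simple Log Service endpoint host for a region.

    Assumption: the region follows the naming pattern of the two
    China (Hangzhou) endpoints shown in this topic.
    """
    # The internal endpoint inserts "-intranet" after the region ID.
    suffix = "-intranet" if internal else ""
    return f"{region_id}{suffix}.log.aliyuncs.com"


if __name__ == "__main__":
    print(build_endpoint("cn-hangzhou"))
    print(build_endpoint("cn-hangzhou", internal=True))
```

The returned host name can be passed to LogClient as the endpoint value, in the same way as the hard-coded string in the samples below.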

Sample code that is used to create a consumer group

The following sample code creates a consumer group named ali-test-consumergroup and then lists the consumer groups of the Logstore:

from aliyun.log import LogClient
import os

# This example obtains the AccessKey ID and AccessKey secret from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)

# The project name.
project_name = "ali-test-project"

# The Logstore name.
logstore_name = "ali-test-logstore"

# The consumer group name.
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to create consumergroup")
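    # Create the consumer group with a timeout period of 300 seconds. in_order=False disables in-order consumption.
    res = client.create_consumer_group(project_name, logstore_name, consumergroup_name, 300, in_order=False)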
    print("create consumergroup success ")

    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())

Expected results:

ready to create consumergroup
create consumergroup success
The consumergroup name is:ali-test-consumergroup
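If the creation sample is run a second time, the consumer group already exists. One way to make the step repeatable is to list the existing groups first and create the group only when it is missing. The following is a minimal sketch, not part of the SDK: the helper name ensure_consumer_group is hypothetical, and it calls only the client methods that appear in the samples in this topic.

```python
def ensure_consumer_group(client, project, logstore, name, timeout=300, in_order=False):
    """Create the consumer group only if the Logstore does not have it yet.

    `client` is expected to behave like aliyun.log.LogClient.
    Returns True if the group was created, False if it already existed.
    """
    existing = client.list_consumer_group(project, logstore).get_consumer_groups()
    if any(group.get_consumer_group_name() == name for group in existing):
        return False
    client.create_consumer_group(project, logstore, name, timeout, in_order=in_order)
    return True
```

The check and the creation are two separate requests, so two processes that run this helper at the same time can still both attempt the creation. Treat it as a convenience for scripts, not as a locking mechanism.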

Sample code that is used to modify a consumer group

The following sample code changes the timeout period of the ali-test-consumergroup consumer group to 350 seconds:

from aliyun.log import LogClient
import os

# This example obtains the AccessKey ID and AccessKey secret from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)

# The project name.
project_name = "ali-test-project"

# The Logstore name.
logstore_name = "ali-test-logstore"

# The consumer group name.
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to update consumergroup")
    # Change the timeout period of the consumer group to 350 seconds.
    res = client.update_consumer_group(project_name, logstore_name, consumergroup_name, 350, in_order=False)
    print("update consumergroup success ")

    res2 = client.list_consumer_group(project_name, logstore_name)
    for r in res2.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())

Expected results:

ready to update consumergroup
update consumergroup success
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
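The modification sample always sends an update request. The following sketch first reads the current timeout and sends the update only when the value differs, keeping the existing in_order setting. The helper name update_timeout_if_needed is hypothetical, and it uses only the client and response methods that appear in the samples in this topic.

```python
def update_timeout_if_needed(client, project, logstore, name, timeout):
    """Update the timeout of a consumer group only when it differs.

    `client` is expected to behave like aliyun.log.LogClient.
    Returns True if an update request was sent, False if the timeout
    already matched. Raises LookupError if the group is not found.
    """
    groups = client.list_consumer_group(project, logstore).get_consumer_groups()
    for group in groups:
        if group.get_consumer_group_name() != name:
            continue
        if group.get_timeout() == timeout:
            return False
        # Reuse the current in_order value so that only the timeout changes.
        client.update_consumer_group(
            project, logstore, name, timeout, in_order=group.is_in_order()
        )
        return True
    raise LookupError("consumer group not found: %s" % name)
```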

Sample code that is used to query all consumer groups

The following code shows how to query all consumer groups in a specified Logstore.

from aliyun.log import LogClient
import os

# This example obtains the AccessKey ID and AccessKey secret from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)

# The project name.
project_name = "ali-test-project"

# The Logstore name.
logstore_name = "ali-test-logstore"

if __name__ == '__main__':
    print("ready to list consumergroup")
    # Query all consumer groups in the specified Logstore.
    res = client.list_consumer_group(project_name, logstore_name)
    for r in res.get_consumer_groups():
        print("The consumergroup name is:" + r.get_consumer_group_name())
        print("The consumergroup timeout is:%s" % r.get_timeout())
        print("The consumergroup order is:%s" % r.is_in_order())
    print("list consumergroup success ")

Expected results:

ready to list consumergroup
The consumergroup name is:ali-test-consumergroup
The consumergroup timeout is:350
The consumergroup order is:False
list consumergroup success
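When the listing is consumed by other code instead of being printed, it can be convenient to convert the response into plain dictionaries. The following is a minimal sketch under that assumption. The helper name summarize_consumer_groups and the dictionary keys are hypothetical; the getters are the ones used in the sample above.

```python
def summarize_consumer_groups(client, project, logstore):
    """Return the consumer groups of a Logstore as a list of dictionaries.

    `client` is expected to behave like aliyun.log.LogClient.
    """
    response = client.list_consumer_group(project, logstore)
    return [
        {
            "name": group.get_consumer_group_name(),
            "timeout": group.get_timeout(),
            "in_order": group.is_in_order(),
        }
        for group in response.get_consumer_groups()
    ]
```

The result can be serialized with json.dumps or filtered with ordinary list operations.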

Sample code that is used to delete a consumer group

The following sample code deletes the ali-test-consumergroup2 consumer group from the specified Logstore:

from aliyun.log import LogClient
import os

# This example obtains the AccessKey ID and AccessKey secret from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)

# The project name.
project_name = "ali-test-project"

# The Logstore name.
logstore_name = "ali-test-logstore"

# The consumer group name.
consumergroup_name = "ali-test-consumergroup2"

if __name__ == '__main__':
    print("ready to delete consumergroup")
    # Delete the specified consumer group.
    res = client.delete_consumer_group(project_name, logstore_name, consumergroup_name)
    print("delete consumergroup success ")

Expected results:

ready to delete consumergroup
delete consumergroup success
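The deletion sample targets ali-test-consumergroup2, which the earlier samples do not create, so the request may fail if that group does not exist. The following sketch deletes a group only when the listing contains it. The helper name delete_consumer_group_if_exists is hypothetical, and it uses only the client methods that appear in the samples in this topic.

```python
def delete_consumer_group_if_exists(client, project, logstore, name):
    """Delete the consumer group if the Logstore contains it.

    `client` is expected to behave like aliyun.log.LogClient.
    Returns True if a delete request was sent, False if the group was absent.
    """
    groups = client.list_consumer_group(project, logstore).get_consumer_groups()
    names = {group.get_consumer_group_name() for group in groups}
    if name not in names:
        return False
    client.delete_consumer_group(project, logstore, name)
    return True
```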

Sample code that is used to obtain a checkpoint of a consumer group

The following sample code obtains the checkpoint of shard 0 in the ali-test-consumergroup consumer group:

from aliyun.log import LogClient
import os

# This example obtains the AccessKey ID and AccessKey secret from environment variables.
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# The endpoint of Simple Log Service. This example uses the endpoint of the China (Hangzhou) region. Replace it with the actual endpoint.
endpoint = "cn-hangzhou.log.aliyuncs.com"
# Create a Simple Log Service client.
client = LogClient(endpoint, accessKeyId, accessKey)

# The project name.
project_name = "ali-test-project"

# The Logstore name.
logstore_name = "ali-test-logstore"

# The consumer group name.
consumergroup_name = "ali-test-consumergroup"

if __name__ == '__main__':
    print("ready to get CheckPoint")
    # Get the checkpoint of a shard in the specified consumer group.
    res = client.get_check_point(project_name, logstore_name, consumergroup_name, shard=0)
    print("The consumergroup checkpoints info is:%s" % res.get_consumer_group_check_points())
    print("list CheckPoint success in shard_0")

Expected results:

ready to get CheckPoint
The consumergroup checkpoints info is:[{'shard': 0, 'checkpoint': 'MTY3MDk5OTY3NzEzMzQzODg2NQ==', 'updateTime': 1671607210514072, 'consumer': 'consumer_1'}]
list CheckPoint success in shard_0
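The checkpoint entry in the expected results is not easy to read as printed. The following sketch converts one entry into a more readable form. It rests on two assumptions inferred from the sample output, not from the SDK documentation: the checkpoint value is a Base64-encoded cursor string, and updateTime is a Unix timestamp in microseconds. The helper name describe_checkpoint and the output keys are hypothetical.

```python
import base64
from datetime import datetime, timezone


def describe_checkpoint(entry):
    """Return a readable copy of one checkpoint entry.

    Assumptions (inferred from the sample output in this topic):
    - entry["checkpoint"] is a Base64-encoded cursor string
    - entry["updateTime"] is a Unix timestamp in microseconds
    """
    cursor = base64.b64decode(entry["checkpoint"]).decode("utf-8")
    # Integer division drops the sub-second part and avoids float rounding.
    seconds = entry["updateTime"] // 1_000_000
    updated = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return {
        "shard": entry["shard"],
        "consumer": entry["consumer"],
        "cursor": cursor,
        "updated_utc": updated.isoformat(),
    }


if __name__ == "__main__":
    sample = {
        "shard": 0,
        "checkpoint": "MTY3MDk5OTY3NzEzMzQzODg2NQ==",
        "updateTime": 1671607210514072,
        "consumer": "consumer_1",
    }
    print(describe_checkpoint(sample))
```

For the sample entry, the checkpoint decodes to the cursor string 1670999677133438865. If your entries use a different encoding or time unit, adjust the two conversions accordingly.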

References