Using a consumer group (ConsumerGroup) to consume logs offers significant advantages. You can focus on your business logic without managing implementation details, such as load balancing and failover between consumers. This topic describes how to use a consumer group to consume logs with Python code.
Workflow
A Logstore contains multiple shards. When you use a consumer group, Simple Log Service allocates shards to the consumers in the group based on the following rules.
In a consumer group, each shard is allocated to only one consumer.
In a consumer group, a consumer can be allocated multiple shards.
If a new consumer joins the group, the shards are reassigned to balance the load. The same allocation rules apply.
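When you use a consumer group, consumption progress is periodically saved as a checkpoint. If your program fails, it can resume consumption from the last saved checkpoint after it recovers instead of starting over. This minimizes duplicate data consumption: only data consumed after the last saved checkpoint may be consumed again.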
Prerequisites
-
Simple Log Service is activated. For more information, see Enable Log Service.
-
Simple Log Service SDK for Python is initialized.
-
Simple Log Service SDK for Python is installed. For more information, see Install Simple Log Service SDK for Python.
Precautions
This example uses the public Simple Log Service endpoint for the China (Hangzhou) region: https://cn-hangzhou.log.aliyuncs.com. To use a different endpoint, set the ALIYUN_LOG_SAMPLE_ENDPOINT environment variable.
If you want to access Simple Log Service from other Alibaba Cloud services that reside in the same region as your project, you can use the internal Simple Log Service endpoint, which is https://cn-hangzhou-intranet.log.aliyuncs.com.
For more information about the supported regions and endpoints of Simple Log Service, see Endpoints.
Code example
The following code shows how to create a Logstore, write logs to the Logstore, create a consumer group, and consume logs.
import os
import time
from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock
class SampleConsumer(ConsumerProcessorBase):
    """Consumer processor that prints each consumed log group and collects it in memory.

    ConsumerWorker is given this class rather than an instance; the worker
    presumably creates one instance per assigned shard and calls
    initialize() with that shard's ID (confirm against the SDK).
    Every instance appends to the shared class-level ``log_results`` list.
    """

    # ID of the shard this instance processes; -1 until initialize() runs.
    shard_id = -1
    # time.time() of the last checkpoint save that was made with persist=True.
    last_check_time = 0
    # Shared by every instance: one dict per consumed log group.
    log_results = []
    # Guards log_results. Instances may run on different worker threads
    # (presumably one per ConsumerWorker) - confirm against the SDK.
    lock = RLock()
    # Minimum number of seconds between checkpoint saves with persist=True.
    CHECKPOINT_INTERVAL = 3

    def initialize(self, shard):
        """Remember the shard ID this processor instance is responsible for."""
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        """Convert each log group to a dict, record and print it, then save a checkpoint.

        Each recorded dict has the keys ``topic``, ``source`` and ``logs``;
        ``logs`` is a list of dicts holding the log ``time`` plus one entry
        per log content key.

        Returns:
            None, which signals successful processing. To roll back to the
            previous checkpoint, return ``check_point_tracker.get_check_point()``
            instead.
        """
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = {'time': log.Time}
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = {
                'topic': log_group.Topic,
                'source': log_group.Source,
                'logs': items,
            }
            with SampleConsumer.lock:
                SampleConsumer.log_results.append(log_items)
                print(log_items)
        self._save_checkpoint(check_point_tracker, force=False)
        # None means successful processing.
        # To roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        """Persist the final checkpoint when this processor is shut down."""
        self._save_checkpoint(check_point_tracker, force=True)

    def _save_checkpoint(self, check_point_tracker, force):
        """Save the current checkpoint, printing (not raising) any error.

        The tracker is called with ``True`` when *force* is set or more than
        CHECKPOINT_INTERVAL seconds have passed since the last such call, and
        with ``False`` otherwise. NOTE(review): presumably ``True`` persists the
        checkpoint to the server and ``False`` only records it locally - confirm
        against the SDK's save_check_point.
        """
        current_time = time.time()
        persist = force or current_time - self.last_check_time > self.CHECKPOINT_INTERVAL
        try:
            if persist:
                self.last_check_time = current_time
            check_point_tracker.save_check_point(persist)
        except Exception:
            # Best effort, as in the original sample: a failed checkpoint save
            # is reported but must not abort consumption.
            import traceback
            traceback.print_exc()
# Number of logs _prepare_data writes. sample_consumer_group waits until this
# many log groups have been consumed, then checks that the first
# (index 0) and last (index test_item_count - 1) logs arrived.
test_item_count = 20
# Write logs to the Logstore.
def _prepare_data(client, project, logstore):
    """Write ``test_item_count`` sample logs to *logstore*.

    Log ``i`` has the contents ``user=magic_user_<i>`` and ``avg=magic_age_<i>``.
    sample_consumer_group later checks for these values in the consumed data.

    Args:
        client: LogClient used to send the PutLogs requests.
        project: Name of an existing project.
        logstore: Name of the Logstore to write to.
    """
    topic = 'python-ide-test'
    source = ''
    for i in range(test_item_count):
        log_item = LogItem()
        log_item.set_time(int(time.time()))
        # NOTE(review): the key 'avg' holds 'magic_age_' values and was probably
        # meant to be 'age'. It is kept because the documented expected output
        # shows 'avg'.
        log_item.set_contents([
            ('user', 'magic_user_' + str(i)),
            ('avg', 'magic_age_' + str(i)),
        ])
        # Each log is sent in its own request on purpose. The consumers then
        # see one log group per log, and sample_consumer_group waits until it
        # has counted test_item_count consumed log groups.
        request = PutLogsRequest(project, logstore, topic, source, [log_item])
        client.put_logs(request)
        print("successfully put logs in logstore")
def sleep_until(seconds, exit_condition=None, expect_error=False, interval=1):
    """Wait up to *seconds* seconds, returning early once *exit_condition* is true.

    Args:
        seconds: Maximum time to wait, in seconds.
        exit_condition: Optional zero-argument callable polled every *interval*
            seconds. If it is not given, this simply sleeps for *seconds*.
        expect_error: If true, exceptions raised by *exit_condition* are treated
            as "not ready yet" and polling continues. If false, they propagate.
        interval: Delay between polls, in seconds (default 1).

    Raises:
        Exception: Whatever *exit_condition* raises, when *expect_error* is false.
    """
    if not exit_condition:
        time.sleep(seconds)
        return

    deadline = time.time() + seconds
    while time.time() < deadline:
        try:
            if exit_condition():
                return
        except Exception:
            if not expect_error:
                raise
        # Always sleep between polls, including after an expected error, so a
        # failing condition cannot turn this into a CPU-bound busy loop.
        time.sleep(interval)
def sample_consumer_group():
    """Run the end-to-end consumer group sample.

    Steps:
    1. Create a Logstore.
    2. Write ``test_item_count`` logs to it.
    3. Start two consumers in the same consumer group to read them.
    4. Query the consumer groups and their checkpoints.
    5. Stop the consumers and verify that the first and last written logs
       were consumed.

    Configuration is read from these environment variables:
    ALIYUN_LOG_SAMPLE_ENDPOINT, ALIBABA_CLOUD_ACCESS_KEY_ID,
    ALIBABA_CLOUD_ACCESS_KEY_SECRET and ALIYUN_LOG_SAMPLE_PROJECT.

    Raises:
        ValueError: If the endpoint, AccessKey pair, or project is empty.
        AssertionError: If the expected logs were not consumed.
    """
    # The endpoint of Simple Log Service. This example uses the China (Hangzhou) endpoint. Replace it with your actual endpoint.
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    # This example obtains the AccessKey ID and AccessKey secret from environment variables.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    # The project name. The SDK does not create the project in this example. You must create the project before you run the code.
    project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')
    # The Logstore name. The SDK automatically creates the Logstore in this example. You do not need to create it in advance.
    logstore = 'ali-test-logstore'
    # The consumer group name. You do not need to create it in advance. The SDK automatically creates it.
    consumer_group = 'consumer-group-1'
    consumer_names = ("consumer-group-1-A", "consumer-group-1-B")
    token = ""

    # Fallback in case the Logstore name above is left empty: use a unique name.
    if not logstore:
        logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')

    # Explicit check instead of `assert`, which is skipped under `python -O`.
    if not (endpoint and access_key_id and access_key_secret and project):
        raise ValueError("endpoint/access_id/key and project cannot be empty")

    # Create the Logstore.
    client = LogClient(endpoint, access_key_id, access_key_secret, token)
    # NOTE(review): the positional 2 and 4 are presumably the TTL (days) and
    # the shard count - confirm against LogClient.create_logstore.
    client.create_logstore(project, logstore, 2, 4)
    print("successfully create logstore")
    # NOTE(review): presumably gives the service time to make the new Logstore
    # ready before logs are written to it.
    time.sleep(60)

    SampleConsumer.log_results = []
    workers = []
    try:
        # Write logs to the Logstore.
        _prepare_data(client, project, logstore)

        # Create two consumers in the consumer group to consume data.
        print("*** start to consume data...")
        for consumer_name in consumer_names:
            option = LogHubConfig(endpoint, access_key_id, access_key_secret, project, logstore,
                                  consumer_group, consumer_name,
                                  cursor_position=CursorPosition.BEGIN_CURSOR,
                                  heartbeat_interval=6, data_fetch_interval=1)
            worker = ConsumerWorker(SampleConsumer, consumer_option=option)
            worker.start()
            workers.append(worker)

        # Wait (at most 300 s) until every written log group has been consumed.
        sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)

        print("*** consumer group status ***")
        groups = client.list_consumer_group(project, logstore)
        print("successfully list consumergroup")
        for group in groups.get_consumer_groups():
            client.get_check_point_fixed(project, logstore, group.get_consumer_group_name())
            print("successfully get checkpoint fixed")
    finally:
        # Stop the workers even if an earlier step failed, so they do not keep
        # consuming after an error.
        if workers:
            print("*** stopping workers")
        for worker in workers:
            worker.shutdown()
        # To remove the test Logstore, call client.delete_logstore(project, logstore) here.
        client.list_logstore(project, logstore)
        print("successfully list logstore")

    # Validate only after a successful run, so a failure above is reported as
    # itself rather than being masked by this check.
    content = str(SampleConsumer.log_results)
    print("*** get content:")
    print(content)
    last = str(test_item_count - 1)
    assert 'magic_user_0' in content and 'magic_age_0' in content \
        and 'magic_user_' + last in content \
        and 'magic_age_' + last in content
if __name__ == '__main__':
sample_consumer_group()Expected results:
successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......]
References
-
In addition to its native SDK, Simple Log Service (SLS) also supports the common Alibaba Cloud SDKs. For more information, see the Log Service SDK center on the Alibaba Cloud OpenAPI Developer Portal.