Consuming data through a consumer group (ConsumerGroup) frees you from the implementation details of Simple Log Service and from handling load balancing and failover among consumers, so that you can focus on your business logic. This topic describes how to use a Go, Python, or Java consumer group to consume data from a Logstore with an SPL statement.
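An SPL statement filters or transforms logs on the server side before they are returned to the consumer. For example, the following statement, which also appears in the Go and Java samples below, returns only logs whose body_bytes_sent value, cast to an integer, exceeds 14000:

* | where cast(body_bytes_sent as bigint) > 14000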
Prerequisites
A RAM user is created and granted the required permissions. For more information, see Create a RAM user and grant permissions to the RAM user.
The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.
Important The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.
We strongly recommend that you do not save the AccessKey ID and AccessKey Secret in your project code. Otherwise, the AccessKey pair may be leaked, which threatens the security of all resources in your account.
Code examples
Go
Install the Log Service SDK: create a project directory named spl_demo and run the following command in the directory. For more information, see Install the Go SDK.
go get -u github.com/aliyun/aliyun-log-go-sdk
Create a main.go file in the spl_demo directory. The following code creates a consumer group and starts a consumer thread that consumes data from the specified Logstore. Set the Query field to an SPL statement based on your business requirements.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
	"github.com/go-kit/kit/log/level"
)

// README :
// This is a very simple example of pulling data from your logstore and printing it for consumption, including pre-handling for logs.
func main() {
	option := consumerLibrary.LogHubConfig{
		// The endpoint of Simple Log Service. The China (Hangzhou) region is used as an example. Replace it with the actual endpoint.
		Endpoint:          "cn-hangzhou.log.aliyuncs.com",
		AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
		AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
		Project:           "your_project",
		Logstore:          "your_logstore",
		ConsumerGroupName: "test-spl-cg",
		ConsumerName:      "test-spl-consumer",
		// This option is used for initialization and is ignored once the consumer group is created and each shard has started to be consumed.
		// Could be "begin", "end", or a specific time in timestamp format; it refers to the log receiving time.
		CursorPosition: consumerLibrary.END_CURSOR,
		// Query is for log pre-handling before logs are returned to the client. For more information, see https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
		Query: "* | where cast(body_bytes_sent as bigint) > 14000",
	}

	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	consumerWorker.Start()
	if _, ok := <-ch; ok {
		level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
		consumerWorker.StopAndWait()
	}
}

// Fill in your consumption logic here. Do not change the parameters or the return value of the function;
// otherwise, errors are reported.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
	checkpointTracker.SaveCheckPoint(false)
	return "", nil
}
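The process callback above only prints the number of LogGroups received in each batch. The following is a minimal sketch of a drop-in replacement for process that iterates over the individual logs and their key-value contents; it assumes the protobuf accessors (GetTime, GetKey, GetValue) that the Go SDK generates for sls.LogGroupList:

// A sketch only: walk each log group, log, and key-value pair before checkpointing.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
	for _, logGroup := range logGroupList.LogGroups {
		for _, log := range logGroup.Logs {
			fmt.Println("shard:", shardId, "time:", log.GetTime())
			for _, content := range log.Contents {
				fmt.Println(content.GetKey(), ":", content.GetValue())
			}
		}
	}
	// false caches the checkpoint locally; the periodic updater flushes it to the server.
	checkpointTracker.SaveCheckPoint(false)
	return "", nil
}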
Run the following commands in the spl_demo directory to install the dependencies.
go mod tidy
go mod vendor
Run the main function and view the output.
go run main.go
Python
Install the Log Service SDK: create a project directory named spl_consumer_demo and run the following command in the directory. For more information, see Install the Log Service SDK for Python.
pip install -U aliyun-log-python-sdk
Create a main.py file in the spl_consumer_demo directory. The following code creates a consumer group and starts a consumer thread that consumes data from the specified Logstore. Set the query field to an SPL statement based on your business requirements.
import os
import time

from aliyun.log.consumer import *
from aliyun.log import *


class SPLConsumer(ConsumerProcessorBase):
    shard_id = -1
    last_check_time = 0

    def initialize(self, shard):
        self.shard_id = shard

    def process(self, log_groups, check_point_tracker):
        for log_group in log_groups.LogGroups:
            items = []
            for log in log_group.Logs:
                item = dict()
                item['time'] = log.Time
                for content in log.Contents:
                    item[content.Key] = content.Value
                items.append(item)
            log_items = dict()
            log_items['topic'] = log_group.Topic
            log_items['source'] = log_group.Source
            log_items['logs'] = items
            print(log_items)

        current_time = time.time()
        if current_time - self.last_check_time > 3:
            try:
                self.last_check_time = current_time
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
        else:
            try:
                check_point_tracker.save_check_point(False)
            except Exception:
                import traceback
                traceback.print_exc()

        # None means successful processing.
        # To roll back to the previous checkpoint, return check_point_tracker.get_check_point().
        return None

    def shutdown(self, check_point_tracker):
        try:
            check_point_tracker.save_check_point(True)
        except Exception:
            import traceback
            traceback.print_exc()


def sleep_until(seconds, exit_condition=None, expect_error=False):
    if not exit_condition:
        time.sleep(seconds)
        return
    s = time.time()
    while time.time() - s < seconds:
        try:
            if exit_condition():
                break
        except Exception:
            if expect_error:
                continue
        time.sleep(1)


def spl_consumer_group():
    # The endpoint of Simple Log Service. The China (Hangzhou) region is used as an example. Replace it with the actual endpoint.
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    # In this example, the AccessKey ID and AccessKey Secret are obtained from environment variables.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    project = 'your_project'
    logstore = 'your_logstore'
    # The name of the consumer group. You do not need to create it in advance; the SDK automatically creates it.
    consumer_group = 'consumer-group'
    consumer_name = "consumer-group-name"
    query = "* | where cast(cdn_in as bigint) > 70"

    # Create a consumer in the consumer group to consume data.
    option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore, consumer_group, consumer_name,
                          query=query, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
                          data_fetch_interval=1)

    print("*** start to consume data...")
    client_worker = ConsumerWorker(SPLConsumer, consumer_option=option)
    client_worker.start()
    time.sleep(10000)


if __name__ == '__main__':
    spl_consumer_group()
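The sample above starts a single consumer and then sleeps. As a minimal sketch of the load balancing described at the beginning of this topic, the following hypothetical function (the consumer names spl-consumer-1 and spl-consumer-2 are placeholders) starts two consumers in the same consumer group so that the Logstore's shards are split between them, assuming ConsumerWorker.shutdown() for a graceful stop:

def spl_consumer_group_two_workers():
    # Same connection settings as spl_consumer_group() above.
    endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
    access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    project = 'your_project'
    logstore = 'your_logstore'
    consumer_group = 'consumer-group'
    query = "* | where cast(cdn_in as bigint) > 70"

    workers = []
    # Consumer names must be unique within a consumer group; these two are hypothetical.
    for consumer_name in ('spl-consumer-1', 'spl-consumer-2'):
        option = LogHubConfig(endpoint, access_key_id, access_key, project, logstore,
                              consumer_group, consumer_name, query=query,
                              cursor_position=CursorPosition.BEGIN_CURSOR,
                              heartbeat_interval=6, data_fetch_interval=1)
        worker = ConsumerWorker(SPLConsumer, consumer_option=option)
        worker.start()
        workers.append(worker)

    time.sleep(60)
    for worker in workers:
        # Ask each worker to stop; checkpoints are saved in SPLConsumer.shutdown().
        worker.shutdown()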
Run main.py in the spl_consumer_demo directory and view the result.
python main.py
Java
Add the following Maven dependencies.
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>loghub-client-lib</artifactId>
    <version>0.6.47</version>
</dependency>
Create the consumption logic in SPLLogHubProcessor.java. The code is as follows.

import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;

import java.util.List;

public class SPLLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // The time when the checkpoint was last persisted.
    private long mLastSaveTime = 0;

    // The initialize method is called once when the processor object is initialized.
    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // The main logic of data consumption. Handle all exceptions that occur during consumption; do not throw them directly.
    public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
        // Print the obtained data.
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
            System.out.println("Tags");
            for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                FastLogTag logTag = fastLogGroup.getLogTags(i);
                System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
            }
            for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                FastLog log = fastLogGroup.getLogs(i);
                System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int j = 0; j < log.getContentsCount(); ++j) {
                    FastLogContent content = log.getContents(j);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // Write a checkpoint to the server every 30 seconds. If a worker terminates unexpectedly within those 30 seconds,
        // a newly started worker resumes from the last checkpoint, so a small amount of data may be consumed repeatedly.
        try {
            if (curTime - mLastSaveTime > 30 * 1000) {
                // true indicates that the checkpoint is immediately updated to the server.
                // In addition, the checkpoint cached in memory is automatically updated to the server every 60 seconds by default.
                checkPointTracker.saveCheckPoint(true);
                mLastSaveTime = curTime;
            } else {
                // false indicates that the checkpoint is cached locally and can be flushed to the server by the automatic checkpoint update mechanism.
                checkPointTracker.saveCheckPoint(false);
            }
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
        return null;
    }

    // This function is called when the worker exits. You can perform cleanup work here.
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        // Immediately save the checkpoint to the server.
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}
Create the processor factory in SPLLogHubProcessorFactory.java. The code is as follows.

import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;

class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // Generate a consumer instance. Note: each call to generatorProcessor must return a new SPLLogHubProcessor object.
        return new SPLLogHubProcessor();
    }
}
Create a consumer and start a consumer thread. The consumer consumes data from the specified Logstore. The code is as follows.
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

public class Main {
    // The endpoint of Simple Log Service. Replace it with the actual endpoint.
    private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
    // The name of the project. Replace it with the name of an existing project.
    private static String Project = "ali-test-project";
    // The name of the Logstore. Replace it with the name of an existing Logstore.
    private static String Logstore = "ali-test-logstore";
    // The name of the consumer group. You do not need to create it in advance; it is automatically created when this program runs.
    private static String ConsumerGroup = "ali-test-consumergroup2";
    // In this example, the AccessKey ID and AccessKey Secret are obtained from environment variables.
    private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
        // consumer_1 is the consumer name. Consumer names within the same consumer group must be unique. When multiple processes
        // on different machines consume one Logstore in a balanced manner, you can use the machine IP address to distinguish consumers.
        // maxFetchLogGroupSize specifies the maximum number of LogGroups fetched from the server at a time. Use the default value.
        // You can adjust it with config.setMaxFetchLogGroupSize(100); valid values: (0,1000].
        LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
        // setQuery specifies the SLS SPL statement used during consumption.
        config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
        ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        // After the thread starts, the ClientWorker runs automatically. ClientWorker implements the Runnable interface.
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        // Call the shutdown function of the worker to exit the consumer instance. The associated thread stops automatically.
        worker.shutdown();
        // Multiple asynchronous tasks are generated while the ClientWorker is running. After shutdown, wait for the running
        // tasks to exit safely. We recommend that you set the sleep time to 30 seconds.
        Thread.sleep(30 * 1000);
    }
}
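The fixed Thread.sleep(60 * 60 * 1000) above is only for demonstration. The following is a minimal sketch, under the same Main class, of replacing the tail of main so that the worker is stopped by a JVM shutdown hook (for example, on SIGTERM or Ctrl+C) instead of after a fixed delay:

ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
thread.start();
// A sketch only: stop the consumer cleanly when the JVM exits.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // Exit the consumer instance; the associated thread stops automatically.
    worker.shutdown();
    try {
        // Give the worker's remaining asynchronous tasks time to exit safely.
        Thread.sleep(30 * 1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));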
Run Main.java. The following sample output simulates the consumption of Nginx access logs.

request_method : GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
Tags
__receive_time__ : 1635629877
--------
......