本文向您介绍使用SDK基于SPL消费日志的示例。
前提条件
已创建RAM用户并完成授权。具体操作,请参见创建RAM用户及授权。
已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量。
重要阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。
强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
代码示例
Java
安装日志服务SDK:在Java项目的根目录下,打开
pom.xml
文件,添加以下Maven依赖。更多信息,请参见安装Java SDK。<dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>aliyun-log</artifactId> <version>0.6.120</version> </dependency>
创建
PullLogsWithSPLDemo.java
文件,在本示例中调用PullLog接口读取日志数据,完成使用Java SDK基于SPL消费日志数据的演示。query字段请根据实际情况填写SPL语句。import com.aliyun.openservices.log.Client; import com.aliyun.openservices.log.common.*; import com.aliyun.openservices.log.common.Consts; import com.aliyun.openservices.log.exception.LogException; import com.aliyun.openservices.log.request.PullLogsRequest; import com.aliyun.openservices.log.response.ListShardResponse; import com.aliyun.openservices.log.response.PullLogsResponse; import java.util.HashMap; import java.util.List; import java.util.Map; public class PullLogsWithSPLDemo { // 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。 private static final String endpoint = "cn-hangzhou.log.aliyuncs.com"; // 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。 private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); // Project 名称(需替换为实际Project名)。 private static final String project = "ali-project-test"; // Logstore 名称(需替换为实际Logstore名)。 private static final String logStore = "test-logstore"; public static void main(String[] args) throws Exception { // 创建日志服务 Client。 Client client = new Client(endpoint, accessKeyId, accessKeySecret); // 查询 Logstore 的 Shard。 ListShardResponse resp = client.ListShard(project, logStore); System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size()); Map<Integer, String> cursorMap = new HashMap<>(); for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); // 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)。 cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor()); } try { while (true) { // 从每个Shard中获取日志。 for (Shard shard : resp.GetShards()) { int shardId = shard.getShardId(); PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId)); request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000"); request.setPullMode("scan_on_stream"); PullLogsResponse response = client.pullLogs(request); // 日志都在日志组(LogGroup)中,按照逻辑拆分即可。 List<LogGroupData> logGroups = response.getLogGroups(); System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId); // 完成处理拉取的日志后,移动游标。 cursorMap.put(shardId, response.getNextCursor()); } } } catch (LogException e) { System.out.println("error code :" + e.GetErrorCode()); System.out.println("error message :" + e.GetErrorMessage()); throw e; } } }
运行Main函数,查看输出结果。
Get 41 logGroup from logstore:test-logstore: Shard:0 Get 49 logGroup from logstore:test-logstore: Shard:1 Get 43 logGroup from logstore:test-logstore: Shard:0 Get 39 logGroup from logstore:test-logstore: Shard:1 ... ...
Go
安装日志服务SDK:创建项目目录spl_demo,在目录下执行如下命令,更多信息,请参见安装Go SDK。
go get -u github.com/aliyun/aliyun-log-go-sdk
在spl_demo目录下创建main.go文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。query字段请根据实际情况填写SPL语句。
package main import ( "fmt" "time" "os" sls "github.com/aliyun/aliyun-log-go-sdk" ) func main() { client := &sls.Client{ AccessKeyID: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"), Endpoint: "cn-chengdu.log.aliyuncs.com", } project := "ali-project-test" logstore := "test-logstore" initCursor := "end" query := "* | where cast(body_bytes_sent as bigint) > 14000" shards, err := client.ListShards(project, logstore) if err != nil { fmt.Println("ListShards error", err) return } shardCursorMap := map[int]string{} for _, shard := range shards { cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor) if err != nil { fmt.Println("GetCursor error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = cursor } for { for _, shard := range shards { pullLogRequest := &sls.PullLogRequest{ Project: project, Logstore: logstore, ShardID: shard.ShardID, LogGroupMaxCount: 10, Query: query, Cursor: shardCursorMap[shard.ShardID], } lg, nextCursor, err := client.PullLogsV2(pullLogRequest) fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor) if err != nil { fmt.Println("PullLogsV2 error", shard.ShardID, err) return } shardCursorMap[shard.ShardID] = nextCursor if len(lg.LogGroups) == 0 { // only for debug time.Sleep(time.Duration(3) * time.Second) } } } }
运行main函数,查看输出结果
shard: 0 loggroups: 41 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 49 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== shard: 0 loggroups: 43 nextCursor: MTY5Mz*******TIxNjcxMDcwMQ== shard: 1 loggroups: 39 nextCursor: MTY5Mz*******DYwNDIyNDQ2Mw== ... ...
Python
安装日志服务SDK:创建项目目录spl_demo,在目录下执行如下命令,更多信息,请参见安装日志服务Python SDK。
pip install -U aliyun-log-python-sdk
在spl_demo目录下创建main.py文件。创建一个消费组并启动一个消费者线程,该消费者会从指定的Logstore中消费数据。query字段请根据实际情况填写SPL语句。
# encoding: utf-8 import time import os from aliyun.log import * def main(): # 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。 endpoint = 'cn-hangzhou.log.aliyuncs.com' # 本示例从环境变量中获取AccessKey ID和AccessKey Secret。 access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') # Project名称(需替换为实际Project名)。 project_name = 'ali-project-test' # Logstore名称(需替换为实际Logstore名)。 logstore_name = 'test-logstore' query = '* | where cast(cdn_in as bigint) > 70' init_cursor = 'end' log_group_count = 10 # 创建日志服务Client。 client = LogClient(endpoint, access_key_id, access_key) cursor_map = {} # 列举logstore的shards res = client.list_shards(project_name, logstore_name) res.log_print() shards = res.shards # 获取初始cursor for shard in shards: shard_id = shard.get('shardID') res = client.get_cursor(project_name, logstore_name, shard_id, init_cursor) cursor_map[shard_id] = res.get_cursor() # 循环读取每个shard的数据 while True: for shard in shards: shard_id = shard.get('shardID') res = client.pull_logs(project_name, logstore_name, shard_id, cursor_map.get(shard_id), log_group_count, query=query) res.log_print() if cursor_map[shard_id] == res.next_cursor: # only for debug time.sleep(3) else: cursor_map[shard_id] = res.next_cursor if __name__ == '__main__': main()
运行main函数,查看输出结果。
ListShardResponse: headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'} res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}] PullLogResponse next_cursor MTczNz********c3ODgyMjQ0MQ== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'} detail: [] PullLogResponse next_cursor MTczNz********c3OTg5NzE3NA== log_count 0 headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'} detail: [] PullLogResponse ... ...