Use an SDK to consume logs based on SPL

更新时间:
复制 MD 格式

You can use an SDK to consume logs based on a Consume Processor (SPL). Code examples are provided in Java, Python, and Go.

Prerequisites

  • A Resource Access Management (RAM) user is created and granted the required permissions. For more information, see Create a RAM user and grant permissions.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user to call API operations or perform routine O&M.

    • Do not include your AccessKey ID or AccessKey secret in your project code. If either is leaked, the security of all resources in your account may be compromised.

  • A Consume Processor is created. For more information, see Create a Consume Processor.

Code examples

Java

  1. Install the Simple Log Service SDK. In the root directory of your Java project, open the pom.xml file and add the following Maven dependencies. For more information, see Install the Java SDK.

    The Simple Log Service SDK for Java must be version 0.6.126 or later.
    <!-- Protocol Buffers runtime, declared alongside the SDK. NOTE(review): presumably required by the SDK at run time; confirm the version before upgrading. -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <!-- Import the Simple Log Service SDK -->
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>aliyun-log</artifactId>
      <version>0.6.126</version>
    </dependency>
  2. Create a PullLogsWithSPLDemo.java file. This example calls the PullLogs operation to consume log data based on SPL.

    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.*;
    import com.aliyun.openservices.log.request.PullLogsRequest;
    import com.aliyun.openservices.log.response.ListShardResponse;
    import com.aliyun.openservices.log.response.PullLogsResponse;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Pulls logs from every shard of a Logstore through a Consume Processor (SPL).
     *
     * <p>The demo lists the shards of the Logstore, obtains a start cursor for each shard, and then
     * polls the shards in an endless loop, advancing a shard's cursor after each successful pull.
     */
    public class PullLogsWithSPLDemo {
        // The endpoint of Simple Log Service. This example uses the China (Hangzhou) region. Replace it with the endpoint of your region.
        private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
        // This example obtains the AccessKey ID and AccessKey secret from environment variables.
        private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // The project name. Replace it with your actual project name.
        private static final String project = "ali-project-test";
        // The Logstore name. Replace it with your actual Logstore name.
        private static final String logStore = "test-logstore";
        // Pause applied when a pull does not advance the cursor, so that an idle shard
        // is not polled in a tight loop. Only for debug; tune it for real workloads.
        private static final long IDLE_SLEEP_MILLIS = 3000L;

        /**
         * Runs the pull loop until a call fails or the process is stopped.
         *
         * @param args not used
         * @throws Exception if listing shards or getting a cursor fails, if pulling logs fails
         *     (the LogException is printed and rethrown), or if the idle pause is interrupted
         */
        public static void main(String[] args) throws Exception {
            // The ID of the Consume Processor.
            String processorName = "processor-test";
            // Create a Simple Log Service client.
            Client client = new Client(endpoint, accessKeyId, accessKeySecret);
            // Query the shards of the Logstore.
            ListShardResponse resp = client.ListShard(project, logStore);
            System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
            Map<Integer, String> cursorMap = new HashMap<>();
            for (Shard shard : resp.GetShards()) {
                int shardId = shard.getShardId();
                // Start consumption from the beginning and get a cursor. To start from the end, use Consts.CursorMode.END.
                cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
            }
            try {
                while (true) {
                    // Get logs from each shard.
                    for (Shard shard : resp.GetShards()) {
                        int shardId = shard.getShardId();
                        String currentCursor = cursorMap.get(shardId);
                        PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, currentCursor);
                        // Attach the Consume Processor to the pull request.
                        request.setProcessor(processorName);
                        PullLogsResponse response = client.pullLogs(request);
                        // Logs are in log groups. Split them as needed.
                        List<LogGroupData> logGroups = response.getLogGroups();
                        System.out.printf("Get %d logGroup from logstore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);

                        String nextCursor = response.getNextCursor();
                        if (currentCursor != null && currentCursor.equals(nextCursor)) {
                            // The cursor did not move, so the shard has no new data yet: back off
                            // instead of immediately pulling again.
                            Thread.sleep(IDLE_SLEEP_MILLIS);
                        } else {
                            // Move the cursor after you process the pulled logs.
                            cursorMap.put(shardId, nextCursor);
                        }
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.GetErrorCode());
                System.out.println("error message :" + e.GetErrorMessage());
                throw e;
            }
        }
    }
  3. Run the main function and view the output.

    Get 41 logGroup from logstore:test-logstore:	Shard:0
    Get 49 logGroup from logstore:test-logstore:	Shard:1
    Get 43 logGroup from logstore:test-logstore:	Shard:0
    Get 39 logGroup from logstore:test-logstore:	Shard:1
    ... ...

Python

  1. Install the Simple Log Service SDK. Create a project folder named spl_demo and run the following command in the folder. For more information, see Install the Simple Log Service SDK for Python.

    The Simple Log Service SDK for Python must be version 0.9.28 or later.
    pip install -U aliyun-log-python-sdk
  2. In the spl_demo folder, create a main.py file. This example lists the shards of the specified Logstore, obtains a cursor for each shard, and then pulls logs from each shard in a loop by using the Consume Processor.

    # encoding: utf-8
    
    import time
    import os
    from aliyun.log import *
    
    def main():
        """Pull logs from every shard of a Logstore through a Consume Processor.

        Lists the shards, fetches an initial cursor per shard, then polls the
        shards forever, printing each response and advancing the cursor when
        the service returns a new one.
        """
        # The endpoint of Simple Log Service. This example uses the China (Hangzhou) region. Replace it with the endpoint of your region.
        endpoint = 'cn-hangzhou.log.aliyuncs.com'
        # This example obtains the AccessKey ID and AccessKey secret from environment variables.
        access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
        # The project name. Replace it with your actual project name.
        project_name = 'ali-project-test'
        # The Logstore name. Replace it with your actual Logstore name.
        logstore_name = 'test-logstore'
        # The ID of the Consume Processor.
        processor = "processor-test"
        # Where consumption starts, and how many log groups to request per pull.
        init_cursor = 'end'
        log_group_count = 10

        # Create a Simple Log Service client.
        client = LogClient(endpoint, access_key_id, access_key)

        # List the shards of the logstore.
        shard_response = client.list_shards(project_name, logstore_name)
        shard_response.log_print()
        shards = shard_response.shards

        # Get the initial cursor of every shard, keyed by shard ID.
        cursor_map = {}
        for shard in shards:
            shard_id = shard.get('shardID')
            cursor_response = client.get_cursor(project_name, logstore_name, shard_id, init_cursor)
            cursor_map[shard_id] = cursor_response.get_cursor()

        # Read data from each shard in a loop.
        while True:
            for shard in shards:
                shard_id = shard.get('shardID')
                current_cursor = cursor_map.get(shard_id)
                pulled = client.pull_logs(project_name, logstore_name, shard_id, current_cursor,
                                          log_group_count, processor=processor)
                pulled.log_print()
                if cursor_map[shard_id] != pulled.next_cursor:
                    cursor_map[shard_id] = pulled.next_cursor
                    continue
                # The cursor did not move: wait before polling again (only for debug).
                time.sleep(3)


    if __name__ == '__main__':
        main()
  3. Run the main function and view the output.

    ListShardResponse:
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/json', 'Content-Length': '335', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-time': '1740563177', 'x-log-requestid': '67BEE2E9132069E22A1F967D'}
    res: [{'shardID': 0, 'status': 'readwrite', 'inclusiveBeginKey': '00000000000000000000000000000000', 'exclusiveEndKey': '80000000000000000000000000000000', 'createTime': 1737010019}, {'shardID': 1, 'status': 'readwrite', 'inclusiveBeginKey': '80000000000000000000000000000000', 'exclusiveEndKey': 'ffffffffffffffffffffffffffffffff', 'createTime': 1737010019}]
    PullLogResponse
    next_cursor MTczNz********c3ODgyMjQ0MQ==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:17 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563177', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********ODgyMjQ0MQ==', 'x-log-requestid': '67BEE2E974CA9ABCE7DDC7D6'}
    detail: []
    PullLogResponse
    next_cursor MTczNz********c3OTg5NzE3NA==
    log_count 0
    headers: {'Server': 'AliyunSLS', 'Content-Type': 'application/x-protobuf', 'Content-Length': '1', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Wed, 26 Feb 2025 09:46:21 GMT', 'x-log-cursor-time': '0', 'x-log-end-of-cursor': '1', 'x-log-failedlines': '0', 'x-log-rawdatacount': '0', 'x-log-rawdatalines': '0', 'x-log-rawdatasize': '0', 'x-log-read-last-cursor': '0', 'x-log-resultlines': '0', 'x-log-time': '1740563181', 'x-log-bodyrawsize': '0', 'x-log-compresstype': 'gzip', 'x-log-count': '0', 'x-log-cursor': 'MTczNzAx********OTg5NzE3NA==', 'x-log-requestid': '67BEE2EDF2B58CF1756526EF'}
    detail: []
    PullLogResponse
    ... ...

Go

  1. Install the Simple Log Service SDK. Create a project folder named spl_demo and run the following command in the folder. For more information, see Install the Go SDK.

    The Simple Log Service SDK for Go must be version 0.1.107 or later.
    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. In the spl_demo folder, create a main.go file. This example lists the shards of the specified Logstore, obtains a cursor for each shard, and then calls PullLogsV2 in a loop to pull logs from each shard by using the Consume Processor.

    package main
    
    import (
    	"fmt"
    	"os"
    	"time"
    
    	sls "github.com/aliyun/aliyun-log-go-sdk"
    )
    
    // main lists the shards of the Logstore, obtains an initial cursor for each
    // shard, and then pulls logs from every shard in an endless loop through the
    // Consume Processor. It prints the error and returns on the first failed call.
    func main() {
    	// Credentials are read from environment variables. The endpoint below is
    	// the China (Chengdu) region; replace it with the endpoint of your region.
    	client := &sls.Client{
    		AccessKeyID:     os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
    		AccessKeySecret: os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
    		Endpoint:        "cn-chengdu.log.aliyuncs.com",
    	}

    	// The project and Logstore names. Replace them with your actual names.
    	project := "ali-project-test"
    	logstore := "test-logstore"
    	// Where consumption starts for every shard.
    	initCursor := "end"
    	// The ID of the Consume Processor.
    	consumeProcessor := "ali-test-consume-processor"
    	shards, err := client.ListShards(project, logstore)
    	if err != nil {
    		fmt.Println("ListShards error", err)
    		return
    	}

    	// Current cursor of each shard, keyed by shard ID.
    	shardCursorMap := map[int]string{}
    	for _, shard := range shards {
    		cursor, err := client.GetCursor(project, logstore, shard.ShardID, initCursor)
    		if err != nil {
    			fmt.Println("GetCursor error", shard.ShardID, err)
    			return
    		}
    		shardCursorMap[shard.ShardID] = cursor
    	}

    	for {
    		for _, shard := range shards {
    			pullLogRequest := &sls.PullLogRequest{
    				Project:          project,
    				Logstore:         logstore,
    				ShardID:          shard.ShardID,
    				LogGroupMaxCount: 10,
    				Processor:        consumeProcessor,
    				Cursor:           shardCursorMap[shard.ShardID],
    			}
    			lg, nextCursor, err := client.PullLogsV2(pullLogRequest)
    			// Check the error before touching lg: on failure lg may be nil, and
    			// reading lg.LogGroups would panic instead of reporting the error.
    			if err != nil {
    				fmt.Println("PullLogsV2 error", shard.ShardID, err)
    				return
    			}
    			fmt.Println("shard: ", shard.ShardID, "loggroups: ", len(lg.LogGroups), "nextCursor: ", nextCursor)
    			shardCursorMap[shard.ShardID] = nextCursor
    			if len(lg.LogGroups) == 0 {
    				// only for debug
    				time.Sleep(time.Duration(3) * time.Second)
    			}
    		}
    	}
    }
  3. Run the main function and view the output.

    shard:  0 loggroups:  41 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  49 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    shard:  0 loggroups:  43 nextCursor:  MTY5Mz*******TIxNjcxMDcwMQ==
    shard:  1 loggroups:  39 nextCursor:  MTY5Mz*******DYwNDIyNDQ2Mw==
    ... ...