Use consumer groups to consume logs based on SPL

When you consume data through a consumer group (ConsumerGroup), you do not need to concern yourself with the implementation details of Simple Log Service or with load balancing and failover among consumers; you only need to focus on your business logic. This topic describes how to set SPL statements and consume data from a Logstore by using Go, Python, and Java consumer groups.
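
An SPL statement defines the server-side pre-processing that Simple Log Service applies to logs before returning them to the consumer. For example, the following statement, which is also used in the Go and Java samples below, returns only the logs whose body_bytes_sent value exceeds 14000:

    * | where cast(body_bytes_sent as bigint) > 14000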

Prerequisites

  • A RAM user has been created and granted the required permissions. For more information, see Create a RAM user and grant permissions to the RAM user.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables have been configured. For more information, see Configure environment variables in Linux, macOS, and Windows.

    Important
    • The AccessKey pair of an Alibaba Cloud account has permissions on all API operations. We recommend that you use the AccessKey pair of a RAM user for API access or routine O&M.

    • We strongly recommend that you do not save the AccessKey ID and AccessKey Secret in your project code. Otherwise, the AccessKey pair may be leaked, which would threaten the security of all resources within your account.

Code examples

Go

  1. Install the Simple Log Service SDK: create a project directory named spl_demo and run the following command in that directory. For more information, see Install the Go SDK.

    go get -u github.com/aliyun/aliyun-log-go-sdk
  2. Create a main.go file in the spl_demo directory. The code creates a consumer group and starts a consumer thread that consumes data from the specified Logstore. Set the Query field to your own SPL statement; a richer processing callback is sketched after these steps.

    package main
    
    import (
      "fmt"
      "os"
      "os/signal"
      "syscall"
    
      sls "github.com/aliyun/aliyun-log-go-sdk"
      consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
      "github.com/go-kit/kit/log/level"
    )
    
    // README :
    // A minimal example that pulls data from your Logstore and prints it, with SPL-based pre-processing applied to the logs on the server side.
    
    func main() {
      // The Simple Log Service endpoint. The China (Hangzhou) endpoint is used here as an example; replace it with the endpoint for your region.
      option := consumerLibrary.LogHubConfig{
        Endpoint:          "cn-hangzhou.log.aliyuncs.com",
        AccessKeyID:       os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        AccessKeySecret:   os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
        Project:           "your_project",
        Logstore:          "your_logstore",
        ConsumerGroupName: "test-spl-cg",
        ConsumerName:      "test-spl-consumer",
        // This option is used only for initialization; it is ignored once the consumer group has been created and each shard has started to be consumed.
        // Valid values: "begin", "end", or a specific timestamp (based on the log receiving time).
        CursorPosition: consumerLibrary.END_CURSOR,
        // Query specifies the SPL statement used to pre-process logs on the server before they are returned to the client. For more information, see https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
        Query: "* | where cast(body_bytes_sent as bigint) > 14000",
      }
    
      consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
      ch := make(chan os.Signal, 1)
      signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
      consumerWorker.Start()
      if _, ok := <-ch; ok {
        level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
        consumerWorker.StopAndWait()
      }
    }
    
    // Fill in your consumption logic here. Do not change the function signature or the return value;
    // otherwise, errors will occur.
    func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
      fmt.Println(shardId, "loggroup", len(logGroupList.LogGroups))
      // Cache the checkpoint locally; it is synchronized to the server periodically.
      checkpointTracker.SaveCheckPoint(false)
      return "", nil
    }
  3. Run the following commands in the spl_demo directory to install the dependencies.

    go mod tidy
    go mod vendor
  4. Run the main function and view the output.

    go run main.go
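
The process callback in the example above prints only the number of LogGroups in each batch. The following is a minimal sketch of a richer callback that walks each LogGroup and prints every log's timestamp and key-value contents. It is a drop-in replacement for the process function in main.go and assumes the protobuf-style getters (GetTopic, GetSource, GetTime, GetKey, and GetValue) generated for the Go SDK's types:

    // Drop-in replacement for the process function in main.go above.
    func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
      for _, logGroup := range logGroupList.LogGroups {
        fmt.Println("shard:", shardId, "topic:", logGroup.GetTopic(), "source:", logGroup.GetSource())
        for _, log := range logGroup.Logs {
          fmt.Print("time: ", log.GetTime())
          for _, content := range log.Contents {
            fmt.Printf(" %s=%s", content.GetKey(), content.GetValue())
          }
          fmt.Println()
        }
      }
      // Cache the checkpoint locally; it is synchronized to the server periodically.
      checkpointTracker.SaveCheckPoint(false)
      return "", nil
    }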

Python

  1. Install the Simple Log Service SDK: create a project directory named spl_consumer_demo and run the following command in that directory. For more information, see Install the Simple Log Service SDK for Python.

    pip install -U aliyun-log-python-sdk
  2. Create a main.py file in the spl_consumer_demo directory. The code creates a consumer group and starts a consumer thread that consumes data from the specified Logstore. Set the query parameter to your own SPL statement; a graceful-shutdown variant is sketched after these steps.

    import os
    import time
    
    from aliyun.log.consumer import *
    from aliyun.log import *
    
    
    class SPLConsumer(ConsumerProcessorBase):
        shard_id = -1
        last_check_time = 0
    
        def initialize(self, shard):
            self.shard_id = shard
    
        def process(self, log_groups, check_point_tracker):
            for log_group in log_groups.LogGroups:
                items = []
                for log in log_group.Logs:
                    item = dict()
                    item['time'] = log.Time
                    for content in log.Contents:
                        item[content.Key] = content.Value
                    items.append(item)
                log_items = dict()
                log_items['topic'] = log_group.Topic
                log_items['source'] = log_group.Source
                log_items['logs'] = items
    
                print(log_items)
    
            current_time = time.time()
            if current_time - self.last_check_time > 3:
                try:
                    self.last_check_time = current_time
                    check_point_tracker.save_check_point(True)
                except Exception:
                    import traceback
                    traceback.print_exc()
            else:
                try:
                    check_point_tracker.save_check_point(False)
                except Exception:
                    import traceback
                    traceback.print_exc()
    
            # Returning None means the batch was processed successfully.
            # To roll back to the previous checkpoint, return check_point_tracker.get_check_point() instead.
            return None
    
        def shutdown(self, check_point_tracker):
            try:
                check_point_tracker.save_check_point(True)
            except Exception:
                import traceback
                traceback.print_exc()
    
    
    # Optional helper (not used in this example): sleep for up to `seconds`,
    # returning early once exit_condition evaluates to True.
    def sleep_until(seconds, exit_condition=None, expect_error=False):
        if not exit_condition:
            time.sleep(seconds)
            return
    
        s = time.time()
        while time.time() - s < seconds:
            try:
                if exit_condition():
                    break
            except Exception:
                if expect_error:
                    continue
            time.sleep(1)
    
    def spl_consumer_group():
        # The Simple Log Service endpoint. The China (Hangzhou) endpoint is used here as an example; replace it with the endpoint for your region.
        endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
    
        # In this example, the AccessKey ID and AccessKey Secret are obtained from environment variables.
        access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
        access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
    
        project = 'your_project'
        logstore = 'your_logstore'
    
        # The name of the consumer group. You do not need to create it in advance; the SDK creates it automatically.
        consumer_group = 'consumer-group'
        consumer_name = "consumer-group-name"
        query = "* | where cast(cdn_in as bigint) > 70"
    
        # Create a consumer in the consumer group to consume data.
        option = LogHubConfig(endpoint,
                              access_key_id,
                              access_key,
                              project,
                              logstore,
                              consumer_group,
                              consumer_name,
                              query=query,
                              cursor_position=CursorPosition.BEGIN_CURSOR,
                              heartbeat_interval=6,
                              data_fetch_interval=1)
    
        print("*** start to consume data...")
        client_worker = ConsumerWorker(SPLConsumer, consumer_option=option)
        client_worker.start()
    
        # Keep the main thread alive while the consumer runs in the background.
        time.sleep(10000)
    
    
    if __name__ == '__main__':
        spl_consumer_group()
  3. Run main.py in the spl_consumer_demo directory and view the result.

    python main.py
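
The example above keeps the main thread alive with a long sleep. For a bounded run, the tail of spl_consumer_group() can stop the consumer explicitly instead. The following is a minimal sketch that assumes ConsumerWorker.shutdown(), which stops the fetch loops, calls each processor's shutdown hook, and saves the final checkpoints:

    # A sketch of a bounded run: replace the tail of spl_consumer_group() with a call to this helper.
    def run_bounded(option, seconds=60):
        worker = ConsumerWorker(SPLConsumer, consumer_option=option)
        worker.start()
        # Let the consumer run for a while, then stop it gracefully.
        time.sleep(seconds)
        print("*** stopping workers")
        worker.shutdown()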

Java

  1. Add the Maven dependencies.

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.47</version>
    </dependency>
  2. Create the consumption logic. The code of SPLLogHubProcessor.java is as follows.

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    
    import java.util.List;
    
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time when the checkpoint was last persisted.
        private long mLastSaveTime = 0;
    
        // The initialize method is called once when the processor object is initialized.
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. Handle all exceptions that occur during consumption; do not throw them directly.
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // Print the fetched data.
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                }
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Write a checkpoint to the server every 30 seconds. If a worker terminates unexpectedly within those 30 seconds, the newly started worker resumes from the last checkpoint, so a small amount of data may be consumed again.
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // Passing true immediately updates the checkpoint on the server. In addition, the checkpoint cached in memory is automatically synchronized to the server every 60 seconds by default.
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // Passing false caches the checkpoint locally; the automatic synchronization mechanism updates it to the server.
                    checkPointTracker.saveCheckPoint(false);
                }
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        // This function is called when the worker exits. You can perform cleanup work here.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Immediately save the checkpoint to the server.
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  3. Create the processor factory that generates consumer instances. The code of SPLLogHubProcessorFactory.java is as follows.

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumption instance. Note: each call to generatorProcessor should return a new SPLLogHubProcessor object.
            return new SPLLogHubProcessor();
        }
    }
  4. Create a consumer and start a consumer thread. The consumer consumes data from the specified Logstore. The code is as follows; a variant that stops the worker with a JVM shutdown hook is sketched after these steps.

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The Simple Log Service endpoint. Replace it with the endpoint for your region.
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Simple Log Service project. Replace it with the name of an existing project.
        private static String Project = "ali-test-project";
        // The name of the Logstore. Replace it with the name of an existing Logstore.
        private static String Logstore = "ali-test-logstore";
        // The name of the consumer group. You do not need to create it in advance; it is created automatically when this program runs.
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // In this example, the AccessKey ID and AccessKey Secret are obtained from environment variables.
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 is the consumer name. Consumer names within the same consumer group must be unique. When consumers in multiple processes on different machines consume a Logstore in a balanced manner, you can use machine IP addresses to distinguish the consumer names.
            // maxFetchLogGroupSize specifies the maximum number of LogGroups fetched from the server per request; the default value is sufficient. You can adjust it with config.setMaxFetchLogGroupSize(100); valid values are in the range (0, 1000].
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // setQuery sets the SLS SPL statement that is applied during consumption.
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // After the thread starts, the ClientWorker runs automatically because ClientWorker implements the Runnable interface.
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // Call the worker's shutdown function to exit the consumer instance; the associated thread stops automatically.
            worker.shutdown();
            // A running ClientWorker spawns multiple asynchronous tasks. After shutdown returns, wait for the tasks that are still running to exit safely; a sleep of 30 seconds is recommended.
            Thread.sleep(30 * 1000);
        }
    }
  5. Run Main.java. The following output uses the consumption of simulated Nginx logs as an example.

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
    Tags
        __receive_time__    :    1635629877
    --------
    ......
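
The Main class above runs for a fixed hour before shutting down. As a variation, a JVM shutdown hook can stop the worker when the process receives a termination signal. The following is a minimal sketch that reuses the ClientWorker API shown above; the class name GracefulMain is illustrative:

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;

    public class GracefulMain {
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            LogHubConfig config = new LogHubConfig("ali-test-consumergroup2", "consumer_1",
                    "cn-hangzhou.log.aliyuncs.com", "ali-test-project", "ali-test-logstore",
                    System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                    System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
                    LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            thread.start();
            // Stop the worker on SIGINT/SIGTERM, then give the asynchronous
            // tasks that are still running time to exit safely.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                worker.shutdown();
                try {
                    Thread.sleep(30 * 1000);
                } catch (InterruptedException ignored) {
                }
            }));
            thread.join();
        }
    }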