使用日志服务收集Spark作业日志

ACK集群中运行Spark作业时会生成大量的日志分散在不同的Pods中,导致日志管理变得困难。您可以通过日志服务(SLS)提供的一站式的日志采集、加工、查询与分析、可视化和告警等能力,实现对Spark日志的高效管理。本文将介绍如何使用日志服务(SLS)对运行在ACK集群中的Spark作业日志进行管理。

前提条件

流程概述

本文将引导您完成如下步骤,帮助您了解如何配置SLS以管理Spark作业产生的系统日志和业务日志。

  1. 构建Spark容器镜像:构建包含了log4j JSON template layout依赖的Spark容器镜像,并推送到您的镜像仓库中。

  2. 配置Log4j2日志:创建一个ConfigMap资源,用于配置Log4j2日志,设定日志级别为INFO,并将日志打印格式设定为JSONL格式。

  3. 创建Logtail配置:创建一个AliyunConfig资源,日志服务将相应地在指定的日志库中创建Logtail采集配置,对通过Spark operator提交的Spark作业日志进行收集。

  4. 提交示例Spark作业:创建并运行示例Spark作业,查看pod日志输出是否为JSONL格式,并对部分字段含义进行说明。

  5. 查询和分析Spark日志:登录SLS控制台,查询和分析指定时间段内的Spark作业日志。

  6. (可选)环境清理:在完成测试后,清理无需使用的Spark作业和资源,避免产生额外的费用。

步骤一:构建Spark容器镜像

创建如下Dockerfile(本示例使用Spark 3.5.3版本),并将所需的依赖项添加到Spark的类路径中。构建完成后,将该镜像推送到您的镜像仓库。为了方便日志的收集和解析,我们将采用JSONL格式输出日志。

ARG SPARK_IMAGE=<SPARK_IMAGE>  # 需要将<SPARK_IMAGE>替换成您自己的Spark基础镜像。

FROM ${SPARK_IMAGE}

# Add dependency for log4j-layout-template-json
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.24.1/log4j-layout-template-json-2.24.1.jar ${SPARK_HOME}/jars

步骤二:配置Log4j2日志

使用如下内容创建一个名为spark-log-conf.yaml的文件,并将日志级别设定为INFO,同时配置日志打印格式为JSONL格式,日志模板采用Elastic Common Schema(ECS),一种标准化的日志格式。有关更多配置请参见采集Log4j日志

apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-log-conf
  namespace: default
data:
  log4j2.properties: |
    # Set everything to be logged to the console and file
    rootLogger.level = info

    rootLogger.appenderRefs = console, file
    rootLogger.appenderRef.console.ref = STDOUT
    rootLogger.appenderRef.file.ref = FileAppender

    appender.console.name = STDOUT
    appender.console.type = Console
    appender.console.layout.type = JsonTemplateLayout
    appender.console.layout.eventTemplateUri = classpath:EcsLayout.json

    appender.file.name = FileAppender
    appender.file.type = File
    appender.file.fileName = /opt/spark/logs/spark.log
    appender.file.layout.type = JsonTemplateLayout
    appender.file.layout.eventTemplateUri = classpath:EcsLayout.json

执行如下命令创建ConfigMap资源。

kubectl apply -f spark-log-conf.yaml

预期输出:

configmap/spark-log-conf created

步骤三:创建Logtail配置

使用如下内容创建一个名为aliyun-log-config.yamlAliyunLogConfig清单文件,并在其中替换<SLS_PROJECT>为您的SLS Project的名称,<SLS_LOGSTORE> 为您的SLS Logstore名称。有关更多配置选项请参见使用AliyunLogConfig管理采集配置

apiVersion: log.alibabacloud.com/v1alpha1
kind: AliyunLogConfig
metadata:
  name: spark
  namespace: default
spec:
  # (可选)目标project名称(默认为 k8s-log-<Your_Cluster_ID>)
  project: <SLS_PROJECT>

  # Logstore 名称。如果您所指定的Logstore不存在,日志服务会自动创建。
  logstore: <SLS_LOGSTORE>

  # iLogtail采集配置。
  logtailConfig:
    # 采集配置名称。
    configName: spark

    # 数据源类型,file表示文本日志
    inputType: file

    # 日志输入的相关配置。
    inputDetail:
      # 日志文件所在目录。
      logPath: /opt/spark/logs

      # 日志文件名称,支持通配符。
      filePattern: '*.log'

      # 日志文件编码。
      fileEncoding: utf8

      # 日志类型。
      logType: json_log
      localStorage: true
      key:
      - content
      logBeginRegex: .*
      logTimezone: ''
      discardNonUtf8: false
      discardUnmatch: true
      preserve: true
      preserveDepth: 0
      regex: (.*)
      outputType: LogService
      topicFormat: none
      adjustTimezone: false
      enableRawLog: false

      # 采集容器中的文本日志。
      dockerFile: true

      # 高级配置。
      advanced:
        # 容器元信息预览。
        collect_containers_flag: true

        # Kubernetes采集配置。
        k8s:
          # 按照标签过滤Pod。
          IncludeK8sLabel:
            sparkoperator.k8s.io/launched-by-spark-operator: "true"

          # 按照容器名称过滤容器。
          K8sContainerRegex: "^spark-kubernetes-(driver|executor)$"

          # 额外的日志标签配置。
          ExternalK8sLabelTag:
            spark-app-name: spark-app-name
            spark-version: spark-version
            spark-role: spark-role
            spark-app-selector: spark-app-selector
            sparkoperator.k8s.io/submission-id: sparkoperator.k8s.io/submission-id

      # 日志处理插件。
      plugin:
        processors:
        # 日志分隔。
        - type: processor_split_log_string
          detail:
            SplitKey: content
            SplitSep: ''

        # 字段JSON解析。
        - type: processor_json
          detail:
            ExpandArray: false
            ExpandConnector: ''
            ExpandDepth: 0
            IgnoreFirstConnector: false
            SourceKey: content
            KeepSource: false
            KeepSourceIfParseError: true
            NoKeyError: false
            UseSourceKeyAsPrefix: false

        # 日志时间戳提取。
        - type: processor_strptime
          detail:
            SourceKey: '@timestamp'
            Format: '%Y-%m-%dT%H:%M:%S.%fZ'
            KeepSource: false
            AdjustUTCOffset: true
            UTCOffset: 0
            AlarmIfFail: false

执行如下命令创建Logtail配置。

kubectl apply -f aliyun-log-config.yaml

您可以按照以下步骤查看新建的日志库和Logtail配置。

  1. 登录日志服务控制台

  2. Project列表区域,单击目标Project。

    image

  3. 日志存储 > 日志库页签中,单击目标日志库前面的>,依次选择数据接入 > Logtail配置

    image

  4. 单击目标Logtail采集配置,查看Logtail采集配置详情。

步骤四:提交示例Spark作业

使用以下内容创建一个名为spark-pi.yamlSparkApplication清单文件。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE>
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.3
  sparkConfigMap: spark-log-conf
  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    memory: 4g

执行如下命令提交作业。

kubectl apply -f spark-pi.yaml

等待作业执行结束后,查看Driver Pod日志的最后10行。

kubectl logs  --tail=10 spark-pi-driver  

预期输出:

{"@timestamp":"2024-11-20T11:45:48.487Z","ecs.version":"1.2.0","log.level":"WARN","message":"Kubernetes client has been closed.","process.thread.name":"-937428334-pool-19-thread-1","log.logger":"org.apache.spark.scheduler.cluster.k8s.ExecutorPodsWatchSnapshotSource"}
{"@timestamp":"2024-11-20T11:45:48.585Z","ecs.version":"1.2.0","log.level":"INFO","message":"MapOutputTrackerMasterEndpoint stopped!","process.thread.name":"dispatcher-event-loop-7","log.logger":"org.apache.spark.MapOutputTrackerMasterEndpoint"}
{"@timestamp":"2024-11-20T11:45:48.592Z","ecs.version":"1.2.0","log.level":"INFO","message":"MemoryStore cleared","process.thread.name":"main","log.logger":"org.apache.spark.storage.memory.MemoryStore"}
{"@timestamp":"2024-11-20T11:45:48.592Z","ecs.version":"1.2.0","log.level":"INFO","message":"BlockManager stopped","process.thread.name":"main","log.logger":"org.apache.spark.storage.BlockManager"}
{"@timestamp":"2024-11-20T11:45:48.596Z","ecs.version":"1.2.0","log.level":"INFO","message":"BlockManagerMaster stopped","process.thread.name":"main","log.logger":"org.apache.spark.storage.BlockManagerMaster"}
{"@timestamp":"2024-11-20T11:45:48.598Z","ecs.version":"1.2.0","log.level":"INFO","message":"OutputCommitCoordinator stopped!","process.thread.name":"dispatcher-event-loop-1","log.logger":"org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint"}
{"@timestamp":"2024-11-20T11:45:48.602Z","ecs.version":"1.2.0","log.level":"INFO","message":"Successfully stopped SparkContext","process.thread.name":"main","log.logger":"org.apache.spark.SparkContext"}
{"@timestamp":"2024-11-20T11:45:48.604Z","ecs.version":"1.2.0","log.level":"INFO","message":"Shutdown hook called","process.thread.name":"shutdown-hook-0","log.logger":"org.apache.spark.util.ShutdownHookManager"}
{"@timestamp":"2024-11-20T11:45:48.604Z","ecs.version":"1.2.0","log.level":"INFO","message":"Deleting directory /var/data/spark-f783cf2e-44db-452c-83c9-738f9c894ef9/spark-2caa5814-bd32-431c-a9f9-a32208b34fbb","process.thread.name":"shutdown-hook-0","log.logger":"org.apache.spark.util.ShutdownHookManager"}
{"@timestamp":"2024-11-20T11:45:48.606Z","ecs.version":"1.2.0","log.level":"INFO","message":"Deleting directory /tmp/spark-dacdfd95-f166-4b23-9312-af9052730417","process.thread.name":"shutdown-hook-0","log.logger":"org.apache.spark.util.ShutdownHookManager"}

输出日志已按JSONL格式打印,各字段含义如下:

  • @timestamp:日志记录产生时间。

  • ecs.version:Elastic Common Schema (ECS) 版本号(ECS为标准化日志格式)。

  • log.level:日志级别。

  • message:日志消息。

  • process.thread.name:产生该日志的线程名称。

  • log.logger:记录该日志的logger名称。

步骤五:查询和分析Spark日志

您可以通过查询和分析日志,指定作业执行的时间范围,以确认日志是否已成功收集。

image

(可选)步骤六:环境清理

如果您已体验完本教程,相关资源如不再需要,可以通过执行以下命令进行删除。

执行如下命令删除Spark作业。

kubectl delete -f spark-pi.yaml

执行如下命令删除Logtail配置。

kubectl delete -f aliyun-log-config.yaml

执行如下命令删除Log4j2日志配置。

kubectl delete -f spark-log-conf.yaml