Spark on ACK概述

更新时间:2025-03-21 02:26:16

Spark on ACKACK基于Spark on Kubernetes提供的解决方案,让您能够基于ACK提供的企业级容器应用管理能力,快速构建高效、灵活且可扩展的Spark大数据处理平台。

Spark on ACK介绍

Apache Spark是一种专门用于大规模数据处理的计算引擎,广泛应用于数据分析和机器学习等场景。自 2.3 版本起,Spark支持将作业提交至 Kubernetes 集群中(Running Spark on Kubernetes)。

Spark Operator是用于在Kubernetes集群中运行Spark工作负载的Operator,支持以Kubernetes原生的方式自动化管理Spark作业的生命周期,包括作业配置、作业提交、作业重试等。

Spark on ACK解决方案对Spark Operator等相关组件进行了定制与优化,兼容开源版本并进一步拓展了功能特性。通过与阿里云产品生态的无缝集成,例如日志服务、对象存储和可观测性等,您可以基于Spark on ACK快速构建一个灵活、高效且可扩展的大数据处理平台。

功能优势

  • 简化开发与运维

    • 可移植性:支持将Spark应用及其依赖打包为标准化的容器镜像,从而实现Spark作业在不同 Kubernetes集群间的无缝迁移。

    • 可观测性:支持通过Spark History Server组件查看作业运行状态,并集成阿里云日志服务SLS和可观测监控Prometheus版,进一步提升作业的可观测性。

    • 工作流编排:通过工作流编排引擎(例如Apache AirflowArgo Workflows)编排 Spark 作业,能够实现数据处理流水线的自动化、高效调度与跨环境一致性部署,提升运维效率并降低迁移成本。

    • 多版本支持:支持在单个ACK集群中同时运行多个不同版本的Spark作业。

  • 作业调度与资源管理

    • 作业队列管理:与ack-kube-queue集成,提供灵活的作业队列管理和资源配额管理,自动优化工作负载资源分配并提升集群资源利用率。

    • 多种调度策略:复用ACK调度器已有的调度能力,支持多种批调度策略,包括Gang Scheduling、Capacity Scheduling等。

    • 多架构调度:支持混合使用x86Arm架构的ECS资源,例如通过使用倚天Arm架构服务器实现增效降本。

    • 多集群调度:通过ACK One多集群舰队Spark作业在多集群中进行调度和分发,提升多集群资源利用率。

    • 弹性算力供给:支持自定义弹性资源优先级调度,混合使用多种弹性方案,包括节点自动伸缩节点即时弹性等;也支持使用ECI、ACS算力,无需保有云服务器实例,按需使用,灵活扩缩。

    • 在离线混部:与ack-koordinator集成,支持在离线混部,提高集群资源利用率。

  • 性能与稳定性优化

    • Shuffle性能优化:通过配置Spark作业使用Celeborn作为Remote Shuffle Service,实现存算分离,提升Shuffle性能和稳定性。

    • 数据访问加速:基于Fluid数据编排和访问加速能力,加速Spark作业数据访问,提升作业性能。

整体架构

ACK集群中部署Spark作业时,您可以通过Spark Operator快速提交作业,使用ACK与阿里云产品集成带来的可观测、调度、资源弹性等能力。Spark on ACK的整体架构如下。

image
  • 客户端:通过kubectl、Arena等命令行工具将Spark作业提交至ACK集群。

  • 工作流:通过Apache Airflow、Argo Workflows等工作流框架来编排Spark作业并提交至ACK集群。

  • 可观测:通过Spark History Server、阿里云日志服务SLS、阿里云可观测监控 Prometheus 版搭建可观测体系,包括查看作业运行状态、收集和分析作业日志和监控指标等。

  • Spark Operator:自动化管理Spark作业的生命周期,包括作业配置管理、作业提交和重试等。

  • Remote Shuffle Service(RSS):使用Apache Celeborn作为 RSS,提高Spark作业在Shuffle时的性能和稳定性。

  • 缓存:使用Fluid作为分布式缓存系统实现数据接入和数据访问加速。

  • 云基础设施:作业运行过程中将使用到阿里云提供的基础设施,包括计算资源(ECSECIACS)、存储资源(云盘NASOSS)和网络资源(ENIVPCSLB)等。

计费说明

ACK集群中运行Spark作业时,相关组件的安装是免费的。使用过程中,ACK集群本身的费用(集群管理费和相关云产品费用)仍然正常收取,请参见计费概述

如果您同时使用了其他云产品,例如通过日志服务SLS收集Spark作业产生的日志、Spark作业读写OSS/NAS中的数据等,产生的云产品费用由各云产品收取。您可以参见下文的操作文档了解。

开始使用

ACK集群中运行Spark作业的大致流程如下,包括基础使用、可观测性和高阶配置,供您按需选择和配置。

image

基础使用

流程

说明

流程

说明

构建Spark容器镜像

您可以选择直接使用开源社区提供的Spark容器镜像,或者基于开源容器镜像进行定制并推送到您自己的镜像仓库中。下面是一个Dockerfile示例,您可以按需修改此Dockerfile,例如替换Spark基础镜像、加入依赖Jar包等,然后构建镜像并推送到镜像仓库中。

展开查看示例Dockerfile

ARG SPARK_IMAGE=spark:3.5.4

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# Add dependency for log4j-layout-template-json
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.24.1/log4j-layout-template-json-2.24.1.jar ${SPARK_HOME}/jars

# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.3/celeborn-client-spark-3-shaded_2.12-0.5.3.jar ${SPARK_HOME}/jars

创建专属命名空间

Spark作业创建一个或多个专属的命名空间(本教程使用spark),用于实现资源隔离和资源配额等。后续Spark作业都将运行在该命名空间中。创建命令如下。

kubectl create namespace spark

使用Spark Operator运行Spark作业

部署ack-spark-operator组件,并配置 spark.jobNamespaces=["spark"](只监听spark命名空间中提交的Spark作业)。部署完成后,即可运行下述示例Spark作业。

展开查看示例Spark作业

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark # 需要确保此命名空间在 spark.jobNamespaces 指定的命名空间列表中。
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成您自己的 Spark 容器镜像。
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
        serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
详细内容,请参见使用Spark Operator运行Spark作业

读写OSS数据

Spark作业访问阿里云OSS数据有多种方式,包括Hadoop Aliyun SDKHadoop AWS SDKJindoSDK等,根据选择的SDK,您需要在Spark容器镜像中包含相应的依赖并在Spark作业中配置Hadoop相关参数。

展开查看示例代码

参见Spark作业读写OSS数据将测试数据集上传至OSS后,您可以运行如下示例Spark作业。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成您自己的 Spark 镜像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 指定输入测试数据集,将 <OSS_BUCKET> 替换成 OSS Buckt 名称
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 迭代次数
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # 配置 Spark 作业访问 OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # 将 <OSS_ENDPOINT> 替换成 OSS 访问端点,例如北京地区 OSS 的内网访问端点为 oss-cn-beijing-internal.aliyuncs.com
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 从指定 Secret 中读取环境变量
          - secretRef:
              name: spark-oss-secret
        serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 从指定 Secret 中读取环境变量
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
详细内容,请参见Spark作业读写OSS数据

可观测性

流程

说明

流程

说明

部署Spark History Server

spark命名空间中部署ack-spark-history-server,配置日志存储后端(支持PVC、OSS/OSS-HDFS、HDFS)等信息,从指定的存储系统中读取Spark事件日志并解析成Web UI方便用户查看。下面的示例配置展示了如何配置Spark History Server从指定的NAS文件系统的/spark/event-logs路径中读取事件日志。

展开查看示例配置

# Spark 配置
sparkConf:
  spark.history.fs.logDirectory: file:///mnt/nas/spark/event-logs

# 环境变量
env:
- name: SPARK_DAEMON_MEMORY
  value: 7g

# 数据卷
volumes:
- name: nas
  persistentVolumeClaim:
    claimName: nas-pvc

# 数据卷挂载
volumeMounts:
- name: nas
  subPath: spark/event-logs
  mountPath: /mnt/nas/spark/event-logs

# 根据 Spark 作业数量和规模调整资源大小
resources:
  requests:
    cpu: 2
    memory: 8Gi
  limits:
    cpu: 2
    memory: 8Gi

接着,在提交Spark作业时挂载相同的NAS文件系统并配置Spark将事件日志写入相同的路径,后续您将可以从Spark History Server中查看该作业,下面是一个示例作业。

展开查看示例Spark作业

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成您的 Spark 镜像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  sparkConf:
    # 事件日志
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
详细内容,请参见使用Spark History Server查看Spark作业信息

配置日志服务SLS收集Spark日志

集群中运行大量 Spark 作业时,建议使用阿里云日志服务 SLS 统一收集所有 Spark 作业日志,以便查询和分析Spark容器的 stdout、stderr 日志。

展开查看示例作业

此代码在Spark作业中使用SLS收集Spark容器中位于/opt/spark/logs/*.log路径中的日志。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成步骤一种构建得到的 Spark 镜像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  # 从指定的 ConfigMap 中读取日志配置文件 log4j2.properties
  sparkConfigMap: spark-log-conf
  sparkConf:
    # 事件日志
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        serviceAccount: spark-operator-spark
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
详细内容,请参见使用日志服务收集Spark作业日志

性能优化

流程

说明

流程

说明

通过RSS提升Shuffle性能

Shuffle是分布式计算中的重要操作,其过程通常伴随着大量磁盘IO、数据序列化和网络IO,容易引发OOM和数据获取失败(Fetch失败)等问题。为了优化Shuffle的性能和稳定性,提升计算服务质量,您可以在Spark作业配置中使用Apache Celeborn作为Remote Shuffle Service(RSS)。

展开查看示例代码

参见Spark作业使用Celeborn作为RSS在集群中部署ack-celeborn组件后,您可以基于下方代码提交Spark作业,使用Celeborn作为RSS。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成您的 Spark 镜像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 指定输入测试数据集,将 <OSS_BUCKET> 替换成 OSS Buckt 名称
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 迭代次数
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # 配置 Spark 作业访问 OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # 将 <OSS_ENDPOINT> 替换成 OSS 访问端点,例如北京地区 OSS 的内网访问端点为 oss-cn-beijing-internal.aliyuncs.com
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  # 从指定的 ConfigMap 中读取日志配置文件 log4j2.properties
  sparkConfigMap: spark-log-conf
  sparkConf:
    # 事件日志
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    
    # Celeborn 相关配置
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # 需要根据 Celeborn master 副本数量进行配置
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 从指定 Secret 中读取环境变量
          - secretRef:
              name: spark-oss-secret
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 从指定 Secret 中读取环境变量
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
详细内容,请参见Spark作业使用Celeborn作为RSS

定义弹性资源调度优先级

使用ECI Pod并配置合适的调度策略,可以按需创建并按资源实际用量付费,有效减少集群资源闲置带来的成本浪费。在ECSECI资源混用的场景下,还可以指定调度优先级。

您无需在SparkApplication 中修改调度相关配置,ACK调度器会根据配置的弹性策略自动完成Pod调度。您可以按需灵活地定制多种弹性资源(例如ECSECI)的混用。

展开查看示例弹性策略

以下示例自定义了弹性策略:对于spark命名空间中由Spark Operator启动的Pod,优先使用ECS资源并且最多调度10Pod,在ECS资源不足时再使用ECI资源并且最多调度10Pod。

apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
  name: spark
  namespace: spark
spec:
  # 通过标签选择器指定调度策略应用的 Pod
  selector:
    # 例如,指定调度策略应用于通过 Spark Operator 方式提交的作业 Pod
    sparkoperator.k8s.io/launched-by-spark-operator: "true"
  strategy: prefer
  # 调度单元配置
  # 扩容时,将按照调度单元的顺序进行扩容;缩容时,将按照调度单元的逆序进行缩容。
  units:
  # 第一个调度单元使用 ecs 资源,最多能调度 10 个 pod 到 ECS 实例
  - resource: ecs
    max: 10
    # 调度器会将标签信息更新到 Pod 上
    podLabels:
      # 这是一条特殊标签,不会更新到 Pod 上
      k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true"
    # 使用 ECS 资源时可以通过节点选择器指定可以调度的节点
    nodeSelector:
      # 例如,选择按量付费类型的 ECS 节点
      node.alibabacloud.com/instance-charge-type: PostPaid
  # 第二个调度单元使用 ECI 资源,最多能调度 10 个 pod 到 ECI 实例
  - resource: eci
    max: 10
  # 在统计 Pod 数量时忽略 ResourcePolicy 创建之前已经调度的 Pod
  ignorePreviousPod: false
  # 在统计 Pod 数量时忽略处于 Terminating 状态的 Pod
  ignoreTerminatingPod: true
  # 抢占策略
  # BeforeNextUnit 表示调度器将在每个 Unit 调度失败时尝试抢占
  # AfterAllUnits 表示 ResourcePolicy 只在最后一个 Unit 调度失败时尝试抢占
  preemptPolicy: AfterAllUnits
  # Pod 在何种情况下被允许使用后续 Unit 中的资源
  whenTryNextUnits:
    # 当满足以下两种情形之一时,允许使用后续 Unit 中的资源
    # 1. 当前 Unit 的 Max 已设置,且该 Unit 中的 Pod 数量大于或等于设置的 Max 值时;
    # 2. 当前 Unit 的 Max 未设置,且该 Unit 的 PodLabels 中包含标签 k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true",且等待超时之后。
    policy: TimeoutOrExceedMax
    # 当 policy 为 TimeoutOrExceedMax 时,若当前 Unit 资源不足以调度 Pod,则在当前 Unit 中等待,等待时间最大为 timeout,
    # 该策略可以与自动伸缩以及 ECI 配合,达到优先尝试节点池自动伸缩,并且在超时后自动使用 eci 的效果。
    timeout: 30s
详细内容,请参见使用ECI弹性资源运行Spark作业

配置动态资源分配

动态资源分配(Dynamic Resource Allocation,简称DRA)可根据工作负载的大小动态调整作业所使用的计算资源。您可以为Spark作业启用动态资源分配,避免因资源不足导致作业执行时间过长或因资源过剩导致资源浪费。

展开查看示例作业

此示例作业结合Celeborn RSS进一步配置了动态资源分配。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成建您的 Spark 镜像
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 指定输入测试数据集,将 <OSS_BUCKET> 替换成 OSS Buckt 名称
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 迭代次数
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # 配置 Spark 作业访问 OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # 将 <OSS_ENDPOINT> 替换成 OSS 访问端点,例如北京地区 OSS 的内网访问端点为 oss-cn-beijing-internal.aliyuncs.com
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  # 从指定的 ConfigMap 中读取日志配置文件 log4j2.properties
  sparkConfigMap: spark-log-conf
  sparkConf:
    # ====================
    # 事件日志
    # ====================
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs

    # ====================
    # Celeborn
    # Ref: https://github.com/apache/celeborn/blob/main/README.md#spark-configuration
    # ====================
    # Shuffle manager class name changed in 0.3.0:
    #    before 0.3.0: `org.apache.spark.shuffle.celeborn.RssShuffleManager`
    #    since 0.3.0: `org.apache.spark.shuffle.celeborn.SparkShuffleManager`
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    # Must use kryo serializer because java serializer do not support relocation
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # 需要根据 Celeborn master 副本数量进行配置。
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    # options: hash, sort
    # Hash shuffle writer use (partition count) * (celeborn.push.buffer.max.size) * (spark.executor.cores) memory.
    # Sort shuffle writer uses less memory than hash shuffle writer, if your shuffle partition count is large, try to use sort hash writer.
    spark.celeborn.client.spark.shuffle.writer: hash
    # We recommend setting `spark.celeborn.client.push.replicate.enabled` to true to enable server-side data replication
    # If you have only one worker, this setting must be false 
    # If your Celeborn is using HDFS, it's recommended to set this setting to false
    spark.celeborn.client.push.replicate.enabled: "false"
    # Support for Spark AQE only tested under Spark 3
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    # we recommend enabling aqe support to gain better performance
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    # 当 Spark 版本 >= 3.5.0 时,配置该选项以支持动态资源分配
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.executor.userClassPathFirst: "false"

    # ====================
    # 动态资源分配
    # Ref: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
    # ====================
    # 启用动态资源分配
    spark.dynamicAllocation.enabled: "true"
    # 启用 shuffle 文件跟踪,不依赖 ESS 即可实现动态资源分配。
    # 在使用 Celeborn 作为 RSS 时,当 Spark 版本 >= 3.4.0 时,强烈建议关闭该选项。
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    # Executor 数量的初始值。
    spark.dynamicAllocation.initialExecutors: "3"
    # Executor 数量的最小值。
    spark.dynamicAllocation.minExecutors: "0"
    # Executor 数量的最大值。
    spark.dynamicAllocation.maxExecutors: "10"
    # Executor 空闲超时时间,超过该时间将会被释放掉。
    spark.dynamicAllocation.executorIdleTimeout: 60s
    # 缓存了数据块的 Executor 空闲超时时间,超过该时间将会被释放掉,默认为 infinity,即不会释放。
    # spark.dynamicAllocation.cachedExecutorIdleTimeout:
    # 当存在待调度任务超过该时间后,将会申请更多的 executor。
    spark.dynamicAllocation.schedulerBacklogTimeout: 1s
    # 每间隔该时间后,将会开始下一批次申请 Executor。 
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 从指定 Secret 中读取环境变量
          - secretRef:
              name: spark-oss-secret
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 从指定 Secret 中读取环境变量
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
详细内容,请参见Spark作业配置动态资源分配

使用Fluid加速数据访问

如果您的数据位于线下IDC或在数据访问时遇到性能瓶颈,可使用Fluid提供的数据接入和分布式缓存编排能力加速数据访问。

详细内容,请参见Spark作业使用Fluid加速数据访问

相关文档

  • 本页导读
  • Spark on ACK介绍
  • 功能优势
  • 整体架构
  • 计费说明
  • 开始使用
  • 基础使用
  • 可观测性
  • 性能优化
  • 相关文档