Spark on ACK介绍
Apache Spark是一种专门用于大规模数据处理的计算引擎,广泛应用于数据分析和机器学习等场景。自 2.3 版本起,Spark支持将作业提交至 Kubernetes 集群中(Running Spark on Kubernetes)。
Spark Operator是用于在Kubernetes集群中运行Spark工作负载的Operator,支持以Kubernetes原生的方式自动化管理Spark作业的生命周期,包括作业配置、作业提交、作业重试等。
Spark on ACK解决方案对Spark Operator等相关组件进行了定制与优化,兼容开源版本并进一步拓展了功能特性。通过与阿里云产品生态的无缝集成,例如日志服务、对象存储和可观测性等,您可以基于Spark on ACK快速构建一个灵活、高效且可扩展的大数据处理平台。
功能优势
简化开发与运维
可移植性:支持将Spark应用及其依赖打包为标准化的容器镜像,从而实现Spark作业在不同 Kubernetes集群间的无缝迁移。
可观测性:支持通过Spark History Server组件查看作业运行状态,并集成阿里云日志服务SLS和可观测监控Prometheus版,进一步提升作业的可观测性。
工作流编排:通过工作流编排引擎(例如Apache Airflow、Argo Workflows)编排 Spark 作业,能够实现数据处理流水线的自动化、高效调度与跨环境一致性部署,提升运维效率并降低迁移成本。
多版本支持:支持在单个ACK集群中同时运行多个不同版本的Spark作业。
作业调度与资源管理
作业队列管理:与ack-kube-queue集成,提供灵活的作业队列管理和资源配额管理,自动优化工作负载资源分配并提升集群资源利用率。
多种调度策略:复用ACK调度器已有的调度能力,支持多种批调度策略,包括Gang Scheduling、Capacity Scheduling等。
多架构调度:支持混合使用x86和Arm架构的ECS资源,例如通过使用倚天Arm架构服务器实现增效降本。
多集群调度:通过ACK One多集群舰队将Spark作业在多集群中进行调度和分发,提升多集群资源利用率。
弹性算力供给:支持自定义弹性资源优先级调度,混合使用多种弹性方案,包括节点自动伸缩、节点即时弹性等;也支持使用ECI、ACS算力,无需保有云服务器实例,按需使用,灵活扩缩。
在离线混部:与ack-koordinator集成,支持在离线混部,提高集群资源利用率。
性能与稳定性优化
整体架构
在ACK集群中部署Spark作业时,您可以通过Spark Operator快速提交作业,使用ACK与阿里云产品集成带来的可观测、调度、资源弹性等能力。Spark on ACK的整体架构如下。

客户端:通过kubectl、Arena等命令行工具将Spark作业提交至ACK集群。
工作流:通过Apache Airflow、Argo Workflows等工作流框架来编排Spark作业并提交至ACK集群。
可观测:通过Spark History Server、阿里云日志服务SLS、阿里云可观测监控 Prometheus 版搭建可观测体系,包括查看作业运行状态、收集和分析作业日志和监控指标等。
Spark Operator:自动化管理Spark作业的生命周期,包括作业配置管理、作业提交和重试等。
Remote Shuffle Service(RSS):使用Apache Celeborn作为 RSS,提高Spark作业在Shuffle时的性能和稳定性。
缓存:使用Fluid作为分布式缓存系统实现数据接入和数据访问加速。
云基础设施:作业运行过程中将使用到阿里云提供的基础设施,包括计算资源(ECS、ECI、ACS)、存储资源(云盘、NAS、OSS)和网络资源(ENI、VPC、SLB)等。
计费说明
在ACK集群中运行Spark作业时,相关组件的安装是免费的。使用过程中,ACK集群本身的费用(集群管理费和相关云产品费用)仍然正常收取,请参见计费概述。
如果您同时使用了其他云产品,例如通过日志服务SLS收集Spark作业产生的日志、Spark作业读写OSS/NAS中的数据等,产生的云产品费用由各云产品收取。您可以参见下文的操作文档了解。
开始使用
在ACK集群中运行Spark作业的大致流程如下,包括基础使用、可观测性和高阶配置,供您按需选择和配置。

基础使用
流程 | 说明 |
构建Spark容器镜像 | 您可以选择直接使用开源社区提供的Spark容器镜像,或者基于开源容器镜像进行定制并推送到您自己的镜像仓库中。下面是一个Dockerfile示例,您可以按需修改此Dockerfile,例如替换Spark基础镜像、加入依赖Jar包等,然后构建镜像并推送到镜像仓库中。
ARG SPARK_IMAGE=spark:3.5.4
FROM ${SPARK_IMAGE}
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.24.1/log4j-layout-template-json-2.24.1.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.3/celeborn-client-spark-3-shaded_2.12-0.5.3.jar ${SPARK_HOME}/jars
|
创建专属命名空间 | 为Spark作业创建一个或多个专属的命名空间(本教程使用spark ),用于实现资源隔离和资源配额等。后续Spark作业都将运行在该命名空间中。创建命令如下。
kubectl create namespace spark
|
使用Spark Operator运行Spark作业 | 部署ack-spark-operator组件,并配置 spark.jobNamespaces=["spark"] (只监听spark 命名空间中提交的Spark作业)。部署完成后,即可运行下述示例Spark作业。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
arguments:
- "5000"
sparkVersion: 3.5.4
driver:
cores: 1
coreLimit: 1200m
memory: 512m
template:
spec:
containers:
- name: spark-kubernetes-driver
serviceAccount: spark-operator-spark
executor:
instances: 1
cores: 1
coreLimit: 1200m
memory: 512m
template:
spec:
containers:
- name: spark-kubernetes-executor
restartPolicy:
type: Never
详细内容,请参见使用Spark Operator运行Spark作业。 |
读写OSS数据 | Spark作业访问阿里云OSS数据有多种方式,包括Hadoop Aliyun SDK、Hadoop AWS SDK和JindoSDK等,根据选择的SDK,您需要在Spark容器镜像中包含相应的依赖并在Spark作业中配置Hadoop相关参数。
参见Spark作业读写OSS数据将测试数据集上传至OSS后,您可以运行如下示例Spark作业。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: spark
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPageRank
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt
- "10"
sparkVersion: 3.5.4
hadoopConf:
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT>
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: "1"
memory: 4g
template:
spec:
containers:
- name: spark-kubernetes-driver
envFrom:
- secretRef:
name: spark-oss-secret
serviceAccount: spark-operator-spark
executor:
instances: 2
cores: 1
coreLimit: "1"
memory: 4g
template:
spec:
containers:
- name: spark-kubernetes-executor
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never
详细内容,请参见Spark作业读写OSS数据。 |
可观测性
流程 | 说明 |
部署Spark History Server | 在spark 命名空间中部署ack-spark-history-server,配置日志存储后端(支持PVC、OSS/OSS-HDFS、HDFS)等信息,从指定的存储系统中读取Spark事件日志并解析成Web UI方便用户查看。下面的示例配置展示了如何配置Spark History Server从指定的NAS文件系统的/spark/event-logs 路径中读取事件日志。
sparkConf:
spark.history.fs.logDirectory: file:///mnt/nas/spark/event-logs
env:
- name: SPARK_DAEMON_MEMORY
value: 7g
volumes:
- name: nas
persistentVolumeClaim:
claimName: nas-pvc
volumeMounts:
- name: nas
subPath: spark/event-logs
mountPath: /mnt/nas/spark/event-logs
resources:
requests:
cpu: 2
memory: 8Gi
limits:
cpu: 2
memory: 8Gi
接着,在提交Spark作业时挂载相同的NAS文件系统并配置Spark将事件日志写入相同的路径,后续您将可以从Spark History Server中查看该作业,下面是一个示例作业。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
arguments:
- "5000"
sparkVersion: 3.5.4
sparkConf:
spark.eventLog.enabled: "true"
spark.eventLog.dir: file:///mnt/nas/spark/event-logs
driver:
cores: 1
coreLimit: 1200m
memory: 512m
template:
spec:
containers:
- name: spark-kubernetes-driver
volumeMounts:
- name: nas
subPath: spark/event-logs
mountPath: /mnt/nas/spark/event-logs
volumes:
- name: nas
persistentVolumeClaim:
claimName: nas-pvc
serviceAccount: spark-operator-spark
executor:
instances: 1
cores: 1
coreLimit: 1200m
memory: 512m
template:
spec:
containers:
- name: spark-kubernetes-executor
restartPolicy:
type: Never
详细内容,请参见使用Spark History Server查看Spark作业信息。 |
配置日志服务SLS收集Spark日志 | 集群中运行大量 Spark 作业时,建议使用阿里云日志服务 SLS 统一收集所有 Spark 作业日志,以便查询和分析Spark容器的 stdout、stderr 日志。
此代码在Spark作业中使用SLS收集Spark容器中位于/opt/spark/logs/*.log 路径中的日志。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
arguments:
- "5000"
sparkVersion: 3.5.4
sparkConfigMap: spark-log-conf
sparkConf:
spark.eventLog.enabled: "true"
spark.eventLog.dir: file:///mnt/nas/spark/event-logs
driver:
cores: 1
coreLimit: 1200m
memory: 512m
template:
spec:
containers:
- name: spark-kubernetes-driver
volumeMounts:
- name: nas
subPath: spark/event-logs
mountPath: /mnt/nas/spark/event-logs
serviceAccount: spark-operator-spark
volumes:
- name: nas
persistentVolumeClaim:
claimName: nas-pvc
executor:
instances: 1
cores: 1
coreLimit: 1200m
memory: 512m
template:
spec:
containers:
- name: spark-kubernetes-executor
restartPolicy:
type: Never
详细内容,请参见使用日志服务收集Spark作业日志。 |
性能优化
流程 | 说明 |
通过RSS提升Shuffle性能 | Shuffle是分布式计算中的重要操作,其过程通常伴随着大量磁盘IO、数据序列化和网络IO,容易引发OOM和数据获取失败(Fetch失败)等问题。为了优化Shuffle的性能和稳定性,提升计算服务质量,您可以在Spark作业配置中使用Apache Celeborn作为Remote Shuffle Service(RSS)。
参见Spark作业使用Celeborn作为RSS在集群中部署ack-celeborn组件后,您可以基于下方代码提交Spark作业,使用Celeborn作为RSS。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: spark
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPageRank
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt
- "10"
sparkVersion: 3.5.4
hadoopConf:
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT>
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConfigMap: spark-log-conf
sparkConf:
spark.eventLog.enabled: "true"
spark.eventLog.dir: file:///mnt/nas/spark/event-logs
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.executor.userClassPathFirst: "false"
driver:
cores: 1
coreLimit: "1"
memory: 4g
template:
spec:
containers:
- name: spark-kubernetes-driver
envFrom:
- secretRef:
name: spark-oss-secret
volumeMounts:
- name: nas
subPath: spark/event-logs
mountPath: /mnt/nas/spark/event-logs
volumes:
- name: nas
persistentVolumeClaim:
claimName: nas-pvc
serviceAccount: spark-operator-spark
executor:
instances: 2
cores: 1
coreLimit: "1"
memory: 4g
template:
spec:
containers:
- name: spark-kubernetes-executor
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never
详细内容,请参见Spark作业使用Celeborn作为RSS。 |
定义弹性资源调度优先级 | 使用ECI Pod并配置合适的调度策略,可以按需创建并按资源实际用量付费,有效减少集群资源闲置带来的成本浪费。在ECS和ECI资源混用的场景下,还可以指定调度优先级。 您无需在SparkApplication 中修改调度相关配置,ACK调度器会根据配置的弹性策略自动完成Pod调度。您可以按需灵活地定制多种弹性资源(例如ECS和ECI)的混用。
以下示例自定义了弹性策略:对于spark 命名空间中由Spark Operator启动的Pod,优先使用ECS资源并且最多调度10个Pod,在ECS资源不足时再使用ECI资源并且最多调度10个Pod。
apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
name: spark
namespace: spark
spec:
selector:
sparkoperator.k8s.io/launched-by-spark-operator: "true"
strategy: prefer
units:
- resource: ecs
max: 10
podLabels:
k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true"
nodeSelector:
node.alibabacloud.com/instance-charge-type: PostPaid
- resource: eci
max: 10
ignorePreviousPod: false
ignoreTerminatingPod: true
preemptPolicy: AfterAllUnits
whenTryNextUnits:
policy: TimeoutOrExceedMax
timeout: 30s
详细内容,请参见使用ECI弹性资源运行Spark作业。 |
配置动态资源分配 | 动态资源分配(Dynamic Resource Allocation,简称DRA)可根据工作负载的大小动态调整作业所使用的计算资源。您可以为Spark作业启用动态资源分配,避免因资源不足导致作业执行时间过长或因资源过剩导致资源浪费。
此示例作业结合Celeborn RSS进一步配置了动态资源分配。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: spark
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPageRank
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt
- "10"
sparkVersion: 3.5.4
hadoopConf:
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT>
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConfigMap: spark-log-conf
sparkConf:
spark.eventLog.enabled: "true"
spark.eventLog.dir: file:///mnt/nas/spark/event-logs
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.executor.userClassPathFirst: "false"
spark.dynamicAllocation.enabled: "true"
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.dynamicAllocation.initialExecutors: "3"
spark.dynamicAllocation.minExecutors: "0"
spark.dynamicAllocation.maxExecutors: "10"
spark.dynamicAllocation.executorIdleTimeout: 60s
spark.dynamicAllocation.schedulerBacklogTimeout: 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
driver:
cores: 1
coreLimit: "1"
memory: 4g
template:
spec:
containers:
- name: spark-kubernetes-driver
envFrom:
- secretRef:
name: spark-oss-secret
volumeMounts:
- name: nas
subPath: spark/event-logs
mountPath: /mnt/nas/spark/event-logs
volumes:
- name: nas
persistentVolumeClaim:
claimName: nas-pvc
serviceAccount: spark-operator-spark
executor:
cores: 1
coreLimit: "1"
memory: 4g
template:
spec:
containers:
- name: spark-kubernetes-executor
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never
详细内容,请参见Spark作业配置动态资源分配。 |
使用Fluid加速数据访问 | 如果您的数据位于线下IDC或在数据访问时遇到性能瓶颈,可使用Fluid提供的数据接入和分布式缓存编排能力加速数据访问。 详细内容,请参见Spark作业使用Fluid加速数据访问。 |