Apache Celeborn是一个专门用于处理大数据计算引擎中间数据(如Shuffle数据和溢写数据)的服务,能够提升大数据引擎的性能、稳定性和灵活性。Remote Shuffle Service(RSS)用于高效处理大规模数据集的Shuffle过程。本文介绍如何在ACK集群中部署Celeborn组件,并在Spark作业中使用Celeborn作为Remote Shuffle Service(RSS)。
在Spark作业中使用Celeborn的优势
对于MapReduce、Spark和Flink等大数据处理框架,使用Celeborn作为RSS具有如下优势:
推送式Shuffle写入(Push-based shuffle write):Mapper节点不需要将数据存储在本地磁盘,适合云端存算分离架构。
合并式Shuffle读取(Merge-based shuffle read):数据在Worker节点进行合并,而非在Reducer节点,避免小文件的随机读写及小数据量传输带来的网络开销,提升数据处理效率。
高可用性:Celeborn的Master节点基于Raft协议实现高可用性,确保系统的稳定运行。
高容错性:支持双副本机制,显著降低Fetch失败的概率。
前提条件
已部署ack-spark-operator组件,请参见部署ack-spark-operator组件。
已通过kubectl工具连接集群。具体操作,请参见获取集群KubeConfig并通过kubectl工具连接集群。
已创建OSS存储空间。具体操作请参见创建存储空间。
已安装ossutil并配置ossutil。关于ossutil命令参考请参见命令行工具ossutil命令参考。
根据如下集群环境配置创建和管理节点池。
集群环境
本示例中使用的ACK集群环境信息如下所示。
Master进程部署到节点池celeborn-master中,配置如下:
节点池名称:celeborn-master
节点数:3
ECS实例规格类型:g8i.2xlarge
标签:celeborn.apache.org/role=master
污点:celeborn.apache.org/role=master:NoSchedule
单节点数据存储:/mnt/celeborn_ratis(1024GB)
Worker进程部署到节点池celeborn-worker中,配置如下:
节点池名称:celeborn-worker
节点数:5
ECS实例规格类型:g8i.4xlarge
标签:celeborn.apache.org/role=worker
污点:celeborn.apache.org/role=worker:NoSchedule
单节点数据存储:
/mnt/disk1(1024GB)
/mnt/disk2(1024GB)
/mnt/disk3(1024GB)
/mnt/disk4(1024GB)
流程概述
本文将引导您完成以下步骤,帮助您了解如何在ACK集群中部署Celeborn。
构建Celeborn容器镜像
根据所需的Celeborn版本下载相应的发行版,然后构建容器镜像并将其推送至您的镜像仓库,以供部署ack-celeborn组件时使用。
部署ack-celeborn组件
通过ACK应用市场提供的ack-celeborn Helm Chart,使用已构建的Celeborn容器镜像,一键部署Celeborn集群。
构建Spark容器镜像
构建包含了Celeborn和访问OSS相关Jar包依赖的Spark容器镜像,并推送到您的镜像仓库中。
准备测试数据并上传至OSS
生成PageRank作业的测试数据集并将其上传至OSS。
运行示例Spark作业
运行示例PageRank作业并配置使用Celeborn作为RSS。
(可选)环境清理
体验完本教程后,清理无需使用的Spark作业和资源,避免产生额外的费用。
步骤一:构建Celeborn容器镜像
根据您所使用的Celeborn版本,从Celeborn 官网下载相应的发行版(如0.5.2版本)。在配置过程中,将<IMAGE-REGISTRY>
和<IMAGE-REPOSITORY>
替换为您自己的镜像仓库和镜像名称。同时,您可以通过修改PLATFORMS
变量来配置所需的镜像架构。更多信息,请参见Deploy Celeborn on Kubernetes。docker buildx
命令需要Docker版本19.03或更高版本支持,升级详情请参见安装Docker。
CELEBORN_VERSION=0.5.2 # Celeborn版本号。
IMAGE_REGISTRY=<IMAGE-REGISTRY> # 镜像仓库,例如docker.io。
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # 镜像名称,例如apache/celeborn。
IMAGE_TAG=${CELEBORN_VERSION} # 镜像标签,这里使用Celeborn版本号作为标签。
# 下载。
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# 解压。
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# 切换工作目录。
cd apache-celeborn-${CELEBORN_VERSION}-bin
# 使用Docker Buildkit构建镜像并推送到镜像仓库中。
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.
步骤二:部署ack-celeborn组件
登录容器服务管理控制台,在左侧导航栏选择 。
在应用市场页面,单击应用目录页签,然后搜索并选中ack-celeborn,然后在ack-celeborn页面,单击一键部署。
在创建面板中,选择集群和命名空间,然后单击下一步。
在参数配置页面,设置相应参数,然后单击确定。
image: # 需替换成步骤一中构建得到的Celeborn镜像地址。 registry: docker.io # 镜像仓库。 repository: apache/celeborn # 镜像名称。 tag: 0.5.2 # 镜像标签。 celeborn: celeborn.client.push.stageEnd.timeout: 120s celeborn.master.ha.enabled: true celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis celeborn.master.heartbeat.application.timeout: 300s celeborn.master.heartbeat.worker.timeout: 120s celeborn.master.http.port: 9098 celeborn.metrics.enabled: true celeborn.metrics.prometheus.path: /metrics/prometheus celeborn.rpc.dispatcher.numThreads: 4 celeborn.rpc.io.clientThreads: 64 celeborn.rpc.io.numConnectionsPerPeer: 2 celeborn.rpc.io.serverThreads: 64 celeborn.shuffle.chunk.size: 8m celeborn.worker.fetch.io.threads: 32 celeborn.worker.flusher.buffer.size: 256K celeborn.worker.http.port: 9096 celeborn.worker.monitor.disk.enabled: false celeborn.worker.push.io.threads: 32 celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi master: replicas: 3 env: - name: CELEBORN_MASTER_MEMORY value: 28g - name: CELEBORN_MASTER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: celeborn-ratis mountPath: /mnt/celeborn_ratis resources: requests: cpu: 7 memory: 28Gi limits: cpu: 7 memory: 28Gi volumes: - name: celeborn-ratis hostPath: path: /mnt/celeborn_ratis type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: master tolerations: - key: celeborn.apache.org/role operator: Equal value: master effect: NoSchedule worker: replicas: 5 env: - name: CELEBORN_WORKER_MEMORY value: 28g - name: CELEBORN_WORKER_OFFHEAP_MEMORY value: 28g - name: CELEBORN_WORKER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: disk1 mountPath: /mnt/disk1 - name: disk2 mountPath: /mnt/disk2 - name: disk3 mountPath: /mnt/disk3 - name: disk4 mountPath: /mnt/disk4 resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi volumes: - name: disk1 hostPath: path: /mnt/disk1 type: DirectoryOrCreate - name: disk2 hostPath: path: /mnt/disk2 type: DirectoryOrCreate - name: disk3 hostPath: path: /mnt/disk3 type: DirectoryOrCreate - name: disk4 hostPath: path: /mnt/disk4 type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: worker tolerations: - key: celeborn.apache.org/role operator: Equal value: worker effect: NoSchedule
下表列出了部分配置参数的说明。完整的参数配置详情,您可以在ack-celeborn页面中的配置项查看。
执行以下命令并耐心等待Celeborn部署完成。在组件部署期间,如遇到Pod异常问题请参见Pod异常问题排查。
kubectl get -n celeborn statefulset
预期输出:
NAME READY AGE celeborn-master 3/3 68s celeborn-worker 5/5 68s
步骤三:构建Spark容器镜像
以Spark 3.5.3版本为例,创建如下Dockerfile文件,构建并上传至您的镜像仓库。
ARG SPARK_IMAGE=<SPARK_IMAGE> # 将<SPARK_IMAGE>替换成您的Spark基础镜像。
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars
步骤四:准备测试数据并上传至 OSS
关于如何准备测试数据并上传至OSS,参见步骤一:准备测试数据并上传至OSS。
步骤五:创建Secret存储OSS访问凭据
关于如何创建Secret用于存储OSS访问凭据,参见步骤三:创建Secret存储OSS访问凭据。
步骤六:提交示例Spark作业
根据如下内容创建SparkApplication清单文件并保存为spark-pagerank.yaml
。将<SPARK_IMAGE>
替换为您在步骤三:构建Spark容器镜像的仓库地址,同时将<OSS_BUCKET>
和<OSS_ENDPOINT>
替换成您的OSS存储桶名称和访问端点。关于如何在Spark作业中配置Celeborn的更多信息,请参见Celeborn使用文档。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE> # Spark 镜像,将<SPARK_IMAGE>替换成Spark镜像名称
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Buckt名称。
- "10" # 迭代次数。
sparkVersion: 3.5.3
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS访问端点。例如北京地区OSS的内网访问地址为oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConf:
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.executor.userClassPathFirst: "false"
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never
(可选)步骤七:环境清理
如果您已体验完本教程,相关资源如不再需要,可以通过执行以下命令进行删除。
执行如下命令删除Spark作业。
kubectl delete sparkapplication spark-pagerank
执行如下命令删除Secret资源。
kubectl delete secret spark-oss-secret
相关文档
关于如何使用Spark Operator提交Spark作业,请参见使用Spark Operator运行Spark作业。
关于如何使用Spark History Server查看Spark作业信息,请参见使用Spark History Server查看Spark作业信息。
关于如何使用Celeborn,请参见Apache Celeborn 使用文档。