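Use Celeborn as the RSS for Spark jobs

Apache Celeborn is a service dedicated to handling the intermediate data of big data compute engines, such as shuffle data and spilled data. It improves the performance, stability, and flexibility of those engines. A Remote Shuffle Service (RSS) handles the shuffle phase of large datasets efficiently. This topic describes how to deploy the Celeborn component in an ACK cluster and use Celeborn as the Remote Shuffle Service (RSS) for Spark jobs.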

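Advantages of using Celeborn in Spark jobs

For big data processing frameworks such as MapReduce, Spark, and Flink, using Celeborn as the RSS offers the following advantages:

  • Push-based shuffle write: Mappers do not need to store data on local disks, which suits cloud architectures that separate storage from compute.

  • Merge-based shuffle read: Data is merged on Worker nodes rather than on Reducers. This avoids random reads and writes of small files and the network overhead of transferring small amounts of data, which improves processing efficiency.

  • High availability: Celeborn Master nodes use the Raft protocol, so the service remains available when a minority of Master nodes fail.

  • High fault tolerance: A two-replica mechanism is supported, which significantly reduces the probability of fetch failures.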

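Prerequisites

Cluster environment

The ACK cluster used in this example is configured as follows.

  • Master processes are deployed to the celeborn-master node pool, which is configured as follows:

    • Node pool name: celeborn-master

    • Number of nodes: 3

    • ECS instance type: g8i.2xlarge

    • Label: celeborn.apache.org/role=master

    • Taint: celeborn.apache.org/role=master:NoSchedule

    • Data storage per node: /mnt/celeborn_ratis (1024 GB)

  • Worker processes are deployed to the celeborn-worker node pool, which is configured as follows:

    • Node pool name: celeborn-worker

    • Number of nodes: 5

    • ECS instance type: g8i.4xlarge

    • Label: celeborn.apache.org/role=worker

    • Taint: celeborn.apache.org/role=worker:NoSchedule

    • Data storage per node:

      • /mnt/disk1 (1024 GB)

      • /mnt/disk2 (1024 GB)

      • /mnt/disk3 (1024 GB)

      • /mnt/disk4 (1024 GB)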

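Procedure overview

This topic walks you through the following steps to deploy Celeborn in an ACK cluster and use it from a Spark job.

  1. Build the Celeborn container image

    Download the release for the Celeborn version you need, build a container image from it, and push the image to your image registry for use when you deploy the ack-celeborn component.

  2. Deploy the ack-celeborn component

    Use the ack-celeborn Helm chart from the ACK Marketplace, together with the Celeborn container image you built, to deploy a Celeborn cluster.

  3. Build the Spark container image

    Build a Spark container image that contains the JAR dependencies for Celeborn and for OSS access, and push it to your image registry.

  4. Prepare test data and upload it to OSS

    Generate the test dataset for the PageRank job and upload it to OSS.

  5. Create a Secret to store OSS access credentials

    Create a Secret that holds the OSS access credentials used by the Spark job.

  6. Submit the sample Spark job

    Run the sample PageRank job, configured to use Celeborn as the RSS.

  7. (Optional) Clean up the environment

    After you finish this tutorial, delete the Spark job and resources that you no longer need to avoid additional charges.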

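Step 1: Build the Celeborn container image

Download the release that matches the Celeborn version you use (for example, 0.5.2) from the Celeborn official website. In the following script, replace <IMAGE-REGISTRY> and <IMAGE-REPOSITORY> with your own image registry and image name. You can also change the PLATFORMS variable to select the image architectures to build. For more information, see Deploy Celeborn on Kubernetes. The docker buildx command requires Docker 19.03 or later. For upgrade instructions, see Install Docker.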

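CELEBORN_VERSION=0.5.2                 # Celeborn version.
IMAGE_REGISTRY="<IMAGE-REGISTRY>"      # Image registry, for example docker.io.
IMAGE_REPOSITORY="<IMAGE-REPOSITORY>"  # Image name, for example apache/celeborn.
IMAGE_TAG=${CELEBORN_VERSION}          # Image tag. The Celeborn version is used as the tag here.
PLATFORMS=linux/amd64,linux/arm64      # Image architectures to build (example value; adjust as needed).

# Download the release.
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Extract the archive.
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Switch to the extracted directory.
cd apache-celeborn-${CELEBORN_VERSION}-bin

# Build the image with Docker BuildKit and push it to the image registry.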
docker buildx build \
    --output=type=registry \
    --push \
    --platform=${PLATFORMS} \
    --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
    -f docker/Dockerfile \
    .
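
The Docker version requirement stated above (19.03 or later for docker buildx) can be checked before running the build. The following is a minimal sketch rather than part of the original procedure: version_ge is a hypothetical helper name, and the sketch assumes a sort command that supports the -V (version sort) option, as GNU coreutils does.

```shell
# Return success when version $1 is greater than or equal to version $2.
# Relies on "sort -V" (version sort), which is an assumption about the local sort.
version_ge() {
    [ "$(printf '%s\n%s\n' "$2" "$1" | sort -V | head -n 1)" = "$2" ]
}

REQUIRED_VERSION=19.03

if command -v docker >/dev/null 2>&1; then
    # Query only the client version; an unreachable daemon should not abort the check.
    CLIENT_VERSION=$(docker version --format '{{.Client.Version}}' 2>/dev/null || true)
    if version_ge "${CLIENT_VERSION}" "${REQUIRED_VERSION}"; then
        echo "Docker ${CLIENT_VERSION} meets the minimum version for docker buildx."
    else
        echo "Docker version ${CLIENT_VERSION:-unknown} does not meet the ${REQUIRED_VERSION} minimum."
    fi
else
    echo "docker is not installed."
fi
```

If the check reports a version below the minimum, upgrade Docker before running the build command.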

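Step 2: Deploy the ack-celeborn component

  1. Log on to the Container Service management console. In the left-side navigation pane, choose Marketplace > Marketplace.

  2. On the Marketplace page, click the App Catalog tab, search for and select ack-celeborn, and then click Deploy on the ack-celeborn page.

  3. In the Create panel, select a cluster and a namespace, and then click Next.

  4. On the Parameters page, set the parameters, and then click OK.

    image:                         # Replace with the address of the Celeborn image built in Step 1.
      registry: docker.io          # Image registry.
      repository: apache/celeborn  # Image name.
      tag: 0.5.2                   # Image tag.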
    
    celeborn:
      celeborn.client.push.stageEnd.timeout: 120s
      celeborn.master.ha.enabled: true
      celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
      celeborn.master.heartbeat.application.timeout: 300s
      celeborn.master.heartbeat.worker.timeout: 120s
      celeborn.master.http.port: 9098
      celeborn.metrics.enabled: true
      celeborn.metrics.prometheus.path: /metrics/prometheus
      celeborn.rpc.dispatcher.numThreads: 4
      celeborn.rpc.io.clientThreads: 64
      celeborn.rpc.io.numConnectionsPerPeer: 2
      celeborn.rpc.io.serverThreads: 64
      celeborn.shuffle.chunk.size: 8m
      celeborn.worker.fetch.io.threads: 32
      celeborn.worker.flusher.buffer.size: 256K
      celeborn.worker.http.port: 9096
      celeborn.worker.monitor.disk.enabled: false
      celeborn.worker.push.io.threads: 32
      celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi
    
    master:
      replicas: 3
      env:
      - name: CELEBORN_MASTER_MEMORY
        value: 28g
      - name: CELEBORN_MASTER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: celeborn-ratis
        mountPath: /mnt/celeborn_ratis
      resources:
        requests:
          cpu: 7                
          memory: 28Gi        
        limits:
          cpu: 7
          memory: 28Gi
      volumes:
      - name: celeborn-ratis
        hostPath:
          path: /mnt/celeborn_ratis
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: master
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: master
        effect: NoSchedule
    
    worker:
      replicas: 5
      env:
      - name: CELEBORN_WORKER_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_OFFHEAP_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: disk1
        mountPath: /mnt/disk1
      - name: disk2
        mountPath: /mnt/disk2
      - name: disk3
        mountPath: /mnt/disk3
      - name: disk4
        mountPath: /mnt/disk4
      resources:
        requests:
          cpu: 14
          memory: 56Gi
        limits:
          cpu: 14
          memory: 56Gi
      volumes:
      - name: disk1
        hostPath:
          path: /mnt/disk1
          type: DirectoryOrCreate
      - name: disk2
        hostPath:
          path: /mnt/disk2
          type: DirectoryOrCreate
      - name: disk3
        hostPath:
          path: /mnt/disk3
          type: DirectoryOrCreate
      - name: disk4
        hostPath:
          path: /mnt/disk4
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: worker
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: worker
        effect: NoSchedule

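    The following table describes some of the parameters. For the complete list, see the configuration items on the ack-celeborn page.

    Parameters

    Parameter

    Description

    Example value

    image.registry

    Image registry address.

    "docker.io"

    image.repository

    Image name.

    "apache/celeborn"

    image.tag

    Image tag.

    "0.5.2"

    image.pullPolicy

    Image pull policy.

    "IfNotPresent"

    celeborn

    Celeborn configuration items.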

    {
      "celeborn.client.push.stageEnd.timeout": "120s",
      "celeborn.master.ha.enabled": true,
      "celeborn.master.ha.ratis.raft.server.storage.dir": "/mnt/celeborn_ratis",
      "celeborn.master.heartbeat.application.timeout": "300s",
      "celeborn.master.heartbeat.worker.timeout": "120s",
      "celeborn.master.http.port": 9098,
      "celeborn.metrics.enabled": true,
      "celeborn.metrics.prometheus.path": "/metrics/prometheus",
      "celeborn.rpc.dispatcher.numThreads": 4,
      "celeborn.rpc.io.clientThreads": 64,
      "celeborn.rpc.io.numConnectionsPerPeer": 2,
      "celeborn.rpc.io.serverThreads": 64,
      "celeborn.shuffle.chunk.size": "8m",
      "celeborn.worker.fetch.io.threads": 32,
      "celeborn.worker.flusher.buffer.size": "256K",
      "celeborn.worker.http.port": 9096,
      "celeborn.worker.monitor.disk.enabled": false,
      "celeborn.worker.push.io.threads": 32,
      "celeborn.worker.storage.dirs": "/mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi"
    }

    master.replicas

    Number of Master Pod replicas.

    3

    master.volumeMounts

    Volume mounts of the Master container.

    [
      {
        "mountPath": "/mnt/celeborn_ratis",
        "name": "celeborn-ratis"
      }
    ]

    master.volumes

    Volumes of the Master Pods.

    Only the hostPath and emptyDir volume types are currently supported.

    [
      {
        "hostPath": {
          "path": "/mnt/celeborn_ratis",
          "type": "DirectoryOrCreate"
        },
        "name": "celeborn-ratis"
      }
    ]

    master.nodeSelector

    Node selector of the Master Pods.

    {}

    master.affinity

    Affinity of the Master Pods.

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "master"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    master.tolerations

    Tolerations of the Master Pods.

    []

    worker.replicas

    Number of Worker Pod replicas.

    5

    worker.volumeMounts

    Volume mounts of the Worker container.

    [
      {
        "mountPath": "/mnt/disk1",
        "name": "disk1"
      },
      {
        "mountPath": "/mnt/disk2",
        "name": "disk2"
      },
      {
        "mountPath": "/mnt/disk3",
        "name": "disk3"
      },
      {
        "mountPath": "/mnt/disk4",
        "name": "disk4"
      }
    ]

    worker.volumes

    Volumes of the Worker Pods.

    Only the hostPath and emptyDir volume types are currently supported.

    [
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk1",
        "mountPath": "/mnt/disk1",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk2",
        "mountPath": "/mnt/disk2",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk3",
        "mountPath": "/mnt/disk3",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk4",
        "mountPath": "/mnt/disk4",
        "type": "hostPath"
      }
    ]

    worker.nodeSelector

    Node selector of the Worker Pods.

    {}

    worker.affinity

    Affinity of the Worker Pods.

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "worker"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

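    worker.tolerations

    Tolerations of the Worker Pods.

    []

  5. Run the following command and wait for the Celeborn deployment to complete. If a Pod becomes abnormal during deployment, see Pod troubleshooting.

    kubectl get -n celeborn statefulset

    Expected output: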

    NAME              READY   AGE
    celeborn-master   3/3     68s
    celeborn-worker   5/5     68s
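
The celeborn.worker.storage.dirs value in the parameters above lists each disk as path:disktype=...:capacity=..., with entries joined by commas. When the number of disks per Worker node differs from this example, the value can be generated rather than typed by hand. The sketch below is illustrative only: build_storage_dirs is a hypothetical helper, not part of Celeborn or the ack-celeborn chart.

```shell
# Build a celeborn.worker.storage.dirs value.
# $1: disk type (for example SSD), $2: capacity per disk (for example 1024Gi),
# remaining arguments: mount paths on the Worker node.
build_storage_dirs() {
    local disk_type="$1" capacity="$2"
    shift 2
    local result="" path
    for path in "$@"; do
        # Prepend a comma only when an earlier entry exists.
        result="${result:+${result},}${path}:disktype=${disk_type}:capacity=${capacity}"
    done
    printf '%s\n' "${result}"
}

# Reproduce the value used in this example: four 1024Gi SSD disks.
build_storage_dirs SSD 1024Gi /mnt/disk1 /mnt/disk2 /mnt/disk3 /mnt/disk4
```

The mount paths passed to the helper should match the worker.volumeMounts entries, since Celeborn writes shuffle data to those paths inside the container.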

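Step 3: Build the Spark container image

Taking Spark 3.5.3 as an example, create the following Dockerfile, then build the image and push it to your image registry.

# Replace <SPARK_IMAGE> with your Spark base image.
ARG SPARK_IMAGE=<SPARK_IMAGE>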

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars
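
The Dockerfile above still has to be built and pushed. A minimal sketch of that step follows, reusing the docker buildx workflow from Step 1. The registry, repository, tag, and the apache/spark:3.5.3 base image are placeholder assumptions rather than values from this topic, and spark_image_ref and build_spark_image are hypothetical helper names.

```shell
# Compose a full image reference from registry, repository, and tag.
spark_image_ref() {
    printf '%s/%s:%s\n' "$1" "$2" "$3"
}

# Build the Dockerfile in the current directory and push the result.
# $1: base image passed to the SPARK_IMAGE build argument.
# $2: full reference of the image to push.
build_spark_image() {
    docker buildx build \
        --push \
        --build-arg SPARK_IMAGE="$1" \
        --tag "$2" \
        -f Dockerfile \
        .
}

# Placeholder values (assumptions); replace them with your own.
TARGET_IMAGE=$(spark_image_ref registry.example.com my-team/spark 3.5.3-celeborn)
echo "Target image: ${TARGET_IMAGE}"

# Uncomment to run the build once the placeholders are replaced:
# build_spark_image apache/spark:3.5.3 "${TARGET_IMAGE}"
```

The pushed image reference is the value to use for <SPARK_IMAGE> in the SparkApplication manifest.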

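Step 4: Prepare test data and upload it to OSS

For how to prepare the test data and upload it to OSS, see Step 1: Prepare test data and upload it to OSS in the referenced topic.

Step 5: Create a Secret to store OSS access credentials

For how to create a Secret that stores the OSS access credentials, see Step 3: Create a Secret to store OSS access credentials in the referenced topic.

Step 6: Submit the sample Spark job

Create a SparkApplication manifest with the following content and save it as spark-pagerank.yaml. Replace <SPARK_IMAGE> with the address of the image you built in Step 3: Build the Spark container image, and replace <OSS_BUCKET> and <OSS_ENDPOINT> with your OSS bucket name and endpoint. For more information about configuring Celeborn in Spark jobs, see the Celeborn documentation.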

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE>                                     # Spark image. Replace <SPARK_IMAGE> with the name of your Spark image.
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # Input test dataset. Replace <OSS_BUCKET> with the name of your OSS bucket.
  - "10"                                                   # Number of iterations.
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                        # OSS endpoint. For example, the internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
  restartPolicy:
    type: Never
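
The spark.celeborn.master.endpoints value in the manifest above lists one DNS name per Master Pod, following the StatefulSet pattern celeborn-master-N.celeborn-master-svc.NAMESPACE.svc.cluster.local. If the number of Master replicas or the namespace differs from this example, the value can be generated as in the following sketch. build_master_endpoints is a hypothetical helper, and the StatefulSet and Service names are taken from the manifest above, not queried from the cluster.

```shell
# Build a spark.celeborn.master.endpoints value.
# $1: number of Master replicas, $2: namespace of the Celeborn deployment (defaults to celeborn).
build_master_endpoints() {
    local replicas="$1" namespace="${2:-celeborn}"
    local i=0 result=""
    while [ "$i" -lt "$replicas" ]; do
        # Prepend a comma only when an earlier entry exists.
        result="${result:+${result},}celeborn-master-${i}.celeborn-master-svc.${namespace}.svc.cluster.local"
        i=$((i + 1))
    done
    printf '%s\n' "${result}"
}

# Reproduce the value used in this example: three Masters in the celeborn namespace.
build_master_endpoints 3
```

With the manifest saved as spark-pagerank.yaml, it can then be submitted with kubectl apply -f spark-pagerank.yaml, assuming the Spark Operator that provides the SparkApplication resource is installed in the cluster.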

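(Optional) Step 7: Clean up the environment

After you finish this tutorial, you can run the following commands to delete resources that you no longer need.

Run the following command to delete the Spark job.

kubectl delete sparkapplication spark-pagerank

Run the following command to delete the Secret.

kubectl delete secret spark-oss-secret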

Related documents