Spark作业使用Fluid加速数据访问

本文介绍如何使用Fluid加速数据访问,通过JindoRuntime优化OSS数据访问,从而提升数据密集型应用的性能。

前提条件

  • 已部署ack-spark-operator组件,请参见部署ack-spark-operator组件

    说明

    本教程在部署时使用了配置参数 spark.jobNamespaces=["spark"],如果您配置了不同的命名空间,在体验本教程时需要相应地修改namespace字段。

  • 已安装云原生AI套件并部署ack-fluid组件。具体操作,请参见安装云原生AI套件

  • 已生成测试数据并上传至OSS。具体操作,请参见准备测试数据并上传至OSS

Fluid简介

Fluid是一个开源的Kubernetes原生的分布式数据集编排和加速引擎,主要服务于云原生场景下的数据密集型应用,例如大数据应用、AI 应用等。Fluid的核心功能包括:

  • 数据集抽象原生支持:将数据密集型应用所需基础支撑能力功能化,实现数据高效访问并降低多维管理成本。

  • 可扩展的数据引擎插件:提供统一的访问接口,方便接入第三方存储,通过不同的Runtime实现数据操作。

  • 自动化的数据操作:提供多种操作模式,与自动化运维体系相结合。

  • 数据弹性与调度:将数据缓存技术和弹性扩缩容、数据亲和性调度能力相结合,提高数据访问性能。

  • 运行时平台无关:支持原生、边缘、Serverless Kubernetes集群、Kubernetes多集群等多样化环境,适用于混合云场景。

关于Fluid的更多信息,请参见弹性数据集

步骤一:创建Fluid专属节点池

ACK集群中创建名为fluid的专属节点池用于部署FluidJindoRuntime Worker pod。本教程中示例节点池中共3个节点,实例规格均为大数据网络增强型实例ecs.d1ne.4xlarge,每个节点打上标签fluid-cloudnative.github.io/node="true"和污点fluid-cloudnative.github.io/node="true":NoSchedule,每个节点带有8块大小为5905 GB的高吞吐SATA HDD本地盘,分别被格式化和挂载至/mnt/disk1/mnt/disk2、... 和 /mnt/disk8。关于创建节点池的具体操作,请参见创建和管理节点池。关于节点池的选型,请参见Fluid数据缓存优化策略最佳实践

步骤二:创建Dataset

  1. 创建如下Secret清单文件并保存为fluid-oss-secret.yaml,用于存储OSS访问凭据。

    请将<ACCESS_KEY_ID><ACCESS_KEY_SECRET>替换为您的阿里云AccessKey IDAccessKey Secret。

    apiVersion: v1
    kind: Secret
    metadata:
      name: fluid-oss-secret
      namespace: spark
    stringData:
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  1. 执行如下命令,创建Secret资源。

    kubectl create -f fluid-oss-secret.yaml

    预期输出如下:

    secret/fluid-oss-secret created
  1. 创建如下DataSet清单文件并保存为spark-fluid-dataset.yaml

    apiVersion: data.fluid.io/v1alpha1
    kind: Dataset
    metadata:
      name: spark
      namespace: spark
    spec:
      mounts:
      - name: spark
        # 需要加速的OSS访问路径,需将<OSS_BUCKET>替换成您的OSS存储桶名称。
        mountPoint: oss://<OSS_BUCKET>/
        path: /
        options:
          # OSS访问端点,需将<OSS_ENDPOINT>替换成您的OSS存储桶访问端点,
          # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。
          fs.oss.endpoint: <OSS_ENDPOINT>
        encryptOptions:
        - name: fs.oss.accessKeyId
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_ID
        - name: fs.oss.accessKeySecret
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_SECRET
      # 数据将缓存到满足亲和性的节点上。
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: fluid-cloudnative.github.io/node
              operator: In
              values:
              - "true"
      # 节点污点容忍。
      tolerations:
      - key: fluid-cloudnative.github.io/node
        operator: Equal
        value: "true"
        effect: NoSchedule

    其中各个字段的说明如下:

    • mountPoint:需要加速的OSS路径。

    • fs.oss.endpoint:OSS Bucket访问端点,例如北京地域内网端点为oss-cn-beijing-internal.aliyuncs.com

    • encryptOptions :配置了从名为fluid-oss-secretSecret中读取OSS_ACCESS_KEY_IDOSS_ACCESS_KEY_SECRET 作为OSS访问凭据。

  2. 执行如下命令,创建Dataset资源。

    kubectl create -f spark-fluid-dataset.yaml

    预期输出:

    dataset.data.fluid.io/spark created
  3. 执行如下命令查看Dataset部署状态。

    kubectl get -n spark dataset spark -o wide

    预期输出:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE      HCFS URL   TOTAL FILES   CACHE HIT RATIO   AGE
    spark                                                                  NotBound                                              58m

    可以看到此时Dataset处于NotBound状态。

步骤三:创建JindoRuntime

  1. 根据如下内容创建JindoRuntime清单文件并保存为 spark-fluid-jindoruntime.yaml

    apiVersion: data.fluid.io/v1alpha1
    kind: JindoRuntime
    metadata:
      # 必须和Dataset的名称相同。
      name: spark
      namespace: spark
    spec:
      # Worker副本数量。
      replicas: 3
      tieredstore:
        levels:
        # 缓存类型为HDD磁盘。
        - mediumtype: HDD
          # 数据集类型。
          volumeType: hostPath
          # 需要根据节点上的磁盘数量进行调整。
          path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8
          # 单个Worker能提供的缓存容量。
          quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi
          high: "0.99"
          low: "0.95"
      worker:
        resources:
          requests:
            cpu: 14
            memory: 56Gi
          limits:
            cpu: 14
            memory: 56Gi

    其中各个字段说明如下:

    • replicas:创建JindoFS集群的Worker数量。

    • mediumtype:缓存类型。

    • path:存储路径。

    • quota:缓存系统最大容量。

    • high:存储容量上限大小。

    • low:存储容量下限大小。

  1. 执行如下命令创建JindoRuntime资源。

    kubectl create -f spark-fluid-jindoruntime.yaml

    预期输出:

    jindoruntime.data.fluid.io/spark created
  2. 执行如下命令,查看JindoRuntime部署状态。

    kubectl get -n spark jindoruntime spark

    预期输出:

    NAME    MASTER PHASE   WORKER PHASE   FUSE PHASE   AGE
    spark   Ready          Ready          Ready        2m28s

    可以看到FUSE PHASE状态为Ready,表示JindoRuntime部署成功。

  3. 执行如下命令,再次查看Dataset部署状态:

    kubectl get -n spark dataset spark -o wide

    预期输出:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   [Calculating]    0.00B    128.91TiB                            Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     2m5

    可以看到Dataset状态已经变为Bound,表示Dataset已经部署成功。

(可选)步骤四:进行数据预热

由于首次访问无法命中数据缓存,Fluid提供了DataLoad缓存预热操作,以提升首次数据访问的效率。数据预热通过预先将数据加载到缓存中,提高数据访问速度,从而优化数据处理效率和系统性能。

  1. 根据如下内容创建Dataload清单文件并保存为spark-fluid-dataload.yaml

    apiVersion: data.fluid.io/v1alpha1
    kind: DataLoad
    metadata:
      name: spark
      namespace: spark
    spec:
      dataset:
        name: spark
        namespace: spark
      loadMetadata: true
  2. 执行如下命令,创建Dataload资源。

    kubectl create -f spark-fluid-dataload.yaml

    预期输出:

    dataload.data.fluid.io/spark created
  3. 执行如下命令,观察数据预热进度。

    kubectl get -n spark dataload spark -w

    预期输出:

    NAME    DATASET   PHASE      AGE     DURATION
    spark   spark     Executing   20s   Unfinished
    spark   spark     Complete   9m31s   8m37s

    可以看到这次数据预热总共耗时8m37s

  4. 执行如下命令,再次查看Dataset状态。

    kubectl get -n spark dataset spark -o wide

    预期输出:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     19m

    在前一次查看Dataset状态时,缓存的数据量为0.00B,经过数据预热之后,缓存的数据量已经变为326.85GiB

步骤五:运行示例Spark作业

方式一:使用Posix文件系统接口

  1. 根据如下内容创建SparkApplication清单文件并保存为spark-pagerank-fluid-posix.yaml

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-posix
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      image: spark:3.5.4
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # 用file://格式访问本地文件。
      - file:///mnt/fluid/data/pagerank_dataset.txt
      - "10"
      sparkVersion: 3.5.4
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        volumeMounts:
        # 将dataset对应的PVC挂载至/mnt/fluid路径。
        - name: spark
          mountPath: /mnt/fluid
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 1
        coreLimit: "1"
        memory: 4g
        volumeMounts:
        # 将dataset对应的PVC挂载至/mnt/fluid路径。
        - name: spark
          mountPath: /mnt/fluid
      volumes:
      #  添加Fluid创建出的与dataset同名的PVC。
      - name: spark
        persistentVolumeClaim:
          claimName: spark
      restartPolicy:
        type: Never
    说明

    上述作业中使用到的Spark镜像来自于社区,如果您遇到网络原因导致无法拉取镜像,请同步社区镜像或自行构建镜像并推送到您自己的镜像仓库中。

  2. 执行如下命令,提交Spark作业。

    kubectl create -f spark-pagerank-fluid-posix.yaml

    预期输出:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created
  3. 执行如下命令,查看Spark作业状态:

    kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w

    预期输出:

    NAME                         STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   87s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   SUCCEEDING   1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s
    spark-pagerank-fluid-posix   COMPLETED    1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s

    可以看到作业已经成功运行完成。

方式二:使用HCFS文件系统接口

  1. 执行如下命令,查看DatasetHCFS访问URL。

    kubectl get -n spark dataset spark -o wide

    预期输出:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     30m

    可以看到,Dataset 的 HCFS URL 为 spark-jindofs-master-0.spark:19434,在配置 Spark 作业时需要将参数 fs.jindofsx.namespace.rpc.address 配置成该值。

  2. 根据如下内容创建SparkApplication清单文件并保存为spark-pagerank-fluid-hcfs.yaml

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-hcfs
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      # 需将 <SPARK_IMAGE> 替换成您自己的Spark镜像,该镜像中需要包含JindoSDK依赖。
      image: <SPARK_IMAGE>
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # 从以下三种方式中选择一种,需将<OSS_BUCKET>替换成您的 OSS 存储桶名称。
      # 方式一:使用 oss:// 格式访问 OSS 数据。
      - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 方式二:使用 s3:// 格式访问OSS数据。
      # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 方式三:使用 s3a:// 格式访问OSS数据。
      # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 迭代次数
      - "10"
      sparkVersion: 3.5.4
      hadoopConf:
        #===================
        # 访问 OSS 相关配置
        #===================
        # 支持使用oss://格式访问OSS数据。
        fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
        # OSS访问端点,需将<OSS_ENDPOINT>替换成您的OSS访问端点。
        # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。
        fs.oss.endpoint: <OSS_ENDPOINT>
        # 从环境变量中读取OSS访问凭据。
        fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # 支持使用s3://格式访问OSS数据。
        fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # OSS 访问端点,需将<OSS_ENDPOINT>替换成您的OSS访问端点。
        # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。
        fs.s3.endpoint: <OSS_ENDPOINT>
        # 从环境变量中读取OSS访问凭据。
        fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # 支持使用s3a://格式访问OSS数据。
        fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # OSS 访问端点,需将<OSS_ENDPOINT> 替换成您的 OSS 访问端点。
        # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。
        fs.s3a.endpoint: <OSS_ENDPOINT>
        # 从环境变量中读取OSS访问凭据。
        fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        #===================
        # JindoFS 相关配置
        #===================
        fs.xengine: jindofsx
        # DatasetHCFS URL。
        fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434
        fs.jindofsx.data.cache.enable: "true"
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        envFrom:
        - secretRef:
            name: spark-oss-secret
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 2
        coreLimit: "2"
        memory: 8g
        envFrom:
        - secretRef:
            name: spark-oss-secret
      restartPolicy:
        type: Never
    说明

    上述示例作业中使用到的Spark镜像需要包含JindoSDK依赖,您可以参考如下Dockerfile自行构建镜像并推送到自己的镜像仓库中:

    ARG SPARK_IMAGE=spark:3.5.4
    
    FROM ${SPARK_IMAGE}
    
    # Add dependency for JindoSDK support
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
  3. 执行如下命令提交Spark作业。

    kubectl create -f spark-pagerank-fluid-hcfs.yaml

    预期输出:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs create
  4. 执行如下命令查看Spark作业状态。

    kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w

    预期输出:

    NAME                        STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   9s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   15s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   SUCCEEDING   1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s
    spark-pagerank-fluid-hcfs   COMPLETED    1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s

(可选)步骤六:环境清理

如果体验教程后无需使用相关资源,请参见以下操作释放创建的测试资源。

kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml