Spark作业读写OSS数据

本文以Spark自带的PageRank作业为例,介绍如何在ACK集群中运行Spark作业,并配置读写位于阿里云OSS(对象存储服务)中的数据。

前提条件

流程概述

本文将引导您完成如下步骤,帮助您了解如何在ACK集群中运行Spark作业并配置读写OSS数据。

  1. 准备测试数据并上传至OSS:生成用于PageRank的测试数据集并将其上传至OSS。

  2. 构建Spark容器镜像:构建包含了访问OSS相关Jar包依赖的Spark容器镜像。

  3. 创建Secret存储OSS访问凭据:为Spark作业创建指定的OSS访问凭据,以确保安全访问OSS。

  4. 提交示例Spark作业:创建并提交一个Spark作业的配置文件,实现对OSS数据的读写。

  5. (可选)环境清理:在完成测试后,清理无需使用的Spark作业和资源,避免产生预期外的费用。

步骤一:准备测试数据并上传至OSS

首先,您需生成测试数据集并将其上传到指定的OSS中,便于后续的Spark作业使用。以下是生成测试数据集的脚本和上传OSS的操作步骤。

  1. 根据以下内容创建名为generate_pagerank_dataset.sh的脚本,用于生成测试数据集。

    #!/bin/bash
    
    # 检查参数数量
    if [ "$#" -ne 2 ]; then
        echo "Usage: $0 M N"
        echo "M: Number of web pages"
        echo "N: Number of records to generate"
        exit 1
    fi
    
    M=$1
    N=$2
    
    # 检查 M 和 N 是否为正整数
    if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then
        echo "Both M and N must be positive integers."
        exit 1
    fi
    
    # 生成数据集
    
    for ((i=1; i<=$N; i++)); do
        # 保证源页面和目标页面不相同
        while true; do
            src=$((RANDOM % M + 1))
            dst=$((RANDOM % M + 1))
            if [ "$src" -ne "$dst" ]; then
                echo "$src $dst"
                break
            fi
        done
    done
  2. 执行如下命令,生成测试数据集。

    M=100000    # 网页数量
    
    N=10000000  # 记录数量
    
    # 随机生成数据集并保存至 pagerank_dataset.txt
    bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
  3. 执行如下命令,将生成的数据集上传至OSS Bucket中的 data/ 路径下:

    ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/

步骤二:构建Spark容器镜像

为了在Spark作业中访问数据,首先需要构建一个包含了访问OSS相关Jar包依赖的容器镜像。您可以选择使用Hadoop OSS SDK或JindoSDK来访问OSS,本文演示用的容器镜像根据如下示例Dockerfile构建。关于容器镜像服务构建镜像请参见使用企业版实例构建镜像

说明
  • 本文涉及到的容器镜像仅用于演示,不建议在生产环境中使用。

  • 您需要根据使用的Spark版本,选择相应的Hadoop OSS SDK版本或者Jindo SDK版本。

使用Hadoop OSS SDK

以Spark 3.5.2版本和Hadoop OSS SDK 3.3.4版本为例,创建如下示例Dockerfile文件。

ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

使用JindoSDK

以Spark 3.5.2版本和JindoSDK 6.4.0版本为例,创建如下示例Dockerfile文件。

ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2

FROM ${SPARK_IMAGE}

# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars

步骤三:创建Secret存储OSS访问凭据

在Spark作业中访问OSS数据时,需要配置OSS访问凭证,为了避免在作业中硬编码访问凭据,需要创建一个Secret用于存储敏感信息,并以环境变量的形式注入到容器中。

  1. 根据以下内容创建Secret清单文件,并保存为spark-oss-secret.yaml。

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-oss-secret
      namespace: default
    stringData:
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>            # 阿里云AccessKey ID。
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>    # 阿里云AccessKey Secret。
  2. 执行如下命令创建Secret。

    kubectl apply -f spark-oss-secret.yaml

    预期输出如下。

    secret/spark-oss-secret created

步骤四:运行示例Spark作业

在ACK集群上提交Spark作业,实现OSS数据的读写。

使用Hadoop OSS SDK

创建如下SparkApplication清单文件并保存为spark-pagerank.yaml。关于OSS完整的配置参数列表,请参见Hadoop-Aliyun module

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-oss
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Buckt名称。
  - "10"                                                   # 迭代次数。
  sparkVersion: 3.5.2
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # OSS 访问端点
    fs.oss.endpoint: <OSS_ENDPOINT>                        # OSS访问端点。例如北京地区OSS的内网访问地址为oss-cn-beijing-internal.aliyuncs.com。 
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret               #指定访问OSS的安全凭据。
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-oss-secret               #指定访问OSS的安全凭据。
  restartPolicy:
    type: Never

使用JindoSDK

创建如下SparkApplication清单文件并保存为spark-pagerank.yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-jindosdk
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt    # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Buckt名称。
  - "10"                                            # 迭代次数。
  sparkVersion: 3.5.2
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                 # OSS访问端点。例如北京地区OSS的内网访问地址为oss-cn-beijing-internal.aliyuncs.com。
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定访问OSS的安全凭据。
  executor: 
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定访问OSS的安全凭据。
  restartPolicy:
    type: Never
  1. 执行如下命令提交Spark作业。

    kubectl apply -f spark-pagerank.yaml
  2. 执行如下命令查看Spark作业执行状态并等待作业执行结果。

    kubectl get sparkapplications spark-pagerank

    预期输出。

    NAME             STATUS      ATTEMPTS   START                  FINISH                 AGE
    spark-pagerank   COMPLETED   1          2024-10-09T12:54:25Z   2024-10-09T12:55:46Z   90s
  3. 执行如下命令查看Driver pod日志输出的最后20行。

    kubectl logs spark-pagerank-driver --tail=20

    预期输出。

    使用Hadoop OSS SDK

    从日志中可以看到,Spark作业已经成功运行结束。

    30024 has rank:  1.0709659078941967 .
    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:48:36 INFO BlockManager: BlockManager stopped
    24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5

    使用JindoSDK

    从日志中可以看到,Spark作业已经成功运行结束。

    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:55:45 INFO BlockManager: BlockManager stopped
    24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168

(可选)步骤五:环境清理

如果您已体验完本教程,相关资源如不再需要,可以通过执行以下命令进行删除。

执行如下命令删除Spark作业。

kubectl delete -f spark-pagerank.yaml

执行如下命令删除Secret资源。

kubectl delete -f oss-secret.yaml