本文以Spark自带的PageRank作业为例,介绍如何在ACK集群中运行Spark作业,并配置读写位于阿里云OSS(对象存储服务)中的数据。
前提条件
- 已创建1.24及以上的ACK托管集群Pro版、ACK Serverless集群Pro版。相关操作,请参见创建ACK托管集群、创建ACK Serverless集群、手动升级集群。 
- 已部署ack-spark-operator组件,请参见部署ack-spark-operator组件。 
- 已通过kubectl工具连接集群。具体操作,请参见获取集群KubeConfig并通过kubectl工具连接集群。 
- 已创建OSS存储空间。具体操作请参见创建存储空间。 
- 已安装ossutil并配置ossutil。关于ossutil命令参考请参见命令行工具ossutil命令参考。 
流程概述
本文将引导您完成如下步骤,帮助您了解如何在ACK集群中运行Spark作业并配置读写OSS数据。
- 准备测试数据并上传至OSS:生成用于PageRank的测试数据集并将其上传至OSS。 
- 构建Spark容器镜像:构建包含了访问OSS相关Jar包依赖的Spark容器镜像。 
- 创建Secret存储OSS访问凭据:为Spark作业创建指定的OSS访问凭据,以确保安全访问OSS。 
- 提交示例Spark作业:创建并提交一个Spark作业的配置文件,实现对OSS数据的读写。 
- (可选)环境清理:在完成测试后,清理无需使用的Spark作业和资源,避免产生预期外的费用。 
步骤一:准备测试数据并上传至OSS
首先,您需生成测试数据集并将其上传到指定的OSS中,便于后续的Spark作业使用。以下是生成测试数据集的脚本和上传OSS的操作步骤。
- 根据以下内容创建名为 - generate_pagerank_dataset.sh的脚本,用于生成测试数据集。- #!/bin/bash # 检查参数数量 if [ "$#" -ne 2 ]; then echo "Usage: $0 M N" echo "M: Number of web pages" echo "N: Number of records to generate" exit 1 fi M=$1 N=$2 # 检查 M 和 N 是否为正整数 if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then echo "Both M and N must be positive integers." exit 1 fi # 生成数据集 for ((i=1; i<=$N; i++)); do # 保证源页面和目标页面不相同 while true; do src=$((RANDOM % M + 1)) dst=$((RANDOM % M + 1)) if [ "$src" -ne "$dst" ]; then echo "$src $dst" break fi done done
- 执行如下命令,生成测试数据集。 - M=100000 # 网页数量 N=10000000 # 记录数量 # 随机生成数据集并保存至 pagerank_dataset.txt bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
- 执行如下命令,将生成的数据集上传至OSS Bucket中的 - data/路径下:- ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/
步骤二:构建Spark容器镜像
为了在Spark作业中访问数据,首先需要构建一个包含了访问OSS相关Jar包依赖的容器镜像。您可以选择使用Hadoop OSS SDK、Hadoop S3 SDK或JindoSDK来访问OSS,本文演示用的容器镜像根据如下示例Dockerfile构建。关于容器镜像服务构建镜像请参见使用企业版实例构建镜像。
- 示例Dockerfile文件中使用的Spark基础镜像来自于开源社区,您可按需求自行替换成自己的Spark镜像。 
- 您需要根据使用的Spark版本,选择相应的Hadoop OSS SDK、Hadoop S3 SDK或者JindoSDK版本。 
使用Hadoop OSS SDK
以Spark 3.5.5版本和Hadoop OSS SDK 3.3.4版本为例,创建如下示例Dockerfile文件。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars使用Hadoop S3 SDK
以Spark 3.5.5版本和Hadoop S3 SDK 3.3.4版本为例,创建如下示例Dockerfile文件。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop AWS S3 support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar ${SPARK_HOME}/jars使用JindoSDK
以Spark 3.5.5版本和JindoSDK 6.8.0版本为例,创建如下示例Dockerfile文件。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.8.0/jindo-core-6.8.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.8.0/jindo-sdk-6.8.0.jar ${SPARK_HOME}/jars步骤三:创建Secret存储OSS访问凭据
在Spark作业中访问OSS数据时,需要配置OSS访问凭证,为了避免在作业中硬编码访问凭据,需要创建一个Secret用于存储敏感信息,并以环境变量的形式注入到容器中。
使用Hadoop OSS SDK
- 根据以下内容创建Secret清单文件,并保存为 - spark-oss-secret.yaml。- apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: # 需将 <ACCESS_KEY_ID> 替换成阿里云 AccessKey ID。 OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 需将 <ACCESS_KEY_SECRET> 替换成阿里云 AccessKey Secret。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
- 执行如下命令创建Secret。 - kubectl apply -f spark-oss-secret.yaml- 预期输出如下。 - secret/spark-oss-secret created
使用Hadoop S3 SDK
- 根据以下内容创建Secret清单文件,并保存为 - spark-s3-secret.yaml。- apiVersion: v1 kind: Secret metadata: name: spark-s3-secret namespace: default stringData: # 需将 <ACCESS_KEY_ID> 替换成阿里云 AccessKey ID。 AWS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 需将 <ACCESS_KEY_SECRET> 替换成阿里云 AccessKey Secret。 AWS_SECRET_ACCESS_KEY: <ACCESS_KEY_SECRET>
- 执行如下命令创建Secret。 - kubectl apply -f spark-s3-secret.yaml- 预期输出如下。 - secret/spark-s3-secret created
使用JindoSDK
- 根据以下内容创建Secret清单文件,并保存为 - spark-oss-secret.yaml。- apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: # 需将 <ACCESS_KEY_ID> 替换成阿里云 AccessKey ID。 OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 需将 <ACCESS_KEY_SECRET> 替换成阿里云 AccessKey Secret。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
- 执行如下命令创建Secret。 - kubectl apply -f spark-oss-secret.yaml- 预期输出如下。 - secret/spark-oss-secret created
步骤四:运行示例Spark作业
在ACK集群上提交Spark作业,实现OSS数据的读写。
使用Hadoop OSS SDK
创建如下SparkApplication清单文件并保存为spark-pagerank.yaml。关于OSS完整的配置参数列表,请参见Hadoop-Aliyun module。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成步骤二中构建得到的 Spark 容器镜像。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Bucket名称。
  - "10"                                                   # 迭代次数。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # OSS 访问端点,需将 <OSS_ENDPOINT> 替换成您的 OSS 访问端点,
    # 例如,北京地域 OSS 的内网访问端点为 oss-cn-beijing-internal.aliyuncs.com。 
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-oss-secret               #指定访问OSS的安全凭据。
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-oss-secret               #指定访问OSS的安全凭据。
  restartPolicy:
    type: Never使用Hadoop S3 SDK
创建如下SparkApplication清单文件并保存为spark-pagerank.yaml。关于S3完整的配置参数列表,请参见Hadoop-AWS module。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成步骤二中构建得到的 Spark 容器镜像。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt           # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Bucket名称。
  - "10"                                                   # 迭代次数。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    # OSS 访问端点,需将 <OSS_ENDPOINT> 替换成您的 OSS 访问端点,
    # 例如,北京地域 OSS 的内网访问端点为 oss-cn-beijing-internal.aliyuncs.com。
    fs.s3a.endpoint: <OSS_ENDPOINT>
    # OSS 访问端点所在地域,例如北京地域为 cn-beijing。
    fs.s3a.endpoint.region: <OSS_REGION>
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-s3-secret               #指定访问OSS的安全凭据。
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-s3-secret               #指定访问OSS的安全凭据。
  restartPolicy:
    type: Never使用JindoSDK
创建如下SparkApplication清单文件并保存为spark-pagerank.yaml。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # 需将 <SPARK_IMAGE> 替换成步骤二中构建得到的 Spark 容器镜像。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt    # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Bucket名称。
  - "10"                                            # 迭代次数。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                 # OSS访问端点。例如北京地区OSS的内网访问地址为oss-cn-beijing-internal.aliyuncs.com。
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定访问OSS的安全凭据。
  executor: 
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定访问OSS的安全凭据。
  restartPolicy:
    type: Never- 执行如下命令提交Spark作业。 - kubectl apply -f spark-pagerank.yaml
- 执行如下命令查看Spark作业执行状态并等待作业执行结果。 - kubectl get sparkapplications spark-pagerank- 预期输出。 - NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank COMPLETED 1 2024-10-09T12:54:25Z 2024-10-09T12:55:46Z 90s
- 执行如下命令查看Driver pod日志输出的最后20行。 - kubectl logs spark-pagerank-driver --tail=20- 预期输出。 - 使用Hadoop OSS SDK- 从日志中可以看到,Spark作业已经成功运行结束。 - 30024 has rank: 1.0709659078941967 . 21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared 24/10/09 12:48:36 INFO BlockManager: BlockManager stopped 24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5- 使用Hadoop S3 SDK- 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 25/04/07 03:54:11 INFO SparkContext: SparkContext is stopping with exitCode 0. 25/04/07 03:54:11 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-0f7dec960e615617-driver-svc.spark.svc:4040 25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 25/04/07 03:54:11 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 25/04/07 03:54:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 25/04/07 03:54:11 INFO MemoryStore: MemoryStore cleared 25/04/07 03:54:11 INFO BlockManager: BlockManager stopped 25/04/07 03:54:11 INFO BlockManagerMaster: BlockManagerMaster stopped 25/04/07 03:54:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 25/04/07 03:54:11 INFO SparkContext: Successfully stopped SparkContext 25/04/07 03:54:11 INFO ShutdownHookManager: Shutdown hook called 25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /var/data/spark-20d425bb-f442-4b0a-83e2-5a0202959a54/spark-ff5bbf08-4343-4a7a-9ce0-3f7c127cf4a9 25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /tmp/spark-a421839a-07af-49c0-b637-f15f76c3e752 25/04/07 03:54:11 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system... 25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system stopped. 25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.- 使用JindoSDK- 从日志中可以看到,Spark作业已经成功运行结束。 - 21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared 24/10/09 12:55:45 INFO BlockManager: BlockManager stopped 24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168
(可选)步骤五:环境清理
如果您已体验完本教程,相关资源如不再需要,可以通过执行以下命令进行删除。
执行如下命令删除Spark作业。
kubectl delete -f spark-pagerank.yaml执行如下命令删除Secret资源。
使用Hadoop OSS SDK
kubectl delete -f spark-oss-secret.yaml使用Hadoop S3 SDK
kubectl delete -f spark-s3-secret.yaml使用JindoSDK
kubectl delete -f spark-oss-secret.yaml相关文档
- 关于如何使用 Spark History Server 查看 Spark 作业信息,请参见使用Spark History Server查看Spark作业信息。 
- 关于如何使用日志服务收集 Spark 作业日志,请参见使用日志服务收集Spark作业日志。 
- 关于如何使用弹性资源运行 Spark 作业,请参见使用ECI弹性资源运行Spark作业。 
- 关于如何在 Spark 作业中使用 Celeborn 作为 RSS,请参见Spark作业使用Celeborn作为RSS。