本文以Spark自带的PageRank作业为例,介绍如何在ACK集群中运行Spark作业,并配置读写位于阿里云OSS(对象存储服务)中的数据。
前提条件
已创建1.24及以上的ACK集群Pro版、ACK Serverless集群Pro版。相关操作,请参见创建ACK托管集群、创建ACK Serverless集群、手动升级集群。
已部署ack-spark-operator组件,请参见部署ack-spark-operator组件。
已通过kubectl工具连接集群。具体操作,请参见获取集群KubeConfig并通过kubectl工具连接集群。
已创建OSS存储空间。具体操作请参见创建存储空间。
已安装ossutil并配置ossutil。关于ossutil命令参考请参见命令行工具ossutil命令参考。
流程概述
本文将引导您完成如下步骤,帮助您了解如何在ACK集群中运行Spark作业并配置读写OSS数据。
准备测试数据并上传至OSS:生成用于PageRank的测试数据集并将其上传至OSS。
构建Spark容器镜像:构建包含了访问OSS相关Jar包依赖的Spark容器镜像。
创建Secret存储OSS访问凭据:为Spark作业创建指定的OSS访问凭据,以确保安全访问OSS。
提交示例Spark作业:创建并提交一个Spark作业的配置文件,实现对OSS数据的读写。
(可选)环境清理:在完成测试后,清理无需使用的Spark作业和资源,避免产生预期外的费用。
步骤一:准备测试数据并上传至OSS
首先,您需生成测试数据集并将其上传到指定的OSS中,便于后续的Spark作业使用。以下是生成测试数据集的脚本和上传OSS的操作步骤。
根据以下内容创建名为generate_pagerank_dataset.sh的脚本,用于生成测试数据集。
#!/bin/bash # 检查参数数量 if [ "$#" -ne 2 ]; then echo "Usage: $0 M N" echo "M: Number of web pages" echo "N: Number of records to generate" exit 1 fi M=$1 N=$2 # 检查 M 和 N 是否为正整数 if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then echo "Both M and N must be positive integers." exit 1 fi # 生成数据集 for ((i=1; i<=$N; i++)); do # 保证源页面和目标页面不相同 while true; do src=$((RANDOM % M + 1)) dst=$((RANDOM % M + 1)) if [ "$src" -ne "$dst" ]; then echo "$src $dst" break fi done done
执行如下命令,生成测试数据集。
M=100000 # 网页数量 N=10000000 # 记录数量 # 随机生成数据集并保存至 pagerank_dataset.txt bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
执行如下命令,将生成的数据集上传至OSS Bucket中的
data/
路径下:ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/
步骤二:构建Spark容器镜像
为了在Spark作业中访问数据,首先需要构建一个包含了访问OSS相关Jar包依赖的容器镜像。您可以选择使用Hadoop OSS SDK或JindoSDK来访问OSS,本文演示用的容器镜像根据如下示例Dockerfile构建。关于容器镜像服务构建镜像请参见使用企业版实例构建镜像。
本文涉及到的容器镜像仅用于演示,不建议在生产环境中使用。
您需要根据使用的Spark版本,选择相应的Hadoop OSS SDK版本或者Jindo SDK版本。
使用Hadoop OSS SDK
以Spark 3.5.2版本和Hadoop OSS SDK 3.3.4版本为例,创建如下示例Dockerfile文件。
ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
使用JindoSDK
以Spark 3.5.2版本和JindoSDK 6.4.0版本为例,创建如下示例Dockerfile文件。
ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2
FROM ${SPARK_IMAGE}
# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
步骤三:创建Secret存储OSS访问凭据
在Spark作业中访问OSS数据时,需要配置OSS访问凭证,为了避免在作业中硬编码访问凭据,需要创建一个Secret用于存储敏感信息,并以环境变量的形式注入到容器中。
根据以下内容创建Secret清单文件,并保存为spark-oss-secret.yaml。
apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 阿里云AccessKey ID。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET> # 阿里云AccessKey Secret。
执行如下命令创建Secret。
kubectl apply -f spark-oss-secret.yaml
预期输出如下。
secret/spark-oss-secret created
步骤四:运行示例Spark作业
在ACK集群上提交Spark作业,实现OSS数据的读写。
使用Hadoop OSS SDK
创建如下SparkApplication清单文件并保存为spark-pagerank.yaml
。关于OSS完整的配置参数列表,请参见Hadoop-Aliyun module。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-oss
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Buckt名称。
- "10" # 迭代次数。
sparkVersion: 3.5.2
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
# OSS 访问端点
fs.oss.endpoint: <OSS_ENDPOINT> # OSS访问端点。例如北京地区OSS的内网访问地址为oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret #指定访问OSS的安全凭据。
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret #指定访问OSS的安全凭据。
restartPolicy:
type: Never
使用JindoSDK
创建如下SparkApplication清单文件并保存为spark-pagerank.yaml
。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-jindosdk
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定输入测试数据集,将<OSS_BUCKET>替换成OSS Buckt名称。
- "10" # 迭代次数。
sparkVersion: 3.5.2
hadoopConf:
fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS访问端点。例如北京地区OSS的内网访问地址为oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret #指定访问OSS的安全凭据。
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret #指定访问OSS的安全凭据。
restartPolicy:
type: Never
执行如下命令提交Spark作业。
kubectl apply -f spark-pagerank.yaml
执行如下命令查看Spark作业执行状态并等待作业执行结果。
kubectl get sparkapplications spark-pagerank
预期输出。
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank COMPLETED 1 2024-10-09T12:54:25Z 2024-10-09T12:55:46Z 90s
执行如下命令查看Driver pod日志输出的最后20行。
kubectl logs spark-pagerank-driver --tail=20
预期输出。
使用Hadoop OSS SDK
从日志中可以看到,Spark作业已经成功运行结束。
30024 has rank: 1.0709659078941967 . 21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared 24/10/09 12:48:36 INFO BlockManager: BlockManager stopped 24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5
使用JindoSDK
从日志中可以看到,Spark作业已经成功运行结束。
21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared 24/10/09 12:55:45 INFO BlockManager: BlockManager stopped 24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168
(可选)步骤五:环境清理
如果您已体验完本教程,相关资源如不再需要,可以通过执行以下命令进行删除。
执行如下命令删除Spark作业。
kubectl delete -f spark-pagerank.yaml
执行如下命令删除Secret资源。
kubectl delete -f oss-secret.yaml