本文介绍如何使用Fluid加速数据访问,通过JindoRuntime优化OSS数据访问,从而提升数据密集型应用的性能。
前提条件
已部署ack-spark-operator组件,请参见部署ack-spark-operator组件。
说明本教程在部署时使用了配置参数
spark.jobNamespaces=["spark"]
,如果您配置了不同的命名空间,在体验本教程时需要相应地修改namespace
字段。已安装云原生AI套件并部署ack-fluid组件。具体操作,请参见安装云原生AI套件。
已生成测试数据并上传至OSS。具体操作,请参见准备测试数据并上传至OSS。
Fluid简介
Fluid是一个开源的Kubernetes原生的分布式数据集编排和加速引擎,主要服务于云原生场景下的数据密集型应用,例如大数据应用、AI 应用等。Fluid的核心功能包括:
数据集抽象原生支持:将数据密集型应用所需基础支撑能力功能化,实现数据高效访问并降低多维管理成本。
可扩展的数据引擎插件:提供统一的访问接口,方便接入第三方存储,通过不同的Runtime实现数据操作。
自动化的数据操作:提供多种操作模式,与自动化运维体系相结合。
数据弹性与调度:将数据缓存技术和弹性扩缩容、数据亲和性调度能力相结合,提高数据访问性能。
运行时平台无关:支持原生、边缘、Serverless Kubernetes集群、Kubernetes多集群等多样化环境,适用于混合云场景。
关于Fluid的更多信息,请参见弹性数据集。
步骤一:创建Fluid专属节点池
在ACK集群中创建名为fluid
的专属节点池用于部署Fluid中JindoRuntime Worker pod。本教程中示例节点池中共3个节点,实例规格均为大数据网络增强型实例ecs.d1ne.4xlarge,每个节点打上标签fluid-cloudnative.github.io/node="true"
和污点fluid-cloudnative.github.io/node="true":NoSchedule
,每个节点带有8块大小为5905 GB的高吞吐SATA HDD本地盘,分别被格式化和挂载至/mnt/disk1
、/mnt/disk2
、... 和 /mnt/disk8
。关于创建节点池的具体操作,请参见创建和管理节点池。关于节点池的选型,请参见Fluid数据缓存优化策略最佳实践。
步骤二:创建Dataset
创建如下Secret清单文件并保存为
fluid-oss-secret.yaml
,用于存储OSS访问凭据。请将
<ACCESS_KEY_ID>
和<ACCESS_KEY_SECRET>
替换为您的阿里云AccessKey ID和AccessKey Secret。apiVersion: v1 kind: Secret metadata: name: fluid-oss-secret namespace: spark stringData: OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
执行如下命令,创建Secret资源。
kubectl create -f fluid-oss-secret.yaml
预期输出如下:
secret/fluid-oss-secret created
创建如下DataSet清单文件并保存为
spark-fluid-dataset.yaml
。apiVersion: data.fluid.io/v1alpha1 kind: Dataset metadata: name: spark namespace: spark spec: mounts: - name: spark # 需要加速的OSS访问路径,需将<OSS_BUCKET>替换成您的OSS存储桶名称。 mountPoint: oss://<OSS_BUCKET>/ path: / options: # OSS访问端点,需将<OSS_ENDPOINT>替换成您的OSS存储桶访问端点, # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。 fs.oss.endpoint: <OSS_ENDPOINT> encryptOptions: - name: fs.oss.accessKeyId valueFrom: secretKeyRef: name: fluid-oss-secret key: OSS_ACCESS_KEY_ID - name: fs.oss.accessKeySecret valueFrom: secretKeyRef: name: fluid-oss-secret key: OSS_ACCESS_KEY_SECRET # 数据将缓存到满足亲和性的节点上。 nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: fluid-cloudnative.github.io/node operator: In values: - "true" # 节点污点容忍。 tolerations: - key: fluid-cloudnative.github.io/node operator: Equal value: "true" effect: NoSchedule
其中各个字段的说明如下:
mountPoint
:需要加速的OSS路径。fs.oss.endpoint
:OSS Bucket访问端点,例如北京地域内网端点为oss-cn-beijing-internal.aliyuncs.com
。encryptOptions
:配置了从名为fluid-oss-secret
的Secret中读取OSS_ACCESS_KEY_ID
和OSS_ACCESS_KEY_SECRET
作为OSS访问凭据。
执行如下命令,创建Dataset资源。
kubectl create -f spark-fluid-dataset.yaml
预期输出:
dataset.data.fluid.io/spark created
执行如下命令查看Dataset部署状态。
kubectl get -n spark dataset spark -o wide
预期输出:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark NotBound 58m
可以看到此时Dataset处于
NotBound
状态。
步骤三:创建JindoRuntime
根据如下内容创建JindoRuntime清单文件并保存为
spark-fluid-jindoruntime.yaml
。apiVersion: data.fluid.io/v1alpha1 kind: JindoRuntime metadata: # 必须和Dataset的名称相同。 name: spark namespace: spark spec: # Worker副本数量。 replicas: 3 tieredstore: levels: # 缓存类型为HDD磁盘。 - mediumtype: HDD # 数据集类型。 volumeType: hostPath # 需要根据节点上的磁盘数量进行调整。 path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8 # 单个Worker能提供的缓存容量。 quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi high: "0.99" low: "0.95" worker: resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi
其中各个字段说明如下:
replicas
:创建JindoFS集群的Worker数量。mediumtype
:缓存类型。path
:存储路径。quota
:缓存系统最大容量。high
:存储容量上限大小。low
:存储容量下限大小。
执行如下命令创建JindoRuntime资源。
kubectl create -f spark-fluid-jindoruntime.yaml
预期输出:
jindoruntime.data.fluid.io/spark created
执行如下命令,查看JindoRuntime部署状态。
kubectl get -n spark jindoruntime spark
预期输出:
NAME MASTER PHASE WORKER PHASE FUSE PHASE AGE spark Ready Ready Ready 2m28s
可以看到
FUSE PHASE
状态为Ready
,表示JindoRuntime部署成功。执行如下命令,再次查看Dataset部署状态:
kubectl get -n spark dataset spark -o wide
预期输出:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark [Calculating] 0.00B 128.91TiB Bound spark-jindofs-master-0.spark:19434 [Calculating] 2m5
可以看到Dataset状态已经变为
Bound
,表示Dataset已经部署成功。
(可选)步骤四:进行数据预热
由于首次访问无法命中数据缓存,Fluid提供了DataLoad缓存预热操作,以提升首次数据访问的效率。数据预热通过预先将数据加载到缓存中,提高数据访问速度,从而优化数据处理效率和系统性能。
根据如下内容创建Dataload清单文件并保存为
spark-fluid-dataload.yaml
。apiVersion: data.fluid.io/v1alpha1 kind: DataLoad metadata: name: spark namespace: spark spec: dataset: name: spark namespace: spark loadMetadata: true
执行如下命令,创建Dataload资源。
kubectl create -f spark-fluid-dataload.yaml
预期输出:
dataload.data.fluid.io/spark created
执行如下命令,观察数据预热进度。
kubectl get -n spark dataload spark -w
预期输出:
NAME DATASET PHASE AGE DURATION spark spark Executing 20s Unfinished spark spark Complete 9m31s 8m37s
可以看到这次数据预热总共耗时
8m37s
。执行如下命令,再次查看Dataset状态。
kubectl get -n spark dataset spark -o wide
预期输出:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark 0.00B 326.85GiB 128.91TiB 0.0% Bound spark-jindofs-master-0.spark:19434 [Calculating] 19m
在前一次查看Dataset状态时,缓存的数据量为
0.00B
,经过数据预热之后,缓存的数据量已经变为326.85GiB
。
步骤五:运行示例Spark作业
方式一:使用Posix文件系统接口
根据如下内容创建SparkApplication清单文件并保存为
spark-pagerank-fluid-posix.yaml
。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pagerank-fluid-posix namespace: spark spec: type: Scala mode: cluster image: spark:3.5.4 mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar mainClass: org.apache.spark.examples.SparkPageRank arguments: # 用file://格式访问本地文件。 - file:///mnt/fluid/data/pagerank_dataset.txt - "10" sparkVersion: 3.5.4 driver: cores: 1 coreLimit: 1200m memory: 512m volumeMounts: # 将dataset对应的PVC挂载至/mnt/fluid路径。 - name: spark mountPath: /mnt/fluid serviceAccount: spark-operator-spark executor: instances: 2 cores: 1 coreLimit: "1" memory: 4g volumeMounts: # 将dataset对应的PVC挂载至/mnt/fluid路径。 - name: spark mountPath: /mnt/fluid volumes: # 添加Fluid创建出的与dataset同名的PVC。 - name: spark persistentVolumeClaim: claimName: spark restartPolicy: type: Never
说明上述作业中使用到的Spark镜像来自于社区,如果您遇到网络原因导致无法拉取镜像,请同步社区镜像或自行构建镜像并推送到您自己的镜像仓库中。
执行如下命令,提交Spark作业。
kubectl create -f spark-pagerank-fluid-posix.yaml
预期输出:
sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created
执行如下命令,查看Spark作业状态:
kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w
预期输出:
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank-fluid-posix RUNNING 1 2025-01-16T11:06:15Z <no value> 87s spark-pagerank-fluid-posix RUNNING 1 2025-01-16T11:06:15Z <no value> 102s spark-pagerank-fluid-posix RUNNING 1 2025-01-16T11:06:15Z <no value> 102s spark-pagerank-fluid-posix SUCCEEDING 1 2025-01-16T11:06:15Z 2025-01-16T11:07:59Z 104s spark-pagerank-fluid-posix COMPLETED 1 2025-01-16T11:06:15Z 2025-01-16T11:07:59Z 104s
可以看到作业已经成功运行完成。
方式二:使用HCFS文件系统接口
执行如下命令,查看Dataset的HCFS访问URL。
kubectl get -n spark dataset spark -o wide
预期输出:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark 0.00B 326.85GiB 128.91TiB 0.0% Bound spark-jindofs-master-0.spark:19434 [Calculating] 30m
可以看到,Dataset 的 HCFS URL 为
spark-jindofs-master-0.spark:19434
,在配置 Spark 作业时需要将参数fs.jindofsx.namespace.rpc.address
配置成该值。根据如下内容创建SparkApplication清单文件并保存为
spark-pagerank-fluid-hcfs.yaml
。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pagerank-fluid-hcfs namespace: spark spec: type: Scala mode: cluster # 需将 <SPARK_IMAGE> 替换成您自己的Spark镜像,该镜像中需要包含JindoSDK依赖。 image: <SPARK_IMAGE> mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar mainClass: org.apache.spark.examples.SparkPageRank arguments: # 从以下三种方式中选择一种,需将<OSS_BUCKET>替换成您的 OSS 存储桶名称。 # 方式一:使用 oss:// 格式访问 OSS 数据。 - oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 方式二:使用 s3:// 格式访问OSS数据。 # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt # 方式三:使用 s3a:// 格式访问OSS数据。 # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt # 迭代次数 - "10" sparkVersion: 3.5.4 hadoopConf: #=================== # 访问 OSS 相关配置 #=================== # 支持使用oss://格式访问OSS数据。 fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem # OSS访问端点,需将<OSS_ENDPOINT>替换成您的OSS访问端点。 # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。 fs.oss.endpoint: <OSS_ENDPOINT> # 从环境变量中读取OSS访问凭据。 fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider # 支持使用s3://格式访问OSS数据。 fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem # OSS 访问端点,需将<OSS_ENDPOINT>替换成您的OSS访问端点。 # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。 fs.s3.endpoint: <OSS_ENDPOINT> # 从环境变量中读取OSS访问凭据。 fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider # 支持使用s3a://格式访问OSS数据。 fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem # OSS 访问端点,需将<OSS_ENDPOINT> 替换成您的 OSS 访问端点。 # 例如,北京地域OSS内网访问端点为oss-cn-beijing-internal.aliyuncs.com。 fs.s3a.endpoint: <OSS_ENDPOINT> # 从环境变量中读取OSS访问凭据。 fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider #=================== # JindoFS 相关配置 #=================== fs.xengine: jindofsx # Dataset的HCFS URL。 fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434 fs.jindofsx.data.cache.enable: "true" driver: cores: 1 coreLimit: 1200m memory: 512m envFrom: - secretRef: name: spark-oss-secret serviceAccount: spark-operator-spark executor: instances: 2 cores: 2 coreLimit: "2" memory: 8g envFrom: - secretRef: name: spark-oss-secret restartPolicy: type: Never
说明上述示例作业中使用到的Spark镜像需要包含JindoSDK依赖,您可以参考如下Dockerfile自行构建镜像并推送到自己的镜像仓库中:
ARG SPARK_IMAGE=spark:3.5.4 FROM ${SPARK_IMAGE} # Add dependency for JindoSDK support ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
执行如下命令提交Spark作业。
kubectl create -f spark-pagerank-fluid-hcfs.yaml
预期输出:
sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs create
执行如下命令查看Spark作业状态。
kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w
预期输出:
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 9s spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 15s spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 77s spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 77s spark-pagerank-fluid-hcfs SUCCEEDING 1 2025-01-16T11:21:16Z 2025-01-16T11:22:34Z 78s spark-pagerank-fluid-hcfs COMPLETED 1 2025-01-16T11:21:16Z 2025-01-16T11:22:34Z 78s
(可选)步骤六:环境清理
如果体验教程后无需使用相关资源,请参见以下操作释放创建的测试资源。
kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml