在ACK上运行Spark作业,并用Alluxio进行分布式加速,可以通过ACK Spark Operator来简化提交作业的操作。同时ACK也提供了Spark History
Server用来记录作业历史数据,方便排查问题。本文介绍如何在ACK上搭建Spark运行环境。
创建ACK集群
具体操作,请参见创建Kubernetes托管版集群。配置集群参数时,请注意以下内容:
- 设置Worker节点的实例规格时,请根据实际业务情况进行选择。本文选择大数据网络增强型的ecs.d1ne.6xlarge规格,并设置节点数量为20。
- 每个ecs.d1ne.6xlarge节点实例自带12块5 TB的HDD数据盘。您需要对这12个数据盘进行分区格式化挂载。具体操作,请参见分区格式化大于2 TiB数据盘。
- 格式化并挂载完成后,执行
df -h
命令,可以看到如下图所示的挂载信息。
- /mnt目录下的12个文件路径会在Alluxio中用到。当集群节点多时,手工挂载数据盘的操作十分繁琐,ACK简化了操作方式,具体操作,请参见通过LVM数据卷管理本地存储。
创建OSS存储空间
您需要创建一个OSS存储空间(Bucket)用来存放TPC-DS生成的数据、测试结果和测试过程中的日志等。本文示例中设置Bucket名称为cloudnativeai。关于创建OSS Bucket的具体操作,请参见创建存储空间。
安装ack-spark-operator
通过安装ack-spark-operator组件,您可以使用ACK Spark Operator简化提交作业的操作。
- 登录容器服务管理控制台,在左侧导航栏中选择。
- 在应用市场页面单击应用目录页签,搜索并单击ack-spark-operator应用。
- 在ack-spark-operator页面,单击一键部署。
- 在基本信息配置向导,选择集群和命名空间,然后单击下一步,设置相应参数,然后单击确定。
安装ack-spark-history-server
ACK Spark History Server通过记录Spark执行任务过程中的日志和事件信息,并提供UI界面,帮助排查问题。
在安装ack-spark-history-server组件时,您需在参数页签配置OSS相关的信息,用于存储Spark历史数据。关于创建ack-spark-history-server的具体操作,请参见安装ack-spark-operator。
配置OSS信息如下:
oss:
enableOSS: true
# Please input your accessKeyId
alibabaCloudAccessKeyId: ""
# Please input your accessKeySecret
alibabaCloudAccessKeySecret: ""
# oss bucket endpoint such as oss-cn-beijing.aliyuncs.com
alibabaCloudOSSEndpoint: ""
# oss file path such as oss://bucket-name/path
eventsDir: "oss://cloudnativeai/spark/spark-events"
安装完成后,执行以下命令,查看安装是否成功。
kubectl get service ack-spark-history-server -n {YOUR-NAMESPACE}
安装配置Fluid和Alluxio
- 安装Fluid。
- 未安装云原生AI套件:安装时选中Fluid数据加速。具体操作,请参见安装云原生AI套件。
- 已安装云原生AI套件:在容器服务管理控制台的云原生AI套件页面的组件列表区域部署ack-fluid。
- 在集群中创建Dataset,配置Alluxio。
- 使用以下内容,创建spark-alluxio.yaml文件。
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
name: spark-alluxio
spec:
mounts:
- mountPoint: oss://{"Your oss bucket"}
name: spark-dataset
options:
fs.oss.accessKeyId: "Your Access Key"
fs.oss.accessKeySecret: "Your Access Key Secret"
fs.oss.endpoint: "Your oss Endpoint"
---
apiVersion: data.fluid.io/v1alpha1
kind: AlluxioRuntime
metadata:
name: spark-alluxio
spec:
replicas: 3 # alluxio worker number
data:
replicas: 1 # data backup number
tieredstore:
levels:
- mediumtype: HDD
path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8,/mnt/disk9,/mnt/disk10,/mnt/disk11,/mnt/disk12
quotaList: 1024G,1024G,1024G,1024G,1024G,1024G,1024G,1024G,1024G,1024G,1024G,1024G
high: "0.95"
low: "0.7"
properties:
# alluxio master
alluxio.master.metastore: ROCKS
alluxio.master.journal.folder: /journal
alluxio.master.journal.type: UFS
alluxio.master.metastore.inode.cache.max.size: "10000000"
alluxio.master.journal.log.size.bytes.max: 500MB
alluxio.master.metadata.sync.concurrency.level: "128"
alluxio.master.metadata.sync.executor.pool.size: "128"
alluxio.master.metadata.sync.ufs.prefetch.pool.size: "128"
alluxio.master.rpc.executor.max.pool.size: "1024"
alluxio.master.rpc.executor.core.pool.size: "128"
# alluxio worker
alluxio.worker.allocator.class: alluxio.worker.block.allocator.GreedyAllocator
alluxio.worker.network.reader.buffer.size: 32MB
alluxio.worker.file.buffer.size: 320MB
alluxio.worker.block.master.client.pool.size: "1024"
# alluxio user
alluxio.user.block.worker.client.pool.min: "512"
alluxio.user.file.writetype.default: MUST_CACHE
alluxio.user.ufs.block.read.location.policy: alluxio.client.block.policy.LocalFirstAvoidEvictionPolicy
alluxio.user.block.write.location.policy.class: alluxio.client.block.policy.LocalFirstAvoidEvictionPolicy
alluxio.user.block.size.bytes.default: 16MB
alluxio.user.streaming.reader.chunk.size.bytes: 32MB
alluxio.user.local.reader.chunk.size.bytes: 32MB
alluxio.user.metrics.collection.enabled: "false"
alluxio.user.update.file.accesstime.disabled: "true"
alluxio.user.file.passive.cache.enabled: "false"
alluxio.user.block.avoid.eviction.policy.reserved.size.bytes: 2GB
alluxio.user.block.master.client.pool.gc.threshold: 2day
alluxio.user.file.master.client.threads: "1024"
alluxio.user.block.master.client.threads: "1024"
alluxio.user.file.readtype.default: CACHE
alluxio.user.metadata.cache.enabled: "true"
alluxio.user.metadata.cache.expiration.time: 2day
alluxio.user.metadata.cache.max.size: "1000000"
alluxio.user.direct.memory.io.enabled: "true"
alluxio.user.worker.list.refresh.interval: 2min
alluxio.user.logging.threshold: 1000ms
# other alluxio configurations
alluxio.web.ui.enabled: "false"
alluxio.security.stale.channel.purge.interval: 365d
alluxio.job.worker.threadpool.size: "164"
master:
jvmOptions:
- "-Xmx6G"
- "-XX:+UnlockExperimentalVMOptions"
- "-XX:ActiveProcessorCount=8"
worker:
jvmOptions:
- "-Xmx12G"
- "-XX:+UnlockExperimentalVMOptions"
- "-XX:MaxDirectMemorySize=32g"
- "-XX:ActiveProcessorCount=8"
resources:
limits:
cpu: 4
说明
Dataset
名称和AlluxioRuntime
名称需保持一致。
AlluxioRuntime. tieredstore
中的mediumtype
、path
、quotaList
参数,请根据ACK集群中节点挂载的数据盘的实际规格和配置填写。如果您有多个数据盘,请使用半角逗号(,)分隔。
- 执行以下命令,部署Dataset。
kubectl create -f spark-alluxio.yaml
- 执行以下命令,查看Dataset是否部署成功。
预期输出:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE AGE
spark-alluxio 10.89GiB 0.00B 6.00TiB 0.0% Bound 7m4s
输出结果中存在
spark-alluxio
,表示Dataset部署成功。
- 执行以下命令,查看spark-alluxio Pod运行状态。
kubectl get pods | grep spark-alluxio
NAME READY STATUS RESTARTS AGE
spark-alluxio-master-0 2/2 Running 0 2m18s
spark-alluxio-worker-0 2/2 Running 0 109s
spark-alluxio-worker-1 2/2 Running 0 108s
spark-alluxio-worker-2 2/2 Running 0 106s
...
- 进入Alluxio Master节点,执行以下命令,查看数据盘挂载是否成功。
本文以
spark-alluxio-master-0
为例。
kubectl exec -it spark-alluxio-master-0 -n alluxio -- /bin/bash
./bin/alluxio fsadmin report capacity
若每个Worker节点上都存在挂载的数据盘,说明Fluid和Alluxio安装配置成功。
- 执行以下命令,查看Dataset的Endpoint。
kubectl describe dataset spark-alluxio
预期输出:
Name: spark-alluxio
Namespace: default
API Version: data.fluid.io/v1alpha1
Kind: Dataset
...
Spec:
Hcfs:
Endpoint: alluxio://spark-alluxio-master-0.default:20496
Underlayer File System Version: 3.3.0
...
您可以在Spark任务中使用
alluxio://spark-alluxio-master-0.default:20496
这个Endpoint访问Alluxio缓存的数据。