本教程介绍如何快速开发基于Kubeflow Pipelines的机器学习工作流。
可体验到
- 在Kubernates中创建共享存储
- 基于Kubeflow Pipelines的机器学习工作流
前置知识
- Kubernetes Volume
- 创建NAS存储
- 了解Python语言
- 手写数字识别训练
前提条件
在开始之前您需要先在Kubernates集群上部署Kubeflow Pipelines服务,请参见在阿里云容器服务上部署Kubeflow Pipelines服务。
步骤一:创建共享存储
- 登录NAS控制台,在集群所在可用区下创建一个文件系统。
- 在NFS Server中创建/data目录。
其中NFS_SERVER_IP请替换成真实NAS服务器地址。
mkdir -p /nfs mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs mkdir -p /data umount /nfs
- 创建Persistent Volume。
vim nfs-pvc.yaml
新增以下内容:
其中NFS_SERVER_IP请替换成真实NAS服务器地址。
apiVersion: v1 kind: PersistentVolume metadata: name: user-susan labels: user-susan: pipelines spec: persistentVolumeReclaimPolicy: Retain capacity: storage: 10Gi accessModes: - ReadWriteMany nfs: server: NFS_SERVER_IP path: "/data"
执行创建命令:kubectl create -f nfs-pv.yaml
- 创建Persistent Volume Claim。
vim nfs-pvc.yaml
新增以下内容:
apiVersion: v1 kind: PersistentVolumeClaim metadata: name: user-susan namespace: kubeflow annotations: description: "this is the mnist demo" owner: Tom spec: accessModes: - ReadWriteMany resources: requests: storage: 5Gi selector: matchLabels: user-susan: pipelines
执行创建命令:kubectl create -f nfs-pvc.yaml
步骤二:开发Pipeline
由于Kubeflow Pipelines提供的例子都是依赖于Google的存储服务,这导致国内的用户无法真正体验Pipelines的能力。阿里云容器服务团队提供了训练MNIST模型的例子,方便您在阿里云上使用和学习Kubeflow Pipelines。
分为以下三步:
- 下载数据。
- 利用TensorFlow进行模型训练。
- 模型导出。
在Kubeflow Pipelines中可以用Python代码描述了这样一个流程,完整代码可以请参见standalone_pipeline.py。

下面将解读以下主要部分代码:
- 首先看一下函数定义部分:
@dsl.pipeline( name='pipeline to run jobs', description='shows how to run pipeline jobs.' ) def sample_pipeline(learning_rate='0.01', dropout='0.9', model_version='1', commit='f097575656f927d86d99dd64931042e1a9003cb2'): """A pipeline for end to end machine learning workflow.""" data=["user-susan:/training"] gpus=1
@dsl.pipeline
是表示工作流的装饰器,这个装饰器中需要定义两个属性,分别是name和description。入口方法sample_pipeline中定义了4个参数learning_rate,dropout,model_version和commit,分别可以在训练和模型导出阶段使用。
- 函数中第一步是数据准备工作,这里我们提供了arena.standalone_job_op的Python API。
- name:工作流中步骤的名称。
- image:要使用的容器镜像。
- data:要使用的数据以及其对应到容器内部的挂载目录,这里的data是一个数组格式,例如
data=["user-susan:/training"]
,表示可以挂载到多个数据,user-susan是之前创建的Persistent Volume Claim,而/training为容器内部的挂载目录。
prepare_data = arena.standalone_job_op( name="prepare-data", image="byrnedo/alpine-curl", data=data, command="mkdir -p /training/dataset/mnist && \ cd /training/dataset/mnist && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \ curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
上述步骤实际上是从指定地址利用curl下载数据到分布式存储对应的目录/training/dataset/mnist,这里的/training为分布式存储的根目录,类似根mount点,而/training/dataset/mnist是子目录。
- 第二步是利用下载到分布式存储的数据,并通过git指定固定commit id下载代码,并进行模型训练。
train = arena.standalone_job_op( name="train", image="tensorflow/tensorflow:1.11.0-gpu-py3", sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git", env=["GIT_SYNC_REV=%s" % (commit)], gpus=gpus, data=data, command=''' echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \ --max_steps 500 --data_dir /training/dataset/mnist \ --log_dir /training/output/mnist --learning_rate %s \ --dropout %s''' % (prepare_data.output, learning_rate, dropout), metrics=["Train-accuracy:PERCENTAGE"])
可以看到这个步骤比数据准备要相对复杂一点,除了第一步骤中的name,image, data和command之外,在模型训练步骤中,还需要指定:
- 获取代码的方式:从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在API调用时指定sync_source的git代码源,同时通过设定env中GIT_SYNC_REV指定训练代码的commit id。
- gpu:默认为0,就是不使用GPU;如果为大于0的整数值,就代表该步骤需要这个数量的GPU数。
- metrics:同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过Pipelines UI上直观的显示和比较。具体使用方法分为两步。
- 在调用API时以数组的形式指定要收集指标的metrics、name和指标的展示格式PERCENTAGE或者是RAW,例如
metrics=["Train-accuracy:PERCENTAGE"]
。 - 由于Pipelines默认会从stdout日志中收集指标,您需要在真正运行的模型代码中输出
{metrics name}={value}
或者{metrics name}:{value}
,可以参考具体样例代码。
- 在调用API时以数组的形式指定要收集指标的metrics、name和指标的展示格式PERCENTAGE或者是RAW,例如
在本步骤中指定了和prepare_data相同的data参数
["user-susan:/training"]
,就可以在训练代码中读到对应的数据,例如--data_dir /training/dataset/mnist
。由于该步骤依赖于prepare_data,可以在方法中通过指定prepare_data.output
表示两个步骤的依赖关系。 - 最后export_model是基于train训练产生的checkpoint,生成训练模型:
export_model = arena.standalone_job_op( name="export-model", image="tensorflow/tensorflow:1.11.0-py3", sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git", env=["GIT_SYNC_REV=%s" % (commit)], data=data, command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))
export_model和第二步train类似,它只是从git同步模型导出代码并且利用共享目录/training/output/mnist中的checkpoint执行模型导出。
步骤三:提交Pipeline
KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
import kfp.compiler as compiler
compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
client = kfp.Client(host=KFP_SERVICE)
try:
experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
except:
experiment_id = client.create_experiment(EXPERIMENT_NAME).id
run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
params={'learning_rate':learning_rate,
'dropout':dropout,
'model_version':model_version,
'commit':commit})
上述代码中:
- 利用compiler.compile将Python代码编译成执行引擎(Argo)识别的DAG配置文件。
- 通过Kubeflow Pipeline的客户端创建或者找到已有的实验,并且提交之前编译出的DAG配置文件。
下面介绍如何在Kubernetes内执行提交代码:
- 在集群内准备一个python3的环境,并且安装Kubeflow Pipelines SDK。
kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity kubectl exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash
- 登录到Python3的环境后,执行如下命令,连续提交两个不同参数的任务。
pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2 python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3
步骤四:查看运行结果
https://{pipeline地址}/pipeline/#/experiments
, 例如
https://192.168.285.171/pipeline/#/experiments

单击Compare runs按钮,可以比较两个实验的输入,花费的时间和精度等一系列指标。让实验可追溯是让实验可重现的第一步;而利用Kubeflow Pipelines本身的实验管理能力则是开启实验可重现的第一步。

在文档使用中是否遇到以下问题
更多建议
匿名提交