本教程介绍如何快速开发基于Kubeflow Pipelines的机器学习工作流。

可体验到

  • 在Kubernates中创建共享存储
  • 基于Kubeflow Pipelines的机器学习工作流

前置知识

  • Kubernetes Volume
  • 创建NAS存储
  • 了解Python语言
  • 手写数字识别训练

前提条件

在开始之前您需要先在Kubernates集群上部署Kubeflow Pipelines服务,请参见在阿里云容器服务上部署Kubeflow Pipelines服务

步骤一:创建共享存储

  1. 登录NAS控制台,在集群所在可用区下创建一个文件系统。
  2. 在NFS Server中创建/data目录。

    其中NFS_SERVER_IP请替换成真实NAS服务器地址。

    mkdir -p /nfs
    mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
    mkdir -p /data
    umount /nfs
  3. 创建Persistent Volume。
    vim nfs-pvc.yaml

    新增以下内容:

    其中NFS_SERVER_IP请替换成真实NAS服务器地址。

    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: user-susan
      labels:
        user-susan: pipelines
    spec:
      persistentVolumeReclaimPolicy: Retain
      capacity:
        storage: 10Gi
      accessModes:
      - ReadWriteMany
      nfs:
        server: NFS_SERVER_IP
        path: "/data"
    执行创建命令:
    kubectl create -f nfs-pv.yaml
  4. 创建Persistent Volume Claim。
    vim nfs-pvc.yaml

    新增以下内容:

    apiVersion: v1
    kind: PersistentVolumeClaim
    metadata:
      name: user-susan
      namespace: kubeflow
      annotations:
        description: "this is the mnist demo"
        owner: Tom
    spec:
      accessModes:
        - ReadWriteMany
      resources:
        requests:
           storage: 5Gi
      selector:
        matchLabels:
    	user-susan: pipelines
    执行创建命令:
    kubectl create -f nfs-pvc.yaml

步骤二:开发Pipeline

由于Kubeflow Pipelines提供的例子都是依赖于Google的存储服务,这导致国内的用户无法真正体验Pipelines的能力。阿里云容器服务团队提供了训练MNIST模型的例子,方便您在阿里云上使用和学习Kubeflow Pipelines。

分为以下三步:

  1. 下载数据。
  2. 利用TensorFlow进行模型训练。
  3. 模型导出。

在Kubeflow Pipelines中可以用Python代码描述了这样一个流程,完整代码可以请参见standalone_pipeline.py

Kubeflow Pipelines会将上面的代码转化成一个有向无环图(DAG),其中的每一个节点就是Component(组件),而Component(组件)之间的连线代表它们之间的依赖关系。从Pipelines UI可以看到DAG图:DAG

下面将解读以下主要部分代码:

  1. 首先看一下函数定义部分:
    @dsl.pipeline(
      name='pipeline to run jobs',
      description='shows how to run pipeline jobs.'
    )
    def sample_pipeline(learning_rate='0.01',
        dropout='0.9',
        model_version='1',
        commit='f097575656f927d86d99dd64931042e1a9003cb2'):
      """A pipeline for end to end machine learning workflow."""
      data=["user-susan:/training"]
         gpus=1

    @dsl.pipeline是表示工作流的装饰器,这个装饰器中需要定义两个属性,分别是namedescription

    入口方法sample_pipeline中定义了4个参数learning_ratedropoutmodel_versioncommit,分别可以在训练和模型导出阶段使用。

  2. 函数中第一步是数据准备工作,这里我们提供了arena.standalone_job_op的Python API。
    • name:工作流中步骤的名称。
    • image:要使用的容器镜像。
    • data:要使用的数据以及其对应到容器内部的挂载目录,这里的data是一个数组格式,例如data=["user-susan:/training"],表示可以挂载到多个数据,user-susan是之前创建的Persistent Volume Claim,而/training为容器内部的挂载目录。
    prepare_data = arena.standalone_job_op(
        name="prepare-data",
        image="byrnedo/alpine-curl",
        data=data,
        command="mkdir -p /training/dataset/mnist && \
      cd /training/dataset/mnist && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

    上述步骤实际上是从指定地址利用curl下载数据到分布式存储对应的目录/training/dataset/mnist,这里的/training为分布式存储的根目录,类似根mount点,而/training/dataset/mnist是子目录。

  3. 第二步是利用下载到分布式存储的数据,并通过git指定固定commit id下载代码,并进行模型训练。
    train = arena.standalone_job_op(
        name="train",
        image="tensorflow/tensorflow:1.11.0-gpu-py3",
        sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
        env=["GIT_SYNC_REV=%s" % (commit)],
        gpus=gpus,
        data=data,
        command='''
        echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
        --max_steps 500 --data_dir /training/dataset/mnist \
        --log_dir /training/output/mnist  --learning_rate %s \
        --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])

    可以看到这个步骤比数据准备要相对复杂一点,除了第一步骤中的nameimage, datacommand之外,在模型训练步骤中,还需要指定:

    • 获取代码的方式:从可重现实验的角度来看,对于运行试验代码的追本溯源,是非常重要的一环。可以在API调用时指定sync_source的git代码源,同时通过设定envGIT_SYNC_REV指定训练代码的commit id
    • gpu:默认为0,就是不使用GPU;如果为大于0的整数值,就代表该步骤需要这个数量的GPU数。
    • metrics:同样是从可重现和可比较的实验目的出发,用户可以将需要的一系列指标导出,并且通过Pipelines UI上直观的显示和比较。具体使用方法分为两步。
      1. 在调用API时以数组的形式指定要收集指标的metricsname和指标的展示格式PERCENTAGE或者是RAW,例如metrics=["Train-accuracy:PERCENTAGE"]
      2. 由于Pipelines默认会从stdout日志中收集指标,您需要在真正运行的模型代码中输出{metrics name}={value}或者{metrics name}:{value},可以参考具体样例代码

    在本步骤中指定了和prepare_data相同的data参数["user-susan:/training"],就可以在训练代码中读到对应的数据,例如--data_dir /training/dataset/mnist。由于该步骤依赖于prepare_data,可以在方法中通过指定prepare_data.output表示两个步骤的依赖关系。

  4. 最后export_model是基于train训练产生的checkpoint,生成训练模型:
    export_model = arena.standalone_job_op(
        name="export-model",
        image="tensorflow/tensorflow:1.11.0-py3",
        sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
        env=["GIT_SYNC_REV=%s" % (commit)],
        data=data,
        command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

    export_model和第二步train类似,它只是从git同步模型导出代码并且利用共享目录/training/output/mnist中的checkpoint执行模型导出。

步骤三:提交Pipeline

您可以在自己的Kubernetes内将前面开发工作流的Python DSL提交到Kubeflow Pipelines服务中,提交代码如下所示:
 KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
  import kfp.compiler as compiler
  compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
  client = kfp.Client(host=KFP_SERVICE)
  try:
    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
  except:
    experiment_id = client.create_experiment(EXPERIMENT_NAME).id
  run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                            params={'learning_rate':learning_rate,
                                     'dropout':dropout,
                                    'model_version':model_version,
                                    'commit':commit})

上述代码中:

  • 利用compiler.compile将Python代码编译成执行引擎(Argo)识别的DAG配置文件。
  • 通过Kubeflow Pipeline的客户端创建或者找到已有的实验,并且提交之前编译出的DAG配置文件。

下面介绍如何在Kubernetes内执行提交代码:

  1. 在集群内准备一个python3的环境,并且安装Kubeflow Pipelines SDK。
    kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
    kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash
  2. 登录到Python3的环境后,执行如下命令,连续提交两个不同参数的任务。
    pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
    pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
    curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
    python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
    python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3

步骤四:查看运行结果

登录到Kubeflow Pipelines的UI:https://{pipeline地址}/pipeline/#/experiments, 例如
https://11.124.285.171/pipeline/#/experiments
EXP

单击Compare runs按钮,可以比较两个实验的输入,花费的时间和精度等一系列指标。让实验可追溯是让实验可重现的第一步;而利用Kubeflow Pipelines本身的实验管理能力则是开启实验可重现的第一步。

compare