使用任务队列ack-kube-queue

任务队列ack-kube-queue旨在管理Kubernetes中的AI/ML工作负载和批处理工作负载。它允许系统管理员使用自定义队列的作业队列管理,以提高队列的灵活性。结合Quota系统,ack-kube-queue自动优化了工作负载和资源配额管理,以便最大化利用集群资源。本文介绍如何安装及使用任务队列ack-kube-queue。

前提条件

已开通云原生AI套件

使用限制

仅支持ACK Pro托管版集群,且集群版本为1.18.aliyun.1及以上。

安装ack-kube-queue

分为以下两个场景讲解如何安装ack-kube-queue。

场景一:未部署云原生AI套件

  1. 登录容器服务管理控制台,在左侧导航栏单击集群

  2. 集群列表页面,单击目标集群名称,然后在左侧导航栏,选择应用 > 云原生AI套件

  3. 云原生AI套件页面下方,单击一键部署

  4. 调度区域,选中Kube Queue,在交互方式区域,选中Arena,然后在页面下方单击部署云原生AI套件

场景二:已部署云原生AI套件

  1. 登录容器服务管理控制台,在左侧导航栏单击集群

  2. 集群列表页面,单击目标集群名称,然后在左侧导航栏,选择应用 > 云原生AI套件

  3. 安装ack-arenaack-kube-queue

    • 云原生AI套件页面的操作列,单击组件ack-arena对应的部署。在参数配置页面,单击确定

    • 云原生AI套件页面的操作列,单击组件ack-kube-queue对应的部署。在弹出页面,单击确定

    ack-arenaack-kube-queue安装完成后,组件列表区域的组件状态已部署

支持排队功能的任务类型

ack-kube-queue支持TfJob、PytorchJob、MpiJob、Argo Workflow、RayJob、SparkApplication以及原生Job的排队功能。

使用限制

  • TfJob、PytorchJob、MpiJob需要使用ack-arena组件中提供的Operator。

  • 使用原生Job类型排队功能需要集群版本不低于1.22。

  • MpiJob当前仅支持通过Arena提交MpiJob。

  • Argo Workflow当前仅支持对整体进行排队,可以通过在Annotation中申明如下内容,申明Workflow需要的资源。

    ```
     annotations:
       kube-queue/min-resources: |
         cpu: 5
         memory: 5G
    ```

如何开启不同种类的任务排队功能

默认情况下,kube-queue将会支持TfJob,Pytorchjob的排队功能,您可以根据需要开启或关闭任意类型任务的排队功能。

v0.4.0之前

v0.4.0版本之前,每种任务类型的排队功能由一个单独的Deployment工作负载类型控制,您可以在kube-queue命名空间下将对应工作负载的Extension的副本数调整为0来关闭对应的负载类型的排队功能。

v0.4.0及之后

v0.4.0版本及之后的版本中。除Argo Workflow之外,所有的任务类型的排队功能均由Job-Extensions负责,您可以修改其Command中--enabled-extensions参数的值来开关特定任务类型。不同任务类型以逗号分隔,不同任务类型以及在参数中的表示如下表:

TfJob

tfjob

Pytorchjob

pytorchjob

Job

job

SparkApplication

sparkapp

RayJob

rayjob

RayJob(v1alpha1)

rayjobv1alpha1

MpiJob

mpiv1

如何提交TfJob、PytorchJob、MpiJob

您需要在Job的Annotation中添加scheduling.x-k8s.io/suspend="true"的标识,以下以TfJob为例进行说明:

apiVersion: "kubeflow.org/v1"
kind: "TFJob"
metadata:
  name: "job1"
  annotations:
    scheduling.x-k8s.io/suspend: "true"
spec:
  ...

如何提交原生Job

您需要将Job的Suspend字段设置成True,以下是提交一个需要排队的Job的例子:

apiVersion: batch/v1
kind: Job
metadata:
  generateName: pi-
spec:
  suspend: true
  completions: 1
  parallelism: 1
  template:
    spec:
      schedulerName: default-scheduler
      containers:
      - name: pi
        image: perl:5.34.0
        command: ["sleep",  "3s"]
        resources:
          requests:
            cpu: 100m
          limits:
            cpu: 100m
      restartPolicy: Never

以上的例子中,我们将生成一个需求100mCPU的排队单元,当该排队单元出队后,将Job的Suspend改为false,此时该Job将会由集群组件开始执行。

如何提交Argo Workflow

说明

请提前在应用市场中安装ack-workflow组件。

您需要在Argo Workflow中增加名为kube-queue-suspend,类型为suspend的自定义Template,同时在提交时将Workflow设置为Suspend状态,例子如下:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: $example-name
spec:
  suspend: true # 需要将该项设置为true
  entrypoint: $example-entrypoint
  templates:
  # 需要添加名为kube-queue-suspend的suspend模板
  - name: kube-queue-suspend
    suspend: {}
  - name: $example-entrypoint
  ...

如何提交SparkApplication

说明

请提前在应用市场中安装ack-spark-operator组件。

您需要在SparkApplication的Annotaion中scheduling.x-k8s.io/suspend="true"的标识。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  generateName: spark-pi-suspend-
  namespace: spark-operator
  annotations:
    scheduling.x-k8s.io/suspend: "true"
spec:
  type: Scala
  mode: cluster
  image: registry-cn-beijing.ack.aliyuncs.com/acs/spark:v3.1.1
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar
  sparkVersion: "3.1.1"
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    serviceAccount: ack-spark-operator3.0-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"

如何提交RayJob

说明

请提前在应用市场中安装ack-kuberay-operator组件。

您需要在提交RayJob时将RayJob的spec.suspend字段设置成true。

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created.
  suspend: true

  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "200m"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 1
        minReplicas: 1
        maxReplicas: 5
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name',  or '123-abc'
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "200m"

切换Quota资源类型

当集群中有多个用户时,为了保障用户有足够的资源使用,管理员会将集群的资源固定分配给不同的用户。传统的方法是通过Kubernetes原生的ResourceQuota方式进行固定资源的分配。但由于小组之间资源忙闲不一,为了提升资源整体利用率,ack-kube-queue默认使用ElasticQuota。如果您想切换Quota资源类型为Kubernetes原生的ResourceQuota,可以按照如下步骤操作。关于ElasticQuota的更多信息,请参见ElasticQuota

  1. 执行如下命令,切换ElasticQuota为Kubernetes的ResourceQuota。

    kubectl edit deploy kube-queue-controller -nkube-queue
  2. 修改环境变量elasticquotaresourcequota

    env:
    - name: QueueGroupPlugin
        value: resourcequota
  3. 切换完成后,保存文件。等待新的kube-queue-controller启动,资源分配方式即可切换为ResourceQuota模式。

开启阻塞队列

默认情况下,ack-kube-queue处理任务时采用与kube-scheduler相同的任务轮转机制,所有任务会在队列中依次请求资源,请求失败后进入Unschedulable队列进行退避,直到下次调度。当集群中存在大量资源需求量小的任务时,由于小任务会占用大量队列轮转时间,资源需求量大的任务将难以获得资源执行,存在长时间Pending的风险。为了避免大量小任务轮转导致大任务无法执行,ack-kube-queue提供阻塞队列功能,开启后队列将只调度队列最前端的任务,从而使得大任务能够有机会执行。

开启方法

  1. 登录容器服务管理控制台,在左侧导航栏选择集群

  2. 集群列表页面,单击目标集群名称,然后在左侧导航栏,选择工作负载 > 无状态

  3. 选择命名空间kube-queue,在kube-queue-controller所在行,单击操作列的编辑

  4. 单击环境变量后面的新增,增加如下记录。

    配置项

    取值

    类型

    自定义

    变量名称

    StrictPriority

    变量/变量引用

    true

  5. 单击页面右侧更新。在确认弹出框中单击确定

开启严格优先级调度

默认情况下,请求失败的任务进入Unschedulable队列进行退避,直到下次调度。当集群资源由于任务结束空闲时,高优先级任务由于仍处于退避阶段,不会被任务队列调度,可能会导致集群资源被低优先级任务占用。为了使得高优先级任务在集群获得空闲资源时能够被优先尝试调度,ack-kube-queue提供严格优先级调度功能。开启后队列将在正在运行的任务结束后从高优先级的最早提交的任务开始尝试执行,从而使得高优先级任务能够在集群获得空闲资源时优先调度。

说明

当高优先级任务未获得充足资源时,低优先级任务仍然可以获得资源执行。

开启方法

  1. 登录容器服务管理控制台,在左侧导航栏选择集群

  2. 集群列表页面,单击目标集群名称,然后在左侧导航栏,选择工作负载 > 无状态

  3. 选择命名空间kube-queue,在kube-queue-controller所在行,单击操作列的编辑

  4. 单击环境变量后面的新增,增加如下记录。

    配置项

    取值

    类型

    自定义

    变量名称

    StrictConsistency

    变量/变量引用

    true

  5. 单击页面右侧更新。在确认弹出框中单击确定

Quota资源使用示例

ElasticQuota

  1. 使用如下YAML,创建一个ElasticQuotaTree。

    展开查看YAML文件

    apiVersion: v1
    kind: List
    metadata:
      resourceVersion: ""
      selfLink: ""
    items:
      - apiVersion: scheduling.sigs.k8s.io/v1beta1
        kind: ElasticQuotaTree
        metadata:
          name: elasticquotatree
          namespace: kube-system
        spec:
          root: # root节点为总的资源数量,max的值不能小于Children的max之和。
            name: root
            namespaces: []
            max:
              cpu: "4"
              memory: 4Gi
              nvidia.com/gpu: "64"
              aliyun.com/gpu-mem: "32"
            min:
              cpu: "0"
              memory: 0M
              nvidia.com/gpu: "0"
              aliyun.com/gpu-mem: "0"
            children: # 可以有多个Child,一般每个Child对应一个Namespace。
              - name: root.defaultQuotaGroup
                namespaces:
                  - default
                max:
                  cpu: "4"
                  memory: 4Gi
                  nvidia.com/gpu: "64"
                  aliyun.com/gpu-mem: "16"
                min:
                  cpu: "0"
                  memory: 0M
                  nvidia.com/gpu: "0"
                  aliyun.com/gpu-mem: "0"
                children: null
  2. 执行如下命令,查看ElasticQuotaTree是否创建成功。

    kubectl get elasticquotatree -A

    预期输出:

    NAMESPACE     NAME               AGE
    kube-system   elasticquotatree   7s

    表示ElasticQuotaTree创建成功。

  3. 创建测试任务。

    说明
    • 为了测试任务队列效果,测试任务的资源配额应该小于执行所有任务需要的总资源。

    • 为了方便测试,TFJob用busybox镜像替代真正的tensorflow镜像。模拟训练过程,每个容器Sleep 30秒。

    1. 使用如下YAML,创建两个TFJob进行测试。

      展开查看YAML文件

      apiVersion: "kubeflow.org/v1"
      kind: "TFJob"
      metadata:
        name: "job1"
        annotations:
          scheduling.x-k8s.io/suspend: "true"
      spec:
        tfReplicaSpecs:
          PS:
            replicas: 1
            restartPolicy: Never
            template:
              spec:
                containers:
                  - name: tensorflow
                    image: busybox
                    command:
                      - /bin/sh
                      - -c
                      - --
                    args:
                      - "sleep 30s"
                    resources:
                      requests:
                        cpu: 1
                        memory: 1Gi
                      limits:
                        cpu: 1
                        memory: 1Gi
      
          Worker:
            replicas: 2
            restartPolicy: Never
            template:
              spec:
                containers:
                  - name: tensorflow
                    image: busybox
                    command:
                      - /bin/sh
                      - -c
                      - --
                    args:
                      - "sleep 30s"
                    resources:
                      requests:
                        cpu: 1
                        memory: 1Gi
                      limits:
                        cpu: 1
                        memory: 1Gi
      --
      apiVersion: "kubeflow.org/v1"
      kind: "TFJob"
      metadata:
        name: "job2"
        annotations:
          scheduling.x-k8s.io/suspend: "true"
      spec:
        tfReplicaSpecs:
          PS:
            replicas: 1
            restartPolicy: Never
            template:
              spec:
                containers:
                  - name: tensorflow
                    image: busybox
                    command:
                      - /bin/sh
                      - -c
                      - --
                    args:
                      - "sleep 30s"
                    resources:
                      requests:
                        cpu: 1
                        memory: 1Gi
                      limits:
                        cpu: 1
                        memory: 1Gi
      
          Worker:
            replicas: 2
            restartPolicy: Never
            template:
              spec:
                containers:
                  - name: tensorflow
                    image: busybox
                    command:
                      - /bin/sh
                      - -c
                      - --
                    args:
                      - "sleep 30s"
                    resources:
                      requests:
                        cpu: 1
                        memory: 1Gi
                      limits:
                        cpu: 1
                        memory: 1Gi
    2. 待任务提交后,查看任务状态。

      1. 执行如下命令,查看任务状态。

        kubectl get tfjob

        预期输出:

        NAME   STATE     AGE
        job1   Running   3s
        job2   Queuing   2s

        预期输出显示job1处于Running状态,job2处于Queuing状态,符合预期。因为每个TFJob训练任务需要的CPU资源为3 Core,ElasticQuotaTree给Default Namespace分配的最大CPU核数为4 Core,所以同一时间段内只能运行一个TFJob。

      2. 稍等片刻后,再次执行如下命令。

        kubectl get tfjob

        预期输出:

        NAME   STATE       AGE
        job1   Succeeded   77s
        job2   Running     77s

        预期输出显示job1执行成功。job1执行完成后,job2开始执行。表明ack-kube-queue可以正常工作。

ResourceQuota

  1. 使用如下YAML,创建一个ResourceQuota。

    apiVersion: v1
    kind: ResourceQuota
    metadata:
      name: default
    spec:
      hard:
        cpu: "4"
        memory: 4Gi
  2. 执行如下命令,查看ResourceQuota是否创建成功。

    kubectl get resourcequota default -o wide

    预期输出:

    NAME      AGE   REQUEST                   LIMIT
    default   76s   cpu: 0/4, memory: 0/4Gi

    表示ResourceQuota创建成功。

  3. 使用如下YAML,创建两个TFJob进行测试。

    展开查看YAML文件

    apiVersion: "kubeflow.org/v1"
    kind: "TFJob"
    metadata:
      name: "job1"
      annotations:
        scheduling.x-k8s.io/suspend: "true"
    spec:
      tfReplicaSpecs:
        PS:
          replicas: 1
          restartPolicy: Never
          template:
            spec:
              containers:
                - name: tensorflow
                  image: busybox:stable
                  command:
                    - /bin/sh
                    - -c
                    - --
                  args:
                    - "sleep 30s"
                  resources:
                    requests:
                      cpu: 1
                      memory: 1Gi
                    limits:
                      cpu: 1
                      memory: 1Gi
    
        Worker:
          replicas: 2
          restartPolicy: Never
          template:
            spec:
              containers:
                - name: tensorflow
                  image: busybox:stable
                  command:
                    - /bin/sh
                    - -c
                    - --
                  args:
                    - "sleep 30s"
                  resources:
                    requests:
                      cpu: 1
                      memory: 1Gi
                    limits:
                      cpu: 1
                      memory: 1Gi
    --
    apiVersion: "kubeflow.org/v1"
    kind: "TFJob"
    metadata:
      name: "job2"
      annotations:
        scheduling.x-k8s.io/suspend: "true"
    spec:
      tfReplicaSpecs:
        PS:
          replicas: 1
          restartPolicy: Never
          template:
            spec:
              containers:
                - name: tensorflow
                  image: busybox:stable
                  command:
                    - /bin/sh
                    - -c
                    - --
                  args:
                    - "sleep 30s"
                  resources:
                    requests:
                      cpu: 1
                      memory: 1Gi
                    limits:
                      cpu: 1
                      memory: 1Gi
    
        Worker:
          replicas: 2
          restartPolicy: Never
          template:
            spec:
              containers:
                - name: tensorflow
                  image: busybox:stable
                  command:
                    - /bin/sh
                    - -c
                    - --
                  args:
                    - "sleep 30s"
                  resources:
                    requests:
                      cpu: 1
                      memory: 1Gi
                    limits:
                      cpu: 1
                      memory: 1Gi
  4. 待任务提交后,执行如下命令,查看任务状态。

    kubectl get tfjob
    NAME   STATE     AGE
    job1   Running   5s
    job2   Queuing   5s
    
    kubectl get pods
    NAME            READY   STATUS    RESTARTS   AGE
    job1-ps-0       1/1     Running   0          8s
    job1-worker-0   1/1     Running   0          8s
    job1-worker-1   1/1     Running   0          8s

    可以看到job1处于Running状态,job2处于Queuing状态,这是符合预期的。因为每个TFJob训练任务需要的CPU资源为3 Core(1个参数服务器Pod,需要1 Core;和2个Worker Pod,均需要1 Core),而ResourceQuota给Default Namespace分配的最大CPU核数为 4 Core,所以同一时间段内只能运行一个TFJob。

  5. 稍等片刻后,再次执行如下命令。

    kubectl get tfjob
    NAME   STATE       AGE
    job1   Succeeded   77s
    job2   Running     77s
    
    kubectl get pods
    NAME            READY   STATUS      RESTARTS   AGE
    job1-worker-0   0/1     Completed   0          54s
    job1-worker-1   0/1     Completed   0          54s
    job2-ps-0       1/1     Running     0          22s
    job2-worker-0   1/1     Running     0          22s
    job2-worker-1   1/1     Running     0          21s

    预期输出表明,job1执行成功,执行成功后,job2开始执行,表明ack-kube-queue可以正常工作。

    限制同时出队的任务数量

    在一些应用能够自动进行伸缩的场景下,应用自身需要的资源量可能是无法估计的,这种场景下可以使用出队的任务数量对队列任务进行限制,声明对任务数量的限制需要在ElasticQuotaTree中以kube-queue/max-jobs为资源进行限制,限制后,该Quota下能够出队的QueueUnit的数量将不会超过该值乘以超卖比例。如以下的例子:

    展开查看YAML文件

    apiVersion: v1
    kind: List
    metadata:
      resourceVersion: ""
      selfLink: ""
    items:
      - apiVersion: scheduling.sigs.k8s.io/v1beta1
        kind: ElasticQuotaTree
        metadata:
          name: elasticquotatree
          namespace: kube-system
        spec:
          root: # root节点为总的资源数量,max的值不能小于Children的max之和。
            name: root
            namespaces: []
            max:
              kube-queue/max-jobs: 10
              cpu: "4"
              memory: 4Gi
              nvidia.com/gpu: "64"
              aliyun.com/gpu-mem: "32"
            min:
              cpu: "0"
              memory: 0M
              nvidia.com/gpu: "0"
              aliyun.com/gpu-mem: "0"
            children: # 可以有多个Child,一般每个Child对应一个Namespace。
              - name: root.defaultQuotaGroup
                namespaces:
                  - default
                max:
              		kube-queue/max-jobs: 10
                  cpu: "4"
                  memory: 4Gi
                  nvidia.com/gpu: "64"
                  aliyun.com/gpu-mem: "16"
                min:
                  cpu: "0"
                  memory: 0M
                  nvidia.com/gpu: "0"
                  aliyun.com/gpu-mem: "0"
                children: null