Automating Ray Job Execution in ACK Clusters

The main challenge enterprises face when managing cluster resources is that workloads are large while resources are limited. To address this, resources must be allocated preferentially to key departments or individuals, while retaining the flexibility to adjust allocations at any time. This topic describes how to improve the utilization of enterprise cluster resources and automatically process large numbers of Ray Jobs from different departments through a unified task management platform, with support for queue jumping and dynamic priority adjustment so that high-priority tasks obtain resources first.

Why automate Ray Job execution in an ACK cluster

  • Resource management and optimization.

    Using an ACK cluster together with Ray to schedule tasks automatically allows computing resources to be managed and allocated more flexibly. For example, in the scenario described here, the text-generation group and the video-generation group are assigned different numbers of CPU cores. Ray can schedule jobs efficiently within these bounds, ensuring that each group makes full use of its allocation while avoiding resource waste and excessive contention.

  • Priority control.

    Tasks from algorithm engineers run at a higher priority. By configuring appropriate policies on ACK and combining them with Ray's scheduling mechanism, task priority can be set based on user roles. This keeps critical research work running smoothly while still letting interns learn and experiment without affecting the progress of major projects.

  • Elastic scaling.

    As research progresses or project requirements change, the computing capacity available to each group may need to be adjusted. ACK's elastic scaling can dynamically add or remove nodes based on actual load, and Ray adapts to these changes seamlessly, automatically rebalancing workloads to maintain optimal performance.

The CPU allocation is shown in the following figure.

image

To solve the quota management and task-ordering problems, a resource quota mechanism and a task scheduling system must be established in the cluster.

Resource quotas can be set per department or per individual, defining the maximum amount of resources or number of machines each unit may use. Before a task starts, the system automatically checks whether the quota the task belongs to can satisfy its request. Only when the quota can cover the required resources is the task assigned compute resources and started. This both ensures effective resource utilization and guarantees that tasks are scheduled according to priority.
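The admission check described above can be sketched as follows. This is an illustrative model only (an assumption, not the ACK scheduler's actual code): a task is admitted only if every resource it requests still fits within its quota's remaining capacity.

```python
# Illustrative quota admission check (not the ACK implementation).
def can_admit(request: dict, quota_max: dict, quota_used: dict) -> bool:
    """Return True if the quota can cover the task's request for every resource."""
    return all(
        quota_used.get(resource, 0) + amount <= quota_max.get(resource, 0)
        for resource, amount in request.items()
    )
```

For example, with a 40-CPU quota already running a 31-CPU job, a second 31-CPU job would be rejected and left queued.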

Prepare the environment

Set up queues and resources

The task queuing and orchestration system must be able to route tasks from different departments and employees into the appropriate queues so that they can be managed according to each department's priority. In addition, the system must support priority-based task scheduling rather than a simple first-in-first-out (FIFO) model, so that urgent tasks can be handled first.

By combining the ACK Scheduler, Kube Queue, and ElasticQuotaTree, an ACK cluster can provide capabilities including maximum quota limits, elastic quota management, a variety of queuing policies, and queue resource reclamation. Together, these fully meet enterprise requirements for a task queue system.

The relationships among administrators, researchers, tasks, Kube Queue, the ACK Scheduler, and the Controller are shown in the following figure.

image

The administrator first creates an ElasticQuotaTree in the cluster to define the resource quotas and queue structure. Once the ElasticQuotaTree is in place, researchers from different departments can submit tasks to the cluster. Submitted tasks are assigned to groups according to the preset quotas and wait for execution in the order determined by each queue's queuing policy. When a task is dequeued, the Controller manages its state, while the ACK Scheduler handles the actual Pod scheduling. If a dequeued task fails to start running for a long time, Kube Queue automatically reclaims it and puts it back into the pending queue.

Set up the resource quota hierarchy

An ElasticQuotaTree defines the quota information in the cluster, including the quota hierarchy, the amount of resources associated with each quota, and the namespaces bound to each quota. When tasks are submitted in these namespaces, they are automatically counted against the quota of the corresponding namespace.

To build a resource quota hierarchy that matches the enterprise requirements, submit an ElasticQuotaTree such as the following to the cluster.


apiVersion: scheduling.sigs.k8s.io/v1beta1
kind: ElasticQuotaTree
metadata:
  name: elasticquotatree # Currently, only a single ElasticQuotaTree is supported
  namespace: kube-system # Takes effect only in the kube-system namespace
spec:
  root:
    name: root 
    max:
      cpu: 100
      memory: 100Gi
    min:
      cpu: 100
      memory: 100Gi
    children:
    - name: group-text
      max:
        cpu: 40
        memory: 100Gi
      min:
        cpu: 40
        memory: 100Gi
      children:
      - name: algorithm-text
        max:
          cpu: 40
          memory: 100Gi
        min:
          cpu: 40
          memory: 100Gi
        namespaces: # Bind the corresponding namespace
          - algorithm-text
      - name: intern-text
        max:
          cpu: 40
          memory: 100Gi
        min:
          cpu: 0
          memory: 0
        namespaces: # Bind the corresponding namespace
          - intern-text
    - name: group-video
      max:
        cpu: 60
        memory: 100Gi
      min:
        cpu: 60
        memory: 100Gi
      children:
      - name: algorithm-video
        max:
          cpu: 60
          memory: 100Gi
        min:
          cpu: 60
          memory: 100Gi
        namespaces: # Bind the corresponding namespace
          - algorithm-video
      - name: intern-video
        max:
          cpu: 60
          memory: 100Gi
        min:
          cpu: 0
          memory: 0
        namespaces: # Bind the corresponding namespace
          - intern-video

An ElasticQuotaTree is a tree structure in which each node's max field defines the maximum amount of resources the quota may use, while the min field specifies the node's guaranteed minimum. When a quota's guaranteed minimum cannot be met, the scheduler attempts to reclaim resources from quotas whose actual usage exceeds their own guaranteed minimum. In particular, because the intern-text and intern-video quotas have a guaranteed minimum of 0, if an algorithm team member submits a task that must be handled immediately while intern tasks are running, the resources used by those intern tasks can be preempted to run the algorithm team's task first, ensuring that high-priority tasks proceed smoothly.
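The reclamation rule can be modeled in a few lines. This is an illustrative sketch (an assumption, not the scheduler's actual algorithm): only usage above a quota's min guarantee is eligible to be taken back to satisfy a sibling quota's guarantee.

```python
# Illustrative model of elastic-quota reclamation (not the scheduler's code).
def reclaimable_cpu(used: int, guaranteed_min: int) -> int:
    """CPU that may be reclaimed from a quota running above its guarantee."""
    return max(0, used - guaranteed_min)

# intern-text has min=0, so all of its usage is preemptible;
# algorithm-text running at its min=40 guarantee cannot be preempted.
```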

Set and manage queue priorities

After the ElasticQuotaTree is submitted, Kube Queue creates the corresponding queues in the cluster for task queuing. Each leaf quota maps to an independent queue. When a task is submitted to the cluster, the system automatically assigns it to the correct queue based on the task's namespace and the namespace information in the quota configuration. To specify a particular quota explicitly, declare the quota name on the RayJob or RayCluster resource with the "quota.scheduling.alibabacloud.com/name" label. By default, all newly created queues have a priority of 0, but you can customize each queue's priority by modifying the Queue resource objects in the kube-queue namespace. Kube Queue iterates over the queues in a round-robin fashion and tries to dequeue the task at the head of the queue it is currently visiting for scheduling. Queues with higher priority are visited earlier and therefore get the first chance to use available resources.
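The dequeuing behavior can be sketched as follows. This is a minimal illustration (an assumption, not Kube Queue's actual implementation): queues are visited in descending priority order, and the head item of the first non-empty queue is tried first.

```python
# Illustrative priority-aware dequeue (not Kube Queue's source).
from collections import deque

def next_task(queues, priorities):
    """Pop the head of the highest-priority non-empty queue, or None."""
    for name in sorted(queues, key=lambda n: priorities.get(n, 0), reverse=True):
        if queues[name]:
            return queues[name].popleft()
    return None

# Example: a priority-2 algorithm queue is drained before a priority-0 intern queue.
queues = {
    "algorithm-text": deque(["job-a1", "job-a2"]),
    "intern-text": deque(["job-i1"]),
}
priorities = {"algorithm-text": 2, "intern-text": 0}
```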

You can also manage a queue's priority by modifying the Queue resource's spec.

apiVersion: scheduling.x-k8s.io/v1alpha1
kind: Queue
metadata:
  annotations:
    kube-queue/quota-fullname: root/group-video/algorithm-video
  creationTimestamp: "2024-10-08T06:43:07Z"
  generation: 1
  labels:
    create-by-kubequeue: "true"
  name: root-group-video-algorithm-video-algorithm-video
  namespace: kube-queue
  resourceVersion: "5766258"
  uid: 01342987-3dad-401b-8509-ef7250683377
spec:
  queuePolicy: Round
  # Declare that the Queue has a priority of 2 by adding priority: 2 to its spec
  priority: 2

The mapping between resource quotas and queues is shown in the following figure. By default, queues use round-robin scheduling; for other supported queuing policies, see the related documentation on enabling blocking queues.

image

After the resource quotas are configured, users only need to add the suspend flag when submitting a task for the task to enter the queue and wait. For a RayJob, set the .spec.suspend field to true. Other task types that support queuing are enabled in a similar way; for details, see the related documentation on task types that support queuing. For a RayJob, after submission the system adds the Head Pod's resource request to the resource amount of each WorkerGroup (replicas multiplied by the per-Pod request) and uses the sum as the task's total resource request.
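The aggregation rule above can be written out explicitly. This is a sketch with illustrative field names (not KubeRay API fields): the total is the head Pod's request plus, for each worker group, replicas times the per-Pod request.

```python
# Sketch of the RayJob resource aggregation rule described above.
def rayjob_total_cpu(head_cpu, worker_groups):
    """Total CPU request: head request + sum over groups of replicas * per-Pod CPU."""
    return head_cpu + sum(g["replicas"] * g["cpu"] for g in worker_groups)

# A 1-CPU head plus 30 one-CPU workers gives the 31-CPU request that appears
# in the queue status output later in this topic.
```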

Example: manage task queues with Kube Queue

The following submits several Ray Jobs to demonstrate how Kube Queue works. When a Ray Job reaches the Succeeded state, the task is complete and Kube Queue allows subsequent tasks to start. Note that the Pods of a new RayCluster can be scheduled only after the RayCluster associated with the completed Ray Job has been destroyed and its resources released. A code example follows.


apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  suspend: true

  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 30
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
  namespace: algorithm-text
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

The following submits two tasks, each requesting 31 CPU cores. In this case, only one task can obtain the required resources and start running immediately, while the other remains suspended. You can check the Status field of the queue with the following commands to see the queuing details.

# The output shows that of the two submitted tasks, only the first is initializing; the second is suspended and waiting
➜  kubequeue-doc kubectl get rayjob -n algorithm-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-j87d8                                                                      4s
rayjob-sample-rhm9s                Initializing        2024-10-08T07:56:31Z              7s


# List the queues in the cluster, corresponding to the figure above
➜  kubequeue-doc kubectl -n kube-queue get queue
NAME                                               AGE
root-group-text-algorithm-text-algorithm-text      73m
root-group-text-intern-text-intern-text            73m
root-group-video-algorithm-video-algorithm-video   73m
root-group-video-intern-video-intern-video         73m


# Inspect the queue for the tasks just submitted. The status shows that rayjob-sample-j87d8 is in the backoff queue at position 1, meaning it can start as soon as the currently running task finishes
➜  kubequeue-doc kubectl -n kube-queue get queue  root-group-text-algorithm-text-algorithm-text -oyaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: Queue
metadata:
  annotations:
    kube-queue/quota-fullname: root/group-text/algorithm-text
  creationTimestamp: "2024-10-08T06:43:07Z"
  generation: 1
  labels:
    create-by-kubequeue: "true"
  name: root-group-text-algorithm-text-algorithm-text
  namespace: kube-queue
  resourceVersion: "5802083"
  uid: 83a3bf55-cd96-4405-9629-7e37512ac4b6
spec:
  queuePolicy: Round
status:
  queueItemDetails:
    active: []
    backoff:
    - name: rayjob-sample-j87d8-ray-qu     # Matches the name used in the next command
      namespace: algorithm-text
      position: 1
      
 
# Inspect the qu (QueueUnit) resource status. The failure reason is that the request would exceed the maximum quota configured for root/group-text/algorithm-text.
➜  kubequeue-doc kubectl get queue -n algorithm-text rayjob-sample-j87d8-ray-qu -oyaml
...
status:
  lastUpdateTime: "2024-10-08T08:01:46Z"
  message: 'Insufficient quota(cpu) in quota root/group-text/algorithm-text: request
    31, max 40, used 31, oversellreate 1. Wait for running jobs to complete'
  phase: Enqueued

Similarly, if an intern submits a job at this point, that job also queues, and the message shows that it cannot be dequeued because the root/group-text quota would exceed its maximum. You can check the queuing details with the following commands.

➜  kubequeue-doc kubectl get queue -n intern-text
NAME                         AGE
rayjob-sample-n5gzf-ray-qu   9s              # Matches the name used in the next command


➜  kubequeue-doc kubectl get queue -n intern-text rayjob-sample-n5gzf-ray-qu -oyaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: QueueUnit
metadata:
  creationTimestamp: "2024-10-08T08:07:48Z"
  generation: 1
  name: rayjob-sample-n5gzf-ray-qu
  namespace: intern-text
  ownerReferences:
  - apiVersion: ray.io/v1
    kind: RayJob
    name: rayjob-sample-n5gzf
    uid: d44af490-b595-4876-9463-bd22ff826848
  resourceVersion: "5807774"
  uid: b9f7e900-ccf9-47cb-817b-8d4e61575369
spec:
  consumerRef:
    apiVersion: ray.io/v1
    kind: RayJob
    name: rayjob-sample-n5gzf
    namespace: intern-text
  podSet:
  - count: 1
    name: head
    template:
      metadata: {}
      spec:
        containers:
        - image: rayproject/ray:2.9.0
          name: ray-head
          ports:
          - containerPort: 6379
            name: gcs-server
            protocol: TCP
          - containerPort: 8265
            name: dashboard
            protocol: TCP
          - containerPort: 10001
            name: client
            protocol: TCP
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "1"
          volumeMounts:
          - mountPath: /home/ray/samples
            name: code-sample
        volumes:
        - configMap:
            items:
            - key: sample_code.py
              path: sample_code.py
            name: ray-job-code-sample
          name: code-sample
  - count: 30
    name: small-group
    template:
      metadata: {}
      spec:
        containers:
        - image: rayproject/ray:2.9.0
          lifecycle:
            preStop:
              exec:
                command:
                - /bin/sh
                - -c
                - ray stop
          name: ray-worker
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "1"
  resource:
    cpu: "31"
status:
  lastUpdateTime: "2024-10-08T08:08:03Z"
  message: 'Insufficient quota(cpu) in parent quota root/group-text: request 31, max
    40, used 31, oversellreate 1. Wait for running jobs to complete'
  phase: Enqueued

After the task rayjob-sample-rhm9s completes, the tasks in the algorithm queue acquire resources and start earlier because that queue has a higher priority.

# Simulate task completion by deleting the task
➜  kubequeue-doc kubectl delete rayjob -n algorithm-text rayjob-sample-rhm9s
rayjob.ray.io "rayjob-sample-rhm9s" deleted


# The task in the algorithm queue starts running
➜  kubequeue-doc kubectl get rayjob -n algorithm-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-j87d8                Initializing        2024-10-08T08:24:09Z              27m


# The task in the intern queue remains suspended
➜  kubequeue-doc kubectl get rayjob -n intern-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME   END TIME   AGE
rayjob-sample-n5gzf                                                            16m

Schedule to a specific node pool

In Ray, you can control where tasks or actors are scheduled by specifying a node pool, which helps optimize resource usage, improve performance, or meet specific security and isolation requirements. The following are typical scenarios for node pool scheduling.

  • Matching resource requirements.

    If some tasks in your application have special hardware requirements (such as GPU computing), you can group the machines with that hardware into a dedicated node pool and schedule only those tasks into it.

  • Security isolation.

    Tasks that process sensitive data may need to run in a more tightly controlled environment. Creating a dedicated node pool for such work, and ensuring that only rigorously vetted services can access its nodes, provides a degree of security isolation.

  • Performance optimization.

    Certain workloads perform best on specific infrastructure. For example, I/O-intensive jobs may be better suited to servers with high-speed networking and SSD storage. In that case, a dedicated high-performance node pool can be allocated for them.

  • Cost control.

    Depending on the cloud provider's pricing, planning node pools composed of instances at different price tiers helps manage cost. For example, non-critical tasks can run on cheaper instances with somewhat lower performance.

  • Multi-tenant environments.

    When a shared cluster serves multiple project teams, each team wants to manage and optimize its own workloads independently. Creating a node pool per team makes resource planning and adjustment easier for each of them.

To deploy Pods to a specific node pool, set NodeAffinity in the Pod configuration of the Head Pod and the Worker Groups. You can also define a Resource Policy to assign a scheduling priority to a group of Pods, for example scheduling to ECS instances first and only then to ECI or ACS nodes. The code is shown below.


apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  suspend: true
  shutdownAfterJobFinishes: true
  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          # Declare that scheduling is restricted to a specific node pool
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: alibabacloud.com/nodepool-id
                    operator: In
                    values:
                    - np9c9b663eb55d44d0943009d5c3d32781
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 30
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            # Declare that scheduling is restricted to a specific node pool
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                  - matchExpressions:
                    - key: alibabacloud.com/nodepool-id
                      operator: In
                      values:
                      - np9c9b663eb55d44d0943009d5c3d32781
            containers:
              - name: ray-worker
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
  namespace: algorithm-text
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

    import time
    time.sleep(30)

Alternatively, you can configure a Resource Policy to automatically schedule tasks onto serverless resources when node pool resources are insufficient.

Important

ACS resources cannot currently be used together with the gang scheduling mechanism.

apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
  name: rayjob-sample-tnj27-raycluster-s6t8b-small-group
  namespace: algorithm-text
spec:
  selector:
    # Specify priority scheduling for a worker group of a RayCluster;
    # ray.io/group corresponds to groupName and ray.io/cluster to the specific RayCluster
    ray.io/group: small-group
    ray.io/cluster: rayjob-sample-tnj27-raycluster-s6t8b
    ray.io/node-type: worker
  strategy: prefer
  units:
  - nodeSelector:
      alibabacloud.com/nodepool-id: np9c9b663eb55d44d0943009d5c3d32781
    resource: ecs
  - resource: acs

Set up gang scheduling

When a problem requires multiple machines working together and you want to reduce the programming complexity this introduces, consider using Ray with gang scheduling in the following scenarios.

  • Large-scale machine learning training.

    When processing very large datasets or complex models, a single machine may not provide enough compute, so a group of containers must work together. Gang scheduling ensures these containers are scheduled simultaneously, avoiding resource contention and deadlock and thereby improving training efficiency.

  • MPI computing frameworks.

    Parallel computation under the MPI framework requires master and worker processes to cooperate. Gang scheduling ensures these processes are scheduled together, reducing communication latency and improving computational efficiency.

  • Data processing and analytics.

    Applications that process massive amounts of data, such as log analysis and real-time stream processing, may need multiple tasks running simultaneously to complete complex analyses. Gang scheduling ensures these tasks are scheduled together, improving overall throughput.

  • Custom distributed application development.

    For example, implementing a player-matching service in a game server architecture, or coordinating data collection and processing from many thousands of devices in an IoT project.

To submit a Ray Job with gang scheduling requirements in ACK Pro or ACK Lingjun environments, simply declare ray.io/scheduler-name=kube-scheduler in the Ray Job's metadata. After the task is submitted, the Ray Operator injects the gang-scheduling labels when it creates the Pods. The code is shown below.


apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
  labels:
    # Specify gang scheduling with ray.io/scheduler-name
    ray.io/scheduler-name: kube-scheduler
    # Specify the quota with quota.scheduling.alibabacloud.com/name
    quota.scheduling.alibabacloud.com/name: algorithm-video
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  shutdownAfterJobFinishes: true
  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  suspend: true

  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 30
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
  namespace: algorithm-text
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

    import time
    time.sleep(30)

When the Pods are created, labels are set on them to simplify later identification, grouping, filtering, and management. An example Pod is shown below.


apiVersion: v1
kind: Pod
metadata:
  annotations:
    ray.io/ft-enabled: "false"
  creationTimestamp: "2024-10-10T02:38:29Z"
  generateName: rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-
  labels:
    app.kubernetes.io/created-by: kuberay-operator
    app.kubernetes.io/name: kuberay
    # Carries the coscheduling labels required by ACK
    pod-group.scheduling.sigs.k8s.io/min-available: "31"
    pod-group.scheduling.sigs.k8s.io/name: rayjob-sample-hhbdr-raycluster-ljj69
    # Carries the quota label recognized by ACK
    quota.scheduling.alibabacloud.com/name: algorithm-video
    ray.io/cluster: rayjob-sample-hhbdr-raycluster-ljj69
    ray.io/group: small-group
    ray.io/identifier: rayjob-sample-hhbdr-raycluster-ljj69-worker
    ray.io/is-ray-node: "yes"
    ray.io/node-type: worker
    scheduling.x-k8s.io/pod-group: rayjob-sample-hhbdr-raycluster-ljj69
  name: rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh
  namespace: algorithm-text
  ownerReferences:
  - apiVersion: ray.io/v1
    blockOwnerDeletion: true
    controller: true
    kind: RayCluster
    name: rayjob-sample-hhbdr-raycluster-ljj69
    uid: 74259f20-86fd-4777-b826-73d201065931
  resourceVersion: "7482744"
  uid: 4f666efe-a25d-4620-824f-cab3f4fa0ce7
spec:
  containers:
  - args:
    - 'ulimit -n 65536; ray start  --address=rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379  --metrics-export-port=8080  --block  --dashboard-agent-listen-port=52365  --num-cpus=1 '
    command:
    - /bin/bash
    - -lc
    - --
    env:
    - name: FQ_RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local
    - name: RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc
    - name: RAY_CLUSTER_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.labels['ray.io/cluster']
    - name: RAY_CLOUD_INSTANCE_ID
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.name
    - name: RAY_NODE_TYPE_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.labels['ray.io/group']
    - name: KUBERAY_GEN_RAY_START_CMD
      value: 'ray start  --address=rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379  --metrics-export-port=8080  --block  --dashboard-agent-listen-port=52365  --num-cpus=1 '
    - name: RAY_PORT
      value: "6379"
    - name: RAY_ADDRESS
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379
    - name: RAY_USAGE_STATS_KUBERAY_IN_USE
      value: "1"
    - name: REDIS_PASSWORD
    - name: RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE
      value: "1"
    image: rayproject/ray:2.9.0
    imagePullPolicy: IfNotPresent
    lifecycle:
      preStop:
        exec:
          command:
          - /bin/sh
          - -c
          - ray stop
    livenessProbe:
      exec:
        command:
        - bash
        - -c
        - wget -T 2 -q -O- http://localhost:52365/api/local_raylet_healthz | grep
          success
      failureThreshold: 120
      initialDelaySeconds: 30
      periodSeconds: 5
      successThreshold: 1
      timeoutSeconds: 2
    name: ray-worker
    ports:
    - containerPort: 8080
      name: metrics
      protocol: TCP
    readinessProbe:
      exec:
        command:
        - bash
        - -c
        - wget -T 2 -q -O- http://localhost:52365/api/local_raylet_healthz | grep
          success
      failureThreshold: 10
      initialDelaySeconds: 10
      periodSeconds: 5
      successThreshold: 1
      timeoutSeconds: 2
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "1"
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /dev/shm
      name: shared-mem
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-rq67v
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  initContainers:
  - args:
    - "\n\t\t\t\t\tSECONDS=0\n\t\t\t\t\twhile true; do\n\t\t\t\t\t\tif (( SECONDS
      <= 120 )); then\n\t\t\t\t\t\t\tif ray health-check --address rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379
      > /dev/null 2>&1; then\n\t\t\t\t\t\t\t\techo \"GCS is ready.\"\n\t\t\t\t\t\t\t\tbreak\n\t\t\t\t\t\t\tfi\n\t\t\t\t\t\t\techo
      \"$SECONDS seconds elapsed: Waiting for GCS to be ready.\"\n\t\t\t\t\t\telse\n\t\t\t\t\t\t\tif
      ray health-check --address rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379;
      then\n\t\t\t\t\t\t\t\techo \"GCS is ready. Any error messages above can be safely
      ignored.\"\n\t\t\t\t\t\t\t\tbreak\n\t\t\t\t\t\t\tfi\n\t\t\t\t\t\t\techo \"$SECONDS
      seconds elapsed: Still waiting for GCS to be ready. For troubleshooting, refer
      to the FAQ at https://github.com/ray-project/kuberay/blob/master/docs/guidance/FAQ.md.\"\n\t\t\t\t\t\tfi\n\t\t\t\t\t\tsleep
      5\n\t\t\t\t\tdone\n\t\t\t\t"
    command:
    - /bin/bash
    - -lc
    - --
    env:
    - name: FQ_RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local
    - name: RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc
    image: rayproject/ray:2.9.0
    imagePullPolicy: IfNotPresent
    name: wait-gcs-ready
    resources:
      limits:
        cpu: 200m
        memory: 256Mi
      requests:
        cpu: 200m
        memory: 256Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-rq67v
      readOnly: true
  preemptionPolicy: PreemptLowerPriority
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - emptyDir:
      medium: Memory
    name: shared-mem
  - name: kube-api-access-rq67v
    projected:
      defaultMode: 420
      sources:
      - serviceAccountToken:
          expirationSeconds: 3607
          path: token
      - configMap:
          items:
          - key: ca.crt
            path: ca.crt
          name: kube-root-ca.crt
      - downwardAPI:
          items:
          - fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
            path: namespace
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2024-10-10T02:38:29Z"
    message: '0/7 nodes are available: 3 node(s) had untolerated taint {virtual-kubelet.io/provider:
      alibabacloud}, 4 Insufficient cpu. failed to get current scheduling unit not
      found, preemption: 0/7 nodes are available: 1 No victims found on node cn-hongkong.10.0.118.181
      for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      1 No victims found on node cn-hongkong.10.1.0.52 for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      1 No victims found on node cn-hongkong.10.2.0.24 for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      1 No victims found on node cn-hongkong.10.2.0.5 for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      3 Preemption is not helpful for scheduling., '
    reason: Unschedulable
    status: "False"
    type: PodScheduled
  phase: Pending
  qosClass: Burstable

When resources are insufficient, you can obtain details about scheduling failures from events. Use --field-selector='type=Warning,reason=GangFailedScheduling' to filter gang-related scheduling failures. In the messages, "cycle xx" indicates the failure details for each scheduling round, explaining why the Pods could not be scheduled in that round. An example follows.

➜  kubequeue-doc kubectl get events -n algorithm-text --field-selector='type=Warning,reason=GangFailedScheduling' | grep "cycle 1"
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-89mlq   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-89mlq in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8fwmr   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8fwmr in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8g5wv   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8g5wv in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8tn4w   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8tn4w in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-97gpk   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-97gpk in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-9xsgw   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-9xsgw in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-gwxhg   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-gwxhg in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-jzw6k   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-jzw6k in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-kb55s   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-kb55s in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-lbvk7   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-lbvk7 in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-ms96b   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-ms96b in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-sgr9g   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-sgr9g in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-svt6g   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-svt6g in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-wm5c6   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-wm5c6 in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.