Run Spark jobs on ECI elastic resources

This topic describes how to run Spark jobs in an ACK cluster on elastic container instances (ECI). By using ECI elastic resources with a suitable scheduling policy, you can create ECI Pods on demand and pay only for the resources you use, which reduces the cost of idle resources and makes running Spark jobs more cost-effective.

Benefits of running Spark jobs on ECI

Elastic Container Instance (ECI) provides a basic container Pod runtime for ACK clusters. By dynamically scheduling the Driver and Executor Pods of a Spark job onto ECI, you can run Spark jobs in a serverless fashion. Each container instance is fully isolated by a lightweight virtualized security sandbox, so instances do not interfere with one another.

Running Spark jobs on ECI offers the following benefits:

  • Massive capacity: A cluster can easily obtain capacity for up to 50,000 Pods without extra configuration or advance capacity planning.

  • Elasticity in seconds: Thousands of Pods can always be created within a very short time, so bursts of business traffic are not affected by Pod creation latency.

  • Cost savings: ECI Pods are created on demand and billed by usage, avoiding waste from idle resources. Spot instances and combinations of multiple instance types are supported to further reduce costs.
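The Spot support mentioned above is requested through ECI Pod annotations rather than SparkApplication fields. The fragment below is a sketch only: the annotation key k8s.aliyun.com/eci-spot-strategy and its value SpotAsPriceGo come from the ECI documentation and should be verified against your ECI version before use.

```yaml
# Sketch: request Spot capacity for executors via an ECI annotation
# (assumption: k8s.aliyun.com/eci-spot-strategy is supported in your region/version).
executor:
  annotations:
    k8s.aliyun.com/eci-spot-strategy: SpotAsPriceGo  # bid at the current market price
  instances: 2
  cores: 2
  memory: 4g
```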

Configure ECS and ECI resource usage policies

By configuring taints, tolerations, and node affinity (nodeAffinity), you can declare that jobs use only ECS resources or only ECI elastic resources. You can also prefer ECS resources and automatically request ECI resources when ECS resources are insufficient. For details about how to specify resource allocation between ECS and ECI, see Specify resource allocation between ECS and ECI. Configuration examples follow.

Use only ECS resources

ECI virtual nodes carry the taint virtual-kubelet.io/provider=alibabacloud:NoSchedule by default, so ECI resources are not used unless Pods tolerate this taint. The following SparkApplication uses only ECS resources.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-ecs-only
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 2
    memory: 4g

Use only ECI elastic resources

By tolerating the taint virtual-kubelet.io/provider=alibabacloud:NoSchedule on ECI virtual nodes, the job's Pods can be scheduled onto ECI; adding a required node affinity then restricts the job to ECI elastic resources only. The following SparkApplication uses only ECI elastic resources.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-eci-only
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    affinity:
      nodeAffinity:
        # Schedule onto ECI virtual nodes.
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: type
              operator: In
              values:
              - virtual-kubelet
    tolerations:
    # Tolerate the ECI taint.
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule
  executor:
    instances: 2
    cores: 2
    memory: 4g
    affinity:
      nodeAffinity:
        # Schedule onto ECI virtual nodes.
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: type
              operator: In
              values:
              - virtual-kubelet
    tolerations:
    # Tolerate the ECI taint.
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule

Prefer ECS resources and automatically request ECI resources when ECS is insufficient

A common elasticity strategy is to prefer ECS resources and automatically fall back to ECI elastic resources when ECS resources are insufficient. This strategy reduces the waste of idle always-on resources while providing extra elastic capacity during traffic peaks. The following SparkApplication shows how to configure it.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-ecs-first
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    affinity:
      nodeAffinity:
        # Prefer non-ECI nodes.
        preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          preference:
            matchExpressions:
            - key: type
              operator: NotIn
              values:
              - virtual-kubelet
    tolerations:
    # Tolerate the ECI taint.
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule
  executor:
    instances: 2
    cores: 2
    memory: 4g
    affinity:
      nodeAffinity:
        # Prefer non-ECI nodes.
        preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          preference:
            matchExpressions:
            - key: type
              operator: NotIn
              values:
              - virtual-kubelet
    tolerations:
    # Tolerate the ECI taint.
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule
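The "ECS first, ECI fallback" behavior produced by the preferred affinity above can be sketched with a small Python model. This is illustrative only: it is not the actual kube-scheduler logic, and ECI capacity is simplified as unbounded (reflecting its very large Pod capacity).

```python
def place_pods(num_pods: int, ecs_capacity: int) -> list[str]:
    """Toy model of preferredDuringSchedulingIgnoredDuringExecution with a
    NotIn virtual-kubelet preference: fill ECS first, overflow to ECI."""
    placements = []
    for i in range(num_pods):
        if i < ecs_capacity:
            placements.append("ecs")  # preferred: regular ECS node
        else:
            placements.append("eci")  # fallback: ECI virtual node
    return placements

# A job needing 5 Pods on a cluster with room for only 3 more ECS Pods:
print(place_pods(5, 3))  # ['ecs', 'ecs', 'ecs', 'eci', 'eci']
```

With sufficient ECS capacity no Pod lands on ECI, matching the intent that ECI is used only during peaks.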

Custom elastic resource priority scheduling

The ACK scheduler supports custom elastic resource priority scheduling. By defining a ResourcePolicy, you can set the order of priority in which Pods are scheduled onto different types of nodes. For details, see Customize elastic resource priority scheduling.

  1. Create the following ResourcePolicy manifest file and save it as resourcepolicy.yaml. It defines the policy by which the Pods of each Spark job consume ECS and ECI resources.

    apiVersion: scheduling.alibabacloud.com/v1alpha1
    kind: ResourcePolicy
    metadata:
      name: sparkapplication-resource-policy
      namespace: default                      # Apply this policy only to Pods in the default namespace.
    spec:
      ignorePreviousPod: true     
      ignoreTerminatingPod: false     
      matchLabelKeys:
      - sparkoperator.k8s.io/submission-id    # Group Pod counts by the submission ID of the Spark job.
      preemptPolicy: AfterAllUnits            # Preemption policy: do not preempt until all scheduling units are exhausted.
      selector:                               # Select which Pods in this namespace the policy applies to.
        sparkoperator.k8s.io/launched-by-spark-operator: "true"  # Apply only to Pods launched by Spark Operator.
      strategy: prefer              
      units:                                 # Two scheduling units. Their order determines scaling: scale-out proceeds in order, scale-in in reverse order.
      - max: 2                               # Unit 1: schedule at most 2 Pods onto ECS nodes labeled kubernetes.io/arch=amd64.
        resource: ecs               
        nodeSelector:
          kubernetes.io/arch: amd64  
      - max: 3                               # Unit 2: schedule at most 3 Pods onto ECI.
        resource: eci      
  2. Run the following command to create the scheduling policy, which applies only to Pods launched by Spark jobs.

    kubectl apply -f resourcepolicy.yaml
  3. Create the following SparkApplication manifest file and save it as spark-pi.yaml.

    With sufficient resources, the scheduling result will be: the Driver Pod and 1 Executor Pod are scheduled onto an AMD64 ECS node, 3 Executor Pods are scheduled onto ECI, and the remaining Executor Pod stays Pending because it exceeds the maximum Pod counts.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments: 
      - "5000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark
        tolerations:
        - key: virtual-kubelet.io/provider       # Tolerate the ECI taint.
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
      executor:
        instances: 5
        cores: 1
        coreLimit: 1200m
        memory: 512m
        tolerations: 
        - key: virtual-kubelet.io/provider         # Tolerate the ECI taint.
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
  4. Run the following command to submit the Spark job.

    kubectl apply -f spark-pi.yaml
  5. Run the following command to check that Pod scheduling matches expectations.

    kubectl get pods  -o wide -l sparkoperator.k8s.io/app-name=spark-pi
    NAME                                        READY   STATUS      RESTARTS   AGE       IP                  NODE                          
    spark-pi-34c0998f9f832e61-exec-1            1/1     Running     0          28s       192.XXX.XX.34       cn-beijing.192.XXX.XX.250       
    spark-pi-34c0998f9f832e61-exec-2            1/1     Running     0          28s       192.XXX.XX.87       virtual-kubelet-cn-beijing-i   
    spark-pi-34c0998f9f832e61-exec-3            1/1     Running     0          28s       192.XXX.XX.88       virtual-kubelet-cn-beijing-i   
    spark-pi-34c0998f9f832e61-exec-4            1/1     Running     0          28s       192.XXX.XX.86       virtual-kubelet-cn-beijing-i   
    spark-pi-34c0998f9f832e61-exec-5            0/1     Pending     0          28s       <none>              <none>                         
    spark-pi-driver                             1/1     Running     0          34s       192.XXX.XX.37       cn-beijing.192.XXX.XXX.250       
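The unit-by-unit placement observed above can be sketched with a short Python model of the ResourcePolicy semantics. This is a toy illustration, not the real ACK scheduler: each unit is filled in order up to its max, and Pods that fit in no unit stay Pending.

```python
def schedule_with_units(num_pods: int, units: list[dict]) -> list[str]:
    """Toy model of ResourcePolicy units: fill each unit in declaration order
    up to its 'max'; overflow Pods remain Pending."""
    placements = []
    remaining = [u["max"] for u in units]
    for _ in range(num_pods):
        for i, unit in enumerate(units):
            if remaining[i] > 0:
                remaining[i] -= 1
                placements.append(unit["resource"])
                break
        else:
            placements.append("pending")  # no unit has capacity left
    return placements

# 6 Pods (1 Driver + 5 Executors) against the units above: max 2 on ECS, max 3 on ECI.
units = [{"resource": "ecs", "max": 2}, {"resource": "eci", "max": 3}]
print(schedule_with_units(6, units))
# ['ecs', 'ecs', 'eci', 'eci', 'eci', 'pending']
```

This reproduces the kubectl output above: two Pods on the ECS node, three on ECI, and one Pending.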

Use ImageCache to accelerate image pulling

ECI supports image caches, which accelerate image pulling and thus speed up Pod creation. For more information, see Use ImageCache to accelerate the creation of ECI Pods.

The following uses a SparkApplication as an example to compare the image pull time without an image cache against the pull time with one, and to show how to automatically create and match image caches in practice.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 2
    memory: 4g
  • First submission (without an image cache)

    Submit the job without using an image cache, then view the events of the Driver Pod.

    kubectl describe pod spark-pi-driver
    Events:
      ...
      Warning  ImageCacheMissed       24m   EciService         [eci.imagecache]Missed image cache.
      Normal   ImageCacheAutoCreated  24m   EciService         [eci.imagecache]Image cache imc-2zeXXXXXXXXXXXXXXXXX is auto created
      Normal   Pulling                24m   kubelet            Pulling image "registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2"
      Normal   Pulled                 23m   kubelet            Successfully pulled image "registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2" in 1m41.289s (1m41.289s including waiting)
      ...

    The events show that the image cache was missed, a new image cache was automatically created, and pulling the image took about 100 seconds.

  • Second submission (with an image cache)

    Add the following annotation to both the Driver and the Executor to explicitly specify the image cache.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi-eci-only
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments: 
      - "5000"
      sparkVersion: 3.5.2
      driver:
        annotations:
          # Manually specify the image cache ID.
          k8s.aliyun.com/eci-image-snapshot-id: imc-2zeXXXXXXXXXXXXXXXXX
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark
        affinity:
          nodeAffinity:
            # Schedule onto ECI virtual nodes.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Tolerate the ECI taint.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
      executor:
        annotations:
          # Manually specify the image cache ID.
          k8s.aliyun.com/eci-image-snapshot-id: imc-2zeXXXXXXXXXXXXXXXXX
        instances: 2
        cores: 2
        memory: 4g
        affinity:
          nodeAffinity:
            # Schedule onto ECI virtual nodes.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Tolerate the ECI taint.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule

    Run the job and view the events of the Driver Pod.

    kubectl describe pod spark-pi-driver
    Events:
      ...
      Normal  SuccessfulHitImageCache  23s   EciService         [eci.imagecache]Successfully hit image cache imc-2zeXXXXXXXXXXXXXXXXX, eci will be scheduled with this image cache.
      Normal  Pulled                   4s    kubelet            Container image "registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2" already present on machine
      ...

    The events show that the image cache was hit, so the image did not need to be pulled again.

  • Automatically create and match image caches

    In practice, you can add the annotation k8s.aliyun.com/eci-image-cache: "true" under .spec.[driver|executor].annotations to automatically create and match image caches without explicitly specifying an image cache ID.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi-eci-only
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments: 
      - "5000"
      sparkVersion: 3.5.2
      driver:
        annotations:
          # Automatically create and match an image cache.
          k8s.aliyun.com/eci-image-cache: "true"
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark
        affinity:
          nodeAffinity:
            # Schedule onto ECI virtual nodes.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Tolerate the ECI taint.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
      executor:
        annotations:
          # Automatically create and match an image cache.
          k8s.aliyun.com/eci-image-cache: "true"
        instances: 2
        cores: 2
        memory: 4g
        affinity:
          nodeAffinity:
            # Schedule onto ECI virtual nodes.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Tolerate the ECI taint.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule