使用Spark Operator运行Spark作业

Apache Spark是一种专门用于大规模数据处理的计算引擎,广泛应用于数据分析和机器学习等场景。Spark Operator提供了一种在Kubernetes集群中自动化部署Spark作业和管理其生命周期的能力。本文介绍如何在ACK集群中使用Spark Operator运行Spark作业,帮助数据工程师快速高效地运行和管理大数据处理作业。

前提条件

Spark Operator介绍

Spark Operator专为在Kubernetes集群中运行Spark工作负载而设计,旨在自动化管理Spark作业的生命周期。通过SparkApplicationScheduledSparkApplication等CRD资源,您可以灵活提交和管理Spark作业。利用Kubernetes的自动扩展、健康检查和资源管理等特性,Spark Operator可以更有效地监控和优化Spark作业的运行。ACK基于社区组件kubeflow/spark-operator提供了ack-spark-operator组件,更多信息,请参见Spark Operator | Kubeflow

使用优势:

  • 简化管理:通过Kubernetes的声明式作业配置,自动化部署Spark作业并管理作业的生命周期。

  • 支持多租户:可利用Kubernetes的命名空间机制和资源配额机制进行用户粒度资源隔离和资源分配,并利用Kubernetes的节点选择机制保证 Spark工作负载可以获得专用的资源。

  • 弹性资源供给:利用ECI弹性容器实例或弹性节点池等弹性资源,可在业务高峰期快速获得大量弹性资源,平衡性能和成本。

适用场景:

  • 数据分析:数据科学家可以利用Spark进行交互式数据分析和数据清洗等。

  • 批量数据计算:运行定时批处理作业,处理大规模数据集。

  • 实时数据处理:Spark Streaming库提供了对实时数据进行流式处理的能力。

流程概述

本文将引导您完成以下步骤,帮助您了解如何使用Spark Operator在ACK集群上运行和管理Spark作业,从而有效地进行大数据处理。

  1. 部署ack-spark-operator组件:在ACK集群中安装Spark Operator,使其能够管理和运行Spark作业。

  2. 提交Spark作业:创建并提交一个Spark作业的配置文件,实现对特定数据处理任务的执行。

  3. 查看Spark作业:监控作业的运行状态,获取详细的执行信息和日志。

  4. 访问Spark Web UI:通过Web界面更直观地了解Spark作业执行情况。

  5. 更新Spark作业:根据需求调整作业配置,支持动态更新参数。

  6. 删除Spark作业:清理已完成或不再需要的Spark作业,避免产生预期外的费用。

步骤一:部署ack-spark-operator组件

  1. 登录容器服务管理控制台,在左侧导航栏选择市场 > 应用市场

  2. 应用市场页面单击应用目录页签,然后搜索并选中ack-spark-operator

  3. ack-spark-operator页面,单击一键部署

  4. 创建面板中,选择集群和命名空间,然后单击下一步

  5. 参数配置页面,设置相应参数,然后单击确定

    下表列出了部分配置参数的说明。完整的参数配置详情,您可以在ack-spark-operator页面中的配置项查看。

    参数

    描述

    示例值

    controller.replicas

    控制器副本数量。

    1(默认值)

    webhook.replicas

    Webhook副本数量。

    1(默认值)

    spark.jobNamespaces

    可运行Spark任务的命名空间列表。包含空字符串表示允许所有命名空间。多个命名空间使用英文半角逗号(,)隔开。

    • ["default"](默认值)

    • [""](所有命名空间)

    • ["ns1","ns2","ns3"](多个命名空间)

    spark.serviceAccount.name

    Spark作业会在spark.jobNamespaces指定的每个命名空间中自动创建名为spark-operator-spark的ServiceAccount和RBAC资源并进行相关授权。您可以自定义ServiceAccount名称,后续提交Spark作业时请指定自定义创建的ServiceAccount名称。

    spark-operator-spark(默认值)

步骤二:提交Spark作业

您可以通过创建SparkApplication清单文件,提交一个实际的Spark作业以进行数据处理。

  1. 创建如下SparkApplication清单文件,并保存为spark-pi.yaml

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default     # 需要确保命名空间在spark.jobNamespaces指定的命名空间列表中。
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments:
      - "1000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark   # 如果您自定义了ServiceAccount名称,则需要进行相应修改。
      executor:
        instances: 1
        cores: 1
        coreLimit: 1200m
        memory: 512m
      restartPolicy:
        type: Never
  2. 执行以下命令,提交Spark作业。

    kubectl apply -f spark-pi.yaml

    预期输出如下。

    sparkapplication.sparkoperator.k8s.io/spark-pi created

步骤三:查看Spark作业

您可以通过以下命令获取Spark作业的运行状态、相关Pod信息及日志。

  1. 执行以下命令,查看Spark作业运行状态。

    kubectl get sparkapplication spark-pi

    预期输出如下。

    NAME       STATUS      ATTEMPTS   START                  FINISH       AGE
    spark-pi   SUBMITTED   1          2024-06-04T03:17:11Z   <no value>   15s
  2. 执行以下命令,指定标签sparkoperator.k8s.io/app-namespark-pi,查看Spark作业的Pod的运行状态。

    kubectl get pod -l sparkoperator.k8s.io/app-name=spark-pi

    预期输出如下。

    NAME                               READY   STATUS    RESTARTS   AGE
    spark-pi-7272428fc8f5f392-exec-1   1/1     Running   0          13s
    spark-pi-7272428fc8f5f392-exec-2   1/1     Running   0          13s
    spark-pi-driver                    1/1     Running   0          49s

    当Spark作业运行结束后,所有Executor Pod都将被Driver自动删除。

  3. 执行以下命令,查看Spark作业详细信息。

    kubectl describe sparkapplication spark-pi

    展开查看预期输出

    具体输出内容会根据当前作业运行状态而有所不同。

    Name:         spark-pi
    Namespace:    default
    Labels:       <none>
    Annotations:  <none>
    API Version:  sparkoperator.k8s.io/v1beta2
    Kind:         SparkApplication
    Metadata:
      Creation Timestamp:  2024-06-04T03:16:59Z
      Generation:          1
      Resource Version:    1350200
      UID:                 1a1f9160-5dbb-XXXX-XXXX-be1c1fda4859
    Spec:
      Arguments:
        1000
      Driver:
        Core Limit:  1200m
        Cores:       1
        Memory:           512m
        Service Account:  spark
      Executor:
        Core Limit:  1200m
        Cores:       1
        Instances:   1
        Memory:               512m
      Image:                  registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      Image Pull Policy:      IfNotPresent
      Main Application File:  local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      Main Class:             org.apache.spark.examples.SparkPi
      Mode:                   cluster
      Restart Policy:
        Type:         Never
      Spark Version:  3.5.2
      Type:           Scala
    Status:
      Application State:
        State:  COMPLETED
      Driver Info:
        Pod Name:             spark-pi-driver
        Web UI Address:       172.XX.XX.92:0
        Web UI Port:          4040
        Web UI Service Name:  spark-pi-ui-svc
      Execution Attempts:     1
      Executor State:
        spark-pi-26c5XXXXX1408337-exec-1:  COMPLETED
      Last Submission Attempt Time:        2024-06-04T03:17:11Z
      Spark Application Id:                spark-0042dead12XXXXXX43675f09552a946
      Submission Attempts:                 1
      Submission ID:                       117ee161-3951-XXXX-XXXX-e7d24626c877
      Termination Time:                    2024-06-04T03:17:55Z
    Events:
      Type    Reason                     Age   From            Message
      ----    ------                     ----  ----            -------
      Normal  SparkApplicationAdded      91s   spark-operator  SparkApplication spark-pi was added, enqueuing it for submission
      Normal  SparkApplicationSubmitted  79s   spark-operator  SparkApplication spark-pi was submitted successfully
      Normal  SparkDriverRunning         61s   spark-operator  Driver spark-pi-driver is running
      Normal  SparkExecutorPending       56s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] is pending
      Normal  SparkExecutorRunning       53s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] is running
      Normal  SparkDriverCompleted       35s   spark-operator  Driver spark-pi-driver completed
      Normal  SparkApplicationCompleted  35s   spark-operator  SparkApplication spark-pi completed
      Normal  SparkExecutorCompleted     35s   spark-operator  Executor [spark-pi-26c5XXXXX1408337-exec-1] completed
  4. 执行以下命令,查看Driver Pod运行日志的最后20行。

    kubectl logs --tail=20 spark-pi-driver

    预期输出如下:

    24/05/30 10:05:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    24/05/30 10:05:30 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 7.942 s
    24/05/30 10:05:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
    24/05/30 10:05:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
    24/05/30 10:05:30 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 8.043996 s
    Pi is roughly 3.1419522314195225
    24/05/30 10:05:30 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/05/30 10:05:30 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1e18858fc8f56b14-driver-svc.default.svc:4040
    24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/05/30 10:05:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/05/30 10:05:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/05/30 10:05:30 INFO MemoryStore: MemoryStore cleared
    24/05/30 10:05:30 INFO BlockManager: BlockManager stopped
    24/05/30 10:05:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/05/30 10:05:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/05/30 10:05:30 INFO SparkContext: Successfully stopped SparkContext
    24/05/30 10:05:30 INFO ShutdownHookManager: Shutdown hook called
    24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /var/data/spark-14ed60f1-82cd-4a33-b1b3-9e5d975c5b1e/spark-01120c89-5296-4c83-8a20-0799eef4e0ee
    24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5f98ed73-576a-41be-855d-dabdcf7de189

步骤四:访问Spark Web UI

Spark作业提供了Web UI,可以监控Spark作业的执行状态。通过使用kubectl port-forward命令,将端口转发到本地,来访问该Web UI界面。Web UI服务仅在Spark作业运行期间(即Driver Pod处于Running状态)可用,Spark作业结束后,Web UI将无法继续访问。

在部署ack-spark-operator组件时,controller.uiService.enable默认为true,会自动创建一个Service,您可以将其端口转发来访问Web UI,但如果在部署组件时将controller.uiService.enable设置为false,则不会创建Service,也可以通过转发Pod的端口访问Web UI。

重要

kubectl port-forward命令建立的端口转发仅适用于测试环境下的快速验证,不适合在生产环境中使用,使用时请注意安全风险。

  1. 根据情况选择通过Service或Pod来转发端口以访问Web UI。以下是相关的命令:

    • 执行以下命令,通过Service端口转发访问Web UI。

      kubectl port-forward services/spark-pi-ui-svc 4040
    • 执行以下命令,通过Pod端口转发访问Web UI。

      kubectl port-forward pods/spark-pi-driver 4040

      预期输出如下。

      Forwarding from 127.0.0.1:4040 -> 4040
      Forwarding from [::1]:4040 -> 4040
  2. 通过http://127.0.0.1:4040访问Web UI。

(可选)步骤五:更新Spark作业

如需修改Spark作业的参数,您可以更新Spark作业的清单文件。

  1. 编辑资源清单文件spark-pi.yaml,例如将作业参数arguments修改为10000executor数量修改为2

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments:
      - "10000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark
      executor:
        instances: 2
        cores: 1
        coreLimit: 1200m
        memory: 512m
      restartPolicy:
        type: Never
  2. 执行以下命令,更新Spark作业。

    kubectl apply -f spark-pi.yaml
  3. 执行以下命令,查看Spark作业状态。

    kubectl get sparkapplication spark-pi

    Spark作业将再次开始运行。预期输出如下。

    NAME       STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pi   RUNNING   1          2024-06-04T03:37:34Z   <no value>   20m

(可选)步骤六:删除Spark作业

如果您已体验完本教程,Spark作业已无需使用,您可以通过以下命令操作释放相关资源。

执行以下命令,删除上述步骤中创建的Spark作业。

kubectl delete -f spark-pi.yaml

您也可以执行以下命令。

kubectl delete sparkapplication spark-pi