开发ODPS Spark任务

Spark on MaxCompute作业可通过Local模式、Cluster模式执行,此外,您也可在DataWorks中运行Spark on MaxCompute离线作业(Cluster模式),以便与其它类型执行节点集成和调度。本文为您介绍如何通过DataWorks实现Spark on MaxCompute作业的配置与调度。

前提条件

已创建ODPS Spark节点,详情请参见创建并管理MaxCompute节点

使用限制

当ODPS Spark节点选择Spark3.x版本时,若提交节点报错,请购买使用Serverless资源组,详情请参见新增和使用Serverless资源组

背景信息

Spark on MaxCompute是MaxCompute提供的兼容开源Spark的计算服务。它在统一的计算资源和数据集权限体系基础上,提供Spark计算框架,支持您以熟悉的开发使用方式提交运行Spark作业,满足更丰富的数据处理分析需求。在DataWorks中,您可通过ODPS Spark节点实现Spark on MaxCompute任务的调度运行,以及与其他作业的集成操作。

Spark on MaxCompute支持使用Java、Scala和Python语言进行开发,并通过Local、Cluster模式运行任务,在DataWorks中运行Spark on MaxCompute离线作业时采用Cluster模式执行。更多关于Spark on MaxCompute运行模式的介绍,详情请参见运行模式

准备工作

ODPS Spark节点支持使用Java/ScalaPython语言运行Spark on MaxCompute离线作业,不同语言开发步骤及配置界面存在差异,您可根据业务需要选择使用。

Java/Scala

在ODPS Spark节点执行Java或Scala语言类型代码前,您需先在本地开发好Spark on MaxCompute作业代码,再通过DataWorks上传为MaxCompute的资源。步骤如下:

  1. 准备开发环境。

    根据所使用系统类型,准备运行Spark on MaxCompute任务的开发环境,详情请参见搭建Linux开发环境搭建Windows开发环境

  2. 开发Java/Scala代码。

    在ODPS Spark节点执行Java或Scala语言类型代码前,需先在本地或已有环境开发好Spark on MaxCompute代码,建议使用Spark on MaxCompute提供的项目示例工程模板进行开发。

  3. 打包代码并上传至DataWorks。

    代码开发完成后,需将其打包,并通过DataWorks上传为MaxCompute资源,详情请参见创建并使用MaxCompute资源

Python(使用默认Python环境实现)

DataWorks可通过将代码在线写入DataWorks Python资源的方式,实现PySpark作业开发,并通过ODPS Spark节点提交运行该代码逻辑。DataWorks上创建Python资源,详情请参见创建并使用MaxCompute资源;PySpark开发示例,详情请参见PySpark开发示例

说明

该方式使用DataWorks提供的默认Python环境,可直接依赖的三方包有限,若默认环境无法满足PySpark作业第三方依赖包的需求,可参考下文《开发语言:Python(使用自定义Python环境实现)》方式,自行准备Python环境执行任务。当然,您也可选择对Python资源支持性更好的PyODPS 2节点PyODPS 3节点

Python(使用自定义Python环境实现)

若平台提供的默认Python环境无法满足您的业务需求,则可根据如下步骤自定义Python环境,执行Spark on MaxCompute任务。

  1. 本地准备Python环境。

    您可参考PySpark Python版本和依赖支持,根据业务需要配置可用的Python环境。

  2. 打包环境并上传至DataWorks。

    将Python环境压缩为一个Zip包,并通过DataWorks上传为MaxCompute资源,作为后续运行Spark on MaxCompute任务的执行环境。详情请参见创建并使用MaxCompute资源

配置项说明

DataWorks运行Spark on MaxCompute离线作业采用Cluster模式,在Cluster模式中,您需指定自定义程序入口mainmain运行结束(即状态为SuccessFail)时,对应的Spark作业便会结束。此外,spark-defaults.conf中的配置需逐条加到ODPS Spark节点配置项中。例如,Executor的数量、内存大小和spark.hadoop.odps.runtime.end.point的配置。

说明

您无需上传spark-defaults.conf文件,而是需将spark-defaults.conf文件中的配置都逐条加到ODPS Spark节点的配置项中。

Spark任务配置

参数

描述

对应的spark-submit命令

spark版本

包括Spark1.xSpark2.xSpark3.x版本。

说明

当ODPS Spark节点选择Spark3.x版本时,若提交节点报错,请购买使用Serverless资源组,详情请参见新增和使用Serverless资源组

语言

此处选择Java/ScalaPython。请根据实际Spark on MaxCompute开发语言进行选择。

选择主资源

指定任务所使用的主JAR资源文件或主Python资源。

此处的资源文件需提前上传至DataWorks并已提交,详情请参见创建并使用MaxCompute资源

app jar or Python file

配置项

指定提交作业时的配置项。其中:

  • spark.hadoop.odps.access.idspark.hadoop.odps.access.keyspark.hadoop.odps.end.point无需配置,默认为MaxCompute项目的值(有特殊原因可显式配置,将覆盖默认值)。

  • 您无需上传spark-defaults.conf文件,而是需将spark-defaults.conf中的配置逐条加到ODPS SPARK节点配置项中,例如Executor的数量、内存大小和spark.hadoop.odps.runtime.end.point的配置。

--conf PROP=VALUE

Main Class

配置主类名称。当开发语言为Java/Scala时,需要配置该参数。

--class CLASS_NAME

参数

您可根据需要添加参数,多个参数之间用空格分隔。DataWorks支持使用调度参数,此处参数配置格式为${变量名}。配置完成后需在右侧导航栏调度配置 > 参数处给变量赋值。

说明

调度参数支持的赋值格式请参见调度参数支持的格式

[app arguments]

选择其他资源

您可根据需要,选择使用如下资源。

  • jar资源:仅支持开发语言为Java/Scala时使用。

  • Python资源:仅支持开发语言为Python时使用。

  • file资源

  • archives资源:仅展示压缩类型的资源。

此处的资源文件需提前上传至DataWorks并已提交,详情请参见创建并使用MaxCompute资源

不同资源分别对应如下命令:

  • --jars JARS

  • --py-files PY_FILES

  • --files FILES

  • --archives ARCHIVES

编辑代码:简单示例

以下以一个简单示例为您介绍ODPS Spark节点的使用:判断一个字符串是否可以转换为数字。

  1. 创建资源。

    1. 数据开发页面新建Python类型的资源,并命名为spark_is_number.py,详情请参见创建并使用MaxCompute资源。代码如下:

      # -*- coding: utf-8 -*-
      import sys
      from pyspark.sql import SparkSession
      
      try:
          # for python 2
          reload(sys)
          sys.setdefaultencoding('utf8')
      except:
          # python 3 not needed
          pass
      
      if __name__ == '__main__':
          spark = SparkSession.builder\
              .appName("spark sql")\
              .config("spark.sql.broadcastTimeout", 20 * 60)\
              .config("spark.sql.crossJoin.enabled", True)\
              .config("odps.exec.dynamic.partition.mode", "nonstrict")\
              .config("spark.sql.catalogImplementation", "odps")\
              .getOrCreate()
      
      def is_number(s):
          try:
              float(s)
              return True
          except ValueError:
              pass
      
          try:
              import unicodedata
              unicodedata.numeric(s)
              return True
          except (TypeError, ValueError):
              pass
      
          return False
      
      print(is_number('foo'))
      print(is_number('1'))
      print(is_number('1.3'))
      print(is_number('-1.37'))
      print(is_number('1e3'))
    2. 保存并提交资源。

  2. 在已创建的ODPS Spark节点中,根据配置项说明配置节点参数和调度配置参数,并保存提交节点。

    配置项

    说明

    spark版本

    Spark2.x

    语言

    Python

    选择主python资源

    在下拉列表中选择上述已创建的python资源spark_is_number.py

  3. 进入开发环境的运维中心,执行补数据,具体操作请参见执行补数据并查看补数据实例(新版)

    说明

    由于数据开发中的ODPS Spark节点没有运行入口,因此您需要在开发环境的运维中心执行Spark任务。

  4. 查看返回结果。

    待补数据实例运行成功后,进入其运行日志的tracking URL中查看运行结果,如下:

    False
    True
    True
    True
    True

编辑代码:进阶示例

更多场景的Spark on MaxCompute任务开发,请参考:

后续步骤

当您完成当前节点的任务开发后,通常您可进行以下操作。

  • 调度配置:配置节点的周期性调度属性。任务需要周期性调度运行时,您需要设置节点后续实际运行过程中的重跑属性、调度依赖关系等,操作详情请参见任务调度属性配置概述

  • 任务调试:对当前节点的代码进行测试运行,确认代码逻辑符合预期,操作详情请参见任务调试流程

  • 任务发布:完成所有开发相关操作后,您需要将所有任务节点进行发布,发布后节点即会根据调度配置结果进行周期性运行,操作详情请参见发布任务

  • Spark作业诊断:MaxCompute为Spark作业提供Logview工具以及Spark Web-UI,您可通过作业日志检查作业是否已正常提交并执行。

  • Spark常见问题:您可了解Spark执行过程中的常见问题,便于出现异常时快速排查解决。