Serverless PySpark节点

更新时间:
复制为 MD 格式

DataWorks 提供 Serverless PySpark 节点,支持用户在无需管理集群基础设施的前提下,直接开发和运行基于 EMR Serverless Spark 的分布式 PySpark 任务。本文档介绍 Serverless PySpark 节点的适用范围、资源依赖配置、代码与提交命令编写规范等,帮助您高效完成任务开发与生产部署。

适用范围

新建节点

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 在数据开发(Data Studio)页面,新建 Serverless PySpark 节点。

  3. 配置节点路径名称。该示例中节点名称为 serverless_pyspark_test1。

开发节点

Serverless PySpark 节点采用双窗口协同编辑模式

  • 上层窗口为 Python 代码编辑区,用于编写核心业务逻辑(支持引用上传的资源文件,如 .py 模块);

  • 下层窗口为 Submit 命令编辑区,用于提交 spark-submit 命令至 EMR Serverless Spark。

该设计深度适配 EMR Serverless Spark 场景,系统将自动解析代码依赖、注入资源,并通过内置集成的 spark-submit 工具,将任务一键提交至 Serverless Spark 运行环境。关于 spark-submit 的参数规范、常见选项及最佳实践,可参考文档:通过spark-submit提交任务

以下示例基于蒙特卡洛方法,演示如何在 Serverless PySpark 节点中分布式估算圆周率 π 的近似值。

步骤一:上传依赖资源

将自定义 Python 文件(该示例中为utils.py)上传至 DataWorks 资源管理模块。上传后,可在节点中引用该资源。该 Python 文件定义了单个 Spark 执行任务中蒙特卡洛模拟采样的核心计算逻辑。

  1. 关于上传资源的具体操作,可以参见:EMR资源与函数。在数据开发资源管理页面,新建资源并选择资源类型为 EMR File,配置资源名称

  2. 单击重新上传,上传示例中的utils.py

  3. 选择存储路径数据源以及资源组,并单击保存按钮进行保存。

    image

步骤二:编写 Python 代码

在 Python 代码编辑窗口内,编写以下代码。该示例 Python 程序旨在通过集群并行计算来估算π的数值。程序首先将总计算量分解成若干子任务,并将它们分配给集群中的计算节点。在所有节点并行执行完毕后,程序会收集它们的计算结果,并通过聚合操作得出最终的π估算值。

说明

当前节点暂不支持通过图形界面直接引用资源。如需引用资源,请在 Python 代码中手动添加资源引用声明:##@resource_reference{"资源名称"}

##@resource_reference{"utils.py"}
from pyspark.sql import SparkSession
from utils import estimate_pi_in_task
import sys

def main():
    # 创建 SparkSession
    spark = SparkSession.builder.appName("EstimatePi").getOrCreate()

    sc = spark.sparkContext

    # 总采样数
    total_samples = int(sys.argv[1])
    num_partitions = ${test1}

    # 每个分区的样本数
    samples_per_partition = total_samples // num_partitions

    # 创建一个 RDD,每个分区执行一次 estimate_pi_in_task
    rdd = sc.parallelize(range(num_partitions), num_partitions)

    # 映射每个分区去执行采样任务
    inside_counts = rdd.map(lambda _: estimate_pi_in_task(samples_per_partition))

    # 聚合所有分区的结果
    total_inside = inside_counts.sum()
    pi_estimate = 4.0 * total_inside / total_samples

    print(f"总样本数: {total_samples}")
    print(f"落在圆内的样本数: {total_inside}")
    print(f"估算的 π 值: {pi_estimate:.6f}")

    spark.stop()

if __name__ == "__main__":
    main()
说明

仅支持将整个 Python 文件作为一个 Spark 作业提交执行,不支持选中部分代码运行。

关于脚本中参数的进一步说明:

参数

类型

取值

说明

sys.argv[1]

命令行参数

spark-submit 命令中脚本名后的值

表示总采样数(如 10000

${test1}

调度参数

运行/调度时由DataWorks动态替换为实际值

表示分区数(如100

步骤三:编写 spark-submit 代码

Submit命令编辑窗口内,编写以下代码。该命令通过 spark-submit 工具,将指定的 Python 脚本打包并提交至阿里云 EMR Serverless Spark 执行。

spark-submit \
 --py-files utils.py \
serverless_pyspark_test1.py 10000
重要
  • 文件名一致性:spark-submit 命令中指定的主 Python 脚本文件名须与当前节点的名称保持一致,并以 .py 为后缀。例如,若节点名称为 serverless_pyspark_test1,则命令中应使用 serverless_pyspark_test1.py

  • 依赖声明:若引用了外部 .py 文件,必须通过 --py-files 显式声明。

  • 关于 spark-submit 的参数规范、常见选项及最佳实践,可参考文档:通过spark-submit提交任务

运行节点

  1. 运行配置选择配置计算资源资源组

    配置项

    说明

    计算资源

    选择已绑定的 EMR Serverless Spark计算资源。若无可用计算资源,可在下拉框中选择新建计算资源

    资源组

    选择已绑定工作空间的资源组。

    脚本参数

    在配置节点内容时,通过${参数名}的方式定义变量,需要在脚本参数处配置本次运行值信息,任务运行时会将它动态替换为真实的取值。

    其中参数值会根据调度配置中的参数值进行同步。

    说明

    本次运行值仅在当前执行中生效。优先采用本次运行值,无则采用参数值

  2. 在节点编辑页面上方工具栏,单击运行,系统将合并完整 Python 脚本(含资源引用),并通过 spark-submit 提交至 EMR Serverless Spark,最后返回执行日志与结果。

    说明

    运行后,您还可登录 EMR Serverless Spark 控制台,进入对应的 Serverless Spark 工作空间,在 运维中心 > 任务历史 页面查看该任务的执行状态、耗时、资源使用情况,并通过 Spark UI 查看详细日志等。具体步骤可以参考:步骤五:查看Spark UI

后续步骤

  • 节点调度配置:若项目目录下的节点需要周期性调度执行,您需要在节点右侧的调度配置中设置调度策略,配置相关的调度属性。

  • 节点发布:若任务需要发布至生产环境执行,请单击界面image图标唤起发布流程,通过该流程将任务发布至生产环境。项目目录下的节点只有在发布至生产环境后,才会进行周期性调度。

  • 任务运维:任务发布后,您可以在运维中心查看周期任务的运行情况。详情请参见运维中心入门

相关文档