DataWorks 提供 Serverless PySpark 节点,支持用户在无需管理集群基础设施的前提下,直接开发和运行基于 EMR Serverless Spark 的分布式 PySpark 任务。本文档介绍 Serverless PySpark 节点的适用范围、资源依赖配置、代码与提交命令编写规范等,帮助您高效完成任务开发与生产部署。
适用范围
计算资源限制:仅支持EMR Serverless Spark计算资源,需确保资源组和计算资源网络连通。
资源组限制:仅支持使用Serverless资源组运行该类型任务。
权限限制:主账号或已被添加至对应工作空间中,并具有开发或空间管理员角色权限的 RAM 用户。添加成员的操作详情请参见为工作空间添加空间成员。
新建节点
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,在下拉框中选择对应工作空间后单击进入数据开发。
在数据开发(Data Studio)页面,新建 Serverless PySpark 节点。
配置节点路径与名称。该示例中节点名称为 serverless_pyspark_test1。
开发节点
Serverless PySpark 节点采用双窗口协同编辑模式:
上层窗口为 Python 代码编辑区,用于编写核心业务逻辑(支持引用上传的资源文件,如
.py模块);下层窗口为 Submit 命令编辑区,用于提交 spark-submit 命令至 EMR Serverless Spark。
该设计深度适配 EMR Serverless Spark 场景,系统将自动解析代码依赖、注入资源,并通过内置集成的 spark-submit 工具,将任务一键提交至 Serverless Spark 运行环境。关于 spark-submit 的参数规范、常见选项及最佳实践,可参考文档:通过spark-submit提交任务。
以下示例基于蒙特卡洛方法,演示如何在 Serverless PySpark 节点中分布式估算圆周率 π 的近似值。
步骤一:上传依赖资源
将自定义 Python 文件(该示例中为utils.py)上传至 DataWorks 资源管理模块。上传后,可在节点中引用该资源。该 Python 文件定义了单个 Spark 执行任务中蒙特卡洛模拟采样的核心计算逻辑。
步骤二:编写 Python 代码
在 Python 代码编辑窗口内,编写以下代码。该示例 Python 程序旨在通过集群并行计算来估算π的数值。程序首先将总计算量分解成若干子任务,并将它们分配给集群中的计算节点。在所有节点并行执行完毕后,程序会收集它们的计算结果,并通过聚合操作得出最终的π估算值。
当前节点暂不支持通过图形界面直接引用资源。如需引用资源,请在 Python 代码中手动添加资源引用声明:##@resource_reference{"资源名称"}。
##@resource_reference{"utils.py"}
from pyspark.sql import SparkSession
from utils import estimate_pi_in_task
import sys
def main():
# 创建 SparkSession
spark = SparkSession.builder.appName("EstimatePi").getOrCreate()
sc = spark.sparkContext
# 总采样数
total_samples = int(sys.argv[1])
num_partitions = ${test1}
# 每个分区的样本数
samples_per_partition = total_samples // num_partitions
# 创建一个 RDD,每个分区执行一次 estimate_pi_in_task
rdd = sc.parallelize(range(num_partitions), num_partitions)
# 映射每个分区去执行采样任务
inside_counts = rdd.map(lambda _: estimate_pi_in_task(samples_per_partition))
# 聚合所有分区的结果
total_inside = inside_counts.sum()
pi_estimate = 4.0 * total_inside / total_samples
print(f"总样本数: {total_samples}")
print(f"落在圆内的样本数: {total_inside}")
print(f"估算的 π 值: {pi_estimate:.6f}")
spark.stop()
if __name__ == "__main__":
main()仅支持将整个 Python 文件作为一个 Spark 作业提交执行,不支持选中部分代码运行。
关于脚本中参数的进一步说明:
参数 | 类型 | 取值 | 说明 |
| 命令行参数 |
| 表示总采样数(如 |
| 调度参数 | 运行/调度时由DataWorks动态替换为实际值 | 表示分区数(如 |
步骤三:编写 spark-submit 代码
在Submit命令编辑窗口内,编写以下代码。该命令通过 spark-submit 工具,将指定的 Python 脚本打包并提交至阿里云 EMR Serverless Spark 执行。
spark-submit \
--py-files utils.py \
serverless_pyspark_test1.py 10000文件名一致性:
spark-submit命令中指定的主 Python 脚本文件名须与当前节点的名称保持一致,并以.py为后缀。例如,若节点名称为serverless_pyspark_test1,则命令中应使用serverless_pyspark_test1.py。依赖声明:若引用了外部
.py文件,必须通过--py-files显式声明。关于
spark-submit的参数规范、常见选项及最佳实践,可参考文档:通过spark-submit提交任务。
运行节点
在运行配置选择配置计算资源和资源组。
配置项
说明
计算资源
选择已绑定的 EMR Serverless Spark计算资源。若无可用计算资源,可在下拉框中选择新建计算资源。
资源组
选择已绑定工作空间的资源组。
脚本参数
在配置节点内容时,通过
${参数名}的方式定义变量,需要在脚本参数处配置本次运行值信息,任务运行时会将它动态替换为真实的取值。其中参数值会根据调度配置中的参数值进行同步。
说明本次运行值仅在当前执行中生效。优先采用本次运行值,无则采用参数值。
在节点编辑页面上方工具栏,单击运行,系统将合并完整 Python 脚本(含资源引用),并通过
spark-submit提交至 EMR Serverless Spark,最后返回执行日志与结果。说明运行后,您还可登录 EMR Serverless Spark 控制台,进入对应的 Serverless Spark 工作空间,在 运维中心 > 任务历史 页面查看该任务的执行状态、耗时、资源使用情况,并通过 Spark UI 查看详细日志等。具体步骤可以参考:步骤五:查看Spark UI。

图标唤起发布流程,通过该流程将任务发布至生产环境。项目目录下的节点只有在发布至生产环境后,才会进行周期性调度。