DataWorks 提供 EMR PySpark 节点,支持用户在统一开发界面中协同编写 Python 业务逻辑与 spark-submit 提交命令,无缝对接阿里云 EMR 半托管集群以及全托管的 EMR Serverless Spark 。
适用范围
计算资源限制:EMR PySpark 节点支持以下两类计算资源:
EMR计算资源(半托管集群)的特定集群类型:仅支持 DataLake 集群 和 Custom 集群。
EMR Serverless Spark计算资源(全托管集群)。
重要如您需在 EMR 半托管集群(如 DataLake 集群或 Custom 集群)上运行 EMR PySpark 节点,请提交工单,并注明您的集群类型、EMR 版本及具体使用场景,我们将为您评估接入可行性并提供相应支持。
资源组限制:仅支持使用Serverless资源组。
权限限制:主账号或已被添加至对应工作空间中,并具有开发或空间管理员角色权限的 RAM 用户。添加成员的操作详情请参见为工作空间添加空间成员。
新建节点
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的,在下拉框中选择对应工作空间后单击进入数据开发。
在数据开发(Data Studio)页面,新建 EMR PySpark 节点。
配置节点路径与名称。该示例中节点名称为 emr_pyspark_test。
开发节点
EMR PySpark 节点采用双窗口协同编辑模式:
上层窗口为 Python 代码编辑区,用于编写核心业务逻辑(支持引用上传的资源文件,如
.py模块);下层窗口为 Submit 命令编辑区,用于提交 spark-submit命令至 EMR 集群。
以下示例基于蒙特卡洛方法,演示如何在 EMR PySpark 节点中分布式估算圆周率 π 的近似值。
步骤一:上传依赖资源
将自定义 Python 文件(该示例中为utils.py)上传至 DataWorks 资源管理模块。上传后,可在节点中引用该资源。该 Python 文件定义了单个 Spark 执行任务中蒙特卡洛模拟采样的核心计算逻辑。
步骤二:编写 Python 代码
在 Python 代码编辑窗口内,编写以下代码。该示例 Python 程序旨在通过集群并行计算来估算π的数值。程序首先将总计算量分解成若干子任务,并将它们分配给集群中的计算节点。在所有节点并行执行完毕后,程序会收集它们的计算结果,并通过聚合操作得出最终的π估算值。
##@resource_reference{"utils.py"}
from pyspark.sql import SparkSession
from utils import estimate_pi_in_task
import sys
def main():
# 创建 SparkSession
spark = SparkSession.builder \
.appName("EstimatePi") \
.getOrCreate()
sc = spark.sparkContext
# 总采样数
total_samples = int(sys.argv[1])
num_partitions = ${test1}
# 每个分区的样本数
samples_per_partition = total_samples // num_partitions
# 创建一个 RDD,每个分区执行一次 estimate_pi_in_task
rdd = sc.parallelize(range(num_partitions), num_partitions)
# 映射每个分区去执行采样任务
inside_counts = rdd.map(lambda _: estimate_pi_in_task(samples_per_partition))
# 聚合所有分区的结果
total_inside = inside_counts.sum()
pi_estimate = 4.0 * total_inside / total_samples
print(f"总样本数: {total_samples}")
print(f"落在圆内的样本数: {total_inside}")
print(f"估算的 π 值: {pi_estimate:.6f}")
spark.stop()
if __name__ == "__main__":
main()仅支持将整个 Python 文件作为一个 Spark 作业提交执行,不支持选中部分代码运行。
关于脚本中参数的进一步说明:
参数 | 类型 | 取值 | 说明 |
| 命令行参数 |
| 表示总采样数(如 |
| 调度参数 | 运行/调度时由DataWorks动态替换为实际值 | 表示分区数(如 |
步骤三:编写 spark-submit 代码
在Submit命令编辑窗口内,编写以下代码。该命令通过 spark-submit 工具,将指定的 Python 应用程序打包并提交至阿里云 EMR 集群执行。
在使用 EMR 计算资源(半托管集群)与 EMR Serverless Spark 计算资源(全托管集群)时,spark-submit 命令所支持的参数等存在关键差异。为便于您准确配置,下表从代码示例、命令集成、参考文档三个维度进行了对比说明。
差异类别 | 使用 EMR 计算资源(半托管集群) | 使用 EMR Serverless Spark 计算资源(全托管集群) |
代码示例 | | |
命令集成 | 基于 Apache Spark 开源发行版的 | 基于阿里云 EMR Serverless |
命令参考文档 | 更多 | 关于 |
运行节点
在运行配置选择配置计算资源和资源组。
配置项
说明
计算资源
选择已绑定的 EMR 计算资源或 EMR Serverless Spark计算资源。若无可用计算资源,可在下拉框中选择新建计算资源。
资源组
选择已绑定工作空间的资源组。
脚本参数
在配置节点内容时,通过
${参数名}的方式定义变量,需要在脚本参数处配置本次运行值信息,任务运行时会将它动态替换为真实的取值。其中参数值会根据调度配置中的参数值进行同步。
说明本次运行值仅在当前执行中生效。优先采用本次运行值,无则采用参数值。
在节点编辑页面上方工具栏,单击运行,系统将合并完整 Python 脚本(含资源引用),并通过
spark-submit提交至 EMR 集群,最后返回执行日志与结果。

图标唤起发布流程,通过该流程将任务发布至生产环境。项目目录下的节点只有在发布至生产环境后,才会进行周期性调度。