EMR PySpark 节点

更新时间:
复制为 MD 格式

DataWorks 提供 EMR PySpark 节点,支持用户在统一开发界面中协同编写 Python 业务逻辑与 spark-submit 提交命令,无缝对接阿里云 EMR 半托管集群以及全托管的 EMR Serverless Spark 。

适用范围

  • 计算资源限制:EMR PySpark 节点支持以下两类计算资源:

    • EMR计算资源(半托管集群)的特定集群类型:仅支持 DataLake 集群 和 Custom 集群。

    • EMR Serverless Spark计算资源(全托管集群)。

      重要

      如您需在 EMR 半托管集群(如 DataLake 集群或 Custom 集群)上运行 EMR PySpark 节点,请提交工单,并注明您的集群类型、EMR 版本及具体使用场景,我们将为您评估接入可行性并提供相应支持。

  • 资源组限制:仅支持使用Serverless资源组

  • 权限限制:主账号或已被添加至对应工作空间中,并具有开发空间管理员角色权限的 RAM 用户。添加成员的操作详情请参见为工作空间添加空间成员

新建节点

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与运维 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 在数据开发(Data Studio)页面,新建 EMR PySpark 节点。

  3. 配置节点路径名称。该示例中节点名称为 emr_pyspark_test。

开发节点

EMR PySpark 节点采用双窗口协同编辑模式

  • 上层窗口为 Python 代码编辑区,用于编写核心业务逻辑(支持引用上传的资源文件,如 .py 模块);

  • 下层窗口为 Submit 命令编辑区,用于提交 spark-submit命令至 EMR 集群。

以下示例基于蒙特卡洛方法,演示如何在 EMR PySpark 节点中分布式估算圆周率 π 的近似值。

步骤一:上传依赖资源

将自定义 Python 文件(该示例中为utils.py)上传至 DataWorks 资源管理模块。上传后,可在节点中引用该资源。该 Python 文件定义了单个 Spark 执行任务中蒙特卡洛模拟采样的核心计算逻辑。

  1. 关于上传资源的具体操作,可以参见:EMR资源与函数。在数据开发资源管理页面,新建资源并选择资源类型为 EMR File,配置资源名称

  2. 单击重新上传,上传示例中的utils.py

  3. 选择存储路径数据源以及资源组,并单击保存按钮进行保存。

    image

步骤二:编写 Python 代码

在 Python 代码编辑窗口内,编写以下代码。该示例 Python 程序旨在通过集群并行计算来估算π的数值。程序首先将总计算量分解成若干子任务,并将它们分配给集群中的计算节点。在所有节点并行执行完毕后,程序会收集它们的计算结果,并通过聚合操作得出最终的π估算值。

##@resource_reference{"utils.py"}
from pyspark.sql import SparkSession
from utils import estimate_pi_in_task
import sys

def main():
    # 创建 SparkSession
    spark = SparkSession.builder \
        .appName("EstimatePi") \
        .getOrCreate()

    sc = spark.sparkContext

    # 总采样数
    total_samples = int(sys.argv[1])
    num_partitions = ${test1}

    # 每个分区的样本数
    samples_per_partition = total_samples // num_partitions

    # 创建一个 RDD,每个分区执行一次 estimate_pi_in_task
    rdd = sc.parallelize(range(num_partitions), num_partitions)

    # 映射每个分区去执行采样任务
    inside_counts = rdd.map(lambda _: estimate_pi_in_task(samples_per_partition))

    # 聚合所有分区的结果
    total_inside = inside_counts.sum()
    pi_estimate = 4.0 * total_inside / total_samples

    print(f"总样本数: {total_samples}")
    print(f"落在圆内的样本数: {total_inside}")
    print(f"估算的 π 值: {pi_estimate:.6f}")

    spark.stop()

if __name__ == "__main__":
    main()
说明

仅支持将整个 Python 文件作为一个 Spark 作业提交执行,不支持选中部分代码运行。

关于脚本中参数的进一步说明:

参数

类型

取值

说明

sys.argv[1]

命令行参数

spark-submit 命令中脚本名后的值

表示总采样数(如 10000

${test1}

调度参数

运行/调度时由DataWorks动态替换为实际值

表示分区数(如100

步骤三:编写 spark-submit 代码

Submit命令编辑窗口内,编写以下代码。该命令通过 spark-submit 工具,将指定的 Python 应用程序打包并提交至阿里云 EMR 集群执行。

在使用 EMR 计算资源(半托管集群)与 EMR Serverless Spark 计算资源(全托管集群)时,spark-submit 命令所支持的参数等存在关键差异。为便于您准确配置,下表从代码示例、命令集成、参考文档三个维度进行了对比说明。

差异类别

使用 EMR 计算资源(半托管集群)

使用 EMR Serverless Spark 计算资源(全托管集群)

代码示例

spark-submit \                          
 --master yarn \            # 指定 Spark 集群管理模式为 YARN
 --deploy-mode cluster \    # 指定部署模式为集群
 --py-files utils.py \      # 显式声明依赖的 Python 资源文件
 emr_pyspark_test.py 10000  # 主程序入口文件
spark-submit \                                       
 --py-files utils.py \                    
 emr_pyspark_test.py 10000  

命令集成

基于 Apache Spark 开源发行版的 spark-submit 工具

基于阿里云 EMR Serverless spark-submit 工具包

命令参考文档

更多 spark-submit 支持的参数及其使用说明,请参见 Apache Spark 官方文档 — Submitting Applications

关于 spark-submit 的参数规范、常见选项及最佳实践,可参考文档:通过spark-submit提交任务

运行节点

  1. 运行配置选择配置计算资源资源组

    配置项

    说明

    计算资源

    选择已绑定的 EMR 计算资源 EMR Serverless Spark计算资源。若无可用计算资源,可在下拉框中选择新建计算资源

    资源组

    选择已绑定工作空间的资源组。

    脚本参数

    在配置节点内容时,通过${参数名}的方式定义变量,需要在脚本参数处配置本次运行值信息,任务运行时会将它动态替换为真实的取值。

    其中参数值会根据调度配置中的参数值进行同步。

    说明

    本次运行值仅在当前执行中生效。优先采用本次运行值,无则采用参数值

  2. 在节点编辑页面上方工具栏,单击运行,系统将合并完整 Python 脚本(含资源引用),并通过 spark-submit 提交至 EMR 集群,最后返回执行日志与结果。

后续步骤

  • 节点调度配置:若项目目录下的节点需要周期性调度执行,您需要在节点右侧的调度配置中设置调度策略,配置相关的调度属性。

  • 节点发布:若任务需要发布至生产环境执行,请单击界面image图标唤起发布流程,通过该流程将任务发布至生产环境。项目目录下的节点只有在发布至生产环境后,才会进行周期性调度。

  • 任务运维:任务发布后,您可以在运维中心查看周期任务的运行情况。详情请参见运维中心入门

相关文档