Spark是一个通用的大数据分析引擎,以其高性能、易用性和广泛的适用性而著称。它支持复杂的内存计算,非常适合构建大规模且低延迟的数据分析应用。DataWorks平台提供了EMR Spark节点,使您能够在 DataWorks上便捷地开发和周期性调度Spark任务。本文将向您介绍如何配置和使用EMR Spark节点,并通过具体的应用示例来展示EMR Spark节点的功能。
前提条件
已创建阿里云EMR集群,并注册EMR集群至DataWorks。操作详情请参见注册EMR集群至DataWorks。
(可选,RAM账号需要)进行任务开发的RAM账号已被添加至对应工作空间中,并具有开发或空间管理员(权限较大,谨慎添加)角色权限,添加成员的操作详情请参见为工作空间添加空间成员。
如果您使用的是主账号,则可忽略该添加操作。
已开发创建项目目录,详情请参见项目目录。
如果您在开发任务时,需要特定的开发环境支持,可使用DataWorks提供的自定义镜像功能,定制化构建任务执行所需的组件镜像。更多信息,请参见镜像管理。
已创建EMR Spark节点,详情请参见创建周期任务。
使用限制
仅支持使用Serverless资源组(推荐)或独享调度资源组运行该类型任务。
DataLake或自定义集群若要在DataWorks管理元数据,需先在集群侧配置EMR-HOOK。详情请参见配置Spark SQL的EMR-HOOK。
若未在集群侧配置EMR-HOOK,则无法在DataWorks中实时展示元数据、生成审计日志、展示血缘关系、开展EMR相关治理任务。
EMR on ACK类型的Spark集群不支持查看血缘,EMR Serverless Spark集群支持查看血缘。
EMR on ACK 类型的Spark集群及EMR Serverless Spark集群仅支持通过OSS REF的方式直接引用OSS资源、上传资源到OSS,不支持上传资源到HDFS。
DataLake集群、自定义集群支持通过OSS REF的方式直接引用OSS资源、上传资源到OSS及上传资源到HDFS。
准备工作:开发Spark任务并获取JAR包
在使用DataWorks调度EMR Spark任务前,您需要先在EMR中开发Spark任务代码并完成任务代码的编译,生成编译后的任务JAR包,EMR Spark任务的开发指导详情请参见Spark概述。
后续您需要将任务JAR包上传至DataWorks,在DataWorks中周期性调度EMR Spark任务。
操作步骤
在EMR Spark节点编辑页面,执行如下开发操作。
开发Spark任务
您可以根据不同场景需求选择适合您的操作方案:
方案一:先上传资源后引用EMR JAR资源方案二:直接引用OSS资源DataWorks也支持您从本地先上传资源至Data Studio,再引用资源。EMR Spark任务编译完成后,您需获取编译后的JAR包,建议根据JAR包大小选择不同方式存储JAR包资源。
上传JAR包资源,创建为DataWorks的EMR资源并提交,或直接存储在EMR的HDFS存储中(EMR on ACK 类型的Spark集群及EMR Serverless Spark集群不支持上传资源到HDFS)。
JAR包小于200MB时JAR包大于等于200MB时创建EMR JAR资源。
JAR包小于200MB时,可将JAR包通过本地上传的方式上传为DataWorks的EMR JAR资源,便于后续在DataWorks控制台进行可视化管理,创建完成资源后需进行提交,操作详情请参见创建和使用EMR资源。
将JAR包通过本地上传的方式上传到JAR资源的存放目录下,详情请参见资源管理。
单击点击上传按钮,上传JAR资源。
选择存储路径、数据源及资源组。
单击保存按钮进行保存。
引用EMR JAR资源。
打开创建的EMR Spark节点,停留在代码编辑页面。
在左侧导航栏的资源管理中找到待引用资源,右键选择引用资源。
选择引用后,当EMR Spark节点的代码编辑页面出现如下引用成功提示时,表明已成功引用代码资源。
##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"} spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
如果成功自动添加上述引用代码,表明资源引用成功。其中,
spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
为您实际上传的EMR JAR资源名称。改写EMR Spark节点代码,补充spark submit命令,改写后的示例如下。
EMR Spark节点编辑代码时不支持注释语句,请务必参考如下示例改写任务代码,不要随意添加注释,否则后续运行节点时会报错。
##@resource_reference{"spark-examples_2.11-2.4.0.jar"} spark-submit --class org.apache.spark.examples.SparkPi --master yarn spark-examples_2.11-2.4.0.jar 100
org.apache.spark.examples.SparkPi
:为您实际编译的JAR包中的任务主Class。spark-examples_2.11-2.4.0.jar
:为您实际上传的EMR JAR资源名称。其他参数可参考以上示例不做修改,您也可执行
spark-submit --help
命令查看spark submit的使用帮助,根据需要修改spark submit命令。若您需要在Spark节点中使用Spark-submit命令简化的参数,您需要在代码中自行添加,例如,
--executor-memory 2G
。Spark节点仅支持使用Yarn的Cluster提交作业。
spark-submit方式提交的任务,deploy-mode推荐使用cluster模式,不建议使用client模式。
创建EMR JAR资源。
JAR包大于等于200MB时,无法通过本地上传的方式直接上传为DataWorks的资源。建议直接将JAR包存储在EMR的HDFS中,并记录下JAR包的存储路径,以便于后续在DataWorks调度Spark任务时引用该路径。
将JAR包通过本地上传的方式上传到JAR资源的存放目录下,详情请参见资源管理。
单击点击上传按钮,上传JAR资源。
选择存储路径、数据源及资源组。
单击保存按钮进行保存。
引用EMR JAR资源。
JAR包存储在HDFS时,您可以直接在EMR Spark节点中通过代码指定JAR包路径的方式引用JAR包。
双击创建的EMR Spark节点,打开EMR Spark 节点的代码编辑页面。
编写spark submit命令,示例如下。
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar:为JAR包实际在HDFS中的路径。
org.apache.spark.examples.JavaSparkPi:为您实际编译的JAR包中的任务主class。
其他参数为实际EMR集群的参数,需根据实际情况进行修改配置。您也可以执行
spark-submit --help
命令查看spark submit的使用帮助,根据需要修改spark submit命令。若您需要在Spark节点中使用Spark-submit命令简化的参数,您需要在代码中自行添加,例如,
--executor-memory 2G
。Spark节点仅支持使用Yarn的Cluster提交作业。
spark-submit方式提交的任务,deploy-mode推荐使用cluster模式,不建议使用client模式。
当前节点可直接通过OSS REF的方式引用OSS资源,在运行EMR节点时,DataWorks会自动加载代码中的OSS资源至本地使用。该方式常用于“需要在EMR任务中运行JAR依赖”、“EMR任务需依赖脚本”等场景。
开发JAR资源。
代码依赖准备。
您可前往EMR集群,在集群master节点的
/usr/lib/emr/spark-current/jars/
路径下查看您所需的代码依赖。下面以Spark3.4.2版本为例,您需在打开已创建的IDEA项目添加pom依赖并引用相关插件。添加pom依赖引用相关插件<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.4.2</version> </dependency> <!-- Apache Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.4.2</version> </dependency> </dependencies>
<build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <configuration> <recompileMode>incremental</recompileMode> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> </plugins> </build>
代码示例。
package com.aliyun.emr.example.spark import org.apache.spark.sql.SparkSession object SparkMaxComputeDemo { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("HelloDataWorks") .getOrCreate() // 打印Spark版本 println(s"Spark version: ${spark.version}") } }
编辑完上述Scala代码后将该代码生成JAR包。
示例生成的JAR包为
SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar
。
上传JAR资源。
完成代码开发后,您需登录OSS管理控制台,单击所在地域左侧导航栏的Bucket列表。
单击目标Bucket名称,进入文件管理页面。
本文示例使用的Bucket为
onaliyun-bucket-2
。单击新建目录,创建JAR资源的存放目录。
配置目录名为
emr/jars
,创建JAR资源的存放目录。上传JAR资源至JAR资源的存放目录。
进入存放目录,单击上传文件,在待上传文件区域单击扫描文件,添加
SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar
文件至Bucket,单击上传文件。
引用JAR资源。
编辑引用JAR资源代码。
在已创建的EMR Spark节点编辑页面,编辑引用JAR资源代码。
spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar
引用参数说明:
参数
参数说明
class
运行的主类全名称。
master
Spark应用程序的运行模式。
ossref
文件路径格式为
ossref://{endpoint}/{bucket}/{object}
endpoint:OSS对外服务的访问域名。Endpoint为空时,仅支持使用与当前访问的EMR集群同地域的OSS,即OSS的Bucket需要与EMR集群所在地域相同。
Bucket:OSS用于存储对象的容器,每一个Bucket有唯一的名称,登录OSS管理控制台,可查看当前登录账号下所有Bucket。
object:存储在Bucket中的一个具体的对象(文件名称或路径)。
运行EMR Spark节点任务。
编辑完成后您可单击
图标,选择您所创建的Serverless资源组运行EMR Spark节点。待任务执行完成后,记录控制台打印的
applicationIds
,例如application_1730367929285_xxxx
。结果查看。
创建EMR Shell节点并在EMR Shell节点执行
yarn logs -applicationId application_1730367929285_xxxx
命令查看运行结果:
(可选)配置高级参数
您可在节点调度配置的EMR节点参数中配置特有属性参数。更多Spark属性参数设置,请参考Spark Configuration。不同类型EMR集群可配置的高级参数存在部分差异,具体如下表。
DataLake集群/自定义集群:EMR on ECSSpark集群:EMR ON ACKHadoop集群:EMR on ECSEMR Serverless Spark集群高级参数
配置说明
queue
提交作业的调度队列,默认为default队列。
如果您在注册EMR集群至DataWorks工作空间时,配置了工作空间级的YARN资源队列:
如果勾选了全局配置是否优先为是,则实际Spark任务运行时,队列以注册EMR集群时的配置结果为准。
如果未勾选,则实际Spark任务运行时,队列以EMR Spark节点的配置结果为准。
关于EMR YARN说明,详情请参见队列基础配置,注册EMR集群时的队列配置详情请参见设置全局YARN资源队列。
priority
优先级,默认为1。
FLOW_SKIP_SQL_ANALYZE
SQL语句执行方式。取值如下:
true
:表示每次执行多条SQL语句。false
(默认值):表示每次执行一条SQL语句。
该参数仅支持用于数据开发环境测试运行流程。
其他
您可以直接在高级配置里追加自定义SPARK参数。例如,
spark.eventLog.enabled : false
,DataWorks会自动在最终下发EMR集群的代码中进行补全,格式为:--conf key=value
。还支持配置全局Spark参数,详情请参见设置全局Spark参数。
高级参数
配置说明
FLOW_SKIP_SQL_ANALYZE
SQL语句执行方式。取值如下:
true
:表示每次执行多条SQL语句。false
:表示每次执行一条SQL语句。
该参数仅支持用于数据开发环境测试运行流程。
其他
您可以直接在高级配置里追加自定义SPARK参数。例如,
spark.eventLog.enabled : false
,DataWorks会自动在最终下发EMR集群的代码中进行补全,格式为:--conf key=value
。还支持配置全局Spark参数,详情请参见设置全局Spark参数。
高级参数
配置说明
queue
提交作业的调度队列,默认为default队列。
如果您在注册EMR集群至DataWorks工作空间时,配置了工作空间级的YARN资源队列:
如果勾选了全局配置是否优先为是,则实际Spark任务运行时,队列以注册EMR集群时的配置结果为准。
如果未勾选,则实际Spark任务运行时,队列以EMR Spark节点的配置结果为准。
关于EMR YARN说明,详情请参见队列基础配置,注册EMR集群时的队列配置详情请参见设置全局YARN资源队列。
priority
优先级,默认为1。
FLOW_SKIP_SQL_ANALYZE
SQL语句执行方式。取值如下:
true
:表示每次执行多条SQL语句。false
:表示每次执行一条SQL语句。
该参数仅支持用于数据开发环境测试运行流程。
USE_GATEWAY
设置本节点提交作业时,是否通过Gateway集群提交。取值如下:
true
:通过Gateway集群提交。false
:不通过Gateway集群提交,默认提交到header节点。
如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为
true
时,后续提交EMR作业时会失败。其他
您可以直接在高级配置里追加自定义SPARK参数。例如,
spark.eventLog.enabled : false
,DataWorks会自动在最终下发EMR集群的代码中进行补全,格式为:--conf key=value
。还支持配置全局Spark参数,详情请参见设置全局Spark参数。
相关参数设置请参见提交Spark任务参数设置。
高级参数
配置说明
queue
提交作业的调度队列,默认为dev_queue队列。
priority
优先级,默认为1。
FLOW_SKIP_SQL_ANALYZE
SQL语句执行方式。取值如下:
true
:表示每次执行多条SQL语句。false
:表示每次执行一条SQL语句。
该参数仅支持用于数据开发环境测试运行流程。
SERVERLESS_RELEASE_VERSION
Spark引擎版本,默认使用管理中心的集群管理中集群配置的默认引擎版本。如需为不同任务设置不同的引擎版本,您可在此进行设置。
SERVERLESS_QUEUE_NAME
指定资源队列,默认使用管理中心的集群管理中集群配置的默认资源队列。如有资源隔离和管理需求,可通过添加队列实现。详情请参见管理资源队列。
其他
您可以直接在高级配置里追加自定义SPARK参数。例如,
spark.eventLog.enabled : false
,DataWorks会自动在最终下发EMR集群的代码中进行补全,格式为:--conf key=value
。还支持配置全局Spark参数,详情请参见设置全局Spark参数。
执行Spark任务
在调试配置的计算资源中,选择配置计算资源和DataWorks资源组。
您还可以根据任务执行所需的资源情况来调度 CU。默认CU为
0.25
。访问公共网络或VPC网络环境的数据源需要使用与数据源测试连通性成功的调度资源组。详情请参见网络连通方案。
在工具栏的参数对话框中选择对应的数据源,单击运行Spark任务。
如需定期执行节点任务,请根据业务需求配置调度信息。配置详情请参见节点调度。
节点任务配置完成后,需对节点进行发布。详情请参见节点/工作流发布。
任务发布后,您可以在运维中心查看周期任务的运行情况。详情请参见运维中心入门。
常见问题
相关文档
- 本页导读 (1)
- 前提条件
- 使用限制
- 准备工作:开发Spark任务并获取JAR包
- 操作步骤
- 常见问题
- 相关文档