DataWorks的EMR(E-MapReduce) SPARK节点,用于进行复杂的内存分析,构建大型、低延迟的数据分析应用。本文为您介绍如何创建EMR Spark节点,并通过测试计算Pi及Spark对接MaxCompute两个示例,为您介绍EMR
Spark节点的功能。
前提条件
- 您已创建阿里云EMR集群,且集群所在的安全组中入方向的安全策略包含以下策略。
- 授权策略:允许
- 协议类型:自定义 TCP
- 端口范围:8898/8898
- 授权对象:100.104.0.0/16
- 您在工作空间配置页面添加E-MapReduce计算引擎实例后,当前页面才会显示EMR目录。详情请参见配置工作空间。
- 如果EMR启用了Ranger,则使用DataWorks进行EMR的作业开发前,您需要在EMR中修改配置,添加白名单配置并重启Hive,否则作业运行时会报错Cannot modify spark.yarn.queue at runtime或Cannot modify SKYNET_BIZDATE at runtime。
- 白名单的配置通过EMR的自定义参数,添加Key和Value进行配置,以Hive组件的配置为例,配置值如下。
hive.security.authorization.sqlstd.confwhitelist.append=tez.*|spark.*|mapred.*|mapreduce.*|ALISA.*|SKYNET.*
说明 其中ALISA.*
和SKYNET.*
为DataWorks专有的配置。
- 白名单配置完成后需要重启服务,重启后配置才会生效。重启服务的操作详情请参见重启服务。
- 已开通独享调度资源组,并且独享调度资源组需要绑定EMR所在的VPC专有网络,详情请参见新增和使用独享调度资源组。
创建EMR Spark节点
- 进入数据开发页面。
- 登录DataWorks控制台。
- 在左侧导航栏,单击工作空间列表。
- 选择工作空间所在地域后,单击相应工作空间后的进入数据开发。
- 鼠标悬停至
图标,单击。您也可以找到相应的业务流程,右键单击EMR,选择。
- 在新建节点对话框中,输入节点名称,并选择目标文件夹。
说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
- 编辑高级配置。
- "SPARK_CONF": "--conf spark.driver.memory=2g --conf xxx=xxx" ,设置spark 任务运行参数,多个参数在该key中追加。
- “queue”:提交作业的调度队列,默认为default队列。
- “vcores”: 虚拟核数,默认为1。
- “memory”:内存,默认为2048MB(用于设置启动器Launcher的内存配额)。
- “priority”:优先级,默认为1。
- “FLOW_SKIP_SQL_ANALYZE”:SQL语句执行方式,参数值为false表示每次执行一条SQL语句;参数值为true表示每次执行多条SQL语句。
- “USE_GATEWAY”:设置本节点提交作业时,是否通过Gateway集群提交,true标明通过Gateway集群提交,false表明不通过Gateway集群提交,默认提交到header节点。
说明 如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为true时,后续提交EMR作业时会失败。
- 在调度配置界面对节点配置调度相关参数。
- 单击提交。
您可以在
EMR节点下,单击所使用的目标引擎,在
数据开发中找到新创建的EMR Spark节点。
说明 如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。

保存并提交节点任务
- 保存并提交节点。
注意 您需要设置节点的重跑属性和依赖的上游节点,才可以提交节点。
- 单击工具栏中的
图标,保存节点。
- 单击工具栏中的
图标。
- 在提交新版本对话框中,输入变更描述。
- 单击确认。
如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的
发布。具体操作请参见
发布任务。
- 测试节点,详情请参见查看周期任务。
数据开发示例一:使用计算Pi测试当前EMR Spark环境是否可用
示例一以Spark自带示例项目计算Pi为例,测试当前EMR Spark环境是否可用。示例详情请参见示例项目使用说明。
- 获取Spark自带示例的JAR包spark-examples_2.11-2.4.5.jar的存放路径。
Spark组件安装在
/usr/lib/spark-current路径下,您需要登录
阿里云E-MapReduce控制台,进入目标EMR集群查询完整的路径
/usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情请参见
EMR常用文件路径。

- 在创建的EMR Spark节点编辑页面,输入运行代码。创建EMR Spark节点,详情请参见创建EMR Spark节点。
示例运行代码如下。
--class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
您仅需填写
spark-submit后面的内容即可,在作业提交时会自动补全
spark-submit的内容。实际执行的界面代码如下。
# spark-submit [options] --class [MainClass] xxx.jar args
spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
- 保存并提交运行节点任务,详情请参见《保存并提交节点任务》章节内容。
当返回结果为
1097: Pi is roughly 3.1415547141554714
时,表示运行成功,EMR Spark环境可用。

数据开发示例二:Spark对接MaxCompute
本示例以Spark对接MaxCompute,实现通过Spark统计MaxCompute表的行数为例,为您介绍EMR Spark节点的功能应用。更多应用场景请参见EMR Spark开发指南。
执行本示例前,您需要准备如下相关环境及测试数据:
- 准备环境。
- DataWorks工作空间绑定EMR引擎和MaxCompute引擎,详情请参见配置工作空间
- 开通OSS并创建Bucket,详情请参见创建存储空间
- 安装了scala的本地IDE(IDEA)。
- 准备测试数据。
在DataWorks数据开发页面创建ODPS SQL节点,执行建表语句并插入数据。示例语句如下,设置第一列为BIGINT类型,同时,插入了两条数据记录。创建ODPS
SQL节点,详情请参见
创建ODPS SQL节点DROP TABLE IF EXISTS emr_spark_read_odpstable ;
CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable
(
id BIGINT
,name STRING
)
;
INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;
- 在Spark中创建Maven工程,添加pom依赖,详情请参见Spark准备工作。
添加pom依赖,代码如下。
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-maxcompute_2.11</artifactId>
<version>1.9.0</version>
</dependency>
您可以参考如下插件代码,在实际使用中请以实际代码为准。
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 在Spark中统计MaxCompute表第一列BIGINT类型的行数,详情请参见Spark对接MaxCompute。
示例代码如下。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.emr.example.spark
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import org.apache.spark.aliyun.odps.OdpsOps
import org.apache.spark.{SparkConf, SparkContext}
object SparkMaxComputeDemo {
def main(args: Array[String]): Unit = {
if (args.length < 6) {
System.err.println(
"""Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
|
|Arguments:
|
| accessKeyId Aliyun Access Key ID.
| accessKeySecret Aliyun Key Secret.
| envType 0 or 1
| 0: Public environment.
| 1: Aliyun internal environment, i.e. Aliyun ECS etc.
| project Aliyun ODPS project
| table Aliyun ODPS table
| numPartitions the number of RDD partitions
""".stripMargin)
System.exit(1)
}
val accessKeyId = args(0)
val accessKeySecret = args(1)
val envType = args(2).toInt
val project = args(3)
val table = args(4)
val numPartitions = args(5).toInt
val urls = Seq(
Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
)
val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
val sc = new SparkContext(conf)
val odpsOps = envType match {
case 0 =>
OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
case 1 =>
OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
}
val odpsData = odpsOps.readTable(project, table, read, numPartitions)
println(s"Count (odpsData): ${odpsData.count()}")
}
def read(record: Record, schema: TableSchema): Long = {
record.getBigint(0)
}
}
统计MaxCompute数据完成后,请将该数据生成JAR包。示例生成的JAR包为
emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar。
说明 与ODPS相关的依赖均属于第三方包,您需要将第三方包一并生成JAR包上传至目标EMR集群。
- 上传运行资源。
- 登录OSS管控台。
- 上传运行资源(即上一步骤生成的JAR包)至指定OSS路径。
本示例中,使用的路径为
oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/,您需要上传
emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar至该路径。首次使用OSS路径时,需要先进行一键授权,详情请参见
创建并使用EMR MR节点。
说明 由于DataWorks EMR资源的使用上限为50M,而添加依赖的JAR包通常大于50M,所以您需要在OSS控制台上传。如果您的运行资源小于50M,您也可以选择在DataWorks直接上传,详情请参见
创建和使用EMR资源

- 创建EMR Spark节点,并执行节点任务。
在
emr_spark_odps节点的编辑页面,选择所使用的EMR引擎实例,输入如下代码。
--class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
其中<accessKeyId> 、<accessKeySecret>、 <envType>、 <project>、 <table> 、<numPartitions>等参数信息您需要替换为实际使用的相关信息。
- 保存并提交运行节点任务,详情请参见《保存并提交节点任务》章节内容。
您可以查看运行日志,当返回结果中表记录条数为
2时,表示统计结果符合预期。
