在EMR任务开发中,通过创建EMR(E-MapReduce)的MR(MapReduce)节点,可以将大规模数据集分解为多个并行处理的Map任务,从而显著提高数据处理效率。本文将以一个实例——从对象存储服务OSS读取文本文件,并统计其中单词数量为例,向您详细介绍如何开发和配置EMR MR节点作业,帮助您全面掌握这一流程。
前提条件
已创建阿里云EMR集群,并注册EMR集群至DataWorks。操作详情请参见注册EMR集群至DataWorks。
(可选,RAM账号需要)进行任务开发的RAM账号已被添加至对应工作空间中,并具有开发或空间管理员(权限较大,谨慎添加)角色权限,添加成员的操作详情请参见为工作空间添加空间成员。
说明如果您使用的是主账号,则可忽略该添加操作。
已开发创建项目目录,详情请参见项目目录。
如需在节点中引用开源代码资源或自定义函数,您需在资源管理中创建资源和函数,详情请参见资源管理。
如果您使用本文的作业开发示例执行相关作业流程,则还需要创建好OSS的存储空间Bucket。创建OSS的存储空间Bucket,详情请参见控制台创建存储空间。
已创建EMR MR节点,详情请参见创建任务节点。
使用限制
仅支持使用Serverless资源组(推荐)或独享调度资源组运行该类型任务。
DataLake或自定义集群若要在DataWorks管理元数据,需先在集群侧配置EMR-HOOK。详情请参见配置Hive的EMR-HOOK。
说明若未在集群侧配置EMR-HOOK,则无法在DataWorks中实时展示元数据、生成审计日志、展示血缘关系、开展EMR相关治理任务。
准备初始数据及JAR资源包
准备初始数据
创建示例文件input01.txt
文件内容如下。
hadoop emr hadoop dw
hive hadoop
dw emr
上传初始数据文件
登录OSS管理控制台,单击左侧导航栏的Bucket列表。
单击目标Bucket名称,进入文件管理页面。
本文示例使用的Bucket为
onaliyun-bucket-2
。单击新建目录,创建初始数据及JAR资源的存放目录。
配置目录名为
emr/datas/wordcount02/inputs
,创建初始数据的存放目录。配置目录名为
emr/jars
,创建JAR资源的存放目录。
上传初始数据文件至初始数据的存放目录。
进入
/emr/datas/wordcount02/inputs
路径,单击上传文件。在待上传文件区域单击扫描文件,添加
input01.txt
文件至Bucket,单击上传文件。
使用MapReduce读取OSS文件并生成JAR包
打开已创建的IDEA项目,添加pom依赖。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>2.8.5</version> <!--因为EMR-MR用的是2.8.5--> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.8.5</version> </dependency>
在MapReduce中读写OSS文件,需要配置如下参数。
重要风险提示: 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。强烈建议不要将AccessKey ID和AccessKey Secret保存到工程代码里或者任何容易被泄露的地方,AccessKey泄露会威胁您账号下所有资源的安全。以下代码示例仅供参考,请妥善保管好您的AccessKey信息。
conf.set("fs.oss.accessKeyId", "${accessKeyId}"); conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); conf.set("fs.oss.endpoint","${endpoint}");
参数说明如下:
${accessKeyId}
:阿里云账号的AccessKey ID。${accessKeySecret}
:阿里云账号的AccessKey Secret。${endpoint}
:OSS对外服务的访问域名。由您集群所在的地域决定,对应的OSS也需要是在集群对应的地域,详情请参见OSS地域和访问域名。
以Java代码为例,修改Hadoop官网WordCount示例,即在代码中添加AccessKey ID和AccessKey Secret的配置,以便作业有权限访问OSS文件。
编辑完上述Java代码后将该代码生成JAR包。示例生成的JAR包为
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
。
操作步骤
在EMR MR节点编辑页面,执行如下开发操作。
开发EMR MR任务
您可以根据不同场景需求选择适合您的操作方案:
方案一:先上传资源后引用EMR JAR资源
DataWorks也支持您从本地先上传资源至DataStudio,再引用资源。若EMR MR节点依赖的资源较大,则无法通过DataWorks页面上传。您可将资源存放至HDFS上,然后在代码中进行引用。
创建EMR JAR资源。
详情请参见资源管理。将准备初始数据及JAR资源包中生成的JAR包存储在JAR资源的存放目录
emr/jars
下。单击点击上传按钮,上传JAR资源。选择存储路径、数据源及资源组。
单击保存按钮进行保存。
引用EMR JAR资源。
打开创建的EMR MR节点,停留在代码编辑页面。
在左侧导航栏的资源管理中找到待引用资源(示例为
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
),右键选择引用资源。选择引用后,当EMR MR节点的代码编辑页面出现如下引用成功提示时,表明已成功引用代码资源。此时,需要执行下述命令。如下命令涉及的资源包、Bucket名称、路径信息等为本文示例的内容,使用时,您需要替换为实际使用信息。
##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"} onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
说明EMR MR节点编辑代码时不支持注释语句。
方案二:直接引用OSS资源
当前节点可通过OSS REF的方式直接引用OSS资源,在运行EMR节点时,DataWorks会自动加载代码中的OSS资源至本地使用。该方式常用于“需要在EMR任务中运行JAR依赖”、“EMR任务需依赖脚本”等场景。
上传JAR资源。
完成代码开发后,您需登录OSS管理控制台,单击所在地域左侧导航栏的Bucket列表。
单击目标Bucket名称,进入文件管理页面。
本文示例使用的Bucket为
onaliyun-bucket-2
。上传JAR资源至JAR资源的存放目录。
进入存放目录
emr/jars
,单击上传文件,在待上传文件区域单击扫描文件,添加onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
文件至Bucket,单击上传文件。
引用JAR资源。
编辑引用JAR资源代码。
在已创建的EMR MR节点编辑页面,编辑引用JAR资源代码。
hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
说明上述命令行格式为
hadoop jar <引用运行JAR存储路径> <运行的主类全名称> <读入文件存储目录> <写出结果存储目录>
。引用运行JAR存储路径参数说明:
参数
参数说明
引用运行JAR存储路径
格式为
ossref://{endpoint}/{bucket}/{object}
endpoint:OSS对外服务的访问域名。Endpoint为空时,仅支持使用与当前访问的EMR集群同地域的OSS,即OSS的Bucket需要与EMR集群所在地域相同。
Bucket:OSS用于存储对象的容器,每一个Bucket有唯一的名称,登录OSS管理控制台,可查看当前登录账号下所有Bucket。
object:存储在Bucket中的一个具体的对象(文件名称或路径)。
(可选)配置高级参数
您可在节点调度配置的EMR节点参数中配置特有属性参数。更多属性参数设置,请参考Spark Configuration。不同类型EMR集群可配置的高级参数存在部分差异,具体如下表。
DataLake集群/自定义集群:EMR on ECS
高级参数
配置说明
queue
提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置。
priority
优先级,默认为1。
FLOW_SKIP_SQL_ANALYZE
SQL语句执行方式。取值如下:
true
:表示每次执行多条SQL语句。false
(默认值):表示每次执行一条SQL语句。
说明该参数仅支持用于数据开发环境测试运行流程。
其他
您也可以直接在高级配置里追加自定义MR任务参数。提交代码时DataWorks会自动在命令中通过
-D key=value
语句加上新增的参数。Hadoop集群:EMR on ECS
高级参数
配置说明
queue
提交作业的调度队列,默认为default队列。关于EMR YARN说明,详情请参见队列基础配置。
priority
优先级,默认为1。
USE_GATEWAY
设置本节点提交作业时,是否通过Gateway集群提交。取值如下:
true
:通过Gateway集群提交。false
(默认值):不通过Gateway集群提交,默认提交到header节点。
说明如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为
true
时,后续提交EMR作业时会失败。执行SQL任务
在调试配置的计算资源中,选择配置计算资源和DataWorks资源组。
说明您还可以根据任务执行所需的资源情况来调度 CU。默认CU为
0.25
。访问公共网络或VPC网络环境的数据源需要使用与数据源测试连通性成功的调度资源组。详情请参见网络连通方案。
在工具栏的参数对话框中选择已创建的数据源,单击运行SQL任务。
如需定期执行节点任务,请根据业务需求配置调度信息。配置详情请参见调度配置。
节点任务配置完成后,需对节点进行发布。详情请参见节点发布。
任务发布后,您可以在运维中心查看周期任务的运行情况。详情请参见运维中心入门。
查看结果
登录OSS管理控制台,您可以在目标Bucket的初始数据存放目录下查看写入结果。示例路径为emr/datas/wordcount02/outputs。
在DataWorks读取统计结果。
新建EMR Hive节点,详情请参见创建任务节点。
在EMR Hive节点中创建挂载在OSS上的Hive外表,读取表数据。代码示例如下。
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb ( `word` STRING COMMENT '单词', `cout` STRING COMMENT '计数' ) ROW FORMAT delimited fields terminated by '\t' location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/'; SELECT * FROM wordcount02_result_tb;
运行结果如下图。