在EMR任务开发中,通过创建EMR(E-MapReduce) MR节点,可将大规模数据集分为多个Map任务以并行处理,加速数据集的并行运算。本文将以创建EMR MR节点实现从OSS中读取文本,并统计文本中的单词数为例,为您展示EMR MR节点的作业开发流程。
背景信息
本文示例中,涉及到的文件名称、Bucket名称及路径等信息,您需要替换为实际使用的相关信息。
本节点支持使用OSS REF方式引用OSS资源,详情请参见方案一:直接引用OSS资源。
前提条件
已注册EMR集群至DataWorks,详情请参见注册EMR集群至DataWorks。
使用EMR MR节点进行作业开发时,如果需要引用开源代码资源,您需先将开源代码作为资源上传至EMR JAR资源节点中,详情请参见创建和使用EMR资源。
使用EMR MR节点进行作业开发时,如果需要引用自定义函数时,您需要先将自定义函数作为资源上传至EMR JAR资源节点中,新建注册此函数,详情请参见创建EMR函数。
如果您使用本文的作业开发示例执行相关作业流程,则还需要创建好OSS的存储空间Bucket。创建OSS的存储空间Bucket,详情请参见控制台创建存储空间。
使用限制
该类任务不支持公共调度资源组运行,支持在2023年12月1号之后购买的资源组运行。
DataLake或自定义集群若要在DataWorks管理元数据,需先在集群侧配置EMR-HOOK。若未配置,则在DataWorks中无法实时展示元数据、生成审计日志、展示血缘关系,EMR相关治理任务也将无法开展。配置EMR-HOOK,详情请参见配置Hive的EMR-HOOK。
准备初始数据及JAR资源包
准备初始数据。
创建input01.txt文件,文件内容如下。
hadoop emr hadoop dw hive hadoop dw emr
创建初始数据及JAR资源的存放目录。
登录OSS管理控制台。
单击左侧导航栏的Bucket列表
单击目标Bucket名称,进入文件管理页面。
本文示例使用的Bucket为onaliyun-bucket-2。
单击新建目录,创建初始数据及JAR资源的存放目录。
配置目录名为emr/datas/wordcount02/inputs,创建初始数据的存放目录。
配置目录名为emr/jars,创建JAR资源的存放目录。
上传初始数据文件至初始数据的存放目录。
进入/emr/datas/wordcount02/inputs路径。
单击上传文件
在待上传文件区域单击扫描文件,添加input01.txt文件至Bucket。
单击上传文件
使用MapReduce读取OSS文件并生成JAR包。
打开已创建的IDEA项目,添加pom依赖。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>2.8.5</version> <!--因为EMR-MR用的是2.8.5--> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.8.5</version> </dependency>
在MapReduce中读写OSS文件,需要配置如下参数。
重要风险提示: 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。强烈建议不要将AccessKey ID和AccessKey Secret保存到工程代码里或者任何容易被泄露的地方,AccessKey泄露会威胁您账号下所有资源的安全。以下代码示例仅供参考,请妥善保管好您的AccessKey信息。
conf.set("fs.oss.accessKeyId", "${accessKeyId}"); conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); conf.set("fs.oss.endpoint","${endpoint}");
参数说明如下:
${accessKeyId}
:阿里云账号的AccessKey ID。${accessKeySecret}
:阿里云账号的AccessKey Secret。${endpoint}
:OSS对外服务的访问域名。由您集群所在的地域决定,对应的OSS也需要是在集群对应的地域,详情请参见访问域名和数据中心
以Java代码为例,修改Hadoop官网WordCount示例,即在代码中添加AccessKey ID和AccessKey Secret的配置,以便作业有权限访问OSS文件。
package cn.apache.hadoop.onaliyun.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class EmrWordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } conf.set("fs.oss.accessKeyId", "${accessKeyId}"); // conf.set("fs.oss.accessKeySecret", "${accessKeySecret}"); // conf.set("fs.oss.endpoint", "${endpoint}"); // Job job = Job.getInstance(conf, "word count"); job.setJarByClass(EmrWordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
编辑完上述Java代码后将该代码生成JAR包。示例生成的JAR包为onaliyun_mr_wordcount-1.0-SNAPSHOT.jar。
创建EMR MR节点
进入数据开发页面。
登录DataWorks控制台,单击左侧导航栏的 ,在下拉框中选择对应工作空间后单击进入数据开发。
创建业务流程。
如果您已有业务流程,则可以忽略该步骤。
鼠标悬停至图标,选择新建业务流程。
在新建业务流程对话框,输入业务名称。
单击新建。
创建EMR MR节点。
鼠标悬停至图标,选择 。
您也可以找到相应的业务流程,右键单击业务流程,选择
。在新建节点对话框中,输入名称,并选择引擎实例、节点类型及路径。
说明节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
单击确认,进入EMR MR节点编辑页面。
开发MR任务
双击已创建的节点,进入任务开发页面,进行以下任务开发操作。
方案一:直接引用OSS资源
当前节点可直接通过OSS REF的方式直接引用OSS资源,在运行EMR节点时,DataWorks会自动加载代码中的OSS资源至本地使用。该方式常用于“需要在EMR任务中运行JAR依赖”、“EMR任务需依赖脚本”等场景。引用格式如下:
ossref://{endpoint}/{bucket}/{object}
endpoint:OSS对外服务的访问域名。Endpoint为空时,仅支持使用与当前访问的EMR集群同地域的OSS,即OSS的Bucket需要与EMR集群所在地域相同。
Bucket:OSS用于存储对象的容器,每一个Bucket有唯一的名称,登录OSS管理控制台,可查看当前登录账号下所有Bucket。
object:存储在Bucket中的一个具体的对象(文件名称或路径)。
该功能不支持公共调度资源组。并且若您在2023年12月1日前购买的资源组,需联系我们进行资源组升级以使用此功能。
方案二:先上传资源后引用EMR JAR资源
DataWorks也支持您从本地先上传资源至DataStudio,再引用资源。若EMR MR节点依赖的资源较大,则无法通过DataWorks页面上传。您可将资源存放至HDFS上,然后在代码中进行引用。
创建EMR JAR资源。
创建EMR JAR资源,详情请参见创建和使用EMR资源。示例将本文《准备初始数据及JAR资源包》中生成的JAR包存储在JAR资源的存放目录emr/jars下。首次使用需要进行一键授权,然后单击点击上传按钮,上传JAR资源。
引用EMR JAR资源。
打开创建的EMR MR节点,停留在代码编辑页面。
在
节点下,找到待引用资源(示例为onaliyun_mr_wordcount-1.0-SNAPSHOT.jar
),右键选择引用资源。选择引用后,当EMR MR节点的代码编辑页面出现如下引用成功提示时,表明已成功引用代码资源。此时,需要执行下述命令。如下命令涉及的资源包、Bucket名称、路径信息等为本文示例的内容,使用时,您需要替换为实际使用的信息。
##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"} onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
说明EMR MR节点编辑代码时不支持注释语句。
配置高级参数
对于任务的相关参数配置,请参考Spark Configuration。
集群类型 | 高级参数 |
DataLake集群(新版数据湖)【EMR on ECS】、Custom集群(自定义集群)【EMR on ECS】 |
说明 您也可以直接在高级配置里追加自定义MR任务参数。提交代码时DataWorks会自动在命令中通过 |
Hadoop集群(数据湖)【EMR on ECS】 |
说明 如果本节点所在的集群未关联Gateway集群,此处手动设置参数取值为 |
配置任务调度
如果您需要周期性执行创建的节点任务,可以单击节点编辑页面右侧的调度配置,根据业务需求配置该节点任务的调度信息:
配置任务调度的基本信息,详情请参见配置基础属性。
配置时间调度周期、重跑属性和上下游依赖关系,详情请参见时间属性配置说明及配置同周期调度依赖。
说明您需要设置节点的重跑属性和依赖的上游节点,才可以提交节点。
配置资源属性,详情请参见配置资源属性。访问公网或VPC网络时,请选择与目标节点网络连通的调度资源组作为周期调度任务使用的资源组。详情请参见配置资源组与网络连通。
任务发布进行周期调度
查看结果
登录OSS管理控制台,您可以在目标Bucket的初始数据存放目录下查看写入结果。示例路径为emr/datas/wordcount02/outputs。
在DataWorks读取统计结果。
新建EMR Hive节点,详情请参见创建EMR Hive节点。
在EMR Hive节点中创建挂载在OSS上的Hive外表,读取表数据。代码示例如下。
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb ( `word` STRING COMMENT '单词', `cout` STRING COMMENT '计数' ) ROW FORMAT delimited fields terminated by '\t' location 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/'; SELECT * FROM wordcount02_result_tb;
运行结果如下图。
- 本页导读 (1)