Create an EMR MR node

An E-MapReduce (EMR) MR node splits a large dataset into multiple parallel map tasks to accelerate large-scale computation. This topic demonstrates how to create an EMR MR node by using a WordCount example that reads text from Object Storage Service (OSS) and counts words.

Prerequisites

  • An EMR cluster must be registered with DataWorks. For more information, see DataStudio (old version): Bind an EMR compute resource.

  • (Optional, for RAM users) The RAM user for task development must be a member of the workspace and have the Development or Workspace Administrator role. The Workspace Administrator role has broad permissions, so assign it with caution. For more information, see Add members to a workspace.

  • A serverless resource group must be purchased and configured, including workspace binding and network settings. For more information, see Use a serverless resource group.

  • A workflow must be created in Data Development (DataStudio). For more information, see Create a workflow.

  • If your EMR MR node job needs to reference open-source code, you must first upload the code as a resource to an EMR JAR resource node. For more information, see Create and use EMR resources.

  • If your EMR MR node job needs to reference a user-defined function (UDF), you must first upload the UDF as a resource to an EMR JAR resource node, and then create and register the function. For more information, see Create an EMR function.

  • If you run the job development example in this topic, you must first create an OSS bucket. For more information, see Create a bucket in the console.

Limitations

  • This type of task can run only on a serverless resource group (recommended) or an exclusive resource group for scheduling.

  • To manage metadata for a DataLake or custom cluster in DataWorks, you must first configure EMR-HOOK on the cluster. Without EMR-HOOK, DataWorks cannot display real-time metadata, generate audit logs, show data lineage, or perform EMR-related governance tasks. For more information about how to configure EMR-HOOK, see Configure EMR-HOOK for Hive.

Prepare the sample data and JAR package

Prepare the sample data

Create a sample file named input01.txt with the following content.

hadoop emr hadoop dw
hive hadoop
dw emr
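To know what the job should produce before you run it on the cluster, you can compute the expected counts locally. The following is a minimal plain-Java sketch with no Hadoop dependency. It splits on whitespace with StringTokenizer, as the mapper later in this topic does. The class name LocalWordCount is hypothetical and is not part of the EMR job.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical helper, not part of the EMR job: counts words locally
// so that the job output can be checked against known values.
public class LocalWordCount {

    // Splits the text on whitespace and counts each token.
    public static Map<String, Integer> count(String text) {
        Map<String, Integer> counts = new TreeMap<>();
        StringTokenizer tokens = new StringTokenizer(text);
        while (tokens.hasMoreTokens()) {
            counts.merge(tokens.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same content as input01.txt.
        String sample = "hadoop emr hadoop dw\nhive hadoop\ndw emr";
        count(sample).forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```

For the sample file, this gives dw 2, emr 2, hadoop 3, and hive 1.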

Upload the sample data file

  1. Log on to the OSS console. In the left-side navigation pane, click Buckets.

  2. Click the name of the target bucket to open the File Management page.

    This topic uses a bucket named onaliyun-bucket-2 as an example.

  3. Click New Directory to create directories for the sample data and JAR resources.

    • Set Directory Name to emr/datas/wordcount02/inputs to create a directory for the sample data.

    • Set Directory Name to emr/jars to create a directory for the JAR resources.

  4. Upload the sample data file to the data directory.

    • Go to the /emr/datas/wordcount02/inputs path and click Upload File.

    • In the Files to Upload area, click Select Files, add the input01.txt file to the bucket, and then click Upload File.

Build a MapReduce JAR package

  1. Open your IntelliJ IDEA project and add the following POM dependencies.

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.8.5</version> <!-- EMR MR uses version 2.8.5. -->
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.8.5</version>
            </dependency>
  2. To read from and write to OSS files in MapReduce, you must configure the following parameters.

    Important

    Risk warning: Your Alibaba Cloud account's AccessKey has full access to all APIs. We recommend using a RAM user for API access and daily O&M. Do not store your AccessKey ID and AccessKey Secret in your project code or anywhere they might be exposed. A leaked AccessKey can compromise the security of all resources under your account. The following code sample is for reference only. Keep your AccessKey information secure.

    conf.set("fs.oss.accessKeyId", "${accessKeyId}");
    conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
    conf.set("fs.oss.endpoint","${endpoint}");

    The parameters are described as follows.

    • ${accessKeyId}: The AccessKey ID of your Alibaba Cloud account.

    • ${accessKeySecret}: The AccessKey secret of your Alibaba Cloud account.

    • ${endpoint}: The OSS endpoint. The OSS bucket must be in the same region as the EMR cluster, and this region determines the endpoint. For more information, see Regions and endpoints.

    The following Java code modifies the WordCount example from the official Hadoop website. It adds the AccessKey ID, AccessKey secret, and endpoint settings so that the job has permission to access files in OSS.

    Sample code

    package cn.apache.hadoop.onaliyun.examples;
    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    public class EmrWordCount {
        public static class TokenizerMapper
                extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
        public static class IntSumReducer
                extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            public void reduce(Text key, Iterable<IntWritable> values,
                               Context context
            ) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
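            // OSS access settings. The ${...} values are placeholders:
            // replace them at run time and do not hardcode real secrets.
            conf.set("fs.oss.accessKeyId", "${accessKeyId}");
            conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
            conf.set("fs.oss.endpoint", "${endpoint}");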
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(EmrWordCount.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
            FileOutputFormat.setOutputPath(job,
                    new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }          
  3. Package the code into a JAR file.

    After you edit and save the Java code, package it into a JAR file. The JAR package generated in this example is named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
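The risk warning in step 2 advises against storing the AccessKey pair in project code. One way to follow that advice is to read the values at run time, for example from environment variables, and fail fast if one is missing. The following is a minimal sketch under that assumption. The class name OssSettings and the variable names OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, and OSS_ENDPOINT are hypothetical, and whether environment variables reach the job depends on how your cluster launches it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps environment variables to the fs.oss.* properties
// used in step 2, so that no secret is hardcoded in the source.
public class OssSettings {

    // Returns the fs.oss.* properties, or throws if a variable is missing.
    public static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("fs.oss.accessKeyId", require(env, "OSS_ACCESS_KEY_ID"));
        props.put("fs.oss.accessKeySecret", require(env, "OSS_ACCESS_KEY_SECRET"));
        props.put("fs.oss.endpoint", require(env, "OSS_ENDPOINT"));
        return props;
    }

    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            // Print only the property names, never the secret values.
            fromEnv(System.getenv()).keySet().forEach(System.out::println);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In the main method of the job, each returned entry could then be applied with conf.set(key, value) in place of the three literal conf.set calls.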

Step 1: Create an EMR MR node

  1. Log on to the DataWorks console. In the target region, click Data Development and O&M > Data Development in the left-side navigation pane. Select a workspace from the drop-down list and click Go to Data Development.

  2. Create an EMR MR node.

    1. Right-click the target workflow and choose Create Node > EMR > EMR MR.

      Note

      Alternatively, you can move the pointer over the Create icon and choose Create Node > EMR > EMR MR.

    2. In the Create Node dialog box, enter a Name, and select an Engine Instance, Node Type, and Path. Click Confirm to open the EMR MR node editor.

      Note

      Node names can contain letters, digits, underscores (_), and periods (.).

Step 2: Develop the EMR MR task

In the EMR MR node editor, double-click the node that you created to open the task development page. Choose one of the following options based on your scenario:

Option 1: Upload and reference a resource

You can upload resources from your local machine to DataStudio to reference in your node. If a resource is too large to upload through the console, store it in HDFS and reference it from your code instead.

  1. Create an EMR JAR resource.

    In this example, the JAR package generated in the Prepare the sample data and JAR package section is stored in the emr/jars directory. In the workflow navigation tree on the left, right-click Resource to open the Create Resource dialog box. Set Engine Type to EMR and Resource Type to EMR JAR. For Storage Path, select One-click authorization for OSS. The first time you use this feature, click Authorize, and then click Click Upload to upload the JAR resource. Set Name to onaliyun_mr_wordcount-1.0-SNAPSHOT.jar, and then click Create. For more information, see Create and use an EMR resource.

  2. Reference the EMR JAR resource.

    1. Open the EMR MR node that you created and go to the code editor.

    2. In the EMR > Resources folder, find the resource that you want to reference, such as onaliyun_mr_wordcount-1.0-SNAPSHOT.jar in this example. Right-click the resource and choose Insert Resource Path.

    3. After you select the reference, a success message appears in the code editor of the EMR MR node, which indicates that the resource is referenced. Then, run the following command. The resource package, bucket name, and paths in the command are examples. Replace them with your actual values.

      ##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
      onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
      Note

      The code editor for EMR MR nodes does not support comment statements.

Option 2: Use OSS REF

You can use the OSS REF method to directly reference a resource from OSS. When the node runs, DataWorks automatically loads the specified OSS resource for the job to use. This method is suitable when an EMR task depends on a JAR or script.

  1. Upload the JAR resource.

    1. After you develop the code, log on to the OSS console. In the left-side navigation pane, click Buckets.

    2. Click the name of the target bucket to open the File Management page.

      This topic uses a bucket named onaliyun-bucket-2 as an example.

    3. Upload the JAR resource to its storage directory.

      Go to the emr/jars directory and click Upload File. In the Files to Upload area, click Select Files, add the onaliyun_mr_wordcount-1.0-SNAPSHOT.jar file to the bucket, and then click Upload File.

  2. Reference the JAR resource.

    On the configuration page of the EMR MR node, edit the code to reference the JAR resource.

    hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
    Note

    The command format is as follows: hadoop jar <path_of_the_JAR_file_to_run> <fully_qualified_name_of_the_main_class> <input_directory> <output_directory>.

    The path of the JAR file to run uses the format ossref://{endpoint}/{bucket}/{object}. The parts of the path are described as follows.

    • endpoint: The OSS endpoint. This part is optional. If you omit it, the system uses the region of your EMR cluster, and you can access only OSS resources in that region.

    • bucket: The OSS container that stores the object. Each bucket has a unique name. You can log on to the OSS console to view all buckets under your account.

    • object: The object stored in the bucket, identified by its file name or path.
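To make the path and command formats concrete, the following minimal sketch assembles an ossref:// path and the full hadoop jar command from their parts. The class and method names are hypothetical. The values in main are the example values used in this topic, and the endpoint is omitted as in the example command.

```java
// Hypothetical helper that assembles the command string described above.
public class OssRefCommand {

    // Builds ossref://{endpoint}/{bucket}/{object}; endpoint may be null or empty.
    public static String jarPath(String endpoint, String bucket, String object) {
        StringBuilder path = new StringBuilder("ossref://");
        if (endpoint != null && !endpoint.isEmpty()) {
            path.append(endpoint).append('/');
        }
        return path.append(bucket).append('/').append(object).toString();
    }

    // Builds: hadoop jar <jar path> <main class> <input directory> <output directory>
    public static String command(String jarPath, String mainClass, String in, String out) {
        return String.join(" ", "hadoop", "jar", jarPath, mainClass, in, out);
    }

    public static void main(String[] args) {
        String jar = jarPath(null, "onaliyun-bucket-2",
                "emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar");
        System.out.println(command(jar,
                "cn.apache.hadoop.onaliyun.examples.EmrWordCount",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs"));
    }
}
```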

(Optional) Configure advanced parameters

You can configure node-specific properties on the Advanced Settings tab. For more information about how to configure the properties, see Spark Configuration. The available advanced parameters vary based on the EMR cluster type, as described in the following sections.

DataLake and custom cluster

The advanced parameters are described as follows.

  • queue: The scheduling queue to which the job is submitted. The default value is default. For more information about EMR YARN, see Basic queue configurations.

  • priority: The priority of the job. The default value is 1.

  • Other: You can also add custom parameters for MR tasks on the Advanced Settings tab. When you commit the code, DataWorks automatically appends the new parameters to the command by using the -D key=value format.
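As an illustration of the -D key=value format, the following minimal sketch renders a map of custom parameters into command-line options. It is not how DataWorks implements the feature. The class name GenericOptions is hypothetical, and mapreduce.job.reduces is used only as an example of a standard Hadoop property. GenericOptionsParser, which the sample job in this topic uses, is the Hadoop class that recognizes options of this form.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: shows what "-D key=value" options look like.
public class GenericOptions {

    // Emits two arguments per parameter: "-D" followed by "key=value".
    public static List<String> toArgs(Map<String, String> params) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : params.entrySet()) {
            args.add("-D");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("mapreduce.job.reduces", "2"); // example property only
        System.out.println(String.join(" ", toArgs(params)));
    }
}
```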

Hadoop cluster

The advanced parameters are described as follows.

  • queue: The scheduling queue to which the job is submitted. The default value is default. For more information about EMR YARN, see Basic queue configurations.

  • priority: The priority of the job. The default value is 1.

  • USE_GATEWAY: Specifies whether to submit the job through a gateway cluster. Valid values:

    • true: submits the job through a gateway cluster.

    • false (default): does not submit the job through a gateway cluster. The job is submitted to the master node by default.

    Note

    If the cluster where the node resides is not associated with a gateway cluster and you manually set this parameter to true, the EMR job fails to be submitted.

Run the task

  1. In the toolbar, click the Run with Parameters icon. In the Parameter dialog box, select the scheduling resource group and click Run.

    Note
    • To access a compute resource over a public network or a VPC, you must use a resource group for scheduling that passes the connectivity test with the compute resource. For more information, see Network connectivity solutions.

    • If you need to change the resource group for subsequent tasks, click the Run with Parameters icon and select the desired resource group.

  2. Click the Save icon to save the code.

  3. (Optional) Perform smoke testing.

    If you want to perform smoke testing in the development environment, you can do so before or after you commit the node. For more information, see Perform smoke testing.

Step 3: Configure scheduling properties

To run the task on the node periodically, click Properties in the right-side navigation pane on the configuration tab of the node and configure the scheduling properties based on your business requirements. For more information, see Overview.

Note

You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

Step 4: Deploy the task

After you configure the task, you must commit and deploy it. The system then runs the task periodically based on the scheduling configurations.

  1. Click the Save icon in the top toolbar to save the task.

  2. Click the Submit icon in the top toolbar to commit the task.

    In the Submit dialog box, configure the Change description parameter. Then, determine whether to review task code after you commit the task based on your business requirements.

    Note
    • You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

    • You can use the code review feature to ensure the code quality of tasks and prevent task execution errors caused by invalid task code. If you enable the code review feature, the task code that is committed can be deployed only after the task code passes the code review. For more information, see Code review.

If you use a workspace in standard mode, you must deploy the task in the production environment after you commit the task. To deploy a task on a node, click Deploy in the upper-right corner of the configuration tab of the node. For more information, see Deploy nodes.

More operations

After you commit and deploy the task, the task is periodically run based on the scheduling configurations. You can click Operation Center in the upper-right corner of the configuration tab of the corresponding node to go to Operation Center and view the scheduling status of the task. For more information, see Manage scheduled tasks.

View the results

  • Log on to the OSS console. View the output in the specified directory of your bucket. Example path: emr/datas/wordcount02/outputs. After the wordcount02 job is complete, a _SUCCESS flag file and multiple part-r-* result shard files are generated in the /emr/datas/wordcount02/outputs/ directory in OSS. These files confirm that the job successfully produced output.

  • Read the statistics in DataWorks.

    1. Create an EMR Hive node. For more information, see Create an EMR Hive node.

    2. In the EMR Hive node, create a Hive external table over the data in OSS. The following is sample code:

      -- Map the tab-separated job output in OSS to an external table.
      CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
      (
          `word` STRING COMMENT 'word',
          `count` STRING COMMENT 'count'
      )
      ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';

      -- Read the word counts.
      SELECT * FROM wordcount02_result_tb;

      After running the query, the Results tab displays the word frequency statistics: dw (2), hadoop (3), emr (2), and hive (1).
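The part-r-* files contain one word and its count per line, separated by a tab, which is why the external table declares '\t' as the field terminator. If you download a result file and want to check it outside Hive, a minimal sketch such as the following can parse its lines. The class name ResultParser is hypothetical, and the lines in main are written by hand from the counts listed above, not copied from an actual run.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: parses "word<TAB>count" lines from a part-r-* file.
public class ResultParser {

    public static Map<String, Integer> parse(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String[] fields = line.split("\t", 2);
            if (fields.length != 2) {
                throw new IllegalArgumentException("Malformed line: " + line);
            }
            counts.put(fields[0], Integer.parseInt(fields[1].trim()));
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("dw\t2", "emr\t2", "hadoop\t3", "hive\t1");
        parse(lines).forEach((word, n) -> System.out.println(word + " (" + n + ")"));
    }
}
```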