An E-MapReduce (EMR) MR node splits a large dataset into multiple parallel map tasks to accelerate large-scale computation. This topic demonstrates how to create an EMR MR node by using a WordCount example that reads text from Object Storage Service (OSS) and counts words.
Prerequisites
-
An EMR cluster must be registered with DataWorks. For more information, see DataStudio (old version): Bind an EMR compute resource.
-
(Optional, for RAM users) The RAM user for task development must be a member of the workspace and have the Development or Workspace Administrator role. The Workspace Administrator role has broad permissions, so assign it with caution. For more information, see Add members to a workspace.
-
A serverless resource group must be purchased and configured, including workspace binding and network settings. For more information, see Use a serverless resource group.
-
A workflow must be created in Data Development (DataStudio). For more information, see Create a workflow.
-
If your EMR MR node job needs to reference open-source code, you must first upload the code as a resource to an EMR JAR resource node. For more information, see Create and use EMR resources.
-
If your EMR MR node job needs to reference a user-defined function (UDF), you must first upload the UDF as a resource to an EMR JAR resource node, and then create and register the function. For more information, see Create an EMR function.
-
If you run the job development example in this topic, you must first create an OSS bucket. For more information, see Create a bucket in the console.
Limitations
-
This type of task can run only on a serverless resource group (recommended) or an exclusive resource group for scheduling.
-
To manage metadata for a DataLake or custom cluster in DataWorks, you must first configure EMR-HOOK on the cluster. Without EMR-HOOK, DataWorks cannot display real-time metadata, generate audit logs, show data lineage, or perform EMR-related governance tasks. For more information about how to configure EMR-HOOK, see Configure EMR-HOOK for Hive.
Prepare the sample data and JAR package
Prepare the sample data
Create a sample file named input01.txt with the following content.
hadoop emr hadoop dw
hive hadoop
dw emr
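To make the expected result of the job concrete, the following plain-Java sketch mimics the map and reduce phases on the three sample lines above. It is an illustration only: it does not use the Hadoop API, and the class name `WordCountSketch` and its helper methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of the WordCount logic; it does not use the Hadoop API. */
class WordCountSketch {

    /** "Map" phase: split each line on whitespace and emit one token per word. */
    static List<String> map(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    words.add(token);
                }
            }
        }
        return words;
    }

    /** "Reduce" phase: sum the occurrences of each word, sorted by word. */
    static Map<String, Integer> reduce(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The content of input01.txt from this topic.
        List<String> input01 = List.of("hadoop emr hadoop dw", "hive hadoop", "dw emr");
        // Print one "word<TAB>count" line per distinct word.
        reduce(map(input01)).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

For the sample file, the counts are dw 2, emr 2, hadoop 3, and hive 1, which is what the MapReduce job is expected to write to the output directory.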
Upload the sample data file
-
Log on to the OSS console. In the left-side navigation pane, click Buckets.
-
Click the name of the target bucket to open the File Management page.
This topic uses a bucket named onaliyun-bucket-2 as an example.
-
Click New Directory to create directories for the sample data and JAR resources.
-
Set Directory Name to emr/datas/wordcount02/inputs to create a directory for the sample data.
-
Set Directory Name to emr/jars to create a directory for the JAR resources.
-
Upload the sample data file to the data directory.
-
Go to the /emr/datas/wordcount02/inputs path and click Upload File.
-
In the Files to Upload area, click Select Files, add the input01.txt file to the bucket, and then click Upload File.
Build a MapReduce JAR package
-
Open your IntelliJ IDEA project and add the following POM dependencies.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.8.5</version> <!-- EMR MR uses version 2.8.5. -->
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
</dependency>
-
To read from and write to OSS files in MapReduce, you must configure the following parameters.
Important
Risk warning: Your Alibaba Cloud account's AccessKey has full access to all APIs. We recommend using a RAM user for API access and daily O&M. Do not store your AccessKey ID and AccessKey Secret in your project code or anywhere they might be exposed. A leaked AccessKey can compromise the security of all resources under your account. The following code sample is for reference only. Keep your AccessKey information secure.
conf.set("fs.oss.accessKeyId", "${accessKeyId}");
conf.set("fs.oss.accessKeySecret", "${accessKeySecret}");
conf.set("fs.oss.endpoint", "${endpoint}");
The parameters are described as follows.
-
${accessKeyId}: The AccessKey ID of your Alibaba Cloud account.
-
${accessKeySecret}: The AccessKey secret of your Alibaba Cloud account.
-
${endpoint}: The OSS endpoint. The OSS bucket must be in the same region as the EMR cluster, and this region determines the endpoint. For more information, see Regions and endpoints.
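One way to follow the risk warning above is to keep the three values out of the source code and read them at run time. The sketch below assumes they are supplied through environment variables; the variable names `OSS_ACCESS_KEY_ID`, `OSS_ACCESS_KEY_SECRET`, and `OSS_ENDPOINT`, and the class `OssSettingsSketch`, are hypothetical and not defined by EMR or DataWorks. It uses only the JDK, so each returned entry would still have to be passed to `conf.set(key, value)` in the job driver.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Collects the three fs.oss.* settings from an environment-style map. */
class OssSettingsSketch {

    /** Returns the value for name, or fails fast if it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    /** Maps the (hypothetical) environment variable names to the fs.oss.* keys. */
    static Map<String, String> ossSettings(Map<String, String> env) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("fs.oss.accessKeyId", require(env, "OSS_ACCESS_KEY_ID"));
        settings.put("fs.oss.accessKeySecret", require(env, "OSS_ACCESS_KEY_SECRET"));
        settings.put("fs.oss.endpoint", require(env, "OSS_ENDPOINT"));
        return settings;
    }

    public static void main(String[] args) {
        try {
            // Print only the keys so that no secret is written to the log.
            ossSettings(System.getenv()).keySet().forEach(System.out::println);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Failing fast on a missing value keeps a misconfigured job from starting with empty credentials, and printing only the keys avoids leaking the secret into task logs.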
The following Java code provides an example of how to modify the WordCount example from the official Hadoop website. It adds the AccessKey ID and AccessKey secret to grant the job permission to access the OSS file.
-
-
Package the code into a JAR file.
After you edit and save the Java code, package it into a JAR file. The JAR package generated in this example is named onaliyun_mr_wordcount-1.0-SNAPSHOT.jar.
Step 1: Create an EMR MR node
Log on to the DataWorks console. In the target region, click in the left-side navigation pane. Select a workspace from the drop-down list and click Go to Data Development.
-
Create an EMR MR node.
-
Right-click the target workflow and choose .
Note
Alternatively, you can move the pointer over the Create icon and choose .
-
In the Create Node dialog box, enter a Name, and select an Engine Instance, Node Type, and Path. Click Confirm to open the EMR MR node editor.
Note
Node names can contain letters, digits, underscores (_), and periods (.).
Step 2: Develop the EMR MR task
In the EMR MR node editor, double-click the node that you created to open the task development page. Choose one of the following options based on your scenario:
-
(Recommended) Upload resources from your local machine to DataStudio first, and then reference them. For more information, see Option 1: Upload and reference a resource.
-
Reference OSS resources by using the OSS REF method. For more information, see Option 2: Use OSS REF.
Option 1: Upload and reference a resource
You can upload resources from your local machine to DataStudio to reference in your node. If a resource is too large to upload through the console, store it in HDFS and reference it from your code instead.
-
Create an EMR JAR resource.
For more information, see Create and use an EMR resource. In this example, the JAR package generated in the Prepare the sample data and JAR package section is stored in the emr/jars directory. The first time you use this feature, click Authorize, and then click Click Upload to upload the JAR resource. In the workflow navigation tree on the left, right-click Resource to open the Create Resource dialog box. Set Engine Type to EMR and Resource Type to EMR JAR. For Storage Path, select One-click authorization for OSS. Upload the file, set its Name to onaliyun_mr_wordcount-1.0-SNAPSHOT.jar, and then click Create.
-
Reference the EMR JAR resource.
-
Open the EMR MR node that you created and go to the code editor.
-
In the folder, find the resource that you want to reference, such as onaliyun_mr_wordcount-1.0-SNAPSHOT.jar in this example. Right-click the resource and choose Insert Resource Path.
-
After you select the reference, a success message appears on the code editing page for the EMR MR node. This indicates that the code resource is successfully referenced. Then, run the following command. The resource packages, bucket names, and path information in the following command are examples. You must replace them with your actual information.
##@resource_reference{"onaliyun_mr_wordcount-1.0-SNAPSHOT.jar"}
onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
Note
The code editor for EMR MR nodes does not support comment statements.
Option 2: Use OSS REF
You can use the OSS REF method to directly reference a resource from OSS. When the node runs, DataWorks automatically loads the specified OSS resource for the job to use. This method is suitable when an EMR task depends on a JAR or script.
-
Upload the JAR resource.
-
After you develop the code, log on to the OSS console. In the left-side navigation pane for your region, click Buckets.
-
Click the name of the target bucket to open the File Management page.
This topic uses a bucket named onaliyun-bucket-2 as an example.
-
Upload the JAR resource to its storage directory.
Go to the emr/jars directory and click Upload File. In the Files to Upload area, click Select Files, add the onaliyun_mr_wordcount-1.0-SNAPSHOT.jar file to the bucket, and then click Upload File.
-
Reference the JAR resource.
On the configuration page of the EMR MR node, edit the code to reference the JAR resource.
hadoop jar ossref://onaliyun-bucket-2/emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar cn.apache.hadoop.onaliyun.examples.EmrWordCount oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs
Note
The command format is as follows:
hadoop jar <path_of_the_JAR_file_to_run> <fully_qualified_name_of_the_main_class> <input_directory> <output_directory>
The path of the JAR file to run uses the format ossref://{endpoint}/{bucket}/{object}, where:
-
endpoint: The OSS endpoint. This parameter is optional. If omitted, the endpoint defaults to the region of your EMR cluster, so you can access only OSS resources in that region.
-
bucket: A container for storing objects in OSS. Each bucket has a unique name. You can log on to the OSS console to view all buckets under your account.
-
object: A specific object, such as a file name or path, that is stored in a bucket.
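The command format and the ossref path format described above can be sketched as two small string builders. This is a plain-Java illustration, not a DataWorks or Hadoop API; the class `EmrMrCommandSketch` and its method names are hypothetical, and the endpoint value `example-endpoint` is a placeholder rather than a real OSS endpoint.

```java
/** Builds the text of an EMR MR node command from its parts. */
class EmrMrCommandSketch {

    /** Builds ossref://{endpoint}/{bucket}/{object}; the endpoint part is skipped when null or empty. */
    static String ossRef(String endpoint, String bucket, String object) {
        StringBuilder ref = new StringBuilder("ossref://");
        if (endpoint != null && !endpoint.isEmpty()) {
            ref.append(endpoint).append('/');
        }
        return ref.append(bucket).append('/').append(object).toString();
    }

    /** Builds: hadoop jar <jar path> <main class> <input directory> <output directory>. */
    static String hadoopJar(String jarPath, String mainClass, String inputDir, String outputDir) {
        return String.join(" ", "hadoop", "jar", jarPath, mainClass, inputDir, outputDir);
    }

    public static void main(String[] args) {
        // Values taken from the example in this topic; the endpoint is omitted.
        String jar = ossRef(null, "onaliyun-bucket-2", "emr/jars/onaliyun_mr_wordcount-1.0-SNAPSHOT.jar");
        System.out.println(hadoopJar(
                jar,
                "cn.apache.hadoop.onaliyun.examples.EmrWordCount",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/inputs",
                "oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs"));
    }
}
```

With the endpoint omitted, the builder reproduces the example command shown earlier in this option. Passing a non-empty endpoint places it before the bucket name.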
(Optional) Configure advanced parameters
You can configure node-specific properties on the Advanced Settings tab. For more information about how to configure the properties, see Spark Configuration. The available advanced parameters vary based on the EMR cluster type, as shown in the following tables.
DataLake and custom cluster
| Advanced parameter | Description |
| --- | --- |
| queue | The scheduling queue to which the job is submitted. The default value is |
| priority | The priority of the job. The default value is 1. |
| Other | You can also add custom parameters for MR tasks on the Advanced Settings tab. When you commit the code, DataWorks automatically appends the new parameters to the command by using the |
Hadoop cluster
| Advanced parameter | Description |
| --- | --- |
| queue | The scheduling queue to which the job is submitted. The default value is |
| priority | The priority of the job. The default value is 1. |
| USE_GATEWAY | Specifies whether to submit the job through a gateway cluster. Valid values: Note: If the cluster where the node resides is not associated with a gateway cluster and you manually set this parameter to |
Run the task
-
In the toolbar, click the icon. In the Parameter dialog box, select the scheduling resource group and click Running.
Note
-
To access a compute resource over a public network or a VPC, you must use a resource group for scheduling that passes the connectivity test with the compute resource. For more information, see Network connectivity solutions.
-
If you need to change the resource group for subsequent tasks, you can click the Run with Parameters icon and select the desired resource group.
-
Click the icon to save the code.
-
(Optional) Perform smoke testing.
If you want to perform smoke testing in the development environment, you can do so before or after you commit the node. For more information, see Perform smoke testing.
Step 3: Configure scheduling properties
If you want the system to periodically run a task on the node, you can click Properties in the right-side navigation pane on the configuration tab of the node to configure task scheduling properties based on your business requirements. For more information, see Overview.
You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.
Step 4: Deploy the task
After a task on a node is configured, you must commit and deploy the task. After you commit and deploy the task, the system runs the task on a regular basis based on scheduling configurations.
-
Click the icon in the top toolbar to save the task.
-
Click the icon in the top toolbar to commit the task. In the Submit dialog box, configure the Change description parameter. Then, determine whether to review task code after you commit the task based on your business requirements.
Note
-
You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.
-
You can use the code review feature to ensure the code quality of tasks and prevent task execution errors caused by invalid task code. If you enable the code review feature, the task code that is committed can be deployed only after the task code passes the code review. For more information, see Code review.
-
If you use a workspace in standard mode, you must deploy the task in the production environment after you commit the task. To deploy a task on a node, click Deploy in the upper-right corner of the configuration tab of the node. For more information, see Deploy nodes.
More operations
After you commit and deploy the task, the task is periodically run based on the scheduling configurations. You can click Operation Center in the upper-right corner of the configuration tab of the corresponding node to go to Operation Center and view the scheduling status of the task. For more information, see Manage scheduled tasks.
View the results
-
Log on to the OSS console. View the output in the specified directory of your bucket. Example path: emr/datas/wordcount02/outputs. After the wordcount02 job is complete, a _SUCCESS flag file and multiple part-r-* result shard files are generated in the /emr/datas/wordcount02/outputs/ directory in OSS. These files confirm that the job successfully produced output.
-
Read the statistics in DataWorks.
-
Create an EMR Hive node. For more information, see Create an EMR Hive node.
-
In the EMR Hive node, create a Hive external table over the data in OSS. The following is sample code:
CREATE EXTERNAL TABLE IF NOT EXISTS wordcount02_result_tb
(
    `word` STRING COMMENT 'word',
    `cout` STRING COMMENT 'count'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'oss://onaliyun-bucket-2/emr/datas/wordcount02/outputs/';
SELECT * FROM wordcount02_result_tb;
After running the query, the Results tab displays the word frequency statistics: dw (2), hadoop (3), emr (2), and hive (1).
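The result files can also be checked outside Hive. The sketch below assumes that each line of a part-r-* file has the form `word<TAB>count`, which matches the tab delimiter declared in the external table above. The class `ResultParserSketch` is hypothetical, and the sample lines in `main` are typed in from the counts stated in this topic rather than read from OSS.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Parses "word<TAB>count" lines, as assumed for the part-r-* result files. */
class ResultParserSketch {

    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // Skip blank lines, for example a trailing newline.
            }
            String[] fields = line.split("\t", 2);
            if (fields.length != 2) {
                throw new IllegalArgumentException("Expected word<TAB>count: " + line);
            }
            counts.put(fields[0], Long.parseLong(fields[1].trim()));
        }
        return counts;
    }

    public static void main(String[] args) {
        // Illustrative lines that correspond to the counts listed in this topic.
        List<String> sample = List.of("dw\t2", "emr\t2", "hadoop\t3", "hive\t1");
        parse(sample).forEach((word, count) -> System.out.println(word + " = " + count));
    }
}
```

Rejecting lines without a tab makes a delimiter mismatch visible immediately instead of producing silently wrong counts.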