Apache Spark is a versatile, high-performance, and easy-to-use big data analytics engine that performs complex in-memory analytics and builds large-scale, low-latency data analytics applications. DataWorks provides EMR Spark nodes to develop Spark tasks and implement periodic scheduling. This topic explains how to create an EMR Spark node and demonstrates its features with a detailed example.
Prerequisites
-
To customize the component environment before you start node development, create a custom image based on the official dataworks_emr_base_task_pod image and then use the image in DataStudio.
For example, you can replace a Spark JAR package or add dependencies on specific libraries, files, or JAR packages when you create the custom image.
-
An EMR cluster must be registered with DataWorks. For more information, see DataStudio (legacy version): Bind an EMR computing resource.
-
(Optional) If you use a RAM user for task development, it must be added to the corresponding workspace and granted the Development or Workspace Administrator role. The Workspace Administrator role grants extensive permissions. Grant this role with caution. For more information, see Add members to a workspace.
-
A resource group must be purchased and configured, including its workspace binding and network settings. For more information, see Use a Serverless resource group.
-
A business flow must be created. DataStudio organizes development operations into business flows, so you cannot create a node without one. For more information, see Create a business flow.
-
If your development tasks require a specific environment, you can use the custom image feature provided by DataWorks to build a custom image that contains the required components for task execution. For more information, see Custom image.
Limitations
-
This type of task can run only on a serverless resource group (recommended) or an exclusive resource group for scheduling. If you need to use an image during development in DataStudio, you must use a serverless resource group.
-
For DataLake or custom clusters, you must configure EMR-HOOK on the cluster to manage metadata in DataWorks. If EMR-HOOK is not configured, you cannot view real-time metadata, generate audit logs, display data lineage, or perform EMR-related governance tasks in DataWorks. For more information, see Configure EMR-HOOK for Spark SQL.
-
EMR Serverless Spark clusters support viewing data lineage, whereas EMR on ACK Spark clusters do not.
-
EMR on ACK Spark clusters and EMR Serverless Spark clusters support referencing OSS resources by using ossref and uploading resources to OSS, but do not support uploading resources to HDFS.
-
DataLake clusters and custom clusters support referencing OSS resources by using ossref, uploading resources to OSS, and uploading resources to HDFS.
Notes
If you have enabled Ranger access control for Spark in the EMR cluster associated with the current workspace:
-
Spark tasks that use the default image automatically support this feature.
-
To run a Spark task with a custom image, submit a ticket to technical support to request an image upgrade.
Preparation: Develop a Spark job JAR
Before you can schedule an EMR Spark job with DataWorks, you must first develop the job in EMR, compile it, and generate a JAR package. For more information about developing EMR Spark jobs, see Spark overview.
You will then upload the JAR package to DataWorks to schedule the EMR Spark job periodically.
Step 1: Create an EMR Spark node
Log on to the DataWorks console. In the target region, click in the left-side navigation pane. Select a workspace from the drop-down list and click Go to Data Development.
-
Create an EMR Spark node.
-
Right-click the target business flow and choose .
Note: You can also hover over Create and choose .
-
In the Create Node dialog box, enter a Name and select the Engine Instance, Node Type, and Path. Click Confirm to go to the EMR Spark node editor.
Note: The node name can contain uppercase letters, lowercase letters, Chinese characters, digits, underscores (_), and periods (.).
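The naming rule in the preceding note can be checked before you create a node. The following is a minimal sketch, not part of DataWorks: the function name is_valid_node_name is hypothetical, and it assumes that "Chinese characters" corresponds to the CJK Unified Ideographs block (U+4E00 to U+9FFF). The note states no length limit, so none is enforced here.

```python
import re

# Assumption: "Chinese characters" is approximated by the CJK Unified
# Ideographs block (U+4E00 to U+9FFF); the console may accept a wider range.
_NODE_NAME_PATTERN = re.compile(r"[A-Za-z0-9_.\u4e00-\u9fff]+")


def is_valid_node_name(name: str) -> bool:
    """Return True if name is non-empty and uses only the documented characters."""
    return _NODE_NAME_PATTERN.fullmatch(name) is not None
```

For example, is_valid_node_name("emr_spark_demo.v1") passes, whereas a name that contains a hyphen or a space is rejected.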
-
Step 2: Develop a Spark job
To open the code editor, double-click the EMR Spark node. You can then select a method based on your scenario.
-
(Recommended) Upload resources from your local machine to DataStudio, and then reference the resources. For more information, see Method 1: Upload resources before you reference EMR JAR resources.
-
Reference an OSS resource by using the OSS REF method. For more information, see Method 2: Directly reference an OSS resource.
Method 1: Upload and reference an EMR JAR resource
In DataWorks, you can upload a resource from your local computer to DataStudio and then reference it. After compiling an EMR Spark job into a JAR package, select a storage method based on its size.
You can upload and commit a JAR package as a DataWorks EMR resource. Alternatively, you can store it in the HDFS of an EMR cluster. EMR on ACK Spark clusters and EMR Serverless Spark clusters do not support resource uploads to HDFS.
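The size and cluster-type rules can be summarized as a small decision function. This is an illustrative sketch only: the function name, the return labels, and the cluster-type strings are hypothetical, and 1 MB is assumed to be 1024 * 1024 bytes. Falling back to an OSS reference for a large JAR package on a cluster without HDFS upload support is an inference from the limitations in this topic, not a documented rule.

```python
# Assumption: 1 MB = 1024 * 1024 bytes; the topic does not define the unit.
SIZE_LIMIT_BYTES = 500 * 1024 * 1024

# Cluster types that, per this topic, do not support uploading resources to HDFS.
NO_HDFS_CLUSTERS = {"EMR on ACK Spark", "EMR Serverless Spark"}


def choose_storage(jar_size_bytes: int, cluster_type: str) -> str:
    """Suggest where to store a compiled Spark JAR package."""
    if jar_size_bytes < 0:
        raise ValueError("jar_size_bytes must not be negative")
    if jar_size_bytes < SIZE_LIMIT_BYTES:
        # Small packages can be uploaded and committed as a DataWorks EMR resource.
        return "dataworks_resource"
    if cluster_type in NO_HDFS_CLUSTERS:
        # Inferred fallback: these clusters support ossref but not HDFS uploads.
        return "oss_reference"
    # Large packages on clusters with HDFS are stored in HDFS and referenced by path.
    return "hdfs"
```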
JAR package smaller than 500 MB
-
Create an EMR JAR resource.
This method allows for visual resource management in the DataWorks console. After you create the resource, commit it. For more information, see Create and use EMR resources.
In the Create Resource dialog box, set Engine Type to EMR and select an Engine Instance. Set Resource Type to EMR JAR, specify the Path, and select Upload as EMR Resource. Select a Storage Path type, which can be OSS One-click Authorization or HDFS. Set File Source to Local and click Upload to upload the file. Specify a Name for the resource. The name of a JAR resource must end with .jar. Then, click Create.
Note: The first time you create an EMR resource to be stored in OSS, you must authorize access as prompted.
-
Reference the EMR JAR resource.
-
Double-click the EMR Spark node to open its code editor.
-
In the left-side navigation tree, choose . Find your EMR JAR resource, right-click it, and select Insert Resource Path.
-
After you reference the resource, the system automatically adds a reference code snippet to the node's code editor. The following code snippet provides an example.
##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"}
spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
The presence of this snippet indicates that the resource is referenced. In the preceding code, spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar is the name of the uploaded EMR JAR resource.
-
Modify the code of the EMR Spark node and add a spark-submit command. The following code provides an example.
Note: The code editor for EMR Spark nodes does not support comments. Modify the job code based on the following example. Do not add comments; otherwise, an error will occur when the node runs.
##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
spark-submit --class org.apache.spark.examples.SparkPi --master yarn spark-examples_2.11-2.4.0.jar 100
The parameters are described as follows.
-
org.apache.spark.examples.SparkPi: the main class of the job in the compiled JAR package.
-
spark-examples_2.11-2.4.0.jar: the name of the EMR JAR resource that you uploaded.
-
You can leave the other parameters as they are in the example above. You can also run the following command to view the help for spark-submit and modify the spark-submit command as needed.
Note
-
If you want to use simplified parameters of the spark-submit command in a Spark node, you must add the parameters to the code. Example: --executor-memory 2G.
-
Spark nodes support job submission only in YARN cluster mode.
-
For jobs submitted by using the spark-submit command, set the deploy mode parameter to cluster, not client.
spark-submit --help
Process Output>>>
Process Output>>> Options:
Process Output>>>   --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
Process Output>>>                               k8s://https://host:port, or local (Default: local[*]).
Process Output>>>   --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
Process Output>>>                               on one of the worker machines inside the cluster ("cluster")
Process Output>>>                               (Default: client).
Process Output>>>   --class CLASS_NAME          Your application's main class (for Java / Scala apps).
Process Output>>>   --name NAME                 A name of your application.
Process Output>>>   --jars JARS                 Comma-separated list of jars to include on the driver
Process Output>>>                               and executor classpaths.
Process Output>>>   --packages                  Comma-separated list of maven coordinates of jars to include
Process Output>>>                               on the driver and executor classpaths. Will search the local
Process Output>>>                               maven repo, then maven central and any additional remote
Process Output>>>                               repositories given by --repositories. The format for the
Process Output>>>                               coordinates should be groupId:artifactId:version.
Process Output>>>   --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
Process Output>>>                               resolving the dependencies provided in --packages to avoid
Process Output>>>                               dependency conflicts.
Process Output>>>   --repositories              Comma-separated list of additional remote repositories to
Process Output>>>                               search for the maven coordinates given with --packages.
Process Output>>>   --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
Process Output>>>                               on the PYTHONPATH for Python apps.
Process Output>>>   --files FILES               Comma-separated list of files to be placed in the working
Process Output>>>                               directory of each executor. File paths of these files
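The node code for an uploaded JAR resource follows a fixed shape: a resource reference line, then a spark-submit command that names the same JAR. The sketch below assembles that text under stated assumptions: the helper name build_node_code is hypothetical, the reference marker and the command are assumed to sit on separate lines, and the output contains no comments because the node editor does not support them.

```python
import shlex


def build_node_code(jar_name: str, main_class: str, app_args=()) -> str:
    """Assemble EMR Spark node code that references an uploaded EMR JAR resource."""
    if not jar_name.endswith(".jar"):
        raise ValueError("the name of an EMR JAR resource must end with .jar")
    # Reference marker that DataStudio inserts when you choose Insert Resource Path.
    reference = '##@resource_reference{"%s"}' % jar_name
    # Same option order as the example in this topic.
    command = ["spark-submit", "--class", main_class, "--master", "yarn", jar_name]
    command.extend(str(arg) for arg in app_args)
    return reference + "\n" + shlex.join(command)


print(build_node_code("spark-examples_2.11-2.4.0.jar",
                      "org.apache.spark.examples.SparkPi", ["100"]))
```

shlex.join quotes any argument that contains spaces or shell metacharacters, which keeps application arguments intact if you paste the result into the node editor.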
JAR package 500 MB or larger
-
Create an EMR JAR resource.
If a JAR package is 500 MB or larger, you cannot upload it from your local computer as a DataWorks resource. Store the JAR package in the HDFS of your EMR cluster and record its storage path. You can then use the path to reference the package when you schedule the Spark job.
-
Reference the EMR JAR resource.
If the JAR package is stored in HDFS, reference it directly by specifying its path in the EMR Spark node code.
-
Double-click the created EMR Spark node to open the node's code editor.
-
Write a spark-submit command. The following code provides an example.
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
The parameters are described as follows.
-
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: the path of the JAR package in HDFS.
-
org.apache.spark.examples.JavaSparkPi: the main class of the job in the compiled JAR package.
-
Other parameters are EMR cluster parameters and must be modified based on your business requirements. You can also run the following command to view the help information for the spark-submit command and modify the command as needed.
Important
-
If you want to use simplified parameters of the spark-submit command in a Spark node, you must add the parameters to the code. Example: --executor-memory 2G.
-
Spark nodes support job submission only in YARN cluster mode.
-
For jobs submitted by using the spark-submit command, set the deploy mode parameter to cluster, not client.
spark-submit --help
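Because Spark nodes accept only YARN cluster mode, it can help to check a spark-submit command before you paste it into the node editor. The following is a rough sketch, not a DataWorks feature: check_spark_submit is a hypothetical helper, it recognizes only the space-separated option form (for example, --master yarn), and it does not distinguish spark-submit options from application arguments that happen to use the same names.

```python
import shlex


def check_spark_submit(command: str) -> list:
    """Return a list of problems found in a spark-submit command line."""
    tokens = shlex.split(command)
    if not tokens or tokens[0] != "spark-submit":
        return ["command does not start with spark-submit"]

    # Collect the values that follow the two options this check cares about.
    options = {}
    for index, token in enumerate(tokens[:-1]):
        if token in ("--master", "--deploy-mode"):
            options[token] = tokens[index + 1]

    problems = []
    master = options.get("--master")
    if master is not None and master != "yarn":
        problems.append("--master is %r; Spark nodes submit jobs only to YARN" % master)
    if options.get("--deploy-mode") == "client":
        problems.append("--deploy-mode is client; set it to cluster")
    return problems
```

An empty list means that the check found nothing to flag. It does not guarantee that the job runs successfully.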
Method 2: Directly reference an OSS resource
(Optional) Advanced parameters
You can configure Spark-specific parameters in the Advanced Settings section of a node. For more information about Spark properties, see Spark Configuration. The advanced parameters that you can configure vary for different types of EMR clusters, as shown in the following table.
DataLake or custom cluster: EMR on ECS
| Parameter | Description |
| --- | --- |
| queue | The scheduling queue to which jobs are submitted. Default value: default. If you configure a workspace-level YARN resource queue when you register an EMR cluster to a DataWorks workspace, the following rules apply: For more information about EMR YARN, see Basic queue configurations. For more information about how to configure a queue when you register an EMR cluster, see Set the global YARN resource queue. |
| priority | The priority. Default value: 1. |
| FLOW_SKIP_SQL_ANALYZE | The method that is used to run SQL statements. Valid values: Note: This parameter can be used only for test runs in Data Development. |
| Others | |
EMR Serverless Spark cluster
For more information about how to configure parameters, see Configure parameters for a Spark job.
| Parameter | Description |
| --- | --- |
| queue | The scheduling queue to which jobs are submitted. Default value: dev_queue. |
| priority | The priority. Default value: 1. |
| FLOW_SKIP_SQL_ANALYZE | The method that is used to run SQL statements. Valid values: Note: This parameter can be used only for test runs in Data Development. |
| SERVERLESS_RELEASE_VERSION | The version of the Spark engine. By default, the Default Engine Version that is configured for the cluster in the Cluster Management section of the Management Center page is used. You can configure this parameter to specify different engine versions for different jobs. |
| SERVERLESS_QUEUE_NAME | The resource queue that you want to specify. By default, the Default Resource Queue that is configured for the cluster in the Cluster Management section of the Management Center page is used. If you want to isolate and manage resources, you can add queues. For more information, see Manage resource queues. |
| Others | |
Spark cluster: EMR on ACK
| Parameter | Description |
| --- | --- |
| queue | This parameter is not supported. |
| priority | This parameter is not supported. |
| FLOW_SKIP_SQL_ANALYZE | The method that is used to run SQL statements. Valid values: Note: This parameter can be used only for test runs in Data Development. |
| Others | |
Hadoop cluster: EMR on ECS
| Parameter | Description |
| --- | --- |
| queue | The scheduling queue to which jobs are submitted. Default value: default. If you configure a workspace-level YARN resource queue when you register an EMR cluster to a DataWorks workspace, the following rules apply: For more information about EMR YARN, see Basic queue configurations. For more information about how to configure a queue when you register an EMR cluster, see Set the global YARN resource queue. |
| priority | The priority. Default value: 1. |
| FLOW_SKIP_SQL_ANALYZE | The method that is used to run SQL statements. Valid values: Note: This parameter can be used only for test runs in Data Development. |
| USE_GATEWAY | Specifies whether to submit jobs from the current node through a Gateway cluster. Valid values: Note: If the cluster in which the current node resides is not associated with a Gateway cluster and you set this parameter to |
| Others | |
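The queue and priority defaults differ by cluster type, and EMR on ACK Spark clusters support neither parameter. The lookup below is one way to keep those differences in a single place when you prepare advanced settings. It is a sketch only: the dictionary keys reuse the headings of this section, resolve_parameters is a hypothetical helper, and only the queue and priority values stated in the tables are encoded.

```python
# Defaults for queue and priority as listed in the tables of this section.
DEFAULTS = {
    "DataLake or custom cluster: EMR on ECS": {"queue": "default", "priority": 1},
    "EMR Serverless Spark cluster": {"queue": "dev_queue", "priority": 1},
    "Spark cluster: EMR on ACK": {},
    "Hadoop cluster: EMR on ECS": {"queue": "default", "priority": 1},
}

# Parameters that the tables mark as not supported for a cluster type.
UNSUPPORTED = {"Spark cluster: EMR on ACK": {"queue", "priority"}}


def resolve_parameters(cluster_type: str, overrides=None) -> dict:
    """Merge user overrides into the documented defaults for a cluster type."""
    if cluster_type not in DEFAULTS:
        raise KeyError("unknown cluster type: %s" % cluster_type)
    overrides = dict(overrides or {})
    unsupported = UNSUPPORTED.get(cluster_type, set())
    rejected = sorted(key for key in overrides if key in unsupported)
    if rejected:
        raise ValueError("not supported on %s: %s" % (cluster_type, ", ".join(rejected)))
    return {**DEFAULTS[cluster_type], **overrides}
```

For example, resolve_parameters("Hadoop cluster: EMR on ECS", {"queue": "etl"}) keeps the default priority and replaces only the queue. The queue name etl is made up for illustration.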
SQL job
-
On the toolbar, click the icon. In the Parameter dialog box, select the created scheduling resource group and click Running.
Note
-
To access compute resources that are deployed in a public network or a VPC, you must use a scheduling resource group that can connect to the compute resources. For more information, see Solutions for network connections.
-
If you want to modify the resource group that is used to run a job, click the Run with Parameters icon and select the resource group that you want to use.
-
When you query data by using an EMR Spark node, the result is limited to a maximum of 10,000 records and a total size of 10 MB.
-
Click the icon to save your SQL statements.
-
(Optional) Perform smoke testing.
To run smoke tests in the development environment, you can run them either when you commit the node or after the node is committed. For more information, see Run a smoke test.
Step 3: Configure node scheduling
If you want the system to periodically run a task on the node, you can click Properties in the right-side navigation pane on the configuration tab of the node to configure task scheduling properties based on your business requirements. For more information, see Overview.
-
You must set the Rerun property and the Parent Nodes for the node before you can submit it.
-
If you need to customize the component environment, you can create a custom image based on the official dataworks_emr_base_task_pod image and use the image in DataStudio.
For example, you can replace a Spark JAR package or add dependencies on specific libraries, files, or JAR packages when you create the custom image.
Step 4: Deploy the node
After a task on a node is configured, you must commit and deploy the task. After you commit and deploy the task, the system runs the task on a regular basis based on scheduling configurations.
-
Click the icon in the top toolbar to save the task.
-
Click the icon in the top toolbar to commit the task. In the Submit dialog box, configure the Change description parameter. Then, determine whether to review task code after you commit the task based on your business requirements.
Note
-
You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.
-
You can use the code review feature to ensure the code quality of tasks and prevent task execution errors caused by invalid task code. If you enable the code review feature, the task code that is committed can be deployed only after the task code passes the code review. For more information, see Code review.
-
If you use a workspace in standard mode, you must deploy the task in the production environment after you commit the task. To deploy a task on a node, click Deploy in the upper-right corner of the configuration tab of the node. For more information, see Deploy nodes.
More operations
After you commit and deploy the task, the task is periodically run based on the scheduling configurations. You can click Operation Center in the upper-right corner of the configuration tab of the corresponding node to go to Operation Center and view the scheduling status of the task. For more information, see Manage scheduled tasks.
FAQ
-
Q: How do I resolve a node connection timeout?
A: This error can occur if the resource group and the cluster do not have network connectivity. To resolve this issue, go to the computing resource list page, find the resource, and click Resource Initialization. In the dialog box that appears, click Re-initialize and ensure that the resource is successfully initialized.
icon and select your scheduling resource group to run the EMR Spark node. When the job is complete, record the