Spark is a unified analytics engine for big data known for its high performance, ease of use, and versatility. It supports complex in-memory computing, making it ideal for building large-scale, low-latency data analytics applications. DataWorks lets you use EMR Spark nodes to develop and periodically schedule Spark tasks. This topic explains how to configure and use EMR Spark nodes and provides application examples.
Prerequisites
-
Before you start node development, if you need a custom component environment, create a custom image based on the official dataworks_emr_base_task_pod image and then use it in Data Studio. For more information, see Create a custom image and Use images in data development. For example, when you create a custom image, you can replace Spark JAR packages or add dependencies on specific libraries, files, or JAR packages.
-
You have created an Alibaba Cloud EMR cluster and registered it with DataWorks. For more information, see Data Studio: Bind an EMR computing resource.
-
(Optional, for RAM users) The RAM user for task development has been added to the corresponding workspace and granted the Development or Workspace Administrator (this role has extensive permissions, grant with caution) role. For more information about how to add members, see Add members to a workspace.
If you use an Alibaba Cloud account, you can skip this step.
-
If your tasks require a specific development environment, you can build a custom image. For more information, see custom image.
Limitations
-
EMR Spark nodes can run only on a serverless resource group (recommended) or an exclusive resource group for scheduling. Using a custom image for data development requires a serverless resource group.
-
To manage metadata for a DataLake or custom cluster in DataWorks, you must first configure EMR-HOOK on the cluster. For more information, see Configure EMR-HOOK for Spark SQL.
Note: Without EMR-HOOK configured on the cluster, DataWorks cannot display metadata in real time, generate audit logs, display data lineage, or perform EMR-related governance tasks.
-
While EMR on ACK Spark clusters do not support viewing data lineage, EMR Serverless Spark clusters do.
-
EMR on ACK Spark clusters and EMR Serverless Spark clusters support only referencing OSS resources via OSS REF and uploading resources to OSS, but not uploading resources to HDFS.
-
DataLake clusters and custom clusters support referencing OSS resources via OSS REF, uploading resources to OSS, and uploading resources to HDFS.
Notes
If you have enabled Ranger access control for Spark in the EMR cluster associated with your workspace:
-
When you run a Spark task with the default image, this feature is enabled by default.
-
To run a Spark task with a custom image, submit a ticket to request an image upgrade to enable this feature.
Develop a Spark job and get the JAR package
Before you can schedule EMR Spark jobs with DataWorks, you must develop the job in EMR and compile the code into a JAR package. For more information on developing EMR Spark jobs, see Spark Overview.
You will then need to upload the JAR package to DataWorks to schedule recurring EMR Spark jobs.
Procedure
-
On the EMR Spark node editing page, develop the job.
Develop a Spark job
Choose an option based on your scenario:
Option 1: Upload and reference
DataWorks allows you to upload resources from your local machine to DataStudio and then reference them. After you compile the EMR Spark job, you must obtain the compiled jar package. The recommended storage method for the jar package depends on its size.
Upload the jar package and create it as a DataWorks EMR resource, then submit it. You can also store it directly in HDFS on the EMR cluster. EMR on ACK and EMR Serverless Spark clusters do not support uploading resources to HDFS.
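The size rule described above can be sketched as a small local helper. This is only an illustration: the function name recommend_jar_storage is hypothetical, "500 MB" is assumed to mean 500 * 1024 * 1024 bytes, and the helper ignores the cluster type (EMR on ACK and EMR Serverless Spark clusters cannot use HDFS uploads).

```shell
#!/usr/bin/env bash
# Hypothetical helper: suggest where to store a JAR based on its size in bytes.
# Assumption: "500 MB" is interpreted as 500 * 1024 * 1024 bytes.
LIMIT_BYTES=$((500 * 1024 * 1024))

recommend_jar_storage() {
  local size_bytes="$1"
  if [ "$size_bytes" -lt "$LIMIT_BYTES" ]; then
    # Small enough to upload as a DataWorks EMR JAR resource.
    echo "dataworks-resource"
  else
    # Too large for a local upload; store it in HDFS on the EMR cluster.
    echo "hdfs"
  fi
}

# Example: a 120 MB JAR and a 700 MB JAR.
recommend_jar_storage $((120 * 1024 * 1024))
recommend_jar_storage $((700 * 1024 * 1024))
```

For a real file, the size in bytes can be obtained with a command such as wc -c and passed to the helper.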
JAR < 500 MB
-
Create an EMR JAR resource.
If a jar package is smaller than 500 MB, you can upload it from your local machine as a DataWorks EMR JAR resource. This enables easy visual management in the DataWorks console. After creating the resource, submit it. For more information, see Create and use EMR resources.
-
Upload the jar package to the storage directory for JAR resources by using the Local upload method. For more information, see Resource management.
-
Click Click Upload to upload the JAR resource.
-
Select a Storage Path, Data Sources, and Resource Group.
-
Click Save.
-
-
Reference the EMR JAR resource.
-
Open the created EMR Spark node and go to the code editing page.
-
In the Resource Management section of the left-side navigation pane, find the resource that you want to reference, right-click it, and select Insert Resource Path.
-
After you select the reference, a reference statement is automatically added to the code editor of the EMR Spark node. This indicates a successful reference.
##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"}
spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
In this code, spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar is the name of the EMR JAR resource that you uploaded.
-
Edit the code for the EMR Spark node to include the spark-submit command. The following code is an example.
Note: The EMR Spark node editor does not support comments. Write the job code as shown in the following example, without comments. Otherwise, the node fails to run.
##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
spark-submit --class org.apache.spark.examples.SparkPi --master yarn spark-examples_2.11-2.4.0.jar 100
Note:
-
org.apache.spark.examples.SparkPi: The main class in the compiled jar package.
-
spark-examples_2.11-2.4.0.jar: The name of the EMR JAR resource that you uploaded.
-
You can use the example values for the other parameters. You can also run the spark-submit --help command to view usage information and modify the spark-submit command as needed.
-
To use simplified parameters for the spark-submit command in a Spark node, you must add them to your code, for example, --executor-memory 2G.
-
Spark nodes only support submitting jobs to YARN in cluster mode.
-
For jobs submitted with spark-submit, set the deploy mode to cluster mode, not client mode.
-
-
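Because the node editor rejects comments, one option is to compose the node code in a local shell and paste only the printed text. The following is a minimal sketch under that assumption; emit_node_code is a hypothetical helper, and the option values (cluster deploy mode, 2G executor memory) simply combine the notes above.

```shell
#!/usr/bin/env bash
# Run this locally, not in the node editor: the editor does not accept comments.
# emit_node_code is a hypothetical helper that prints comment-free node code.
emit_node_code() {
  local jar="$1" main_class="$2" app_args="$3"
  # Line 1: the resource reference statement for the uploaded EMR JAR resource.
  printf '##@resource_reference{"%s"}\n' "$jar"
  # Line 2: spark-submit on YARN with the deploy mode set to cluster explicitly.
  printf 'spark-submit --class %s --master yarn --deploy-mode cluster --executor-memory 2G %s %s\n' \
    "$main_class" "$jar" "$app_args"
}

emit_node_code "spark-examples_2.11-2.4.0.jar" "org.apache.spark.examples.SparkPi" "100"
```

The two printed lines contain no comments, so they can be pasted into the EMR Spark node as-is.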
JAR >= 500 MB
-
Create an EMR JAR resource.
If a jar package is 500 MB or larger, you cannot upload it from your local machine as a DataWorks resource. We recommend that you store the jar package directly in HDFS on the EMR cluster and record its storage path. You can then reference this path when you schedule the Spark job in DataWorks.
-
Upload the jar package to the storage directory for JAR resources by using the Local upload method. For more information, see Resource management.
-
Click Click Upload to upload the JAR resource.
-
Select a Storage Path, Data Sources, and Resource Group.
-
Click Save.
-
-
Reference the EMR JAR resource.
If the jar package is stored in HDFS, you can reference it by specifying its path in the code of the EMR Spark node.
-
Double-click the created EMR Spark node to open its code editor page.
-
Write the spark-submit command. The following is an example.
spark-submit --master yarn --deploy-mode cluster --name SparkPi --driver-memory 4G --driver-cores 1 --num-executors 5 --executor-memory 4G --executor-cores 1 --class org.apache.spark.examples.JavaSparkPi hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
Note:
-
hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: The actual path of the jar package in HDFS.
-
org.apache.spark.examples.JavaSparkPi: The main class in the compiled jar package.
-
Configure the other parameters according to your EMR cluster's settings. You can also run the spark-submit --help command to view the usage information for spark-submit and modify the command as needed.
-
To use simplified parameters for the spark-submit command in a Spark node, you must add them to your code, for example, --executor-memory 2G.
-
Spark nodes only support submitting jobs to YARN in cluster mode.
-
For jobs submitted with spark-submit, set the deploy mode to cluster mode, not client mode.
-
-
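For the HDFS approach described in this section, the upload itself might look like the following dry run, which only prints the commands rather than executing them. The local JAR path is a hypothetical example, the HDFS directory /tmp/jars is taken from the sample command, and the helper assumes an absolute HDFS directory.

```shell
#!/usr/bin/env bash
# Dry run: prints the HDFS commands instead of executing them.
# print_hdfs_upload is a hypothetical helper; the local path is an example.
print_hdfs_upload() {
  local local_jar="$1" hdfs_dir="$2"
  local jar_name
  jar_name="$(basename "$local_jar")"
  # Create the target directory, then copy the JAR, overwriting an old copy.
  echo "hdfs dfs -mkdir -p ${hdfs_dir}"
  echo "hdfs dfs -put -f ${local_jar} ${hdfs_dir}/"
  # The path to record and later reference in the spark-submit command.
  echo "hdfs://${hdfs_dir}/${jar_name}"
}

print_hdfs_upload "./target/spark-examples_2.11-2.4.8.jar" "/tmp/jars"
```

The last printed line is the storage path to record; the first two lines would be run on a machine that has the HDFS client configured for the cluster.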
Option 2: Reference an OSS resource
You can directly reference OSS resources from the current node by using OSS REF. When you run an EMR node, DataWorks automatically loads the OSS resources specified in the code for local use. This method is commonly used in scenarios where EMR jobs require JAR dependencies or depend on scripts.
-
Develop the JAR resource.
-
Prepare code dependencies.
You can view the required code dependencies in the /usr/lib/emr/spark-current/jars/ path on the master node of your EMR cluster. The following example uses Spark 3.4.2. You must open an existing IntelliJ IDEA project, add Project Object Model (POM) dependencies, and reference the relevant plug-ins.
POM dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.4.2</version>
    </dependency>
    <!-- Apache Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.4.2</version>
    </dependency>
</dependencies>
Plug-ins
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.7.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <configuration>
                <recompileMode>incremental</recompileMode>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
-
Sample code
package com.aliyun.emr.example.spark

import org.apache.spark.sql.SparkSession

object SparkMaxComputeDemo {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession.
    val spark = SparkSession.builder()
      .appName("HelloDataWorks")
      .getOrCreate()

    // Print the Spark version.
    println(s"Spark version: ${spark.version}")
  }
}
-
After editing the Scala code, generate a jar package.
The sample jar package is named SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar.
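As a rough sketch of the packaging step: with the maven-assembly-plugin bound to the package phase, as in the plug-in configuration shown earlier, a Maven build is expected to write a file named artifactId-version-jar-with-dependencies.jar under target/. The artifactId SparkWorkOSS and version 1.0-SNAPSHOT below are inferred from the sample JAR name and are assumptions about the project's POM, as is the default finalName. The helper expected_assembly_jar is hypothetical and only prints text.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print where the assembly JAR is expected to appear.
# Assumption: the POM keeps Maven's default finalName (artifactId-version).
expected_assembly_jar() {
  local artifact_id="$1" version="$2"
  echo "target/${artifact_id}-${version}-jar-with-dependencies.jar"
}

# The build command to run in the project root (printed here, not executed).
echo "mvn clean package"
# artifactId and version are inferred from the sample JAR name.
expected_assembly_jar "SparkWorkOSS" "1.0-SNAPSHOT"
```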
-
-
Upload the JAR resource.
-
After finishing code development, log in to the OSS console. In the left-side navigation pane for the region, click Bucket list.
-
Click the name of the destination bucket to open the File Management page.
This topic uses the onaliyun-bucket-2 bucket as an example.
-
Click Create Directory to create a directory to store the JAR resource.
Set Catalog Name to emr/jars to create the storage directory for the JAR resource.
-
Upload the JAR resource to the storage directory.
Go to the directory and click Upload File. In the Files to Upload area, click Scan Files, add the SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar file to the bucket, and then click Upload File.
-
-
Reference the JAR resource.
-
Edit the code to reference the JAR resource.
On the editing page of the created EMR Spark node, edit the code to reference the JAR resource.
spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar
Parameter description:
Parameter
Description
class
The fully qualified name of the main class to run.
master
The running mode of the Spark application.
ossref file path
The format is ossref://{endpoint}/{bucket}/{object}.
-
Endpoint: The endpoint of the OSS service. If this parameter is left empty, the OSS resource must be in the same region as the EMR cluster.
-
Bucket: A container in OSS for storing objects. Each bucket has a unique name. Log in to the OSS console to view all buckets under your account.
-
Object: A specific object, such as a file name or path, stored in a bucket.
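The ossref path format can be illustrated with a small local helper that assembles the path from its parts. This is a sketch only: ossref_uri is a hypothetical function, and the omitted-endpoint form is inferred from the sample command, which uses only the bucket and object.

```shell
#!/usr/bin/env bash
# Hypothetical helper: assemble an ossref path from bucket, object, and an
# optional endpoint. Without an endpoint, the OSS resource must be in the
# same region as the EMR cluster.
ossref_uri() {
  local bucket="$1" object="$2" endpoint="${3:-}"
  if [ -n "$endpoint" ]; then
    echo "ossref://${endpoint}/${bucket}/${object}"
  else
    echo "ossref://${bucket}/${object}"
  fi
}

# Reproduces the path used in the sample spark-submit command.
ossref_uri "onaliyun-bucket-2" "emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar"
```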
-
-
Run the EMR Spark node job.
After you finish editing, click the run icon and select the serverless resource group that you created to run the EMR Spark node. After the job is complete, record the applicationIds value from the console, for example, application_1730367929285_xxxx.
-
View the result.
Create an EMR Shell node and run the yarn logs -applicationId application_1730367929285_xxxx command on the node to view the execution result:
yarn logs -applicationId application_xxx
Process Output>>> LogLastModifiedTime:Fri Nov 01 16:27:46 +0800 2024
Process Output>>> LogLength:21
Process Output>>> LogContents:
Process Output>>> Spark version: 3.4.2
Process Output>>>
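If the run log has been saved as text, the application ID can be pulled out with a pattern match instead of being copied by hand. The sketch below assumes IDs follow the application_<digits>_<digits> shape seen in the example; the function name and the sample log lines are made up for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read log text on stdin and print each distinct
# YARN application ID of the form application_<digits>_<digits>.
extract_application_ids() {
  grep -oE 'application_[0-9]+_[0-9]+' | sort -u
}

# The two log lines below are invented sample input, not real DataWorks output.
printf '%s\n' \
  "INFO: submitted application_1730367929285_0042 to the cluster" \
  "INFO: tracking application_1730367929285_0042" \
  | extract_application_ids
```

Each printed ID can then be passed to yarn logs -applicationId on an EMR Shell node.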
-
(Optional) Configure advanced parameters
You can configure the following node-specific properties in the Scheduling Settings pane on the right side of the node.
Note:
-
The available advanced parameters vary by EMR cluster type, as shown in the following tables.
-
You can also configure additional open-source Spark properties in the Scheduling Settings pane.
DataLake cluster/custom cluster: EMR on ECS
Advanced parameter
Description
queue
The scheduling queue for the job. The default is default.
If you configured a workspace-level YARN Resource Queue when you registered the EMR cluster with the DataWorks workspace:
-
If you select the Global Settings Take Precedence checkbox and set it to Yes, the Spark job uses the queue configured when you registered the EMR cluster.
-
If you do not select the checkbox, the Spark job uses the queue configured for the EMR Spark node.
For more information about EMR YARN, see Basic queue configuration. For more information about the queue configuration when you register an EMR cluster, see Set the global YARN resource queue.
priority
The priority. The default value is 1.
FLOW_SKIP_SQL_ANALYZE
The execution mode for SQL statements. Valid values:
-
true: Executes multiple SQL statements at a time.
-
false (default): Executes one SQL statement at a time.
Note: This parameter is supported only for test runs in the data development environment.
Other
-
You can add custom Spark parameters in Advanced Configurations. For example, if you add spark.eventLog.enabled : false, DataWorks automatically adds the parameter to the code that is submitted to the EMR cluster in the --conf key=value format.
-
You can also configure global Spark parameters. For more information, see Set global Spark parameters.
Note: To enable Ranger access control, add the spark.hadoop.fs.oss.authorization.method=ranger configuration in Set global Spark parameters to ensure that Ranger access control takes effect.
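The mapping from a custom parameter entry to a --conf key=value argument can be illustrated as follows. This is a sketch of the transformation as described, not DataWorks' actual implementation; to_conf_flags is a hypothetical helper that reads "key : value" lines.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "key : value" lines on stdin and print one
# "--conf key=value" argument per line.
to_conf_flags() {
  local line key value
  while IFS= read -r line; do
    # Skip blank lines.
    if [ -z "$line" ]; then
      continue
    fi
    # Split on the first colon, then trim whitespace around each part.
    key="$(printf '%s' "${line%%:*}" | sed 's/^[[:space:]]*//; s/[[:space:]]*$//')"
    value="$(printf '%s' "${line#*:}" | sed 's/^[[:space:]]*//; s/[[:space:]]*$//')"
    printf '%s %s=%s\n' "--conf" "$key" "$value"
  done
}

printf '%s\n' "spark.eventLog.enabled : false" | to_conf_flags
```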
Spark cluster: EMR on ACK
Advanced parameter
Description
FLOW_SKIP_SQL_ANALYZE
The execution mode for SQL statements. Valid values:
-
true: Executes multiple SQL statements at a time.
-
false: Executes one SQL statement at a time.
Note: This parameter is supported only for test runs in the data development environment.
Other
-
You can add custom Spark parameters in Advanced Configurations. For example, if you add spark.eventLog.enabled: false, DataWorks automatically adds the parameter to the code that is submitted to the EMR cluster in the --conf key=value format.
-
You can also configure global Spark parameters. For more information, see Set global Spark parameters.
Hadoop cluster: EMR on ECS
Advanced parameter
Description
queue
The scheduling queue to which the job is submitted. The default value is default.
If you configure a workspace-level YARN resource queue when you register the EMR cluster with the DataWorks workspace:
-
If you set Global Settings Take Precedence to Yes, the queue for a Spark job defaults to the configuration from the EMR cluster registration.
-
If you do not select the checkbox, the queue configured for the EMR Spark node is used.
For more information about EMR YARN, see Basic queue configuration. For more information about the queue configuration during EMR cluster registration, see Set a global YARN resource queue.
priority
The priority. The default value is 1.
FLOW_SKIP_SQL_ANALYZE
The execution mode for SQL statements. Valid values:
-
true: Executes multiple SQL statements at a time.
-
false: Executes one SQL statement at a time.
Note: This parameter is supported only for test runs in the data development environment.
USE_GATEWAY
Specifies whether to submit the job for this node through a Gateway cluster. Valid values:
-
true: Submits the job through the Gateway cluster.
-
false: Does not submit the job through a Gateway cluster. By default, the job is submitted to the header node.
Note: If you set this parameter to true but the node's cluster is not associated with a Gateway cluster, subsequent EMR job submissions will fail.
Other
-
You can add custom Spark parameters directly in Advanced Configurations. For an entry such as spark.eventLog.enabled : false, DataWorks automatically adds it to the code submitted to the EMR cluster in the --conf key=value format.
-
You can also configure global Spark parameters. For more information, see Set global Spark parameters.
Note: To enable Ranger access control, add the spark.hadoop.fs.oss.authorization.method=ranger configuration in Set global Spark parameters to ensure that Ranger access control takes effect.
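The queue selection rule described for the queue parameter can be read as a simple precedence check. The sketch below is one interpretation of that rule, not DataWorks code: resolve_queue is a hypothetical function, and the fallback to default when no node-level queue is given is an assumption based on the stated default value.

```shell
#!/usr/bin/env bash
# Hypothetical helper: decide which YARN queue a Spark job would use.
#   $1: "yes" if Global Settings Take Precedence is set to Yes, else "no"
#   $2: workspace-level queue set at cluster registration (may be empty)
#   $3: queue configured on the EMR Spark node (falls back to "default")
resolve_queue() {
  local global_precedence="$1" workspace_queue="$2" node_queue="${3:-default}"
  if [ "$global_precedence" = "yes" ] && [ -n "$workspace_queue" ]; then
    echo "$workspace_queue"
  else
    echo "$node_queue"
  fi
}

# The queue names team_a and adhoc are made-up examples.
resolve_queue yes team_a adhoc
resolve_queue no team_a adhoc
resolve_queue no team_a ""
```

The three calls print team_a, adhoc, and default, in that order.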
EMR Serverless Spark cluster
For more information about parameter settings, see Set parameters for submitting a Spark job.
Advanced parameter
Description
queue
The scheduling queue for the job. The default is dev_queue.
priority
The priority. The default value is 1.
FLOW_SKIP_SQL_ANALYZE
The execution mode for SQL statements. Valid values:
-
true: Executes multiple SQL statements at a time.
-
false: Executes one SQL statement at a time.
Note: This parameter is supported only for test runs in the data development environment.
SERVERLESS_RELEASE_VERSION
The Spark engine version. By default, the job uses the Default Engine Version configured for the cluster in Management Center under Clusters. You can configure this parameter to set different engine versions for different jobs.
SERVERLESS_QUEUE_NAME
The resource queue. By default, the job uses the Default Resource Queue configured for the cluster in Management Center under Clusters. If you have resource isolation and management requirements, you can add queues. For more information, see Manage resource queues.
Other
-
You can add custom Spark parameters in the advanced configuration. For example, if you add spark.eventLog.enabled : false, DataWorks automatically adds the parameter to the code that is submitted to the EMR cluster in the --conf key=value format.
-
You can also configure global Spark parameters. For more information, see Set global Spark parameters.
Run the Spark job
-
In the Run Configuration pane, go to the Compute Resource section and configure the Compute Resource and DataWorks Resource Group.
Note:
-
You can also configure CUs for Scheduling based on the resources required by the task. The default value is 0.25 CUs.
-
To access a data source over the public internet or in a VPC, you must use a scheduling resource group with verified network connectivity to the data source. For more information, see Network connectivity solutions.
-
-
In the parameter dialog box on the toolbar, select the corresponding data source, and then click Run.
-
-
To run the node job periodically, configure its scheduling properties. For more information, see Configure scheduling properties for a node.
Note: If you need to customize the component environment, you can create a custom image based on the official image dataworks_emr_base_task_pod and use the image in Data Development. For example, when you create a custom image, you can replace Spark JAR packages or depend on specific libraries, files, or JAR packages.
-
After you configure the node job, you must deploy the node. For more information, see Deploy a node or workflow.
-
After the job is deployed, you can go to Operation Center to view the run status of the periodic job. For more information, see Get started with Operation Center.
FAQ
-
Q: How do I resolve a node connection timeout?
A: This error can occur if the resource group and the cluster do not have network connectivity. To resolve this issue, go to the computing resource list page, find the resource, and click Resource Initialization. In the dialog box that appears, click Re-initialize and ensure that the resource is successfully initialized.