Create an EMR Spark node

Apache Spark is a versatile, high-performance, and easy-to-use big data analytics engine that performs complex in-memory analytics and can be used to build large-scale, low-latency data analytics applications. DataWorks provides EMR Spark nodes that you can use to develop Spark tasks and schedule them periodically. This topic describes how to create an EMR Spark node and walks through its features with a detailed example.

Prerequisites

  • To customize the component environment before you start node development, create a custom image based on the official dataworks_emr_base_task_pod image and then use the image in DataStudio.

    For example, you can replace a Spark JAR package or add dependencies on specific libraries, files, or JAR packages when you create the custom image.

  • An EMR cluster must be registered with DataWorks. For more information, see DataStudio (legacy version): Bind an EMR computing resource.

  • (Optional) If you use a RAM user for task development, the RAM user must be added to the corresponding workspace and granted the Development or Workspace Administrator role. The Workspace Administrator role grants extensive permissions, so grant it with caution. For more information, see Add members to a workspace.

  • A resource group must be purchased and configured, including its workspace binding and network settings. For more information, see Use a Serverless resource group.

  • A business flow must be created. DataStudio organizes development operations into business flows, so you cannot create a node without one. For more information, see Create a business flow.

  • If your development tasks require a specific environment, you can use the custom image feature provided by DataWorks to build a custom image that contains the required components for task execution. For more information, see Custom image.

Limitations

  • This type of task can run only on a serverless resource group (recommended) or an exclusive resource group for scheduling. If you need to use an image during development in DataStudio, you must use a serverless resource group.

  • For DataLake or custom clusters, you must configure EMR-HOOK on the cluster to manage metadata in DataWorks. If EMR-HOOK is not configured, you cannot view real-time metadata, generate audit logs, display data lineage, or perform EMR-related governance tasks in DataWorks. For more information, see Configure EMR-HOOK for Spark SQL.

  • EMR Serverless Spark clusters support viewing data lineage, whereas EMR on ACK Spark clusters do not.

  • EMR on ACK Spark clusters and EMR Serverless Spark clusters support referencing OSS resources by using ossref and uploading resources to OSS, but do not support uploading resources to HDFS.

  • DataLake clusters and custom clusters support referencing OSS resources by using ossref, uploading resources to OSS, and uploading resources to HDFS.

Notes

If you have enabled Ranger access control for Spark in the EMR cluster associated with the current workspace:

  • Spark tasks that use the default image automatically support this feature.

  • To run a Spark task with a custom image, submit a ticket to technical support to request an image upgrade.

Preparation: Develop a Spark job JAR

Before you can schedule an EMR Spark job with DataWorks, you must first develop the job in EMR, compile it, and generate a JAR package. For more information about developing EMR Spark jobs, see Spark overview.

Note

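After you generate the JAR package, you make it available to DataWorks, either by uploading it as a DataWorks resource or by storing it in HDFS or OSS, depending on the method that you choose. DataWorks can then schedule the EMR Spark job periodically.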

Step 1: Create an EMR Spark node

  1. Log on to the DataWorks console and switch to the target region. In the left-side navigation pane, choose Data Development and O&M > Data Development, select a workspace from the drop-down list, and then click Go to Data Development.

  2. Create an EMR Spark node.

    1. Right-click the target business flow and choose Create Node > EMR > EMR Spark.

      Note

      You can also hover over Create and choose Create Node > EMR > EMR Spark.

    2. In the Create Node dialog box, enter a Name and select the Engine Instance, Node Type, and Path. Click Confirm to go to the EMR Spark node editor.

      Note

      The node name can contain uppercase letters, lowercase letters, Chinese characters, digits, underscores (_), and periods (.).

Step 2: Develop a Spark job

To open the code editor, double-click the EMR Spark node. You can then select a method based on your scenario.

Method 1: Upload and reference an EMR JAR resource

In DataWorks, you can upload a resource from your local computer to DataStudio and then reference it. After you compile an EMR Spark job into a JAR package, select a storage method based on the size of the package.

You can upload and commit a JAR package as a DataWorks EMR resource. Alternatively, you can store it in the HDFS of an EMR cluster. EMR on ACK Spark clusters and EMR Serverless Spark clusters do not support resource uploads to HDFS.
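The size threshold used in the two procedures below and the cluster limitations listed earlier can be summarized as a small decision function. This is a hypothetical POSIX shell sketch, not a DataWorks tool: the function name, the cluster-type labels, and the reading of 500 MB as 500 * 1024 * 1024 bytes are assumptions. Falling back to ossref (Method 2) for large JARs on EMR on ACK Spark and EMR Serverless Spark clusters is also an inference from the fact that those clusters cannot store resources in HDFS.

```sh
#!/bin/sh
# Hypothetical helper: suggests how to make a Spark JAR available to an
# EMR Spark node, based on the size threshold and cluster limitations.
# Assumptions: "500 MB" means 500 * 1024 * 1024 bytes; cluster types are the
# illustrative labels datalake, custom, ack, or serverless.
JAR_LIMIT_BYTES=$((500 * 1024 * 1024))

choose_jar_method() {
  local size_bytes="$1" cluster_type="$2"
  if [ "$size_bytes" -lt "$JAR_LIMIT_BYTES" ]; then
    # Smaller JARs can be uploaded as a DataWorks EMR JAR resource.
    echo "emr-jar-resource"
    return 0
  fi
  case $cluster_type in
    datalake|custom)
      # Larger JARs are stored in the cluster's HDFS and referenced by path.
      echo "hdfs-path" ;;
    ack|serverless)
      # These clusters cannot store resources in HDFS; reference OSS instead.
      echo "ossref" ;;
    *)
      echo "unknown cluster type: $cluster_type" >&2
      return 1 ;;
  esac
}

choose_jar_method $((120 * 1024 * 1024)) datalake
choose_jar_method $((800 * 1024 * 1024)) serverless
```

The first call prints emr-jar-resource and the second prints ossref.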

JAR package smaller than 500 MB

  1. Create an EMR JAR resource.

    This method allows for visual resource management in the DataWorks console. After you create the resource, commit it. For more information, see Create and use EMR resources.

    In the Create Resource dialog box, set Engine Type to EMR and select an Engine Instance. Set Resource Type to EMR JAR, specify the Path, and select Upload as EMR Resource. Select a Storage Path type, which can be OSS One-click Authorization or HDFS. Set File Source to Local and click Upload to upload the file. Specify a Name for the resource. The name of a JAR resource must end with .jar. Then, click Create.

    Note

    The first time you create an EMR resource to be stored in OSS, you must authorize access as prompted.

  2. Reference the EMR JAR resource.

    1. Double-click the EMR Spark node to open its code editor.

    2. In the left-side navigation tree, choose EMR > Resources. Find your EMR JAR resource, right-click it, and select Insert Resource Path.

    3. After you reference the resource, the system automatically adds a reference code snippet to the node's code editor. The following code snippet provides an example.

      ##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"}
      spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar

      The presence of this snippet indicates that the resource is referenced. In the preceding code, spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar is the name of the uploaded EMR JAR resource.

    4. Modify the code of the EMR Spark node and add a spark-submit command. The following code provides an example.

      Note

      The code editor for EMR Spark nodes does not support comments. Modify the job code based on the following example. Do not add comments; otherwise, an error will occur when the node runs.

      ##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
      spark-submit --class org.apache.spark.examples.SparkPi --master yarn spark-examples_2.11-2.4.0.jar 100

      The following list describes the parameters.

      • org.apache.spark.examples.SparkPi: the main class of the job in the compiled JAR package.

      • spark-examples_2.11-2.4.0.jar: the name of the EMR JAR resource that you uploaded.

      • You can leave the other parameters as they are in the preceding example. You can also run the following command to view the help information for the spark-submit command and modify the command as needed.

        Note
        • If you want to use spark-submit options in a Spark node, such as --executor-memory 2G, you must add them to the code.

        • Spark nodes support job submission only in YARN cluster mode.

        • For jobs submitted by using the spark-submit command, set the deploy mode parameter to cluster, not client.

        spark-submit --help
        Process Output>>>
        Process Output>>> Options:
        Process Output>>>       --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
        Process Output>>>                                   k8s://https://host:port, or local (Default: local[*]).
        Process Output>>>       --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
        Process Output>>>                                   on one of the worker machines inside the cluster ("cluster")
        Process Output>>>                                   (Default: client).
        Process Output>>>       --class CLASS_NAME          Your application's main class (for Java / Scala apps).
        Process Output>>>       --name NAME                 A name of your application.
        Process Output>>>       --jars JARS                 Comma-separated list of jars to include on the driver
        Process Output>>>                                   and executor classpaths.
        Process Output>>>       --packages                  Comma-separated list of maven coordinates of jars to include
        Process Output>>>                                   on the driver and executor classpaths. Will search the local
        Process Output>>>                                   maven repo, then maven central and any additional remote
        Process Output>>>                                   repositories given by --repositories. The format for the
        Process Output>>>                                   coordinates should be groupId:artifactId:version.
        Process Output>>>       --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
        Process Output>>>                                   resolving the dependencies provided in --packages to avoid
        Process Output>>>                                   dependency conflicts.
        Process Output>>>       --repositories              Comma-separated list of additional remote repositories to
        Process Output>>>                                   search for the maven coordinates given with --packages.
        Process Output>>>       --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
        Process Output>>>                                   on the PYTHONPATH for Python apps.
        Process Output>>>       --files FILES               Comma-separated list of files to be placed in the working
        Process Output>>>                                   directory of each executor. File paths of these files
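The reference line and the spark-submit rules above can be combined into a small generator that produces node code ready to paste into the editor. This is a hypothetical POSIX shell sketch, not a DataWorks feature: render_node_code and its argument order are illustrative, and it adds --deploy-mode cluster explicitly to follow the note about cluster mode.

```sh
#!/bin/sh
# Hypothetical helper: prints EMR Spark node code for an uploaded EMR JAR
# resource in the pattern shown in this topic, that is, a
# ##@resource_reference line followed by one spark-submit command.
# It emits no comment lines because the node editor does not support them.
render_node_code() {
  local jar="$1" main_class="$2" options="${3:-}" app_args="${4:-}"
  # The name of an EMR JAR resource must end with .jar.
  case $jar in
    *.jar) ;;
    *) echo "resource name must end with .jar: $jar" >&2; return 1 ;;
  esac
  # Spark nodes support only YARN cluster mode.
  local cmd="spark-submit --class $main_class --master yarn --deploy-mode cluster"
  # spark-submit options go before the JAR; application arguments go after it.
  if [ -n "$options" ]; then cmd="$cmd $options"; fi
  cmd="$cmd $jar"
  if [ -n "$app_args" ]; then cmd="$cmd $app_args"; fi
  printf '##@resource_reference{"%s"}\n%s\n' "$jar" "$cmd"
}

render_node_code spark-examples_2.11-2.4.0.jar org.apache.spark.examples.SparkPi "--executor-memory 2G" 100
```

The call at the end prints the ##@resource_reference line for spark-examples_2.11-2.4.0.jar, followed by spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --executor-memory 2G spark-examples_2.11-2.4.0.jar 100. Passing all options as one string is a simplification; option values that contain spaces would need additional quoting.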

JAR package 500 MB or larger

  1. Create an EMR JAR resource.

    If a JAR package is 500 MB or larger, you cannot upload it from your local computer as a DataWorks resource. Store the JAR package in the HDFS of your EMR cluster and record its storage path. You can then use the path to reference the package when you schedule the Spark job.

  2. Reference the EMR JAR resource.

    If the JAR package is stored in HDFS, reference it directly by specifying its path in the EMR Spark node code.

    1. Double-click the created EMR Spark node to open the node's code editor.

    2. Write a spark-submit command. The following code provides an example.

      spark-submit --master yarn
      --deploy-mode cluster
      --name SparkPi
      --driver-memory 4G
      --driver-cores 1
      --num-executors 5
      --executor-memory 4G
      --executor-cores 1
      --class org.apache.spark.examples.JavaSparkPi
      hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100

      The following list describes the parameters.

      • hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: the path of the JAR package in HDFS.

      • org.apache.spark.examples.JavaSparkPi: the main class of the job in the compiled JAR package.

      • Modify the other parameters, such as --name, --driver-memory, and --num-executors, based on your business requirements and the resources of your EMR cluster. You can also run the following command to view the help information for the spark-submit command and modify the command as needed.

        Important
        • If you want to use spark-submit options in a Spark node, such as --executor-memory 2G, you must add them to the code.

        • Spark nodes support job submission only in YARN cluster mode.

        • For jobs submitted by using the spark-submit command, set the deploy mode parameter to cluster, not client.

        spark-submit --help
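The HDFS route above involves two manual steps that a short POSIX shell sketch can make explicit. plan_hdfs_upload is a hypothetical dry-run helper that only prints the standard hdfs dfs -mkdir -p and hdfs dfs -put -f commands, to be run on a cluster node with the HDFS client, plus the hdfs:/// path to paste into the node code; the default /tmp/jars directory is borrowed from the example. estimate_yarn_request and overhead_mib are hypothetical helpers that roughly estimate what the example command asks YARN for, assuming Spark's default memory overhead of max(384 MiB, 10% of the heap) for the driver and for each executor, and ignoring YARN's rounding up to the scheduler's minimum allocation.

```sh
#!/bin/sh
# Hypothetical helpers for the HDFS route; they are not DataWorks tools.

# Dry run: prints the HDFS client commands that store a large JAR, then the
# hdfs:/// path to reference in the node code. Run the printed commands on a
# cluster node that has the HDFS client installed.
plan_hdfs_upload() {
  local local_jar="$1" hdfs_dir="${2:-/tmp/jars}"
  hdfs_dir="${hdfs_dir%/}"            # drop a trailing slash, if any
  local name="${local_jar##*/}"       # file name without its local directory
  echo "hdfs dfs -mkdir -p $hdfs_dir"
  echo "hdfs dfs -put -f $local_jar $hdfs_dir/"
  echo "hdfs://$hdfs_dir/$name"
}

# Assumed Spark default overhead per container: max(384 MiB, 10% of the heap).
overhead_mib() {
  local heap_mib="$1"
  local pct=$(( heap_mib * 10 / 100 ))
  echo $(( pct > 384 ? pct : 384 ))
}

# Rough memory (MiB) and vcores requested from YARN by the driver plus all
# executors. Ignores rounding up to the scheduler's minimum allocation.
estimate_yarn_request() {
  local driver_mib="$1" driver_cores="$2" num_exec="$3" exec_mib="$4" exec_cores="$5"
  local driver_total exec_total
  driver_total=$(( driver_mib + $(overhead_mib "$driver_mib") ))
  exec_total=$(( exec_mib + $(overhead_mib "$exec_mib") ))
  echo "memory_mib=$(( driver_total + num_exec * exec_total )) vcores=$(( driver_cores + num_exec * exec_cores ))"
}

plan_hdfs_upload spark-examples_2.11-2.4.8.jar
# Values from the example: --driver-memory 4G --driver-cores 1
# --num-executors 5 --executor-memory 4G --executor-cores 1
estimate_yarn_request 4096 1 5 4096 1
```

For the example values, plan_hdfs_upload prints hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar as its last line, and estimate_yarn_request prints memory_mib=27030 vcores=6, that is, six containers of about 4.4 GiB each.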

Method 2: Directly reference an OSS resource

You can directly reference an OSS resource from a node by using ossref. When you run an EMR node, DataWorks automatically downloads the specified OSS resources to a local directory. This method is suitable for scenarios in which an EMR job depends on JAR packages or scripts.

  1. Develop a JAR resource.

    1. Prepare code dependencies.

      You can find the required code dependencies in the /usr/lib/emr/spark-current/jars/ directory on the master node of the cluster. The following example uses Spark 3.4.2. Add the following pom dependencies and plug-ins to your project in IntelliJ IDEA.

      Pom dependencies

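      <dependencies>
              <!-- Apache Spark Core. The provided scope keeps Spark itself out of the
                   assembled JAR because the cluster already supplies it. -->
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_2.12</artifactId>
                  <version>3.4.2</version>
                  <scope>provided</scope>
              </dependency>
              <!-- Apache Spark SQL -->
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.12</artifactId>
                  <version>3.4.2</version>
                  <scope>provided</scope>
              </dependency>
      </dependencies>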

      Related plug-ins

      <build>
              <sourceDirectory>src/main/scala</sourceDirectory>
              <testSourceDirectory>src/test/scala</testSourceDirectory>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.7.0</version>
                      <configuration>
                          <source>1.8</source>
                          <target>1.8</target>
                      </configuration>
                  </plugin>
                  <plugin>
                      <artifactId>maven-assembly-plugin</artifactId>
                      <configuration>
                          <descriptorRefs>
                              <descriptorRef>jar-with-dependencies</descriptorRef>
                          </descriptorRefs>
                      </configuration>
                      <executions>
                          <execution>
                              <id>make-assembly</id>
                              <phase>package</phase>
                              <goals>
                                  <goal>single</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
                  <plugin>
                      <groupId>net.alchim31.maven</groupId>
                      <artifactId>scala-maven-plugin</artifactId>
                      <version>3.2.2</version>
                      <configuration>
                          <recompileMode>incremental</recompileMode>
                      </configuration>
                      <executions>
                          <execution>
                              <goals>
                                  <goal>compile</goal>
                                  <goal>testCompile</goal>
                              </goals>
                              <configuration>
                                  <args>
                                      <arg>-dependencyfile</arg>
                                      <arg>${project.build.directory}/.scala_dependencies</arg>
                                  </args>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
      </build>

    2. Write sample code.

      package com.aliyun.emr.example.spark

      import org.apache.spark.sql.SparkSession

      object SparkMaxComputeDemo {
        def main(args: Array[String]): Unit = {
          // Creates a SparkSession.
          val spark = SparkSession.builder()
            .appName("HelloDataWorks")
            .getOrCreate()
          // Prints the Spark version.
          println(s"Spark version: ${spark.version}")
          // Stops the SparkSession and releases its resources.
          spark.stop()
        }
      }

    3. Package the code into a JAR file.

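      After you edit and save the Scala code, package it into a JAR file, for example by running mvn clean package. The maven-assembly-plugin configuration above writes the JAR with dependencies to the target directory. In this example, the generated JAR package is named SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar.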

  2. Upload the JAR resource.

    1. After the code is developed, log on to the OSS console. In the left-side navigation pane, click Buckets.

    2. Click the name of the destination bucket to go to the File Management page.

      In this example, the onaliyun-bucket-2 bucket is used.

    3. Click New Directory to create a directory to store the JAR resource.

      Set Directory Name to emr/jars.

    4. Upload the JAR resource to the directory.

      Go to the directory, click Upload File, and then click Scan in the Files to Upload section. Add the SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar file to the bucket and click Upload File.

  3. Reference the JAR resource.

    1. Edit the code to reference the JAR resource.

      On the edit page of the created EMR Spark node, edit the code to reference the JAR resource.

      spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar

      The following list describes the parameters.

      • class: the name of the main class that you want to run.

      • master: the running mode of the Spark application.

      • ossref file path: the path must be in the ossref://{endpoint}/{bucket}/{object} format.

          • endpoint: the domain name that is used to access OSS. If you omit this parameter, the OSS bucket and your EMR cluster must be in the same region.

          • bucket: a container for storing objects in OSS. Each bucket must have a unique name. You can log on to the OSS console to view all buckets in your account.

          • object: a specific object stored in a bucket, which can be a file name or path.

    2. Run the EMR Spark node job.

      After you edit the code, click the Run with Parameters icon and select your scheduling resource group to run the EMR Spark node. When the job is complete, record the applicationId from the console output. Example: application_1730367929285_xxxx.

    3. View the result.

      Create an EMR Shell node and run the yarn logs -applicationId application_1730367929285_xxxx command to view the result.

      yarn logs -applicationId application_xxx
      Process Output>>> LogLastModifiedTime:Fri Nov 01 16:27:46 +0800 2024
      Process Output>>> LogLength:21
      Process Output>>> LogContents:
      Process Output>>> Spark version: 3.4.2
      Process Output>>>
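The OSS workflow above can also be scripted. The POSIX shell sketch below contains three hypothetical helpers, none of which are DataWorks tools. plan_oss_upload prints an ossutil cp command as a command-line alternative to the console upload (it assumes that ossutil is installed and configured), together with the matching ossref path. parse_ossref splits a path into the endpoint, bucket, and object parts described in the parameter descriptions above, assuming that a first segment containing a period is an endpoint because OSS bucket names cannot contain periods. build_yarn_logs_cmd extracts the last YARN application ID from console output read on standard input and prints the yarn logs command to run in an EMR Shell node; the sample console text in the usage line is made up.

```sh
#!/bin/sh
# Hypothetical helpers for the ossref workflow; they are not DataWorks tools.

# Dry run: prints an ossutil upload command (assumes ossutil is installed and
# configured) and the ossref path to use in spark-submit.
plan_oss_upload() {
  local local_jar="$1" bucket="$2" dir="${3%/}"
  local name="${local_jar##*/}"
  echo "ossutil cp $local_jar oss://$bucket/$dir/$name"
  echo "ossref://$bucket/$dir/$name"
}

# Splits ossref://[{endpoint}/]{bucket}/{object}.
parse_ossref() {
  local uri="$1" rest endpoint=""
  case $uri in
    ossref://*) ;;
    *) echo "not an ossref path: $uri" >&2; return 1 ;;
  esac
  rest="${uri#ossref://}"
  # Assumption: bucket names cannot contain periods, so a first segment that
  # contains one is treated as the optional endpoint.
  case ${rest%%/*} in
    *.*) endpoint="${rest%%/*}"; rest="${rest#*/}" ;;
  esac
  case $rest in
    */?*) ;;
    *) echo "missing bucket or object: $uri" >&2; return 1 ;;
  esac
  echo "endpoint=${endpoint:-<same region>} bucket=${rest%%/*} object=${rest#*/}"
}

# Reads console output on stdin and prints a yarn logs command for the last
# application ID found.
build_yarn_logs_cmd() {
  local app_id
  app_id=$(grep -oE 'application_[0-9]+_[0-9]+' | tail -n 1)
  if [ -z "$app_id" ]; then
    echo "no applicationId found" >&2
    return 1
  fi
  echo "yarn logs -applicationId $app_id"
}

plan_oss_upload target/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar onaliyun-bucket-2 emr/jars
parse_ossref ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar
echo 'hypothetical console text: application_1730367929285_0001' | build_yarn_logs_cmd
```

With the values from this topic, parse_ossref reports that no endpoint is set, which means that the bucket must be in the same region as the EMR cluster.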

(Optional) Advanced parameters

You can configure Spark-specific parameters in the Advanced Settings section of a node. For more information about Spark properties, see Spark Configuration. The advanced parameters that you can configure vary by EMR cluster type, as described below.

DataLake or custom cluster: EMR on ECS

  • queue: the scheduling queue to which jobs are submitted. Default value: default.

    If you configure a workspace-level YARN resource queue when you register an EMR cluster to a DataWorks workspace, the following rules apply:

      • If Global Settings Take Precedence is set to Yes, the queue that you specified when you registered the EMR cluster takes precedence when a Spark job runs.

      • If Global Settings Take Precedence is not set to Yes, the queue that you specify for the EMR Spark node takes precedence when a Spark job runs.

    For more information about EMR YARN, see Basic queue configurations. For more information about how to configure a queue when you register an EMR cluster, see Set the global YARN resource queue.

  • priority: the priority of the job. Default value: 1.

  • FLOW_SKIP_SQL_ANALYZE: the method that is used to run SQL statements. Valid values:

      • true: multiple SQL statements are run at a time.

      • false (default value): a single SQL statement is run at a time.

    Note

    This parameter can be used only for test runs in Data Development.

  • Others:

      • You can directly append custom Spark parameters in the advanced configurations. Example: "spark.eventLog.enabled":false. DataWorks automatically adds the parameters to the code that is delivered to the EMR cluster in the --conf key=value format.

      • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

        Note

        To enable Ranger access control, add the spark.hadoop.fs.oss.authorization.method=ranger configuration in Configure global Spark parameters.
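The queue precedence rules for this cluster type can be sketched as a function. This POSIX shell sketch is hypothetical: the function name, the argument order, and the queue names root.etl and root.adhoc are illustrative. Falling back to default when neither queue is set, and ignoring the workspace-level queue when Global Settings Take Precedence is not Yes, are assumptions based on the rules above.

```sh
#!/bin/sh
# Hypothetical sketch of the queue precedence rules for EMR on ECS clusters.
# Arguments: the Global Settings Take Precedence value (Yes or No), the
# workspace-level queue set at cluster registration (empty if none), and the
# queue set on the EMR Spark node (empty if none).
resolve_yarn_queue() {
  local global_takes_precedence="$1" workspace_queue="$2" node_queue="$3"
  if [ "$global_takes_precedence" = "Yes" ] && [ -n "$workspace_queue" ]; then
    # The queue configured at cluster registration wins.
    echo "$workspace_queue"
  elif [ -n "$node_queue" ]; then
    # Otherwise the node's own queue setting is used.
    echo "$node_queue"
  else
    # Assumption: fall back to the documented default value.
    echo "default"
  fi
}

resolve_yarn_queue Yes root.etl root.adhoc
resolve_yarn_queue No root.etl root.adhoc
```

The first call prints root.etl and the second prints root.adhoc.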

EMR Serverless Spark cluster

For more information about how to configure parameters, see Configure parameters for a Spark job.

  • queue: the scheduling queue to which jobs are submitted. Default value: dev_queue.

  • priority: the priority of the job. Default value: 1.

  • FLOW_SKIP_SQL_ANALYZE: the method that is used to run SQL statements. Valid values:

      • true: multiple SQL statements are run at a time.

      • false: a single SQL statement is run at a time.

    Note

    This parameter can be used only for test runs in Data Development.

  • SERVERLESS_RELEASE_VERSION: the version of the Spark engine. By default, the Default Engine Version that is configured for the cluster in the Cluster Management section of the Management Center page is used. You can configure this parameter to specify different engine versions for different jobs.

  • SERVERLESS_QUEUE_NAME: the resource queue that you want to use. By default, the Default Resource Queue that is configured for the cluster in the Cluster Management section of the Management Center page is used. If you want to isolate and manage resources, you can add queues. For more information, see Manage resource queues.

  • Others:

      • You can directly append custom Spark parameters in the advanced configurations. Example: "spark.eventLog.enabled":false. DataWorks automatically adds the parameters to the code that is delivered to the EMR cluster in the --conf key=value format.

      • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.
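The conversion of appended custom parameters into --conf options, described for each cluster type, can be illustrated with a short POSIX shell sketch. It only mirrors the documented key=value format and is not how DataWorks implements it; the input format (one "key":value pair per line) and the helper names trim and to_conf_flags are assumptions.

```sh
#!/bin/sh
# Hypothetical sketch: turns custom parameters written as "key":value, one
# per line, into the --conf key=value options described in this topic.

# Removes leading and trailing whitespace.
trim() {
  local s="$1"
  s="${s#"${s%%[![:space:]]*}"}"
  s="${s%"${s##*[![:space:]]}"}"
  printf '%s' "$s"
}

to_conf_flags() {
  local line key value
  while IFS= read -r line || [ -n "$line" ]; do
    case $line in *:*) ;; *) continue ;; esac   # skip lines without a pair
    key=$(trim "${line%%:*}" | tr -d '"')       # text before the first colon
    value=$(trim "${line#*:}")                  # text after the first colon
    value=${value#\"}                           # drop one leading quote
    value=${value%\"}                           # drop one trailing quote
    printf '%s\n' "--conf $key=$value"
  done
}

printf '%s\n' '"spark.eventLog.enabled":false' '"spark.sql.shuffle.partitions": 200' | to_conf_flags
```

The call at the end prints --conf spark.eventLog.enabled=false and --conf spark.sql.shuffle.partitions=200 on separate lines. Splitting on the first colon keeps values such as hdfs:/// URIs intact.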

Spark cluster: EMR on ACK

  • queue: this parameter is not supported.

  • priority: this parameter is not supported.

  • FLOW_SKIP_SQL_ANALYZE: the method that is used to run SQL statements. Valid values:

      • true: multiple SQL statements are run at a time.

      • false: a single SQL statement is run at a time.

    Note

    This parameter can be used only for test runs in Data Development.

  • Others:

      • You can directly append custom Spark parameters in the advanced configurations. Example: "spark.eventLog.enabled":false. DataWorks automatically adds the parameters to the code that is delivered to the EMR cluster in the --conf key=value format.

      • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

Hadoop cluster: EMR on ECS

  • queue: the scheduling queue to which jobs are submitted. Default value: default.

    If you configure a workspace-level YARN resource queue when you register an EMR cluster to a DataWorks workspace, the following rules apply:

      • If Global Settings Take Precedence is set to Yes, the queue that you specified when you registered the EMR cluster takes precedence when a Spark job runs.

      • If Global Settings Take Precedence is not set to Yes, the queue that you specify for the EMR Spark node takes precedence when a Spark job runs.

    For more information about EMR YARN, see Basic queue configurations. For more information about how to configure a queue when you register an EMR cluster, see Set the global YARN resource queue.

  • priority: the priority of the job. Default value: 1.

  • FLOW_SKIP_SQL_ANALYZE: the method that is used to run SQL statements. Valid values:

      • true: multiple SQL statements are run at a time.

      • false: a single SQL statement is run at a time.

    Note

    This parameter can be used only for test runs in Data Development.

  • USE_GATEWAY: specifies whether to submit jobs from the current node through a Gateway cluster. Valid values:

      • true: jobs are submitted through a Gateway cluster.

      • false: jobs are not submitted through a Gateway cluster and are submitted to a header node by default.

    Note

    If the cluster in which the current node resides is not associated with a Gateway cluster and you set this parameter to true, the EMR jobs fail to be submitted.

  • Others:

      • You can directly append custom Spark parameters in the advanced configurations. Example: "spark.eventLog.enabled":false. DataWorks automatically adds the parameters to the code that is delivered to the EMR cluster in the --conf key=value format.

      • You can also configure global Spark parameters. For more information, see Configure global Spark parameters.

        Note

        To enable Ranger access control, add the spark.hadoop.fs.oss.authorization.method=ranger configuration in Configure global Spark parameters.

SQL job

  1. On the toolbar, click the Run with Parameters icon. In the Parameter dialog box, select the scheduling resource group that you created and click Run.

    Note
    • To access compute resources that are deployed in a public network or a VPC, you must use a scheduling resource group that can connect to the compute resources. For more information, see Solutions for network connections.

    • If you want to change the resource group that is used to run a job, click the Run with Parameters icon and select the resource group that you want to use.

    • When you query data by using an EMR Spark node, the result is limited to a maximum of 10,000 records and a total size of 10 MB.

  2. Click the Save icon to save the node code.

  3. (Optional) Perform smoke testing.

    If you want to run smoke tests in the development environment, you can run them when you commit the node or after the node is committed. For more information, see Run a smoke test.

Step 3: Configure node scheduling

If you want the system to periodically run a task on the node, you can click Properties in the right-side navigation pane on the configuration tab of the node to configure task scheduling properties based on your business requirements. For more information, see Overview.

Note
  • You must configure the Rerun property and the Parent Nodes for the node before you can commit it.

  • If you need to customize the component environment, you can create a custom image based on the official dataworks_emr_base_task_pod image and use the image in DataStudio.

    For example, you can replace a Spark JAR package or add dependencies on specific libraries, files, or JAR packages when you create the custom image.

Step 4: Deploy the node

After a task on a node is configured, you must commit and deploy the task. After you commit and deploy the task, the system runs the task on a regular basis based on scheduling configurations.

  1. Click the Save icon in the top toolbar to save the task.

  2. Click the Submit icon in the top toolbar to commit the task.

    In the Submit dialog box, configure the Change description parameter. Then, determine whether to review task code after you commit the task based on your business requirements.

    Note
    • You must configure the Rerun and Parent Nodes parameters on the Properties tab before you commit the task.

    • You can use the code review feature to ensure the code quality of tasks and prevent task execution errors caused by invalid task code. If you enable the code review feature, the task code that is committed can be deployed only after the task code passes the code review. For more information, see Code review.

If you use a workspace in standard mode, you must deploy the task in the production environment after you commit the task. To deploy a task on a node, click Deploy in the upper-right corner of the configuration tab of the node. For more information, see Deploy nodes.

More operations

After you commit and deploy the task, the task is periodically run based on the scheduling configurations. You can click Operation Center in the upper-right corner of the configuration tab of the corresponding node to go to Operation Center and view the scheduling status of the task. For more information, see Manage scheduled tasks.
