EMR Spark node


Spark is a unified analytics engine for big data known for its high performance, ease of use, and versatility. It supports complex in-memory computing, making it ideal for building large-scale, low-latency data analytics applications. DataWorks lets you use EMR Spark nodes to develop and periodically schedule Spark tasks. This topic explains how to configure and use EMR Spark nodes and provides application examples.

Prerequisites

  • Before you start node development, if you need a custom component environment, create a custom image based on the official dataworks_emr_base_task_pod image and then use it in Data Studio. For more information, see Create a custom image and Use images in data development.

    For example, you can replace Spark JAR packages or add dependencies on specific libraries, files, or JAR packages when you create a custom image.

  • You have created an Alibaba Cloud EMR cluster and registered it with DataWorks. For more information, see Data Studio: Bind an EMR computing resource.

  • (Optional, for RAM users) The RAM user who develops tasks has been added to the workspace and granted the Development role or the Workspace Administrator role. The Workspace Administrator role has extensive permissions, so grant it with caution. For more information about how to add members, see Add members to a workspace.

    If you use an Alibaba Cloud account, you can skip this step.

Limitations

  • EMR Spark nodes can run only on a serverless resource group (recommended) or an exclusive resource group for scheduling. To use a custom image for data development, you must use a serverless resource group.

  • To manage metadata for a DataLake or custom cluster in DataWorks, you must first configure EMR-HOOK on the cluster. For more information, see Configure EMR-HOOK for Spark SQL.

    Note

    Without EMR-HOOK configured on the cluster, DataWorks cannot display metadata in real time, generate audit logs, display data lineage, or perform EMR-related governance tasks.

  • While EMR on ACK Spark clusters do not support viewing data lineage, EMR Serverless Spark clusters do.

  • EMR on ACK Spark clusters and EMR Serverless Spark clusters support only referencing OSS resources via OSS REF and uploading resources to OSS, but not uploading resources to HDFS.

  • DataLake clusters and custom clusters support referencing OSS resources via OSS REF, uploading resources to OSS, and uploading resources to HDFS.

Notes

If Ranger access control is enabled for Spark in the EMR cluster associated with your workspace:

  • If you run Spark tasks with the default image, Ranger access control takes effect by default.

  • If you run Spark tasks with a custom image, submit a ticket to request an image upgrade so that Ranger access control takes effect.

Develop a Spark job and get the JAR package

Before you can schedule EMR Spark jobs with DataWorks, you must develop the job in EMR and compile the code into a JAR package. For more information on developing EMR Spark jobs, see Spark Overview.

Note

You will then need to upload the JAR package to DataWorks to schedule recurring EMR Spark jobs.

Procedure

  1. On the EMR Spark node editing page, develop the job.

    Develop a Spark job

    Choose an option based on your scenario:

    Option 1: Upload and reference

    DataWorks allows you to upload resources from your local machine to Data Studio and then reference them. After you compile the EMR Spark job, obtain the compiled JAR package. The recommended storage method for the JAR package depends on its size.

    If the JAR package is smaller than 500 MB, upload it, create it as a DataWorks EMR resource, and then submit the resource. If it is 500 MB or larger, store it directly in HDFS on the EMR cluster. EMR on ACK and EMR Serverless Spark clusters do not support uploading resources to HDFS.
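As a quick local check of the size rule, the following Bash sketch suggests a storage location for a compiled JAR. It assumes that the 500 MB limit means 500 MiB (524,288,000 bytes), which this topic does not state. The function names storage_for_size and storage_for_jar are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: suggest where to store a compiled Spark JAR based on the 500 MB rule.
# Assumption: "500 MB" is treated as 500 MiB (524288000 bytes).
LIMIT_BYTES=$((500 * 1024 * 1024))

# Hypothetical helper: maps a size in bytes to a storage suggestion.
storage_for_size() {
  local size_bytes="$1"
  if [ "$size_bytes" -lt "$LIMIT_BYTES" ]; then
    # Small enough to upload from the local machine as a DataWorks EMR JAR resource.
    echo "dataworks-emr-jar-resource"
  else
    # Too large for a local upload, so store it in HDFS on the EMR cluster instead.
    echo "hdfs"
  fi
}

# Hypothetical helper: reads the size of a local JAR file and prints the suggestion.
storage_for_jar() {
  local jar="$1" size_bytes
  # wc -c may pad its output with spaces on some systems, so strip whitespace.
  size_bytes="$(wc -c < "$jar" | tr -d '[:space:]')"
  storage_for_size "$size_bytes"
}
```

For example, storage_for_jar target/app.jar prints either dataworks-emr-jar-resource or hdfs. The file name target/app.jar is a placeholder. On EMR on ACK and EMR Serverless Spark clusters the hdfs result does not apply, because those clusters cannot use HDFS uploads. Use the OSS REF option for them instead.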

    JAR < 500 MB

    1. Create an EMR JAR resource.

      If a jar package is smaller than 500 MB, you can upload it from your local machine as a DataWorks EMR JAR resource. This enables easy visual management in the DataWorks console. After creating the resource, submit it. For more information, see Create and use EMR resources.

      1. Upload the jar package to the storage directory for JAR resources by using the Local upload method. For more information, see Resource management.

      2. Click Upload to upload the JAR resource.

      3. Select a Storage Path, Data Sources, and Resource Group.

      4. Click Save.

    2. Reference the EMR JAR resource.

      1. Open the created EMR Spark node and go to the code editing page.

      2. In the Resource Management section of the left-side navigation pane, find the resource that you want to reference, right-click it, and select Insert Resource Path.

      3. After you click Insert Resource Path, a reference statement is automatically added to the code editor of the EMR Spark node. This indicates that the resource is referenced successfully.

        ##@resource_reference{"spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar"}
        spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar

        In this code, spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar is the name of the EMR JAR resource that you uploaded.

      4. Edit the code for the EMR Spark node to include the spark-submit command. The following code is an example.

        Note

        The EMR Spark node editor does not support comments. Write the job code as shown in the following example, without comments. Otherwise, the node fails to run.

        ##@resource_reference{"spark-examples_2.11-2.4.0.jar"}
        spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster spark-examples_2.11-2.4.0.jar 100
        Note
        • org.apache.spark.examples.SparkPi: The main class in the compiled jar package.

        • spark-examples_2.11-2.4.0.jar: The name of the EMR JAR resource that you uploaded.

        • You can use the example values for the other parameters. You can also run the spark-submit --help command to view usage information and modify the spark-submit command as needed.

        • To pass spark-submit options such as --executor-memory 2G, add them directly to the spark-submit command in the node code.

        • Spark nodes only support submitting jobs to YARN in cluster mode.

        • For jobs submitted with spark-submit, set the deploy mode to cluster mode, not client mode.
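The node editor rejects comments, so one option is to generate the node code in a local terminal and paste only the output. The following Bash sketch prints a resource reference line and a spark-submit command that always sets --master yarn and --deploy-mode cluster. It places extra options such as --executor-memory 2G before the JAR name and application arguments after it. The function node_code and its -- separator convention are hypothetical and are not part of DataWorks.

```bash
#!/usr/bin/env bash
# Sketch: print EMR Spark node code for a JAR that was uploaded as a DataWorks EMR JAR resource.
# Run this in a local terminal and paste only its output into the node editor,
# because the editor does not accept comments.

# Hypothetical helper.
# Usage: node_code <resource-jar> <main-class> [spark-submit options...] [-- application args...]
node_code() {
  local jar="$1" main_class="$2"
  shift 2
  local opts=()
  # Collect spark-submit options until the "--" separator or the end of the arguments.
  while [ "$#" -gt 0 ] && [ "$1" != "--" ]; do
    opts+=("$1")
    shift
  done
  # Drop the "--" separator. Whatever remains is passed to the main class.
  if [ "$#" -gt 0 ]; then
    shift
  fi
  # Spark nodes submit to YARN in cluster mode, so both flags are always set.
  local parts=(spark-submit --class "$main_class" --master yarn --deploy-mode cluster)
  parts+=("${opts[@]}" "$jar" "$@")
  printf '##@resource_reference{"%s"}\n' "$jar"
  printf '%s\n' "${parts[*]}"
}
```

For example, node_code spark-examples_2.11-2.4.0.jar org.apache.spark.examples.SparkPi --executor-memory 2G -- 100 prints the two lines to paste into the node.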

    JAR >= 500 MB

    1. Create an EMR JAR resource.

      If a jar package is 500 MB or larger, you cannot upload it from your local machine as a DataWorks resource. We recommend that you store the jar package directly in HDFS on the EMR cluster and record its storage path. You can then reference this path when you schedule the Spark job in DataWorks.

      1. Upload the jar package to the storage directory for JAR resources by using the Local upload method. For more information, see Resource management.

      2. Click Upload to upload the JAR resource.

      3. Select a Storage Path, Data Sources, and Resource Group.

      4. Click Save.

    2. Reference the EMR JAR resource.

      If the jar package is stored in HDFS, you can reference it by specifying its path in the code of the EMR Spark node.

      1. Double-click the created EMR Spark node to open its code editor page.

      2. Write the spark-submit command. The following is an example.

        spark-submit --master yarn
        --deploy-mode cluster
        --name SparkPi
        --driver-memory 4G
        --driver-cores 1
        --num-executors 5
        --executor-memory 4G
        --executor-cores 1
        --class org.apache.spark.examples.JavaSparkPi
        hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar 100
        Note
        • hdfs:///tmp/jars/spark-examples_2.11-2.4.8.jar: The actual path of the jar package in HDFS.

        • org.apache.spark.examples.JavaSparkPi: The main class in the compiled jar package.

        • Configure the other parameters according to your EMR cluster's settings. You can also run the spark-submit --help command to view the usage information for spark-submit and modify the command as needed.

        • To pass spark-submit options such as --executor-memory 2G, add them directly to the spark-submit command in the node code.

        • Spark nodes only support submitting jobs to YARN in cluster mode.

        • For jobs submitted with spark-submit, set the deploy mode to cluster mode, not client mode.
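The HDFS path in the command must exist before the node runs. One way to place the JAR there is to use the standard hdfs dfs commands on a cluster node that has the HDFS client installed. The following Bash sketch assumes that setup and uses /tmp/jars only because it is the example directory in this topic. The DRY_RUN switch and the function names are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: copy a large Spark JAR into HDFS so that an EMR Spark node can reference it by path.
# Assumptions: run on an EMR cluster node where the hdfs client is on PATH;
# /tmp/jars is only the example directory used in this topic.

# Hypothetical helper: runs a command, or only prints it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Hypothetical helper. Usage: upload_jar_to_hdfs <local-jar> [absolute-hdfs-dir]
# Creates the directory, uploads the JAR (overwriting any existing copy), and
# prints the hdfs:// path to use in the spark-submit command.
upload_jar_to_hdfs() {
  local jar="$1" dir="${2:-/tmp/jars}"
  run hdfs dfs -mkdir -p "$dir" || return 1
  run hdfs dfs -put -f "$jar" "$dir/" || return 1
  # dir starts with "/", so this yields hdfs:///tmp/jars/<name>.
  echo "hdfs://${dir}/$(basename "$jar")"
}
```

Run upload_jar_to_hdfs spark-examples_2.11-2.4.8.jar on the cluster, or set DRY_RUN=1 to preview the commands first. Then use the printed path in the spark-submit command.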

    Option 2: Reference an OSS resource

    You can directly reference OSS resources from the current node by using OSS REF. When you run an EMR node, DataWorks automatically loads the OSS resources specified in the code for local use. This method is commonly used in scenarios where EMR jobs require JAR dependencies or depend on scripts.

    1. Develop the JAR resource.

      1. Prepare code dependencies.

        You can view the required code dependencies in the /usr/lib/emr/spark-current/jars/ path on the master node of your EMR cluster. The following example uses Spark 3.4.2. Open an existing IntelliJ IDEA project, add the following Project Object Model (POM) dependencies, and configure the plug-ins.

        POM dependencies

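        <dependencies>
                <!-- Apache Spark Core. The provided scope keeps Spark, which the EMR cluster already supplies, out of the jar-with-dependencies package. -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.12</artifactId>
                    <version>3.4.2</version>
                    <scope>provided</scope>
                </dependency>
                <!-- Apache Spark SQL -->
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-sql_2.12</artifactId>
                    <version>3.4.2</version>
                    <scope>provided</scope>
                </dependency>
        </dependencies>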

        Plug-ins

        <build>
                <sourceDirectory>src/main/scala</sourceDirectory>
                <testSourceDirectory>src/test/scala</testSourceDirectory>
                <plugins>
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-compiler-plugin</artifactId>
                        <version>3.7.0</version>
                        <configuration>
                            <source>1.8</source>
                            <target>1.8</target>
                        </configuration>
                    </plugin>
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                        <configuration>
                            <recompileMode>incremental</recompileMode>
                        </configuration>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                    <goal>testCompile</goal>
                                </goals>
                                <configuration>
                                    <args>
                                        <arg>-dependencyfile</arg>
                                        <arg>${project.build.directory}/.scala_dependencies</arg>
                                    </args>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
      2. Sample code

        package com.aliyun.emr.example.spark
        import org.apache.spark.sql.SparkSession
        object SparkMaxComputeDemo {
          def main(args: Array[String]): Unit = {
            // Create a SparkSession.
            val spark = SparkSession.builder()
              .appName("HelloDataWorks")
              .getOrCreate()
            // Print the Spark version.
            println(s"Spark version: ${spark.version}")
            // Stop the SparkSession to release cluster resources.
            spark.stop()
          }
        }
      3. After editing the Scala code, generate a jar package.

        The sample jar package is named SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar.
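Running the Maven package phase triggers the make-assembly execution configured in the plug-ins above. The following Bash sketch wraps that step and locates the output. It assumes that the POM keeps Maven's default finalName (artifactId-version), in which case the jar-with-dependencies descriptor produces a file named artifactId-version-jar-with-dependencies.jar. The function names are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: build the project and locate the jar-with-dependencies package.
# Assumption: the POM does not override finalName, so the assembly JAR is
# named <artifactId>-<version>-jar-with-dependencies.jar.

# Hypothetical helper: expected file name of the assembly JAR.
assembly_jar_name() {
  local artifact_id="$1" version="$2"
  echo "${artifact_id}-${version}-jar-with-dependencies.jar"
}

# Hypothetical helper: runs the package phase, which triggers the make-assembly
# execution, and prints the path of the assembly JAR under target/.
build_assembly_jar() {
  local artifact_id="$1" version="$2" jar
  mvn -q clean package || return 1
  jar="target/$(assembly_jar_name "$artifact_id" "$version")"
  if [ ! -f "$jar" ]; then
    echo "assembly JAR not found: $jar" >&2
    return 1
  fi
  echo "$jar"
}
```

For the sample package name, the matching call would be build_assembly_jar SparkWorkOSS 1.0-SNAPSHOT, run from the project root. The artifactId SparkWorkOSS and version 1.0-SNAPSHOT are inferred from the file name, not taken from a POM shown in this topic.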

    2. Upload the JAR resource.

      1. After you finish code development, log in to the OSS console and click Bucket list in the left-side navigation pane.

      2. Click the name of the destination bucket to open the File Management page.

        This topic uses the onaliyun-bucket-2 bucket as an example.

      3. Click Create Directory to create a directory to store the JAR resource.

        Set Directory Name to emr/jars to create the storage directory for the JAR resource.

      4. Upload the JAR resource to the storage directory.

        Go to the directory and click Upload File. In the Files to Upload area, click Scan Files, add the SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar file to the bucket, and then click Upload File.

    3. Reference the JAR resource.

      1. Edit the code to reference the JAR resource.

        On the editing page of the created EMR Spark node, edit the code to reference the JAR resource.

        spark-submit --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn ossref://onaliyun-bucket-2/emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar

        Parameter description:

        • class: The fully qualified name of the main class to run.

        • master: The cluster manager to which the Spark application is submitted. Set it to yarn.

        • ossref file path: The path of the JAR package in OSS, in the ossref://{endpoint}/{bucket}/{object} format.

          • endpoint: The endpoint of the OSS service. If you omit it, the OSS resource must be in the same region as the EMR cluster.

          • bucket: A container in OSS for storing objects. Each bucket has a unique name. Log in to the OSS console to view all buckets under your account.

          • object: A specific object, such as a file name or path, stored in the bucket.
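The path format can be assembled with a small Bash helper. This sketch omits the endpoint segment when no endpoint is given, which matches the example command above. Any endpoint value you pass is your own and is not validated. The name ossref_path is hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: build an OSS REF path in the ossref://{endpoint}/{bucket}/{object} format.
# When the endpoint is empty, the endpoint segment is omitted; the OSS object
# must then be in the same region as the EMR cluster.

# Hypothetical helper. Usage: ossref_path <endpoint-or-empty> <bucket> <object>
ossref_path() {
  local endpoint="$1" bucket="$2" object="${3#/}"  # drop one leading "/" from the object
  if [ -n "$endpoint" ]; then
    echo "ossref://${endpoint}/${bucket}/${object}"
  else
    echo "ossref://${bucket}/${object}"
  fi
}
```

For example, ossref_path "" onaliyun-bucket-2 emr/jars/SparkWorkOSS-1.0-SNAPSHOT-jar-with-dependencies.jar reproduces the path used in the spark-submit command above.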

      2. Run the EMR Spark node job.

        After you finish editing, click the Run icon and select the serverless resource group that you created to run the EMR Spark node. After the job completes, record the application ID shown in the console, for example, application_1730367929285_xxxx.

      3. View the result.

        Create an EMR Shell node and run the yarn logs -applicationId application_1730367929285_xxxx command on the node to view the execution result:

        yarn logs -applicationId application_xxx
        Process Output>>> LogLastModifiedTime:Fri Nov 01 16:27:46 +0800 2024
        Process Output>>> LogLength:21
        Process Output>>> LogContents:
        Process Output>>> Spark version: 3.4.2
        Process Output>>>
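If the run log contains several application IDs, or you prefer not to copy them by hand, the following Bash sketch extracts every distinct ID that matches the YARN pattern application_<clusterTimestamp>_<sequence>. It then prints one yarn logs command for each ID. It assumes that you saved the node's run log to a local text file. The function names are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: find YARN application IDs in a saved node run log and print the
# "yarn logs" command for each, ready to paste into an EMR Shell node.

# Hypothetical helper: prints each distinct application ID found on stdin.
extract_app_ids() {
  grep -oE 'application_[0-9]+_[0-9]+' | sort -u
}

# Hypothetical helper: prints one yarn logs command per application ID.
yarn_log_commands() {
  extract_app_ids | while IFS= read -r app_id; do
    printf 'yarn logs -applicationId %s\n' "$app_id"
  done
}
```

For example, yarn_log_commands < run.log prints the commands to run in an EMR Shell node. run.log is a placeholder file name.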

    (Optional) Configure advanced parameters

    You can configure the following node-specific parameters in the Scheduling Settings pane on the right side of the node, under EMR Node Parameters > DataWorks parameters.

    Note
    • The available advanced parameters vary by EMR cluster type, as shown in the following tables.

    • You can configure additional open-source Spark properties in the Scheduling Settings pane under EMR Node Parameters > Spark parameter.

    Datalake cluster/custom cluster: EMR on ECS

    Advanced parameter

    Description

    queue

    The scheduling queue for the job. The default is default.

    If you configured a workspace-level YARN resource queue when you registered the EMR cluster with the DataWorks workspace:

    • If Global Settings Take Precedence is set to Yes, the Spark job uses the queue configured during cluster registration.

    • Otherwise, the Spark job uses the queue configured for the EMR Spark node.

    For more information about EMR YARN, see Basic queue configuration. For more information about the queue configuration when you register an EMR cluster, see Set the global YARN resource queue.

    priority

    The priority. The default value is 1.

    FLOW_SKIP_SQL_ANALYZE

    The execution mode for SQL statements. Valid values:

    • true: Executes multiple SQL statements at a time.

    • false (default): Executes one SQL statement at a time.

    Note

    This parameter is supported only for test runs in the data development environment.

    Other

    • You can add custom Spark parameters in Advanced Configurations. For example, if you add spark.eventLog.enabled : false, DataWorks automatically adds the parameter to the code that is submitted to the EMR cluster in the --conf key=value format.

    • You can also configure global Spark parameters. For more information, see Set global Spark parameters.

      Note

      To enable Ranger access control, add the spark.hadoop.fs.oss.authorization.method=ranger configuration in Set global Spark parameters to ensure that Ranger access control takes effect.
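The mapping from an Advanced Configurations entry to a --conf argument can be illustrated with the following Bash sketch. It only mimics the documented key : value to --conf key=value conversion and is not DataWorks code. The function names trim and to_conf_args are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: convert "key : value" entries, as typed in Advanced Configurations,
# into the "--conf key=value" form that DataWorks adds to the submitted code.
# This only illustrates the documented mapping; it is not DataWorks code.

# Removes leading and trailing whitespace from a string.
trim() {
  local s="$1"
  s="${s#"${s%%[![:space:]]*}"}"
  s="${s%"${s##*[![:space:]]}"}"
  printf '%s' "$s"
}

# Hypothetical helper: reads entries on stdin and prints one --conf argument per entry.
to_conf_args() {
  local line key value
  # Also handles a last line that has no trailing newline.
  while IFS= read -r line || [ -n "$line" ]; do
    # Skip blank lines.
    [ -z "$(trim "$line")" ] && continue
    # Split at the first colon, so values may themselves contain colons.
    key="$(trim "${line%%:*}")"
    value="$(trim "${line#*:}")"
    printf '%s\n' "--conf ${key}=${value}"
  done
}
```

For example, printf 'spark.eventLog.enabled : false\n' | to_conf_args prints --conf spark.eventLog.enabled=false.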

    Spark cluster: EMR on ACK

    Advanced parameter

    Description

    FLOW_SKIP_SQL_ANALYZE

    The execution mode for SQL statements. Valid values:

    • true: Executes multiple SQL statements at a time.

    • false: Executes one SQL statement at a time.

    Note

    This parameter is supported only for test runs in the data development environment.

    Other

    • You can add custom Spark parameters in Advanced Configurations. For example, if you add spark.eventLog.enabled : false, DataWorks automatically adds the parameter to the code that is submitted to the EMR cluster in the --conf key=value format.

    • You can also configure global Spark parameters. For more information, see Set global Spark parameters.

    Hadoop cluster: EMR on ECS

    Advanced parameter

    Description

    queue

    The scheduling queue to which the job is submitted. The default value is default.

    If you configured a workspace-level YARN resource queue when you registered the EMR cluster with the DataWorks workspace:

    • If Global Settings Take Precedence is set to Yes, the Spark job uses the queue configured during cluster registration.

    • Otherwise, the Spark job uses the queue configured for the EMR Spark node.

    For more information about EMR YARN, see Basic queue configuration. For more information about the queue configuration during EMR cluster registration, see Set a global YARN resource queue.

    priority

    The priority. The default value is 1.

    FLOW_SKIP_SQL_ANALYZE

    The execution mode for SQL statements. Valid values:

    • true: Executes multiple SQL statements at a time.

    • false: Executes one SQL statement at a time.

    Note

    This parameter is supported only for test runs in the data development environment.

    USE_GATEWAY

    Specifies whether to submit the job for this node through a Gateway cluster. Valid values:

    • true: Submits the job through the Gateway cluster.

    • false: Does not submit the job through a Gateway cluster. By default, the job is submitted to the header node.

    Note

    If you set this parameter to true but the node's cluster is not associated with a Gateway cluster, subsequent EMR job submissions will fail.

    Other

    • You can add custom Spark parameters directly in Advanced Configurations. For example, if you add spark.eventLog.enabled : false, DataWorks automatically adds it to the code submitted to the EMR cluster in the --conf key=value format.

    • You can also configure global Spark parameters. For more information, see Set global Spark parameters.

      Note

      To enable Ranger access control, add the spark.hadoop.fs.oss.authorization.method=ranger configuration in Set global Spark parameters to ensure that Ranger access control takes effect.
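The queue precedence described for DataLake, custom, and Hadoop clusters can be summarized as a small decision function. The following Bash sketch is one reading of that rule and is not DataWorks source code. The function name and argument order are hypothetical. When no other queue applies, the function falls back to the documented default queue, named default.

```bash
#!/usr/bin/env bash
# Sketch of the documented queue precedence for DataLake, custom, and Hadoop clusters.
# Arguments (hypothetical):
#   $1  workspace-level YARN queue set when the cluster was registered (may be empty)
#   $2  "Yes" if Global Settings Take Precedence is enabled, anything else otherwise
#   $3  queue configured on the EMR Spark node (may be empty)
effective_queue() {
  local workspace_queue="$1" global_first="$2" node_queue="$3"
  if [ -n "$workspace_queue" ] && [ "$global_first" = "Yes" ]; then
    # The registration-time queue wins when global settings take precedence.
    echo "$workspace_queue"
  elif [ -n "$node_queue" ]; then
    # Otherwise the node's own queue is used.
    echo "$node_queue"
  else
    # Documented default queue name.
    echo "default"
  fi
}
```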

    EMR Serverless Spark cluster

    For more information about parameter settings, see Set parameters for submitting a Spark job.

    Advanced parameter

    Description

    queue

    The scheduling queue for the job. The default is dev_queue.

    priority

    The priority. The default value is 1.

    FLOW_SKIP_SQL_ANALYZE

    The execution mode for SQL statements. Valid values:

    • true: Executes multiple SQL statements at a time.

    • false: Executes one SQL statement at a time.

    Note

    This parameter is supported only for test runs in the data development environment.

    SERVERLESS_RELEASE_VERSION

    The Spark engine version. By default, the job uses the Default Engine Version configured for the cluster in Management Center under Clusters. You can configure this parameter to set different engine versions for different jobs.

    SERVERLESS_QUEUE_NAME

    The resource queue. By default, the job uses the Default Resource Queue configured for the cluster in Management Center under Clusters. If you have resource isolation and management requirements, you can add queues. For more information, see Manage resource queues.

    Other

    • You can add custom Spark parameters in Advanced Configurations. For example, if you add spark.eventLog.enabled : false, DataWorks automatically adds the parameter to the code that is submitted to the EMR cluster in the --conf key=value format.

    • You can also configure global Spark parameters. For more information, see Set global Spark parameters.

    Run the Spark job

    1. In the Run Configuration pane, go to the Compute Resource section and configure the Compute Resource and DataWorks Resource Group.

      Note
      • You can also set CUs for Scheduling based on the resources that the task requires. The default value is 0.25 CU.

      • To access a data source over the public internet or in a VPC, you must use a scheduling resource group with verified network connectivity to the data source. For more information, see Network connectivity solutions.

    2. In the parameter dialog box on the toolbar, select the corresponding data source, and then click Run.

  2. To run the node job periodically, configure its scheduling properties. For more information, see Configure scheduling properties for a node.

    Note

    If you need to customize the component environment, you can create a custom image based on the official image dataworks_emr_base_task_pod and use the image in Data Development.

    For example, when you create a custom image, you can replace Spark JAR packages or depend on specific libraries, files, or JAR packages.

  3. After you configure the node job, you must deploy the node. For more information, see Deploy a node or workflow.

  4. After the job is deployed, you can go to Operation Center to view the run status of the periodic job. For more information, see Get started with Operation Center.

FAQ

Related topics