DataWorks的EMR(E-MapReduce) SPARK节点,用于进行复杂的内存分析,构建大型、低延迟的数据分析应用。本文为您介绍如何创建EMR Spark节点,并通过测试计算Pi及Spark对接MaxCompute两个示例,为您介绍EMR Spark节点的功能。

前提条件

  • 您已创建阿里云EMR集群,且集群所在的安全组中入方向的安全策略包含以下策略。
    • 授权策略:允许
    • 协议类型:自定义 TCP
    • 端口范围:8898/8898
    • 授权对象:100.104.0.0/16
  • 您在工作空间配置页面添加E-MapReduce计算引擎实例后,当前页面才会显示EMR目录。详情请参见配置工作空间
  • 如果EMR启用了Ranger,则使用DataWorks进行EMR的作业开发前,您需要在EMR中修改配置,添加白名单配置并重启Hive,否则作业运行时会报错Cannot modify spark.yarn.queue at runtimeCannot modify SKYNET_BIZDATE at runtime
    1. 白名单的配置通过EMR的自定义参数,添加Key和Value进行配置,以Hive组件的配置为例,配置值如下。
      hive.security.authorization.sqlstd.confwhitelist.append=tez.*|spark.*|mapred.*|mapreduce.*|ALISA.*|SKYNET.*
      说明 其中 ALISA.*SKYNET.*为DataWorks专有的配置。
    2. 白名单配置完成后需要重启服务,重启后配置才会生效。重启服务的操作详情请参见重启服务
  • 已开通独享调度资源组,并且独享调度资源组需要绑定EMR所在的VPC专有网络,详情请参见新增和使用独享调度资源组
    说明 仅支持使用独享调度资源组运行该类型任务。

创建EMR Spark节点

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
  2. 鼠标悬停至新建图标,单击EMR > EMR Spark
    您也可以找到相应的业务流程,右键单击 EMR,选择 新建 > EMR Spark
  3. 新建节点对话框中,输入节点名称,并选择目标文件夹
    说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
  4. 编辑高级配置
    • "USE_GATEWAY":true ,表示任务会被提交到EMR gateway上执行,默认提交到header节点。
    • "SPARK_CONF": "--conf spark.driver.memory=2g --conf xxx=xxx" ,设置spark 任务运行参数,多个参数在该key中追加。
    • “queue”:提交作业的调度队列,默认为default队列。
    • “vcores”: 虚拟核数,默认为1。
    • “memory”:内存,默认为2048MB(用于设置启动器Launcher的内存配额)。
    • “priority”:优先级,默认为1。
    • “FLOW_SKIP_SQL_ANALYZE”:SQL语句执行方式,参数值为false表示每次执行一条SQL语句;参数值为true表示每次执行多条SQL语句。
  5. 调度配置界面对节点配置调度相关参数。
    • 配置任务调度的基本信息,详情请参见基础属性
    • 配置时间调度周期、重跑属性和上下游依赖关系,详情请参见配置时间属性配置调度依赖
      说明 您需要设置节点的 重跑属性依赖的上游节点,才可以提交节点。
    相关配置详情请参见: 基础属性
  6. 单击提交
    您可以在 EMR节点下,单击所使用的目标引擎,在 数据开发中找到新创建的EMR Spark节点。
    说明 如果您的工作空间绑定多个EMR引擎,需要选择EMR引擎。如果仅绑定一个EMR引擎,则无需选择。
    EMR Spark

保存并提交节点任务

  1. 保存并提交节点。
    注意 您需要设置节点的 重跑属性依赖的上游节点,才可以提交节点。
    1. 单击工具栏中的保存图标,保存节点。
    2. 单击工具栏中的提交图标。
    3. 提交新版本对话框中,输入变更描述
    4. 单击确认
    如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的 发布。具体操作请参见 发布任务
  2. 测试节点,详情请参见查看周期任务

数据开发示例一:使用计算Pi测试当前EMR Spark环境是否可用

示例一以Spark自带示例项目计算Pi为例,测试当前EMR Spark环境是否可用。示例详情请参见示例项目使用说明

  1. 获取Spark自带示例的JAR包spark-examples_2.11-2.4.5.jar的存放路径。
    Spark组件安装在 /usr/lib/spark-current路径下,您需要登录 阿里云E-MapReduce控制台,进入目标EMR集群查询完整的路径 /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar,详情请参见 EMR常用文件路径示例路径
  2. 在创建的EMR Spark节点编辑页面,输入运行代码。创建EMR Spark节点,详情请参见创建EMR Spark节点
    示例运行代码如下。
    --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
    您仅需填写 spark-submit后面的内容即可,在作业提交时会自动补全 spark-submit的内容。实际执行的界面代码如下。
    # spark-submit [options] --class [MainClass] xxx.jar args
    spark-submit --class org.apache.spark.examples.SparkPi --master local[8] /usr/lib/spark-current/examples/jars/spark-examples_2.11-2.4.5.jar 100
  3. 保存并提交运行节点任务,详情请参见保存并提交节点任务章节内容。
当返回结果为 1097: Pi is roughly 3.1415547141554714时,表示运行成功,EMR Spark环境可用。 示例一返回结果

数据开发示例二:Spark对接MaxCompute

本示例以Spark对接MaxCompute,实现通过Spark统计MaxCompute表的行数为例,为您介绍EMR Spark节点的功能应用。更多应用场景请参见EMR Spark开发指南

执行本示例前,您需要准备如下相关环境及测试数据:
  • 准备环境。
    • DataWorks工作空间绑定EMR引擎和MaxCompute引擎,详情请参见配置工作空间
    • 开通OSS并创建Bucket,详情请参见创建存储空间
    • 安装了scala的本地IDE(IDEA)。
  • 准备测试数据。
    在DataWorks数据开发页面创建ODPS SQL节点,执行建表语句并插入数据。示例语句如下,设置第一列为BIGINT类型,同时,插入了两条数据记录。创建ODPS SQL节点,详情请参见 创建ODPS SQL节点
    DROP TABLE IF EXISTS emr_spark_read_odpstable ;
    CREATE TABLE IF NOT EXISTS emr_spark_read_odpstable 
    (
        id BIGINT
        ,name STRING
    )
    ;
    INSERT INTO TABLE emr_spark_read_odpstable VALUES (111,'zhangsan'),(222,'lisi') ;
  1. 在Spark中创建Maven工程,添加pom依赖,详情请参见Spark准备工作
    添加pom依赖,代码如下。
        <dependency>
            <groupId>com.aliyun.emr</groupId>
            <artifactId>emr-maxcompute_2.11</artifactId>
            <version>1.9.0</version>
        </dependency>
    您可以参考如下插件代码,在实际使用中请以实际代码为准。
            <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
              
                  <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <configuration>
                        <recompileMode>incremental</recompileMode>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
  2. 在Spark中统计MaxCompute表第一列BIGINT类型的行数,详情请参见Spark对接MaxCompute
    示例代码如下。
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package com.aliyun.emr.example.spark
    
    import com.aliyun.odps.TableSchema
    import com.aliyun.odps.data.Record
    
    import org.apache.spark.aliyun.odps.OdpsOps
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkMaxComputeDemo {
      def main(args: Array[String]): Unit = {
        if (args.length < 6) {
          System.err.println(
            """Usage: SparkMaxComputeDemo <accessKeyId> <accessKeySecret> <envType> <project> <table> <numPartitions>
              |
              |Arguments:
              |
              |    accessKeyId      Aliyun Access Key ID.
              |    accessKeySecret  Aliyun Key Secret.
              |    envType          0 or 1
              |                     0: Public environment.
              |                     1: Aliyun internal environment, i.e. Aliyun ECS etc.
              |    project          Aliyun ODPS project
              |    table            Aliyun ODPS table
              |    numPartitions    the number of RDD partitions
            """.stripMargin)
          System.exit(1)
        }
    
        val accessKeyId = args(0)
        val accessKeySecret = args(1)
        val envType = args(2).toInt
        val project = args(3)
        val table = args(4)
        val numPartitions = args(5).toInt
    
        val urls = Seq(
          Seq("http://service.odps.aliyun.com/api", "http://dt.odps.aliyun.com"), // public environment
          Seq("http://odps-ext.aliyun-inc.com/api", "http://dt-ext.odps.aliyun-inc.com") // Aliyun internal environment
        )
    
        val conf = new SparkConf().setAppName("E-MapReduce Demo 3-1: Spark MaxCompute Demo (Scala)")
        val sc = new SparkContext(conf)
        val odpsOps = envType match {
          case 0 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(0)(0), urls(0)(1))
          case 1 =>
            OdpsOps(sc, accessKeyId, accessKeySecret, urls(1)(0), urls(1)(1))
        }
    
        val odpsData = odpsOps.readTable(project, table, read, numPartitions)
    
        println(s"Count (odpsData): ${odpsData.count()}")
      }
    
      def read(record: Record, schema: TableSchema): Long = {
        record.getBigint(0)
      }
    }
    统计MaxCompute数据完成后,请将该数据生成JAR包。示例生成的JAR包为 emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar
    说明 与ODPS相关的依赖均属于第三方包,您需要将第三方包一并生成JAR包上传至目标EMR集群。
  3. 上传运行资源。
    1. 登录OSS管控台
    2. 上传运行资源(即上一步骤生成的JAR包)至指定OSS路径。
      本示例中,使用的路径为 oss://oss-cn-shanghai-internal.aliyuncs.com/onaliyun-bucket-2/emr_BE/spark_odps/,您需要上传 emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar至该路径。首次使用OSS路径时,需要先进行一键授权,详情请参见 创建并使用EMR MR节点
      说明 由于DataWorks EMR资源的使用上限为50M,而添加依赖的JAR包通常大于50M,所以您需要在OSS控制台上传。如果您的运行资源小于50M,您也可以选择在DataWorks直接上传,详情请参见 创建和使用EMR JAR资源
      上传运行资源
  4. 创建EMR Spark节点,并执行节点任务。
    本示例创建的节点命名为 emr_spark_odps。创建EMR Spark节点,详情请参见 创建EMR Spark节点
    emr_spark_odps节点的编辑页面,选择所使用的EMR引擎实例,输入如下代码。
    --class com.aliyun.emr.example.spark.SparkMaxComputeDemo --master yarn-client ossref://onaliyun-bucket-2/emr_BE/spark_odps/emr_spark_demo-1.0-SNAPSHOT-jar-with-dependencies.jar <accessKeyId> <accessKeySecret> 1 onaliyun_workshop_dev emr_spark_read_odpstable 1
    其中<accessKeyId> 、<accessKeySecret>、 <envType>、 <project>、 <table> 、<numPartitions>等参数信息您需要替换为实际使用的相关信息。
  5. 保存并提交运行节点任务,详情请参见保存并提交节点任务章节内容。
您可以查看运行日志,当返回结果中表记录条数为 2时,表示统计结果符合预期。 返回结果