本文为您介绍如何搭建MaxCompute Spark开发环境。

前提条件

搭建Spark开发环境前,请确保您已经在操作系统中安装如下软件:
  • JDK 1.8
    以Linux操作系统为例,安装命令示例如下。JDK名称请以实际为准。具体的JDK名称可通过执行yum -y list java*获取。
    yum install -y java-1.8.0-openjdk-devel.x86_64
  • Python 2.7
    以Linux操作系统为例,安装命令示例如下。Python包名称请以实际为准。
    # 获取Python包。
    wget https://www.python.org/ftp/python/2.7.10/Python-2.7.10.tgz
    # 解压缩Python包。
    tar -zxvf Python-2.7.10.tgz
    # 切换到解压后的目录,指定安装路径。
    cd Python-2.7.10
    ./configure --prefix=/usr/local/python2
    # 编译并安装Python。
    make
    make install
  • Maven
    以Linux操作系统为例,安装命令示例如下。Maven包路径请以实际为准。
    # 获取Maven包。
    wget https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz
    # 解压缩Maven包。
    tar -zxvf apache-maven-3.8.4-bin.tar.gz
  • Git
    以Linux操作系统为例,安装命令示例如下。
    # 获取Git包。
    wget https://github.com/git/git/archive/v2.17.0.tar.gz
    # 解压缩Git包。
    tar -zxvf v2.17.0.tar.gz
    # 安装编译源码所需依赖。
    yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel gcc perl-ExtUtils-MakeMaker
    # 切换到解压后的目。
    cd git-2.17.0
    # 编译。
    make prefix=/usr/local/git all
    # 安装Git至/usr/local/git路径。
    make prefix=/usr/local/git install

下载MaxCompute Spark客户端包并上传至操作系统

MaxCompute Spark发布包集成了MaxCompute认证功能。作为客户端工具,它通过Spark-Submit方式提交作业到MaxCompute项目中运行。MaxCompute提供了面向Spark1.x、Spark2.x和Spark3.x发布包,下载路径如下:
将MaxCompute Spark客户端包上传至Linux操作系统中,并解压。您可以进入Spark客户端包所在目录,执行如下命令解压包。
tar -xzvf spark-2.3.0-odps0.33.0.tar.gz

设置环境变量

您需要在操作系统的命令行执行窗口配置如下环境变量信息,以Linux系统为例,配置方法及信息如下。

  • 配置Java环境变量。
    • 获取Java安装路径。命令示例如下。
      # 如果通过yum方式安装,默认安装在usr目录下,您可以按照如下命令查找。如果您自定义了安装路径,请以实际路径为准。
      whereis java
      ls -lrt /usr/bin/java
      ls -lrt /etc/alternatives/java
      
      # 返回信息如下。/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.322.b06-1.1.al7.x86_64即为安装路径。
      /etc/alternatives/java -> /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.322.b06-1.1.al7.x86_64/jre/bin/java
      查找Java路径
    • 编辑Java环境变量信息。命令示例如下。
      # 编辑环境变量配置文件。
      vim /etc/profile
      
      # 按下i进入编辑状态后,在配置文件末尾添加环境变量信息。
      # JAVA_HOME需要修改为实际Java的安装路径。
      export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.322.b06-1.1.al7.x86_64
      export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
      export PATH=$JAVA_HOME/bin:$PATH
      # 按ESC退出编辑,按:wq退出配置文件。
      
      # 执行如下命令使修改生效。
      source /etc/profile
      
      # 确认Java已配置成功。
      java -version
      # 返回结果示例如下。
      openjdk version "1.8.0_322"
      OpenJDK Runtime Environment (build 1.8.0_322-b06)
      OpenJDK 64-Bit Server VM (build 25.322-b06, mixed mode)
      Java环境变量
  • 配置Spark环境变量。
    • 获取Spark客户端包解压后的路径。图示如下,表明包所在路径为/root/spark-2.3.0-odps0.33.0。请以实际解压路径及名称为准。获取Spark包所在路径
    • 编辑Spark环境变量信息。命令示例如下。
      # 编辑环境变量配置文件。
      vim /etc/profile
      # 按下i进入编辑状态后,在配置文件末尾添加环境变量信息。
      # SPARK_HOME需要修改为实际解压后的Spark客户端包所在路径。
      export SPARK_HOME=/root/spark-2.3.0-odps0.33.0
      export PATH=$SPARK_HOME/bin:$PATH
      # 按ESC退出编辑,按:wq退出配置文件。
      
      # 执行如下命令使修改生效。
      source /etc/profile
      配置Spark环境变量
  • 配置Python环境变量。

    使用PySpark的用户,需要配置该信息。

    • 获取Python安装路径。命令示例如下。配置Python环境变量
    • 编辑Python环境变量信息。命令示例如下。
      # 编辑环境变量配置文件。
      vim /etc/profile
      
      # 按下i进入编辑状态后,在配置文件末尾添加环境变量信息。
      # PATH需要修改为Python的实际安装路径。
      export PATH=/usr/bin/python/bin/:$PATH
      # 按ESC退出编辑,按:wq退出配置文件。
      
      # 执行如下命令使修改生效。
      source /etc/profile
      
      # 确认Python已配置成功。
      python --version
      # 返回结果示例如下。
      Python 2.7.5
      配置Python环境变量
  • 配置Maven环境变量。
    • 获取Maven包解压后的路径。图示如下,表明包所在路径为/root/apache-maven-3.8.4。请以实际解压路径及名称为准。Maven包路径
    • 编辑Maven环境变量信息。命令示例如下。
      # 编辑环境变量配置文件。
      vim /etc/profile
      
      # 按下i进入编辑状态后,在配置文件末尾添加环境变量信息。
      # MAVEN_HOME需要修改为实际解压后的Maven包所在路径。
      export MAVEN_HOME=/root/apache-maven-3.8.4
      export PATH=$MAVEN_HOME/bin:$PATH
      # 按ESC退出编辑,按:wq退出配置文件。
      # 执行如下命令使修改生效。
      source /etc/profile
      
      # 确认Maven已配置成功。
      mvn -version
      # 返回结果示例如下。
      Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537)
      Maven home: /root/apache-maven-3.8.4
      Java version: 1.8.0_322, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.322.b06-1.1.al7.x86_64/jre
      Default locale: en_US, platform encoding: UTF-8
      OS name: "linux", version: "4.19.91-25.1.al7.x86_64", arch: "amd64", family: "unix"
  • 配置Git环境变量。
    • 获取Git安装路径。命令示例如下。
      whereis git
      获取git安装路径
    • 编辑Git环境变量信息。命令示例如下。
      # 编辑环境变量配置文件。
      vim /etc/profile
      
      # 按下i进入编辑状态后,在配置文件末尾添加环境变量信息。# PATH需要修改为Git的实际安装路径。
      export PATH=/usr/local/git/bin/:$PATH
      # 按ESC退出编辑,按:wq退出配置文件。
      
      # 执行如下命令使修改生效。
      source /etc/profile
      
      # 确认Git已配置成功。
      git --version
      # 返回结果示例如下。
      git version 2.24.4
      配置Git环境变量

配置spark-defaults.conf

第一次使用MaxCompute Spark客户端时,请在Spark客户端包的解压路径下,将conf文件夹下的spark-defaults.conf.template文件重命名为spark-defaults.conf后再进行相关配置。如果没有对文件进行重命名,将会导致配置无法生效。命令示例如下。
# 切换至Spark客户端包的解压路径,并进入conf文件夹。请以实际路径为准。
cd /root/spark-2.3.0-odps0.33.0/conf

# 修改文件名。
mv spark-defaults.conf.template spark-defaults.conf

# 编辑spark-defaults.conf。
vim spark-defaults.conf
# 按下i进入编辑状态后,在配置文件末尾添加如下配置信息。
spark.hadoop.odps.project.name = <MaxCompute_project_name>  
spark.hadoop.odps.access.id = <AccessKey_id>     
spark.hadoop.odps.access.key = <AccessKey_secret>
spark.hadoop.odps.end.point = <Endpoint>   # Spark客户端连接访问MaxCompute项目的Endpoint,您可以根据自己情况进行修改。详情请参见Endpoint。
spark.hadoop.odps.runtime.end.point = <VPC_endpoint>  # Spark运行环境Endpoint,所在Region的MaxCompute VPC网络的Endpoint。您可以根据自己情况进行修改。
# spark 2.3.0请将spark.sql.catalogImplementation设置为odps,spark 2.4.5请将spark.sql.catalogImplementation设置为hive。
spark.sql.catalogImplementation={odps|hive} 
# 如下参数配置保持不变
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper
spark.hadoop.odps.cupid.webproxy.endpoint = http://service.cn.maxcompute.aliyun-inc.com/api
spark.hadoop.odps.moye.trackurl.host = http://jobview.odps.aliyun.com
  • MaxCompute_project_name:待访问MaxCompute项目的名称。

    此处为MaxCompute项目名称,非工作空间名称。您可以登录MaxCompute控制台,左上角切换地域后,即可在项目管理页签查看到具体的MaxCompute项目名称。

  • AccessKey_id:具备目标MaxCompute项目访问权限的AccessKey ID。

    您可以进入AccessKey管理页面获取AccessKey ID。

  • AccessKey_secret:AccessKey ID对应的AccessKey Secret。

    您可以进入AccessKey管理页面获取AccessKey Secret。

  • Endpoint:MaxCompute项目所属区域的外网Endpoint。

    各地域的外网Endpoint信息,请参见各地域Endpoint对照表(外网连接方式)

  • VPC_endpoint:MaxCompute项目所属区域的VPC网络的Endpoint。

    各地域的VPC网络Endpoint信息,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)

特殊场景和功能,需要开启一些其他的配置参数,详情请参见Spark配置详解

准备项目工程

MaxCompute Spark提供了项目工程模版,建议您下载模版复制后直接在模版里开发。
注意 模版工程里的关于spark依赖的scope为provided,请不要更改,否则提交的作业无法正常运行。
以Linux操作系统为例,准备项目工程命令示例如下:
  • 下载Spark-1.x模板并编译
    git clone https://github.com/aliyun/MaxCompute-Spark.git
    cd MaxCompute-Spark/spark-1.x
    mvn clean package
  • 下载Spark-2.x 模板并编译
    git clone https://github.com/aliyun/MaxCompute-Spark.git
    cd MaxCompute-Spark/spark-2.x
    mvn clean package
  • 下载Spark-3.x 模板并编译
    git clone https://github.com/aliyun/MaxCompute-Spark.git
    cd MaxCompute-Spark/spark-3.x
    mvn clean package

上述命令执行完毕后,如果显示创建失败,说明环境配置有误,请按照上述配置指导仔细检查并修正环境配置信息。

配置依赖说明

在准备的MaxCompute Spark项目下,配置依赖信息。命令示例如下。

  • 配置访问MaxCompute表所需的依赖。
    • 使用Spark-1.x模板场景
      # 进入spark-1.x文件夹。
      cd MaxCompute-Spark/spark-1.x
      
      # 编辑Pom文件,添加odps-spark-datasource依赖。
      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>odps-spark-datasource_2.10</artifactId>
        <version>3.3.8-public</version>
      </dependency>                           
    • 使用Spark-2.x模板场景
      # 进入spark-2.x文件夹。
      cd MaxCompute-Spark/spark-2.x
      
      # 编辑Pom文件,添加odps-spark-datasource依赖。
      <dependency>
          <groupId>com.aliyun.odps</groupId>
          <artifactId>odps-spark-datasource_2.11</artifactId>
          <version>3.3.8-public</version>
      </dependency>
  • 配置访问OSS所需的依赖。
    如果作业需要访问OSS,直接添加以下依赖即可。
    <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>hadoop-fs-oss</artifactId>
        <version>3.3.8-public</version>
    </dependency>

更多Spark-1.x、Spark-2.x以及Spark-3.x的依赖配置信息,请参见Spark-1.x pom文件Spark-2.x pom文件Spark-3.x pom文件

引用外部文件

用户在开发过程中涉及到如下场景时,需要引用外部文件:
  • 作业需要读取一些配置文件。
  • 作业需要额外的资源包或第三方库。例如JAR包、Python库。
在实际操作中,您需要先上传文件后才可以引用文件,上传文件方式有以下两种,任选其中一种即可:
  • 方式一:通过Spark参数上传文件

    MaxCompute Spark支持Spark社区版原生的--jars--py-files--files--archives参数,您可以在提交作业时,通过这些参数上传文件,这些文件在作业运行时会被上传到用户的工作目录下。

    • 通过MaxCompute Spark客户端,使用Spark-Submit方式上传文件。
      说明
      • --jars:会将配置的JAR包上传至Driver和Executor的当前工作目录,多个文件用英文逗号(,)分隔。这些JAR包都会加入Driver和Executor的Classpath。在Spark作业中直接通过"./your_jar_name"即可引用,与社区版Spark行为相同。
      • --files--py-files:会将配置的普通文件或Python文件上传至Driver和Executor的当前工作目录,多个文件用英文逗号(,)分隔。在Spark作业中直接通过"./your_file_name"即可引用,与社区版Spark行为相同。
      • --archives:与社区版Spark行为略有不同,多个文件用英文逗号(,)分隔,配置方式为xxx#yyy,会将配置的归档文件(例如.zip)解压到Driver和Executor的当前工作目录的子目录中。例如当配置方式为xx.zip#yy时,应以"./yy/xx/"引用到归档文件中的内容;当配置方式为xx.zip时,应以"./xx.zip/xx/"引用到归档文件中的内容。如果一定要将归档内容直接解压到当前目录,即直接引用"./xxx/",请使用下文中的spark.hadoop.odps.cupid.resources参数进行配置。
    • 通过DataWorks,添加作业需要的资源,操作详情请参见创建MaxCompute资源
      说明 DataWorks中上传资源限制最大为50 MB,如果需要使用更大的资源,您需要通过MaxCompute客户端将资源上传为MaxCompute资源,并将资源添加至数据开发面板。更多MaxCompute资源信息,请参见MaxCompute资源
  • 方式二:通过MaxCompute资源上传文件
    MaxCompute Spark提供spark.hadoop.odps.cupid.resources参数,可以直接引用MaxCompute中的资源,这些资源在作业运行时会被上传到用户的工作目录下。使用方式如下:
    1. 通过MaxCompute客户端将文件上传至MaxCompute项目。单个文件最大支持500 MB。
    2. 在Spark作业配置中添加spark.hadoop.odps.cupid.resources参数,指定Spark作业运行所需要的MaxCompute资源。格式为<projectname>.<resourcename>,如果需要引用多个文件,需要用英文逗号(,)分隔。配置示例如下:
      spark.hadoop.odps.cupid.resources=public.python-python-2.7-ucs4.zip,public.myjar.jar
      指定的资源将被下载到Driver和Executor的当前工作目录,资源下载到工作目录后默认名称是<projectname>.<resourcename>
      此外,您可以在配置时通过<projectname>.<resourcename>:<newresourcename>方式重命名资源名称。配置示例如下:
      spark.hadoop.odps.cupid.resources=public.myjar.jar:myjar.jar
      注意 该配置项必须在spark-defaults.conf或DataWorks的配置项中进行配置才能生效,不能写在代码中。
通过上述两种方式将文件上传后,即可在代码中引用文件,文件读取示例如下:
val targetFile = "文件名"
val file = Source.fromFile(targetFile)
for (line <- file.getLines)
    println(line)
file.close

SparkPi冒烟测试

完成以上的工作之后,执行冒烟测试,验证MaxCompute Spark是否可以端到端连通。以Spark-2.x为例,您可以提交一个SparkPi验证功能是否正常,提交命令如下。
# /path/to/MaxCompute-Spark请指向正确的编译出来后的应用程序的Jar包。
cd $SPARK_HOME
bin/spark-submit --master yarn-cluster --class com.aliyun.odps.spark.examples.SparkPi \
/path/to/MaxCompute-Spark/spark-2.x/target/spark-examples_2.11-1.0.0-SNAPSHOT-shaded.jar

# 当看到以下日志表明冒烟作业成功。
19/06/11 11:57:30 INFO Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 11.222.166.90
         ApplicationMaster RPC port: 38965
         queue: queue
         start time: 1560225401092
         final status: SUCCEEDED

IDEA本地执行注意事项

通常,本地调试成功后会在集群上执行代码。但是Spark可以支持在IDEA里以Local模式直接运行代码,运行时请注意以下几点:
  • 代码需要手动设置spark.master
    val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .config("spark.master", "local[4]") // 需要设置spark.master为local[N]才能直接运行,N为并发数。
          .getOrCreate()
  • 在IDEA里手动添加MaxCompute Spark客户端的相关依赖。
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope> 
    </dependency>
    pom.xml中设置要求scope为provided,所以运行时会出现NoClassDefFoundError报错。
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/SparkSession$
        at com.aliyun.odps.spark.examples.SparkPi$.main(SparkPi.scala:27)
        at com.aliyun.odps.spark.examples.Spa。r。kPi.main(SparkPi.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 2 more
    您可以按照以下方式手动将MaxCompute Spark下的Jars目录加入IDEA模板工程项目中,既可以保持scope=provided,又能在IDEA里直接运行不报错:
    1. 在IDEA中单击顶部菜单栏上的File,选中Project Structure…44
    2. Project Structure页面,单击左侧导航栏上的Modules。选择资源包,并单击资源包的Dependencies页签。
    3. 在资源包的Dependencies页签下,单击左下角的+,选择JARs or directories…添加MaxCompute Spark下的Jars目录。
  • Local不能直接引用spark-defaults.conf里的配置,需要手动指定相关配置。
    通过Spark-Submit方式提交作业,系统会读取spark-defaults.conf文件中的配置。通过Local模式提交作业,需要手动在代码里指定相关配置。例如,在Local模式下如果要通过Spark-Sql读取MaxCompute的表,配置如下。
    val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .config("spark.master", "local[4]") // 需设置spark.master为local[N]才能直接运行,N为并发数。
          .config("spark.hadoop.odps.project.name", "****")
          .config("spark.hadoop.odps.access.id", "****")
          .config("spark.hadoop.odps.access.key", "****")
          .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
          .config("spark.sql.catalogImplementation", "odps")
          .getOrCreate()

Spark 2.4.5使用注意事项

  • 使用Spark 2.4.5提交作业
    • 直接使用Yarn-cluster模式在本地提交任务。详情请参见Cluster模式
    • 在DataWorks中配置参数spark.hadoop.odps.spark.version=spark-2.4.5-odps0.33.0。当前DataWorks独享资源组尚未升级到Spark 2.4.5,用户可以采用公共资源组进行调度,或联系MaxCompute技术支持团队进行升级。
  • Spark 2.4.5使用变化
    • 如果使用Yarn-cluster模式在本地提交任务,需要新增环境变量export HADOOP_CONF_DIR=$SPARK_HOME/conf
    • 如果使用Local模式进行调试,需要在$SPARK_HOME/conf目录下新建odps.conf文件,并添加如下配置。
      odps.project.name = 
      odps.access.id = 
      odps.access.key =
      odps.end.point =
  • Spark 2.4.5参数配置变化
    • spark.sql.catalogImplementation配置为hive
    • spark.sql.sources.default配置为hive
    • spark.sql.odps.columnarReaderBatchSize,向量化读每个batch包含的行数,默认值为4096。
    • spark.sql.odps.enableVectorizedReader,开启向量化读,默认值为True。
    • spark.sql.odps.enableVectorizedWriter,开启向量化写,默认值为True。
    • spark.sql.odps.split.size,该配置可以用来调节读MaxCompute表的并发度,默认每个分区为256 MB。

Spark 3.1.1使用注意事项

  • 使用Spark 3.1.1提交作业
    • 直接使用Yarn-cluster模式在本地提交任务。详情请参见Cluster模式
    • DataWorks平台尚未部署,暂时不支持直接通过DataWorks模式提交。针对Scala、Java类型的作业,可以在spark-defaults.conf配置文件添加以下参数运行Spark 3.x。
      spark.hadoop.odps.cupid.resources = public.__spark_libs__3.1.1-odps0.33.0.zip,[projectname].[用户主jar包],[projectname].[用户其他jar包]                 
      spark.driver.extraClassPath = ./public.__spark_libs__3.1.1-odps0.33.0.zip/*            
      spark.executor.extraClassPath = ./public.__spark_libs__3.1.1-odps0.33.0.zip/* 
      说明
      • 采用如上方式提交任务,需要将所依赖的资源全部添加到spark.hadoop.odps.cupid.resources参数中(包括主JAR包),否则可能出现类找不到的问题。
      • 暂时还无法通过以上方式提交Pyspark作业。
  • Spark 3.1.1使用变化
    • 如果使用Yarn-cluster模式在本地提交任务,需要新增环境变量export HADOOP_CONF_DIR=$SPARK_HOME/conf
    • 如果使用Yarn-cluster模式提交PySpark作业,需要在spark-defaults.conf配置文件添加以下参数使用Python3。
      spark.hadoop.odps.cupid.resources = public.python-3.7.9-ucs4.tar.gz
      spark.pyspark.python = ./public.python-3.7.9-ucs4.tar.gz/python-3.7.9-ucs4/bin/python3
    • 如果使用Local模式进行调试:
      • 需要在$SPARK_HOME/conf目录下新建odps.conf文件,并添加如下配置。
        odps.project.name = 
        odps.access.id = 
        odps.access.key =
        odps.end.point =
      • 需要添加spark.hadoop.fs.defaultFS = file:///,示例如下。
        val spark = SparkSession
          .builder()
          .config("spark.hadoop.fs.defaultFS", "file:///")
          .enableHiveSupport()
          .getOrCreate()
  • Spark 3.1.1参数配置变化
    • spark.sql.defaultCatalog配置为odps
    • spark.sql.catalog.odps配置为org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
    • spark.sql.sources.partitionOverwriteMode配置为dynamic
    • spark.sql.extensions配置为org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
    • spark.sql.odps.enableVectorizedReader,开启向量化读,默认值为True。
    • spark.sql.odps.enableVectorizedWriter,开启向量化写,默认值为True。
    • spark.sql.catalog.odps.splitSizeInMB,该配置可以用来调节读MaxCompute表的并发度,默认每个分区为256 MB。