Spark常见问题

本文为您介绍使用Spark过程中的常见问题。

问题类别

常见问题

开发Spark

作业报错

如何自检项目工程?

建议您检查如下内容:

  • 检查pom.xml

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope> // spark-xxxx_${scala.binary.version} 依赖scope必须是provided。
    </dependency>
  • 检查主类spark.master

    val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .config("spark.master", "local[4]") // 如果是以yarn-cluster方式提交,代码中如果有local[N]的配置,将会报错。
          .getOrCreate()
  • 检查主类Scala代码。

    object SparkPi { // 必须是object,如果在IDEA创建文件的时候写为class,main函数是无法加载的。
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .getOrCreate()
  • 检查主类代码配置。

    val spark = SparkSession
          .builder()
          .appName("SparkPi")
          .config("key1", "value1")
          .config("key2", "value2")
          .config("key3", "value3")
          ...  // 如果执行local测试时,将MaxCompute配置在hard-code代码里,部分配置是无法生效的。
          .getOrCreate()
    说明

    建议您在使用yarn-cluster方式提交任务时,将配置项都写在spark-defaults.conf中。

在DataWorks上运行ODPS Spark节点的步骤是什么?

  1. 在本地Python环境中编辑Spark代码并打包。本地Python环境版本要求为Python 2.7。

  2. 上传资源包至DataWorks。详情请参见创建并使用MaxCompute资源

  3. 在DataWorks上创建ODPS Spark节点。详情请参见创建ODPS Spark节点

  4. 编写代码并运行节点,在DataWorks控制台上即可查看运行结果。

Spark on MaxCompute如何在本地进行调试?

您可以通过IntelliJ IDEA在本地进行调试。详情请参见搭建Linux开发环境

如何通过Spark访问VPC环境内的服务?

更多通过Spark访问VPC环境内的服务信息,请参见Spark访问VPC实例

如何把JAR包当成资源来引用?

您可以通过参数spark.hadoop.odps.cupid.resources指定需要引用的资源。资源可以多个项目共享,建议您设置相关权限确保数据安全。示例如下。

spark.hadoop.odps.cupid.resources = projectname.xx0.jar,projectname.xx1.jar 

如何通过Spark传入参数?

传参详情请参见Spark on DataWorks

如何将Spark流式读取的DataHub数据写入MaxCompute?

示例代码请参见DataHub

如何将开源Spark代码迁移至Spark on MaxCompute?

您可以根据作业场景选择的迁移方案如下:

如何通过Spark处理MaxCompute中的表数据?

Spark on MaxCompute支持Local、Cluster和DataWorks运行模式。三种模式的配置不同,详情请参见运行模式

如何设置Spark资源并行度?

Spark 资源并行度由Executor数量和Executor CPU核数共同决定,任务可并行执行的最大Task数量为Executor数量 * Executor CPU核数

  • Executor数量

    • 参数:spark.executor.instances

    • 参数说明:该参数用于设置作业申请的Executor数量。

  • Executor CPU核数

    • 参数:spark.executor.cores

    • 参数说明:该参数用于设置每个Executor进程的CPU核数,决定每个Executor进程并行执行Task的能力,每个CPU核同一时间只能执行一个Task。通常Executor的CPU核数设置为2~4较为合适。

如何解决内存不足问题?

  • 常见报错:

    • java.lang.OutOfMemoryError: Java heap space

    • java.lang.OutOfMemoryError: GC overhead limit exceeded

    • Cannot allocate memory

    • The job has been killed by "OOM Killer", please check your job's memory usage

  • 解决方案:

    • 设置Executor内存。

      • 参数:spark.executor.memory

      • 参数说明:代表每个Executor的内存。通常与spark.executor.cores保持1:4设置即可,例如设置spark.executor.cores1spark.executor.memory4 GB。当Executor抛出java.lang.OutOfMemoryError异常时,需要调大该值。

    • 设置Executor堆外内存。

      • 参数:spark.executor.memoryOverhead

      • 参数说明:代表每个Executor的额外内存,主要用于JVM自身、字符串、NIO Buffer等开销。默认大小为spark.executor.memory * 0.1,最小384 MB。通常不需要额外设置,当Executor日志出现Cannot allocate memory或OOM Killer报错时,需要调大该值。

    • 设置Driver内存。

      • 参数:spark.driver.memory

      • 参数说明:代表Driver的内存大小。通常与spark.driver.cores保持1:4设置即可。当Driver需要Collect较大数据量,或抛出java.lang.OutOfMemoryError异常时,需要调大该值。

    • 设置Driver堆外内存。

      • 参数:spark.driver.memoryOverhead

      • 参数说明:代表Driver的额外内存。默认为大小spark.driver.memory * 0.1,最小384 MB。当Driver日志出现Cannot allocate memory报错,需要调大该值。

如何解决磁盘不足问题?

  • 问题现象

    出现报错:No space left on device

  • 问题原因:该错误意味着本地磁盘不足,通常该报错会在Executor中出现,并导致Executor退出。

  • 解决方案:

    • 增加磁盘大小

      • 参数:spark.hadoop.odps.cupid.disk.driver.device_size

      • 默认值:20 GB。

      • 参数说明:默认Driver和Executor各提供20GB的本地磁盘,当磁盘空间不足时可以适当调大该参数。注意该参数必须要配置在spark-defaults.conf或DataWorks的配置项中才会生效。

    • 增加Executor数量

      如果您调整本地磁盘大小到100GB以后,仍然报该错误,说明单个Executor Shuffle数据已经超过上限,可能是遇到了数据倾斜,这种情况下对数据重分区会有帮助。也可能确实是数据量太大,那就需要调整spark.executor.instances参数,增加Executor的数量。

如何引用MaxCompute Project中的资源?

当前Spark on MaxCompute支持以下两种方式来访问MaxCompute中的资源:

  • 通过参数配置直接引用MaxCompute资源。

    • 参数:spark.hadoop.odps.cupid.resources

    • 参数格式:<projectname>.<resourcename>[:<newresourcename>]

    • 参数说明:该配置项指定了任务运行所需要的MaxCompute资源,详情请参见资源操作。指定的资源将被下载到Driver和Executor的当前工作目录。同一个任务可引用多个资源,资源之间用逗号分隔。资源下载到工作目录后默认名字是<projectname>.<resourcename>,在配置时通过<projectname>.<resourcename>:<newresourcename>进行重命名。需要注意该配置项必须要配置在spark-default.conf中或DataWorks的配置项中才能生效。

    • 示例:

      ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
      
      ## 同时引用多个资源:同时引用public.python-python-2.7-ucs4.zip和public.myjar.jar
      spark.hadoop.odps.cupid.resources=public.python-python-2.7-ucs4.zip,public.myjar.jar
      
      ## 重命名示例:引用并将public.myjar.jar重命名为myjar.jar
      spark.hadoop.odps.cupid.resources=public.myjar.jar:myjar.jar
  • 在DataWorks中引用资源。

    • 将MaxCompute中的资源添加至DataWorks数据开发面板的业务流程中,详情请参见MaxCompute资源管理

    • 在DataWorks ODPS Spark节点中选择jar、file、archive资源。

    说明

    该方案在任务运行时会上传资源,对于较大资源建议采用方案一进行引用。

如何访问VPC?

当前Spark on MaxCompute支持以下两种方式来访问阿里云VPC中的服务:

  • 反向访问方式

    • 使用限制

      只能访问与MaxCompute相同Region的阿里云VPC。

    • 使用流程:

      1. 在要访问的服务中添加IP白名单,允许100.104.0.0/16网段的访问。

      2. 作业配置spark.hadoop.odps.cupid.vpc.domain.list参数。

        该配置描述了需要访问的一个或多个实例的网络情况。配置值为JSON格式,需要把JSON压缩成一行。示例如下,您需要将RegionID、VPCID、实例域名、端口等替换为实际值即可。

        ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
        
        spark.hadoop.odps.cupid.vpc.domain.list={"regionId":"cn-beijing","vpcs":[{"vpcId":"vpc-2zeaeq21mb1dmkqh*****","zones":[{"urls":[{"domain":"dds-2ze3230cfea0*****.mongodb.rds.aliyuncs.com","port":3717},{"domain":"dds-2ze3230cfea0*****.mongodb.rds.aliyuncs.com","port":3717}]}]}]}
  • 通过ENI专线访问

    • 使用限制

      通过ENI专线可以打通一个相同Region的VPC,如果您的作业需要同时访问多个VPC,则可以将已经通过ENI专线打通的VPC与其他VPC之间再做打通即可。

    • 使用流程:

      1. 自助开通ENI专线,详情请参见Spark访问VPC实例

      2. 在要访问的服务中添加白名单,授权代表MaxCompute的安全组(即上一步中提供的安全组)能访问的具体端口。

        例如需要访问阿里云RDS,则需要在RDS中增加规则,允许第1步中创建的安全组访问。如果用户需要访问的服务无法添加安全组,只能添加IP,那么需要将第一步中所使用的vSwitch网段都添加进来。

      3. 作业配置spark.hadoop.odps.cupid.eni.infospark.hadoop.odps.cupid.eni.enable参数。

        使用示例如下,需要把RegionID和VPCID替换为实际值。

        ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
        
        spark.hadoop.odps.cupid.eni.enable = true
        spark.hadoop.odps.cupid.eni.info = [regionid]:[vpcid]

如何访问公网?

当前Spark on MaxCompute支持以下两种方式来访问公网服务:

  • 通过SmartNAT访问

    假设需要访问https://aliyundoc.com:443,流程如下。

    1. 您可以通过申请链接或搜索(钉钉群号:11782920)加入MaxCompute开发者社区钉群联系MaxCompute技术支持团队将https://aliyundoc.com:443加入到odps.security.outbound.internetlist中。

    2. 使用如下命令示例配置Spark作业级别的公网访问白名单以及SmartNAT开关。

      ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
      spark.hadoop.odps.cupid.internet.access.list=aliyundoc.com:443
      spark.hadoop.odps.cupid.smartnat.enable=true
  • 通过ENI专线访问

    1. 自助开通ENI专线,详情请参见Spark访问VPC实例

    2. 确保专线VPC有访问公网的能力,详情请参见使用公网NAT网关SNAT功能访问互联网

    3. 使用如下命令示例配置Spark作业级别的公网访问白名单以及ENI开关,需要把RegionID和VPCID替换为实际值。

       ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
      spark.hadoop.odps.cupid.internet.access.list=aliyundoc.com:443
      spark.hadoop.odps.cupid.eni.enable=true
      spark.hadoop.odps.cupid.eni.info=[region]:[vpcid]

如何访问OSS?

当前Spark on MaxCompute支持使用Jindo SDK来访问阿里云OSS,需要配置以下信息:

  • 配置Jindo SDK及OSS Endpoint。

    命令示例如下。

    ## 引用JindoSDK Jar。以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
    spark.hadoop.odps.cupid.resources=public.jindofs-sdk-3.7.2.jar
    
    ## 设置OSS实现类。
    spark.hadoop.fs.AbstractFileSystem.oss.impl=com.aliyun.emr.fs.oss.OSS
    spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
    
    ## 设置OSS Endpoint
    spark.hadoop.fs.oss.endpoint=oss-[YourRegionId]-internal.aliyuncs.com
    
    ## 通常无需设置OSS endpoint网络白名单,若作业运行过程中发现网络不通,可以需要通过以下参数添加白名单。
    ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
    spark.hadoop.odps.cupid.trusted.services.access.list=[YourBucketName].oss-[YourRegionId]-internal.aliyuncs.com
    说明

    在Spark on MaxCompute集群模式运行时只支持OSS内网Endpoint,不支持外网Endpoint。OSS Region和Endpoint映射请参见OSS地域和访问域名

  • 配置OSS鉴权信息,当前Jindo SDK支持以下两种方式鉴权。

    • 使用AccessKey鉴权,配置示例如下:

      val conf = new SparkConf()
        .setAppName("jindo-sdk-demo")
        # 配置access-key鉴权参数
        .set("spark.hadoop.fs.oss.accessKeyId", "<YourAccessKeyId")
        .set("spark.hadoop.fs.oss.accessKeySecret", "<YourAccessKeySecret>")
    • 使用STS Token鉴权,使用流程如下:

      1. 单击一键授权,将当前云账号的OSS资源通过StsToken的方式授权给MaxCompute项目直接访问。

        说明

        当MaxCompute的ProjectOwner为OSS云账号时,才可以执行一键授权。

      2. 配置开启本地HTTP服务。

        命令示例如下。

        ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
        spark.hadoop.odps.cupid.http.server.enable = true
      3. 配置鉴权信息。

        命令示例如下。

        val conf = new SparkConf()
          .setAppName("jindo-sdk-demo")
          # 配置云服务角色鉴权
          # ${aliyun-uid}是阿里云用户UID
          # ${role-name}是角色名称
          .set("spark.hadoop.fs.jfs.cache.oss.credentials.provider", "com.aliyun.emr.fs.auth.CustomCredentialsProvider")
          .set("spark.hadoop.aliyun.oss.provider.url", "http://localhost:10011/sts-token-info?user_id=${aliyun-uid}&role=${role-name}")

如何引用Python三方库?

  • 问题现象:PySpark作业运行时抛出No module named 'xxx'异常。

  • 问题原因:PySpark作业依赖Python第三方库,在当前MaxCompute平台默认的Python环境中尚未安装。

  • 解决方案:您可以采用以下几种方案添加第三方库依赖。

    • 直接使用MaxCompute Python公共环境。

      您只需要在DataWorks配置项或spark-defaults.conf文件中添加以下配置即可,不同Python版本配置如下:

      • Python 2配置

        ## Python 2.7.13 配置
        ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
        spark.hadoop.odps.cupid.resources = public.python-2.7.13-ucs4.tar.gz
        spark.pyspark.python = ./public.python-2.7.13-ucs4.tar.gz/python-2.7.13-ucs4/bin/python
        
        ## 三方库列表
        https://odps-repo.oss-cn-hangzhou.aliyuncs.com/pyspark/py27/py27-default_req.txt.txt
      • Python 3配置

        ## Python 3.7.9 配置
        ## 以下配置必须在DataWorks配置项/spark-defaults.conf文件中添加
        spark.hadoop.odps.cupid.resources = public.python-3.7.9-ucs4.tar.gz
        spark.pyspark.python = ./public.python-3.7.9-ucs4.tar.gz/python-3.7.9-ucs4/bin/python3
        
        ## 三方库列表
        https://odps-repo.oss-cn-hangzhou.aliyuncs.com/pyspark/py37/py37-default_req.txt
    • 上传单个WHEEL包。

      该方案适用于Python三方依赖数量较少、较为简单的情况,命令示例如下。

      ##需要将wheel包重命名为zip包,例如将pymysql的wheel包重命名为pymysql.zip
      ##将重命名后的zip包上传(文件类型为archive)
      ##在Dataworks spark节点引用该zip包(archive类型)
      ##在spark-defaults.conf或dataworks配置项中添加配置以下后即可import
      ## 配置
      spark.executorEnv.PYTHONPATH=pymysql
      spark.yarn.appMasterEnv.PYTHONPATH=pymysql
      
      ## 上传代码
      import pymysql
    • 上传完整自定义Python环境。

      适用于依赖较为复杂或需要自定义Python版本的情况。您需利用Docker容器打包并上传完整Python环境,详情请参见Package依赖

如何解决Jar依赖冲突问题?

  • 问题现象:运行时抛出NoClassDefFoundError或NoSuchMethodError异常。

  • 问题原因:通常是由于Jar包中的第三方依赖与Spark依赖版本冲突,需要检查上传的主Jar包及第三方依赖库,排除冲突的依赖。

  • 解决方案:

    • Pom自检。

      • 将Spark社区版依赖设置为Provided。

      • 将Hadoop社区版依赖设置为Provided。

      • 将Odps/Cupid依赖设置为Provided。

    • 排除冲突的依赖。

    • 使用maven-shade-plugin relocation解决包冲突。

如何使用Local模式进行调试?

  • Spark 2.3.0

    1. 在spark-defaults.conf中添加以下配置。

      spark.hadoop.odps.project.name =<Yourprojectname>
      spark.hadoop.odps.access.id =<YourAccessKeyID>
      spark.hadoop.odps.access.key =<YourAccessKeySecret>
      spark.hadoop.odps.end.point =<endpoint>
    2. 使用Local模式运行任务。

      ./bin/spark-submit --master local spark_sql.py
  • Spark 2.4.5/Spark 3.1.1

    1. 创建odps.conf文件,并在文件中添加以下配置。

      odps.access.id=<YourAccessKeyID>
      odps.access.key=<YourAccessKeySecret>
      odps.end.point=<endpoint>
      odps.project.name=<Yourprojectname>
    2. 添加环境变量指向odps.conf文件位置。

      export ODPS_CONF_FILE=/path/to/odps.conf
    3. 使用Local模式运行任务。

      ./bin/spark-submit --master local spark_sql.py
  • 常见报错

    • 报错1:

      • 报错信息:

        • Incomplete config, no accessId or accessKey

        • Incomplete config, no odps.service.endpoint

      • 报错原因:在Local模式开启了EventLog。

      • 解决方案:将spark-defaults.conf中的spark.eventLog.enabled=true参数删除即可。

    • 报错2:

      • 报错信息:Cannot create CupidSession with empty CupidConf

      • 报错原因:Spark 2.4.5或Spark 3.1.1 无法读取odps.access.id等信息。

      • 解决方案:创建odps.conf文件,并添加环境变量后再运行任务

    • 报错3:

      • 报错信息:java.util.NoSuchElementException: odps.access.id

      • 报错原因:Spark 2.3.0 无法读取odps.access.id等信息。

      • 解决方案:在spark-defaults.conf中添加spark.hadoop.odps.access.id等配置信息。

运行Spark作业时,报错User signature does not match,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    Stack:
    com.aliyun.odps.OdpsException: ODPS-0410042:
    Invalid signature value - User signature does not match
  • 产生原因

    身份验证未通过,AccessKey ID或AccessKey Secret有误。

  • 解决措施

    请检查spark-defaults.conf提供的AccessKey ID、AccessKey Secret和阿里云官网控制台用户信息管理中的AccessKey IDAccessKey Secret是否一致,如果不一致,请修改一致。

运行Spark作业时,报错You have NO privilege,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    Stack:
    com.aliyun.odps.OdpsException: ODPS-0420095: 
    Access Denied - Authorization Failed [4019], You have NO privilege 'odps:CreateResource' on {acs:odps:*:projects/*}
  • 产生原因

    权限不足,需要申请权限。

  • 解决措施

    需要由项目所有者授予Resource的Read和Create权限。更多授权信息,请参见MaxCompute权限

运行Spark作业时,报错Access Denied,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    Exception in thread "main" org.apache.hadoop.yarn.exceptions.YarnException: com.aliyun.odps.OdpsException: ODPS-0420095: Access Denied - The task is not in release range: CUPID
  • 产生原因

    • 原因一:Spark-defaults.conf中配置的AccessKey ID、AccessKey Secret不正确。

    • 原因二:项目所在的地域未提供Spark on MaxCompute服务。

  • 解决措施

    • 原因一的解决措施:检查Spark-defaults.conf配置信息,修改为正确的AccessKey ID、AccessKey Secret。更多信息,请参见搭建Linux开发环境

    • 原因二的解决措施:确认项目所在的地域是否已经提供了Spark on MaxCompute服务或加入钉钉群21969532(Spark on MaxCompute支持群)咨询。

运行Spark作业时,报错No space left on device,如何解决?

Spark使用网盘进行本地存储。Shuffle数据和BlockManager溢出的数据均存储在网盘上。网盘的大小通过spark.hadoop.odps.cupid.disk.driver.device_size参数控制,默认为20 GB,最大为100 GB。如果调整到100 GB仍然报此错误,需要分析具体原因。常见原因为数据倾斜,在Shuffle或者Cache过程中数据集中分布在某些Block。此时可以缩小单个Executor的核数(spark.executor.cores),增加Executor的数量(spark.executor.instances)。

运行Spark作业时,报错Table or view not found,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    Table or view not found:xxx
  • 产生原因

    • 原因一:表或视图不存在。

    • 原因二:打开了Hive的catalog配置。

  • 解决措施

    • 原因一的解决措施:请创建表。

    • 原因二的解决措施:去掉catalog配置。报错示例如下,需要去掉enableHiveSupport()

      spark = SparkSession.builder.appName(app_name).enableHiveSupport().getOrCreate()

运行Spark作业时,报错Shutdown hook called before final status was reported,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    App Status: SUCCEEDED, diagnostics: Shutdown hook called before final status was reported.
  • 产生原因

    提交到集群的user main并没有通过AM(ApplicationMaster)申请集群资源。例如,用户没有新建SparkContext或用户在代码中设置spark.master为local。

运行Spark作业时,发生JAR包版本冲突类错误,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    User class threw exception: java.lang.NoSuchMethodError
  • 产生原因

    JAR包版本冲突或类错误。

  • 解决措施

    1. $SPARK_HOME/jars路径下找出异常类所在的JAR。

    2. 执行如下命令定位第三方库的坐标以及版本。

      grep <异常类类名> $SPARK_HOME/jars/*.jar
    3. 在Spark作业根目录下,执行如下命令查看整个工程的所有依赖。

      mvn dependency:tree
    4. 找到对应的依赖后,执行如下命令排除冲突包。

      maven dependency exclusions
    5. 重新编译并提交代码。

运行Spark作业时,报错ClassNotFound,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    java.lang.ClassNotFoundException: xxxx.xxx.xxxxx
  • 产生原因

    类不存在或依赖配置错误。

  • 解决措施

    1. 执行如下命令查看您提交的JAR包中是否存在该类定义。

      jar -tf <作业JAR包> | grep <类名称>
    2. 检查pom.xml文件中的依赖是否正确。

    3. 使用Shade方式提交JAR包。

运行Spark作业时,报错The task is not in release range,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    The task is not in release range: CUPID
  • 产生原因

    项目所在地域未开通Spark on MaxCompute服务。

  • 解决措施

    请您选择已经开启Spark on MaxCompute服务的地域使用。

运行Spark作业时,报错java.io.UTFDataFormatException,如何解决?

  • 问题现象

    运行Spark作业时,返回报错如下。

    java.io.UTFDataFormatException: encoded string too long: 2818545 bytes 
  • 解决措施

    调整spark-defaults.confspark.hadoop.odps.cupid.disk.driver.device_size参数的值。默认为20 GB,最大支持100 GB。

运行Spark作业时,打印的中文乱码,如何解决?

您需要添加如下配置。

"--conf" "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
"--conf" "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"

Spark调用外网第三方任务时报错,如何解决?

Spark不能调用外网第三方任务,网络不通。

您可以在VPC中搭建Nginx反向代理,通过代理访问外网。Spark支持直接访问VPC,详情请参见Spark访问VPC实例