Spark-Submit command line tool

This topic describes how to use the Spark-Submit command line tool and provides related examples.

Important

Data Lake Analytics (DLA) has been discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the existing features of DLA and provides more features and better performance. For the related AnalyticDB for MySQL documentation, see Develop Spark applications by using the spark-submit command line tool.

Install the tool

  1. Download the Spark-Submit toolkit.

    You can obtain dla-spark-toolkit.tar.gz by using wget:

    wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit/dla-spark-toolkit.tar.gz
  2. After dla-spark-toolkit.tar.gz is downloaded, decompress the toolkit.

    tar zxvf dla-spark-toolkit.tar.gz
    Note

    The toolkit requires JDK 8 or later.
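
After decompression, you can run a quick sanity check by printing the toolkit version. This sketch assumes the archive was extracted into a dla-spark-toolkit directory under the current working directory:

cd dla-spark-toolkit
./bin/spark-submit --version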

Configure parameters

The common parameters in conf/spark-defaults.conf can also be configured from the command line. spark-defaults.conf currently supports the following common parameters:

#  cluster information
# AccessKeyId  
#keyId =
#  AccessKeySecret
#secretId =
#  RegionId
#regionId =
#  set vcName
#vcName =
#  set ossKeyId, if not set will use --keyId value or keyId value
#ossKeyId =
# set ossSecretId if not set will use --secretId value or secretId value
#ossSecretId =
#  set OssUploadPath, if you need upload local resource
#ossUploadPath =

##spark conf
#  driver specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
#spark.driver.resourceSpec =
#  executor instance number
#spark.executor.instances =
#  executor specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
#spark.executor.resourceSpec =
#  when use ram,  role arn
#spark.dla.roleArn =
#  config dla oss connectors
#spark.dla.connectors = oss
#  config eni, if you want to use eni
#spark.dla.eni.enable = true
#spark.dla.eni.vswitch.id =
#spark.dla.eni.security.group.id =
#  config log location, need an oss path to store logs
#spark.dla.job.log.oss.uri =
#  config spark read dla table when use option -f or -e
#spark.sql.hive.metastore.version = dla

## any other user defined spark conf...

Among these parameters, keyId, secretId, regionId, and vcName are required. They are described as follows:

  • keyId: Your Alibaba Cloud AccessKey ID.

  • secretId: Your Alibaba Cloud AccessKey Secret.

  • vcName: The name of your virtual cluster.

  • regionId: The region in which your virtual cluster resides. For the mapping between regions and region IDs, see Regions and zones.
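
For example, a minimal conf/spark-defaults.conf that sets only the required parameters could look like the following sketch. All values are placeholders; cn-hangzhou is used only as a sample region ID.

keyId = <yourAccessKeyId>
secretId = <yourAccessKeySecret>
regionId = cn-hangzhou
vcName = <yourVirtualClusterName>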

You can run the following command to view the command line help of the tool:

cd /path/to/dla-spark-toolkit
./bin/spark-submit --help

After you run the preceding command, the following help information is returned:

Info: Usage: spark-submit [options] <app jar> [app arguments]
Usage: spark-submit --list [PAGE_NUMBER] [PAGE_SIZE]
Usage: spark-submit --kill [JOB_ID]
Info:
Options:
  --keyId                             Your ALIYUN_ACCESS_KEY_ID, is same as `keyId` in conf/spark-defaults.conf or --conf spark.dla.access.key.id=<value>, required
  --secretId                          Your ALIYUN_ACCESS_KEY_SECRET, is same as `secretId` in conf/spark-defaults.conf or --conf spark.dla.access.secret.id=<value>, required
  --regionId                          Your Cluster Region Id, is same as `regionId` in conf/spark-defaults.conf or --conf spark.dla.region.id=<value>,  required
  --vcName                            Your Virtual Cluster Name, is same as `vcName` in conf/spark-defaults.conf or --conf spark.dla.vc.name=<value>, required
  --oss-keyId                         Your ALIYUN_ACCESS_KEY_ID to upload local resource to oss,
                                      by default, the value will take from --keyId, is same as `ossKeyId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.key.id=<value>
  --oss-secretId                      Your ALIYUN_ACCESS_KEY_SECRET to upload local resource to oss,
                                      default the value will take from --secretId, is same as `ossSecretId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.secret.id=<value>
  --oss-endpoint                      Oss endpoint where the resource will upload. default is http://oss-$regionId.aliyuncs.com,
                                      is same as `ossEndpoint` in conf/spark-defaults.conf or --conf spark.dla.oss.endpoint=<value>
  --oss-upload-path                   The user oss path where the resource will upload
                                      If you want to upload a local jar package to the OSS directory,
                                      you need to specify this parameter. It is same as `ossUploadPath` in conf/spark-defaults.conf or --conf spark.dla.oss.upload.path=<value>
  --class CLASS_NAME                  Your application's main class (for Java / Scala apps).
  --name NAME                         A name of your application.
  --jars JARS                         Comma-separated list of jars to include on the driver
                                      and executor classpaths.
  --conf PROP=VALUE                   Arbitrary Spark configuration property, or you can set conf in conf/spark-defaults.conf
  --help, -h                          Show this help message and exit.
  --driver-resource-spec              Indicates the resource specifications used by the driver:
                                      small | medium | large | xlarge | 2xlarge
                                      you can also set this value through --conf spark.driver.resourceSpec=<value>
  --executor-resource-spec            Indicates the resource specifications used by the executor:
                                      small | medium | large | xlarge | 2xlarge
                                      you can also set this value through --conf spark.executor.resourceSpec=<value>
  --num-executors                     Number of executors to launch, you can also set this value through --conf spark.executor.instances=<value>
  --driver-memory MEM                 Memory for driver (e.g. 1000M, 2G)
                                      you can also set this value through --conf spark.driver.memory=<value>
  --driver-cores NUM                  Number of cores used by the driver
                                      you can also set this value through --conf spark.driver.cores=<value>
  --driver-java-options               Extra Java options to pass to the driver
                                      you can also set this value through --conf spark.driver.extraJavaOptions=<value>
  --executor-memory MEM               Memory per executor (e.g. 1000M, 2G)
                                      you can also set this value through --conf spark.executor.memory=<value>
  --executor-cores NUM                Number of cores per executor.
                                      you can also set this value through --conf spark.executor.cores=<value>
  --properties-file                   Spark default conf file location, only local files are supported, default conf/spark-defaults.conf
  --py-files PY_FILES                 Comma-separated list of .zip, .egg, or .py files to place
                                      on the PYTHONPATH for Python apps
  --files FILES                       Comma-separated list of files to be placed in the working
                                      directory of each executor. File paths of these files
                                      in executors can be accessed via SparkFiles.get(fileName).
                                      Specially, you can pass in a custom log output format file named `log4j.properties`
                                      Note: The file name must be `log4j.properties` to take effect
  --archives                          Comma separated list of archives to be extracted into the
                                      working directory of each executor. Support file types: zip、tgz、tar、tar.gz
  --status job_id                     If given, requests the status and details of the job specified
  --verbose                           Print additional debug output
  --version, -v                       Print out the dla-spark-toolkit version.

  List Spark Job Only:
  --list                              List Spark Job, should use specify --vcName and --regionId
  --pagenumber, -pn                   Set page number which want to list (default: 1)
  --pagesize, -ps                     Set page size which want to list (default: 1)

  Get Job Log Only:
  --get-log job_id                    Get job log

  Kill Spark Job Only:
  --kill job_id                       Specifies the jobid to be killed

  Spark Offline SQL options:
  -e <quoted-query-string>            SQL from command line. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
                                      you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
                                      which requires the user to specified the --oss-upload-path
  -f <filename>                       SQL from files. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
                                      you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
                                      which requires the user to specified the --oss-upload-path
  -d,--define <key=value>             Variable substitution to apply to spark sql
                                      commands. e.g. -d A=B or --define A=B
  --hivevar <key=value>               Variable substitution to apply to spark sql
                                      commands. e.g. --hivevar A=B
  --hiveconf <property=value>         Use value for given property, DLA spark toolkit will add `spark.hadoop.` prefix to property
  --database <databasename>           Specify the database to use
  --enable-inner-endpoint             It means that DLA pop SDK and OSS SDK will use the endpoint of Intranet to access DLA,
                                      you can turn on this option when you are on Alibaba cloud's ECS machine.


  Inner API options:
  --api-retry-times                   Specifies the number of retries that the client fails to call the API, default 3.
  --time-out-seconds                  Specifies the timeout for the API(time unit is second (s)), which is considered a call failure.
                                      default 10s.                            
Note

The Spark-Submit script automatically reads the configuration from conf/spark-defaults.conf. If you change the value of a conf/spark-defaults.conf parameter on the command line, Spark-Submit uses the value supplied on the command line.
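
For example, the following sketch overrides the executor count configured in conf/spark-defaults.conf from the command line. The class name and OSS path follow the placeholder style used in this topic.

# The --conf value given on the command line takes precedence over
# spark.executor.instances set in conf/spark-defaults.conf.
./bin/spark-submit \
--class xxx.xxx.xxx.xxx.xxx \
--conf spark.executor.instances=5 \
oss://{bucket-name}/jars/xxx.jar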

Configuration compatibility notes

To stay compatible with open source spark-submit, the following options, which do not exist in the open source version, can also be set through Spark configuration properties:

--keyId              #--conf spark.dla.access.key.id=<value>
--secretId           #--conf spark.dla.access.secret.id=<value>
--regionId           #--conf spark.dla.region.id=<value>
--vcName             #--conf spark.dla.vc.name=<value>
--oss-keyId          #--conf spark.dla.oss.access.key.id=<value>
--oss-secretId       #--conf spark.dla.oss.access.secret.id=<value>
--oss-endpoint       #--conf spark.dla.oss.endpoint=<value>
--oss-upload-path    #--conf spark.dla.oss.upload.path=<value>
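
For example, the following sketch passes the credentials and cluster information entirely through --conf properties instead of the dedicated options. All values are placeholders.

./bin/spark-submit \
--conf spark.dla.access.key.id=<yourAccessKeyId> \
--conf spark.dla.access.secret.id=<yourAccessKeySecret> \
--conf spark.dla.region.id=<yourRegionId> \
--conf spark.dla.vc.name=<yourVcName> \
--conf spark.driver.resourceSpec=medium \
--conf spark.executor.instances=1 \
--conf spark.executor.resourceSpec=medium \
--class xxx.xxx.xxx.xxx.xxx \
oss://{bucket-name}/jars/xxx.jar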

The following options are not supported or are meaningless for DLA Spark. Their values are ignored.

Useless options(these options will be ignored):
  --deploy-mode
  --master
  --packages, please use `--jars` instead
  --exclude-packages
  --proxy-user
  --repositories
  --keytab
  --principal
  --queue
  --total-executor-cores
  --driver-library-path
  --driver-class-path
  --supervise
  -S,--silent
  -i <filename>
                

Because the driver and executors of DLA Spark run in elastic containers, and elastic containers only support a fixed set of resource specifications (for the specifications currently supported by DLA Spark, see Serverless Spark overview), the Alibaba Cloud Data Lake Analytics team adjusted how the resource specification options of Spark-Submit are handled. The following parameters behave differently in open source spark-submit and DLA-Spark-Toolkit:

  • --driver-cores/--conf spark.driver.cores: Specifies the number of driver cores. DLA-Spark-Toolkit selects the resource specification whose number of cores is greater than or equal to, and closest to, the specified value.

  • --driver-memory/--conf spark.driver.memory: Specifies the driver memory. DLA-Spark-Toolkit selects the resource specification whose memory is greater than or equal to, and closest to, the specified value.

  • --executor-cores/--conf spark.executor.cores: Specifies the number of executor cores. DLA-Spark-Toolkit selects the resource specification whose number of cores is greater than or equal to, and closest to, the specified value.

  • --executor-memory/--conf spark.executor.memory: Specifies the executor memory. DLA-Spark-Toolkit selects the resource specification whose memory is greater than or equal to, and closest to, the specified value.
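
As an illustration of this rule, under the specifications listed in the sample configuration above (small 1c4g, medium 2c8g, large 4c16g, xlarge 8c32g), requesting 3 executor cores would be mapped to the large (4c16g) specification, the smallest one with at least 3 cores. A minimal sketch, with the document's placeholder class name and OSS path:

# 3 requested cores are rounded up to the nearest specification
# with >= 3 cores, i.e. large (4c16g).
./bin/spark-submit \
--class xxx.xxx.xxx.xxx.xxx \
--executor-cores 3 \
--num-executors 1 \
oss://{bucket-name}/jars/xxx.jar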

Other parameters unique to DLA-Spark-Toolkit are described as follows:

  • --driver-resource-spec: Specifies the driver resource specification. It takes precedence over --driver-cores/--conf spark.driver.cores: if both --driver-resource-spec and --driver-cores are specified, the specification set by --driver-resource-spec is used.

  • --executor-resource-spec: Specifies the executor resource specification. It takes precedence over --executor-cores/--conf spark.executor.cores.

  • --api-retry-times: The number of retries when a command executed internally by DLA-Spark-Toolkit fails, excluding job submission. Submitting a job is not an idempotent operation: a submission that fails because of a network timeout may still have succeeded in the background, so failed submissions are not retried to prevent duplicate jobs. You need to list the submitted jobs (--list) or check the job list in the DLA Spark console to determine whether a job was submitted successfully.

  • --time-out-seconds: The default network timeout of DLA-Spark-Toolkit, in seconds. Default value: 10. A command that times out fails and is retried.

  • --enable-inner-endpoint: If your client runs on an Alibaba Cloud ECS instance, you can specify this option so that DLA-Spark-Toolkit accesses the DLA POP SDK and OSS SDK over the internal network, which is more stable than the public network.

  • --list: Lists jobs. It is usually used together with --pagenumber and --pagesize, which specify the page number and the number of jobs displayed per page, respectively. Their default values are 1 and 10, which return the 10 jobs on the first page.

  • --kill: Takes a job ID and terminates the corresponding job.

  • --get-log: Takes a job ID and retrieves the log of the corresponding job.

  • --status: Takes a job ID and returns the details of the corresponding job.

How to submit a job

  • On the job management page, a Spark job is submitted in the JSON format, as shown below:

    {
        "name": "xxx",
        "file": "oss://{bucket-name}/jars/xxx.jar",
        "jars": "oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar"
        "className": "xxx.xxx.xxx.xxx.xxx",
        "args": [
            "xxx",
            "xxx"
        ],
        "conf": {
            "spark.executor.instances": "1",
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://{bucket-name}/path/to/log/"
        }
    }
  • With the Spark-Submit tool, you submit the same job in the following format:

    ./bin/spark-submit  \
    --class xxx.xxx.xxx.xxx.xxx \
    --verbose \
    --name xxx \
    --jars oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    oss://{bucket-name}/jars/xxx.jar \
    args0 args1
    
    ##The main program file and the resources specified by --jars, --py-files, and --files all support local file paths as well as OSS paths.
    ##Local resources must be specified with absolute paths. spark-submit automatically uploads local files to the OSS directory that you specify.
    ##You can specify the OSS upload directory with --oss-upload-path or by setting ossUploadPath in spark-defaults.conf (a local-resource submission example is shown after this section).
    ##During the upload, the file content is verified with an MD5 checksum. If the target OSS directory already contains a file with the same name and the same MD5, the upload is skipped.
    ##Note: if you manually replace a JAR in the OSS upload directory, delete the corresponding MD5 file.
    ##Format: --jars  /path/to/local/directory/XXX.jar,/path/to/local/directory/XXX.jar
    ##Separate multiple files with commas and use absolute paths.
    
    ## --jars, --py-files, and --files can also take local directories. All files in a directory are uploaded (subdirectories are not uploaded recursively).
    ## Directories must also be given as absolute paths, for example --jars /path/to/local/directory/,/path/to/local/directory2/
    ## Separate multiple directories with commas and use absolute paths.
    
    
    ##Program output. You can use the SparkUI link printed below to access the job's Spark UI, and check Job Detail to verify that the submission parameters are as expected.
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: running
    
    {
      "jobId": "",
      "jobName": "SparkPi",
      "status": "running",
      "detail": "",
      "sparkUI": "",
      "createTime": "2020-08-20 14:12:07",
      "updateTime": "2020-08-20 14:12:07",
      ...
    }
    Job Detail: {
      "name": "SparkPi",
      "className": "org.apache.spark.examples.SparkPi",
      "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": "1",
        "spark.executor.resourceSpec": "medium"
      },
      "file": "",
      "sparkUI":"https://xxx"  
    }

    The exit codes of spark-submit are described as follows:

    255    # The job failed.
    0      # The job succeeded.
    143    # The job was killed.
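
    Because these codes are returned as the exit status of the spark-submit process, you can branch on them in a shell script. A minimal sketch; the class name and OSS path follow the placeholder style used in this topic:

    ./bin/spark-submit \
    --class xxx.xxx.xxx.xxx.xxx \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    oss://{bucket-name}/jars/xxx.jar
    ret=$?
    case $ret in
      0)   echo "job succeeded" ;;
      143) echo "job was killed" ;;
      *)   echo "job failed, exit code: $ret" ;;
    esac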
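    As noted in the comments above, local resources given with absolute paths are uploaded to OSS automatically. The following sketch submits a local JAR and specifies the upload directory with --oss-upload-path; all paths and the class name are placeholders:

    ./bin/spark-submit \
    --class xxx.xxx.xxx.xxx.xxx \
    --oss-upload-path oss://{bucket-name}/dla-spark-toolkit/resources/ \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    /path/to/local/directory/xxx.jar \
    args0 args1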

How to kill a Spark job

Run the following command to kill a Spark job:

./spark-submit \
--kill <jobId>

The following result is returned:

## Output
Info: kill job: jxxxxxxxx, response: null

How to view the Spark job list

You can view jobs from the command line. For example, the following command queries the first page of the job list with one job per page:

./bin/spark-submit \
--list --pagenumber 1 --pagesize 1                      

The following result is returned:

## Output
{
  "requestId": "",
  "dataResult": {
    "pageNumber": "1",
    "pageSize": "1",
    "totalCount": "251",
    "jobList": [
      {
        "createTime": "2020-08-20 11:02:17",
        "createTimeValue": "1597892537000",
        "detail": "",
        "driverResourceSpec": "large",
        "executorInstances": "4",
        "executorResourceSpec": "large",
        "jobId": "",
        "jobName": "",
        "sparkUI": "",
        "status": "running",
        "submitTime": "2020-08-20 11:01:58",
        "submitTimeValue": "1597892518000",
        "updateTime": "2020-08-20 11:22:01",
        "updateTimeValue": "1597893721000",
        "vcName": ""
      }
    ]
  }
}                       
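
The totalCount field in the result indicates the total number of submitted jobs. To page through more of them, increase --pagesize or request subsequent pages, for example:

./bin/spark-submit \
--list --pagenumber 2 --pagesize 10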

How to obtain the job submission parameters and the Spark UI

Run the following command:

./bin/spark-submit --status <jobId>

The following result is returned:

## Output
Info: job status: success
Info:
{
  "jobId": "jxxxxxxxx",
  "jobName": "drop database if exists `",
  "status": "success",
  "detail": "xxxxxx",
  "sparkUI": "xxxxxxx",
  "createTime": "2021-05-08 20:02:28",
  "updateTime": "2021-05-08 20:04:05",
  "submitTime": "2021-05-08 20:02:28",
  "createTimeValue": "1620475348180",
  "updateTimeValue": "1620475445000",
  "submitTimeValue": "1620475348180",
  "vcName": "release-test"
}
Info: Job Detail:
set spark.sql.hive.metastore.version=dla;
set spark.dla.connectors=oss;
set spark.executor.instances=1;
set spark.sql.hive.metastore.version = dla;
set spark.dla.eni.enable = true;
set spark.dla.eni.security.group.id = xxxx ;
set spark.dla.eni.vswitch.id = xxxxx;

drop database if exists `my_hdfs_db_1` CASCADE;

How to obtain job logs

Run the following command:

./bin/spark-submit --get-log <jobId>

The following result is returned:

## Output
20/08/20 06:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/20 06:24:58 INFO SparkContext: Running Spark version 2.4.5
20/08/20 06:24:58 INFO SparkContext: Submitted application: Spark Pi
20/08/20 06:24:58 INFO SecurityManager: Changing view acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing view acls groups to: 
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls groups to: 
20/08/20 06:24:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spark); groups with view permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()
...
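
To keep a log for later inspection, you can redirect the output to a local file with standard shell redirection; 2>&1 also captures anything the tool writes to stderr:

./bin/spark-submit --get-log <jobId> > spark-job.log 2>&1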