This topic describes how to use the Spark-Submit tool and provides usage examples.

Prerequisites

Obtain the Spark-Submit toolkit

You can click the download link to obtain dla-spark-toolkit.tar.gz, or download it by using wget:
wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit_1/dla-spark-toolkit.tar.gz
After the download is complete, extract the toolkit:
tar zxvf dla-spark-toolkit.tar.gz
Note: This toolkit requires JDK 8 or later.
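To confirm that the local Java environment meets this requirement, you can run the standard JDK version check:
java -version
## The reported major version should be 1.8 or higher.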

Procedure

  1. View the help information
    • Run the following commands to view the usage help:
      cd /path/to/dla-spark-toolkit
      ./bin/spark-submit --help
    • The help command prints output similar to the following:
      Usage: spark-submit [options] <app jar> [app arguments]
      Usage: spark-submit --list [PAGE_NUMBER] [PAGE_SIZE]
      Usage: spark-submit --kill [JOB_ID] 
      
      Options:
        --keyId                     Your ALIYUN_ACCESS_KEY_ID, required
        --secretId                  Your ALIYUN_ACCESS_KEY_SECRET, required
        --regionId                  Your Cluster Region Id, required
        --vcName                    Your Virtual Cluster Name, required
        --oss-keyId                 Your ALIYUN_ACCESS_KEY_ID to upload local resource to oss.
                                    The default is the same as --keyId
        --oss-secretId              Your ALIYUN_ACCESS_KEY_SECRET, the default is the same as --secretId
        --oss-endpoint              Oss endpoint where the resource will upload. The default is http://oss-$regionId.aliyuncs.com
        --oss-upload-path           The user oss path where the resource will upload
                                    If you want to upload a local jar package to the OSS directory,
                                    you need to specify this parameter
      
      
      
        --class CLASS_NAME          Your application's main class (for Java / Scala apps).
        --name NAME                 A name of your application.
        --jars JARS                 Comma-separated list of jars to include on the driver
                                    and executor classpaths.
      
        --conf PROP=VALUE           Arbitrary Spark configuration property
        --help, -h                  Show this help message and exit.
        --driver-resource-spec      Indicates the resource specifications used by the driver:
                                    small | medium | large
        --executor-resource-spec    Indicates the resource specifications used by the executor:
                                    small | medium | large
        --num-executors             Number of executors to launch
        --properties-file           spark-defaults.conf properties file location, only local files are supported
                                    The default is ${SPARK_SUBMIT_TOOL_HOME}/conf/spark-defaults.conf
        --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                                    on the PYTHONPATH for Python apps.
      
        --status job_id             If given, requests the status and details of the job specified
        --verbose                   print more messages, enable spark-submit print job status and more job details.
      
        List Spark Job Only:
        --list                      List Spark Job, should use specify --vcName and --regionId
        --pagenumber, -pn           Set page number which want to list (default: 1)
        --pagesize, -ps             Set page size which want to list (default: 10)
      
        Get Job Log Only:
        --get-log job_id            Get job log
      
        Kill Spark Job Only:
        --kill job_id,job_id        Comma-separated list of job to kill spark job with specific ids
      
        Spark Offline SQL options:
        -e <quoted-query-string>    SQL from command line
        -f <filename>               SQL from files
    • Exit codes of spark-submit jobs:
      255    # The job failed.
      0      # The job succeeded.
      143    # The job was killed.
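    • Because spark-submit exits with these codes, a wrapper script can branch on them. A minimal sketch, assuming the required account and cluster options are already set in conf/spark-defaults.conf and that the tool runs until the job reaches a terminal state (the class name and JAR path are placeholders):
      ./bin/spark-submit --verbose --class com.example.Main oss://{bucket-name}/jars/example.jar
      case $? in
        0)   echo "job succeeded" ;;
        143) echo "job was killed" ;;
        255) echo "job failed" ;;
      esac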
  2. Configure common parameters in spark-defaults.conf
    spark-defaults.conf currently supports the following types of parameters (for the Spark configuration, only commonly used properties are listed); a filled-in sketch follows the notes below:
    #  cluster information
    # AccessKeyId
    #keyId =
    #  AccessKeySecret
    #secretId =
    #  RegionId
    #regionId =
    #  set vcName
    #vcName =
    #  set OssUploadPath, if you need upload local resource
    #ossUploadPath =
    
    ##spark conf
    #  driver specifications : small 1c4g | medium 2c8g | large 4c16g
    #spark.driver.resourceSpec =
    #  executor instance number
    #spark.executor.instances =
    #  executor specifications : small 1c4g | medium 2c8g | large 4c16g
    #spark.executor.resourceSpec =
    #  when use ram,  role arn
    #spark.dla.roleArn =
    #  when use option -f or -e, set catalog implementation
    #spark.sql.catalogImplementation =
    #  config dla oss connectors
    #spark.dla.connectors = oss
    #  config eni, if you want to use eni
    #spark.dla.eni.enable =
    #spark.dla.eni.vswitch.id =
    #spark.dla.eni.security.group.id =
    #  config log location, need an oss path to store logs
    #spark.dla.job.log.oss.uri =
    #  config spark read dla table
    #spark.sql.hive.metastore.version = dla
    Note
    • The spark-submit script automatically reads the configuration file conf/spark-defaults.conf.
    • Parameters specified on the command line take precedence over those in conf/spark-defaults.conf.
    • For the mapping between regions and regionId values, see the mapping table.
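    For reference, a filled-in conf/spark-defaults.conf might look like the following sketch. Every value shown (AccessKey pair, region, cluster name, bucket paths) is an illustrative placeholder; replace them with your own:
    #  cluster information
    keyId = <your-AccessKeyId>
    secretId = <your-AccessKeySecret>
    regionId = cn-hangzhou
    vcName = <your-virtual-cluster-name>
    ossUploadPath = oss://<bucket-name>/dla-spark-toolkit/resources/

    ##spark conf
    spark.driver.resourceSpec = medium
    spark.executor.instances = 2
    spark.executor.resourceSpec = medium
    spark.dla.job.log.oss.uri = oss://<bucket-name>/path/to/log/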
  3. Submit a job

    For detailed steps, see Create and run a Spark job.

    Without the spark-submit tool, a Spark job must be submitted in the following JSON format:
    {
        "name": "xxx",
        "file": "oss://{bucket-name}/jars/xxx.jar",
        "jars": "oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar"
        "className": "xxx.xxx.xxx.xxx.xxx",
        "args": [
            "xxx",
            "xxx"
        ],
        "conf": {
            "spark.executor.instances": "1",
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://{bucket-name}/path/to/log/"
        }
    }
    With the spark-submit tool, the same job can be submitted in the following format:
    $ ./bin/spark-submit  \
    --class xxx.xxx.xxx.xxx.xxx \
    --verbose \
    --name xxx \
    --jars oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    oss://{bucket-name}/jars/xxx.jar \
    xxx xxx
    
    ## The --verbose option prints the submission parameters and the job's execution status after the job is submitted.
    
    ## The main program file, the JAR packages specified by --jars, and --py-files all accept both local file paths and OSS paths.
    ## Local resources must be given as absolute paths; spark-submit automatically uploads local files to the OSS directory you specify.
    ## You can set that directory with --oss-upload-path or by setting ossUploadPath in spark-defaults.conf (a sketch using a local JAR follows the notes at the end of this step).
    ## During upload, file contents are verified with MD5. If the target OSS directory already contains a file with the same name and the same MD5, it is not uploaded again.
    ## Note: if you manually replace a JAR package in the OSS upload directory, delete the corresponding MD5 file.
    ## Format: --jars  /path/to/local/directory/XXX.jar,/path/to/local/directory/XXX.jar
    ## Separate multiple files with commas and specify absolute paths.
    
    ## --jars and --py-files also accept local directories; all files in a directory are uploaded (subdirectories are not uploaded recursively).
    ## Directory paths must also be absolute, for example: --jars /path/to/local/directory/,/path/to/local/directory2/
    ## Separate multiple directories with commas and specify absolute paths.
    
    
    ## Program output: open the SparkUI link printed below to access the job's Spark UI, and check Job Detail to verify that the submission parameters are as expected.
    job status: starting
    job status: starting
    job status: starting
    job status: starting
    job status: starting
    job status: starting
    job status: running
    {
      "jobId": "",
      "jobName": "SparkPi",
      "status": "running",
      "detail": "",
      "sparkUI": "",
      "createTime": "2020-08-20 14:12:07",
      "updateTime": "2020-08-20 14:12:07",
      ...
    }
    Job Detail: {
      "name": "SparkPi",
      "className": "org.apache.spark.examples.SparkPi",
      "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": "1",
        "spark.executor.resourceSpec": "medium"
      },
      "file": ""
    }
    Note
    • To submit jobs with the AccessKeyId and AccessKeySecret of a RAM user, see here.
    • Because spark-submit needs to upload local JAR packages, the RAM user must have access permissions on OSS. You can grant the AliyunOSSFullAccess policy to the RAM user in RAM (Resource Access Management) > Users.
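    As noted in the comments above, local resources are uploaded automatically when an OSS upload path is specified. A minimal sketch of submitting a local JAR, assuming the required account and cluster options are configured in conf/spark-defaults.conf (the class name, file names, and paths are placeholders):
    $ ./bin/spark-submit  \
    --class com.example.Main \
    --name local-jar-demo \
    --oss-upload-path oss://{bucket-name}/dla-spark-toolkit/resources/ \
    --jars /path/to/local/directory/dep1.jar \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    /path/to/local/directory/main.jar \
    arg1 arg2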
  4. Kill a job
    Run the following command to kill a job:
    $ ./bin/spark-submit \
    --kill <jobId>
    
    ## Output
    {"data":"deleted"}
  5. View the job list
    You can list jobs from the command line. For example, to view the first page of the job list with a page size of 1:
    $ ./bin/spark-submit \
    --list --pagenumber 1 --pagesize 1
    
    ## Output
    {
      "requestId": "",
      "dataResult": {
        "pageNumber": "1",
        "pageSize": "1",
        "totalCount": "251",
        "jobList": [
          {
            "createTime": "2020-08-20 11:02:17",
            "createTimeValue": "1597892537000",
            "detail": "",
            "driverResourceSpec": "large",
            "executorInstances": "4",
            "executorResourceSpec": "large",
            "jobId": "",
            "jobName": "",
            "sparkUI": "",
            "status": "running",
            "submitTime": "2020-08-20 11:01:58",
            "submitTimeValue": "1597892518000",
            "updateTime": "2020-08-20 11:22:01",
            "updateTimeValue": "1597893721000",
            "vcName": ""
          }
        ]
      }
    }
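    According to the help output, -pn and -ps are short forms of --pagenumber and --pagesize, so the same query can presumably also be written as:
    $ ./bin/spark-submit \
    --list -pn 1 -ps 1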
  6. Get the submission parameters and Spark UI of a job
    ./bin/spark-submit --status <jobId>
    
    ## Output
    Status: success
    job status: success
    {
      "jobId": "",
      "jobName": "SparkPi",
      "status": "success",
      "detail": "",
      "sparkUI": "",
      "createTime": "2020-08-20 14:12:07",
      "updateTime": "2020-08-20 14:12:33",
      "submitTime": "2020-08-20 14:11:49",
      "createTimeValue": "1597903927000",
      "updateTimeValue": "1597903953000",
      "submitTimeValue": "1597903909000",
      "vcName": "",
      "driverResourceSpec": "medium",
      "executorResourceSpec": "medium",
      "executorInstances": "1"
    }
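    If a script needs to wait for a job to finish, one option is to poll --status and match the status field of the JSON output. A rough sketch, assuming the output format shown above and using a placeholder job ID:
    while ./bin/spark-submit --status <jobId> | grep -qE '"status": "(starting|running)"'; do
      sleep 10
    done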
  7. Get the logs of a job
    ./bin/spark-submit --get-log <jobId>
    
    ## Output
    20/08/20 06:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/08/20 06:24:58 INFO SparkContext: Running Spark version 2.4.5
    20/08/20 06:24:58 INFO SparkContext: Submitted application: Spark Pi
    20/08/20 06:24:58 INFO SecurityManager: Changing view acls to: spark
    20/08/20 06:24:58 INFO SecurityManager: Changing modify acls to: spark
    20/08/20 06:24:58 INFO SecurityManager: Changing view acls groups to: 
    20/08/20 06:24:58 INFO SecurityManager: Changing modify acls groups to: 
    20/08/20 06:24:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spark); groups with view permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()
    ...
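    Assuming the log is printed to standard output as shown above, you can redirect it to a local file for offline inspection:
    ./bin/spark-submit --get-log <jobId> > spark-job.log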