This topic describes how to use the Spark-Submit tool and provides related examples.

Prerequisites

Obtain the Spark-Submit toolkit

You can click Download to obtain dla-spark-toolkit.tar.gz, or download it with wget.
wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit_1/dla-spark-toolkit.tar.gz
After the download completes, extract the toolkit.
tar zxvf dla-spark-toolkit.tar.gz
Note: The toolkit requires JDK 8 or later.
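To confirm that a suitable JDK is on the PATH before running the toolkit, you can check the Java version (a minimal check, not part of the toolkit itself):
java -version
# Expected output reports version 1.8 (JDK 8) or later.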

Procedure

  1. View the help information
    • Run the following commands to view the usage help.
      cd /path/to/dla-spark-toolkit
      ./bin/spark-submit --help
    • The help command produces the following output:
      Usage: spark-submit [options] <app jar> [app arguments]
      Usage: spark-submit --list [PAGE_NUMBER] [PAGE_SIZE]
      Usage: spark-submit --kill [JOB_ID]
      
      Options:
        --keyId                     Your ALIYUN_ACCESS_KEY_ID, is same as `keyId` in conf/spark-defaults.conf, required
        --secretId                  Your ALIYUN_ACCESS_KEY_SECRET, is same as `secretId` in conf/spark-defaults.conf, required
        --regionId                  Your Cluster Region Id, is same as `regionId` in conf/spark-defaults.conf,  required
        --vcName                    Your Virtual Cluster Name, is same as `vcName` in conf/spark-defaults.conf, required
        --oss-keyId                 Your ALIYUN_ACCESS_KEY_ID to upload local resource to oss,
                                    default the value will take from --keyId, is same as `ossKeyId` in conf/spark-defaults.conf
        --oss-secretId              Your ALIYUN_ACCESS_KEY_SECRET to upload local resource to oss,
                                    default the value will take from --secretId, is same as `ossSecretId` in conf/spark-defaults.conf
        --oss-endpoint              Oss endpoint where the resource will upload. default is http://oss-$regionId.aliyuncs.com,
                                    is same as `ossEndpoint` in conf/spark-defaults.conf
        --oss-upload-path           the user oss path where the resource will upload
                                    If you want to upload a local jar package to the OSS directory,
                                    you need to specify this parameter. It is same as `ossUploadPath` in conf/spark-defaults.conf
      
      
      
        --class CLASS_NAME          Your application's main class (for Java / Scala apps).
        --name NAME                 A name of your application.
        --jars JARS                 Comma-separated list of jars to include on the driver
                                    and executor classpaths.
      
        --conf PROP=VALUE           Arbitrary Spark configuration property, or you can set conf in conf/spark-defaults.conf
        --help, -h                  Show this help message and exit.
        --driver-resource-spec      Indicates the resource specifications used by the driver:
                                    small | medium | large | xlarge | 2xlarge
        --executor-resource-spec    Indicates the resource specifications used by the executor:
                                    small | medium | large | xlarge | 2xlarge
        --num-executors             Number of executors to launch
        --properties-file           Spark default conf file location, only local files are supported, default conf/spark-defaults.conf
        --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
                                    on the PYTHONPATH for Python apps.
        --files FILES               Comma-separated list of files to be placed in the working
                                    directory of each executor. File paths of these files
                                    in executors can be accessed via SparkFiles.get(fileName).
                                    Specially, you can pass in a custom log output format file named `log4j.properties`
                                    Note: The file name must be `log4j.properties` to take effect
      
        --archives                  Comma separated list of archives to be extracted into the
                                    working directory of each executor. Support file types: zip, tgz, tar, tar.gz
      
        --status job_id             If given, requests the status and details of the job specified
        --verbose                   print more messages, enable spark-submit print job status and more job details.
      
        List Spark Job Only:
        --list                      List Spark Job, should use specify --vcName and --regionId
        --pagenumber, -pn           Set page number which want to list (default: 1)
        --pagesize, -ps             Set page size which want to list (default: 1)
      
        Get Job Log Only:
        --get-log job_id            Get job log
      
        Kill Spark Job Only:
        --kill job_id,job_id        Comma-separated list of job to kill spark job with specific ids
      
        Spark Offline SQL options:
        -e <quoted-query-string>   SQL from command line
        -f <filename>              SQL from files
        --sql-size-threshold       When SQL size more than 1024 bytes, we will upload it to
                                   ossUploadPath(required, you can set it through --oss-upload-path)
                                   you can adjust the length threshold by --sql-size-threshold.
      
        Some Other options:
        --api-retry-times          Specifies the number of retries that the client fails to call the API, default 3.
        --time-out-seconds         Specifies the timeout for the API(time unit is second (s)), which is considered a call failure.
                                   default 10.
      
    • Exit codes of spark-submit jobs:
      255    # the job failed
      0      # the job succeeded
      143    # the job was killed
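      Because spark-submit returns these exit codes, a wrapper script can branch on them. A minimal sketch (the class name and JAR path are placeholders):
      ./bin/spark-submit --class com.example.Main oss://{bucket-name}/jars/example.jar
      ret=$?
      if [ $ret -eq 0 ]; then
        echo "job succeeded"              # exit code 0
      elif [ $ret -eq 143 ]; then
        echo "job was killed"             # exit code 143
      else
        echo "job failed, exit code $ret" # 255 indicates failure
      fi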
  2. Configure common parameters in spark-defaults.conf
    spark-defaults.conf currently supports the following parameters (for Spark conf, only the commonly used settings are listed):
    #  cluster information
    # AccessKeyId
    #keyId =
    #  AccessKeySecret
    #secretId =
    #  RegionId
    #regionId =
    #  set vcName
    #vcName =
    #  set ossKeyId, if not set will use --keyId value or keyId value
    #ossKeyId =
    # set ossSecretId if not set will use --secretId value or secretId value
    #ossSecretId =
    #  set OssUploadPath, if you need upload local resource
    #ossUploadPath =
    
    ##spark conf
    #  driver specifications: small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
    #spark.driver.resourceSpec =
    #  executor instance number
    #spark.executor.instances =
    #  executor specifications: small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
    #spark.executor.resourceSpec =
    #  when use ram,  role arn
    #spark.dla.roleArn =
    #  config dla oss connectors
    #spark.dla.connectors = oss
    #  config eni, if you want to use eni
    #spark.dla.eni.enable = true
    #spark.dla.eni.vswitch.id =
    #spark.dla.eni.security.group.id =
    #  config log location, need an oss path to store logs
    #spark.dla.job.log.oss.uri =
    #  config spark read dla table when use option -f or -e
    #spark.sql.hive.metastore.version = dla
    
    ## any other user defined spark conf...
    Note
    • The spark-submit script automatically reads the configuration from conf/spark-defaults.conf.
    • Parameters specified on the command line take precedence over those in conf/spark-defaults.conf.
    • For the mapping between regions and regionId values, see the region mapping reference.
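    For reference, a filled-in conf/spark-defaults.conf might look like the following. All values are placeholders for illustration; only keys from the template above are used.
    keyId = <your-AccessKeyId>
    secretId = <your-AccessKeySecret>
    regionId = cn-hangzhou
    vcName = <your-virtual-cluster-name>
    ossUploadPath = oss://{bucket-name}/dla-spark-toolkit/resources/
    spark.driver.resourceSpec = medium
    spark.executor.instances = 2
    spark.executor.resourceSpec = medium
    spark.dla.job.log.oss.uri = oss://{bucket-name}/path/to/log/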
  3. Submit a job

    For detailed steps, see Create and run a Spark job.

    Without the spark-submit tool, a Spark job has to be submitted in JSON format, as shown below:
    {
        "name": "xxx",
        "file": "oss://{bucket-name}/jars/xxx.jar",
        "jars": "oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar"
        "className": "xxx.xxx.xxx.xxx.xxx",
        "args": [
            "xxx",
            "xxx"
        ],
        "conf": {
            "spark.executor.instances": "1",
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://{bucket-name}/path/to/log/"
        }
    }
    With the spark-submit tool, the same job can be submitted as follows:
    $ ./bin/spark-submit  \
    --class xxx.xxx.xxx.xxx.xxx \
    --verbose \
    --name xxx \
    --jars oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    oss://{bucket-name}/jars/xxx.jar \
    xxx xxx
    
    ## The --verbose option prints the job submission parameters and execution status after the job is submitted.
    
    ## The main program file and the resources specified by --jars, --py-files, and --files support both local file paths and OSS paths.
    ## Local resources must be specified with absolute paths; spark-submit automatically uploads them to the OSS directory you specify.
    ## Use --oss-upload-path or set ossUploadPath in spark-defaults.conf to specify the target OSS directory.
    ## During upload, file contents are checked with MD5; if the target OSS directory already contains a file with the same name and an identical MD5, the upload is skipped.
    ## Note: if you manually replace a JAR in the OSS upload directory, delete the corresponding MD5 file.
    ## Format: --jars  /path/to/local/directory/XXX.jar,/path/to/local/directory/XXX.jar
    ## Separate multiple files with commas and use absolute paths.
    
    ## --jars, --py-files, and --files also accept local directories; all files in a directory are uploaded (subdirectories are not uploaded recursively).
    ## Directory paths must also be absolute, for example --jars /path/to/local/directory/,/path/to/local/directory2/
    ## Separate multiple directories with commas and use absolute paths (see the local-JAR sketch after the sample output below).
    
    
    ## Program output: use the SparkUI link in the output below to open the job's Spark UI, and check Job Detail to verify that the submission parameters are as expected.
    job status: starting
    job status: starting
    job status: starting
    job status: starting
    job status: starting
    job status: starting
    job status: running
    {
      "jobId": "",
      "jobName": "SparkPi",
      "status": "running",
      "detail": "",
      "sparkUI": "",
      "createTime": "2020-08-20 14:12:07",
      "updateTime": "2020-08-20 14:12:07",
      ...
    }
    Job Detail: {
      "name": "SparkPi",
      "className": "org.apache.spark.examples.SparkPi",
      "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": "1",
        "spark.executor.resourceSpec": "medium"
      },
      "file": ""
    }
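    For example, a job whose main JAR and dependencies sit on the local disk can be submitted as sketched below. The paths, bucket name, and class name are placeholders; --oss-upload-path must point to an OSS directory you can write to, and the optional log4j.properties passed via --files shows how to override the log format:
    $ ./bin/spark-submit \
    --class com.example.Main \
    --name local-jar-example \
    --verbose \
    --oss-upload-path oss://{bucket-name}/dla-spark-toolkit/resources/ \
    --jars /path/to/local/directory/dep1.jar,/path/to/local/directory/dep2.jar \
    --files /path/to/local/directory/log4j.properties \
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    /path/to/local/directory/main.jar \
    arg1 arg2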
    Note
    • To submit jobs with a RAM user's AccessKeyId and AccessKeySecret, see here.
    • When spark-submit uploads local JAR packages, the RAM user must have access to OSS. You can grant the AliyunOSSFullAccess policy to the RAM user in the RAM console under Users.
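    The toolkit can also submit offline SQL through the -e and -f options shown in the help above. A minimal sketch (the statement is a placeholder, and this assumes spark.sql.hive.metastore.version = dla is set in conf/spark-defaults.conf so that DLA tables can be read; -f reads the statements from a file instead of the command line):
    $ ./bin/spark-submit \
    --verbose \
    -e "show databases"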
  4. Kill a job
    Run the following command to kill a job:
    $ ./bin/spark-submit \
    --kill <jobId>
    
    ## Output
    {"data":"deleted"}
  5. List jobs
    You can list jobs from the command line. For example, to view the first page of the job list with a page size of 1:
    $ ./bin/spark-submit \
    --list --pagenumber 1 --pagesize 1
    
    ## Output
    {
      "requestId": "",
      "dataResult": {
        "pageNumber": "1",
        "pageSize": "1",
        "totalCount": "251",
        "jobList": [
          {
            "createTime": "2020-08-20 11:02:17",
            "createTimeValue": "1597892537000",
            "detail": "",
            "driverResourceSpec": "large",
            "executorInstances": "4",
            "executorResourceSpec": "large",
            "jobId": "",
            "jobName": "",
            "sparkUI": "",
            "status": "running",
            "submitTime": "2020-08-20 11:01:58",
            "submitTimeValue": "1597892518000",
            "updateTime": "2020-08-20 11:22:01",
            "updateTimeValue": "1597893721000",
            "vcName": ""
          }
        ]
      }
    }
                            
  6. Get job submission parameters and the SparkUI
    ./bin/spark-submit --status <jobId>
    
    ## Output
    Status: success
    job status: success
    {
      "jobId": "",
      "jobName": "SparkPi",
      "status": "success",
      "detail": "",
      "sparkUI": "",
      "createTime": "2020-08-20 14:12:07",
      "updateTime": "2020-08-20 14:12:33",
      "submitTime": "2020-08-20 14:11:49",
      "createTimeValue": "1597903927000",
      "updateTimeValue": "1597903953000",
      "submitTimeValue": "1597903909000",
      "vcName": "",
      "driverResourceSpec": "medium",
      "executorResourceSpec": "medium",
      "executorInstances": "1"
    }
  7. Get job logs
    ./bin/spark-submit --get-log <jobId>
    
    ## Output
    20/08/20 06:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/08/20 06:24:58 INFO SparkContext: Running Spark version 2.4.5
    20/08/20 06:24:58 INFO SparkContext: Submitted application: Spark Pi
    20/08/20 06:24:58 INFO SecurityManager: Changing view acls to: spark
    20/08/20 06:24:58 INFO SecurityManager: Changing modify acls to: spark
    20/08/20 06:24:58 INFO SecurityManager: Changing view acls groups to: 
    20/08/20 06:24:58 INFO SecurityManager: Changing modify acls groups to: 
    20/08/20 06:24:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spark); groups with view permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()
    ...