This topic describes how to use the Spark-Submit command-line tool and provides related examples.
Data Lake Analytics (DLA) has been discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the existing features of DLA and provides more features and better performance. For the related AnalyticDB for MySQL documentation, see Use the spark-submit command-line tool to develop Spark applications.
Install the tool
You can also use wget to obtain dla-spark-toolkit.tar.gz.
wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit/dla-spark-toolkit.tar.gz
After dla-spark-toolkit.tar.gz is downloaded, decompress the package.
tar zxvf dla-spark-toolkit.tar.gz
Note: The toolkit requires JDK 8 or later.
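You can check the installed JDK version before you use the toolkit, for example:
java -version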
Configure parameters
You can configure the common parameters of conf/spark-defaults.conf from the command line. The spark-defaults.conf file currently supports the following common parameters:
# cluster information
# AccessKeyId
#keyId =
# AccessKeySecret
#secretId =
# RegionId
#regionId =
# set vcName
#vcName =
# set ossKeyId, if not set will use --keyId value or keyId value
#ossKeyId =
# set ossSecretId if not set will use --secretId value or secretId value
#ossSecretId =
# set OssUploadPath, if you need upload local resource
#ossUploadPath =
##spark conf
# driver specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
#spark.driver.resourceSpec =
# executor instance number
#spark.executor.instances =
# executor specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xlarge (8c32g)
#spark.executor.resourceSpec =
# when use ram, role arn
#spark.dla.roleArn =
# config dla oss connectors
#spark.dla.connectors = oss
# config eni, if you want to use eni
#spark.dla.eni.enable = true
#spark.dla.eni.vswitch.id =
#spark.dla.eni.security.group.id =
# config log location, need an oss path to store logs
#spark.dla.job.log.oss.uri =
# config spark read dla table when use option -f or -e
#spark.sql.hive.metastore.version = dla
## any other user defined spark conf...
The keyId, secretId, regionId, and vcName parameters are required. The following table describes these parameters:
Parameter | Description |
keyId | Your Alibaba Cloud AccessKey ID. |
secretId | Your Alibaba Cloud AccessKey secret. |
vcName | The name of your virtual cluster. |
regionId | The region in which your virtual cluster resides. For the mapping between regions and region IDs, see Regions and zones. |
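For example, a minimal conf/spark-defaults.conf might contain the following entries (a sketch; the values are placeholders that you replace with your own AccessKey pair and cluster information):
keyId = <yourAccessKeyId>
secretId = <yourAccessKeySecret>
regionId = <yourRegionId>
vcName = <yourVirtualClusterName>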
You can run the following command to view the command-line help of the tool.
cd /path/to/dla-spark-toolkit
./bin/spark-submit --help
After you run the help command, the following information is returned:
Info: Usage: spark-submit [options] <app jar> [app arguments]
Usage: spark-submit --list [PAGE_NUMBER] [PAGE_SIZE]
Usage: spark-submit --kill [JOB_ID]
Info:
Options:
--keyId Your ALIYUN_ACCESS_KEY_ID, is same as `keyId` in conf/spark-defaults.conf or --conf spark.dla.access.key.id=<value>, required
--secretId Your ALIYUN_ACCESS_KEY_SECRET, is same as `secretId` in conf/spark-defaults.conf or --conf spark.dla.access.secret.id=<value>, required
--regionId Your Cluster Region Id, is same as `regionId` in conf/spark-defaults.conf or --conf spark.dla.region.id=<value>, required
--vcName Your Virtual Cluster Name, is same as `vcName` in conf/spark-defaults.conf or --conf spark.dla.vc.name=<value>, required
--oss-keyId Your ALIYUN_ACCESS_KEY_ID to upload local resource to oss,
by default, the value will take from --keyId, is same as `ossKeyId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.key.id=<value>
--oss-secretId Your ALIYUN_ACCESS_KEY_SECRET to upload local resource to oss,
default the value will take from --secretId, is same as `ossSecretId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.secret.id=<value>
--oss-endpoint Oss endpoint where the resource will upload. default is http://oss-$regionId.aliyuncs.com,
is same as `ossEndpoint` in conf/spark-defaults.conf or --conf spark.dla.oss.endpoint=<value>
--oss-upload-path The user oss path where the resource will upload
If you want to upload a local jar package to the OSS directory,
you need to specify this parameter. It is same as `ossUploadPath` in conf/spark-defaults.conf or --conf spark.dla.oss.upload.path=<value>
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of jars to include on the driver
and executor classpaths.
--conf PROP=VALUE Arbitrary Spark configuration property, or you can set conf in conf/spark-defaults.conf
--help, -h Show this help message and exit.
--driver-resource-spec Indicates the resource specifications used by the driver:
small | medium | large | xlarge | 2xlarge
you can also set this value through --conf spark.driver.resourceSpec=<value>
--executor-resource-spec Indicates the resource specifications used by the executor:
small | medium | large | xlarge | 2xlarge
you can also set this value through --conf spark.executor.resourceSpec=<value>
--num-executors Number of executors to launch, you can also set this value through --conf spark.executor.instances=<value>
--driver-memory MEM Memory for driver (e.g. 1000M, 2G)
you can also set this value through --conf spark.driver.memory=<value>
--driver-cores NUM Number of cores used by the driver
you can also set this value through --conf spark.driver.cores=<value>
--driver-java-options Extra Java options to pass to the driver
you can also set this value through --conf spark.driver.extraJavaOptions=<value>
--executor-memory MEM Memory per executor (e.g. 1000M, 2G)
you can also set this value through --conf spark.executor.memory=<value>
--executor-cores NUM Number of cores per executor.
you can also set this value through --conf spark.executor.cores=<value>
--properties-file Spark default conf file location, only local files are supported, default conf/spark-defaults.conf
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps
--files FILES Comma-separated list of files to be placed in the working
directory of each executor. File paths of these files
in executors can be accessed via SparkFiles.get(fileName).
Specially, you can pass in a custom log output format file named `log4j.properties`
Note: The file name must be `log4j.properties` to take effect
--archives Comma separated list of archives to be extracted into the
working directory of each executor. Support file types: zip、tgz、tar、tar.gz
--status job_id If given, requests the status and details of the job specified
--verbose Print additional debug output
--version, -v Print out the dla-spark-toolkit version.
List Spark Job Only:
--list List Spark Job, should use specify --vcName and --regionId
--pagenumber, -pn Set page number which want to list (default: 1)
--pagesize, -ps Set page size which want to list (default: 1)
Get Job Log Only:
--get-log job_id Get job log
Kill Spark Job Only:
--kill job_id Specifies the jobid to be killed
Spark Offline SQL options:
-e <quoted-query-string> SQL from command line. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
which requires the user to specified the --oss-upload-path
-f <filename> SQL from files. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
which requires the user to specified the --oss-upload-path
-d,--define <key=value> Variable substitution to apply to spark sql
commands. e.g. -d A=B or --define A=B
--hivevar <key=value> Variable substitution to apply to spark sql
commands. e.g. --hivevar A=B
--hiveconf <property=value> Use value for given property, DLA spark toolkit will add `spark.hadoop.` prefix to property
--database <databasename> Specify the database to use
--enable-inner-endpoint It means that DLA pop SDK and OSS SDK will use the endpoint of Intranet to access DLA,
you can turn on this option when you are on Alibaba cloud's ECS machine.
Inner API options:
--api-retry-times Specifies the number of retries that the client fails to call the API, default 3.
--time-out-seconds Specifies the timeout for the API(time unit is second (s)), which is considered a call failure.
default 10s.
The Spark-Submit script automatically reads the configuration in conf/spark-defaults.conf. If you set a parameter on the command line that is also configured in conf/spark-defaults.conf, Spark-Submit uses the value passed on the command line.
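For example, the following sketch assumes that spark.executor.instances is already set in conf/spark-defaults.conf; the value 4 passed on the command line takes effect for this submission (the class name and JAR path are placeholders):
./bin/spark-submit \
--conf spark.executor.instances=4 \
--class xxx.xxx.xxx.xxx.xxx \
oss://{bucket-name}/jars/xxx.jar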
Configuration compatibility
For compatibility with the open source spark-submit, the following options that do not exist in the open source version can also be set through Spark conf properties.
--keyId #--conf spark.dla.access.key.id=<value>
--secretId #--conf spark.dla.access.secret.id=<value>
--regionId #--conf spark.dla.region.id=<value>
--vcName #--conf spark.dla.vc.name=<value>
--oss-keyId #--conf spark.dla.oss.access.key.id=<value>
--oss-secretId #--conf spark.dla.oss.access.secret.id=<value>
--oss-endpoint #--conf spark.dla.oss.endpoint=<value>
--oss-upload-path #--conf spark.dla.oss.upload.path=<value>
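For example, the following two submissions are intended to be equivalent (a sketch with placeholder values):
## Using the dedicated options
./bin/spark-submit --keyId <yourAccessKeyId> --secretId <yourAccessKeySecret> \
--regionId <yourRegionId> --vcName <yourVcName> \
--class xxx.xxx.xxx.xxx.xxx oss://{bucket-name}/jars/xxx.jar
## Using the equivalent Spark conf properties
./bin/spark-submit \
--conf spark.dla.access.key.id=<yourAccessKeyId> \
--conf spark.dla.access.secret.id=<yourAccessKeySecret> \
--conf spark.dla.region.id=<yourRegionId> \
--conf spark.dla.vc.name=<yourVcName> \
--class xxx.xxx.xxx.xxx.xxx oss://{bucket-name}/jars/xxx.jar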
The following options are not supported or are meaningless for DLA Spark. Their values are ignored.
Useless options(these options will be ignored):
--deploy-mode
--master
--packages, please use `--jars` instead
--exclude-packages
--proxy-user
--repositories
--keytab
--principal
--queue
--total-executor-cores
--driver-library-path
--driver-class-path
--supervise
-S,--silent
-i <filename>
The driver and executors of DLA Spark run in elastic containers, which support only a fixed set of resource specifications. For the resource specifications currently supported by DLA Spark, see Overview of Serverless Spark. Therefore, the Alibaba Cloud Data Lake Analytics team adapted the spark-submit options that set resource specifications. The following table describes the parameters whose behavior differs between the open source spark-submit and DLA-Spark-Toolkit:
Parameter | Description |
--driver-cores/--conf spark.driver.cores | Specifies the number of driver cores. DLA-Spark-Toolkit selects the resource specification that is closest to the specified number of cores and whose number of cores is greater than or equal to the specified number. |
--driver-memory/--conf spark.driver.memory | Specifies the driver memory. DLA-Spark-Toolkit selects the resource specification that is closest to the specified memory size and whose memory is greater than or equal to the specified size. |
--executor-cores/--conf spark.executor.cores | Specifies the number of executor cores. DLA-Spark-Toolkit selects the resource specification that is closest to the specified number of cores and whose number of cores is greater than or equal to the specified number. |
--executor-memory/--conf spark.executor.memory | Specifies the executor memory. DLA-Spark-Toolkit selects the resource specification that is closest to the specified memory size and whose memory is greater than or equal to the specified size. |
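For example (an illustrative sketch, assuming the specifications small 1c4g, medium 2c8g, large 4c16g, and xlarge 8c32g listed in the help output):
## --driver-cores 3 is expected to map the driver to large (4c16g), the smallest spec with at least 3 cores.
## --executor-memory 6G is expected to map the executors to medium (2c8g), the smallest spec with at least 6 GB of memory.
./bin/spark-submit \
--class xxx.xxx.xxx.xxx.xxx \
--driver-cores 3 \
--executor-memory 6G \
oss://{bucket-name}/jars/xxx.jar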
The following table describes other parameters that are specific to DLA-Spark-Toolkit:
Parameter | Description |
--driver-resource-spec | Specifies the resource specification of the driver. It takes precedence over spark.driver.resourceSpec. |
--executor-resource-spec | Specifies the resource specification of the executor. It takes precedence over spark.executor.resourceSpec. |
--api-retry-times | The number of times that DLA-Spark-Toolkit retries a failed internal command, excluding the submit command. Job submission is not idempotent: a submission that fails because of a network timeout may actually have succeeded in the background. To prevent duplicate submissions, failed submissions are not retried. Use --list to obtain the list of submitted jobs, or check the job list in the DLA Spark console, to determine whether the job was submitted successfully. |
--time-out-seconds | The default network timeout of DLA-Spark-Toolkit, in seconds. Default value: 10. A command that times out fails and is retried. |
--enable-inner-endpoint | Specifies that DLA-Spark-Toolkit accesses the DLA POP SDK and OSS SDK over the internal network, which is more stable than the public network. You can specify this option when you run the tool on an Alibaba Cloud ECS instance. |
--list | Queries the job list. It is usually used together with --pagenumber and --pagesize, which specify the page number and the number of jobs per page. Default values: 1 and 10, which return the 10 jobs on the first page. |
--kill | Takes a job ID as the argument and kills the job with that ID. |
--get-log | Takes a job ID as the argument and queries the log of the job with that ID. |
--status | Takes a job ID as the argument and queries the details of the job with that ID. |
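For example, the following sketch lists jobs from an ECS instance over the internal network, with more retries and a longer per-call timeout (the parameter values are illustrative):
./bin/spark-submit \
--enable-inner-endpoint \
--api-retry-times 5 \
--time-out-seconds 30 \
--list --pagenumber 1 --pagesize 10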
Submit a job
On the job management page, a Spark job is submitted in the following JSON format:
{ "name": "xxx", "file": "oss://{bucket-name}/jars/xxx.jar", "jars": "oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar" "className": "xxx.xxx.xxx.xxx.xxx", "args": [ "xxx", "xxx" ], "conf": { "spark.executor.instances": "1", "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://{bucket-name}/path/to/log/" } }
With the Spark-Submit tool, you submit the job in the following format:
./bin/spark-submit \
--class xxx.xxx.xxx.xxx.xxx \
--verbose \
--name xxx \
--jars oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar \
--conf spark.driver.resourceSpec=medium \
--conf spark.executor.instances=1 \
--conf spark.executor.resourceSpec=medium \
oss://{bucket-name}/jars/xxx.jar \
args0 args1

## The main program file and the resources specified by --jars, --py-files, and --files support both local file paths and OSS paths.
## Local resources must be specified with absolute paths. spark-submit automatically uploads local files to the OSS directory that you specify.
## You can specify the OSS upload directory with --oss-upload-path or by setting ossUploadPath in spark-defaults.conf.
## During upload, the file content is checked with MD5. If the OSS directory already contains a file with the same name and the same MD5, the file is not uploaded again.
## Note: If you manually update a JAR package in the OSS upload directory, delete the corresponding MD5 file.
## Format: --jars /path/to/local/directory/XXX.jar,/path/to/local/directory/XXX.jar
## Separate multiple files with commas and specify absolute paths.
## --jars, --py-files, and --files also accept local directories. All files in the directory are uploaded (subdirectories are not uploaded recursively).
## Directories must also be specified with absolute paths, for example --jars /path/to/local/directory/,/path/to/local/directory2/
## Separate multiple directories with commas and use absolute paths.

## Program output. You can open the Spark UI of the job through the SparkUI link printed below, and check Job Detail to confirm that the submitted parameters are as expected.
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: starting
Info: job status: running
{
    "jobId": "",
    "jobName": "SparkPi",
    "status": "running",
    "detail": "",
    "sparkUI": "",
    "createTime": "2020-08-20 14:12:07",
    "updateTime": "2020-08-20 14:12:07",
    ...
}
Job Detail:
{
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": "1",
        "spark.executor.resourceSpec": "medium"
    },
    "file": "",
    "sparkUI": "https://xxx"
}
The exit codes of spark-submit are described as follows:
255    # The job failed.
0      # The job succeeded.
143    # The job was killed.
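For example, a shell script can branch on the exit code (a sketch; the class name and JAR path are placeholders):
./bin/spark-submit --class xxx.xxx.xxx.xxx.xxx oss://{bucket-name}/jars/xxx.jar
ret=$?
if [ "$ret" -eq 0 ]; then
  echo "job succeeded"
elif [ "$ret" -eq 143 ]; then
  echo "job was killed"
else
  echo "job failed, exit code: $ret"
fi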
Note: If you submit jobs with the AccessKey ID and AccessKey secret of a RAM user, see Grant fine-grained permissions to a RAM user.
Because Spark-Submit requires the RAM account to have access to OSS when it uploads local JAR packages, you can grant the AliyunOSSFullAccess permission to the RAM user on the Users page.
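Offline SQL jobs can be submitted with the -e or -f option described in the help output above. The following is a sketch that uses -e together with set commands to configure Spark conf (the SQL statement is illustrative):
./bin/spark-submit \
-e "set spark.driver.resourceSpec=medium;
    set spark.executor.instances=1;
    set spark.executor.resourceSpec=medium;
    show databases;"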
Kill a Spark job
Run the following command to kill a Spark job:
./bin/spark-submit \
--kill <jobId>
The following result is returned:
## Output
Info: kill job: jxxxxxxxx, response: null
Query the Spark job list
You can query jobs from the command line. For example, run the following command to view page 1 of the job list with a page size of 1:
./bin/spark-submit \
--list --pagenumber 1 --pagesize 1
The following result is returned:
## Output
{
"requestId": "",
"dataResult": {
"pageNumber": "1",
"pageSize": "1",
"totalCount": "251",
"jobList": [
{
"createTime": "2020-08-20 11:02:17",
"createTimeValue": "1597892537000",
"detail": "",
"driverResourceSpec": "large",
"executorInstances": "4",
"executorResourceSpec": "large",
"jobId": "",
"jobName": "",
"sparkUI": "",
"status": "running",
"submitTime": "2020-08-20 11:01:58",
"submitTimeValue": "1597892518000",
"updateTime": "2020-08-20 11:22:01",
"updateTimeValue": "1597893721000",
"vcName": ""
}
]
}
}
Query job submission parameters and the Spark UI
Run the following command to query the job submission parameters and the Spark UI:
./bin/spark-submit --status <jobId>
The following result is returned:
## Output
Info: job status: success
Info:
{
"jobId": "jxxxxxxxx",
"jobName": "drop database if exists `",
"status": "success",
"detail": "xxxxxx",
"sparkUI": "xxxxxxx",
"createTime": "2021-05-08 20:02:28",
"updateTime": "2021-05-08 20:04:05",
"submitTime": "2021-05-08 20:02:28",
"createTimeValue": "1620475348180",
"updateTimeValue": "1620475445000",
"submitTimeValue": "1620475348180",
"vcName": "release-test"
}
Info: Job Detail:
set spark.sql.hive.metastore.version=dla;
set spark.dla.connectors=oss;
set spark.executor.instances=1;
set spark.sql.hive.metastore.version = dla;
set spark.dla.eni.enable = true;
set spark.dla.eni.security.group.id = xxxx ;
set spark.dla.eni.vswitch.id = xxxxx;
drop database if exists `my_hdfs_db_1` CASCADE;
Query job logs
Run the following command to query the logs of a job:
./bin/spark-submit --get-log <jobId>
The following result is returned:
## Output
20/08/20 06:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/20 06:24:58 INFO SparkContext: Running Spark version 2.4.5
20/08/20 06:24:58 INFO SparkContext: Submitted application: Spark Pi
20/08/20 06:24:58 INFO SecurityManager: Changing view acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing view acls groups to:
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls groups to:
20/08/20 06:24:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); groups with view permissions: Set(); users with modify permissions: Set(spark); groups with modify permissions: Set()
...
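To keep the log for later inspection, you can redirect the output to a local file (a usage sketch; the file name is arbitrary):
./bin/spark-submit --get-log <jobId> > spark-job.log 2>&1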