云原生数据仓库AnalyticDB MySQL版湖仓版(3.0)Spark应用的描述格式为JSON格式,包含名称、JAR包路径以及执行时配置参数等信息。本文主要介绍如何配置一个Spark应用。

离线应用

应用包含BatchStreamingSQL,其中Batch为离线应用,任务执行时需要入口类所在的JAR包或Python文件。根据业务的不同,可能还需要提供入口类参数、执行依赖的JAR包、Python执行环境沙箱等。

云原生数据仓库AnalyticDB MySQL版Spark离线应用的编写方法与社区Spark-Submit工具的命令行参数类似。

离线应用配置示例

本章节以读取OSS数据为例,描述了典型的Spark离线应用的编写方式,包括应用名称、入口类所在的JAR包、入口类、入口类参数以及执行时的参数配置。命令行参数格式为JSON格式。示例如下:
 {
  "args": ["oss://${testBucketName}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${testBucketName/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.SparkReadOss",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.adb.connectors": "oss"
  }
}
参数说明如下。
参数名称 是否必填 示例值 使用说明
args "args":["args0", "args1"] Spark应用传入的参数,多个参数之间以英文逗号(,)分隔。
name "name": "your_job_name" Spark应用名称。
file Python/Java/Scala应用必填 "file":"oss://testBucketName/path/to/your/jar" Spark应用主文件的存储位置。主文件是入口类所在的JAR包或者Python的入口执行文件。
说明 Spark应用主文件目前只支持存储在OSS中。
className Java/Scala应用必填 "className":"com.aliyun.spark.oss.SparkReadOss" Java或者Scala程序入口类。Python不需要指定入口类。
sqls SQL应用必填 "sqls":["select * from xxxx","show databases"] 本关键字允许用户不提交JAR包和Python文件,直接提交SQL离线应用。该关键字跟fileclassNameargs关键字不能同时使用。用户可以同时指定多条SQL语句,中间以英文逗号(,)分隔。多条SQL语句按照指定的顺序依次执行。
jars "jars":["oss://testBucketName/path/to/jar","oss://testBucketName/path/to/jar"] Spark应用依赖的JAR包,多个JAR包之间以英文逗号(,)分隔。JAR包在运行时会被加入到Driver和Executor JVM的ClassPath里面。
说明 Spark应用所依赖的所有JAR包须存储在OSS中。
files "files":["oss://testBucketName/path/to/files","oss://testBucketName/path/to/files"] Spark应用依赖的文件资源,文件会被下载到Driver和Executor进程的当前执行目录下。

支持配置文件别名,例如oss://testBucketName/test/test1.txt#test1,test1为文件别名,用户可以使用./test1或者./test1.txt访问文件。

多个文件中间用英文逗号(,)分隔。
说明
  • files中包含名为oss://<path/to>/log4j.properties的文件时(文件名固定为log4j.properties),Spark会使用该log4j.properties作为日志配置。
  • Spark应用所依赖的所有文件须存储在OSS中。
archives "archives":["oss://testBucketName/path/to/archives","oss://testBucketName/path/to/archives"] Spark应用依赖的压缩包资源,目前支持.TAR.GZ后缀。压缩包会被解压到当前Spark进程的当前目录下。

支持配置文件别名,例如oss://testBucketName/test/test1.tar.gz#test1,test1为文件别名。假设test2.txt是test1.tar.gz压缩包中的文件,用户可以使用./test1/test2.txt或者./test1.tar.gz/test2.txt访问解压后的文件。

多个压缩包中间使用英文逗号(,)分隔。
说明 Spark应用所依赖的所有压缩包须存储在OSS中。压缩包解压缩失败,任务会失败。
pyFiles Python应用可选 "pyFiles":["oss://testBucketName/path/to/pyfiles","oss://testBucketName/path/to/pyfiles"] PySpark依赖的Python文件,后缀可以是ZIP、PY和EGG。如果依赖多个Python文件,建议用户使用ZIP或者EGG压缩包。您可以在Python代码中以module方式引用Python文件。多个压缩包中间使用英文逗号(,)分隔。
说明 Spark应用所依赖的所有Python文件须存储在OSS中。
conf "conf":{"spark.driver.resourceSpec": "medium",spark.executor.resourceSpec":"medium,"spark.executor.instances": 2,"spark.adb.connectors": "oss"} 与社区Spark中的配置项基本一致,参数格式为key: value形式,多个参数之间以英文逗号(,)分隔。与社区Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数,请参见conf配置参数说明

SQL应用

AnalyticDB MySQL版Spark支持在控制台直接提交Spark SQL。无需用户打包JAR包或者写Python代码,更利于数据开发人员使用Spark进行数据分析。提交Spark SQL时需要指定应用类型为SQL

SQL应用配置示例

-- Here is just an example of SparkSQL. Modify the content and run your spark program.

conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;

-- Here are your sql statements
show databases;

Spark SQL支持的命令类型

Spark编辑器支持编辑SQL语句,每条SQL语句以英文分号(;)分隔。

Spark SQL支持以下类型命令:
  • CONF命令
    • 用于指定Spark的设置,置于SQL语句的前面。
    • 每条CONF命令指定一个Spark启动时参数的值,每条CONF命令以英文分号(;)分隔。
    • CONF命令的KeyValue均不加单引号或双引号。
    • CONF命令支持的配置参数,请参见conf配置参数说明
  • ADD JAR命令
    • 用于增加Spark SQL运行时依赖的JAR包,例如UDF的JAR包,各类数据源连接器的JAR包等。JAR包目前支持OSS格式路径,置于SQL语句的前面。
    • 每条ADD JAR命令指定一个OSS JAR包路径,路径字符串不加单引号或双引号,每条ADD JAR命令以英文分号(;)分隔。
  • Spark SQL语法所支持的DDL或DML语句。例如查询语句SELECT、插入语句INSERT等。

切换元数据服务

Spark SQL默认使用AnalyticDB MySQL元数据服务。如果使用其它元数据服务,可以采用以下两种方式切换:
  • 使用in-memory catalog
    CONF spark.sql.catalogImplementation = in-memory;
  • 使用Spark内置的hive metastore2.3.7版本或其他版本。
    CONF spark.sql.catalogImplementation = hive;
    CONF spark.sql.hive.metastore.version = 2.3.7;
    说明 如需要连接自建的Hive Metastore,可以采用社区Spark的标准配置方式,该配置会覆盖默认配置。社区Spark的标准配置方式,请参见Spark Configuration

conf配置参数说明

AnalyticDB MySQL版Spark配置项跟社区Spark中的配置项基本一致。本章节介绍AnalyticDB MySQL与社区Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数。

  • 指定DriverExecutor资源
    重要 此用法与社区Spark不同。
    spark.driver.resourceSpecspark.executor.resourceSpec参数取值相同。
    参数名称 使用说明 对应社区Spark参数
    spark.driver.resourceSpec Spark driver的资源规格。不同型号的取值对应不同的规格,详情请参见Spark资源规格列表的型号列。

    例如CONF spark.driver.resourceSpec = c.small;,则Spark driver的资源规格为1核2 GB。

    spark.driver.coresspark.driver.memory
    spark.executor.resourceSpec Spark executor的资源规格。不同型号的取值对应不同的规格,详情请参见Spark资源规格列表的型号列。

    例如CONF spark.executor.resourceSpec = c.small;,则Spark executor的资源规格为1核2 GB。

    spark.executor.coresspark.executor.memory
  • Spark UI
    参数名称 默认值 参数说明
    spark.app.log.rootPath 存储AnalyticDB MySQL版Spark应用执行时产生的日志以及SparkUI EventLog的目录。目前只支持用户手动填写用于存储日志的OSS路径。如果不填写,则无法在应用结束后看到日志,也无法在应用结束后打开Spark UI。
  • RAM用户运维Spark
    参数名称 默认值 参数说明
    spark.adb.roleArn RAM系统中授予提交Spark应用的RAM用户的Role Arn,详情请参见RAM角色概览。RAM用户提交应用需要填写该参数,阿里云账号无需提交该参数。
  • AnalyticDB MySQL版Spark内置数据源连接器
    参数名称 默认值 参数说明
    spark.adb.connectors 启用AnalyticDB MySQL版Spark内置的连接器,连接器名称以逗号分隔,目前可选的连接器有OSS、hbase1.x、tablestore。
    spark.hadoop.io.compression.codec.snappy.native false 标识Snappy文件是否为标准Snappy文件。Hadoop默认识别的是Hadoop修改过的Snappy格式文件。设置为true时将使用标准Snappy库解压,否则使用hadoop默认的Snappy库解压。
  • 访问VPC和连接数据源
    参数名称 默认值 参数说明
    spark.adb.eni.vswitchId 开启VPC功能,配置弹性网卡的交换机ID。如果用户有ECS可以访问目标数据源,那么可以直接使用该ECS的交换机ID。
    spark.adb.eni.securityGroupId 开启VPC功能,用于弹性网卡的安全组ID。如果用户有ECS可以访问目标数据源,那么可以直接使用该ECS的安全组ID。
    spark.adb.eni.extraHosts 需要额外传入的IP和Host的映射关系,以便Spark能正确解析数据源中的Host。如连接自建的Hive数据源,则需要传入此参数。
    说明 IP和Host之间用空格分隔。多个IP和域名用英文逗号(,)分隔,如"ip0 master0, ip1 master1"。
  • Spark SQL连接AnalyticDB MySQL元数据
    参数名称 默认值 参数说明
    spark.sql.hive.metastore.version 指定Hive MetaStore的版本。配置为ADB时,可以连接AnalyticDB MySQL中的元数据信息,读取AnalyticDB MySQL的表信息。
  • 应用重试
    参数名称 默认值 参数说明
    spark.adb.maxAttempts 1 最大尝试次数,默认值为1,代表不需要重试。

    假设配置为3,则这个应用在一个滑动窗口期时间内最多尝试3次。

    spark.adb.attemptFailuresValidityInterval Long.MAX 重试计数的滑动窗口时间,单位:毫秒(ms)。

    假设配置为6000,那么当一次重试失败后,系统会计算过去6000ms总共进行了多少次重试,如果重试次数少于maxAttempts的配置值,则会继续重试。

  • 源配置
    参数名称 默认值 参数说明
    spark.adb.driver.cpu-vcores-ratio 1 Driver虚拟Core与实际CPU Core之间的比例。假设Driver是Medium规格(2Core8 GB),本参数值设置为2,那么Driver进程可以按照4个Core进行并发控制,相当于spark.driver.cores=4。
    spark.adb.executor.cpu-vcores-ratio 1 Executor虚拟Core与实际CPU Core之间的比例。当单个Task的CPU使用率比较低时,可以通过该配置,提升CPU利用效率。假设Executor是Medium规格(2Core8 GB),本参数值设置为2,那么Executor进程可以按照4个Core进行并发控制,也就是同时调度4个并发任务,相当于spark.executor.cores=4。

Spark资源规格列表

型号(Type) 规格
CPU(Core) 内存(GB)
c.small 1 2
small 1 4
m.small 1 8
c.medium 2 4
medium 2 8
m.medium 2 16
c.large 4 8
large 4 16
m.large 4 32
c.xlarge 8 16
xlarge 8 32
m.xlarge 8 64
c.2xlarge 16 32
2xlarge 16 64
m.2xlarge 16 128