本文主要为您介绍Spark Load导入方式。
背景信息
Spark Load是一种异步导入方式,您需要通过MySQL协议创建Spark类型导入任务,并通过
SHOW LOAD
查看导入结果。Spark Load利用了Spark集群资源对要导入的数据进行了排序,Doris BE直接写文件,这样能大大降低Doris集群的资源使用,对于历史海量数据迁移降低Doris集群资源使用及负载有很好的效果。
如果您在没有Spark集群这种资源的情况下,又想方便、快速的完成外部存储历史数据的迁移,可以使用Broker Load。相比于Spark Load导入,Broker Load对Doris集群的资源占用会更高。
适用场景
Spark Load通过外部的Spark资源实现对导入数据的预处理,提高Doris大数据量的导入性能并且节省Doris集群的计算资源。主要用于初次迁移、大数据量导入Doris的场景。
源数据在Spark可以访问的存储系统中,如HDFS。
数据量达到10 GB以上至TB级别的业务场景。
数据量较小或不满足上述情况时,建议使用Stream Load或者Broker Load。
基本流程
您可以通过MySQL客户端提交Spark类型导入任务,FE记录元数据并返回提交成功,基本流程如下所示。
+
|
+----v----+
| FE |---------------------------------+
+----+----+ |
| 3. FE send push tasks |
| 5. FE publish version |
+------------+------------+ |
| | | |
+---v---+ +---v---+ +---v---+ |
| BE | | BE | | BE | |1. FE submit Spark ETL job
+---^---+ +---^---+ +---^---+ |
|4. BE push with broker | |
+---+---+ +---+---+ +---+---+ |
|Broker | |Broker | |Broker | |
+---^---+ +---^---+ +---^---+ |
| | | |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
| HDFS +-------> Spark cluster |
| <-------+ |
+---------------------------------+ +-----------------------------+
Spark Load任务的执行主要分为以下5个阶段。
FE调度提交ETL任务到Spark集群执行。
Spark集群执行ETL完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。
ETL任务完成后,FE获取预处理过的每个分片的数据路径,并调度相关的BE执行Push任务。
BE通过Broker读取数据,转化为Doris底层存储格式。
FE调度生效版本,完成导入任务。
全局字典
适用场景
目前Doris中Bitmap列是使用类库Roaringbitmap实现的,而Roaringbitmap的输入数据类型只能是整型,因此如果要在导入流程中实现对于Bitmap列的预计算,则需要将输入数据的类型转换成整型。
在Doris现有的导入流程中,全局字典的数据结构是基于Hive表实现的,保存了原始值到编码值的映射。
构建流程
读取上游数据源的数据,生成一张Hive临时表,记为hive_table。
从hive_table中抽取待去重字段的去重值,生成一张新的Hive表,记为distinct_value_table。
新建一张全局字典表,记为dict_table。字典表一列为原始值,另一列为编码后的值。
将distinct_value_table与dict_table做
left join
,计算出新增的去重值集合,然后对这个集合使用窗口函数进行编码,此时去重列原始值就多了一列编码后的值,最后将这两列的数据写回dict_table。将dict_table与hive_table连接,完成hive_table中原始值替换成整型编码值的工作。
hive_table会被下一步数据预处理的流程所读取,经过计算后导入到Doris中。
数据预处理(DPP)
基本流程
从数据源读取数据,上游数据源可以是HDFS文件,也可以是Hive表。
对读取到的数据进行字段映射,表达式计算以及根据分区信息生成分桶字段
bucket_id
。根据Doris表的rollup元数据生成RollupTree。
遍历RollupTree,进行分层的聚合操作,下一个层级的rollup可以由上一个层的rollup计算得来。
每次完成聚合计算后,会对数据根据
bucket_id
进行分桶然后写入HDFS中。后续broker会拉取HDFS中的文件然后导入Doris BE中。
Hive Bitmap UDF
Spark支持将Hive生成的Bitmap数据直接导入到Doris。
配置ETL集群
配置集群
Spark作为一种外部计算资源在Doris中用来完成ETL工作,引入Resource Management来管理Doris使用的这些外部资源。
提交Spark导入任务之前,需要配置执行ETL任务的Spark集群,参数配置如下所示,详情参见下面的创建资源配置。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value,
broker.hadoop.security.authentication = kerberos,
broker.kerberos_principal = doris@YOUR.COM,
broker.kerberos_keytab = /home/doris/my.keytab
broker.kerberos_keytab_content = ASDOWHDLAWI********ALDJSDIWALD
)
-- drop spark resource
DROP RESOURCE resource_name
-- show resources
SHOW RESOURCES
SHOW PROC "/resources"
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name
创建资源配置
resource_name为Doris中配置的Spark资源的名字。
PROPERTIES是Spark资源相关参数,参数如下:
type:资源类型,必填,目前仅支持Spark。
Spark相关参数如下:
spark.master:必填,参数值为yarn或
spark://host:port
。spark.submit.deployMode:Spark程序的部署模式,必填,支持Cluster,Client两种模式。
spark.hadoop.yarn.resourcemanager.address:参数spark.master为yarn时,该参数必填。
spark.hadoop.fs.defaultFS:参数spark.master为yarn时,该参数必填。
其他参数为可选,详情请参见Spark Configuration。
working_dir: ETL使用的目录。Spark作为ETL资源使用时必填。例如
hdfs://host:port/tmp/doris
。broker.hadoop.security.authentication:指定认证方式为kerberos。
broker.kerberos_principal:指定kerberos的principal。
broker.kerberos_keytab:指定kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件的绝对路径。并且可以被Broker进程访问。
broker.kerberos_keytab_content:指定kerberos中keytab文件内容经过base64编码之后的内容。该参数和broker.kerberos_keytab配置二选一即可。
broker: broker名字。Spark作为ETL资源使用时必填。需要使用
ALTER SYSTEM ADD BROKER
命令提前完成配置。broker.property_key:broker读取ETL生成的中间文件时需要指定的认证信息等。
参数示例。
YARN Cluster模式
CREATE EXTERNAL RESOURCE "spark0" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.jars" = "xxx.jar,yyy.jar", "spark.files" = "/tmp/aaa,/tmp/bbb", "spark.executor.memory" = "1g", "spark.yarn.queue" = "queue0", "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999", "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000", "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris", "broker" = "broker0", "broker.username" = "user0", "broker.password" = "password0" );
Spark Standalone Client模式
CREATE EXTERNAL RESOURCE "spark1" PROPERTIES ( "type" = "spark", "spark.master" = "spark://127.0.0.1:7777", "spark.submit.deployMode" = "client", "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris", "broker" = "broker1" );
Spark Load支持Kerberos认证
如果是Spark Load访问带有Kerberos认证的Hadoop集群资源,您只需要在创建Spark resource的时候指定以下参数即可:
broker.hadoop.security.authentication:指定认证方式为kerberos。
broker.kerberos_principal:指定kerberos的principal。
broker.kerberos_keytab:指定kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件的绝对路径。并且可以被Broker进程访问。
broker.kerberos_keytab_content:指定kerberos中keytab文件内容经过base64编码之后的内容。该参数和kerberos_keytab配置二选一即可。
参数示例
CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.hadoop.security.authentication" = "kerberos",
"broker.kerberos_principal" = "doris@YOUR.COM",
"broker.kerberos_keytab" = "/home/doris/my.keytab"
);
查看资源
普通账户只能看到自己有USAGE_PRIV使用权限的资源。
root和admin账户可以看到所有的资源。
资源权限
资源权限通过GRANT REVOKE来管理,目前仅支持USAGE_PRIV使用权限。
可以将USAGE_PRIV权限赋予某个账户或者某个角色,角色的使用与之前一致。
-- 授予spark0资源的使用权限给账户user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-- 授予spark0资源的使用权限给角色role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
-- 授予所有资源的使用权限给账户user0
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
-- 授予所有资源的使用权限给角色role0
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
-- 撤销账户user0的spark0资源使用权限
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";
配置SPARK客户端
FE底层通过执行spark-submit
命令去提交Spark任务,因此需要为FE配置Spark客户端,建议使用2.4.5或以上的Spark2官方版本,您可以通过Spark下载地址进行下载,下载完成后,请按以下步骤完成配置。
配置SPARK_HOME环境变量
将spark客户端放在FE同一台机器上的目录下,并在FE的配置文件配置spark_home_default_dir项指向此目录,此配置项默认为FE根目录下的lib/spark2x路径,此项不可为空。
配置SPARK依赖包
将Spark客户端下的
jars
文件夹内所有JAR包归档打包成一个ZIP文件,并在FE的配置文件配置spark_resource_path
项指向此ZIP文件,若此配置项为空,则FE会尝试寻找FE根目录下的lib/spark2x/jars/spark-2x.zip文件,若没有找到则会报文件不存在的错误。当提交Spark Load任务时,会将归档好的依赖文件上传至远端仓库,默认仓库路径挂在
working_dir/{cluster_id}
目录下,并以__spark_repository__{resource_name}
命名,表示集群内的一个resource对应一个远端仓库,远端仓库目录结构参考如下:__spark_repository__spark0/ |-__archive_1.0.0/ | |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar | |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip |-__archive_1.1.0/ | |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar | |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip |-__archive_1.2.0/ | |-...
说明除了Spark依赖(默认以spark-2x.zip命名),FE还会上传DPP的依赖包至远端仓库,若此次Spark Load提交的所有依赖文件都已存在远端仓库,那么就不需要再上传依赖,节省了每次重复上传大量文件的时间。
配置YARN客户端
FE底层通过执行
yarn
命令去获取正在运行的Application的状态以及终止Application,因此需要为FE配置YARN客户端,建议使用hadoop-2.5.2或hadoop-2.0以上的官方版本,下载详情请参见hadoop下载地址。将下载好的YARN客户端放在FE同一台机器的目录下,并在FE配置文件配置yarn_client_path项指向yarn的二进制可执行文件,默认为FE根目录下的lib/yarn-client/hadoop/bin/yarn路径。
(可选操作)当FE通过yarn客户端去获取Application的状态或者终止Application时,默认会在FE根目录下的lib/yarn-config路径下生成执行yarn命令所需的配置文件,此路径可通过在FE配置文件配置yarn_config_dir项修改,目前生成的配置文件包括core-site.xml和yarn-site.xml。
创建导入
主要介绍Spark Load的创建导入语法中参数意义和注意事项。
创建导入的详细语法执行HELP SPARK LOAD
查看语法帮助。
Spark Load导入语法
LOAD LABEL load_label (data_desc, ...) WITH RESOURCE resource_name [resource_properties] [PROPERTIES (key1=value1, ... )] * load_label: db_name.label_name * data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY separator ] [(col1, ...)] [COLUMNS FROM PATH AS (col2, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] DATA FROM TABLE hive_external_tbl [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] * resource_properties: (key2=value2, ...)
示例一:上游数据源为HDFS文件的情况
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1,tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
示例二:上游数据源是Hive表的情况
新建Hive外部表。
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) ) ENGINE=hive properties ( "database" = "tmp", "table" = "t1", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );
提交Load命令,要求导入的Doris表中的列必须在Hive外部表中存在。
LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=bitmap_dict(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
示例三:上游数据源是Hive Binary类型情况
新建Hive外部表。
CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) //hive中的类型为binary ) ENGINE=hive properties ( "database" = "tmp", "table" = "t1", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );
提交Load命令,要求导入的Doris表中的列必须在Hive外部表中存在。
LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=binary_bitmap(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
示例四:导入Hive分区表的数据
Hive建表语句
create table test_partition( id int, name string, age int ) partitioned by (dt string) row format delimited fields terminated by ',' stored as textfile;
Doris建表语句
CREATE TABLE IF NOT EXISTS test_partition_04 ( dt date, id int, name string, age int ) UNIQUE KEY(`dt`, `id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );
Spark Load语句
CREATE EXTERNAL RESOURCE "spark_resource" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.executor.memory" = "1g", "spark.yarn.queue" = "default", "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056", "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000", "working_dir" = "hdfs://localhost:9000/tmp/doris", "broker" = "broker_01" ); LOAD LABEL demo.test_hive_partition_table_18 ( DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*") INTO TABLE test_partition_04 COLUMNS TERMINATED BY "," FORMAT AS "csv" (id,name,age) COLUMNS FROM PATH AS (`dt`) SET ( dt=dt, id=id, name=name, age=age ) ) WITH RESOURCE 'spark_resource' ( "spark.executor.memory" = "1g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
参数和相关导入说明
数据描述类参数
目前支持的数据源有CSV和Hive table。其他规则与Broker Load一致。
导入作业参数
导入作业参数主要指的是Spark Load创建导入语句中的属于opt_properties部分的参数。导入作业参数是作用于整个导入作业的。规则与Broker Load一致。
Spark资源参数
Spark资源需要提前配置到Doris系统中并且赋予您USAGE_PRIV权限后才能使用Spark Load。
当您有临时性的需求,比如增加任务使用的资源而修改Spark configs,可参考如下示例进行设置。
WITH RESOURCE 'spark0' ( "spark.driver.memory" = "1g", "spark.executor.memory" = "3g" )
说明设置仅对本次任务生效,并不影响Doris集群中已有的配置。
Hive表作为数据源导入
目前如果期望在导入流程中将Hive表作为数据源,那么需要先新建一张类型为Hive的外部表,然后提交导入命令时指定外部表的表名即可。
导入流程构建全局字典
适用于Doris表聚合列的数据类型为Bitmap类型。在
load
命令中指定需要构建全局字典的字段即可,格式为:Doris字段名称=bitmap_dict(Hive表字段名称)。重要目前只有在上游数据源为Hive表时才支持全局字典的构建。
Hive Binary(bitmap)类型列的导入
适用于Doris表聚合列的数据类型为Bitmap类型,且数据源Hive表中对应列的数据类型为binary(通过FE中spark-dpp中的
org.apache.doris.load.loadv2.dpp.BitmapValue
类序列化)类型。全局字典,在
load
命令中指定相应字段即可,格式为:Doris字段名称=binary_bitmap(Hive表字段名称)。重要目前只有在上游数据源为Hive表时才支持Binary(Bitmap)类型的数据导入。
查看导入
Spark Load导入方式同Broker Load一样都是异步的,所以您必须将创建导入的Label记录,并且在查看导入命令中使用Label来查看导入结果。查看导入命令在所有导入方式中是通用的,具体语法可执行HELP SHOW LOAD
查看,查看导入示例如下。
show load order by createtime desc limit 1\G
返回如下信息。
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:49:44
LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://1.1.*.*:80**/proxy/application_15866****3848_0035/
JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
返回结果集中参数意义可以参见Broker Load。不同点如下:
State导入任务当前所处的阶段。任务提交之后状态为PENDING,提交Spark ETL之后状态变为ETL,ETL完成之后FE调度BE执行push操作状态变为LOADING,push完成并且版本生效后状态变为FINISHED。
导入任务的最终阶段有两个:CANCELLED和FINISHED。当Load job处于这两个阶段时导入完成。其中CANCELLED为导入失败,FINISHED为导入成功。
Progress导入任务的进度描述。分为两种进度:ETL和LOAD,对应了导入流程的两个阶段ETL和LOADING。
LOAD进度=当前已完成所有replica导入的tablet个数/本次导入任务的总tablet个数*100%
LOAD的进度范围为0~100%。如果所有导入表均完成导入,此时LOAD的进度为99%,表示导入进入到最后生效阶段,整个导入完成后,LOAD的进度才会改为100%。
说明导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入没有在执行。
Type导入任务的类型。Spark Load为SPARK。
以下参数值含义:
CreateTime:导入创建的时间。
EtlStartTime:ETL阶段开始的时间。
EtlFinishTime:ETL阶段完成的时间。
LoadStartTime:LOADING阶段开始的时间。
LoadFinishTime:整个导入任务完成的时间。
JobDetails显示一些作业的详细运行状态,ETL结束的时候更新。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数等,如
{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}
。URL可复制输入到浏览器,跳转至相应Application的Web界面。
查看Spark launcher提交日志
Spark任务提交过程中产生的详细日志,默认保存在FE根目录下log/spark_launcher_log路径下,并以spark_launcher_{load_job_id}_{label}.log命名。
日志会在此目录下保存一段时间,当FE元数据中的导入信息被清理时,相应的日志也会被清理,默认保存时间为3天。
取消导入
当Spark Load作业状态不为CANCELLED或FINISHED时,您可以手动取消。取消时需要指定待取消导入任务的Label。取消导入命令语法可执行HELP CANCEL LOAD
查看。
相关系统配置
下面配置属于Spark Load的系统级别配置,即作用于所有Spark Load导入任务的配置。主要通过修改fe.conf来调整配置值。
enable_spark_load:开启Spark Load和创建Resource功能。默认为False,关闭此功能。
spark_load_default_timeout_second:任务默认超时时间为259200秒(3天)。
spark_home_default_dirspark:客户端路径
fe/lib/spark2x
。spark_resource_path:打包好的Spark依赖文件路径(默认为空)。
spark_launcher_log_dirspark:客户端的提交日志存放的目录
fe/log/spark_launcher_log
。yarn_client_pathyarn:二进制可执行文件路径
fe/lib/yarn-client/hadoop/bin/yarn
。yarn_config_diryarn:配置文件生成路径
fe/lib/yarn-config
。