本文主要为您介绍Spark Load导入方式。
背景信息
- Spark Load是一种异步导入方式,您需要通过MySQL协议创建Spark类型导入任务,并通过 - SHOW LOAD查看导入结果。
- Spark Load利用了Spark集群资源对要导入的数据进行了排序,Doris BE直接写文件,这样能大大降低Doris集群的资源使用,对于历史海量数据迁移降低Doris集群资源使用及负载有很好的效果。 
如果您在没有Spark集群这种资源的情况下,又想方便、快速的完成外部存储历史数据的迁移,可以使用Broker Load。相比于Spark Load导入,Broker Load对Doris集群的资源占用会更高。
适用场景
Spark Load通过外部的Spark资源实现对导入数据的预处理,提高Doris大数据量的导入性能并且节省Doris集群的计算资源。主要用于初次迁移、大数据量导入Doris的场景。
- 源数据在Spark可以访问的存储系统中,如HDFS。 
- 数据量达到10 GB以上至TB级别的业务场景。 
数据量较小或不满足上述情况时,建议使用Stream Load或者Broker Load。
基本流程
您可以通过MySQL客户端提交Spark类型导入任务,FE记录元数据并返回提交成功,基本流程如下所示。
+
                 | 
            +----v----+
            |   FE    |---------------------------------+
            +----+----+                                 |
                 | 3. FE send push tasks                |
                 | 5. FE publish version                |
    +------------+------------+                         |
    |            |            |                         |
+---v---+    +---v---+    +---v---+                     |
|  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job
+---^---+    +---^---+    +---^---+                     |
    |4. BE push with broker   |                         |
+---+---+    +---+---+    +---+---+                     |
|Broker |    |Broker |    |Broker |                     |
+---^---+    +---^---+    +---^---+                     |
    |            |            |                         |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
|               HDFS              +------->       Spark cluster         |
|                                 <-------+                             |
+---------------------------------+       +-----------------------------+Spark Load任务的执行主要分为以下5个阶段。
- FE调度提交ETL任务到Spark集群执行。 
- Spark集群执行ETL完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。 
- ETL任务完成后,FE获取预处理过的每个分片的数据路径,并调度相关的BE执行Push任务。 
- BE通过Broker读取数据,转化为Doris底层存储格式。 
- FE调度生效版本,完成导入任务。 
全局字典
适用场景
- 目前Doris中Bitmap列是使用类库Roaringbitmap实现的,而Roaringbitmap的输入数据类型只能是整型,因此如果要在导入流程中实现对于Bitmap列的预计算,则需要将输入数据的类型转换成整型。 
- 在Doris现有的导入流程中,全局字典的数据结构是基于Hive表实现的,保存了原始值到编码值的映射。 
构建流程
- 读取上游数据源的数据,生成一张Hive临时表,记为hive_table。 
- 从hive_table中抽取待去重字段的去重值,生成一张新的Hive表,记为distinct_value_table。 
- 新建一张全局字典表,记为dict_table。字典表一列为原始值,另一列为编码后的值。 
- 将distinct_value_table与dict_table做 - left join,计算出新增的去重值集合,然后对这个集合使用窗口函数进行编码,此时去重列原始值就多了一列编码后的值,最后将这两列的数据写回dict_table。
- 将dict_table与hive_table连接,完成hive_table中原始值替换成整型编码值的工作。 
- hive_table会被下一步数据预处理的流程所读取,经过计算后导入到Doris中。 
数据预处理(DPP)
基本流程
- 从数据源读取数据,上游数据源可以是HDFS文件,也可以是Hive表。 
- 对读取到的数据进行字段映射,表达式计算以及根据分区信息生成分桶字段 - bucket_id。
- 根据Doris表的rollup元数据生成RollupTree。 
- 遍历RollupTree,进行分层的聚合操作,下一个层级的rollup可以由上一个层的rollup计算得来。 
- 每次完成聚合计算后,会对数据根据 - bucket_id进行分桶然后写入HDFS中。
- 后续broker会拉取HDFS中的文件然后导入Doris BE中。 
Hive Bitmap UDF
Spark支持将Hive生成的Bitmap数据直接导入到Doris。
配置ETL集群
配置集群
Spark作为一种外部计算资源在Doris中用来完成ETL工作,引入Resource Management来管理Doris使用的这些外部资源。
提交Spark导入任务之前,需要配置执行ETL任务的Spark集群,参数配置如下所示,详情参见下面的创建资源配置。
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
  type = spark,
  spark_conf_key = spark_conf_value,
  working_dir = path,
  broker = broker_name,
  broker.property_key = property_value,
  broker.hadoop.security.authentication = kerberos,
  broker.kerberos_principal = doris@YOUR.COM,
  broker.kerberos_keytab = /home/doris/my.keytab
  broker.kerberos_keytab_content = ASDOWHDLAWI********ALDJSDIWALD
)
-- drop spark resource
DROP RESOURCE resource_name
-- show resources
SHOW RESOURCES
SHOW PROC "/resources"
-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name创建资源配置
resource_name为Doris中配置的Spark资源的名字。
PROPERTIES是Spark资源相关参数,参数如下:
- type:资源类型,必填,目前仅支持Spark。 
- Spark相关参数如下: - spark.master:必填,参数值为yarn或 - spark://host:port。
- spark.submit.deployMode:Spark程序的部署模式,必填,支持Cluster,Client两种模式。 
- spark.hadoop.yarn.resourcemanager.address:参数spark.master为yarn时,该参数必填。 
- spark.hadoop.fs.defaultFS:参数spark.master为yarn时,该参数必填。 
- 其他参数为可选,详情请参见Spark Configuration。 
 
- working_dir: ETL使用的目录。Spark作为ETL资源使用时必填。例如 - hdfs://host:port/tmp/doris。
- broker.hadoop.security.authentication:指定认证方式为kerberos。 
- broker.kerberos_principal:指定kerberos的principal。 
- broker.kerberos_keytab:指定kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件的绝对路径。并且可以被Broker进程访问。 
- broker.kerberos_keytab_content:指定kerberos中keytab文件内容经过base64编码之后的内容。该参数和broker.kerberos_keytab配置二选一即可。 
- broker: broker名字。Spark作为ETL资源使用时必填。需要使用 - ALTER SYSTEM ADD BROKER命令提前完成配置。
- broker.property_key:broker读取ETL生成的中间文件时需要指定的认证信息等。 
参数示例。
- YARN Cluster模式 - CREATE EXTERNAL RESOURCE "spark0" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.jars" = "xxx.jar,yyy.jar", "spark.files" = "/tmp/aaa,/tmp/bbb", "spark.executor.memory" = "1g", "spark.yarn.queue" = "queue0", "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999", "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000", "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris", "broker" = "broker0", "broker.username" = "user0", "broker.password" = "password0" );
- Spark Standalone Client模式 - CREATE EXTERNAL RESOURCE "spark1" PROPERTIES ( "type" = "spark", "spark.master" = "spark://127.0.0.1:7777", "spark.submit.deployMode" = "client", "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris", "broker" = "broker1" );
Spark Load支持Kerberos认证
如果是Spark Load访问带有Kerberos认证的Hadoop集群资源,您只需要在创建Spark resource的时候指定以下参数即可:
- broker.hadoop.security.authentication:指定认证方式为kerberos。 
- broker.kerberos_principal:指定kerberos的principal。 
- broker.kerberos_keytab:指定kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件的绝对路径。并且可以被Broker进程访问。 
- broker.kerberos_keytab_content:指定kerberos中keytab文件内容经过base64编码之后的内容。该参数和kerberos_keytab配置二选一即可。 
参数示例
CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.jars" = "xxx.jar,yyy.jar",
  "spark.files" = "/tmp/aaa,/tmp/bbb",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "queue0",
  "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
  "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker0",
  "broker.hadoop.security.authentication" = "kerberos",
  "broker.kerberos_principal" = "doris@YOUR.COM",
  "broker.kerberos_keytab" = "/home/doris/my.keytab"
);查看资源
- 普通账户只能看到自己有USAGE_PRIV使用权限的资源。 
- root和admin账户可以看到所有的资源。 
资源权限
资源权限通过GRANT REVOKE来管理,目前仅支持USAGE_PRIV使用权限。
可以将USAGE_PRIV权限赋予某个账户或者某个角色,角色的使用与之前一致。
-- 授予spark0资源的使用权限给账户user0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-- 授予spark0资源的使用权限给角色role0
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
-- 授予所有资源的使用权限给账户user0
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
-- 授予所有资源的使用权限给角色role0
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
-- 撤销账户user0的spark0资源使用权限
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";配置SPARK客户端
FE底层通过执行spark-submit命令去提交Spark任务,因此需要为FE配置Spark客户端,建议使用2.4.5或以上的Spark2官方版本,您可以通过Spark下载地址进行下载,下载完成后,请按以下步骤完成配置。
- 配置SPARK_HOME环境变量 - 将spark客户端放在FE同一台机器上的目录下,并在FE的配置文件配置spark_home_default_dir项指向此目录,此配置项默认为FE根目录下的lib/spark2x路径,此项不可为空。 
- 配置SPARK依赖包 - 将Spark客户端下的 - jars文件夹内所有JAR包归档打包成一个ZIP文件,并在FE的配置文件配置- spark_resource_path项指向此ZIP文件,若此配置项为空,则FE会尝试寻找FE根目录下的lib/spark2x/jars/spark-2x.zip文件,若没有找到则会报文件不存在的错误。- 当提交Spark Load任务时,会将归档好的依赖文件上传至远端仓库,默认仓库路径挂在 - working_dir/{cluster_id}目录下,并以- __spark_repository__{resource_name}命名,表示集群内的一个resource对应一个远端仓库,远端仓库目录结构参考如下:- __spark_repository__spark0/ |-__archive_1.0.0/ | |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar | |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip |-__archive_1.1.0/ | |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar | |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip |-__archive_1.2.0/ | |-...说明- 除了Spark依赖(默认以spark-2x.zip命名),FE还会上传DPP的依赖包至远端仓库,若此次Spark Load提交的所有依赖文件都已存在远端仓库,那么就不需要再上传依赖,节省了每次重复上传大量文件的时间。 
配置YARN客户端
- FE底层通过执行 - yarn命令去获取正在运行的Application的状态以及终止Application,因此需要为FE配置YARN客户端,建议使用hadoop-2.5.2或hadoop-2.0以上的官方版本,下载详情请参见hadoop下载地址。
- 将下载好的YARN客户端放在FE同一台机器的目录下,并在FE配置文件配置yarn_client_path项指向yarn的二进制可执行文件,默认为FE根目录下的lib/yarn-client/hadoop/bin/yarn路径。 
- (可选操作)当FE通过yarn客户端去获取Application的状态或者终止Application时,默认会在FE根目录下的lib/yarn-config路径下生成执行yarn命令所需的配置文件,此路径可通过在FE配置文件配置yarn_config_dir项修改,目前生成的配置文件包括core-site.xml和yarn-site.xml。 
创建导入
主要介绍Spark Load的创建导入语法中参数意义和注意事项。
创建导入的详细语法执行HELP SPARK LOAD查看语法帮助。
- Spark Load导入语法 - LOAD LABEL load_label (data_desc, ...) WITH RESOURCE resource_name [resource_properties] [PROPERTIES (key1=value1, ... )] * load_label: db_name.label_name * data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY separator ] [(col1, ...)] [COLUMNS FROM PATH AS (col2, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] DATA FROM TABLE hive_external_tbl [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate] * resource_properties: (key2=value2, ...)
- 示例一:上游数据源为HDFS文件的情况 - LOAD LABEL db1.label1 ( DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1,tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
- 示例二:上游数据源是Hive表的情况 - 新建Hive外部表。 - CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) ) ENGINE=hive properties ( "database" = "tmp", "table" = "t1", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );
- 提交Load命令,要求导入的Doris表中的列必须在Hive外部表中存在。 - LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=bitmap_dict(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
 
- 示例三:上游数据源是Hive Binary类型情况 - 新建Hive外部表。 - CREATE EXTERNAL TABLE hive_t1 ( k1 INT, K2 SMALLINT, k3 varchar(50), uuid varchar(100) //hive中的类型为binary ) ENGINE=hive properties ( "database" = "tmp", "table" = "t1", "hive.metastore.uris" = "thrift://0.0.0.0:8080" );
- 提交Load命令,要求导入的Doris表中的列必须在Hive外部表中存在。 - LOAD LABEL db1.label1 ( DATA FROM TABLE hive_t1 INTO TABLE tbl1 SET ( uuid=binary_bitmap(uuid) ) ) WITH RESOURCE 'spark0' ( "spark.executor.memory" = "2g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
 
- 示例四:导入Hive分区表的数据 - Hive建表语句 - create table test_partition( id int, name string, age int ) partitioned by (dt string) row format delimited fields terminated by ',' stored as textfile;
- Doris建表语句 - CREATE TABLE IF NOT EXISTS test_partition_04 ( dt date, id int, name string, age int ) UNIQUE KEY(`dt`, `id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );
- Spark Load语句 - CREATE EXTERNAL RESOURCE "spark_resource" PROPERTIES ( "type" = "spark", "spark.master" = "yarn", "spark.submit.deployMode" = "cluster", "spark.executor.memory" = "1g", "spark.yarn.queue" = "default", "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056", "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000", "working_dir" = "hdfs://localhost:9000/tmp/doris", "broker" = "broker_01" ); LOAD LABEL demo.test_hive_partition_table_18 ( DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*") INTO TABLE test_partition_04 COLUMNS TERMINATED BY "," FORMAT AS "csv" (id,name,age) COLUMNS FROM PATH AS (`dt`) SET ( dt=dt, id=id, name=name, age=age ) ) WITH RESOURCE 'spark_resource' ( "spark.executor.memory" = "1g", "spark.shuffle.compress" = "true" ) PROPERTIES ( "timeout" = "3600" );
 
参数和相关导入说明
- 数据描述类参数 - 目前支持的数据源有CSV和Hive table。其他规则与Broker Load一致。 
- 导入作业参数 - 导入作业参数主要指的是Spark Load创建导入语句中的属于opt_properties部分的参数。导入作业参数是作用于整个导入作业的。规则与Broker Load一致。 
- Spark资源参数 - Spark资源需要提前配置到Doris系统中并且赋予您USAGE_PRIV权限后才能使用Spark Load。 - 当您有临时性的需求,比如增加任务使用的资源而修改Spark configs,可参考如下示例进行设置。 - WITH RESOURCE 'spark0' ( "spark.driver.memory" = "1g", "spark.executor.memory" = "3g" )说明- 设置仅对本次任务生效,并不影响Doris集群中已有的配置。 
- Hive表作为数据源导入 - 目前如果期望在导入流程中将Hive表作为数据源,那么需要先新建一张类型为Hive的外部表,然后提交导入命令时指定外部表的表名即可。 
- 导入流程构建全局字典 - 适用于Doris表聚合列的数据类型为Bitmap类型。在 - load命令中指定需要构建全局字典的字段即可,格式为:Doris字段名称=bitmap_dict(Hive表字段名称)。重要- 目前只有在上游数据源为Hive表时才支持全局字典的构建。 
- Hive Binary(bitmap)类型列的导入 - 适用于Doris表聚合列的数据类型为Bitmap类型,且数据源Hive表中对应列的数据类型为binary(通过FE中spark-dpp中的 - org.apache.doris.load.loadv2.dpp.BitmapValue类序列化)类型。- 全局字典,在 - load命令中指定相应字段即可,格式为:Doris字段名称=binary_bitmap(Hive表字段名称)。重要- 目前只有在上游数据源为Hive表时才支持Binary(Bitmap)类型的数据导入。 
查看导入
Spark Load导入方式同Broker Load一样都是异步的,所以您必须将创建导入的Label记录,并且在查看导入命令中使用Label来查看导入结果。查看导入命令在所有导入方式中是通用的,具体语法可执行HELP SHOW LOAD查看,查看导入示例如下。
show load order by createtime desc limit 1\G返回如下信息。
*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: SPARK
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://1.1.*.*:80**/proxy/application_15866****3848_0035/
    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}返回结果集中参数意义可以参见Broker Load。不同点如下:
- State导入任务当前所处的阶段。任务提交之后状态为PENDING,提交Spark ETL之后状态变为ETL,ETL完成之后FE调度BE执行push操作状态变为LOADING,push完成并且版本生效后状态变为FINISHED。 - 导入任务的最终阶段有两个:CANCELLED和FINISHED。当Load job处于这两个阶段时导入完成。其中CANCELLED为导入失败,FINISHED为导入成功。 
- Progress导入任务的进度描述。分为两种进度:ETL和LOAD,对应了导入流程的两个阶段ETL和LOADING。 - LOAD进度=当前已完成所有replica导入的tablet个数/本次导入任务的总tablet个数*100%- LOAD的进度范围为0~100%。如果所有导入表均完成导入,此时LOAD的进度为99%,表示导入进入到最后生效阶段,整个导入完成后,LOAD的进度才会改为100%。 说明- 导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入没有在执行。 
- Type导入任务的类型。Spark Load为SPARK。 
- 以下参数值含义: - CreateTime:导入创建的时间。 
- EtlStartTime:ETL阶段开始的时间。 
- EtlFinishTime:ETL阶段完成的时间。 
- LoadStartTime:LOADING阶段开始的时间。 
- LoadFinishTime:整个导入任务完成的时间。 
 
- JobDetails显示一些作业的详细运行状态,ETL结束的时候更新。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数等,如 - {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}。
- URL可复制输入到浏览器,跳转至相应Application的Web界面。 
查看Spark launcher提交日志
Spark任务提交过程中产生的详细日志,默认保存在FE根目录下log/spark_launcher_log路径下,并以spark_launcher_{load_job_id}_{label}.log命名。
日志会在此目录下保存一段时间,当FE元数据中的导入信息被清理时,相应的日志也会被清理,默认保存时间为3天。
取消导入
当Spark Load作业状态不为CANCELLED或FINISHED时,您可以手动取消。取消时需要指定待取消导入任务的Label。取消导入命令语法可执行HELP CANCEL LOAD查看。
相关系统配置
下面配置属于Spark Load的系统级别配置,即作用于所有Spark Load导入任务的配置。主要通过修改fe.conf来调整配置值。
- enable_spark_load:开启Spark Load和创建Resource功能。默认为False,关闭此功能。 
- spark_load_default_timeout_second:任务默认超时时间为259200秒(3天)。 
- spark_home_default_dirspark:客户端路径 - fe/lib/spark2x。
- spark_resource_path:打包好的Spark依赖文件路径(默认为空)。 
- spark_launcher_log_dirspark:客户端的提交日志存放的目录 - fe/log/spark_launcher_log。
- yarn_client_pathyarn:二进制可执行文件路径 - fe/lib/yarn-client/hadoop/bin/yarn。
- yarn_config_diryarn:配置文件生成路径 - fe/lib/yarn-config。