文档

Broker Load

更新时间:

Broker Load是一个异步的导入方式,支持的数据源取决于Broker进程支持的数据源。本文为您介绍Broker Load导入的基本原理、基本操作、系统配置以及最佳实践。

背景信息

因为Doris表里的数据是有序的,所以Broker Load在导入数据时需要利用Doris集群资源对数据进行排序,相对于Spark Load来完成海量历史数据迁移,Broker Load对Doris集群资源占用较大。Broker Load方式是在没有Spark计算资源的情况下使用,如果有Spark计算资源建议使用Spark Load

适用场景

  • 源数据在Broker可以访问的存储系统中,例如HDFS。

  • 数据量在几十到百GB级别。

基本原理

提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将Plan分给多个BE执行,每个BE执行一部分导入数据。BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。

+
                 | 1. user create broker load
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. BE etl and load the data
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |   BE  |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. pull data from broker
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS cluster       |
|                                  |
+----------------------------------+

开始导入

Hive分区表的数据导入

  1. 创建Hive表。

    ##数据格式是:默认,分区字段是:day
    CREATE TABLE `ods_demo_detail`(
      `id` string,
      `store_id` string,
      `company_id` string,
      `tower_id` string,
      `commodity_id` string,
      `commodity_name` string,
      `commodity_price` double,
      `member_price` double,
      `cost_price` double,
      `unit` string,
      `quantity` double,
      `actual_price` double
    )
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
  2. 使用Hive的Load命令将您的数据导入到Hive表中。

    load data local inpath '/opt/custorm' into table ods_demo_detail;
  3. 创建Doris表。

    CREATE TABLE `doris_ods_test_detail` (
      `rq` date NULL,
      `id` varchar(32) NOT NULL,
      `store_id` varchar(32) NULL,
      `company_id` varchar(32) NULL,
      `tower_id` varchar(32) NULL,
      `commodity_id` varchar(32) NULL,
      `commodity_name` varchar(500) NULL,
      `commodity_price` decimal(10, 2) NULL,
      `member_price` decimal(10, 2) NULL,
      `cost_price` decimal(10, 2) NULL,
      `unit` varchar(50) NULL,
      `quantity` int(11) NULL,
      `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (
    PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "2",
    "dynamic_partition.prefix" = "P_",
    "dynamic_partition.buckets" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
    );
  4. 开始导入数据。

    LOAD LABEL broker_load_2022_03_23
    (
        DATA INFILE("hdfs://192.168.**.**:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
      (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
       SET
       (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
        )
    WITH BROKER "broker_name_1"
        (
          "username" = "hdfs",
          "password" = ""
        )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

Hive分区表导入(ORC格式)

  1. 创建ORC格式的Hive分区表。

    #数据格式:ORC 分区:day
    CREATE TABLE `ods_demo_orc_detail`(
      `id` string,
      `store_id` string,
      `company_id` string,
      `tower_id` string,
      `commodity_id` string,
      `commodity_name` string,
      `commodity_price` double,
      `member_price` double,
      `cost_price` double,
      `unit` string,
      `quantity` double,
      `actual_price` double
    )
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    STORED AS ORC
  2. 创建Doris表。

    CREATE TABLE `doris_ods_test_detail` (
      `rq` date NULL,
      `id` varchar(32) NOT NULL,
      `store_id` varchar(32) NULL,
      `company_id` varchar(32) NULL,
      `tower_id` varchar(32) NULL,
      `commodity_id` varchar(32) NULL,
      `commodity_name` varchar(500) NULL,
      `commodity_price` decimal(10, 2) NULL,
      `member_price` decimal(10, 2) NULL,
      `cost_price` decimal(10, 2) NULL,
      `unit` varchar(50) NULL,
      `quantity` int(11) NULL,
      `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (
    PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "2",
    "dynamic_partition.prefix" = "P_",
    "dynamic_partition.buckets" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
    );
  3. 使用Broker Load导入数据。

    LOAD LABEL dish_2022_03_23
    (
        DATA INFILE("hdfs://10.220.**.**:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
        FORMAT AS "orc"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
       SET
       (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
        )
    WITH BROKER "broker_name_1"
        (
          "username" = "hdfs",
          "password" = ""
        )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

    其中,涉及参数:

    • FORMAT AS "orc" : 指定了要导入的数据格式。

    • SET : 定义了Hive表和Doris表之间的字段映射关系及字段转换的一些操作。

HDFS文件系统数据导入

以上面创建好的Doris表为例,通过Broker Load从HDFS上导入数据的语句如下所示。

LOAD LABEL demo.label_20220402
        (
            DATA INFILE("hdfs://10.220.**.**:8020/tmp/test_hdfs.txt")
            INTO TABLE `ods_dish_detail_test`
            COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        )
        with HDFS (
            "fs.defaultFS"="hdfs://10.220.**.**:8020",
            "hadoop.username"="root"
        )
        PROPERTIES
        (
            "timeout"="1200",
            "max_filter_ratio"="0.1"
        );

查看导入状态

您可以通过下面的命令查看上面导入任务的状态信息。

show load order by createtime desc limit 1\G;

返回信息如下所示。

*************************** 1. row ***************************
         JobId: 4132****
         Label: broker_load_2022_03_23
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
  EtlStartTime: 2022-04-01 18:59:11
 EtlFinishTime: 2022-04-01 18:59:11
 LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029****":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029****":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)

取消导入

当Broker Load作业状态不为CANCELLED或FINISHED时,您可以手动取消。取消时需要指定待取消导入任务的Label 。

例如:撤销数据库demo上Label为broker_load_2022_03_23的导入作业。

CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

相关系统配置

Broker参数

Broker Load需要借助Broker 程访问远端存储,不同的Broker需要提供不同的参数 。

FE配置

以下配置属于Broker Load的系统级别配置,即会作用于所有Broker Load导入任务的配置。主要通过修改fe.conf来调整配置值。

  • min_bytes_per_broker_scanner:限制了单个BE处理的数据量的最小值。

  • max_bytes_per_broker_scanner:限制了单个BE处理的数据量的最大值。

  • max_broker_concurrency:限制了一个作业的最大的导入并发数。

最小处理的数据量、最大并发数、源文件大小和当前集群BE的个数共同决定了本次导入的并发数。

本次导入并发数 = Math.min(源文件大小/最小处理量,最大并发数,当前BE节点个数)
本次导入单个BE的处理量 = 源文件大小/本次导入的并发数

通常一个导入作业支持的最大数据量为max_bytes_per_broker_scanner * BE节点数。如果需要导入更大数据量,则需要适当调整max_bytes_per_broker_scanner参数的大小。

默认参数值如下:

  • min_bytes_per_broker_scanner:默认64 MB,单位bytes。

  • max_bytes_per_broker_scanner:默认3 GB,单位bytes。

  • max_broker_concurrency:默认10。

最佳实践

应用场景

使用Broker Load最适合的场景就是原始数据在文件系统(HDFS、BOS、AFS)中的场景。其次,由于Broker Load是单次导入中唯一的一种异步导入的方式,所以您想使用异步方式导入大文件时,可以考虑使用Broker Load。

数据量

以下内容是针对单个BE的情况,如果您集群有多个BE,则下面标题中的数据量应该乘以BE个数来计算。例如,您的集群有3个BE,则3 GB以下(包含)则应该乘以3,也就是9 GB以下(包含)。

  • 3 GB以下(包含):您可以直接提交Broker Load创建导入请求。

  • 3 GB以上:由于单个导入BE最大的处理量为3 GB,超过3 GB的待导入文件就需要通过调整Broker Load的导入参数来实现大文件的导入。

    1. 根据当前BE的个数和原始文件的大小修改单个BE的最大扫描量和最大并发数。

      修改fe.conf中配置
      max_broker_concurrency = BE个数
      当前导入任务单个BE处理的数据量 = 原始文件大小 / max_broker_concurrency
      max_bytes_per_broker_scanner >= 当前导入任务单个BE处理的数据量
      
      例如,一个100 GB的文件,集群的BE个数为10个
      max_broker_concurrency = 10
      max_bytes_per_broker_scanner >= 10G = 100G / 10

      修改后,所有的BE会并发的处理导入任务,每个BE处理原始文件的一部分。

      说明

      上述两个FE中的配置均为系统配置,其修改是作用于所有的Broker Load任务。

    2. 创建导入时自定义当前导入任务的timeout时间。

      当前导入任务单个BE处理的数据量 / 用户Doris集群最慢导入速度(MB/s) >= 当前导入任务的timeout时间 >= 当前导入任务单个BE处理的数据量 / 10M/s
      
      例如,一个100 GB的文件,集群的BE个数为10个,则timeout时间如下所示。
      timeout >= 1000s = 10G / 10M/s
    3. 当您发现第二步计算出的timeout时间超过系统默认的导入最大超时时间(4小时),此时不推荐您将导入最大超时时间直接改大来解决问题。单个导入时间如果超过默认的导入最大超时时间(4小时),最好通过切分待导入文件并且分多次导入来解决问题。主要原因是单次导入超过4小时的话,导入失败后重试的时间成本很高。您可以通过如下公式计算出Doris集群期望最大导入文件数据量。

      期望最大导入文件数据量 = 14400s * 10M/s * BE个数
      例如,集群的BE个数为10个
      期望最大导入文件数据量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
      重要

      通常您的环境可能达不到10M/s的速度,所以建议超过500 GB的文件都进行文件切分,然后再导入。

作业调度

系统会限制一个集群内,正在运行的Broker Load作业数量,以防止同时运行过多的Load作业。

desired_max_waiting_jobs:FE的配置参数,会限制一个集群内未开始或正在运行(作业状态为PENDING或LOADING)的Broker Load作业数量。默认为100。如果超过该阈值,新提交的作业将会被直接拒绝。

一个Broker Load作业会被分为pending task和loading task阶段。其中pending task负责获取导入文件的信息,而loading task会发送给BE执行具体的导入任务。

  • async_pending_load_task_pool_size:FE的配置参数,用于限制同时运行的pending task的任务数量。也相当于控制了实际正在运行的导入任务数量。该参数默认为10。例如,您提交了100个Load作业,同时只会有10个作业会进入LOADING状态(开始执行),而其他作业处于PENDING状态(等待)。

  • async_loading_load_task_pool_size:FE的配置参数,用于限制同时运行的loading task的任务数量。一个Broker Load作业会有1个pending task和多个loading task (等于LOAD语句中DATA INFILE子句的个数),所以async_loading_load_task_pool_size应该大于等于async_pending_load_task_pool_size