Broker Load

在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(例如,Apache HDFS,阿里云OSS)上的数据,利用自身的计算资源对数据进行预处理和导入。本文为您介绍Broker Load导入的使用示例以及常见问题。

背景信息

Broker Load是一种异步的导入方式。StarRocks支持从外部存储系统导入数据,支持CSV、ORCFile和Parquet等文件格式,建议单次导入数据量在几十GB到上百GB级别。

Broker Load导入

查看Broker

阿里云EMR StarRocks实例在创建时已经自动搭建并启动Broker服务,Broker服务位于每个Core节点上。使用以下SQL命令可以查看Broker。

SHOW PROC "/brokers";

创建导入任务

  • 语法

    LOAD LABEL [<database_name>.]<label_name>
    (
        data_desc[, data_desc ...]
    )
    WITH BROKER
    [broker_properties]
    [opt_properties]
  • 参数描述

    • Label

      导入任务的标识。每个导入任务都有一个唯一的Label,由系统自动生成或在导入命令中自定义。Label可以用来防止导入相同的数据,并查看对应导入任务的执行情况。当Label对应的导入任务状态为FINISHED时,对应的Label无法再次使用;当状态为CANCELLED时,可以再次使用该Label提交导入作业。

    • 数据描述类data_desc

      data_desc:
          DATA INFILE ('file_path', ...)
          [NEGATIVE]
          INTO TABLE tbl_name
          [PARTITION (p1, p2)]
          [COLUMNS TERMINATED BY column_separator ]
          [FORMAT AS file_type]
          [(col1, ...)]
          [COLUMNS FROM PATH AS (colx, ...)]
          [SET (k1=f1(xx), k2=f2(xx))]
          [WHERE predicate]

      相关参数描述如下表所示。

      参数

      描述

      file_path

      文件路径可以指定到文件,也可以用星号(*)通配符指定某个目录下的所有文件。中间的目录也可以使用通配符匹配。

      可以使用的通配符有? * [] {} ^,使用规则请参见FileSystem

      例如, 通过oss://bucket/data/tablename , 可以匹配tablename下所有分区内的所有文件。通过oss://bucket/data/tablename/dt=202301, 可以匹配tablename下1月分区的所有文件。

      negative

      设置数据取反导入。

      该功能适用的场景是当数据表中聚合列的类型均为SUM类型时,如果希望撤销某一批导入的数据,可以通过negative参数导入同一批数据,StarRocks会自动为这批数据在聚合列上数据取反,以达到消除同一批数据的功能。

      partition

      指定待导入表的Partition信息。

      如果待导入数据不属于指定的Partition,则不会被导入。同时,不指定Partition中的数据会被认为是“错误数据”。对于不想导入,也不想记录为“错误数据”的数据,可以使用where predicate来过滤。

      column_separator

      COLUMNS TERMINATED BY column_separator,用于指定导入文件中的列分隔符,默认为\t。

      如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符为\x01,则列分隔符为\\x01。

      file_type

      FORMAT AS file_type,用于指定导入文件的类型。例如,parquetorccsv,默认值为csv

      parquet类型也可以通过文件后缀名.parquet或者.parq判断。

      COLUMNS FROM PATH AS

      提取文件路径中的分区字段。

      例如,导入文件为/path/col_name=col_value/dt=20210101/file1,其中col_name/dt为表中的列,则将col_value、20210101分别导入到col_name和dt对应的列的代码示例如下。

      (col1, col2)
      COLUMNS FROM PATH AS (col_name, dt)

      set column mapping

      SET (k1=f1(xx), k2=f2(xx)),data_desc中的SET语句负责设置列函数变换。

      如果原始数据的列和表中的列不一一对应,则需要使用该属性。

      where predicate

      WHERE predicate,data_desc中的WHERE语句负责过滤已经完成transform的数据。

      被过滤的数据不会进入容忍率的统计中。如果多个data_desc中声明了关于同一张表的多个条件,则会以AND语义合并这些条件。

    • 导入作业参数

      导入作业参数是指Broker Load创建导入语句中属于broker_properties部分的参数。导入作业参数是作用于整个导入作业的。

      broker_properties:
          (key2=value2, ...)

      部分参数描述如下表所示。

      参数

      描述

      timeout

      导入作业的超时时间(以秒为单位)。

      您可以在opt_properties中自行设置每个导入的超时时间。导入任务在设定的时限内未完成则会被系统取消,变为CANCELLED。Broker Load的默认导入超时时间为4小时。

      重要

      通常情况下,不需要您手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。

      推荐超时时间的计算方式为:超时时间 >((总文件大小 (MB)* 待导入的表及相关Roll up表的个数) / (30 * 导入并发数))

      公式中的30为目前BE导入的平均速度,表示30 MB/s。例如,如果待导入数据文件为1 GB,待导入表包含2个Rollup表,当前的导入并发数为3,则timeout的最小值为 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。

      由于每个StarRocks集群的机器环境不同且集群并发的查询任务也不同,所以StarRocks集群的最慢导入速度需要您根据历史的导入任务速度进行推测。

      max_filter_ratio

      导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以设置该参数值大于0,来保证导入可以成功。

      计算公式为:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

      其中,dpp.abnorm.ALL表示数据质量不合格的行数,例如类型不匹配、列数不匹配和长度不匹配等。dpp.abnorm.ALL指的是导入过程中正确数据的条数,可以通过SHOW LOAD命令查询导入任务的正确数据量。

      原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL

      load_mem_limit

      导入内存限制。默认值为0,表示不限制。

      strict_mode

      Broker Load导入可以开启Strict Mode模式。开启方式为properties ("strict_mode" = "true")

      默认关闭。

      Strict Mode模式是对于导入过程中的列类型转换进行严格过滤。严格过滤的策略为,对于列类型转换,如果Strict Mode为true,则错误的数据将被过滤掉。错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的数据。但以下场景除外:

      • 对于导入的某列由函数变换生成时,Strict Mode对其不产生影响。

      • 对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,Strict Mode对其也不产生影响。例如,如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,Strict Mode对其不产生影响。

查看导入任务状态

SHOW LOAD;

返回参数的描述如下表所示。

参数

描述

JobId

导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。

Label

导入任务的标识。

State

导入任务当前所处的阶段。

  • PENDING:表示当前导入任务正在等待被执行。

  • LOADING:表示正在执行中。

  • CANCELLED:表示导入失败。

  • FINISHED:表示导入成功。

Progress

导入任务的进度描述。分为ETL和LOAD两种进度,分别对应导入流程的ETL和LOADING两个阶段。目前Broker Load只有LOADING阶段,所以ETL固定显示为N/A,而LOAD的进度范围为0~100%。

LOAD的进度的计算公式为LOAD进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%

如果所有导入表均完成导入,此时LOAD的进度为99%, 导入进入到最后生效阶段,待整个导入任务完成后,LOAD的进度才会改为100%。

重要

导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入任务没有执行。

Type

导入任务的类型。Broker Load的Type取值是BROKER。

EtlInfo

主要显示导入的数据量指标unselected.rowsdpp.norm.ALLdpp.abnorm.ALL

您可以根据unselected.rows的参数值判断where条件过滤了多少行,根据dpp.norm.ALLdpp.abnorm.ALL两个指标可以验证当前导入任务的错误率是否超过max-filter-ratio。三个指标之和就是原始数据量的总行数。

TaskInfo

主要显示当前导入任务参数,即创建Broker Load导入任务时您指定的参数,包括cluster,timeout和max-filter-ratio。

ErrorMsg

如果导入任务状态为CANCELLED,则显示失败的原因,包括type和msg两部分。如果导入任务成功则显示N/A。type的取值意义如下:

  • USER-CANCEL:取消的任务。

  • ETL-RUN-FAIL:在ETL阶段失败的导入任务。

  • ETL-QUALITY-UNSATISFIED:数据质量不合格,即错误数据率超过了max-filter-ratio。

  • LOAD-RUN-FAIL:在LOADING阶段失败的导入任务。

  • TIMEOUT:没在超时时间内完成的导入任务。

  • UNKNOWN:未知的导入错误。

CreateTime

分别表示导入创建的时间、ETL阶段开始的时间、ETL阶段完成的时间、LOADING阶段开始的时间和整个导入任务完成的时间。

  • 由于Broker Load导入没有ETL阶段,所以EtlStartTimeEtlFinishTimeLoadStartTime被设置为同一个值。

  • 如果导入任务长时间停留在CreateTime,而LoadStartTime为N/A ,则说明目前导入任务堆积严重,您可以减少导入提交的频率。

    LoadFinishTime - CreateTime = 整个导入任务所消耗时间
    
    LoadFinishTime - LoadStartTime = 整个Broker load导入任务执行时间 = 整个导入任务所消耗时间 - 导入任务等待的时间

EtlStartTime

EtlFinishTime

LoadStartTime

LoadFinishTime

URL

导入任务的错误数据样例,访问URL地址即可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL字段为N/A。

JobDetails

显示作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点ID,以及未完成的BE节点ID。

{"Unfinished backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2****":[10002]},"FileNumber":1,"FileSize":1073741824}

其中已处理的原始行数,每5秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以EtlInfo中显示的数据为准。

取消导入任务

当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消。取消时需要指定待取消导入任务的Label。

导入任务并发度

一个作业可以拆成一个或者多个任务,任务之间并行执行。拆分由LOAD语句中的DataDescription来决定。例如:

  • 多个DataDescription对应导入多个不同的表,每个会拆成一个任务。

  • 多个DataDescription对应导入同一个表的不同分区,每个也会拆成一个任务。

每个任务还会拆分成一个或者多个实例,然后将这些实例平均分配到BE上并行执行。实例的拆分由以下FE配置决定:

  • min_bytes_per_broker_scanner:单个实例处理的最小数据量,默认值为64 MB。

  • max_broker_concurrency:单个任务最大并发实例数,默认值为100。

  • load_parallel_instance_num:单个BE上并发实例数,默认值为1个。

实例总数的计算公式为实例的总数 = min(导入文件总大小/单个实例处理的最小数据量,单个任务最大并发实例数,单个BE上并发实例数 * BE数)

通常情况下,一个作业只有一个DataDescription,只会拆分成一个任务。任务会拆成与BE数相等的实例,然后分配到所有BE上并行执行。

导入示例

说明

以下示例中的参数请根据实际情况替换。

阿里云OSS数据导入

  1. 创建测试表。

    create database if not exists load_test;
    use load_test;
    create table if not exists customer(
      c_customer_sk bigint,
      c_customer_id char(16),
      c_current_cdemo_sk bigint,
      c_current_hdemo_sk bigint,
      c_current_addr_sk bigint,
      c_first_shipto_date_sk bigint,
      c_first_sales_date_sk bigint,
      c_salutation char(10),
      c_first_name char(20),
      c_last_name char(30),
      c_preferred_cust_flag char(1),
      c_birth_day int,
      c_birth_month int,
      c_birth_year int,
      c_birth_country varchar(20),
      c_login char(13),
      c_email_address char(50),
      c_last_review_date_sk bigint
    )
    duplicate key (c_customer_sk)
    distributed by hash(c_customer_sk) buckets 5
    properties(
      "replication_num"="1"
    );
  2. 创建导入任务。

    下载customer.orc

    LOAD LABEL load_test.customer_label
    (
      DATA INFILE("oss://{bucket_name}/data/customer.orc")
      INTO TABLE customer
      format as "orc"
    )
    WITH BROKER 'broker'
    (
      "fs.oss.accessKeyId" = "xxxxx",
      "fs.oss.accessKeySecret" = "xxxxx",
      "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com"
    );
  3. 查看导入任务状态。

    show load where label='customer_label';
  4. 查询表信息。

    • 示例1

      select count(1) from customer;

      返回信息如下。

      +----------+
      | count(1) |
      +----------+
      |  6000000 |
      +----------+
      1 row in set (0.10 sec)
    • 示例2

      select * from customer limit 2;

      返回信息如下。

      +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+
      | c_customer_sk | c_customer_id    | c_current_cdemo_sk | c_current_hdemo_sk | c_current_addr_sk | c_first_shipto_date_sk | c_first_sales_date_sk | c_salutation | c_first_name | c_last_name | c_preferred_cust_flag | c_birth_day | c_birth_month | c_birth_year | c_birth_country | c_login | c_email_address        | c_last_review_date_sk |
      +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+
      |             2 | AAAAAAAACAAAAAAA |             819667 |               1461 |            681655 |                2452318 |               2452288 | Dr.          | Amy          | Moses       | Y                     |           9 |             4 |         1966 | TOGO            | NULL    | Amy.M****@Ovk9KjHH.com |               2452318 |
      |             2 | AAAAAAAACAAAAAAA |             819667 |               1461 |            681655 |                2452318 |               2452288 | Dr.          | Amy          | Moses       | Y                     |           9 |             4 |         1966 | TOGO            | NULL    | Amy.M****@Ovk9KjHH.com |               2452318 |
      +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+

HDFS导入

  • HDFS导入语法示例

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1, tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
    
        DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH BROKER 'broker'
    (
        "username" = "hdfs_username",
        "password" = "hdfs_password"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
    说明

    如果您的集群类型是新版数据湖(DataLake)、实时数据流(DataFlow)、数据分析(OLAP)、数据服务(DataServing)和自定义场景的集群, 请替换emr-header-1为master-1-1。

  • HDFS认证

    社区版本的HDFS支持简单认证和Kerberos认证两种认证方式。

    • 简单认证(Simple):用户的身份由与HDFS建立链接的客户端操作系统决定。涉及参数如下表。

      参数

      描述

      hadoop.security.authentication

      认证方式。默认值为simple。

      username

      HDFS的用户名。

      password

      HDFS的密码。

    • Kerberos认证:客户端的身份由用户自己的Kerberos证书决定。

      涉及参数如下表。

      参数

      描述

      hadoop.security.authentication

      认证方式。默认值为kerberos。

      kerberos_principal

      指定Kerberos的Principal。

      kerberos_keytab

      指定Kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件。

      kerberos_keytab_content

      指定Kerberos中keytab文件内容经过Base64编码之后的内容。

      重要

      该参数和kerberos_keytab参数只需配置一个。

  • HDFS HA配置

    通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。配置以下参数用于访问以HA模式部署的HDFS集群。

    参数

    描述

    dfs.nameservices

    指定HDFS服务的名称,您可以自定义。

    例如,设置dfs.nameservices为my_ha。

    dfs.ha.namenodes.xxx

    自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxx为dfs.nameservices中自定义的名称。

    例如,设置dfs.ha.namenodes.my_ha为my_nn。

    dfs.namenode.rpc-address.xxx.nn

    指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。

    例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为host:port。

    dfs.client.failover.proxy.provider

    指定Client连接NameNode的Provider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider