Broker Load

Broker Load是一种基于MySQL协议的异步数据导入方式。在Broker Load模式下,StarRocks通过部署的Broker服务可以读取对应数据源(例如,Apache HDFS,阿里云OSS)上的数据,还能运用其内置的计算能力对数据进行实时预处理,进而高效地执行导入操作。本文为您介绍Broker Load导入的使用示例。

支持的数据文件格式

Broker Load支持CSV、ORCParquet等文件格式,建议单次导入的数据量为几十GB至上百GB级别。

Broker Load导入

查看Broker信息

阿里云EMR Serverless StarRocks实例在创建时已经自动搭建并启动Broker服务。您可以使用以下SQL命令查看当前实例中所有Broker的详细信息。

SHOW PROC "/brokers";

创建导入任务

  • 语法

    LOAD LABEL [<database_name>.]<label_name>
    (
        data_desc[, data_desc ...]
    )
    WITH BROKER
    (
        StorageCredentialParams
    )
    [PROPERTIES
    (
        opt_properties
    )
    ]
  • 参数描述

    • <database_name>:可选,目标StarRocks表所在的数据库。

    • <label_name>:导入任务的标签。

      每个导入任务在该数据库内均具有唯一的标签。通过该标签,可以查看相应导入作业的执行情况,并防止导入重复的数据。当导入任务状态为FINISHED时,该标签不可再用于其他导入作业;而当导入任务状态为CANCELLED时,该标签可以被复用至其他导入作业,但通常情况下是用于重试同一导入作业(即使用相同标签导入相同数据),以实现数据“精确一次(Exactly-Once)”的语义。

    • data_desc:用于描述一批次待导入的数据。

      Broker Load 支持一次导入多个数据文件。在一个导入作业中,您可以使用多个 data_desc 来声明导入多个数据文件,也可以使用一个 data_desc 来声明导入一个路径下的所有数据文件。Broker Load 还支持保证单次导入事务的原子性,即单次导入的多个数据文件都成功或者都失败,而不会出现部分导入成功、部分导入失败的情况。

      DATA INFILE ("<file_path>"[, "<file_path>" ...])
      [NEGATIVE]
      INTO TABLE <table_name>
      [PARTITION (<partition1_name>[, <partition2_name> ...])]
      [TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])]
      [COLUMNS TERMINATED BY "<column_separator>"]
      [ROWS TERMINATED BY "<row_separator>"]
      [FORMAT AS "CSV | Parquet | ORC"]
      [(format_type_options)]
      [(column_list)]
      [COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
      [SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
      [WHERE predicate]

      data_desc 中部分参数描述如下表所示。

      参数

      描述

      file_path

      指定源数据文件所在的路径。文件路径可以指定到文件,也可以用星号(*)通配符指定某个目录下的所有文件。中间的目录也可以使用通配符匹配。

      可以使用的通配符有?*[]{}^,使用规则请参见FileSystem

      例如, 通过指定oss://bucket/data/tablename/*/*路径可以匹配data/tablename下所有分区内的所有文件。通过指定oss://bucket/data/tablename/dt=202104*/*路径可以匹配data/tablename目录下所有 202104 分区内的数据文件。

      NEGATIVE

      用于撤销某一批已经成功导入的数据。如果想要撤销某一批已经成功导入的数据,可以通过指定 NEGATIVE 关键字来导入同一批数据。

      说明

      该参数仅适用于目标StarRocks表使用聚合表、并且所有Value列的聚合函数均为sum的情况。

      PARTITION

      指定待导入表的Partition信息。

      指定要把数据导入哪些分区。如果不指定该参数,则默认导入到StarRocks表所在的所有分区中。

      COLUMNS TERMINATED BY

      用于指定导入文件中的列分隔符。如果不指定该参数,则默认列分隔符为\t,即Tab。

      Broker Load通过MySQL协议提交导入请求,除了StarRocks会做转义处理以外,MySQL协议也会做转义处理。因此,如果列分隔符是Tab等不可见字符,则需要在列分隔字符前面多加一个反斜线(\)。例如,如果列分隔符是 \t,这里必须输入 \\t;如果列分隔符是 \n,这里必须输入 \\n。Apache Hive™ 文件的列分隔符为 \x01,因此,如果源数据文件是 Hive 文件,这里必须传入 \\x01

      FORMAT AS

      用于指定导入文件的格式。取值包括 CSVParquetORC。如果不指定该参数,则默认通过file_path参数中指定的文件扩展名(.csv.parquet.orc)来判断文件格式。

      COLUMNS FROM PATH AS

      用于从指定的文件路径中提取一个或多个分区字段的信息。该参数仅当指定的文件路径中存在分区字段时有效。

      例如,导入文件路径为/path/col_name=col_value/file1,其中col_name可以对应到StarRocks表中的列。这时候,您可以设置参数为col_name。导入时,StarRocks会将col_value落入col_name对应的列中。

      说明

      该参数只有在从HDFS导入数据时可用。

      SET

      用于将源数据文件的某一列依据指定的函数进行转换,并将转换后的结果插入到 StarRocks 表中。语法为column_name = expression

      WHERE

      用于指定过滤条件,对已完成转换的数据进行筛选。只有符合WHERE子句中所指定的过滤条件的数据,才能导入到StarRocks表中。

    • opt_properties:用于指定一些导入相关的可选参数,指定的参数设置作用于整个导入作业。语法如下所示。

      PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

      部分参数描述如下表所示。

      参数

      描述

      timeout

      导入作业的超时时间(以秒为单位)。

      您可以在opt_properties中自行设置每个导入的超时时间。导入任务在设定的时限内未完成则会被系统取消,变为CANCELLED。Broker Load的默认导入超时时间为4小时。

      重要

      通常情况下,不需要您手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。

      推荐超时时间的计算方式为:超时时间 >(导入文件的总大小 x导入文件及相关物化视图的个数)/平均导入速度

      例如,要导入一个1 GB的数据文件,待导入表包含2Rollup表,当前StarRocks实例的平均导入速度为10 MB/s。在这种情况下,根据公式计算出来时长为(1 x 1024 x 3)/10 = 307.2(秒)

      因此,建议导入作业的超时时间大于308秒。

      说明

      由于每个StarRocks实例的机器环境不同,且实例并发的查询任务也不同,所以StarRocks实例的最慢导入速度需要您根据历史的导入任务速度进行推测。

      max_filter_ratio

      导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以设置该参数值大于0,来保证导入可以成功。

      计算公式为:max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

      其中,dpp.abnorm.ALL表示数据质量不合格的行数,例如类型不匹配、列数不匹配和长度不匹配等。dpp.norm.ALL指的是导入过程中正确数据的条数,可以通过SHOW LOAD命令查询导入任务的正确数据量。

      dpp.abnorm.ALL 和 dpp.norm.ALL 的总和就等于待导入的总行数。

      load_mem_limit

      导入作业的内存限制,最大不超过BECN的内存限制。单位:字节。

      strict_mode

      是否开启严格模式。取值范围:

      • true: 表示开启。

      • false(默认值):表示关闭。

      严格模式对导入过程中的列类型转换实施了严格的过滤。严格模式的过滤策略如下:

      • 如果开启严格模式,StarRocks会把错误的数据行过滤掉,只导入正确的数据行,并返回错误数据详情。

      • 如果关闭严格模式,StarRocks会把转换失败的错误字段转换成NULL值,并把这些包含NULL值的错误数据行跟正确的数据行一起导入。

查看导入任务状态

use <database_name>;
SHOW LOAD;

返回参数的描述如下表所示。

参数

描述

JobId

导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。

Label

导入任务的标识。

State

导入任务当前所处的阶段。取值如下:

  • PENDING:导入任务已创建。

  • QUEUEING:导入任务正在等待执行中。

  • LOADING:表示正在执行中。

  • CANCELLED:表示导入失败。

  • FINISHED:表示导入成功。

Progress

导入任务所处的阶段。Broker Load导入任务只有LOAD阶段,对应导入作业状态中的LOADINGLOAD进度为0~100%。

LOAD的进度的计算公式为LOAD进度 = 当前完成导入的表个数 / 本次导入任务设计的总表个数 * 100%

如果所有导入表均完成导入,此时LOAD的进度为99%, 导入进入到最后生效阶段,待整个导入任务完成后,LOAD的进度才会改为100%。

重要

导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入任务没有执行。

Type

导入任务的类型。Broker LoadType取值是BROKER。

EtlInfo

主要显示导入的数据量指标unselected.rowsdpp.norm.ALLdpp.abnorm.ALL

您可以根据unselected.rows的参数值判断where条件过滤了多少行,根据dpp.norm.ALLdpp.abnorm.ALL两个指标可以验证当前导入任务的错误率是否超过max-filter-ratio。三个指标之和就是原始数据量的总行数。

TaskInfo

主要显示当前导入任务参数,即创建Broker Load导入任务时您指定的参数,包括cluster,timeoutmax-filter-ratio。

ErrorMsg

导入任务的失败原因。当导入作业的状态为PENDINGLOADINGFINISHED时,该参数值为NULL。当导入作业的状态为CANCELLED时,该参数值包括typemsg两部分:

  • type包括以下取值:

    • USER_CANCEL:取消的任务。

    • ETL_SUBMIT_FAIL:导入任务提交失败。

    • ETL_QUALITY_UNSATISFIED:数据质量不合格,即错误数据率超过了max-filter-ratio。

    • LOAD-RUN-FAIL:在LOAD阶段失败的导入任务。

    • TIMEOUT:没在超时时间内完成的导入任务。

    • UNKNOWN:未知的导入错误。

  • msg显示有关失败原因的详细信息。

CreateTime

导入任务创建的时间。

EtlStartTime

由于Broker Load导入没有ETL阶段,所以该参数值和LoadStartTime相同。

EtlFinishTime

由于Broker Load导入没有ETL阶段,所以该参数值和LoadStartTime相同。

LoadStartTime

LOAD阶段开始的时间。

LoadFinishTime

导入任务完成的时间。

URL

导入任务中质量不合格数据的访问地址。您可以使用curlwget命令打开该地址。如果导入作业不存在质量不合格数据,该参数值为NULL

JobDetails

导入任务的其他信息,包括:

  • Unfinished backends:未完成导入的BE节点ID。

  • ScannedRows:实际处理的行数,包括导入的行数以及过滤掉的行数。

  • TaskNumber:子作业个数。

  • All backends:正在运行子作业的BE节点的 ID。

  • FileNumber:源文件的个数。

  • FileSize:所有源文件的总数据量,单位:字节。

取消导入任务

当导入任务状态不为CANCELLEDFINISHED时,可以通过CANCEL LOAD语句来取消该导入任务。

CANCEL LOAD FROM <database_name> WHERE LABEL = "<label_name>";

导入任务并发度

一个作业可以拆成一个或者多个任务,任务之间并行执行。拆分由LOAD语句中的DataDescription来决定。例如:

  • 多个DataDescription对应导入多个不同的表,每个会拆成一个任务。

  • 多个DataDescription对应导入同一个表的不同分区,每个也会拆成一个任务。

每个任务还会拆分成一个或者多个实例,然后将这些实例平均分配到BE上并行执行。实例的拆分由以下FE配置决定:

  • min_bytes_per_broker_scanner:单个实例处理的最小数据量,默认值为64 MB。

  • max_broker_concurrency:单个任务最大并发实例数,默认值为100。

  • load_parallel_instance_num:单个BE上并发实例数,默认值为1个。

实例总数的计算公式为实例的总数 = min(导入文件总大小/单个实例处理的最小数据量,单个任务最大并发实例数,单个BE上并发实例数 * BE数)

通常情况下,一个作业只有一个DataDescription,只会拆分成一个任务。任务会拆成与BE数相等的实例,然后分配到所有BE上并行执行。

导入示例

通过EMR StarRocks Manager连接StarRocks实例,可以在SQL Editor页面执行以下SQL。

从阿里云OSS导入

  1. 创建测试表。

    create database if not exists load_test;
    use load_test;
    create table if not exists customer(
      c_customer_sk bigint,
      c_customer_id char(16),
      c_current_cdemo_sk bigint,
      c_current_hdemo_sk bigint,
      c_current_addr_sk bigint,
      c_first_shipto_date_sk bigint,
      c_first_sales_date_sk bigint,
      c_salutation char(10),
      c_first_name char(20),
      c_last_name char(30),
      c_preferred_cust_flag char(1),
      c_birth_day int,
      c_birth_month int,
      c_birth_year int,
      c_birth_country varchar(20),
      c_login char(13),
      c_email_address char(50),
      c_last_review_date_sk bigint
    )
    duplicate key (c_customer_sk)
    distributed by hash(c_customer_sk) buckets 5
    properties(
      "replication_num"="1"
    );
  2. 创建导入任务。

    请下载并上传数据文件customer.orcOSS,然后执行以下命令创建导入任务。

    LOAD LABEL load_test.customer_label
    (
      DATA INFILE("<file_path>")
      INTO TABLE customer
      format as "orc"
    )
    WITH BROKER 'broker'
    (
      "fs.oss.accessKeyId" = "xxxxx",
      "fs.oss.accessKeySecret" = "xxxxx",
      "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com"
    );

    涉及参数如下表所示,请根据实际情况进行替换。

    参数

    说明

    <file_path>

    customer.orc文件所在的路径。例如,oss://<yourBucketName>/data/customer.orc

    fs.oss.accessKeyId

    阿里云账号或RAM用户的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。

    fs.oss.accessKeySecret

    AccessKey ID对应的 AccessKey Secret。

    fs.oss.endpoint

    访问OSSEndpoint。例如,oss-cn-hangzhou-internal.aliyuncs.com。

    如果StarRocksOSS位于同一地域,则使用VPC网络Endpoint,否则使用公网Endpoint。获取方法请参见OSS地域和访问域名

  3. 查看导入任务状态。

    use load_test;
    show load where label='customer_label';
  4. 查询表信息。

    • 示例1:统计load_test数据库中customer表的总行数。

      select count(1) from load_test.customer;
    • 示例2:展示load_test数据库中customer表的前两条完整记录。

      select * from load_test.customer limit 2;

HDFS导入

说明

使用该方式导入时,需注意以下信息:

  • HDFS集群和StarRocks实例需要在同一个VPC下,并且在同一个可用区下。

  • HDFS集群创建后,需要为安全组开通所有DataNode端口,才能访问HDFS数据。本文示例使用的是EMR on ECS中包含了HDFS服务的DataLake集群,因此关于安全组的开通详情,请参见管理安全组

HDFS导入示例

  1. 创建库表。

    CREATE DATABASE IF NOT EXISTS mydatabase;
    
    CREATE TABLE if NOT EXISTS mydatabase.userdata_broker_load (
        userId INT,
        userName VARCHAR(20),
        registrationDate DATE
    )
    ENGINE = OLAP
    DUPLICATE KEY(userId)
    DISTRIBUTED BY HASH(userId);
  2. 创建导入任务。

    请下载并上传数据文件user_data.parquetHDFS/data目录下,然后执行以下命令创建导入任务。

    LOAD LABEL mydatabase.userdata_broker_load_label
    (
      DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/data/user_data.parquet")
      INTO TABLE userdata_broker_load
      format AS "parquet"
    )
    WITH BROKER
    PROPERTIES
    (
        "timeout" = "72000"
    );

    其中,以下参数请您根据实际情况替换。

    参数

    说明

    <hdfs_ip>

    HDFS集群NameNode节点的内网IP地址。

    如果您使用的是EMR on ECS中包含HDFS服务的集群(DataLakeCustom类型),则可以在节点管理页签的Master节点组下,查看内网IP地址。

    <hdfs_port>

    HDFS集群的NameNode节点服务监听的端口号,默认是 9000

  3. 查看导入任务。

    USE mydatabase;
    SHOW LOAD WHERE label='userdata_broker_load_label';
  4. 查看导入数据。

    SELECT * FROM mydatabase.userdata_broker_load;

HDFS认证方式

社区版本HDFS支持简单认证和Kerberos认证两种认证方式。

  • 简单认证:用户的身份由与HDFS建立连接的客户端操作系统决定。

    如果使用简单认证,请按如下配置 StorageCredentialParams

    "hadoop.security.authentication" = "simple",
    "username" = "<hdfs_username>",
    "password" = "<hdfs_password>"

    StorageCredentialParams 包含如下参数。

    参数

    描述

    hadoop.security.authentication

    认证方式。取值范围:simple 和 kerberos。默认值:simplesimple表示简单认证,即无认证。kerberos表示Kerberos认证。

    username

    用于访问HDFS集群中NameNode节点的用户名。

    password

    用于访问HDFS集群中NameNode节点的密码。

  • Kerberos认证:客户端的身份由用户自己的Kerberos证书决定。

    如果使用Kerberos认证,请在Serverless StarRocks实例的实例配置页面,为hdfs-site.xml 文件添加以下配置。

    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "nn/zelda1@ZELDA.COM",
    "kerberos_keytab" = "/keytab/hive.keytab",
    "kerberos_keytab_content" = "YWFhYWFh"

    涉及参数说明如下所示。

    参数

    描述

    hadoop.security.authentication

    认证方式。取值范围:simple 和 kerberos。默认值:simplesimple表示简单认证,即无认证。kerberos表示Kerberos认证。

    kerberos_principal

    用于指定Kerberos的用户或服务(Principal)。每个PrincipalHDFS集群内唯一,由如下三部分组成:

    • usernameservicename:HDFS集群中用户或服务的名称。

    • instance:HDFS集群要认证的节点所在服务器的名称,用来保证用户或服务全局唯一。比如,HDFS集群中有多个DataNode节点,各节点需要各自独立认证。

    • realm:域,必须全大写。

    例如,nn/zelda1@ZELDA.COM

    kerberos_keytab

    指定Kerberoskeytab文件路径。该文件必须为Broker进程所在服务器上的文件。

    kerberos_keytab_content

    指定Kerberoskeytab文件内容经过Base64编码之后的内容。

    重要

    该参数和kerberos_keytab参数只需配置一个。

HDFS HA配置

通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。在Serverless StarRocks实例的实例配置页面,为hdfs-site.xml 文件添加以下配置,用于访问以HA模式部署的HDFS集群。

"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider.ha_cluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

涉及参数说明如下表所示。

参数

描述

dfs.nameservices

指定HDFS服务的名称,您可以自定义。

例如,设置dfs.nameservicesmy_ha。

dfs.ha.namenodes.xxx

自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxxdfs.nameservices自定义的名称。

例如,设置dfs.ha.namenodes.my_hamy_nn。

dfs.namenode.rpc-address.xxx.nn

指定NameNodeRPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。

例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为<hdfs_host>:<hdfs_port>

dfs.client.failover.proxy.provider.xxx

指定Client连接NameNodeProvider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

说明

如果您使用的是EMR on ECS中包含HDFS服务的集群,则可以在目标集群的集群服务标签下,进入HDFS服务的配置标签,在hdfs-site.xml件中查找相关参数的值。