Broker Load是一种基于MySQL协议的异步数据导入方式。在Broker Load模式下,StarRocks通过部署的Broker服务可以读取对应数据源(例如,Apache HDFS,阿里云OSS)上的数据,还能运用其内置的计算能力对数据进行实时预处理,进而高效地执行导入操作。本文为您介绍Broker Load导入的使用示例。
支持的数据文件格式
Broker Load支持CSV、ORC和Parquet等文件格式,建议单次导入的数据量为几十GB至上百GB级别。
Broker Load导入
查看Broker信息
阿里云EMR Serverless StarRocks实例在创建时已经自动搭建并启动Broker服务。您可以使用以下SQL命令查看当前实例中所有Broker的详细信息。
SHOW PROC "/brokers";
创建导入任务
语法
LOAD LABEL [<database_name>.]<label_name> ( data_desc[, data_desc ...] ) WITH BROKER ( StorageCredentialParams ) [PROPERTIES ( opt_properties ) ]
参数描述
<database_name>
:可选,目标StarRocks表所在的数据库。<label_name>
:导入任务的标签。每个导入任务在该数据库内均具有唯一的标签。通过该标签,可以查看相应导入作业的执行情况,并防止导入重复的数据。当导入任务状态为FINISHED时,该标签不可再用于其他导入作业;而当导入任务状态为CANCELLED时,该标签可以被复用至其他导入作业,但通常情况下是用于重试同一导入作业(即使用相同标签导入相同数据),以实现数据“精确一次(Exactly-Once)”的语义。
data_desc
:用于描述一批次待导入的数据。Broker Load 支持一次导入多个数据文件。在一个导入作业中,您可以使用多个
data_desc
来声明导入多个数据文件,也可以使用一个data_desc
来声明导入一个路径下的所有数据文件。Broker Load 还支持保证单次导入事务的原子性,即单次导入的多个数据文件都成功或者都失败,而不会出现部分导入成功、部分导入失败的情况。DATA INFILE ("<file_path>"[, "<file_path>" ...]) [NEGATIVE] INTO TABLE <table_name> [PARTITION (<partition1_name>[, <partition2_name> ...])] [TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])] [COLUMNS TERMINATED BY "<column_separator>"] [ROWS TERMINATED BY "<row_separator>"] [FORMAT AS "CSV | Parquet | ORC"] [(format_type_options)] [(column_list)] [COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])] [SET <k1=f1(v1)>[, <k2=f2(v2)> ...]] [WHERE predicate]
data_desc
中部分参数描述如下表所示。参数
描述
file_path
指定源数据文件所在的路径。文件路径可以指定到文件,也可以用星号(*)通配符指定某个目录下的所有文件。中间的目录也可以使用通配符匹配。
可以使用的通配符有
?
、*
、[]
、{}
和^
,使用规则请参见FileSystem。例如, 通过指定
oss://bucket/data/tablename/*/*
路径可以匹配data/tablename
下所有分区内的所有文件。通过指定oss://bucket/data/tablename/dt=202104*/*
路径可以匹配data/tablename
目录下所有202104
分区内的数据文件。NEGATIVE
用于撤销某一批已经成功导入的数据。如果想要撤销某一批已经成功导入的数据,可以通过指定
NEGATIVE
关键字来导入同一批数据。说明该参数仅适用于目标StarRocks表使用聚合表、并且所有Value列的聚合函数均为
sum
的情况。PARTITION
指定待导入表的Partition信息。
指定要把数据导入哪些分区。如果不指定该参数,则默认导入到StarRocks表所在的所有分区中。
COLUMNS TERMINATED BY
用于指定导入文件中的列分隔符。如果不指定该参数,则默认列分隔符为
\t
,即Tab。Broker Load通过MySQL协议提交导入请求,除了StarRocks会做转义处理以外,MySQL协议也会做转义处理。因此,如果列分隔符是Tab等不可见字符,则需要在列分隔字符前面多加一个反斜线(\)。例如,如果列分隔符是
\t
,这里必须输入\\t
;如果列分隔符是\n
,这里必须输入\\n
。Apache Hive™ 文件的列分隔符为\x01
,因此,如果源数据文件是 Hive 文件,这里必须传入\\x01
。FORMAT AS
用于指定导入文件的格式。取值包括
CSV
、Parquet
和ORC
。如果不指定该参数,则默认通过file_path
参数中指定的文件扩展名(.csv
、.parquet
和.orc
)来判断文件格式。COLUMNS FROM PATH AS
用于从指定的文件路径中提取一个或多个分区字段的信息。该参数仅当指定的文件路径中存在分区字段时有效。
例如,导入文件路径为
/path/col_name=col_value/file1
,其中col_name
可以对应到StarRocks表中的列。这时候,您可以设置参数为col_name
。导入时,StarRocks会将col_value
落入col_name
对应的列中。说明该参数只有在从HDFS导入数据时可用。
SET
用于将源数据文件的某一列依据指定的函数进行转换,并将转换后的结果插入到 StarRocks 表中。语法为
column_name = expression
。WHERE
用于指定过滤条件,对已完成转换的数据进行筛选。只有符合WHERE子句中所指定的过滤条件的数据,才能导入到StarRocks表中。
opt_properties
:用于指定一些导入相关的可选参数,指定的参数设置作用于整个导入作业。语法如下所示。PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
部分参数描述如下表所示。
参数
描述
timeout
导入作业的超时时间(以秒为单位)。
您可以在
opt_properties
中自行设置每个导入的超时时间。导入任务在设定的时限内未完成则会被系统取消,变为CANCELLED。Broker Load的默认导入超时时间为4小时。重要通常情况下,不需要您手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。
推荐超时时间的计算方式为:
超时时间 >(导入文件的总大小 x导入文件及相关物化视图的个数)/平均导入速度
。例如,要导入一个1 GB的数据文件,待导入表包含2个Rollup表,当前StarRocks实例的平均导入速度为10 MB/s。在这种情况下,根据公式计算出来时长为
(1 x 1024 x 3)/10 = 307.2(秒)
因此,建议导入作业的超时时间大于308秒。
说明由于每个StarRocks实例的机器环境不同,且实例并发的查询任务也不同,所以StarRocks实例的最慢导入速度需要您根据历史的导入任务速度进行推测。
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以设置该参数值大于0,来保证导入可以成功。
计算公式为:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )
其中,
dpp.abnorm.ALL
表示数据质量不合格的行数,例如类型不匹配、列数不匹配和长度不匹配等。dpp.norm.ALL
指的是导入过程中正确数据的条数,可以通过SHOW LOAD
命令查询导入任务的正确数据量。dpp.abnorm.ALL
和dpp.norm.ALL
的总和就等于待导入的总行数。load_mem_limit
导入作业的内存限制,最大不超过BE或CN的内存限制。单位:字节。
strict_mode
是否开启严格模式。取值范围:
true
: 表示开启。false
(默认值):表示关闭。
严格模式对导入过程中的列类型转换实施了严格的过滤。严格模式的过滤策略如下:
如果开启严格模式,StarRocks会把错误的数据行过滤掉,只导入正确的数据行,并返回错误数据详情。
如果关闭严格模式,StarRocks会把转换失败的错误字段转换成
NULL
值,并把这些包含NULL
值的错误数据行跟正确的数据行一起导入。
查看导入任务状态
use <database_name>;
SHOW LOAD;
返回参数的描述如下表所示。
参数 | 描述 |
| 导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。 |
| 导入任务的标识。 |
| 导入任务当前所处的阶段。取值如下:
|
| 导入任务所处的阶段。Broker Load导入任务只有 LOAD的进度的计算公式为 如果所有导入表均完成导入,此时LOAD的进度为99%, 导入进入到最后生效阶段,待整个导入任务完成后,LOAD的进度才会改为100%。 重要 导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入任务没有执行。 |
| 导入任务的类型。Broker Load的Type取值是BROKER。 |
| 主要显示导入的数据量指标 您可以根据 |
| 主要显示当前导入任务参数,即创建Broker Load导入任务时您指定的参数,包括cluster,timeout和max-filter-ratio。 |
| 导入任务的失败原因。当导入作业的状态为
|
| 导入任务创建的时间。 |
| 由于Broker Load导入没有ETL阶段,所以该参数值和 |
| 由于Broker Load导入没有ETL阶段,所以该参数值和 |
|
|
| 导入任务完成的时间。 |
| 导入任务中质量不合格数据的访问地址。您可以使用 |
| 导入任务的其他信息,包括:
|
取消导入任务
当导入任务状态不为CANCELLED或FINISHED时,可以通过CANCEL LOAD
语句来取消该导入任务。
CANCEL LOAD FROM <database_name> WHERE LABEL = "<label_name>";
导入任务并发度
一个作业可以拆成一个或者多个任务,任务之间并行执行。拆分由LOAD语句中的DataDescription来决定。例如:
多个DataDescription对应导入多个不同的表,每个会拆成一个任务。
多个DataDescription对应导入同一个表的不同分区,每个也会拆成一个任务。
每个任务还会拆分成一个或者多个实例,然后将这些实例平均分配到BE上并行执行。实例的拆分由以下FE配置决定:
min_bytes_per_broker_scanner
:单个实例处理的最小数据量,默认值为64 MB。max_broker_concurrency
:单个任务最大并发实例数,默认值为100。load_parallel_instance_num
:单个BE上并发实例数,默认值为1个。
实例总数的计算公式为实例的总数 = min(导入文件总大小/单个实例处理的最小数据量,单个任务最大并发实例数,单个BE上并发实例数 * BE数)
。
通常情况下,一个作业只有一个DataDescription,只会拆分成一个任务。任务会拆成与BE数相等的实例,然后分配到所有BE上并行执行。
导入示例
通过EMR StarRocks Manager连接StarRocks实例,可以在SQL Editor页面执行以下SQL。
从阿里云OSS导入
创建测试表。
create database if not exists load_test; use load_test; create table if not exists customer( c_customer_sk bigint, c_customer_id char(16), c_current_cdemo_sk bigint, c_current_hdemo_sk bigint, c_current_addr_sk bigint, c_first_shipto_date_sk bigint, c_first_sales_date_sk bigint, c_salutation char(10), c_first_name char(20), c_last_name char(30), c_preferred_cust_flag char(1), c_birth_day int, c_birth_month int, c_birth_year int, c_birth_country varchar(20), c_login char(13), c_email_address char(50), c_last_review_date_sk bigint ) duplicate key (c_customer_sk) distributed by hash(c_customer_sk) buckets 5 properties( "replication_num"="1" );
创建导入任务。
请下载并上传数据文件customer.orc至OSS,然后执行以下命令创建导入任务。
LOAD LABEL load_test.customer_label ( DATA INFILE("<file_path>") INTO TABLE customer format as "orc" ) WITH BROKER 'broker' ( "fs.oss.accessKeyId" = "xxxxx", "fs.oss.accessKeySecret" = "xxxxx", "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com" );
涉及参数如下表所示,请根据实际情况进行替换。
参数
说明
<file_path>
customer.orc
文件所在的路径。例如,oss://<yourBucketName>/data/customer.orc
。fs.oss.accessKeyId
阿里云账号或RAM用户的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。
fs.oss.accessKeySecret
AccessKey ID对应的 AccessKey Secret。
fs.oss.endpoint
访问OSS的Endpoint。例如,oss-cn-hangzhou-internal.aliyuncs.com。
如果StarRocks与OSS位于同一地域,则使用VPC网络Endpoint,否则使用公网Endpoint。获取方法请参见OSS地域和访问域名。
查看导入任务状态。
use load_test; show load where label='customer_label';
查询表信息。
示例1:统计load_test数据库中customer表的总行数。
select count(1) from load_test.customer;
示例2:展示load_test数据库中customer表的前两条完整记录。
select * from load_test.customer limit 2;
从HDFS导入
使用该方式导入时,需注意以下信息:
HDFS集群和StarRocks实例需要在同一个VPC下,并且在同一个可用区下。
HDFS集群创建后,需要为安全组开通所有DataNode端口,才能访问HDFS数据。本文示例使用的是EMR on ECS中包含了HDFS服务的DataLake集群,因此关于安全组的开通详情,请参见管理安全组。
HDFS导入示例
创建库表。
CREATE DATABASE IF NOT EXISTS mydatabase; CREATE TABLE if NOT EXISTS mydatabase.userdata_broker_load ( userId INT, userName VARCHAR(20), registrationDate DATE ) ENGINE = OLAP DUPLICATE KEY(userId) DISTRIBUTED BY HASH(userId);
创建导入任务。
请下载并上传数据文件user_data.parquet至HDFS的
/data
目录下,然后执行以下命令创建导入任务。LOAD LABEL mydatabase.userdata_broker_load_label ( DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/data/user_data.parquet") INTO TABLE userdata_broker_load format AS "parquet" ) WITH BROKER PROPERTIES ( "timeout" = "72000" );
其中,以下参数请您根据实际情况替换。
参数
说明
<hdfs_ip>
HDFS集群NameNode节点的内网IP地址。
如果您使用的是EMR on ECS中包含HDFS服务的集群(DataLake或Custom类型),则可以在节点管理页签的Master节点组下,查看内网IP地址。
<hdfs_port>
HDFS集群的NameNode节点服务监听的端口号,默认是
9000
。查看导入任务。
USE mydatabase; SHOW LOAD WHERE label='userdata_broker_load_label';
查看导入数据。
SELECT * FROM mydatabase.userdata_broker_load;
HDFS认证方式
社区版本HDFS支持简单认证和Kerberos认证两种认证方式。
简单认证:用户的身份由与HDFS建立连接的客户端操作系统决定。
如果使用简单认证,请按如下配置
StorageCredentialParams
。"hadoop.security.authentication" = "simple", "username" = "<hdfs_username>", "password" = "<hdfs_password>"
StorageCredentialParams
包含如下参数。参数
描述
hadoop.security.authentication
认证方式。取值范围:
simple
和kerberos
。默认值:simple
。simple
表示简单认证,即无认证。kerberos
表示Kerberos认证。username
用于访问HDFS集群中NameNode节点的用户名。
password
用于访问HDFS集群中NameNode节点的密码。
Kerberos认证:客户端的身份由用户自己的Kerberos证书决定。
如果使用Kerberos认证,请在Serverless StarRocks实例的实例配置页面,为
hdfs-site.xml
文件添加以下配置。"hadoop.security.authentication" = "kerberos", "kerberos_principal" = "nn/zelda1@ZELDA.COM", "kerberos_keytab" = "/keytab/hive.keytab", "kerberos_keytab_content" = "YWFhYWFh"
涉及参数说明如下所示。
参数
描述
hadoop.security.authentication
认证方式。取值范围:
simple
和kerberos
。默认值:simple
。simple
表示简单认证,即无认证。kerberos
表示Kerberos认证。kerberos_principal
用于指定Kerberos的用户或服务(Principal)。每个Principal在HDFS集群内唯一,由如下三部分组成:
username
或servicename
:HDFS集群中用户或服务的名称。instance
:HDFS集群要认证的节点所在服务器的名称,用来保证用户或服务全局唯一。比如,HDFS集群中有多个DataNode节点,各节点需要各自独立认证。realm
:域,必须全大写。
例如,
nn/zelda1@ZELDA.COM
。kerberos_keytab
指定Kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件。
kerberos_keytab_content
指定Kerberos中keytab文件内容经过Base64编码之后的内容。
重要该参数和
kerberos_keytab
参数只需配置一个。
HDFS HA配置
通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。在Serverless StarRocks实例的实例配置页面,为hdfs-site.xml
文件添加以下配置,用于访问以HA模式部署的HDFS集群。
"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider.ha_cluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
涉及参数说明如下表所示。
参数 | 描述 |
| 指定HDFS服务的名称,您可以自定义。 例如,设置 |
| 自定义NameNode的名称,多个名称时以逗号(,)分隔。其中 例如,设置 |
| 指定NameNode的RPC地址信息。其中nn表示 例如,设置 |
| 指定Client连接NameNode的Provider,默认值为 |
如果您使用的是EMR on ECS中包含HDFS服务的集群,则可以在目标集群的集群服务标签下,进入HDFS服务的配置标签,在hdfs-site.xml文件中查找相关参数的值。