在Broker Load模式下,通过部署的Broker程序,StarRocks可读取对应数据源(例如,Apache HDFS,阿里云OSS)上的数据,利用自身的计算资源对数据进行预处理和导入。本文为您介绍Broker Load导入的使用示例以及常见问题。
背景信息
Broker Load是一种异步的导入方式。StarRocks支持从外部存储系统导入数据,支持CSV、ORCFile和Parquet等文件格式,建议单次导入数据量在几十GB到上百GB级别。
Broker Load导入
查看Broker
阿里云EMR StarRocks实例在创建时已经自动搭建并启动Broker服务,Broker服务位于每个Core节点上。使用以下SQL命令可以查看Broker。
SHOW PROC "/brokers";
创建导入任务
语法
LOAD LABEL [<database_name>.]<label_name> ( data_desc[, data_desc ...] ) WITH BROKER [broker_properties] [opt_properties]
参数描述
Label
导入任务的标识。每个导入任务都有一个唯一的Label,由系统自动生成或在导入命令中自定义。Label可以用来防止导入相同的数据,并查看对应导入任务的执行情况。当Label对应的导入任务状态为FINISHED时,对应的Label无法再次使用;当状态为CANCELLED时,可以再次使用该Label提交导入作业。
数据描述类data_desc
data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY column_separator ] [FORMAT AS file_type] [(col1, ...)] [COLUMNS FROM PATH AS (colx, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate]
相关参数描述如下表所示。
参数
描述
file_path
文件路径可以指定到文件,也可以用星号(*)通配符指定某个目录下的所有文件。中间的目录也可以使用通配符匹配。
可以使用的通配符有? * [] {} ^,使用规则请参见FileSystem。
例如, 通过oss://bucket/data/tablename , 可以匹配tablename下所有分区内的所有文件。通过oss://bucket/data/tablename/dt=202301, 可以匹配tablename下1月分区的所有文件。
negative
设置数据取反导入。
该功能适用的场景是当数据表中聚合列的类型均为SUM类型时,如果希望撤销某一批导入的数据,可以通过negative参数导入同一批数据,StarRocks会自动为这批数据在聚合列上数据取反,以达到消除同一批数据的功能。
partition
指定待导入表的Partition信息。
如果待导入数据不属于指定的Partition,则不会被导入。同时,不指定Partition中的数据会被认为是“错误数据”。对于不想导入,也不想记录为“错误数据”的数据,可以使用where predicate来过滤。
column_separator
COLUMNS TERMINATED BY column_separator
,用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。例如,Hive文件的分隔符为\x01,则列分隔符为\\x01。
file_type
FORMAT AS file_type
,用于指定导入文件的类型。例如,parquet、orc、csv,默认值为csv。parquet类型也可以通过文件后缀名.parquet或者.parq判断。
COLUMNS FROM PATH AS
提取文件路径中的分区字段。
例如,导入文件为/path/col_name=col_value/dt=20210101/file1,其中col_name/dt为表中的列,则将col_value、20210101分别导入到col_name和dt对应的列的代码示例如下。
(col1, col2) COLUMNS FROM PATH AS (col_name, dt)
set column mapping
SET (k1=f1(xx), k2=f2(xx))
,data_desc中的SET语句负责设置列函数变换。如果原始数据的列和表中的列不一一对应,则需要使用该属性。
where predicate
WHERE predicate
,data_desc中的WHERE语句负责过滤已经完成transform的数据。被过滤的数据不会进入容忍率的统计中。如果多个data_desc中声明了关于同一张表的多个条件,则会以AND语义合并这些条件。
导入作业参数
导入作业参数是指Broker Load创建导入语句中属于broker_properties部分的参数。导入作业参数是作用于整个导入作业的。
broker_properties: (key2=value2, ...)
部分参数描述如下表所示。
参数
描述
timeout
导入作业的超时时间(以秒为单位)。
您可以在opt_properties中自行设置每个导入的超时时间。导入任务在设定的时限内未完成则会被系统取消,变为CANCELLED。Broker Load的默认导入超时时间为4小时。
重要通常情况下,不需要您手动设置导入任务的超时时间。当在默认超时时间内无法完成导入时,可以手动设置任务的超时时间。
推荐超时时间的计算方式为:
超时时间 >((总文件大小 (MB)* 待导入的表及相关Roll up表的个数) / (30 * 导入并发数))
公式中的30为目前BE导入的平均速度,表示30 MB/s。例如,如果待导入数据文件为1 GB,待导入表包含2个Rollup表,当前的导入并发数为3,则timeout的最小值为 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。
由于每个StarRocks集群的机器环境不同且集群并发的查询任务也不同,所以StarRocks集群的最慢导入速度需要您根据历史的导入任务速度进行推测。
max_filter_ratio
导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果您希望忽略错误的行,可以设置该参数值大于0,来保证导入可以成功。
计算公式为:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )
其中,
dpp.abnorm.ALL
表示数据质量不合格的行数,例如类型不匹配、列数不匹配和长度不匹配等。dpp.abnorm.ALL
指的是导入过程中正确数据的条数,可以通过SHOW LOAD
命令查询导入任务的正确数据量。原始文件的行数 = dpp.abnorm.ALL + dpp.norm.ALL
load_mem_limit
导入内存限制。默认值为0,表示不限制。
strict_mode
Broker Load导入可以开启Strict Mode模式。开启方式为
properties ("strict_mode" = "true")
。默认关闭。
Strict Mode模式是对于导入过程中的列类型转换进行严格过滤。严格过滤的策略为,对于列类型转换,如果Strict Mode为true,则错误的数据将被过滤掉。错误数据是指原始数据并不为空值,在参与列类型转换后结果为空值的数据。但以下场景除外:
对于导入的某列由函数变换生成时,Strict Mode对其不产生影响。
对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,Strict Mode对其也不产生影响。例如,如果类型是decimal(1,0),原始数据为10,则属于可以通过类型转换但不在列声明的范围内,Strict Mode对其不产生影响。
查看导入任务状态
SHOW LOAD;
返回参数的描述如下表所示。
参数 | 描述 |
JobId | 导入任务的唯一ID,每个导入任务的JobId都不同,由系统自动生成。与Label不同的是,JobId永远不会相同,而Label则可以在导入任务失败后被复用。 |
Label | 导入任务的标识。 |
State | 导入任务当前所处的阶段。
|
Progress | 导入任务的进度描述。分为ETL和LOAD两种进度,分别对应导入流程的ETL和LOADING两个阶段。目前Broker Load只有LOADING阶段,所以ETL固定显示为N/A,而LOAD的进度范围为0~100%。 LOAD的进度的计算公式为 如果所有导入表均完成导入,此时LOAD的进度为99%, 导入进入到最后生效阶段,待整个导入任务完成后,LOAD的进度才会改为100%。 重要 导入进度并不是线性的,所以如果一段时间内进度没有变化,并不代表导入任务没有执行。 |
Type | 导入任务的类型。Broker Load的Type取值是BROKER。 |
EtlInfo | 主要显示导入的数据量指标unselected.rows,dpp.norm.ALL和dpp.abnorm.ALL。 您可以根据unselected.rows的参数值判断where条件过滤了多少行,根据dpp.norm.ALL和dpp.abnorm.ALL两个指标可以验证当前导入任务的错误率是否超过max-filter-ratio。三个指标之和就是原始数据量的总行数。 |
TaskInfo | 主要显示当前导入任务参数,即创建Broker Load导入任务时您指定的参数,包括cluster,timeout和max-filter-ratio。 |
ErrorMsg | 如果导入任务状态为CANCELLED,则显示失败的原因,包括type和msg两部分。如果导入任务成功则显示N/A。type的取值意义如下:
|
CreateTime | 分别表示导入创建的时间、ETL阶段开始的时间、ETL阶段完成的时间、LOADING阶段开始的时间和整个导入任务完成的时间。
|
EtlStartTime | |
EtlFinishTime | |
LoadStartTime | |
LoadFinishTime | |
URL | 导入任务的错误数据样例,访问URL地址即可获取本次导入的错误数据样例。当本次导入不存在错误数据时,URL字段为N/A。 |
JobDetails | 显示作业的详细运行状态。包括导入文件的个数、总大小(字节)、子任务个数、已处理的原始行数,运行子任务的BE节点ID,以及未完成的BE节点ID。
其中已处理的原始行数,每5秒更新一次。该行数仅用于展示当前的进度,不代表最终实际的处理行数。实际处理行数以EtlInfo中显示的数据为准。 |
取消导入任务
当Broker Load作业状态不为CANCELLED或FINISHED时,可以手动取消。取消时需要指定待取消导入任务的Label。
导入任务并发度
一个作业可以拆成一个或者多个任务,任务之间并行执行。拆分由LOAD语句中的DataDescription来决定。例如:
多个DataDescription对应导入多个不同的表,每个会拆成一个任务。
多个DataDescription对应导入同一个表的不同分区,每个也会拆成一个任务。
每个任务还会拆分成一个或者多个实例,然后将这些实例平均分配到BE上并行执行。实例的拆分由以下FE配置决定:
min_bytes_per_broker_scanner:单个实例处理的最小数据量,默认值为64 MB。
max_broker_concurrency:单个任务最大并发实例数,默认值为100。
load_parallel_instance_num:单个BE上并发实例数,默认值为1个。
实例总数的计算公式为实例的总数 = min(导入文件总大小/单个实例处理的最小数据量,单个任务最大并发实例数,单个BE上并发实例数 * BE数)
。
通常情况下,一个作业只有一个DataDescription,只会拆分成一个任务。任务会拆成与BE数相等的实例,然后分配到所有BE上并行执行。
导入示例
以下示例中的参数请根据实际情况替换。
阿里云OSS数据导入
创建测试表。
create database if not exists load_test; use load_test; create table if not exists customer( c_customer_sk bigint, c_customer_id char(16), c_current_cdemo_sk bigint, c_current_hdemo_sk bigint, c_current_addr_sk bigint, c_first_shipto_date_sk bigint, c_first_sales_date_sk bigint, c_salutation char(10), c_first_name char(20), c_last_name char(30), c_preferred_cust_flag char(1), c_birth_day int, c_birth_month int, c_birth_year int, c_birth_country varchar(20), c_login char(13), c_email_address char(50), c_last_review_date_sk bigint ) duplicate key (c_customer_sk) distributed by hash(c_customer_sk) buckets 5 properties( "replication_num"="1" );
创建导入任务。
下载customer.orc。
LOAD LABEL load_test.customer_label ( DATA INFILE("oss://{bucket_name}/data/customer.orc") INTO TABLE customer format as "orc" ) WITH BROKER 'broker' ( "fs.oss.accessKeyId" = "xxxxx", "fs.oss.accessKeySecret" = "xxxxx", "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com" );
查看导入任务状态。
show load where label='customer_label';
查询表信息。
示例1
select count(1) from customer;
返回信息如下。
+----------+ | count(1) | +----------+ | 6000000 | +----------+ 1 row in set (0.10 sec)
示例2
select * from customer limit 2;
返回信息如下。
+---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+ | c_customer_sk | c_customer_id | c_current_cdemo_sk | c_current_hdemo_sk | c_current_addr_sk | c_first_shipto_date_sk | c_first_sales_date_sk | c_salutation | c_first_name | c_last_name | c_preferred_cust_flag | c_birth_day | c_birth_month | c_birth_year | c_birth_country | c_login | c_email_address | c_last_review_date_sk | +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+ | 2 | AAAAAAAACAAAAAAA | 819667 | 1461 | 681655 | 2452318 | 2452288 | Dr. | Amy | Moses | Y | 9 | 4 | 1966 | TOGO | NULL | Amy.M****@Ovk9KjHH.com | 2452318 | | 2 | AAAAAAAACAAAAAAA | 819667 | 1461 | 681655 | 2452318 | 2452288 | Dr. | Amy | Moses | Y | 9 | 4 | 1966 | TOGO | NULL | Amy.M****@Ovk9KjHH.com | 2452318 | +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+
HDFS导入
HDFS导入语法示例
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1, tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH BROKER 'broker' ( "username" = "hdfs_username", "password" = "hdfs_password" ) PROPERTIES ( "timeout" = "3600" );
说明如果您的集群类型是新版数据湖(DataLake)、实时数据流(DataFlow)、数据分析(OLAP)、数据服务(DataServing)和自定义场景的集群, 请替换emr-header-1为master-1-1。
HDFS认证
社区版本的HDFS支持简单认证和Kerberos认证两种认证方式。
简单认证(Simple):用户的身份由与HDFS建立链接的客户端操作系统决定。涉及参数如下表。
参数
描述
hadoop.security.authentication
认证方式。默认值为simple。
username
HDFS的用户名。
password
HDFS的密码。
Kerberos认证:客户端的身份由用户自己的Kerberos证书决定。
涉及参数如下表。
参数
描述
hadoop.security.authentication
认证方式。默认值为kerberos。
kerberos_principal
指定Kerberos的Principal。
kerberos_keytab
指定Kerberos的keytab文件路径。该文件必须为Broker进程所在服务器上的文件。
kerberos_keytab_content
指定Kerberos中keytab文件内容经过Base64编码之后的内容。
重要该参数和kerberos_keytab参数只需配置一个。
HDFS HA配置
通过配置NameNode HA,可以在NameNode切换时,自动识别到新的NameNode。配置以下参数用于访问以HA模式部署的HDFS集群。
参数
描述
dfs.nameservices
指定HDFS服务的名称,您可以自定义。
例如,设置dfs.nameservices为my_ha。
dfs.ha.namenodes.xxx
自定义NameNode的名称,多个名称时以逗号(,)分隔。其中xxx为dfs.nameservices中自定义的名称。
例如,设置dfs.ha.namenodes.my_ha为my_nn。
dfs.namenode.rpc-address.xxx.nn
指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名称。
例如,设置dfs.namenode.rpc-address.my_ha.my_nn参数值的格式为host:port。
dfs.client.failover.proxy.provider
指定Client连接NameNode的Provider,默认值为org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。