Broker Load is an asynchronous data import method based on the MySQL protocol. With Broker Load, StarRocks uses broker processes to read data from data sources such as Apache HDFS or OSS, leveraging their computing power to pre-process data in real time and efficiently perform import operations. This topic shows an example of how to use Broker Load.
Data file formats
Broker Load supports data file formats such as CSV, ORC, and Parquet. The recommended data size per import ranges from tens to hundreds of gigabytes (GB).
Broker load
Broker information
The Broker service is automatically deployed and started when you create an Alibaba Cloud EMR Serverless StarRocks instance. To view detailed information about all Brokers in the current instance, run the following SQL command.
SHOW PROC "/brokers";
Create load job
- Syntax
LOAD LABEL [<database_name>.]<label_name>
(
    data_desc[, data_desc ...]
)
WITH BROKER
(
    StorageCredentialParams
)
[PROPERTIES
(
    opt_properties
)]
- Parameters
- <database_name>: Optional. The database where the destination StarRocks table resides.
- <label_name>: The label of the load job. Each load job has a unique label within its database. You can use the label to check the execution status of the corresponding job and to prevent duplicate data loads. When a load job's state is FINISHED, its label cannot be reused for other jobs. When a load job's state is CANCELLED, its label can be reused, typically to retry the same job (that is, to load the same data with the same label) and achieve exactly-once semantics.
- data_desc: Describes a batch of data to be loaded. Broker Load supports loading multiple data files in a single job. You can use multiple data_desc clauses to declare multiple data files, or use a single data_desc clause to declare all data files within a path. Broker Load ensures transactional atomicity: a job either loads all its data files successfully or fails entirely, which prevents partial data loads.
DATA INFILE ("<file_path>"[, "<file_path>" ...])
[NEGATIVE]
INTO TABLE <table_name>
[PARTITION (<partition1_name>[, <partition2_name> ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])]
[COLUMNS TERMINATED BY "<column_separator>"]
[ROWS TERMINATED BY "<row_separator>"]
[FORMAT AS "CSV | Parquet | ORC"]
[(format_type_options)]
[(column_list)]
[COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
[SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
[WHERE predicate]
The following table describes key parameters in data_desc.
| Parameter | Description |
| --- | --- |
| file_path | The path to the source data file. The path can point to a specific file or use an asterisk (*) wildcard to specify all files in a directory. You can also use wildcards in intermediate directories. Supported wildcards include ?, *, [], {}, and ^. For usage rules, see FileSystem. For example, the path oss://bucket/data/tablename/*/* matches all files in all partitions under data/tablename, and the path oss://bucket/data/tablename/dt=202104*/* matches all data files in partitions that start with 202104 under the data/tablename directory. |
| NEGATIVE | Rolls back a successfully loaded batch of data. To roll back a batch, load the same batch again with the NEGATIVE keyword. Note: This parameter applies only when the destination StarRocks table is an aggregate table and the aggregation function of all its value columns is sum. |
| PARTITION | The partitions of the destination table. If omitted, data is loaded into all table partitions by default. |
| COLUMNS TERMINATED BY | The column separator in the source files. If omitted, the default separator is \t (tab). Broker Load submits load requests over the MySQL protocol, and both StarRocks and the MySQL protocol perform character escaping. Therefore, if the column separator is an invisible character such as a tab, you must escape it with an extra backslash (\). For example, if the separator is \t, enter \\t. If the separator is \n, enter \\n. The column separator for Apache Hive™ files is \x01, so if your source file is a Hive file, specify \\x01. |
| FORMAT AS | The format of the source files. Valid values are CSV, Parquet, and ORC. If this parameter is omitted, StarRocks infers the file format from the file extension in file_path, such as .csv, .parquet, or .orc. |
| COLUMNS FROM PATH AS | Extracts information for one or more partition fields from the specified file path. This parameter applies only when the file path contains partition fields. For example, if the file path is /path/col_name=col_value/file1, where col_name can be mapped to a column in the destination StarRocks table, you can set this parameter to col_name. During the load, StarRocks inserts col_value into the column that corresponds to col_name. Note: This parameter is available only for HDFS data loads. |
| SET | Transforms a column from the source data file by using a specified function and inserts the result into the StarRocks table. The syntax is column_name = expression. |
| WHERE | A filter condition to apply to the data after transformation. Only data that meets the conditions in the WHERE clause is loaded into the StarRocks table. |
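The escaping rule for COLUMNS TERMINATED BY and the path-based extraction behind COLUMNS FROM PATH AS can be sketched in a few lines of Python. This is only an illustration of the rules described above, not StarRocks code: the helper names `escape_separator` and `partition_fields_from_path` are hypothetical, and the hex form for other non-printable bytes is assumed by analogy with the \x01 example.

```python
def escape_separator(separator: str) -> str:
    """Return the text to type in the LOAD statement for a separator.

    Invisible characters get an extra backslash, so a tab is written
    as \\\\t in Python source, which is the four characters \\t on screen.
    """
    named = {"\t": "t", "\n": "n"}  # escapes named in the table above
    parts = []
    for ch in separator:
        if ch in named:
            parts.append("\\\\" + named[ch])
        elif not ch.isprintable():
            if ord(ch) > 0xFF:
                raise ValueError("only single-byte separators are handled here")
            # Assumption: other invisible bytes use the \\xHH form, like \\x01.
            parts.append("\\\\x{:02x}".format(ord(ch)))
        else:
            parts.append(ch)  # visible characters need no escaping
    return "".join(parts)


def partition_fields_from_path(file_path: str, field_names: list) -> dict:
    """Collect name=value directory segments for the requested field names."""
    found = {}
    # The last segment is the file name, so it is skipped.
    for segment in file_path.split("/")[:-1]:
        key, sep, value = segment.partition("=")
        if sep and key in field_names:
            found[key] = value
    return found


print(escape_separator("\t"))    # tab separator
print(escape_separator("\x01"))  # Hive default separator
print(partition_fields_from_path("/path/col_name=col_value/file1", ["col_name"]))
```

The first two calls print the doubled-backslash forms you would type in the statement, and the last call returns the mapping from `col_name` to `col_value` that the load would insert into the matching column.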
- opt_properties: Specifies optional parameters for the load job. The specified settings apply to the entire job. The syntax is as follows:
PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])
The following table describes some of the parameters.
| Parameter | Description |
| --- | --- |
| timeout | The timeout for the load job, in seconds. You can set a custom timeout for each load job in opt_properties. If a load job does not complete within the specified time, the system cancels it and its state changes to CANCELLED. The default timeout for a Broker Load job is 4 hours. Important: You usually do not need to set the timeout manually. Set it only if a job cannot complete within the default period. Recommended timeout calculation: timeout > (total size of source files x number of destination tables and related materialized views) / average load speed. For example, you want to load a 1 GB data file into a table that has two rollup tables, and the average load speed of your StarRocks instance is 10 MB/s. The calculated duration is (1 x 1024 x 3) / 10 = 307.2 seconds, so set the job timeout to 308 seconds or more. Note: The average load speed varies by instance hardware and concurrent query workload. Estimate this speed based on the performance of historical load jobs. |
| max_filter_ratio | The maximum filter ratio for the load job, which ranges from 0 to 1. The default is 0, meaning no tolerance for errors. If a job's error rate exceeds this value, the job fails. To ignore bad rows and let the load succeed, set this parameter to a value greater than 0. Formula: max_filter_ratio = dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL). dpp.abnorm.ALL is the number of rows with data quality issues, such as type mismatches, column count mismatches, or length mismatches. dpp.norm.ALL is the number of valid rows loaded. You can check the number of valid rows for a load job by using the SHOW LOAD command. The sum of dpp.abnorm.ALL and dpp.norm.ALL equals the total number of rows to be loaded. |
| load_mem_limit | The memory limit for a load job, in bytes. This limit cannot exceed the memory limit of a BE or CN. |
| strict_mode | Specifies whether to enable strict mode. Valid values: true (enables strict mode) and false (default, disables strict mode). Strict mode enforces strict filtering on column type conversions during the load process. If strict mode is enabled, StarRocks filters out rows with errors, loads only the valid rows, and returns details about the invalid data. If strict mode is disabled, StarRocks converts fields that fail conversion to NULL values and loads the rows that contain these NULL values along with the valid rows. |
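The two formulas for timeout and max_filter_ratio are easy to check with a small calculation. The following Python sketch reproduces the worked example (a 1 GB file, one table plus two rollup tables, 10 MB/s) and the filter-ratio check. The function names are hypothetical, and treating an empty load as a ratio of 0 is an assumption made here for illustration.

```python
import math


def min_timeout_seconds(total_size_mb: float, table_count: int,
                        avg_speed_mb_per_s: float) -> float:
    """Lower bound for timeout: (size x tables and rollups) / average speed."""
    if avg_speed_mb_per_s <= 0:
        raise ValueError("average load speed must be positive")
    return total_size_mb * table_count / avg_speed_mb_per_s


def filter_ratio(abnormal_rows: int, normal_rows: int) -> float:
    """dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)."""
    total = abnormal_rows + normal_rows
    # Assumption: a job with no rows at all is treated as having no errors.
    return abnormal_rows / total if total else 0.0


def load_succeeds(abnormal_rows: int, normal_rows: int,
                  max_filter_ratio: float = 0.0) -> bool:
    """The job fails only when the error rate exceeds max_filter_ratio."""
    return filter_ratio(abnormal_rows, normal_rows) <= max_filter_ratio


# Worked example from the timeout description: 1 GB, 3 tables, 10 MB/s.
estimate = min_timeout_seconds(1 * 1024, 3, 10)
print(estimate)             # 307.2
print(math.ceil(estimate))  # 308, the smallest whole-second timeout above it

# 5 bad rows out of 100 fail with the default ratio of 0 but pass with 0.1.
print(load_succeeds(5, 95))
print(load_succeeds(5, 95, max_filter_ratio=0.1))
```

In practice you would take the average speed from historical load jobs, as the note on timeout suggests, and add some headroom rather than using the bare minimum.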
Load job status
use <database_name>;
SHOW LOAD;
This table describes the returned parameters.
| Parameter | Description |
| --- | --- |
| JobId | The unique ID of the load job. Each JobId is unique and is automatically generated by the system. Unlike a Label, a JobId is never the same, whereas a Label can be reused after a load job fails. |
| Label | The identifier of the load job. |
| State | The current state of the load job, such as PENDING, LOADING, FINISHED, or CANCELLED. |
| Progress | The progress of the load job. A Broker Load job has only the LOAD stage. If all tables have finished loading, the LOAD progress shows 99%, and the job enters the final commit phase. The progress changes to 100% only after the entire job is complete. Important: The loading progress is not linear. If the progress does not change for a period, it does not mean that the job has stopped running. |
| Type | The type of the load job. For Broker Load, the type is BROKER. |
| EtlInfo | Displays key data volume metrics, including dpp.norm.ALL and dpp.abnorm.ALL, which you can use to check the job's error rate against max_filter_ratio. |
| TaskInfo | Displays the parameters of the current load job that you specified at creation, including cluster, timeout, and max-filter-ratio. |
| ErrorMsg | The reason the job failed, shown when the job state is CANCELLED. |
| CreateTime | The time when the load job was created. |
| EtlStartTime | Because Broker Load does not have an ETL phase, this value is the same as LoadStartTime. |
| EtlFinishTime | Because Broker Load does not have an ETL phase, this value is the same as LoadStartTime. |
| LoadStartTime | The time when the LOAD phase started. |
| LoadFinishTime | The time when the load job was completed. |
| URL | The URL to access the data that failed the quality check. |
| JobDetails | Other information about the load job. |
Cancel load job
If a load job's state is not CANCELLED or FINISHED, you can cancel it by using the CANCEL LOAD statement.
CANCEL LOAD FROM <database_name> WHERE LABEL = "<label_name>";
Load job concurrency
A load job can be split into one or more parallel tasks. The DataDescription clauses in the LOAD statement determine this split.
- Multiple DataDescription clauses that load data into different tables are each split into a separate task.
- Multiple DataDescription clauses that load data into different partitions of the same table are also each split into a separate task.
Each task is also split into one or more instances, which are distributed evenly across the BEs for parallel execution. The following FE configuration parameters determine how these instances are split:
- min_bytes_per_broker_scanner: The minimum amount of data processed by a single instance. The default is 64 MB.
- max_broker_concurrency: The maximum number of concurrent instances for a single task. The default is 100.
- load_parallel_instance_num: The number of concurrent instances on a single BE. The default is 1.
The total number of instances is calculated as follows: Total instances = min(total source file size / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num * number of BEs).
Typically, a job has only one DataDescription and is split into only one task. This task is then split into a number of instances equal to the number of BEs and distributed across them for parallel execution.
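The instance formula can be tried out with a short Python sketch that uses the default values listed above. The function name `total_instances` is hypothetical, and two details are assumptions not stated in this topic: the size term is rounded down to a whole number, and a task always gets at least one instance.

```python
MB = 1024 * 1024


def total_instances(total_source_bytes: int, be_count: int,
                    min_bytes_per_broker_scanner: int = 64 * MB,
                    max_broker_concurrency: int = 100,
                    load_parallel_instance_num: int = 1) -> int:
    """min(size / min_bytes, max concurrency, per-BE instances x BEs)."""
    by_size = total_source_bytes // min_bytes_per_broker_scanner  # assumed floor
    by_backends = load_parallel_instance_num * be_count
    # Assumption: never report fewer than one instance for a non-empty task.
    return max(1, min(by_size, max_broker_concurrency, by_backends))


# A 10 GB source on 3 BEs: the per-BE limit is the smallest term.
print(total_instances(10 * 1024 * MB, be_count=3))
# A 128 MB source on 3 BEs: the data size is the smallest term.
print(total_instances(128 * MB, be_count=3))
```

With the defaults, any source larger than 64 MB times the number of BEs ends up with one instance per BE, which matches the typical case described above.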
Import examples
Connect to your StarRocks instance with EMR StarRocks Manager and run the following SQL statements on the SQL Editor page.
Load data from Alibaba Cloud OSS
- Create a test table.
create database if not exists load_test;
use load_test;
create table if not exists customer(
  c_customer_sk bigint,
  c_customer_id char(16),
  c_current_cdemo_sk bigint,
  c_current_hdemo_sk bigint,
  c_current_addr_sk bigint,
  c_first_shipto_date_sk bigint,
  c_first_sales_date_sk bigint,
  c_salutation char(10),
  c_first_name char(20),
  c_last_name char(30),
  c_preferred_cust_flag char(1),
  c_birth_day int,
  c_birth_month int,
  c_birth_year int,
  c_birth_country varchar(20),
  c_login char(13),
  c_email_address char(50),
  c_last_review_date_sk bigint
)
duplicate key (c_customer_sk)
distributed by hash(c_customer_sk) buckets 5
properties(
  "replication_num"="1"
);
- Create an import job.
Download the customer.orc data file and upload it to Alibaba Cloud OSS. Then, run the following command to create the import job.
LOAD LABEL load_test.customer_label
(
  DATA INFILE("<file_path>")
  INTO TABLE customer
  format as "orc"
)
WITH BROKER 'broker'
(
  "fs.oss.accessKeyId" = "xxxxx",
  "fs.oss.accessKeySecret" = "xxxxx",
  "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com"
);
Replace the placeholders in the command with your values as described in the following table.
| Parameter | Description |
| --- | --- |
| <file_path> | The path to the customer.orc file. For example, oss://<yourBucketName>/data/customer.orc. |
| fs.oss.accessKeyId | The AccessKey ID of your Alibaba Cloud account or RAM user. To get the AccessKey ID, go to the AccessKey Management page. |
| fs.oss.accessKeySecret | The AccessKey Secret that corresponds to the AccessKey ID. |
| fs.oss.endpoint | The endpoint to access Alibaba Cloud OSS. For example, oss-cn-hangzhou-internal.aliyuncs.com. If your StarRocks instance and Alibaba Cloud OSS are in the same region, use a VPC endpoint. Otherwise, use a public network endpoint. For more information, see Regions and endpoints. |
- Check the import job status.
use load_test;
show load where label='customer_label';
- Query the table.
Example 1: Count the total number of rows in the customer table of the load_test database.
select count(1) from load_test.customer;
Example 2: Display the first two complete records from the customer table of the load_test database.
select * from load_test.customer limit 2;
Load data from HDFS
Before you load data from HDFS, note the following:
- The HDFS cluster and the StarRocks instance must be in the same VPC and the same availability zone.
- After you create an HDFS cluster, you must open all DataNode ports in the security group to access HDFS data. This example uses a DataLake cluster in EMR on ECS that includes the HDFS service. For details about how to configure the security group, see Manage security groups.
HDFS load example
- Create a database and a table.
CREATE DATABASE IF NOT EXISTS mydatabase;
CREATE TABLE IF NOT EXISTS mydatabase.userdata_broker_load (
  userId INT,
  userName VARCHAR(20),
  registrationDate DATE
)
ENGINE = OLAP
DUPLICATE KEY(userId)
DISTRIBUTED BY HASH(userId);
- Create an import job.
Download the data file user_data.parquet and upload it to the /data directory in HDFS. Then, run the following command to create an import job.
LOAD LABEL mydatabase.userdata_broker_load_label
(
  DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/data/user_data.parquet")
  INTO TABLE userdata_broker_load
  format AS "parquet"
)
WITH BROKER
PROPERTIES
(
  "timeout" = "72000"
);
Replace the following parameters with your values.
| Parameter | Description |
| --- | --- |
| <hdfs_ip> | The private IP address of the HDFS cluster's NameNode. If you use an EMR on ECS cluster (DataLake or Custom type) that includes the HDFS service, go to the Nodes tab and find the private IP address in the Master node group. |
| <hdfs_port> | The port that the NameNode service listens on. The default value is 9000. |
- Check the import job status.
USE mydatabase;
SHOW LOAD WHERE label='userdata_broker_load_label';
- Check the loaded data.
SELECT * FROM mydatabase.userdata_broker_load;
HDFS authentication methods
In the community edition, HDFS supports two authentication methods: simple authentication and Kerberos authentication.
- Simple authentication: The client's operating system determines the user identity when connecting to HDFS.
For simple authentication, configure StorageCredentialParams as follows:
"hadoop.security.authentication" = "simple",
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
StorageCredentialParams includes the following parameters.
| Parameter | Description |
| --- | --- |
| hadoop.security.authentication | The authentication method. Valid values are simple and kerberos. Default: simple. simple indicates simple authentication. kerberos indicates Kerberos authentication. |
| username | The username to access the HDFS cluster's NameNode. |
| password | The password for accessing the HDFS cluster's NameNode. |
- Kerberos authentication: Your Kerberos credentials determine the client's identity.
For Kerberos authentication, go to the Instance Configuration page of your StarRocks instance and add the following configuration to the hdfs-site.xml file.
"hadoop.security.authentication" = "kerberos",
"kerberos_principal" = "nn/zelda1@ZELDA.COM",
"kerberos_keytab" = "/keytab/hive.keytab",
"kerberos_keytab_content" = "YWFhYWFh"
The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| hadoop.security.authentication | The authentication method. Valid values are simple and kerberos. Default: simple. simple indicates simple authentication. kerberos indicates Kerberos authentication. |
| kerberos_principal | The Kerberos user or service principal. Each principal is unique within the HDFS cluster and consists of three parts: username or servicename (the name of the user or service in the HDFS cluster), instance (the name of the server where the authenticated node runs, which ensures that the user or service is globally unique; for example, if an HDFS cluster has multiple DataNode nodes, each node requires separate authentication), and realm (the realm, which must be in uppercase). For example, nn/zelda1@ZELDA.COM. |
| kerberos_keytab | The path to the Kerberos keytab file. This file must be on the server where the broker process runs. |
| kerberos_keytab_content | The Base64-encoded content of the Kerberos keytab file. Important: Specify only one of kerberos_keytab or kerberos_keytab_content. |
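If you choose kerberos_keytab_content, the value is the Base64 encoding of the keytab file's raw bytes. A minimal Python sketch of that encoding step follows. The helper names are hypothetical, and the sketch assumes standard Base64 without line breaks, which is consistent with the sample value YWFhYWFh shown above.

```python
import base64


def encode_keytab(keytab_bytes: bytes) -> str:
    """Return the Base64 text for raw keytab bytes."""
    return base64.b64encode(keytab_bytes).decode("ascii")


def encode_keytab_file(path: str) -> str:
    """Read a keytab file in binary mode and return its Base64 text."""
    with open(path, "rb") as keytab_file:
        return encode_keytab(keytab_file.read())


# The sample value in the configuration above is the encoding of six "a" bytes.
print(encode_keytab(b"aaaaaa"))  # YWFhYWFh
```

For a real keytab you would call `encode_keytab_file` with the path to your own file and paste the result as the parameter value. Treat that value as a secret, because it contains the key material itself.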
HDFS HA configuration
Configuring NameNode High Availability (HA) enables StarRocks to automatically identify the new active NameNode during a failover. To access an HDFS cluster deployed in HA mode, go to the Instance Configuration page of your StarRocks instance and add the following configuration to the hdfs-site.xml file.
"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider.ha_cluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
The following table describes the parameters.
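| Parameter | Description |
| --- | --- |
| dfs.nameservices | A logical name for the HDFS service, which you can customize. For example, set dfs.nameservices to ha_cluster. |
| dfs.ha.namenodes.XXX | The logical names for the NameNodes, separated by commas (,). In this parameter, XXX is the HDFS service name configured in dfs.nameservices. For example, set dfs.ha.namenodes.ha_cluster to ha_n1,ha_n2. |
| dfs.namenode.rpc-address.XXX.nn | The RPC address information for the NameNode. Here, nn represents the name of the NameNode configured in dfs.ha.namenodes.XXX. For example, set the dfs.namenode.rpc-address.ha_cluster.ha_n1 parameter to <hdfs_host>:<hdfs_port>. |
| dfs.client.failover.proxy.provider.XXX | The provider class that the client uses to connect to the NameNode. The default value is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider. |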
When using an EMR on ECS cluster that includes the HDFS service, go to the Services tab for the target cluster. Then, open the Configure tab of the HDFS service and find the parameter values in the hdfs-site.xml file.
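Because the HA parameter names embed the nameservice and NameNode names, it is easy to mistype them when a cluster has several NameNodes. The following Python sketch derives the key-value pairs from a nameservice name and a mapping of NameNode names to RPC addresses, following the pattern of the configuration above. The function names and the host names in the example are hypothetical.

```python
FAILOVER_PROVIDER = (
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)


def build_ha_params(nameservice: str, namenodes: dict) -> dict:
    """Build HA settings from a nameservice and {namenode_name: "host:port"}."""
    params = {
        "dfs.nameservices": nameservice,
        f"dfs.ha.namenodes.{nameservice}": ",".join(namenodes),
    }
    for name, rpc_address in namenodes.items():
        params[f"dfs.namenode.rpc-address.{nameservice}.{name}"] = rpc_address
    params[f"dfs.client.failover.proxy.provider.{nameservice}"] = FAILOVER_PROVIDER
    return params


def render_params(params: dict) -> str:
    """Render the settings in the "key" = "value" form used in this topic."""
    return ",\n".join(f'"{key}" = "{value}"' for key, value in params.items())


# Hypothetical host names; replace them with your NameNode addresses.
example = build_ha_params(
    "ha_cluster",
    {"ha_n1": "nn1.example.internal:9000", "ha_n2": "nn2.example.internal:9000"},
)
print(render_params(example))
```

The printed lines have the same shape as the five-line configuration shown earlier, with one rpc-address entry per NameNode.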