Broker Load

Broker Load is an asynchronous data import method based on the MySQL protocol. With Broker Load, StarRocks uses broker processes to read data from data sources such as Apache HDFS or OSS, and then uses its own computing resources to pre-process and load the data. This topic describes how to use Broker Load and provides examples.

Data file formats

Broker Load supports data file formats such as CSV, ORC, and Parquet. The recommended data size per import ranges from tens to hundreds of gigabytes (GB).

Broker load

Broker information

The Broker service is automatically deployed and started when you create an Alibaba Cloud EMR Serverless StarRocks instance. To view detailed information about all Brokers in the current instance, run the following SQL command.

SHOW PROC "/brokers";

Create load job

  • Syntax

    LOAD LABEL [<database_name>.]<label_name>
    (
        data_desc[, data_desc ...]
    )
    WITH BROKER
    (
        StorageCredentialParams
    )
    [PROPERTIES
    (
        opt_properties
    )
    ]
  • Parameters

    • <database_name>: Optional. The database where the destination StarRocks table resides.

    • <label_name>: The label of the load job.

      Each load job has a unique label within its database. You can use the label to check the execution status of the corresponding job and prevent duplicate data loads. When a load job's state is FINISHED, its label cannot be reused for other jobs. When a load job's state is CANCELLED, its label can be reused, typically to retry the same job (that is, loading the same data with the same label) to achieve Exactly-Once semantics.

    • data_desc: Describes a batch of data to be loaded.

      Broker Load supports loading multiple data files in a single job. You can use multiple data_desc clauses to declare multiple data files, or use a single data_desc clause to declare all data files within a path. Broker Load ensures transactional atomicity. This means a job either loads all its data files successfully or fails entirely, preventing partial data loads.

      DATA INFILE ("<file_path>"[, "<file_path>" ...])
      [NEGATIVE]
      INTO TABLE <table_name>
      [PARTITION (<partition1_name>[, <partition2_name> ...])]
      [TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])]
      [COLUMNS TERMINATED BY "<column_separator>"]
      [ROWS TERMINATED BY "<row_separator>"]
      [FORMAT AS "CSV | Parquet | ORC"]
      [(format_type_options)]
      [(column_list)]
      [COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
      [SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
      [WHERE predicate]

      This table describes key parameters in data_desc.

      Parameter

      Description

      file_path

      Specifies the path to the source data file. The path can point to a specific file or use an asterisk (*) wildcard to specify all files in a directory. You can also use wildcards in intermediate directories.

      Supported wildcards include ?, *, [], {}, and ^. For usage rules, see FileSystem.

      For example, specifying the path oss://bucket/data/tablename/*/* matches all files in all partitions under data/tablename. Specifying the path oss://bucket/data/tablename/dt=202104*/* matches all data files in partitions starting with 202104 under the data/tablename directory.

      NEGATIVE

      Rolls back a successfully loaded batch of data. To roll back a successful batch, load the same batch again with the NEGATIVE keyword.

      Note

      This parameter is applicable only when the destination StarRocks table is an aggregate table and the aggregation function for all of its value columns is sum.

      PARTITION

      Specifies the partitions of the destination table.

      If omitted, data is loaded into all table partitions by default.

      COLUMNS TERMINATED BY

      Specifies the column separator in the source files. If omitted, the default separator is \t (Tab).

      Broker Load submits load requests over the MySQL protocol. Both StarRocks and the MySQL protocol perform character escaping. Therefore, if the column separator is an invisible character like a tab, you must escape it with an extra backslash (\). For example, if the separator is \t, you must enter \\t. If the separator is \n, you must enter \\n. The column separator for Apache Hive™ files is \x01, so if your source file is a Hive file, you must specify \\x01.

      FORMAT AS

      Specifies the format of the source files. Valid values are CSV, Parquet, and ORC. If this parameter is omitted, StarRocks infers the file format from the file extension in file_path, such as .csv, .parquet, or .orc.

      COLUMNS FROM PATH AS

      Extracts information for one or more partition fields from the specified file path. This parameter applies only when the specified file path contains partition fields.

      For example, if the file path is /path/col_name=col_value/file1, where col_name can be mapped to a column in the destination StarRocks table, you can set this parameter to col_name. During the load, StarRocks inserts col_value into the column that corresponds to col_name.

      Note

      This parameter is available only for HDFS data loads.

      SET

      Transforms a column from the source data file by using a specified function and inserts the result into the StarRocks table. The syntax is column_name = expression.

      WHERE

      Specifies a filter condition to apply to the data after transformation. Only data that meets the conditions in the WHERE clause is loaded into the StarRocks table.

    • opt_properties: Specifies optional parameters for the load job. The specified settings apply to the entire job. The syntax is as follows:

      PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

      The following table describes some of the parameters.

      Parameter

      Description

      timeout

      The timeout for the load job, in seconds.

      You can set a custom timeout for each load job in opt_properties. If a load job does not complete within the specified time, the system cancels it and its state changes to CANCELLED. The default timeout for a Broker Load job is 4 hours.

      Important

      Manually setting the timeout is usually unnecessary. However, you can set it if a job cannot be completed within the default period.

      Recommended timeout calculation: timeout > (total size of source files x number of destination tables and related materialized views) / average load speed.

      For example, you want to load a 1 GB data file into a table that has two rollup tables. The average load speed of your StarRocks instance is 10 MB/s. In this case, the calculated duration is (1 x 1024 x 3) / 10 = 307.2 (seconds).

      Therefore, set the job timeout to at least 308 seconds.

      Note

      The average load speed varies by instance hardware and concurrent query workload. Estimate this speed based on the performance of historical load jobs.

      max_filter_ratio

      The maximum filter ratio for the load job, which ranges from 0 to 1. The default is 0, meaning no tolerance for errors. If a job's error rate exceeds this value, the job fails. To ignore bad rows and ensure the load can succeed, set this parameter to a value greater than 0.

      The error rate of a job is calculated as dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL). The job fails if this value exceeds max_filter_ratio.

      dpp.abnorm.ALL represents the number of rows with data quality issues, such as type mismatches, column count mismatches, or length mismatches. dpp.norm.ALL represents the number of valid rows loaded. You can check the number of valid rows for a load job by using the SHOW LOAD command.

      The sum of dpp.abnorm.ALL and dpp.norm.ALL equals the total number of rows to be loaded.

      load_mem_limit

      Specifies the memory limit for a load job in bytes. This limit cannot exceed the memory limit of a BE or CN.

      strict_mode

      Specifies whether to enable strict mode. Valid values:

      • true: Enables strict mode.

      • false (Default): Disables strict mode.

      Strict mode enforces strict filtering on column type conversions during the load process. The filtering policies are as follows:

      • If strict mode is enabled, StarRocks filters out rows with errors, loads only the valid rows, and returns details about the invalid data.

      • If strict mode is disabled, StarRocks converts fields that fail type conversion to NULL and loads the rows that contain these NULL values together with the valid rows.
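
The timeout formula described for the timeout property can be sketched in a few lines of Python. This is only an illustration of the arithmetic: the function name estimate_timeout_seconds and its parameters are hypothetical, and the result is rounded up to whole seconds.

```python
import math


def estimate_timeout_seconds(total_size_mb: float,
                             table_count: int,
                             avg_speed_mb_per_s: float) -> int:
    """Return a lower bound for the `timeout` property, in whole seconds.

    table_count is the destination table plus its rollup tables or
    related materialized views.
    """
    if avg_speed_mb_per_s <= 0:
        raise ValueError("avg_speed_mb_per_s must be positive")
    seconds = total_size_mb * table_count / avg_speed_mb_per_s
    return math.ceil(seconds)


# 1 GB file, one base table plus two rollup tables, 10 MB/s average speed.
estimate = estimate_timeout_seconds(1 * 1024, 3, 10)
print(estimate)  # 308
```

The average load speed is an input you have to measure yourself, for example from the duration and FileSize of earlier jobs, so treat the result as a starting point rather than an exact value.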

Load job status

To view the load jobs in a database, run the following statements.

USE <database_name>;
SHOW LOAD;

This table describes the returned parameters.

Parameter

Description

JobId

The unique ID of the load job, generated automatically by the system. A JobId is never reused, whereas a Label can be reused after a load job fails.

Label

The identifier of the load job.

State

The current state of the load job. Valid values:

  • PENDING: The load job is created.

  • QUEUEING: The load job is waiting to be executed.

  • LOADING: The load job is running.

  • CANCELLED: The load job has failed.

  • FINISHED: The load job has succeeded.

Progress

The progress of the load job. A Broker Load job has only the LOAD phase, which corresponds to the LOADING state. The LOAD progress ranges from 0% to 100%.

The LOAD progress is calculated as follows: LOAD progress = (number of currently loaded tables / total number of tables in the load job) * 100%.

If all tables have finished loading, the LOAD progress shows 99%. The job then enters the final commit phase. The progress changes to 100% only after the entire job is complete.

Important

The loading progress is not linear. If the progress does not change for a period, it does not mean the job is not running.

Type

The type of the load job. For Broker Load, the type is BROKER.

EtlInfo

Displays key data volume metrics: unselected.rows, dpp.norm.ALL, and dpp.abnorm.ALL.

You can use the value of unselected.rows to determine how many rows were filtered by the WHERE clause. The dpp.norm.ALL and dpp.abnorm.ALL metrics can be used to verify if the job's error rate exceeded the max_filter_ratio. The sum of these three metrics equals the total number of rows in the source data.

TaskInfo

Displays the parameters that you specified when you created the load job, including cluster, timeout, and max_filter_ratio.

ErrorMsg

The reason the job failed. When the job state is PENDING, LOADING, or FINISHED, this parameter is NULL. When the job state is CANCELLED, this parameter includes two parts: type and msg.

  • type can be one of the following values:

    • USER_CANCEL: The job was canceled by a user.

    • ETL_SUBMIT_FAIL: The load job submission failed.

    • ETL_QUALITY_UNSATISFIED: Data quality did not meet the max_filter_ratio requirement.

    • LOAD_RUN_FAIL: The load job failed during the LOAD phase.

    • TIMEOUT: The load job timed out.

    • UNKNOWN: An unknown load error occurred.

  • msg displays detailed information about the failure.

CreateTime

The time when the load job was created.

EtlStartTime

Because Broker Load does not have an ETL phase, this value is the same as LoadStartTime.

EtlFinishTime

Because Broker Load does not have an ETL phase, this value is the same as LoadStartTime.

LoadStartTime

The time when the LOAD phase started.

LoadFinishTime

The time when the load job was completed.

URL

The URL to access the data that failed the quality check. You can use curl or wget to open this URL. If there is no such data in the job, this parameter is NULL.

JobDetails

Other information about the load job, including:

  • Unfinished backends: The IDs of the BE nodes where the load has not completed.

  • ScannedRows: The number of rows actually processed, including both loaded and filtered rows.

  • TaskNumber: The number of sub-tasks.

  • All backends: The IDs of the BE nodes that are running sub-tasks.

  • FileNumber: The number of source files.

  • FileSize: The total size of all source files. Unit: bytes.
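
The relationship between the EtlInfo counters and max_filter_ratio can be checked with a small script. The sketch below assumes that EtlInfo is returned as semicolon-separated key=value pairs, as in the sample string, which is made up for illustration. The helper names parse_etl_info and filter_ratio are hypothetical.

```python
def parse_etl_info(etl_info: str) -> dict:
    """Split an EtlInfo string such as 'a=1; b=2' into integer counters."""
    counters = {}
    for part in etl_info.split(";"):
        part = part.strip()
        if not part:
            continue
        key, _, value = part.partition("=")
        counters[key.strip()] = int(value)
    return counters


def filter_ratio(counters: dict) -> float:
    """Error rate: abnormal rows / (abnormal rows + normal rows)."""
    abnormal = counters.get("dpp.abnorm.ALL", 0)
    normal = counters.get("dpp.norm.ALL", 0)
    total = abnormal + normal
    return abnormal / total if total else 0.0


# Made-up sample value; the layout is an assumption about the EtlInfo format.
sample = "unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=985"
counters = parse_etl_info(sample)
ratio = filter_ratio(counters)

print(counters["unselected.rows"])  # rows removed by the WHERE clause: 4
print(ratio)                        # 0.015
print(ratio > 0.01)                 # True: a job with max_filter_ratio = 0.01 would fail
```

Rows counted in unselected.rows do not enter the ratio. Only rows with data quality issues count against max_filter_ratio.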

Cancel load job

If a load job's state is not CANCELLED or FINISHED, you can cancel it by using the CANCEL LOAD statement.

CANCEL LOAD FROM <database_name> WHERE LABEL = "<label_name>";
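
The state rules in this topic (a job can be canceled unless it is CANCELLED or FINISHED, and a label can be reused only after its job is CANCELLED) can be written as two small checks. This is a sketch for client-side scripts. The function names are hypothetical and are not part of StarRocks.

```python
# States in which a load job has already ended.
TERMINAL_STATES = {"FINISHED", "CANCELLED"}


def can_cancel(state: str) -> bool:
    """CANCEL LOAD applies only to jobs that have not ended yet."""
    return state.upper() not in TERMINAL_STATES


def can_reuse_label(state: str) -> bool:
    """A label becomes reusable only after its job was cancelled or failed."""
    return state.upper() == "CANCELLED"


for state in ("PENDING", "QUEUEING", "LOADING", "CANCELLED", "FINISHED"):
    print(state, can_cancel(state), can_reuse_label(state))
```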

Load job concurrency

A load job can be split into one or more parallel tasks. The data_desc (DataDescription) clauses in the LOAD statement determine this split.

  • Multiple DataDescription clauses that load data into different tables will each be split into a separate task.

  • Multiple DataDescription clauses that load data into different partitions of the same table will also each be split into a separate task.

Each task is also split into one or more instances, which are distributed evenly across the BEs for parallel execution. The following FE configuration parameters determine how these instances are split:

  • min_bytes_per_broker_scanner: The minimum amount of data processed by a single instance. The default is 64 MB.

  • max_broker_concurrency: The maximum number of concurrent instances for a single task. The default is 100.

  • load_parallel_instance_num: The number of concurrent instances on a single BE. The default is 1.

The total number of instances is calculated as follows: Total instances = min(total source file size / min_bytes_per_broker_scanner, max_broker_concurrency, load_parallel_instance_num * number of BEs).

Typically, a job has only one DataDescription and is split into only one task. This task is then split into a number of instances equal to the number of BEs and distributed across them for parallel execution.
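
The instance formula above can be tried out with the following sketch. It uses the default values listed in this section. It also assumes integer division for the size term and a minimum of one instance, neither of which is stated in this topic. The function name total_instances is hypothetical.

```python
MB = 1024 * 1024


def total_instances(total_file_bytes: int,
                    be_count: int,
                    min_bytes_per_broker_scanner: int = 64 * MB,
                    max_broker_concurrency: int = 100,
                    load_parallel_instance_num: int = 1) -> int:
    """min(size / min_bytes, max_concurrency, per-BE instances * BE count)."""
    by_size = total_file_bytes // min_bytes_per_broker_scanner  # assumed rounding
    count = min(by_size,
                max_broker_concurrency,
                load_parallel_instance_num * be_count)
    return max(1, count)  # assumption: a task always gets at least one instance


print(total_instances(10 * 1024 * MB, be_count=3))  # 3: limited by the number of BEs
print(total_instances(100 * MB, be_count=3))        # 1: limited by the file size
```

With the defaults, the number of BEs is usually the limiting term for large files, which matches the statement that a typical task is split into as many instances as there are BEs.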

Import examples

Connect to your StarRocks instance with EMR StarRocks Manager and run the following SQL statements on the SQL Editor page.

Load data from Alibaba Cloud OSS

  1. Create a test table.

    create database if not exists load_test;
    use load_test;
    create table if not exists customer(
      c_customer_sk bigint,
      c_customer_id char(16),
      c_current_cdemo_sk bigint,
      c_current_hdemo_sk bigint,
      c_current_addr_sk bigint,
      c_first_shipto_date_sk bigint,
      c_first_sales_date_sk bigint,
      c_salutation char(10),
      c_first_name char(20),
      c_last_name char(30),
      c_preferred_cust_flag char(1),
      c_birth_day int,
      c_birth_month int,
      c_birth_year int,
      c_birth_country varchar(20),
      c_login char(13),
      c_email_address char(50),
      c_last_review_date_sk bigint
    )
    duplicate key (c_customer_sk)
    distributed by hash(c_customer_sk) buckets 5
    properties(
      "replication_num"="1"
    );
  2. Create an import job.

    Download the customer.orc data file and upload it to Alibaba Cloud OSS. Then, run the following command to create the import job.

    LOAD LABEL load_test.customer_label
    (
      DATA INFILE("<file_path>")
      INTO TABLE customer
      FORMAT AS "orc"
    )
    WITH BROKER 'broker'
    (
      "fs.oss.accessKeyId" = "xxxxx",
      "fs.oss.accessKeySecret" = "xxxxx",
      "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com"
    );

    Replace the placeholders in the command with your values as described in the following table.

    Parameter

    Description

    <file_path>

    The path to the customer.orc file. For example, oss://<yourBucketName>/data/customer.orc.

    fs.oss.accessKeyId

    The AccessKey ID of your Alibaba Cloud account or RAM user. To get the AccessKey ID, go to the AccessKey Management page.

    fs.oss.accessKeySecret

    The AccessKey Secret that corresponds to the AccessKey ID.

    fs.oss.endpoint

    The endpoint to access Alibaba Cloud OSS. For example, oss-cn-hangzhou-internal.aliyuncs.com.

    If your StarRocks instance and Alibaba Cloud OSS are in the same region, use a VPC endpoint. Otherwise, use a public network endpoint. For more information, see Regions and endpoints.

  3. Check the import job status.

    use load_test;
    show load where label='customer_label';
  4. Query the table.

    • Example 1: Count the total number of rows in the customer table of the load_test database.

      select count(1) from load_test.customer;
    • Example 2: Display the first two complete records from the customer table of the load_test database.

      select * from load_test.customer limit 2;

Load data from HDFS

Note

Before you load data from HDFS, note the following:

  • The HDFS cluster and the StarRocks instance must be in the same VPC and the same availability zone.

  • After creating an HDFS cluster, you must open all DataNode ports in the security group to access HDFS data. This example uses a DataLake cluster in EMR on ECS that includes the HDFS service. For details about how to configure the security group, see Manage security groups.

HDFS load example

  1. Create a database and a table.

    CREATE DATABASE IF NOT EXISTS mydatabase;
    
    CREATE TABLE IF NOT EXISTS mydatabase.userdata_broker_load (
        userId INT,
        userName VARCHAR(20),
        registrationDate DATE
    )
    ENGINE = OLAP
    DUPLICATE KEY(userId)
    DISTRIBUTED BY HASH(userId);
  2. Create an import job.

    Download the data file user_data.parquet and upload it to the /data directory in HDFS. Then, run the following command to create an import job.

    LOAD LABEL mydatabase.userdata_broker_load_label
    (
      DATA INFILE("hdfs://<hdfs_ip>:<hdfs_port>/data/user_data.parquet")
      INTO TABLE userdata_broker_load
      FORMAT AS "parquet"
    )
    WITH BROKER
    PROPERTIES
    (
        "timeout" = "72000"
    );

    Replace the following parameters with your values.

    Parameter

    Description

    <hdfs_ip>

    The private IP address of the HDFS cluster's NameNode.

    If you use an EMR on ECS cluster (DataLake or Custom type) that includes the HDFS service, go to the Nodes tab and find the private IP address in the Master node group.

    <hdfs_port>

    The port that the NameNode service listens on. The default value is 9000.

  3. Check the import job status.

    USE mydatabase;
    SHOW LOAD WHERE label='userdata_broker_load_label';
  4. Check the loaded data.

    SELECT * FROM mydatabase.userdata_broker_load;
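
Because Broker Load is asynchronous, the status check in step 3 usually has to be repeated until the job ends. The following sketch polls SHOW LOAD through any Python DB-API 2.0 cursor. It assumes a MySQL-compatible driver that uses %s placeholders and returns rows as tuples, and it assumes that the most recent job for a label is listed last. The function name wait_for_load and its parameters are hypothetical.

```python
import time

# States in which a load job has ended.
TERMINAL_STATES = {"FINISHED", "CANCELLED"}


def wait_for_load(cursor, label, poll_seconds=5.0, max_polls=120):
    """Poll SHOW LOAD until the job with the given label ends.

    `cursor` must be connected to the database that owns the load job.
    Returns the final state, "FINISHED" or "CANCELLED".
    """
    for _ in range(max_polls):
        cursor.execute("SHOW LOAD WHERE LABEL = %s", (label,))
        rows = cursor.fetchall()
        if not rows:
            raise LookupError(f"no load job with label {label!r}")
        # Locate the State column by name instead of relying on its position.
        columns = [col[0] for col in cursor.description]
        state = rows[-1][columns.index("State")]  # assumed: newest job is last
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"load job {label!r} did not end after {max_polls} polls")
```

For example, after running USE mydatabase on the connection, wait_for_load(cursor, "userdata_broker_load_label") returns "FINISHED" when the load succeeds. If it returns "CANCELLED", check the ErrorMsg column for the reason.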

HDFS authentication methods

In the community edition, HDFS supports two authentication methods: simple authentication and Kerberos authentication.

  • Simple authentication: The client's operating system determines the user identity when connecting to HDFS.

    For simple authentication, configure StorageCredentialParams as follows:

    "hadoop.security.authentication" = "simple",
    "username" = "<hdfs_username>",
    "password" = "<hdfs_password>"

    StorageCredentialParams includes the following parameters.

    Parameter

    Description

    hadoop.security.authentication

    The authentication method. Valid values are simple and kerberos. Default: simple. simple indicates simple authentication. kerberos indicates Kerberos authentication.

    username

    The username to access the HDFS cluster's NameNode.

    password

    The password for accessing the HDFS cluster's NameNode.

  • Kerberos authentication: Your Kerberos credentials determine the client's identity.

    For Kerberos authentication, go to the Instance Configuration page of your StarRocks instance and add the following configuration to the hdfs-site.xml file.

    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "nn/zelda1@ZELDA.COM",
    "kerberos_keytab" = "/keytab/hive.keytab",
    "kerberos_keytab_content" = "YWFhYWFh"

    The following table describes the parameters.

    Parameter

    Description

    hadoop.security.authentication

    The authentication method. Valid values are simple and kerberos. Default: simple. simple indicates simple authentication. kerberos indicates Kerberos authentication.

    kerberos_principal

    The Kerberos user or service principal. Each principal is unique within the HDFS cluster and consists of three parts:

    • username or servicename: The name of the user or service in the HDFS cluster.

    • instance: The name of the server where the authenticated node runs. This ensures the user or service is globally unique. For example, if an HDFS cluster has multiple DataNode nodes, each node requires separate authentication.

    • realm: The realm, which must be in uppercase.

    For example, nn/zelda1@ZELDA.COM.

    kerberos_keytab

    Specifies the path to the Kerberos keytab file. This file must be on the server where the broker process runs.

    kerberos_keytab_content

    Specifies the Base64-encoded content of the Kerberos keytab file.

    Important

    Specify either kerberos_keytab or kerberos_keytab_content, not both. The sample configuration above lists both parameters only to show their syntax.
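
A value for kerberos_keytab_content can be produced by Base64-encoding the bytes of the keytab file. The sketch below uses only the Python standard library. The function names are hypothetical, and the path in the usage note is a placeholder.

```python
import base64
from pathlib import Path


def encode_keytab(raw: bytes) -> str:
    """Base64-encode raw keytab bytes into an ASCII string."""
    return base64.b64encode(raw).decode("ascii")


def encode_keytab_file(path: str) -> str:
    """Read a keytab file and return its Base64-encoded content."""
    return encode_keytab(Path(path).read_bytes())


# The sample value used in this topic is the encoding of six "a" bytes.
print(encode_keytab(b"aaaaaa"))  # YWFhYWFh
```

For a real file, call encode_keytab_file with the keytab path, for example encode_keytab_file("/keytab/hive.keytab"), and paste the result into kerberos_keytab_content.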

HDFS HA configuration

Configuring NameNode High Availability (HA) enables StarRocks to automatically identify the new active NameNode during a failover. To access an HDFS cluster deployed in HA mode, go to the Instance Configuration page of your StarRocks instance and add the following configuration to the hdfs-site.xml file.

"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider.ha_cluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

The following table describes the parameters.

Parameter

Description

dfs.nameservices

Specifies a logical name for the HDFS service, which you can customize.

For example, set dfs.nameservices to my_ha.

dfs.ha.namenodes.xxx

The logical names for the NameNodes, separated by commas (,). In this parameter, xxx represents the name you set for dfs.nameservices.

For example, set dfs.ha.namenodes.my_ha to my_nn1,my_nn2.

dfs.namenode.rpc-address.xxx.nn

Specifies the RPC address information for the NameNode. Here, nn represents the name of the NameNode configured in dfs.ha.namenodes.xxx.

For example, set the dfs.namenode.rpc-address.my_ha.my_nn1 parameter to <hdfs_host>:<hdfs_port>.

dfs.client.failover.proxy.provider.xxx

The provider class that the client uses to connect to the NameNode. The default value is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.

Note

When using an EMR on ECS cluster that includes the HDFS service, go to the Services tab for the target cluster. Then, open the Configure tab of the HDFS service and find the parameter values in the hdfs-site.xml file.
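
Because every HA key embeds the nameservice or NameNode name, it is easy to mistype one of them. The following sketch derives the full set of keys from a nameservice name and a mapping of NameNode names to RPC addresses. The function name ha_properties is hypothetical, and the addresses are placeholders from a documentation IP range.

```python
FAILOVER_PROVIDER = (
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)


def ha_properties(nameservice: str, namenodes: dict) -> dict:
    """Build the HA configuration keys.

    namenodes maps each logical NameNode name to its "host:port" RPC address.
    """
    props = {
        "dfs.nameservices": nameservice,
        f"dfs.ha.namenodes.{nameservice}": ",".join(namenodes),
    }
    for name, address in namenodes.items():
        props[f"dfs.namenode.rpc-address.{nameservice}.{name}"] = address
    props[f"dfs.client.failover.proxy.provider.{nameservice}"] = FAILOVER_PROVIDER
    return props


# Placeholder addresses; replace them with the values from your hdfs-site.xml.
props = ha_properties("my_ha", {"my_nn1": "192.0.2.10:9000",
                                "my_nn2": "192.0.2.11:9000"})
for key, value in props.items():
    print(f'"{key}" = "{value}"')
```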