Import data in batches

Lindorm provides the bulkload feature to import data quickly and reliably. Bulkload uses bypass mode to load data files directly into Lindorm, without using the standard data write path or the computing resources of your instance. You can submit a bulkload job through the Lindorm Tunnel Service (LTS) console or the API.

Advantages

Compared to importing data through an API, the bulkload feature provides the following advantages:

  • Faster imports: Imports data more than 10 times faster than writing it through an API.

  • Stable online services: Does not consume online service resources, so running services are not affected.

  • Flexible resource separation: Separates online and offline resources for flexible resource usage.

  • Multiple data sources: Supports importing data from CSV, ORC, Parquet, and MaxCompute sources.

  • No code required: Loads data in batches in bypass mode without any code.

  • Cost-effective: LTS uses serverless Spark to provide elastic computing resources for bulkload operations. Resources are scaled as needed and billed on a pay-as-you-go basis, so you do not need to provision long-running computing resources.

Prerequisites

Before you import data in batches, make sure that the following requirements are met:

Supported data sources

The following table lists the supported source and destination data sources.

Source data source | Destination data source
MaxCompute table | LindormTable
HDFS CSV or OSS CSV | LindormTable
HDFS Parquet or OSS Parquet | LindormTable
HDFS ORC or OSS ORC | LindormTable

Submission methods

You can submit a bulkload job in one of the following ways:

  • Submit a job using the LTS console

  • Submit a job using an API operation

Submit a job using the LTS console

Step 1: Add data sources

  1. Log on to the LTS console. For more information, see Activate and log on to LTS.

  2. In the navigation pane on the left, choose Data Source Management > Add Data Source to add the source data source (for example, MaxCompute or HDFS) and the destination Lindorm wide table data source.

Step 2: Create and configure a bulkload job

  1. In the navigation pane on the left, choose Import To Lindorm/HBase > Universal Import.

    • For LTS versions earlier than 3.8.12.4.3, choose Import To Lindorm/HBase > Bulkload.

    • To view the LTS version, go to the Configuration Information section on the Instance Details page in the Lindorm console.

  2. Click Create Job and configure the following parameters.

Data source settings

Configuration item | Parameter | Description
Select Data Source | Source Data Source | Select the ODPS or HDFS data source that you added.
Select Data Source | Destination Data Source | Select the Lindorm wide table data source that you added.

Reader configuration

The Plugin Configuration > Reader Configuration parameters depend on the source data source type.

MaxCompute (ODPS) source
Parameter Description
table The name of the MaxCompute (ODPS) table.
column The names of the MaxCompute (ODPS) columns to import.
partition The partition information. Leave this empty for a non-partitioned table.
numPartitions The degree of parallelism for reading data.
CSV source (HDFS or OSS)
Parameter Description
filePath The directory where the CSV file is located.
header Specifies whether the CSV file contains a header row.
delimiter The delimiter used in the CSV file.
column The column names and their corresponding types in the CSV file.
Parquet source (HDFS or OSS)
Parameter Description
filePath The directory where the Parquet file is located.
column The column names in the Parquet file.

For configuration examples, see Reader plugin configuration examples.

Writer configuration

The Plugin Configuration > Writer Configuration parameters apply to all source types.

Parameter Description
namespace The namespace of the Lindorm wide table.
lindormTable The name of the Lindorm wide table.
compression The compression algorithm. Currently, only zstd is supported. To disable compression, set this to none.
columns The column mapping. The configuration depends on the destination table type. See Column mapping.
timestamp The timestamp of the data in the Lindorm wide table. Supported values: a Long value with 13 digits (a Unix timestamp in milliseconds), or a String in the yyyy-MM-dd HH:mm:ss or yyyy-MM-dd HH:mm:ss SSS format.
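The two accepted timestamp forms can be produced from a Python datetime as shown in the following minimal sketch. The helper names are hypothetical. Treating a 13-digit Long as milliseconds since the Unix epoch is an assumption based on its length, and how Lindorm interprets the time zone of the String form is not stated in this topic, so the sketch simply formats whatever datetime it receives.

```python
import re
from datetime import datetime, timedelta, timezone

# Unix epoch, used to compute millisecond timestamps without float rounding.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Matches "yyyy-MM-dd HH:mm:ss" with an optional " SSS" millisecond suffix.
_TS_PATTERN = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})( \d{3})?")


def to_epoch_millis(dt: datetime) -> int:
    """Return a 13-digit Long value (assumed: milliseconds since the epoch)."""
    if dt.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    # timedelta // timedelta yields an exact integer.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def to_timestamp_string(dt: datetime, with_millis: bool = False) -> str:
    """Format dt as 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd HH:mm:ss SSS'."""
    text = dt.strftime("%Y-%m-%d %H:%M:%S")
    if with_millis:
        text += f" {dt.microsecond // 1000:03d}"
    return text


def is_valid_timestamp(value) -> bool:
    """Check a writer 'timestamp' value against the documented forms."""
    if isinstance(value, bool):
        return False  # bool is a subclass of int; reject it explicitly
    if isinstance(value, int):
        return value > 0 and len(str(value)) == 13
    if isinstance(value, str):
        match = _TS_PATTERN.fullmatch(value)
        if match is None:
            return False
        try:
            datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return False  # well-shaped but impossible, e.g. month 13
        return True
    return False
```

For example, to_timestamp_string(datetime(2022, 7, 1, 10, 0, 0)) yields the "2022-07-01 10:00:00" value used in the writer configuration examples.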
Column mapping
  • Lindorm wide tables: Specify the column names of the Lindorm SQL wide table. The columns must correspond to the columns in the reader configuration.

  • HBase-compatible Lindorm tables: Specify the standard column names of the HBase table. The columns must correspond to the columns in the reader configuration.

For configuration examples, see Writer plugin configuration examples.
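Because the writer columns must correspond to the reader columns, a quick check before you submit a job can catch a mismatched column count. The following sketch assumes that the correspondence is positional (the first reader column maps to the first writer column, and so on), which matches the configuration examples in this topic. The pair_columns helper is hypothetical and is not part of LTS.

```python
def pair_columns(reader_config: dict, writer_config: dict) -> list:
    """Pair reader columns with writer columns by position (assumed mapping rule)."""
    reader_cols = reader_config["column"]
    writer_cols = writer_config["columns"]
    if len(reader_cols) != len(writer_cols):
        raise ValueError(
            f"reader has {len(reader_cols)} columns but writer has {len(writer_cols)}"
        )
    # CSV reader entries look like "name|type"; keep only the name for display.
    names = [col.split("|", 1)[0] for col in reader_cols]
    return list(zip(names, writer_cols))
```

Printing the returned pairs gives a side-by-side view of which source column feeds which destination column before any Spark resources are used.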

Job running parameters

Configuration item | Parameter | Description
Job Running Parameter Configuration | Spark Driver Specification | Select the Spark driver specification.
Job Running Parameter Configuration | Spark Executor Specification | Select the Spark executor specification.
Job Running Parameter Configuration | Number Of Executors | Enter the number of executors.
Job Running Parameter Configuration | Spark Configuration | Enter the Spark configuration. This parameter is optional.

Step 3: Submit and monitor the job

  1. Click Create.

  2. On the Bulkload page, find the job that you created:

    • Click the job name to view the Spark UI of the job.

    • Click Details to view the execution logs of the job.

If data is evenly distributed across partitions in the destination Lindorm wide table, it takes about 1 hour to import 100 GB of data with a 4:1 compression ratio. The actual time may vary.

Configuration examples

Reader plugin configuration examples

MaxCompute (ODPS) data source

{
  "table": "test",
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ],
  "partition": [
    "pt=1"
  ],
  "numPartitions": 10
}

CSV file in an HDFS data source

{
  "filePath": "csv/",
  "header": false,
  "delimiter": ",",
  "column": [
    "id|string",
    "intcol|int",
    "doublecol|double",
    "stringcol|string",
    "string1col|string",
    "decimalcol|decimal"
  ]
}

Parquet file in an HDFS data source

{
  "filePath": "parquet/",
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}

Writer plugin configuration examples

Lindorm SQL table

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "timestamp": "2022-07-01 10:00:00",
  "columns": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}

Lindorm table compatible with HBase

In this format, ROW represents the row key, and the value after || specifies the data type. For all other columns, use the column family:column name||column type format, for example f:intcol||Int.

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "timestamp": "2022-07-01 10:00:00",
  "columns": [
    "ROW||String",
    "f:intcol||Int",
    "f:doublecol||Double",
    "f:stringcol||String",
    "f:string1col||String",
    "f:decimalcol||Decimal"
  ]
}
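When the source is a CSV file, the writer columns for an HBase-compatible table can be derived from the reader's name|type entries. The following sketch assumes, as in the examples above, that the first reader column is the row key, that all other columns go into a single column family (f by default), and that the lowercase CSV type names map to the capitalized writer type names shown here. The function name and the type table are hypothetical and cover only the types that appear in this topic.

```python
# Hypothetical mapping from CSV reader types to the writer type names used in
# the HBase-compatible example. Only the types shown in this topic are listed.
CSV_TO_HBASE_TYPE = {
    "string": "String",
    "int": "Int",
    "double": "Double",
    "decimal": "Decimal",
}


def hbase_columns_from_csv(csv_columns: list, family: str = "f") -> list:
    """Build writer 'columns' entries; the first CSV column becomes the row key."""
    result = []
    for index, spec in enumerate(csv_columns):
        name, sep, csv_type = spec.partition("|")
        if not sep or csv_type.lower() not in CSV_TO_HBASE_TYPE:
            raise ValueError(f"unsupported column spec: {spec!r}")
        hbase_type = CSV_TO_HBASE_TYPE[csv_type.lower()]
        if index == 0:
            result.append(f"ROW||{hbase_type}")  # row key
        else:
            result.append(f"{family}:{name}||{hbase_type}")
    return result
```

Because the output keeps the reader's column order, the generated list lines up position by position with the reader configuration.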

Submit a job using an API operation

All API operations use the LTS master endpoint in the following format: http://{LTSMaster}:12311. Replace {LTSMaster} with the master hostname of your Lindorm instance, which you can find in the Basic Information section on the Cluster Information page of the LTS console.

Submit a job

  • Method: POST

  • Endpoint: http://{LTSMaster}:12311/pro/proc/bulkload/create

Parameters

Parameter Description
src The name of the source data source.
dst The name of the destination data source.
readerConfig The reader plugin configuration in JSON format. For configuration examples, see Reader plugin configuration examples.
writerConfig The writer plugin configuration in JSON format. For configuration examples, see Writer plugin configuration examples.
driverSpec The specification of the Spark driver. Valid values: small, medium, large, and xlarge. We recommend that you set this parameter to large.
instances The number of executors.
fileType The source file type. If the source data source is HDFS, set this parameter to CSV or Parquet.
sparkAdditionalParams The extension parameters. This parameter is optional.

Example

curl -d "src=hdfs&dst=ld&readerConfig={\"filePath\":\"parquet/\",\"column\":[\"id\",\"intcol\",\"doublecol\",\"stringcol\",\"string1col\",\"decimalcol\"]}&writerConfig={\"columns\":[\"ROW||String\",\"f:intcol||Int\",\"f:doublecol||Double\",\"f:stringcol||String\",\"f:string1col||String\",\"f:decimalcol||Decimal\"],\"namespace\":\"default\",\"lindormTable\":\"bulkload_test\",\"compression\":\"zstd\"}&driverSpec=large&instances=5&fileType=Parquet" -H "Content-Type: application/x-www-form-urlencoded" -X POST http://{LTSMaster}:12311/pro/proc/bulkload/create
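Hand-escaping JSON inside a curl command is error-prone. The following Python sketch builds the same form-encoded POST request with the standard library: json.dumps serializes the plugin configurations and urlencode percent-encodes them. It assumes that LTS decodes standard application/x-www-form-urlencoded bodies. The function name and its default values are illustrative, and sparkAdditionalParams is passed through unchanged because its format is not described in this topic.

```python
import json
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request


def build_create_request(
    lts_master: str,
    src: str,
    dst: str,
    reader_config: dict,
    writer_config: dict,
    driver_spec: str = "large",
    instances: int = 5,
    file_type: Optional[str] = None,
    spark_additional_params: Optional[str] = None,
) -> Request:
    """Build (but do not send) the POST request for /pro/proc/bulkload/create."""
    form = {
        "src": src,
        "dst": dst,
        # Compact JSON; urlencode percent-encodes it, so no manual escaping is needed.
        "readerConfig": json.dumps(reader_config, separators=(",", ":")),
        "writerConfig": json.dumps(writer_config, separators=(",", ":")),
        "driverSpec": driver_spec,
        "instances": str(instances),
    }
    if file_type is not None:
        form["fileType"] = file_type  # "CSV" or "Parquet" for HDFS sources
    if spark_additional_params is not None:
        form["sparkAdditionalParams"] = spark_additional_params
    return Request(
        f"http://{lts_master}:12311/pro/proc/bulkload/create",
        data=urlencode(form).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```

To send the request, pass it to urllib.request.urlopen and read the JSON response body.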

Response

The message field contains the job ID.

{"success":"true","message":"proc-91-ff383c616e5242888b398e51359c****"}
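The job ID can be extracted from this response as follows. This is a minimal sketch: it relies on the sample above, where success is the string "true" rather than a JSON boolean, and it assumes that a failed request returns some other success value with an error text in message. The helper name is hypothetical.

```python
import json


def parse_create_response(body: str) -> str:
    """Return the job ID (procId) from a create-job response body."""
    response = json.loads(body)
    # In the sample response, success is the string "true", not a boolean.
    if response.get("success") != "true":
        raise RuntimeError(f"job creation failed: {response.get('message')!r}")
    return response["message"]
```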

Get job information

  • Method: GET

  • Endpoint: http://{LTSMaster}:12311/pro/proc/{procId}/info

  • Parameter: procId -- the job ID.

Example

curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/info

Response

{
    "data":{
        "checkJobs":Array,
        "procId":"proc-91-ff383c616e5242888b398e51359c****",
        "incrJobs":Array,
        "procConfig":Object,
        "stage":"WAIT_FOR_SUCCESS",
        "fullJobs":Array,
        "mergeJobs":Array,
        "srcDS":"hdfs",
        "sinkDS":"ld-uf6el41jkba96****",
        "state":"RUNNING",
        "schemaJob":Object,
        "procType":"SPARK_BULKLOAD"
    },
    "success":"true"
}

In the sample response, Array and Object are placeholders for nested values that are omitted. The response includes the following key fields:

Field Description
procId The job ID.
srcDS The source data source.
sinkDS The destination data source.
state The job status, such as RUNNING.
stage The job stage, such as WAIT_FOR_SUCCESS.
procType The job type, such as SPARK_BULKLOAD.
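To wait for a job to finish, you can poll the job information operation. The following sketch treats only RUNNING as an in-progress state, because that is the only state value shown here. This is an assumption: if your jobs pass through other intermediate states, add them to IN_PROGRESS_STATES. The info lookup is passed in as a function so that the polling logic can be exercised without a live LTS master. All names are hypothetical.

```python
import json
import time
from typing import Callable
from urllib.request import urlopen

# Hypothetical: only RUNNING is treated as "still in progress".
IN_PROGRESS_STATES = {"RUNNING"}


def info_url(lts_master: str, proc_id: str) -> str:
    return f"http://{lts_master}:12311/pro/proc/{proc_id}/info"


def fetch_info(lts_master: str, proc_id: str, timeout: float = 10.0) -> dict:
    """GET the job information and decode the JSON body."""
    with urlopen(info_url(lts_master, proc_id), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_for_job(
    get_info: Callable[[], dict],
    poll_seconds: float = 30.0,
    max_polls: int = 240,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until state leaves IN_PROGRESS_STATES; return the final 'data' object."""
    for _ in range(max_polls):
        data = get_info()["data"]
        if data["state"] not in IN_PROGRESS_STATES:
            return data
        sleep(poll_seconds)
    raise TimeoutError(f"job still in progress after {max_polls} polls")
```

A typical call is wait_for_job(lambda: fetch_info("your-lts-master", proc_id)), after which you can inspect the returned state and stage fields.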

Stop a job

  • Method: GET

  • Endpoint: http://{LTSMaster}:12311/pro/proc/{procId}/abort

  • Parameter: procId -- the job ID.

Example

curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/abort

Response

{"success":"true","message":"ok"}

Retry a job

  • Method: GET

  • Endpoint: http://{LTSMaster}:12311/pro/proc/{procId}/retry

  • Parameter: procId -- the job ID.

Example

curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/retry

Response

{"success":"true","message":"ok"}

Delete a job

  • Method: GET

  • Endpoint: http://{LTSMaster}:12311/pro/proc/{procId}/delete

  • Parameter: procId -- the job ID.

Example

curl http://{LTSMaster}:12311/pro/proc/proc-91-ff383c616e5242888b398e51359c****/delete

Response

{"success":"true","message":"ok"}
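The stop, retry, and delete operations share one URL pattern, /pro/proc/{procId}/{action}, and return the same success response, so they can be wrapped in a single helper. The following sketch is illustrative: the function names are hypothetical, and it treats any response other than the documented {"success":"true","message":"ok"} as a failure.

```python
import json
from urllib.request import urlopen

# Path suffixes of the stop, retry, and delete operations.
ACTIONS = ("abort", "retry", "delete")


def action_url(lts_master: str, proc_id: str, action: str) -> str:
    if action not in ACTIONS:
        raise ValueError(f"unknown action: {action!r}")
    return f"http://{lts_master}:12311/pro/proc/{proc_id}/{action}"


def check_ok(body: str) -> None:
    """Raise unless the body matches the documented success response."""
    response = json.loads(body)
    if response.get("success") != "true" or response.get("message") != "ok":
        raise RuntimeError(f"unexpected response: {body}")


def run_action(lts_master: str, proc_id: str, action: str, timeout: float = 10.0) -> None:
    """Stop, retry, or delete a job; urlopen without a request body sends a GET."""
    with urlopen(action_url(lts_master, proc_id, action), timeout=timeout) as resp:
        check_ok(resp.read().decode("utf-8"))
```

For example, run_action("your-lts-master", proc_id, "abort") stops a job and raises an exception if LTS does not confirm the operation.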