Submit a data import task on the LTS action page

Lindorm supports several methods for importing data in batches. This topic describes how to submit a data import task on the LTS action page.

Create a task

  1. Log on to the LTS action page. For more information, see Activate and log on to LTS.

  2. In the left-side navigation pane, choose Data Sources > Add Data Source to add the source data source (for example, ODPS or HDFS) and the target Lindorm wide table data source.

  3. In the left-side navigation pane, choose Import Lindorm/HBase > General Import.

    Note
    • For LTS versions earlier than 3.8.12.4.3, choose Import Lindorm/HBase > Bulkload.

    • You can find the LTS version under Configurations on the Instance Details page of the Lindorm Management Console.

  4. Click Create a task and configure the following parameters.

    The parameters are grouped by section.

    Select data source

    • Source Data Source: Select the added ODPS (MaxCompute) or HDFS data source.

    • Target Data Source: Select the added Lindorm wide table data source.

    • File Type (optional): If the data source is HDFS or OSS, select the file type to read.

    • Balanced Partition: The sampling percentage. For more information, see Balanced partitioning.

      1. Click the Balanced partitioning option to enable this feature.

      2. In the input box, enter an integer from 1 to 100 to specify the sampling percentage. The recommended value is 1% to 5%.

    Plugin Configuration

    • Reader Configuration: For configuration examples, see Reader plugin configuration examples.

    • Writer Configuration: For configuration examples, see Writer plugin configuration examples.

    Job Runtime Configuration

    • Executor Count: Enter the number of executors. The maximum concurrency of a task is 4 × the number of executors. Configure the number of executors based on your resource and workload requirements.

    • Spark Configuration: Optional. The Spark configuration. For parameter details, see Compute engine job configurations.

    • Spark UDF: Use a custom UDF. Only UDFs developed in Java are supported. Upload the JAR file using the compute engine console. For more information, see Upload files using the console.

  5. Click Create.

  6. On the Bulkload page, find the job in the job list to view its details.

    • Click the Task Name link to open the Spark job UI.

    • Click Details to view the Spark job's execution log.

    The job list includes columns such as Table Name, Source Cluster, Target Cluster, Status, Task Progress, and Start Time. You can filter jobs by status: All, Queued, Running, Completed, and Failed.

    Note

    When you migrate data from a source to a Lindorm wide table with evenly distributed partitions, it takes approximately one hour to import 100 GB of data at a 1:4 compression ratio. The actual time may vary.
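The Executor Count rule in step 4 (maximum concurrency = 4 × number of executors) can be checked with a few lines of arithmetic. The following Python sketch is for illustration only. The function names are hypothetical and are not part of LTS.

```python
import math

# From the Executor Count description: max concurrency = 4 x number of executors.
SLOTS_PER_EXECUTOR = 4


def max_concurrency(executors: int) -> int:
    """Return the maximum task concurrency for a given executor count."""
    if executors < 1:
        raise ValueError("executors must be a positive integer")
    return SLOTS_PER_EXECUTOR * executors


def executors_for(target_concurrency: int) -> int:
    """Return the smallest executor count that reaches the target concurrency."""
    if target_concurrency < 1:
        raise ValueError("target_concurrency must be a positive integer")
    return math.ceil(target_concurrency / SLOTS_PER_EXECUTOR)


print(max_concurrency(8))  # 32
print(executors_for(30))   # 8
```

For example, a task that should read with a concurrency of about 30 needs at least 8 executors under this rule.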

View task running status

View task progress

  1. Click the link in the Task Name column to open the details page.

  2. After the task starts, click the specific task link.

    Note

    The task may take a few moments to start, and the task link might be unavailable until the task is initialized.

  3. View the overall progress of the task.

View task logs

You can view task logs in the following ways to troubleshoot issues:

  • If a task fails, you can view the FAILED logs directly in the task details.

  • You can download the full logs from the task details.

  • You can view the running logs on the task progress page.

Balanced partitioning

Scenarios

This feature is designed for the batch data import (Bulkload) mode and applies to the following scenarios:

  1. Initial data import for a new table
    When you first import data into a new wide table, the default number of partitions might not be suitable for the distribution of a large dataset. Direct data import can cause data to accumulate in a few partitions, creating hot spots and reducing throughput.



  2. Processing unevenly distributed source data
    When source data is not evenly distributed, a standard partitioning policy can cause data to accumulate in specific partitions, creating hot spots that slow down the import process and reduce batch import efficiency.



How it works

This feature uses a dynamic partition optimization algorithm to ensure even data distribution. The process includes three stages:

  1. Sampling and analysis
    The system samples the raw dataset at the configured sampling percentage (1% to 5% is recommended) and analyzes its distribution.



  2. Partition adjustment
    Based on this analysis, the system dynamically adjusts the partitioning policy to ensure even data distribution.



  3. Full data import
    After the partitions are dynamically adjusted, the system performs a distributed parallel import of the full dataset.
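The three stages can be pictured with a small Python sketch. This is not the algorithm that LTS uses, which this topic does not describe. It only illustrates the general idea of choosing partition boundaries from a sample, assuming string row keys and quantile-based split points. All function names are hypothetical.

```python
import bisect
import random


def sample_keys(keys, percent, seed=0):
    """Stage 1: draw roughly `percent`% of the row keys (at least one)."""
    if not 1 <= percent <= 100:
        raise ValueError("percent must be an integer from 1 to 100")
    size = max(1, len(keys) * percent // 100)
    return random.Random(seed).sample(keys, size)


def split_points(sample, partitions):
    """Stage 2: pick partitions - 1 boundaries at evenly spaced quantiles."""
    ordered = sorted(sample)
    return [ordered[len(ordered) * i // partitions] for i in range(1, partitions)]


def assign(key, boundaries):
    """Stage 3: route a key to the partition whose key range contains it."""
    return bisect.bisect_right(boundaries, key)


keys = [f"user{n:06d}" for n in range(10_000)]
boundaries = split_points(sample_keys(keys, percent=5), partitions=4)

counts = [0] * 4
for key in keys:
    counts[assign(key, boundaries)] += 1
print(boundaries)
print(counts)  # the four counts should be roughly equal
```

Because the boundaries come from the sampled distribution rather than from fixed defaults, each partition receives a similar share of the full dataset.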



Reader plugin configuration examples

MaxCompute task configuration example

The following example shows the reader plugin configuration for a MaxCompute data source.

{
  "table": "test",
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ],
  "partition": [
    "ds=20250820,hh=12"
  ],
  "numPartitions": 10
}

Parameter

Description

Required

Default value

table

The name of the MaxCompute table.

Yes

None

column

The names of the MaxCompute columns to import.

Yes

None

partition

  • Do not specify this parameter for non-partitioned tables.

  • For a partitioned table, specify the partition information for exactly one partition, for example "ds=20250820,hh=12".

No

None

numPartitions

The maximum concurrency for reading the source table. If you do not specify this parameter, the system calculates it automatically.

Reading data consumes Tunnel Slot resources of the MaxCompute project. Evaluate your resources carefully, because insufficient Slot resources cause the task to fail. For more information, see Data Transmission Service Overview.

No

None
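If you generate reader configurations from a script, the rules in the preceding table can be encoded in a small helper. The following Python sketch is a hypothetical example and is not part of LTS. It only assembles the four documented keys and leaves out the optional ones when they are not set.

```python
import json


def maxcompute_reader(table, columns, partition=None, num_partitions=None):
    """Build a reader configuration dict for a MaxCompute source."""
    if not table or not columns:
        raise ValueError("table and column are required")
    config = {"table": table, "column": list(columns)}
    if partition:  # omit the key for non-partitioned tables
        # Exactly one partition: join its key=value pairs into one spec string.
        spec = ",".join(f"{key}={value}" for key, value in partition.items())
        config["partition"] = [spec]
    if num_partitions is not None:  # otherwise the system calculates it
        config["numPartitions"] = int(num_partitions)
    return config


config = maxcompute_reader(
    "test", ["id", "intcol"], {"ds": "20250820", "hh": "12"}, num_partitions=10
)
print(json.dumps(config, indent=2))
```

The printed JSON can be pasted into the Reader Configuration field.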

Parquet file task configuration example

The following example shows the reader plugin configuration for Parquet files from an HDFS data source.

{
  "filePath":"parquet/",
  "column": [   // Column names in the Parquet file
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}

Parameter

Description

Required

Default value

filePath

The path where the data is located. The following four formats are supported:

  1. A single file path. For example: /tables/search_info/ds=20250820/part-*.parquet.

  2. A single-level data directory that contains only data files and no subdirectories. For example: /tables/search_info/ds=20250820.

  3. A multi-level data directory. For example: /tables/search_info/.

    • The system reads:

      • Data files in the path.

      • All subdirectories in the "partition style" (col=value, multi-level supported). These directory names are parsed as partition key columns.

    • The system does not read:

      • Subdirectories of any depth that are not in the partition style.

    • The system ignores:

      • Files or directories whose names start with a period (.) or an underscore (_), such as _SUCCESS, _temporary, and .DS_Store.

  4. A combination of multiple directories or files. Use a comma (,) to separate the paths. For example: /tables/search_info/ds=20250820,/tables/search_info/ds=20250821.

Yes

None

column

The names of the columns to import.

Yes

None

basePath

Specifies the root directory of a partitioned dataset. This is typically used to obtain partition key columns.

  • Example: /table/date=2025-09-01/hour=10/1.parquet, /table/date=2025-09-02/hour=11/2.parquet

  • If you set "basePath":"/table", the partition key columns date and hour are identified. Otherwise, these partition key columns are usually not obtained.

No

None

int96RebaseMode

The parsing mode for int96 timestamps. To parse the old int96 type, set this parameter to "LEGACY".

No

None

pathGlobFilter

A path filter. It supports glob pattern filtering for file paths. For example, *.parquet reads only Parquet files.

No

None
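The directory rules for filePath and basePath can be summarized in code. The following Python sketch is an illustration of the documented behavior, not the LTS implementation. The function name is hypothetical, and the sketch treats "not read" and "ignored" paths the same way by returning None.

```python
import posixpath


def partition_columns(path, base_path):
    """Return the partition key columns parsed from col=value directories,
    or None if the file would be skipped under the documented rules."""
    relative = posixpath.relpath(path, base_path)
    *directories, filename = relative.split("/")
    # Names that start with "." or "_" are ignored (_SUCCESS, .DS_Store, ...).
    if any(part.startswith((".", "_")) for part in directories + [filename]):
        return None
    columns = {}
    for directory in directories:
        key, sep, value = directory.partition("=")
        if not sep or not key:
            return None  # not partition style, so the subdirectory is not read
        columns[key] = value
    return columns


print(partition_columns("/table/date=2025-09-01/hour=10/1.parquet", "/table"))
# {'date': '2025-09-01', 'hour': '10'}
print(partition_columns("/table/archive/1.parquet", "/table"))    # None
print(partition_columns("/table/_temporary/1.parquet", "/table"))  # None
```

A data file that sits directly in the base path yields an empty dictionary, because it is read but carries no partition key columns.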

CSV file task configuration example

The following example shows the reader plugin configuration for CSV files from an HDFS data source.

Important

CSV is a plain text file format. When you configure the task, you must explicitly declare the data type of each field in the "schema".

{
  "filePath":"csv/",
  "header": false,
  "delimiter": ",",
  "schema": [
    "id|string",
    "intcol|int",
    "doublecol|double",
    "stringcol|string",
    "string1col|string",
    "decimalcol|decimal"
  ],
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}

Parameter

Description

Required

Default value

filePath

The path where the data is located. The following four formats are supported:

  1. A single file path, such as: "/tables/search_info/ds=20250820/part-*.csv".

  2. A single-level data folder, such as: "/tables/search_info/ds=20250820". This path contains only data files and no subdirectories.

  3. A multi-level data folder, such as: "/tables/search_info/".

    • The system reads:

      • Data files in the path.

      • All subdirectories in the "partition style" (col=value, multi-level supported). These directory names are parsed as partition key columns.

    • The system does not read:

      • Subdirectories of any depth that are not in the partition style.

    • The system ignores:

      • Files or directories whose names start with a period (.) or an underscore (_), such as _SUCCESS, _temporary, and .DS_Store.

  4. A combination of multiple directories or files. Use a comma (,) to separate the paths. For example: "/tables/search_info/ds=20250820,/tables/search_info/ds=20250821".

Yes

None

schema

Declares all column names and their data types for the CSV file. The format is column_name|type, for example: "id|string". When you declare the schema, note the following:

  1. You must include all columns from the file.

  2. The order must match the field order in the file.

Yes

None

basePath

Specifies the root directory of a partitioned dataset. This is typically used to obtain partition key columns.

  • Example: /table/date=2025-09-01/hour=10/1.csv, /table/date=2025-09-02/hour=11/2.csv.

  • If you set "basePath":"/table", the partition key columns date and hour are identified. Otherwise, these partition key columns are usually not obtained.

No

None

column

The names of the columns to import. These must exist in the schema declaration.

Yes

None

nullValue

Specifies a string to be interpreted as a null value. For example, to parse fields containing "NULL" or "\N" as nulls, set this parameter to "NULL" or "\N", respectively.

No

None

mode

The mode for handling malformed data.

  • PERMISSIVE: Tries to parse the data. Fills in null for missing columns. Tolerates format issues where possible without throwing an exception.

  • DROPMALFORMED: Drops rows that fail to parse or have a mismatched number of columns. No error is reported.

  • FAILFAST: Throws an exception and stops reading immediately when a bad row is encountered.

No

PERMISSIVE

timestampFormat

The timestamp format. This is required if a field is parsed as a timestamp type. For example: "yyyy-MM-dd HH:mm:ss".

No

None

dateFormat

The date format. This is required if a field is parsed as a date type. For example: "yyyy-MM-dd".

No

None

pathGlobFilter

A path filter. It supports glob pattern filtering for file paths. For example, *.csv reads only CSV files.

No

None
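Because every imported column must be declared in the schema, it can help to check a CSV reader configuration before you submit the task. The following Python sketch is a hypothetical pre-check and is not part of LTS. It assumes that the column list contains plain column names, not function expressions.

```python
def parse_schema(schema):
    """Split each "column_name|type" entry into an ordered (name, type) pair."""
    fields = []
    for entry in schema:
        name, sep, dtype = entry.partition("|")
        if not sep or not name or not dtype:
            raise ValueError(f"expected column_name|type, got {entry!r}")
        fields.append((name, dtype))
    return fields


def check_csv_reader(config):
    """Raise ValueError if `column` names a field that `schema` does not declare."""
    declared = {name for name, _ in parse_schema(config["schema"])}
    missing = [name for name in config["column"] if name not in declared]
    if missing:
        raise ValueError(f"columns not declared in schema: {missing}")


reader = {
    "filePath": "csv/",
    "header": False,
    "delimiter": ",",
    "schema": ["id|string", "intcol|int", "doublecol|double"],
    "column": ["id", "intcol"],
}
check_csv_reader(reader)  # passes: both columns are declared
print(parse_schema(reader["schema"]))
```

The check does not verify that the schema lists every column in file order. Only the file itself can confirm that.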

ORC file task configuration example

The following example shows the reader plugin configuration for ORC files from an HDFS data source.

{
  "filePath":"orc/",
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}

Parameter

Description

Required

Default value

filePath

The path where the data is located. The following four formats are supported:

  1. A single file path. For example: /tables/search_info/ds=20250820/part-*.orc.

  2. A single-level data directory that contains only data files and no subdirectories. For example: /tables/search_info/ds=20250820.

  3. A multi-level data directory. For example: /tables/search_info/.

    • The system reads:

      • Data files in the path.

      • All subdirectories in the "partition style" (col=value, multi-level supported). These directory names are parsed as partition key columns.

    • The system does not read:

      • Subdirectories of any depth that are not in the partition style.

    • The system ignores:

      • Files or directories whose names start with a period (.) or an underscore (_), such as _SUCCESS, _temporary, and .DS_Store.

  4. A combination of multiple directories or files. Use a comma (,) to separate the paths. For example: /tables/search_info/ds=20250820,/tables/search_info/ds=20250821.

Yes

None

column

The names of the columns to import.

Yes

None

basePath

Specifies the root directory of a partitioned dataset. This is typically used to obtain partition key columns.

  • Example: /table/date=2025-09-01/hour=10/1.orc, /table/date=2025-09-02/hour=11/2.orc.

  • If you set "basePath":"/table", the partition key columns date and hour are identified. Otherwise, these partition key columns are usually not obtained.

No

None

pathGlobFilter

A path filter. It supports glob pattern filtering for file paths. For example, *.orc reads only ORC files.

No

None

Data transformation configuration example

Notes
  • Functions must be configured in the reader plugin. For information about how to use common built-in functions, see Function Description.

  • Data transformation is supported for MaxCompute, Parquet, ORC, and CSV.

  • Standard Spark SQL built-in functions are supported. For more information, see Spark SQL Built-in Functions.

  • You can use user-defined functions (UDFs). For more information, see Job runtime parameter configuration.

{
  "column": [
    "CAST(intcol as string)",           // Converts an integer to a string. An error is reported if the conversion fails.
    "TRY_CAST('abc' as INT)",           // Tries to convert the data type. Returns NULL on failure.
    "COALESCE(nullable_stringcol,'')",  // If the field is NULL, returns an empty string.
    "DATE_FORMAT('2023-08-15', 'MM/dd/yyyy')", // Formats the date for output.
    "UNIX_MILLIS(CURRENT_TIMESTAMP())",   // Gets the current timestamp in milliseconds.
    "CONCAT(stringcol, 'hello', ' world')",   // Concatenates strings.
    "SUBSTRING('Spark SQL', 7)",        // Extracts a substring. Returns: 'SQL'.
    "MD5(stringcol2)"                   // Calculates the MD5 value.
  ]
}

Writer plugin configuration examples

Import data into a Lindorm SQL table

The following example shows the writer plugin configuration for importing data into a Lindorm SQL table.

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression":"zstd",
  "sortMode": "row",
  "replication":2,
  "columns": [
       "id",
       "intcol",
       "doublecol",
       "stringcol",
       "string1col",
       "decimalcol"
  ]
}

Import dynamic columns into a Lindorm SQL table

The following example shows the writer plugin configuration for importing dynamic columns into a Lindorm SQL table.

Important

The schema of dynamic columns is not stored persistently. When you import data, you must specify the column family and the corresponding data type.

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression":"zstd",
  "sortMode": "row",
  "replication":2,
  "columns": [
       "id",
       "intcol",
       "doublecol",
       "stringcol",
       "f:dynamic_intcol||INT",
       "f:dynamic_stringcol||STRING"
  ]
}

Import data into a Lindorm SQL table with multiple column families

The following example shows the writer plugin configuration for importing data into a Lindorm SQL table with multiple column families.

A table can have multiple column families. The default column family is named 'f', but you can add custom column families, such as 'cf1' and 'cf2'.

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression":"zstd",
  "sortMode": "row",
  "replication":2,
  "columns": [
       "id",
       "intcol",
       "doublecol",
       "stringcol",
       "cf1:cf1_int",
       "cf2:cf2_string"
  ]
}

Import data into an HBase-compatible Lindorm table

The following example shows the writer plugin configuration for importing data into an HBase-compatible Lindorm table.

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression":"zstd",
  "sortMode": "row",
  "replication":2,
  "columns": [
    "ROW||String",    // ROW is fixed to represent the rowkey. String indicates the data type.
    "f:intcol||Int",  // Format: column_family:column_name||column_type
    "f:doublecol||Double",
    "f:stringcol||String",
    "f:string1col||String",
    "f:decimalcol||Decimal"
  ]
}
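The writer examples above use several column notations: a plain name ("id"), a column family prefix ("cf1:cf1_int"), and a type suffix ("f:intcol||Int", "ROW||String"). The following Python sketch shows one way to split these strings into their parts. It is an illustration only. The ColumnSpec type and the parse_column function are hypothetical and are not part of LTS.

```python
from typing import NamedTuple, Optional


class ColumnSpec(NamedTuple):
    family: Optional[str]  # column family, or None if not given
    name: str              # column name (or ROW for the rowkey)
    dtype: Optional[str]   # data type, or None if not given


def parse_column(entry: str) -> ColumnSpec:
    """Parse "name", "family:name", "family:name||type", or "ROW||type"."""
    target, _, dtype = entry.partition("||")
    family, sep, name = target.partition(":")
    if not sep:  # no column family prefix, for example "id" or "ROW"
        family, name = None, target
    return ColumnSpec(family, name, dtype or None)


for entry in ["id", "cf1:cf1_int", "f:intcol||Int", "ROW||String"]:
    print(parse_column(entry))
```

For "f:intcol||Int", the result has family "f", name "intcol", and dtype "Int".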

Import data into Lindorm in API mode

The Bulkload mode does not support importing data into index tables. To synchronously build an index table when you import data into a primary table, use the API mode.

The following example shows the writer plugin configuration for importing data into Lindorm in API mode.

Important

When you import data in API mode, the resources of the Lindorm instance are directly consumed. Limit the import rate to prevent performance degradation of online production services.

{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression":"zstd",
  "sortMode": "row",
  "replication":2,
  "columns": [
       "id",
       "intcol",
       "doublecol",
       "stringcol",
       "string1col",
       "decimalcol"
  ],
  "writeMode":"api"  // Specifies the API import mode.
}

Writer configuration parameter details

Parameter

Description

Required

Default value

namespace

The namespace of the Lindorm wide table.

Yes

None

lindormTable

The name of the Lindorm wide table.

Yes

None

columns

The column mapping configuration for the target table. Configure this parameter based on the target table type.

  • For a Lindorm SQL wide table, set the columns parameter to the column names of the table. The names must correspond to the columns in the reader configuration.

  • For an HBase-compatible table, set the columns parameter to the dynamic columns of the HBase table. The format is column_family:column_name||type.

Yes

None

compression

The compression algorithm. We recommend that you use zstd. Other options include snappy, gz, and lzo. To disable compression, set this parameter to none (default).

No

none (no compression)

timestamp

Specifies a timestamp for the import. All fields share the same timestamp. If not configured, the current system time is used by default. The following types are supported:

  • Long type: A 13-digit timestamp, such as 1719472800000.

  • String type: yyyy-MM-dd HH:mm:ss (for example, 2024-09-01 12:34:56) or yyyy-MM-dd HH:mm:ss SSS (for example, 2024-09-01 12:34:56.789).

No

System time

timeCol

Specifies a column in the source data as the timestamp. Each row can have a different timestamp. The default value is -1, which means no time column.

  • The timeCol parameter represents the index of the time column in the source data, starting from 0.

  • The values in the time column must be of the Long type and be 13-digit timestamps.

  • By default, the time column is not imported as a data column into Lindorm.

No

-1 (no time column)

sortMode

The data sorting method for the bulkload import mode. The supported methods are row (row sorting) and kv (key-value sorting). The default value is kv.

  • We recommend that you use the row sort mode for tables with fewer than 1000 columns.

No

kv (key-value sorting)

replication

The number of replicas for the written files. The default value is 3. We recommend that you set this to 2.

No

3

writeMode

The data import method. Valid values are bulkload and api. The default value is bulkload.

  • The bulkload mode cannot import data into index tables. To synchronously build an index table when you import data into a primary table, use the API mode.

No

bulkload
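The defaults and valid values in the preceding table can be applied to a partial writer configuration as follows. This Python sketch is a hypothetical pre-check and is not part of LTS. It fills in the documented defaults and validates only sortMode and writeMode, because the table lists a closed set of values only for those two parameters.

```python
# Defaults taken from the "Default value" column of the table above.
# timestamp is left out because its default is the system time at import.
WRITER_DEFAULTS = {
    "compression": "none",
    "timeCol": -1,
    "sortMode": "kv",
    "replication": 3,
    "writeMode": "bulkload",
}
REQUIRED = ("namespace", "lindormTable", "columns")
ALLOWED = {
    "sortMode": {"row", "kv"},
    "writeMode": {"bulkload", "api"},
}


def resolve_writer_config(config):
    """Return the config with defaults filled in, or raise ValueError."""
    missing = [key for key in REQUIRED if not config.get(key)]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    resolved = {**WRITER_DEFAULTS, **config}
    for key, allowed in ALLOWED.items():
        if resolved[key] not in allowed:
            raise ValueError(f"{key} must be one of {sorted(allowed)}")
    return resolved


writer = resolve_writer_config(
    {"namespace": "default", "lindormTable": "xxx", "columns": ["id"], "sortMode": "row"}
)
print(writer["sortMode"], writer["writeMode"], writer["replication"])  # row bulkload 3
```

Values that you set explicitly, such as sortMode in this example, take precedence over the defaults.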