Lindorm supports several methods for importing data in batches. This topic describes how to submit a data import job on the LTS action page.
Create a task
- Log on to the LTS action page. For more information, see Activate and log on to LTS.
- In the left-side navigation pane, choose Data Sources > Add Data Source to add the following data sources.
  - Add an ODPS data source. For more information, see ODPS Data Source.
  - Add a Lindorm wide table data source. For more information, see Lindorm Wide Table Data Source.
  - Add an HDFS data source. For more information, see Add an HDFS data source.
- In the left-side navigation pane, select .
  Note:
  - For LTS versions earlier than 3.8.12.4.3, choose .
  - You can find the LTS version under Configurations on the Instance Details page of the Lindorm Management Console.
- Click Create a task and configure the following parameters.
Parameter | Setting | Description |
Select data source | Source Data Source | Select the added ODPS or HDFS data source. |
Select data source | Target Data Source | Select the added Lindorm wide table data source. |
Select data source | File Type (optional) | If the data source is HDFS or OSS, select the file type to read. |
Select data source | Balanced Partition | The sampling percentage. To enable this feature, click the Balanced partitioning option, and then enter an integer from 1 to 100 in the input box to specify the sampling percentage. The recommended value is 1% to 5%. For more information, see Balanced partitioning. |
Plugin Configuration | Reader Configuration | For a configuration example, see Reader plugin configuration examples. |
Plugin Configuration | Writer Configuration | For a configuration example, see Writer plugin configuration examples. |
Job Runtime Configuration | Executor Count | Enter the number of executors. The maximum concurrency of a task is 4 × Number of executors. Configure the number of executors based on your resource and workload requirements. |
Job Runtime Configuration | Spark Configuration | Optional. The Spark configuration. For parameter details, see Compute engine job configurations. |
Job Runtime Configuration | Spark UDF | Use a custom UDF. Only UDFs developed in Java are supported. Upload the JAR file using the compute engine console. For more information, see Upload files using the console. |
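The Executor Count rule (maximum concurrency = 4 × number of executors) can be turned into a quick sizing calculation. The following is a minimal sketch; the function names are hypothetical and are not part of LTS.

```python
import math

# From the text: maximum concurrency of a task = 4 x number of executors.
TASKS_PER_EXECUTOR = 4


def max_concurrency(executor_count: int) -> int:
    """Return the maximum task concurrency for a given executor count."""
    if executor_count < 1:
        raise ValueError("executor_count must be at least 1")
    return TASKS_PER_EXECUTOR * executor_count


def executors_for(target_concurrency: int) -> int:
    """Return the smallest executor count whose maximum concurrency reaches the target."""
    if target_concurrency < 1:
        raise ValueError("target_concurrency must be at least 1")
    return math.ceil(target_concurrency / TASKS_PER_EXECUTOR)


print(max_concurrency(5))  # 20
print(executors_for(10))   # 3
```

The result is only an upper bound on concurrency. The resources that are actually available still determine how many executors you can configure.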
- Click Create.
- On the Bulkload page, click a job name in the Task Name column to view the job details.
  - Click the Task Name link to open the Spark job UI.
  - Click Details to view the Spark job's execution log.

The job list includes columns such as Table Name, Source Cluster, Target Cluster, Status, Task Progress, and Start Time. You can filter jobs by status: All, Queued, Running, Completed, and Failed.
Note: When you migrate data from a source to a Lindorm wide table with evenly distributed partitions, it takes approximately one hour to import 100 GB of data at a 1:4 compression ratio. The actual time may vary.
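For rough planning, the reference figure in the note (about one hour per 100 GB) can be scaled to other data sizes. The sketch below assumes that import time grows linearly with data volume, which the note does not state. The function name is hypothetical.

```python
# From the note: about 100 GB per hour at a 1:4 compression ratio,
# with evenly distributed partitions.
REFERENCE_GB_PER_HOUR = 100.0


def estimate_import_hours(data_gb: float) -> float:
    """Rough estimate that scales the reference rate linearly (an assumption)."""
    if data_gb < 0:
        raise ValueError("data_gb must not be negative")
    return data_gb / REFERENCE_GB_PER_HOUR


print(estimate_import_hours(250))  # 2.5
```

Treat the result as an order-of-magnitude figure only, because the actual time depends on the data distribution, compression ratio, and available resources.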
View task running status
View task progress
- Click the link in the Task Name column to open the details page.
- After the task starts, click the specific task link.
  Note: The task may take a few moments to start, and the task link might be unavailable during this period. Wait for the task to initialize.
- View the overall progress of the task.
View task logs
To troubleshoot issues, you can view task logs in the following ways:
- If a task fails, view the FAILED logs directly in the task details.
- Download the full logs from the task details.
- View the running logs in real time on the task progress page.
Balanced partitioning
Scenarios
This feature is designed for the batch data import (Bulkload) mode and applies to the following scenarios:
- Initial data import for a new table
  When you first import data into a new wide table, the default number of partitions might not suit the distribution of a large dataset. Importing the data directly can cause it to accumulate in a few partitions, which creates hot spots and reduces throughput.
- Processing unevenly distributed source data
  When source data is not evenly distributed, a standard partitioning policy can cause data to accumulate in specific partitions, which creates hot spots and slows down the batch import.
How it works
This feature uses a dynamic partition optimization algorithm to ensure even data distribution. The process includes three stages:
- Sampling and analysis
  The system samples the configured percentage of the raw dataset (1% to 5% is recommended) and analyzes its distribution.
- Partition adjustment
  Based on this analysis, the system dynamically adjusts the partitioning policy to ensure even data distribution.
- Full data import
  After the partitions are adjusted, the system performs a distributed parallel import of the full dataset.
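The three stages can be illustrated with a small simulation. This is a minimal sketch of a generic sample-then-split approach based on quantiles. It is not the algorithm that LTS implements, and all function names and the synthetic data are assumptions made for illustration.

```python
import random
from bisect import bisect_right


def sample_keys(keys, percent, seed=0):
    """Stage 1: draw a random sample of roughly `percent`% of the keys."""
    rng = random.Random(seed)
    size = max(1, len(keys) * percent // 100)
    return sorted(rng.sample(keys, size))


def split_points(sorted_sample, num_partitions):
    """Stage 2: pick partition boundaries at evenly spaced quantiles of the sample."""
    step = len(sorted_sample) / num_partitions
    return [sorted_sample[int(i * step)] for i in range(1, num_partitions)]


def partition_counts(keys, boundaries):
    """Stage 3: route every key to its partition and count rows per partition."""
    counts = [0] * (len(boundaries) + 1)
    for key in keys:
        counts[bisect_right(boundaries, key)] += 1
    return counts


# Skewed synthetic data: most keys cluster near zero.
rng = random.Random(42)
keys = [int(rng.expovariate(1 / 1000)) for _ in range(100_000)]

boundaries = split_points(sample_keys(keys, percent=5), num_partitions=8)
counts = partition_counts(keys, boundaries)
print(boundaries)
print(counts)
```

Because the boundaries follow the sampled distribution rather than fixed key ranges, each partition receives a similar number of rows even though the keys are heavily skewed.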
Reader plugin configuration examples
MaxCompute task configuration example
The following example shows the reader plugin configuration for a MaxCompute data source.
{
  "table": "test",
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ],
  "partition": [
    "ds=20250820,hh=12"
  ],
  "numPartitions": 10
}
Parameter | Description | Required | Default value |
table | The name of the MaxCompute table. | Yes | None |
column | The names of the ODPS columns to import. | Yes | None |
partition | | No | None |
numPartitions | The maximum concurrency for reading the source table. If you do not specify this parameter, the system calculates it automatically. Reading data consumes Tunnel Slot resources of the MaxCompute project. Evaluate your resources carefully. Insufficient Slot resources will cause the task to fail. For more information, see Data Transmission Service Overview. | No | None |
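If you generate reader configurations in a script, a small helper can enforce the required fields before you paste the JSON into the console. The following is a minimal sketch. The helper name is hypothetical, and the check that numPartitions is a positive integer is an assumption rather than a documented rule.

```python
import json


def build_maxcompute_reader(table, columns, partitions=None, num_partitions=None):
    """Assemble a MaxCompute reader configuration from the documented fields."""
    if not table:
        raise ValueError("table is required")
    if not columns:
        raise ValueError("column is required")
    config = {"table": table, "column": list(columns)}
    if partitions:
        config["partition"] = list(partitions)
    if num_partitions is not None:
        # Assumption: a concurrency value below 1 is not meaningful.
        if num_partitions < 1:
            raise ValueError("numPartitions must be a positive integer")
        config["numPartitions"] = num_partitions
    return config


config = build_maxcompute_reader(
    table="test",
    columns=["id", "intcol"],
    partitions=["ds=20250820,hh=12"],
    num_partitions=10,
)
print(json.dumps(config, indent=2))
```

Optional fields are omitted from the output when they are not supplied, so the system can calculate numPartitions automatically as described above.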
Parquet file task configuration example
The following example shows the reader plugin configuration for Parquet files from an HDFS data source.
{
  "filePath": "parquet/",
  "column": [ // Column names in the Parquet file
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}
Parameter | Description | Required | Default value |
filePath | The path where the data is located. The following four formats are supported: | Yes | None |
column | The names of the columns to import. | Yes | None |
basePath | Specifies the root directory of a partitioned dataset. This is typically used to obtain partition key columns. | No | None |
int96RebaseMode | The parsing mode for int96 timestamps. To parse the old int96 type, set this parameter to "LEGACY". | No | None |
pathGlobFilter | A path filter. It supports glob pattern filtering for file paths. For example, *.parquet reads only Parquet files. | No | None |
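To preview which files a pathGlobFilter pattern such as *.parquet would select, you can test the pattern locally. The sketch below uses Python's fnmatch module as an approximation. The file names are hypothetical, and the glob semantics of the reader may differ in edge cases.

```python
from fnmatch import fnmatchcase

# Hypothetical file names under the "parquet/" directory.
files = [
    "part-0000.parquet",
    "part-0001.parquet",
    "_SUCCESS",
    "part-0000.parquet.crc",
]

pattern = "*.parquet"
# fnmatchcase compares case-sensitively on every platform.
selected = [name for name in files if fnmatchcase(name, pattern)]
print(selected)  # ['part-0000.parquet', 'part-0001.parquet']
```

Marker files and checksum files in the same directory are skipped because their names do not end with the .parquet suffix.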
CSV file task configuration example
The following example shows the reader plugin configuration for CSV files from an HDFS data source.
CSV is a plain text file format. When you configure the task, you must explicitly declare the data type of each field in the "schema".
{
  "filePath": "csv/",
  "header": false,
  "delimiter": ",",
  "schema": [
    "id|string",
    "intcol|int",
    "doublecol|double",
    "stringcol|string",
    "string1col|string",
    "decimalcol|decimal"
  ],
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}
Parameter | Description | Required | Default value |
filePath | The path where the data is located. The following four formats are supported: | Yes | None |
schema | Declares all column names and their data types for the CSV file. The format is | Yes | None |
basePath | Specifies the root directory of a partitioned dataset. This is typically used to obtain partition key columns. | No | None |
column | The names of the columns to import. These must exist in the schema declaration. | Yes | None |
nullValue | Specifies a string to be interpreted as a null value. For example, to parse fields containing "NULL" or "\N" as nulls, set this parameter to | No | None |
mode | The mode for handling malformed data. | No | PERMISSIVE |
timestampFormat | The timestamp format. This is required if a field is parsed as a timestamp type. For example: "yyyy-MM-dd HH:mm:ss". | No | None |
dateFormat | The date format. This is required if a field is parsed as a date type. For example: "yyyy-MM-dd". | No | None |
pathGlobFilter | A path filter. It supports glob pattern filtering for file paths. For example, *.csv reads only CSV files. | No | None |
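Because every entry in column must also be declared in schema, it can help to check a CSV reader configuration before you submit the task. The following is a minimal sketch. It assumes that schema entries use the "name|type" form shown in the example above, and the function names are hypothetical.

```python
def parse_schema(entries):
    """Split "name|type" entries, as used in the example schema, into a dict."""
    schema = {}
    for entry in entries:
        name, sep, data_type = entry.partition("|")
        if not sep or not name or not data_type:
            raise ValueError(f"malformed schema entry: {entry!r}")
        schema[name] = data_type
    return schema


def missing_columns(config):
    """Return the columns that are not declared in the schema."""
    declared = parse_schema(config["schema"])
    return [name for name in config["column"] if name not in declared]


config = {
    "filePath": "csv/",
    "header": False,
    "delimiter": ",",
    "schema": ["id|string", "intcol|int", "doublecol|double"],
    "column": ["id", "intcol", "pricecol"],
}
print(missing_columns(config))  # ['pricecol']
```

An empty result means that every imported column has a declared data type.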
ORC file task configuration example
The following example shows the reader plugin configuration for ORC files from an HDFS data source.
{
  "filePath": "orc/",
  "column": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}
Parameter | Description | Required | Default value |
filePath | The path where the data is located. The following four formats are supported: | Yes | None |
column | The names of the columns to import. | Yes | None |
basePath | Specifies the root directory of a partitioned dataset. This is typically used to obtain partition key columns. | No | None |
pathGlobFilter | A path filter. It supports glob pattern filtering for file paths. For example, *.orc reads only ORC files. | No | None |
Data transformation configuration example
Notes
Functions must be configured in the reader plugin. For information about how to use common built-in functions, see Function Description.
Data transformation is supported for MaxCompute, Parquet, ORC, and CSV.
Standard Spark SQL built-in functions are supported. For more information, see Spark SQL Built-in Functions.
You can use user-defined functions (UDFs). For more information, see Job runtime parameter configuration.
{
  "column": [
    "CAST(intcol as string)", // Converts an integer to a string. An error is reported if the conversion fails.
    "TRY_CAST('abc' as INT)", // Tries to convert the data type. Returns NULL on failure.
    "COALESCE(nullable_stringcol,'')", // If the field is NULL, returns an empty string.
    "DATE_FORMAT('2023-08-15', 'MM/dd/yyyy')", // Formats the date for output.
    "UNIX_MILLIS(CURRENT_TIMESTAMP())", // Gets the current timestamp in milliseconds.
    "CONCAT(stringcol, 'hello', ' world')", // Concatenates strings.
    "SUBSTRING('Spark SQL', 7)", // Extracts a substring. Returns: 'SQL'.
    "MD5(stringcol2)" // Calculates the MD5 value.
  ]
}
Writer plugin configuration examples
Import data into a Lindorm SQL table
The following example shows the writer plugin configuration for importing data into a Lindorm SQL table.
{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "sortMode": "row",
  "replication": 2,
  "columns": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ]
}
Import dynamic columns into a Lindorm SQL table
The following example shows the writer plugin configuration for importing dynamic columns into a Lindorm SQL table.
The schema of dynamic columns is not stored persistently. When you import data, you must specify the column family and the corresponding data type.
{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "sortMode": "row",
  "replication": 2,
  "columns": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "f:dynamic_intcol||INT",
    "f:dynamic_stringcol||STRING"
  ]
}
Import data into a Lindorm SQL table with multiple column families
The following example shows the writer plugin configuration for importing data into a Lindorm SQL table with multiple column families.
A table can have multiple column families. The default column family is named 'f', but you can add custom column families, such as 'cf1' and 'cf2'.
{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "sortMode": "row",
  "replication": 2,
  "columns": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "cf1:cf1_int",
    "cf2:cf2_string"
  ]
}
Import data into an HBase-compatible Lindorm table
The following example shows the writer plugin configuration for importing data into an HBase-compatible Lindorm table.
{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "sortMode": "row",
  "replication": 2,
  "columns": [
    "ROW||String", // ROW is fixed to represent the rowkey. String indicates the data type.
    "f:intcol||Int", // Format: column_family:column_name||column_type
    "f:doublecol||Double",
    "f:stringcol||String",
    "f:string1col||String",
    "f:decimalcol||Decimal"
  ]
}
Import data into Lindorm in API mode
The Bulkload mode does not support importing data into index tables. To synchronously build an index table when you import data into a primary table, use the API mode.
The following example shows the writer plugin configuration for importing data into Lindorm in API mode.
When you import data in API mode, the resources of the Lindorm instance are directly consumed. Limit the import rate to prevent performance degradation of online production services.
{
  "namespace": "default",
  "lindormTable": "xxx",
  "compression": "zstd",
  "sortMode": "row",
  "replication": 2,
  "columns": [
    "id",
    "intcol",
    "doublecol",
    "stringcol",
    "string1col",
    "decimalcol"
  ],
  "writeMode": "api" // Specifies the API import mode.
}
Writer configuration parameter details
Parameter | Description | Required | Default value |
namespace | The namespace of the Lindorm wide table. | Yes | None |
lindormTable | The name of the Lindorm wide table. | Yes | None |
columns | The column mapping configuration for the target table. Configure this parameter based on the target table type. | Yes | None |
compression | The compression algorithm. We recommend that you use zstd. Other options include snappy, gz, and lzo. To disable compression, set this parameter to none (default). | No | none (no compression) |
timestamp | Specifies a timestamp for the import. All fields share the same timestamp. If not configured, the current system time is used by default. The following types are supported: | No | System time |
timeCol | Specifies a column in the source data as the timestamp. Each row can have a different timestamp. The default value is -1, which means no time column. | No | -1 (no time column) |
sortMode | The data sorting method for the | No | |
replication | The number of replicas for the written files. The default value is 3. We recommend that you set this to 2. | No | 3 |
writeMode | The data import method. Valid values are | No | bulkload |
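The writer examples use several forms for entries in columns: a plain column name ("id"), a column family prefix ("cf1:cf1_int"), and a type suffix after "||" ("f:dynamic_intcol||INT", "ROW||String"). The following minimal sketch splits such entries into their parts. The parsing rules are inferred from those examples only, and the ColumnSpec type and parse_column function are hypothetical names.

```python
from typing import NamedTuple, Optional


class ColumnSpec(NamedTuple):
    family: Optional[str]     # column family, for example "f" or "cf1"
    name: str                 # column name, or "ROW" for the rowkey
    data_type: Optional[str]  # type after "||", for example "INT"


def parse_column(spec: str) -> ColumnSpec:
    """Split "family:name||type" into parts; family and type are optional."""
    target, _, data_type = spec.partition("||")
    family, sep, name = target.partition(":")
    if not sep:  # no column family prefix, for example "id" or "ROW"
        family, name = None, target
    if not name:
        raise ValueError(f"malformed column spec: {spec!r}")
    return ColumnSpec(family, name, data_type or None)


for spec in ["id", "cf1:cf1_int", "f:dynamic_intcol||INT", "ROW||String"]:
    print(parse_column(spec))
```

A check like this can catch a missing type on a dynamic column or a misplaced separator before the task is created.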