数据导入

更新时间:
复制为 MD 格式

本文介绍如何使用Direct Load快速装载机制向PolarDB MySQL列存表中高效导入数据,支持从OSS和本地文件导入CSVParquet格式数据。

概述

Direct Load是一种快速装载机制,为X-Engine列存表提供快速数据加载能力。支持从OSS和本地文件导入数据,支持CSV(默认)和Parquet格式。除Direct Load外,X-Engine全列存还支持MySQL原生导入以及DTS导入(选择目标引擎为X-Engine)

会话参数

Direct Load提供以下会话级参数,用于控制导入行为:

参数名

说明

取值范围

默认值

xengine_dload_parallel

存储层并发度。0表示关闭快速装载,1~1024表示并行度。

0~1024

1

xengine_dload_check_unique_mod

唯一性检查模式。

  • OFF:关闭唯一性检查(主键重复仍报错)。

  • KEEPONE:发现重复时随机保留一条。

  • ERROR:发现重复立即报错。

OFF / KEEPONE / ERROR

KEEPONE

xengine_dload_async_build_nci

是否异步构建NCI(非聚集索引)。FORMAT PARQUET内部强制走异步路径。

ON / OFF

OFF

xengine_dload_skip_redo_log

CFile提交时是否跳过Redo Log复制,CFile仍然持久化到主存储,但不会同步到存储热备集群(Standby)。

说明

仅适用于标准版且未开启存储热备集群(Standby)的导入场景。

ON / OFF

OFF

语法

Direct Load使用LOAD DATA语法,支持从OSS或本地文件导入数据。

LOAD DATA [OSS|LOCAL]
    INFILE 'file_name'
    [REPLACE | IGNORE]
    INTO TABLE tbl_name
    [PARTITION (partition_name [, ...])]
    [CHARACTER SET charset_name]
    [{FIELDS | COLUMNS}
        [TERMINATED BY 'string']
        [[OPTIONALLY] ENCLOSED BY 'char']
        [ESCAPED BY 'char']
    ]
    [LINES
        [STARTING BY 'string']
        [TERMINATED BY 'string']
    ]
    [IGNORE number {LINES | ROWS}]
    [(col_name_or_user_var [, ...])]
    [SET col_name={expr | DEFAULT} [, ...]]

限制条件

使用Direct Load导入数据时,需要注意以下限制:

  • 表要求:Direct Load仅支持ENGINE=XENGINETABLE_FORMAT=COLUMN的表,表必须为空。

  • 索引支持

    • 支持带ORDER KEY的表

    • 不支持二级索引、外键或Hidden Key。

      说明

      当参数xengine_enable_ctable_hidden_key=ON时存在Hidden Key限制。

  • Binlog:Direct Load启用时自动关闭Binlog写入(通过临时清除OPTION_BIN_LOG标志),不影响全局配置。

  • 触发器:带触发器的表不支持Direct Load。

  • 并发限制:每张表同时只允许一个Direct Load语句。执行期间会持有排他MDL锁,禁止对该表进行DMLDDL操作。

  • 文件通配符

    • OSS模式支持通配符(*),例如/home/t4/data/*.csv

    • LOCAL模式和非Direct Load模式不支持通配符。

使用示例

OSS导入

步骤一:创建OSS Server

首先创建OSS Server,配置OSS连接信息。

说明

创建OSS Server时需要SERVERS_ADMIN权限,您可以通过SHOW GRANTS FOR 用户名;命令查看当前登录用户是否具有SERVERS_ADMIN权限。

高权限账户可以给低权限账户赋予该权限。 如果没有权限,会报错:Access denied; you need (at least one of) the SERVERS_ADMIN OR SUPER privilege(s) for this operation

DROP SERVER IF EXISTS my_server;
CREATE SERVER my_server FOREIGN DATA WRAPPER oss OPTIONS (
  EXTRA_SERVER_INFO '{"oss_endpoint":"oss-cn-xxxx-internal.aliyuncs.com","oss_bucket":"xxxx","oss_access_key_id":"xxxx","oss_access_key_secret":"xxxx"}'
);
SELECT * FROM mysql.servers;

步骤二:创建目标表

创建X-Engine列存目标表。以下以TPC-H lineitem表为例。

CREATE TABLE lineitem (
    l_orderkey BIGINT NOT NULL,
    l_partkey BIGINT NOT NULL,
    l_suppkey BIGINT NOT NULL,
    l_linenumber INT NOT NULL,
    l_quantity DECIMAL(15,2) NOT NULL,
    l_extendedprice DECIMAL(15,2) NOT NULL,
    l_discount DECIMAL(15,2) NOT NULL,
    l_tax DECIMAL(15,2) NOT NULL,
    l_returnflag CHAR(1) NOT NULL,
    l_linestatus CHAR(1) NOT NULL,
    l_shipdate DATE NOT NULL,
    l_commitdate DATE NOT NULL,
    l_receiptdate DATE NOT NULL,
    l_shipinstruct CHAR(25) NOT NULL,
    l_shipmode CHAR(10) NOT NULL,
    l_comment VARCHAR(44) NOT NULL,
    PRIMARY KEY (l_orderkey, l_linenumber, l_shipdate),
    ORDER KEY(l_orderkey)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;

步骤三:导入数据

设置并行度后,使用LOAD DATA OSSOSS导入数据。支持通配符匹配多个文件。

-- 设置并发度(可选)
SET xengine_dload_parallel = 2;

-- 使用通配符导入(OSS 模式支持通配符)
LOAD DATA OSS INFILE 'my_server/lineitem/lineitem_test*.csv' 
INTO TABLE lineitem 
FIELDS TERMINATED BY '|';

-- 验证导入结果
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10;
SELECT COUNT(1) FROM lineitem;

从本地文件导入

说明

从本地文件Direct Load,仅支持单个文件导入,多个文件请使用OSS导入

使用LOAD DATA LOCAL从客户端本地文件导入数据。

CREATE TABLE lineitem (
    l_orderkey       BIGINT NOT NULL,
    l_partkey        BIGINT NOT NULL,
    l_suppkey        BIGINT NOT NULL,
    l_linenumber     BIGINT NOT NULL,
    l_quantity       DECIMAL(15,2) NOT NULL,
    l_extendedprice  DECIMAL(15,2) NOT NULL,
    l_discount       DECIMAL(15,2) NOT NULL,
    l_tax            DECIMAL(15,2) NOT NULL,
    l_returnflag     CHAR(1) NOT NULL,
    l_linestatus     CHAR(1) NOT NULL,
    l_shipdate       DATE NOT NULL,
    l_commitdate     DATE NOT NULL,
    l_receiptdate    DATE NOT NULL,
    l_shipinstruct   CHAR(25) NOT NULL,
    l_shipmode       CHAR(10) NOT NULL,
    l_comment        VARCHAR(44) NOT NULL,
    PRIMARY KEY (l_orderkey, l_linenumber, l_shipdate),
    ORDER KEY(l_orderkey)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;

SET xengine_dload_parallel = 1;

-- 从本地文件导入(不支持通配符)
LOAD DATA LOCAL INFILE '/home/data/lineitem/lineitem_test.csv' 
INTO TABLE lineitem 
FIELDS TERMINATED BY '|';

-- 验证导入结果
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10;
SELECT COUNT(1) FROM lineitem;

FORMAT PARQUET导入

支持从Parquet文件直接导入X-Engine列存表,跳过CSV文本解析,走ArrowORC列式fast-path,获得更高的导入性能。

Parquet语法

LOAD DATA [LOCAL | OSS] INFILE 'file_name_or_glob'
    [REPLACE | IGNORE]
    FORMAT PARQUET
    INTO TABLE tbl_name
    [PARALLEL n]
    [(col_name [, col_name] ...)]
    [OPTIONS (
        batch_size     = <unsigned int>,
        validate_schema = TRUE | FALSE,
        parallel       = <unsigned int>
    )];

语法说明

  • 自动识别:如果您的文件名以.parquet(大小写不敏感)结尾,系统会自动启用Parquet导入模式,即使您省略了FORMAT PARQUET关键字。

  • 并行加载 (PARALLEL n):此子句用于指定并行读取Parquet文件中数据块(RowGroup)的线程数。实际的并行度会取n和所有待导入文件中RowGroup总数的较小者(公式:实际并行度 = min(n, RowGroup 数 × 文件数))。如果省略此子句,则并行度由会话变量xengine_dload_parallel决定。

    • OPTIONS子句中parallel与外层PARALLEL n等价,两者同时存在时以OPTIONS(parallel=N)为准。

  • 选择性导入列 ((col_name, ...)):您可以提供一个列名清单,以按名称从Parquet文件中选择性地导入数据。只有清单中指定的列才会被加载,目标表中未被指定的列将填充其默认值。这对于处理宽表或只需导入部分数据列的场景非常有用。

Parquet使用限制

  • 数据类型兼容性:Parquet文件中的数据类型必须与目标表中的列类型兼容。例如,Parquet 的STRING类型可以映射到VARCHARTEXT,但BINARY类型只能映射到BLOBVARBINARY。请参考类型映射表以确保兼容性。

  • Schema匹配

    • 位置模式(未提供列清单):Parquet 文件的列数量必须与目标表的列数量完全相等,且类型按位置一一兼容。

    • 具名模式(提供列清单):清单中的每一个列名都必须同时存在于 Parquet 文件和目标表中,且类型兼容。

  • 异步索引构建:为了追求极致性能,Parquet导入模式内部强制采用异步方式构建主键索引(NCI)。这意味着LOAD DATA命令会在数据写入完成后立即返回,然后在后台构建索引。在此期间,表将处于只读状态,且仅支持列存查询。您无需手动设置xengine_dload_async_build_nci参数。

  • 暂不支持的类型:当前版本暂不支持FIXED_SIZE_BINARY类型的Arrow列。

类型映射表

以下是Parquet/Arrow类型到MySQL类型的映射关系:

Parquet / Arrow 类型

可接受的MySQL列类型

备注

BOOLEAN

TINYINT(1)SMALLINTINTBIGINT

自动扩展宽度

INT8INT16INT32INT64

与位宽匹配或更宽的整型

小整型可自动扩展至BIGINT

UINT8UINT16UINT32UINT64

SMALLINTINTBIGINT

支持在有符号类型上进行安全的宽度扩展

FLOAT

FLOATDOUBLE

FLOAT可自动扩展至DOUBLE

DOUBLE

DOUBLE

-

STRING (UTF-8)

VARCHAR(n)CHAR(n)TEXT

适用于非二进制字符集

LARGE_STRING

TEXT

专为超长字符串设计

BINARY

BLOBVARBINARY

不允许映射到 VARCHAR 等非二进制字符集

LARGE_BINARY

BLOB

-

DATE32

DATEVARCHAR(>=10)

当映射为 VARCHAR 时,格式为 YYYY-MM-DD

TIMESTAMP[us, UTC]

DATETIME(6)TIMESTAMP(6)

支持可空(Nullable)类型

DECIMAL(p,s) (p ≤ 18)

DECIMAL(p,s)

内部使用 Decimal64 路径

DECIMAL(p,s) (19 ≤ p ≤ 38)

DECIMAL(p,s)

内部使用 Decimal128 路径

FIXED_SIZE_BINARY

暂不支持

Parquet导入示例

本地文件导入(隐藏PK)

SET xengine_enable_ctable_hidden_key = ON;
SET xengine_dload_parallel = 4;

CREATE TABLE t_basic (
    id     INT,
    bigval BIGINT,
    amount DOUBLE,
    name   VARCHAR(100)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;

-- 显式 FORMAT PARQUET
LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
  FORMAT PARQUET INTO TABLE t_basic;

-- 自动识别 .parquet 扩展名
LOAD DATA LOCAL INFILE '/data/test_basic.parquet' INTO TABLE t_basic;

-- 等待异步 NCI 构建完成后再走行计划 / DML
SELECT schema_name, table_name, task_status
  FROM information_schema.XENGINE_CTABLE_NCI_BUILD;

指定列(具名模式)

在列清单中指定需要导入的列名,列名必须同时存在于Parquet文件和目标表中。

-- 仅载入 id, name 两列,其他列填充默认值/NULL
LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
  FORMAT PARQUET INTO TABLE t_prune (id, name);

并行加载(PARALLEL子句)

-- PARALLEL 子句:4 线程按 RowGroup 分片
LOAD DATA LOCAL INFILE '/data/big.parquet'
  FORMAT PARQUET INTO TABLE t_par PARALLEL 4;

-- 通配符 + PARALLEL:多文件并行
LOAD DATA LOCAL INFILE '/data/multi_*.parquet'
  FORMAT PARQUET INTO TABLE t_multi PARALLEL 8;

OPTIONS子句

使用OPTIONS子句可以控制Parquet导入的批量大小和Schema验证等行为。

LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
  FORMAT PARQUET INTO TABLE t_opts
  OPTIONS(batch_size=65536, validate_schema=TRUE);

用户自定义PK

当目标表有自定义的主键时,Parquet文件中需要包含对应的主键列。

-- 单列 PK
CREATE TABLE t_upk1 (
    id     INT PRIMARY KEY,
    bigval BIGINT,
    amount DOUBLE,
    name   VARCHAR(100)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;

LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
  FORMAT PARQUET INTO TABLE t_upk1;

-- 复合 PK
CREATE TABLE t_upk_comp (
    id     INT,
    bigval BIGINT,
    amount DOUBLE,
    name   VARCHAR(100),
    PRIMARY KEY (id, bigval)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;

LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
  FORMAT PARQUET INTO TABLE t_upk_comp;

OSS导入Parquet

支持从OSS导入Parquet文件,可使用通配符匹配多个文件。

-- 先创建 OSS Server(语句同 CSV 路径)
DROP SERVER IF EXISTS my_server;
CREATE SERVER my_server FOREIGN DATA WRAPPER oss OPTIONS (
  EXTRA_SERVER_INFO '{"oss_endpoint": "oss-cn-xxxx.aliyuncs.com",
                      "oss_bucket": "xxxx",
                      "oss_access_key_id": "xxxx",
                      "oss_access_key_secret": "xxxx"}'
);

SET xengine_dload_parallel = 16;

LOAD DATA OSS INFILE 'my_server/path/to/data_*.parquet'
  FORMAT PARQUET INTO TABLE t_basic PARALLEL 16;

异步NCI构建模式

xengine_dload_async_build_nci=ON时,导入在写入CTable文件完成后立即返回,NCI在后台异步构建。

使用方法

SET xengine_dload_async_build_nci = ON;
SET xengine_dload_parallel = 64;

LOAD DATA OSS INFILE 'my_server/tpch1000g/lineitem.*' 
INTO TABLE tpch1000.lineitem 
FIELDS TERMINATED BY '|';

注意事项

  • NCI构建期间不允许DMLDDL操作。

  • NCI构建期间仅允许列存执行计划查询,行存查询会失败,报错:ER_XENGINE_CTABLE_NCI_NOT_READY_FOR_ROWPLAN

查询示例(异步构建期间)

SET use_imci_engine = ON;
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10;  -- ✓ 可执行
SELECT COUNT(1) FROM lineitem;  -- ✓ 可执行

SET use_imci_engine = OFF;
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10;  -- ✗ 报错
UPDATE lineitem SET l_partkey=1 WHERE l_orderkey=149999904;  -- ✗ 报错

监控NCI构建进度

通过information_schema.XENGINE_CTABLE_NCI_BUILD表监控NCI构建进度。

SELECT schema_name, table_name, task_status FROM information_schema.XENGINE_CTABLE_NCI_BUILD ORDER BY subtable_id;

取消和重建NCI

如果需要取消正在进行的NCI构建,可以使用dbms_ctable包中提供的命令终止后台线程。NCI构建失败后,可以通过删除并重新创建索引来重建NCI。

-- 取消 NCI 构建(如需要)
CALL dbms_ctable.cancel_nci_build(<subtable_id>);
-- 重建 NCI(取消后)
CALL dbms_ctable.rebuild_nci(<subtable_id>);

TPC-H 100G导入示例

以下示例展示如何使用Direct LoadOSS导入TPC-H 100 GB规模的数据。

创建目标表

lineitem表为例,创建X-Engine列存表。

SET xengine_enable_ctable_hidden_key = OFF;

DROP DATABASE IF EXISTS tpch100;
CREATE DATABASE tpch100;
USE tpch100;

-- 示例:创建 lineitem 表
CREATE TABLE lineitem (
    L_ORDERKEY    INTEGER NOT NULL,
    L_PARTKEY     INTEGER NOT NULL,
    L_SUPPKEY     INTEGER NOT NULL,
    L_LINENUMBER  INTEGER NOT NULL,
    L_QUANTITY    DECIMAL(15,2) NOT NULL,
    L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
    L_DISCOUNT    DECIMAL(15,2) NOT NULL,
    L_TAX         DECIMAL(15,2) NOT NULL,
    L_RETURNFLAG  CHAR(1) NOT NULL,
    L_LINESTATUS  CHAR(1) NOT NULL,
    L_SHIPDATE    DATE NOT NULL,
    L_COMMITDATE  DATE NOT NULL,
    L_RECEIPTDATE DATE NOT NULL,
    L_SHIPINSTRUCT CHAR(25) NOT NULL,
    L_SHIPMODE     CHAR(10) NOT NULL,
    L_COMMENT      VARCHAR(44) NOT NULL,
    PRIMARY KEY (L_ORDERKEY, L_LINENUMBER, L_SHIPDATE)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;

创建OSS Server

DROP SERVER IF EXISTS my_server;

CREATE SERVER my_server
FOREIGN DATA WRAPPER oss OPTIONS
(
  EXTRA_SERVER_INFO '{"oss_endpoint": "oss-cn-xxxx.aliyuncs.com","oss_bucket": "xxxx","oss_access_key_id": "xxxx","oss_access_key_secret": "xxxx"}'
);

批量导入脚本

使用Bash脚本批量导入所有TPC-H表的数据。

#!/usr/bin/env bash
set -u

SOCK=/tmp/mysql29.sock
OUT=output.txt
PREFIX="my_server/imci/tpch100g"
DB=tpch100
PARA=64

: > "$OUT"

tables=(lineitem nation region part supplier partsupp customer orders)

run() {
  local t="$1"
  (
    echo "===== [$t] START $(date '+%F %T') ====="
    sql="SET xengine_dload_parallel=${PARA};
         LOAD DATA OSS INFILE '${PREFIX}/${t}.*'
         INTO TABLE ${DB}.${t}
         FIELDS TERMINATED BY '|';"
    { time mysql -uroot -S "$SOCK" -e "$sql"; } 2>&1
    rc=${PIPESTATUS[0]:-0}
    echo "===== [$t] END   $(date '+%F %T') rc=$rc ====="
    echo
    exit "$rc"
  ) >>"$OUT" 2>&1 &
  echo "[$t] pid=$!"
}

for t in "${tables[@]}"; do
  run "$t"
done

wait
echo "ALL DONE $(date '+%F %T')" >>"$OUT"

TPC-H Parquet导入示例

本示例与TPC-H 100G导入示例结构一致,建表DDLOSS Server创建语句直接复用该节(如切换库名,把tpch100改为tpch_parquet即可)。差异仅在导入脚本:输入格式改为FORMAT PARQUET,不再需要FIELDS TERMINATED BY,Parquet路径内部强制异步NCI,也不需要显式设置xengine_dload_async_build_nci

#!/usr/bin/env bash
set -u

SOCK=/tmp/mysql29.sock
OUT=output.txt
# TODO: 数据集准备好之后填入实际 OSS 路径,例如 my_server/imci/tpch1000g_parquet
PREFIX="my_server/<TBD>/tpch_parquet"
DB=tpch_parquet
PARA=64

: > "$OUT"

tables=(lineitem nation region part supplier partsupp customer orders)

run() {
  local t="$1"
  (
    echo "===== [$t] START $(date '+%F %T') ====="
    sql="SET xengine_dload_parallel=${PARA};
         LOAD DATA OSS INFILE '${PREFIX}/${t}/*.parquet'
         FORMAT PARQUET INTO TABLE ${DB}.${t} PARALLEL ${PARA};"
    { time mysql -uroot -S "$SOCK" -e "$sql"; } 2>&1
    rc=${PIPESTATUS[0]:-0}
    echo "===== [$t] END   $(date '+%F %T') rc=$rc ====="
    echo
    exit "$rc"
  ) >>"$OUT" 2>&1 &
  echo "[$t] pid=$!"
}

for t in "${tables[@]}"; do
  run "$t"
done

wait
echo "ALL DONE $(date '+%F %T')" >>"$OUT"

# 等待后台异步 NCI 构建结束(之后才能走行计划 / DML)
mysql -uroot -S "$SOCK" -N -e "
  SELECT TABLE_NAME, TASK_STATUS, RUN_SEC
    FROM information_schema.XENGINE_CTABLE_NCI_BUILD
   ORDER BY CREATE_TIME;"

错误处理

使用Direct Load导入数据时,可能遇到以下常见错误:

错误码

说明

解决方案

ER_FEATURE_UNSUPPORTED

不支持的特性

检查表是否满足Direct Load要求(ENGINE=XENGINE, TABLE_FORMAT=COLUMN),确认不存在二级索引、外键或触发器。

ER_DUP_KEY

主键冲突

检查导入数据中是否存在重复主键。可通过设置xengine_dload_check_unique_mod=KEEPONE自动去重。

ER_XENGINE_CTABLE_NCI_NOT_READY_FOR_ROWPLAN

NCI尚未构建完成,无法使用行存执行计划

等待NCI构建完成,或使用列存执行计划查询。通过information_schema.XENGINE_CTABLE_NCI_BUILD查看构建进度。

ER_XENGINE_CTABLE_NCI_NOT_READY_FOR_DML

NCI构建期间,尝试对表进行DML操作

NCI构建期间不允许DMLDDL操作,请等待NCI构建完成。

最佳实践

  • Binlog自动关闭:Direct Load自动关闭Binlog写入,无需手动操作。导入完成后自动恢复,不影响全局配置。

  • 设置合适的并发度:OSS导入场景通常建议设置xengine_dload_parallel32 ~ 64,本地文件导入建议设置较低的并发度。

  • 大规模导入使用异步NCI:大规模数据导入时,建议开启xengine_dload_async_build_nci=ON,可以显著缩短导入时间。

  • 确保表为空:Direct Load要求目标表为空,导入前请确认表中无数据。

  • 导入前移除二级索引:Direct Load不支持二级索引,请在导入前删除二级索引,导入完成后再重新创建。

  • OSS模式使用通配符:利用通配符(*)一次性匹配多个文件,简化大规模数据导入流程。

  • 监控NCI构建进度:异步NCI模式下,通过information_schema.XENGINE_CTABLE_NCI_BUILD定期检查构建进度,确保NCI构建完成后再执行行存执行计划查询。

  • Parquet优于CSV:条件允许时,建议使用Parquet格式导入。Parquet格式跳过CSV文本解析,走列式fast-path,导入性能更优。