本文介绍如何使用Direct Load快速装载机制向PolarDB MySQL版列存表中高效导入数据,支持从OSS和本地文件导入CSV及Parquet格式数据。
概述
Direct Load是一种快速装载机制,为X-Engine列存表提供快速数据加载能力。支持从OSS和本地文件导入数据,支持CSV(默认)和Parquet格式。除Direct Load外,X-Engine全列存还支持MySQL原生导入以及DTS导入(选择目标引擎为X-Engine)。
会话参数
Direct Load提供以下会话级参数,用于控制导入行为:
参数名 | 说明 | 取值范围 | 默认值 |
| 存储层并发度。0表示关闭快速装载,1~1024表示并行度。 | 0~1024 | 1 |
| 唯一性检查模式。
| OFF / KEEPONE / ERROR | KEEPONE |
| 是否异步构建NCI(非聚集索引)。 | ON / OFF | OFF |
| CFile提交时是否跳过Redo Log复制,CFile仍然持久化到主存储,但不会同步到存储热备集群(Standby)。 说明 仅适用于标准版且未开启存储热备集群(Standby)的导入场景。 | ON / OFF | OFF |
语法
Direct Load使用LOAD DATA语法,支持从OSS或本地文件导入数据。
LOAD DATA [OSS|LOCAL]
INFILE 'file_name'
[REPLACE | IGNORE]
INTO TABLE tbl_name
[PARTITION (partition_name [, ...])]
[CHARACTER SET charset_name]
[{FIELDS | COLUMNS}
[TERMINATED BY 'string']
[[OPTIONALLY] ENCLOSED BY 'char']
[ESCAPED BY 'char']
]
[LINES
[STARTING BY 'string']
[TERMINATED BY 'string']
]
[IGNORE number {LINES | ROWS}]
[(col_name_or_user_var [, ...])]
[SET col_name={expr | DEFAULT} [, ...]]限制条件
使用Direct Load导入数据时,需要注意以下限制:
表要求:Direct Load仅支持
ENGINE=XENGINE且TABLE_FORMAT=COLUMN的表,表必须为空。索引支持:
支持带ORDER KEY的表
不支持二级索引、外键或Hidden Key。
说明当参数
xengine_enable_ctable_hidden_key=ON时存在Hidden Key限制。
Binlog:Direct Load启用时自动关闭Binlog写入(通过临时清除
OPTION_BIN_LOG标志),不影响全局配置。触发器:带触发器的表不支持Direct Load。
并发限制:每张表同时只允许一个Direct Load语句。执行期间会持有排他MDL锁,禁止对该表进行DML和DDL操作。
文件通配符:
OSS模式支持通配符(*),例如
/home/t4/data/*.csv。LOCAL模式和非Direct Load模式不支持通配符。
使用示例
从OSS导入
步骤一:创建OSS Server
首先创建OSS Server,配置OSS连接信息。
创建OSS Server时需要SERVERS_ADMIN权限,您可以通过SHOW GRANTS FOR 用户名;命令查看当前登录用户是否具有SERVERS_ADMIN权限。
高权限账户可以给低权限账户赋予该权限。 如果没有权限,会报错:Access denied; you need (at least one of) the SERVERS_ADMIN OR SUPER privilege(s) for this operation。
DROP SERVER IF EXISTS my_server;
CREATE SERVER my_server FOREIGN DATA WRAPPER oss OPTIONS (
EXTRA_SERVER_INFO '{"oss_endpoint":"oss-cn-xxxx-internal.aliyuncs.com","oss_bucket":"xxxx","oss_access_key_id":"xxxx","oss_access_key_secret":"xxxx"}'
);
SELECT * FROM mysql.servers;步骤二:创建目标表
创建X-Engine列存目标表。以下以TPC-H lineitem表为例。
CREATE TABLE lineitem (
l_orderkey BIGINT NOT NULL,
l_partkey BIGINT NOT NULL,
l_suppkey BIGINT NOT NULL,
l_linenumber INT NOT NULL,
l_quantity DECIMAL(15,2) NOT NULL,
l_extendedprice DECIMAL(15,2) NOT NULL,
l_discount DECIMAL(15,2) NOT NULL,
l_tax DECIMAL(15,2) NOT NULL,
l_returnflag CHAR(1) NOT NULL,
l_linestatus CHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct CHAR(25) NOT NULL,
l_shipmode CHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL,
PRIMARY KEY (l_orderkey, l_linenumber, l_shipdate),
ORDER KEY(l_orderkey)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;步骤三:导入数据
设置并行度后,使用LOAD DATA OSS从OSS导入数据。支持通配符匹配多个文件。
-- 设置并发度(可选)
SET xengine_dload_parallel = 2;
-- 使用通配符导入(OSS 模式支持通配符)
LOAD DATA OSS INFILE 'my_server/lineitem/lineitem_test*.csv'
INTO TABLE lineitem
FIELDS TERMINATED BY '|';
-- 验证导入结果
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10;
SELECT COUNT(1) FROM lineitem;从本地文件导入
从本地文件Direct Load,仅支持单个文件导入,多个文件请使用从OSS导入。
使用LOAD DATA LOCAL从客户端本地文件导入数据。
CREATE TABLE lineitem (
l_orderkey BIGINT NOT NULL,
l_partkey BIGINT NOT NULL,
l_suppkey BIGINT NOT NULL,
l_linenumber BIGINT NOT NULL,
l_quantity DECIMAL(15,2) NOT NULL,
l_extendedprice DECIMAL(15,2) NOT NULL,
l_discount DECIMAL(15,2) NOT NULL,
l_tax DECIMAL(15,2) NOT NULL,
l_returnflag CHAR(1) NOT NULL,
l_linestatus CHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct CHAR(25) NOT NULL,
l_shipmode CHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL,
PRIMARY KEY (l_orderkey, l_linenumber, l_shipdate),
ORDER KEY(l_orderkey)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;
SET xengine_dload_parallel = 1;
-- 从本地文件导入(不支持通配符)
LOAD DATA LOCAL INFILE '/home/data/lineitem/lineitem_test.csv'
INTO TABLE lineitem
FIELDS TERMINATED BY '|';
-- 验证导入结果
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10;
SELECT COUNT(1) FROM lineitem;
FORMAT PARQUET导入
支持从Parquet文件直接导入X-Engine列存表,跳过CSV文本解析,走Arrow到ORC列式fast-path,获得更高的导入性能。
Parquet语法
LOAD DATA [LOCAL | OSS] INFILE 'file_name_or_glob'
[REPLACE | IGNORE]
FORMAT PARQUET
INTO TABLE tbl_name
[PARALLEL n]
[(col_name [, col_name] ...)]
[OPTIONS (
batch_size = <unsigned int>,
validate_schema = TRUE | FALSE,
parallel = <unsigned int>
)];语法说明
自动识别:如果您的文件名以
.parquet(大小写不敏感)结尾,系统会自动启用Parquet导入模式,即使您省略了FORMAT PARQUET关键字。并行加载 (
PARALLEL n):此子句用于指定并行读取Parquet文件中数据块(RowGroup)的线程数。实际的并行度会取n和所有待导入文件中RowGroup总数的较小者(公式:实际并行度 = min(n, RowGroup 数 × 文件数))。如果省略此子句,则并行度由会话变量xengine_dload_parallel决定。OPTIONS子句中parallel与外层PARALLEL n等价,两者同时存在时以OPTIONS(parallel=N)为准。
选择性导入列 (
(col_name, ...)):您可以提供一个列名清单,以按名称从Parquet文件中选择性地导入数据。只有清单中指定的列才会被加载,目标表中未被指定的列将填充其默认值。这对于处理宽表或只需导入部分数据列的场景非常有用。
Parquet使用限制
数据类型兼容性:Parquet文件中的数据类型必须与目标表中的列类型兼容。例如,Parquet 的
STRING类型可以映射到VARCHAR或TEXT,但BINARY类型只能映射到BLOB或VARBINARY。请参考类型映射表以确保兼容性。Schema匹配:
位置模式(未提供列清单):Parquet 文件的列数量必须与目标表的列数量完全相等,且类型按位置一一兼容。
具名模式(提供列清单):清单中的每一个列名都必须同时存在于 Parquet 文件和目标表中,且类型兼容。
异步索引构建:为了追求极致性能,Parquet导入模式内部强制采用异步方式构建主键索引(NCI)。这意味着
LOAD DATA命令会在数据写入完成后立即返回,然后在后台构建索引。在此期间,表将处于只读状态,且仅支持列存查询。您无需手动设置xengine_dload_async_build_nci参数。暂不支持的类型:当前版本暂不支持
FIXED_SIZE_BINARY类型的Arrow列。
类型映射表
以下是Parquet/Arrow类型到MySQL类型的映射关系:
Parquet / Arrow 类型 | 可接受的MySQL列类型 | 备注 |
|
| 自动扩展宽度 |
| 与位宽匹配或更宽的整型 | 小整型可自动扩展至 |
|
| 支持在有符号类型上进行安全的宽度扩展 |
|
|
|
|
| - |
|
| 适用于非二进制字符集 |
|
| 专为超长字符串设计 |
|
| 不允许映射到 |
|
| - |
|
| 当映射为 |
|
| 支持可空(Nullable)类型 |
|
| 内部使用 Decimal64 路径 |
|
| 内部使用 Decimal128 路径 |
| — | 暂不支持 |
Parquet导入示例
指定列(具名模式)
在列清单中指定需要导入的列名,列名必须同时存在于Parquet文件和目标表中。
-- 仅载入 id, name 两列,其他列填充默认值/NULL
LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
FORMAT PARQUET INTO TABLE t_prune (id, name);并行加载(PARALLEL子句)
-- PARALLEL 子句:4 线程按 RowGroup 分片
LOAD DATA LOCAL INFILE '/data/big.parquet'
FORMAT PARQUET INTO TABLE t_par PARALLEL 4;
-- 通配符 + PARALLEL:多文件并行
LOAD DATA LOCAL INFILE '/data/multi_*.parquet'
FORMAT PARQUET INTO TABLE t_multi PARALLEL 8;OPTIONS子句
使用OPTIONS子句可以控制Parquet导入的批量大小和Schema验证等行为。
LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
FORMAT PARQUET INTO TABLE t_opts
OPTIONS(batch_size=65536, validate_schema=TRUE);用户自定义PK
当目标表有自定义的主键时,Parquet文件中需要包含对应的主键列。
-- 单列 PK
CREATE TABLE t_upk1 (
id INT PRIMARY KEY,
bigval BIGINT,
amount DOUBLE,
name VARCHAR(100)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;
LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
FORMAT PARQUET INTO TABLE t_upk1;
-- 复合 PK
CREATE TABLE t_upk_comp (
id INT,
bigval BIGINT,
amount DOUBLE,
name VARCHAR(100),
PRIMARY KEY (id, bigval)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;
LOAD DATA LOCAL INFILE '/data/test_basic.parquet'
FORMAT PARQUET INTO TABLE t_upk_comp;从OSS导入Parquet
支持从OSS导入Parquet文件,可使用通配符匹配多个文件。
-- 先创建 OSS Server(语句同 CSV 路径)
DROP SERVER IF EXISTS my_server;
CREATE SERVER my_server FOREIGN DATA WRAPPER oss OPTIONS (
EXTRA_SERVER_INFO '{"oss_endpoint": "oss-cn-xxxx.aliyuncs.com",
"oss_bucket": "xxxx",
"oss_access_key_id": "xxxx",
"oss_access_key_secret": "xxxx"}'
);
SET xengine_dload_parallel = 16;
LOAD DATA OSS INFILE 'my_server/path/to/data_*.parquet'
FORMAT PARQUET INTO TABLE t_basic PARALLEL 16;异步NCI构建模式
当xengine_dload_async_build_nci=ON时,导入在写入CTable文件完成后立即返回,NCI在后台异步构建。
使用方法
SET xengine_dload_async_build_nci = ON;
SET xengine_dload_parallel = 64;
LOAD DATA OSS INFILE 'my_server/tpch1000g/lineitem.*'
INTO TABLE tpch1000.lineitem
FIELDS TERMINATED BY '|';注意事项
NCI构建期间不允许DML和DDL操作。
NCI构建期间仅允许列存执行计划查询,行存查询会失败,报错:
ER_XENGINE_CTABLE_NCI_NOT_READY_FOR_ROWPLAN。
查询示例(异步构建期间)
SET use_imci_engine = ON;
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10; -- ✓ 可执行
SELECT COUNT(1) FROM lineitem; -- ✓ 可执行
SET use_imci_engine = OFF;
SELECT * FROM lineitem ORDER BY l_orderkey, l_partkey DESC LIMIT 10; -- ✗ 报错
UPDATE lineitem SET l_partkey=1 WHERE l_orderkey=149999904; -- ✗ 报错监控NCI构建进度
通过information_schema.XENGINE_CTABLE_NCI_BUILD表监控NCI构建进度。
SELECT schema_name, table_name, task_status FROM information_schema.XENGINE_CTABLE_NCI_BUILD ORDER BY subtable_id;取消和重建NCI
如果需要取消正在进行的NCI构建,可以使用dbms_ctable包中提供的命令终止后台线程。NCI构建失败后,可以通过删除并重新创建索引来重建NCI。
-- 取消 NCI 构建(如需要)
CALL dbms_ctable.cancel_nci_build(<subtable_id>);
-- 重建 NCI(取消后)
CALL dbms_ctable.rebuild_nci(<subtable_id>);TPC-H 100G导入示例
以下示例展示如何使用Direct Load从OSS导入TPC-H 100 GB规模的数据。
创建目标表
以lineitem表为例,创建X-Engine列存表。
SET xengine_enable_ctable_hidden_key = OFF;
DROP DATABASE IF EXISTS tpch100;
CREATE DATABASE tpch100;
USE tpch100;
-- 示例:创建 lineitem 表
CREATE TABLE lineitem (
L_ORDERKEY INTEGER NOT NULL,
L_PARTKEY INTEGER NOT NULL,
L_SUPPKEY INTEGER NOT NULL,
L_LINENUMBER INTEGER NOT NULL,
L_QUANTITY DECIMAL(15,2) NOT NULL,
L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
L_DISCOUNT DECIMAL(15,2) NOT NULL,
L_TAX DECIMAL(15,2) NOT NULL,
L_RETURNFLAG CHAR(1) NOT NULL,
L_LINESTATUS CHAR(1) NOT NULL,
L_SHIPDATE DATE NOT NULL,
L_COMMITDATE DATE NOT NULL,
L_RECEIPTDATE DATE NOT NULL,
L_SHIPINSTRUCT CHAR(25) NOT NULL,
L_SHIPMODE CHAR(10) NOT NULL,
L_COMMENT VARCHAR(44) NOT NULL,
PRIMARY KEY (L_ORDERKEY, L_LINENUMBER, L_SHIPDATE)
) ENGINE=XENGINE TABLE_FORMAT=COLUMN;创建OSS Server
DROP SERVER IF EXISTS my_server;
CREATE SERVER my_server
FOREIGN DATA WRAPPER oss OPTIONS
(
EXTRA_SERVER_INFO '{"oss_endpoint": "oss-cn-xxxx.aliyuncs.com","oss_bucket": "xxxx","oss_access_key_id": "xxxx","oss_access_key_secret": "xxxx"}'
);批量导入脚本
使用Bash脚本批量导入所有TPC-H表的数据。
#!/usr/bin/env bash
set -u
SOCK=/tmp/mysql29.sock
OUT=output.txt
PREFIX="my_server/imci/tpch100g"
DB=tpch100
PARA=64
: > "$OUT"
tables=(lineitem nation region part supplier partsupp customer orders)
run() {
local t="$1"
(
echo "===== [$t] START $(date '+%F %T') ====="
sql="SET xengine_dload_parallel=${PARA};
LOAD DATA OSS INFILE '${PREFIX}/${t}.*'
INTO TABLE ${DB}.${t}
FIELDS TERMINATED BY '|';"
{ time mysql -uroot -S "$SOCK" -e "$sql"; } 2>&1
rc=${PIPESTATUS[0]:-0}
echo "===== [$t] END $(date '+%F %T') rc=$rc ====="
echo
exit "$rc"
) >>"$OUT" 2>&1 &
echo "[$t] pid=$!"
}
for t in "${tables[@]}"; do
run "$t"
done
wait
echo "ALL DONE $(date '+%F %T')" >>"$OUT"
TPC-H Parquet导入示例
本示例与TPC-H 100G导入示例结构一致,建表DDL与OSS Server创建语句直接复用该节(如切换库名,把tpch100改为tpch_parquet即可)。差异仅在导入脚本:输入格式改为FORMAT PARQUET,不再需要FIELDS TERMINATED BY,Parquet路径内部强制异步NCI,也不需要显式设置xengine_dload_async_build_nci。
#!/usr/bin/env bash
set -u
SOCK=/tmp/mysql29.sock
OUT=output.txt
# TODO: 数据集准备好之后填入实际 OSS 路径,例如 my_server/imci/tpch1000g_parquet
PREFIX="my_server/<TBD>/tpch_parquet"
DB=tpch_parquet
PARA=64
: > "$OUT"
tables=(lineitem nation region part supplier partsupp customer orders)
run() {
local t="$1"
(
echo "===== [$t] START $(date '+%F %T') ====="
sql="SET xengine_dload_parallel=${PARA};
LOAD DATA OSS INFILE '${PREFIX}/${t}/*.parquet'
FORMAT PARQUET INTO TABLE ${DB}.${t} PARALLEL ${PARA};"
{ time mysql -uroot -S "$SOCK" -e "$sql"; } 2>&1
rc=${PIPESTATUS[0]:-0}
echo "===== [$t] END $(date '+%F %T') rc=$rc ====="
echo
exit "$rc"
) >>"$OUT" 2>&1 &
echo "[$t] pid=$!"
}
for t in "${tables[@]}"; do
run "$t"
done
wait
echo "ALL DONE $(date '+%F %T')" >>"$OUT"
# 等待后台异步 NCI 构建结束(之后才能走行计划 / DML)
mysql -uroot -S "$SOCK" -N -e "
SELECT TABLE_NAME, TASK_STATUS, RUN_SEC
FROM information_schema.XENGINE_CTABLE_NCI_BUILD
ORDER BY CREATE_TIME;"
错误处理
使用Direct Load导入数据时,可能遇到以下常见错误:
错误码 | 说明 | 解决方案 |
| 不支持的特性 | 检查表是否满足Direct Load要求(ENGINE=XENGINE, TABLE_FORMAT=COLUMN),确认不存在二级索引、外键或触发器。 |
| 主键冲突 | 检查导入数据中是否存在重复主键。可通过设置 |
| NCI尚未构建完成,无法使用行存执行计划 | 等待NCI构建完成,或使用列存执行计划查询。通过 |
| NCI构建期间,尝试对表进行DML操作 | NCI构建期间不允许DML和DDL操作,请等待NCI构建完成。 |
最佳实践
Binlog自动关闭:Direct Load自动关闭Binlog写入,无需手动操作。导入完成后自动恢复,不影响全局配置。
设置合适的并发度:OSS导入场景通常建议设置
xengine_dload_parallel为32 ~ 64,本地文件导入建议设置较低的并发度。大规模导入使用异步NCI:大规模数据导入时,建议开启
xengine_dload_async_build_nci=ON,可以显著缩短导入时间。确保表为空:Direct Load要求目标表为空,导入前请确认表中无数据。
导入前移除二级索引:Direct Load不支持二级索引,请在导入前删除二级索引,导入完成后再重新创建。
OSS模式使用通配符:利用通配符(*)一次性匹配多个文件,简化大规模数据导入流程。
监控NCI构建进度:异步NCI模式下,通过
information_schema.XENGINE_CTABLE_NCI_BUILD定期检查构建进度,确保NCI构建完成后再执行行存执行计划查询。Parquet优于CSV:条件允许时,建议使用Parquet格式导入。Parquet格式跳过CSV文本解析,走列式fast-path,导入性能更优。