SelectDB 早期版本仅支持简单的部分列更新功能,限制了一次导入中每一行必须更新相同的列。SelectDB 4.1及更高版本支持灵活部分列更新。该功能允许在单次导入作业中,不同数据行可以更新不同的列。这与早期版本要求每行必须更新相同列的限制形成对比,极大地提升了数据更新的灵活性,尤其适用于 CDC 实时同步等场景。
适用场景
在使用 CDC 的方式将某个数据系统的数据实时同步到 SelectDB 中时,源端系统输出的记录可能并不是完整的行数据,而是只有主键和被更新的列的数据。在这种情况下,某个时间窗口内的一批数据中每一行更新的列可能都是不同的。此时,可以使用灵活列更新的方式来将数据导入到 SelectDB 中。
前提条件与限制
表模型与功能限制
适用表模型:仅支持 Unique Key 模型的 Merge-on-Write 表。
列类型限制:不支持在有 Variant 列的表上进行灵活列更新。
物化视图限制:不支持在有同步物化视图的表上进行灵活列更新。
导入方式与数据要求
导入方式:目前只有 stream load 这一种导入方式以及使用 stream load 作为其导入方式的工具 (如 Flink Doris Connector) 支持灵活列更新功能。
数据格式:在使用灵活列更新时导入文件必须为 json 格式的数据。
主键要求:导入的每一行数据需要包括所有的主键列。不满足此要求的行将被过滤,计入
filtered rows的计数中。如果filtered rows的数量超过了本次导入max_filter_ratio所允许的上限,则整个导入将会失败。被过滤的数据会在 error log 中留下一条日志。列名匹配:导入的 JSON 对象中,只有键(Key)与目标表列名一致的键值对才是有效的,不满足此要求的键值对将被忽略。系统内部使用的列名(如
__DORIS_VERSION_COL__等)也将被忽略。
不支持的导入参数
使用灵活列更新时,Stream Load 作业的 Header 中不能指定或开启以下参数:
merge_typedeletefuzzy_parsecolumnsjsonpathshidden_columnsfunction_column.sequence_colsqlmemtable_on_sink_nodegroup_commitwhere
使用方式
步骤一:为表开启功能
新建表时开启
对于新建的表,如果需要使用灵活列更新功能,建表时需在 `PROPERTIES` 中指定以下两个表属性。这会启用 Merge-on-Write 实现,并为表添加支持灵活列更新所需的隐藏列。
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "true"为存量表开启
前提条件:确保目标表已是 Merge-on-Write 表(即 `PROPERTIES` 中已包含 `"enable_unique_key_merge_on_write" = "true"`),并已开启 light-schema-change 功能。
执行以下命令为存量表开启此功能:
ALTER TABLE db1.tbl1 ENABLE FEATURE "UPDATE_FLEXIBLE_COLUMNS";验证方式:执行 show create table db1.tbl1,若结果的 `PROPERTIES` 中包含 "enable_unique_key_skip_bitmap_column" = "true",则表示功能开启成功。
步骤二:执行导入
Stream Load
在使用 Stream Load 导入时,在 HTTP Header 中添加以下参数:
unique_key_update_mode:UPDATE_FLEXIBLE_COLUMNSFlink Doris Connector
如果使用 Flink Doris Connector, 需要添加如下 sink 配置:
'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS'使用示例
1. 准备表和数据
假设有如下表 `t1`,已按要求开启灵活列更新功能:
CREATE TABLE t1 (
`k` int(11) NULL,
`v1` BIGINT NULL,
`v2` BIGINT NULL DEFAULT "9876",
`v3` BIGINT NOT NULL,
`v4` BIGINT NOT NULL DEFAULT "1234",
`v5` BIGINT NULL
) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
);表中有如下原始数据:
+---+----+----+----+----+----+
| k | v1 | v2 | v3 | v4 | v5 |
+---+----+----+----+----+----+
| 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | 1 | 1 | 1 | 1 | 1 |
| 2 | 2 | 2 | 2 | 2 | 2 |
| 3 | 3 | 3 | 3 | 3 | 3 |
| 4 | 4 | 4 | 4 | 4 | 4 |
| 5 | 5 | 5 | 5 | 5 | 5 |
+---+----+----+----+----+----+2. 导入更新数据
准备 JSON 格式的数据文件 `test1.json`,其中包含删除、更新、新增等多种操作。
{"k": 0, "__DORIS_DELETE_SIGN__": 1}
{"k": 1, "v1": 10}
{"k": 2, "v2": 20, "v5": 25}
{"k": 3, "v3": 30}
{"k": 4, "v4": 20, "v1": 43, "v3": 99}
{"k": 5, "v5": null}
{"k": 6, "v1": 999, "v3": 777}
{"k": 2, "v4": 222}
{"k": 1, "v2": 111, "v3": 111}通过 Stream Load 执行导入:
curl --location-trusted -u root: \
-H "strict_mode:false" \
-H "format:json" \
-H "read_json_by_line:true" \
-H "unique_key_update_mode:UPDATE_FLEXIBLE_COLUMNS" \
-T test1.json \
-XPUT http://<host>:<http_port>/api/d1/t1/_stream_load3. 验证与分析结果
更新后查询表数据,结果如下:
+---+-----+------+-----+------+--------+
| k | v1 | v2 | v3 | v4 | v5 |
+---+-----+------+-----+------+--------+
| 1 | 10 | 111 | 111 | 1 | 1 |
| 2 | 2 | 20 | 2 | 222 | 25 |
| 3 | 3 | 3 | 30 | 3 | 3 |
| 4 | 43 | 4 | 99 | 20 | 4 |
| 5 | 5 | 5 | 5 | 5 | <null> |
| 6 | 999 | 9876 | 777 | 1234 | <null> |
+---+-----+------+-----+------+--------+结果分析:
k = 0:该行数据已被删除(因 `__DORIS_DELETE_SIGN__` 字段)。
k = 1:该行数据在本次导入中被更新了两次。Doris 会合并更新,最终结果是 `v1` 更新为 `10`,`v2` 更新为 `111`,`v3` 更新为 `111`。未被更新的列保持原值。
k = 2:同理,该行数据在本次导入中也被更新了两次,最终结果是两次更新的合并。
k = 3:仅 `v3` 列被更新为 `30`。
k = 4:`v1`, `v3`, `v4` 列被更新。
k = 5:`v5` 列被显式更新为 `NULL`。
k = 6:这是一行新插入的数据。其中 `v1` 和 `v3` 被赋予了指定值,而未指定的列 `v2` 和 `v4` 使用了表结构中定义的 `DEFAULT` 值(`9876` 和 `1234`),`v5` 列无默认值,因此为 `NULL`。