灵活部分列更新

更新时间:
复制为 MD 格式

SelectDB 早期版本仅支持简单的部分列更新功能,限制了一次导入中每一行必须更新相同的列。SelectDB 4.1及更高版本支持灵活部分列更新。该功能允许在单次导入作业中,不同数据行可以更新不同的列。这与早期版本要求每行必须更新相同列的限制形成对比,极大地提升了数据更新的灵活性,尤其适用于 CDC 实时同步等场景。

适用场景

在使用 CDC 的方式将某个数据系统的数据实时同步到 SelectDB 中时,源端系统输出的记录可能并不是完整的行数据,而是只有主键和被更新的列的数据。在这种情况下,某个时间窗口内的一批数据中每一行更新的列可能都是不同的。此时,可以使用灵活列更新的方式来将数据导入到 SelectDB 中。

前提条件与限制

表模型与功能限制

  1. 适用表模型:仅支持 Unique Key 模型的 Merge-on-Write 表。

  2. 列类型限制:不支持在有 Variant 列的表上进行灵活列更新。

  3. 物化视图限制:不支持在有同步物化视图的表上进行灵活列更新。

导入方式与数据要求

  1. 导入方式:目前只有 stream load 这一种导入方式以及使用 stream load 作为其导入方式的工具 (如 Flink Doris Connector) 支持灵活列更新功能。

  2. 数据格式:在使用灵活列更新时导入文件必须为 json 格式的数据。

  3. 主键要求:导入的每一行数据需要包括所有的主键列。不满足此要求的行将被过滤,计入 filtered rows 的计数中。如果 filtered rows 的数量超过了本次导入 max_filter_ratio 所允许的上限,则整个导入将会失败。被过滤的数据会在 error log 中留下一条日志。

  4. 列名匹配:导入的 JSON 对象中,只有键(Key)与目标表列名一致的键值对才是有效的,不满足此要求的键值对将被忽略。系统内部使用的列名(如 __DORIS_VERSION_COL__ 等)也将被忽略。

不支持的导入参数

使用灵活列更新时,Stream Load 作业的 Header 中不能指定或开启以下参数:

  • merge_type

  • delete

  • fuzzy_parse

  • columns

  • jsonpaths

  • hidden_columns

  • function_column.sequence_col

  • sql

  • memtable_on_sink_node

  • group_commit

  • where

使用方式

步骤一:为表开启功能

新建表时开启

对于新建的表,如果需要使用灵活列更新功能,建表时需在 `PROPERTIES` 中指定以下两个表属性。这会启用 Merge-on-Write 实现,并为表添加支持灵活列更新所需的隐藏列。

"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "true"

为存量表开启

前提条件:确保目标表已是 Merge-on-Write 表(即 `PROPERTIES` 中已包含 `"enable_unique_key_merge_on_write" = "true"`),并已开启 light-schema-change 功能。

执行以下命令为存量表开启此功能:

ALTER TABLE db1.tbl1 ENABLE FEATURE "UPDATE_FLEXIBLE_COLUMNS";

验证方式:执行 show create table db1.tbl1,若结果的 `PROPERTIES` 中包含 "enable_unique_key_skip_bitmap_column" = "true",则表示功能开启成功。

步骤二:执行导入

Stream Load

在使用 Stream Load 导入时,在 HTTP Header 中添加以下参数:

unique_key_update_mode:UPDATE_FLEXIBLE_COLUMNS

如果使用 Flink Doris Connector, 需要添加如下 sink 配置:

'sink.properties.unique_key_update_mode' = 'UPDATE_FLEXIBLE_COLUMNS'

使用示例

1. 准备表和数据

假设有如下表 `t1`,已按要求开启灵活列更新功能:

CREATE TABLE t1 (
  `k` int(11) NULL, 
  `v1` BIGINT NULL,
  `v2` BIGINT NULL DEFAULT "9876",
  `v3` BIGINT NOT NULL,
  `v4` BIGINT NOT NULL DEFAULT "1234",
  `v5` BIGINT NULL
) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true",
"enable_unique_key_skip_bitmap_column" = "true"
);

表中有如下原始数据:

+---+----+----+----+----+----+
| k | v1 | v2 | v3 | v4 | v5 |
+---+----+----+----+----+----+
| 0 | 0  | 0  | 0  | 0  | 0  |
| 1 | 1  | 1  | 1  | 1  | 1  |
| 2 | 2  | 2  | 2  | 2  | 2  |
| 3 | 3  | 3  | 3  | 3  | 3  |
| 4 | 4  | 4  | 4  | 4  | 4  |
| 5 | 5  | 5  | 5  | 5  | 5  |
+---+----+----+----+----+----+

2. 导入更新数据

准备 JSON 格式的数据文件 `test1.json`,其中包含删除、更新、新增等多种操作。

{"k": 0, "__DORIS_DELETE_SIGN__": 1}
{"k": 1, "v1": 10}
{"k": 2, "v2": 20, "v5": 25}
{"k": 3, "v3": 30}
{"k": 4, "v4": 20, "v1": 43, "v3": 99}
{"k": 5, "v5": null}
{"k": 6, "v1": 999, "v3": 777}
{"k": 2, "v4": 222}
{"k": 1, "v2": 111, "v3": 111}

通过 Stream Load 执行导入:

curl --location-trusted -u root: \
-H "strict_mode:false" \
-H "format:json" \
-H "read_json_by_line:true" \
-H "unique_key_update_mode:UPDATE_FLEXIBLE_COLUMNS" \
-T test1.json \
-XPUT http://<host>:<http_port>/api/d1/t1/_stream_load

3. 验证与分析结果

更新后查询表数据,结果如下:

+---+-----+------+-----+------+--------+
| k | v1  | v2   | v3  | v4   | v5     |
+---+-----+------+-----+------+--------+
| 1 | 10  | 111  | 111 | 1    | 1      |
| 2 | 2   | 20   | 2   | 222  | 25     |
| 3 | 3   | 3    | 30  | 3    | 3      |
| 4 | 43  | 4    | 99  | 20   | 4      |
| 5 | 5   | 5    | 5   | 5    | <null> |
| 6 | 999 | 9876 | 777 | 1234 | <null> |
+---+-----+------+-----+------+--------+

结果分析:

  • k = 0:该行数据已被删除(因 `__DORIS_DELETE_SIGN__` 字段)。

  • k = 1:该行数据在本次导入中被更新了两次。Doris 会合并更新,最终结果是 `v1` 更新为 `10`,`v2` 更新为 `111`,`v3` 更新为 `111`。未被更新的列保持原值。

  • k = 2:同理,该行数据在本次导入中也被更新了两次,最终结果是两次更新的合并。

  • k = 3:仅 `v3` 列被更新为 `30`。

  • k = 4:`v1`, `v3`, `v4` 列被更新。

  • k = 5:`v5` 列被显式更新为 `NULL`。

  • k = 6:这是一行新插入的数据。其中 `v1` 和 `v3` 被赋予了指定值,而未指定的列 `v2` 和 `v4` 使用了表结构中定义的 `DEFAULT` 值(`9876` 和 `1234`),`v5` 列无默认值,因此为 `NULL`。