Sequence列控制更新

当您在Unique模型中导入数据并且需要保证数据正确时,云数据库 SelectDB 版为您提供了通过Sequence列实现条件更新的功能,使您在导入数据时可以控制列的替换顺序,进而保证您的数据正确。

前提条件

表的数据模型为Unique模型。SelectDB数据模型详情请参见数据模型

概述

Unique模型一般用于唯一主键的场景,可以保证主键唯一性约束,但是由于使用REPLACE聚合方式,在同一批次中导入的数据不保证替换顺序。替换顺序无法保证则无法确定最终导入到表中的具体数据。为了解决这个问题,SelectDB提供了Sequence列,在导入时可指定Sequence列,通过指定Sequence列可以控制替换顺序。相同Key列的情况下,REPLACE聚合类型的列将按照Sequence列的值进行替换,较大值可以替换较小值,反之则无法替换。

基本原理

通过增加一个隐藏列__DORIS_SEQUENCE_COL__实现。该列的类型由用户在建表时指定,在导入时确定该列具体值,并依据该值对REPLACE列进行替换。

导入

导入时,FE在解析过程中将隐藏列的值设置成order by表达式的值(针对Broker LoadRoutine Load),或者function_column.sequence_col表达式的值(针对Stream Load),Value列将依据该值进行替换。隐藏列__DORIS_SEQUENCE_COL__的值既可以是表结构中的已有列,也可以采用数据源中的一列。

读取

请求包含Value列时需要额外读取__DORIS_SEQUENCE_COL__列,该列是在相同Key列下REPLACE聚合函数替换顺序的依据,较大值可以替换较小值,反之则不能替换。

说明

Base CompactionCumulative Compaction也遵循读取的原理。

启用Sequence column支持

建表时启用Sequence column支持

Sequence列建表时有两种方式,一种是建表时设置sequence_col属性,一种是建表时设置sequence_type属性。建议在创建表时设置sequence_col属性。

  • 设置sequence_col

    推荐使用指定Sequence列类型为sequence_col。因为导入数据时无需感知Sequence列,和普通导入使用方式一致,使用简单。

    创建Unique表时,指定Sequence列类型为sequence_colsequence_col指定Sequence列到表中某一列的映射,该列可以为整型和时间类型(DATE、DATETIME),示例如下。

    PROPERTIES ( "function_column.sequence_col" = 'column_name');
    说明

    创建后不能更改该列的类型。

  • 设置sequence_type

    创建Unique表时,指定Sequence列类型为sequence_typesequence_type列可以为整型和时间类型(DATE、DATETIME)。示例如下。

    PROPERTIES ( "function_column.sequence_type" = 'column_name');
    说明

    导入时需要指定Sequence列到其他列的映射。

建表后启用Sequence column支持

一个不支持Sequence column的表,可以使用如下语句来启用支持Sequence column,示例如下。

ALTER TABLE db_name.table_name
ENABLE FEATURE "SEQUENCE_LOAD" 
WITH PROPERTIES ("function_column.sequence_type" = "column_name")

查看Sequence column支持

不确定一个表是否支持Sequence column,可以通过设置一个会话变量来显示隐藏列SET show_hidden_columns=true,之后使用desc tablename,如果输出中有__DORIS_SEQUENCE_COL__列则支持,如果没有则不支持。

使用方式

以下分别介绍三种不同的导入方式设置数据源中的某列为Sequence列。

Stream Load

Stream Load场景下,可以通过Header中的function_column.sequence_col字段指定columns中某列作为Sequence列,示例如下。

curl --location-trusted -u root -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/testDb/testTbl/_stream_load

Broker Load

ORDER BY处设置数据源中的某列作为Sequence列,示例如下。

LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `tbl1`
    COLUMNS TERMINATED BY ","
    (k1,k2,source_sequence,v1,v2)
    ORDER BY source_sequence
)
WITH BROKER 'broker'
(
    "username"="user",
    "password"="pass"
)
PROPERTIES
(
    "timeout" = "3600"
);

Routine Load

映射方式同上,示例如下。

   CREATE ROUTINE LOAD example_db.test1 ON example_tbl 
    [WITH MERGE|APPEND|DELETE]
    COLUMNS(k1, k2, source_sequence, v1, v2),
    WHERE k1 > 100 and k2 like "%doris%"
    [ORDER BY source_sequence]
    PROPERTIES
    (
        "desired_concurrent_number"="3",
        "max_batch_interval" = "20",
        "max_batch_rows" = "300000",
        "max_batch_size" = "209715200",
        "strict_mode" = "false"
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic" = "my_topic",
        "kafka_partitions" = "0,1,2,3",
        "kafka_offsets" = "101,0,0,200"
    );

使用示例

如下以Stream Load导入数据为例。

  1. 创建Unique模型的test_table数据表,并指定Sequence列映射到表中的modify_date列。

    CREATE TABLE test_db.test_table
    (
        user_id bigint,
        date date,
        group_id bigint,
        modify_date date,
        keyword VARCHAR(128)
    )
    UNIQUE KEY(user_id, date, group_id)
    DISTRIBUTED BY HASH (user_id) BUCKETS 32
    PROPERTIES(
        "function_column.sequence_col" = 'modify_date',
        "enable_unique_key_merge_on_write" = "true"
    );
  2. 表结构如下。

    DESC test_table;
    +-------------+--------------+------+-------+---------+---------+
    | Field       | Type         | Null | Key   | Default | Extra   |
    +-------------+--------------+------+-------+---------+---------+
    | user_id     | BIGINT       | No   | true  | NULL    |         |
    | date        | DATE         | No   | true  | NULL    |         |
    | group_id    | BIGINT       | No   | true  | NULL    |         |
    | modify_date | DATE         | No   | false | NULL    | REPLACE |
    | keyword     | VARCHAR(128) | No   | false | NULL    | REPLACE |
    +-------------+--------------+------+-------+---------+---------+
  3. 创建文件,需要导入的文本数据如下。

    1,2020-02-22,1,2020-02-21,a
    1,2020-02-22,1,2020-02-22,b
    1,2020-02-22,1,2020-03-05,c
    1,2020-02-22,1,2020-02-26,d
    1,2020-02-22,1,2020-02-23,e
    1,2020-02-22,1,2020-02-24,b
  1. Stream Load为例,导入数据,示例如下。

    curl --location-trusted -u root: -T data.csv -H "expect:100-continue" -H "column_separator:," http://host:port/api/test_db/test_table/_stream_load
  2. 查询导入结果,结果如下。

    SELECT * FROM test_table;
    +---------+------------+----------+-------------+---------+
    | user_id | date       | group_id | modify_date | keyword |
    +---------+------------+----------+-------------+---------+
    |       1 | 2020-02-22 |        1 | 2020-03-05  | c       |
    +---------+------------+----------+-------------+---------+

    因此这次导入中,因Sequence column的值(也就是modify_date中的值)里“2020-03-05”为最大值,所以keyword列中最终保留了c。

  3. 创建文件,然后通过Stream Load将文件中数据导入,示例如下。

    1,2020-02-22,1,2020-02-22,a
    1,2020-02-22,1,2020-02-23,b
  4. 查询导入结果,结果如下。

    SELECT * FROM test_table;
    +---------+------------+----------+-------------+---------+
    | user_id | date       | group_id | modify_date | keyword |
    +---------+------------+----------+-------------+---------+
    |       1 | 2020-02-22 |        1 | 2020-03-05  | c       |
    +---------+------------+----------+-------------+---------+

    查询这次导入的数据,会比较所有已导入数据的Sequence列(也就是modify_date),其中“2020-03-05”仍为最大值,所以keyword列中最终保留了c。

  5. 创建文件,然后通过Stream Load将文件中数据导入,示例如下。

    1,2020-02-22,1,2020-02-22,a
    1,2020-02-22,1,2020-03-23,w
  6. 查询导入结果,结果如下。

    SELECT * FROM test_table;
    +---------+------------+----------+-------------+---------+
    | user_id | date       | group_id | modify_date | keyword |
    +---------+------------+----------+-------------+---------+
    |       1 | 2020-02-22 |        1 | 2020-03-23  | w       |
    +---------+------------+----------+-------------+---------+

    此时Sequence列的最大值为“2020-03-23”,因此替换表中原有的数据。

    综上,在导入过程中,会比较所有批次的Sequence列值,选择值最大的记录导入SelectDB表中。

说明
  • Stream LoadBroker Load等导入任务以及行更新Insert语句中,用户必须显式指定Sequence列(除非Sequence列的默认值为CURRENT_TIMESTAMP),否则会产生报错信息“Table test_table has sequence column, need to specify the sequence column”。

  • 在部分列更新导入中,用户每次可以只更新一部分列,并不必须要包含Sequence列。若用户提交的导入任务中包含Sequence列,则无影响;若用户提交的导入任务不包含Sequence列,SelectDB会使用匹配的历史数据中的Sequence列作为更新后该行的Sequence列的值。如果历史数据中不存在相同Key的列,则会自动用NULL或默认值填充。