通过导入实现列更新

本文介绍如何在云数据库 SelectDB 版实例中,通过数据导入实现高效的数据更新

概述

SelectDBUniqueAggregate模型中,可以通过数据导入实现列更新能力。列更新支持高频的并发写入,写入成功后数据即可见,系统自动通过MVCC机制来保证并发写入的数据正确性。Unique模型和Aggregate模型详情请参见数据模型

适用场景

列更新的适用场景如下。

  • 实时的动态列更新,需要在表中实时的高频更新某些字段值。例如,T+1生成的用户标签表中有一些关于用户最新行为信息的字段需要实时更新,以实现广告或推荐等系统能够据其进行实时的分析和决策。

  • 将多张源表拼接成一张大宽表。

  • 数据修正。

实现方式

UniqueAggregate模型下,通过数据导入实现列更新。

Unique模型

SelectDB Unique表中,默认的数据写入语义是整行Upsert。在老版本中,想要更新某些行的一部分字段,只能通过UPDATE命令,但是UPDATE命令由于读写事务的锁粒度原因,并不适合高频的数据写入场景。因此SelectDBUnique模型中引入了新版的列更新。当前Unique模型仅支持在Merge-on-Write实现上,通过导入进行列更新。

通过正常的导入方式将一部分列的数据写入SelectDBMemtable,此时Memtable中并没有整行数据。在Memtable下刷的时候,会查找历史数据,用历史数据补齐一整行,并写入数据文件中,同时将历史数据文件中相同Key的数据行标记删除。

当出现并发导入时,SelectDB会利用MVCC机制来保证数据的正确性。如果两批数据导入都更新了一个相同Key的不同列,则其中系统版本较高的导入任务会在版本较低的导入任务成功后,再使用版本较低的导入任务写入的数据行重新进行补齐。

由于Merge-on-Write实现需要在数据写入的时候,进行整行数据的补齐以保证最优的查询性能,因此使用Merge-on-Write实现进行列更新会有较为明显的导入性能下降。为进一步优化写入性能,推荐

开启行存,开启行存将能够大大减少补齐数据时产生的IOPS,导入性能提升明显,可以在建表时通过如下属性(property)来开启行存。

"store_row_column" = "true"

Aggregate模型

Aggregate模型的表主要在预聚合场景使用而非数据更新的场景使用,但也可以通过将聚合函数设置为REPLACE_IF_NOT_NULL,来实现通过导入进行列更新。

Aggregate模型在写入过程中不做任何额外处理,所以写入性能不受影响,与普通的数据导入相同。但是在查询时进行聚合的代价较大,典型的聚合查询性能相比Unique模型的Merge-on-Write实现会有5-10倍的下降。

说明

无法通过聚合函数REPLACE_IF_NOT_NULL将某个字段由非NULL设置为NULL,写入的NULL值在REPLACE_IF_NOT_NULL聚合函数的处理中会自动忽略。

模型选择建议

  • 对写入性能要求较高、查询性能要求较低的用户,建议使用Aggregate模型。

  • 对查询性能要求较高、写入性能要求不高(例如数据的写入和更新基本都在凌晨低峰期完成),或者写入频率不高的用户,建议使用Unique模型Merge-on-Write实现。

使用示例

Unique模型

建表

建表时指定enable_unique_key_merge_on_write属性,开启Merge-on-Write实现,设置订单IDKey列,订单状态和订单金额为Value列。创建表,示例如下。

CREATE TABLE `order_tbl` (
    `order_id` int(11) NULL,
    `order_amount` int(11) NULL,
    `order_status` varchar(100) NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
    "enable_unique_key_merge_on_write" = "true"
);

向订单表order_tbl插入初始数如下。

INSERT INTO order_tbl (order_id, order_amount, order_status) values (1,100,'待付款');

查询订单表order_tbl数据如下。

+----------+--------------+--------------+
| order_id | order_amount | order_status |
+----------+--------------+--------------+
| 1        |          100 | 待付款        |
+----------+--------------+--------------+
1 row in set (0.01 sec)

当在用户点击付款后,SelectDB需要将订单ID为'1'的订单状态变更为'待发货',可采用如下三种数据导入方式来实现列更新。

StreamLoad/BrokerLoad/RoutineLoad

如果使用的是StreamLoad/BrokerLoad/RoutineLoad,导入时在Header中添加如下属性。

partial_columns:true

新建文件data.csv,示例如下。

1,待发货

同时在columns中指定要导入的列(必须包含所有Key列,否则无法更新),示例如下。

$ curl  --location-trusted -u root: -H "expect:100-continue" -H "partial_columns:true" -H "column_separator:," -H "columns:order_id,order_status" -T data.csv http://127.0.0.1:48037/api/db1/order_tbl/_stream_load

更新后,查询结果如下。

+----------+--------------+--------------+
| order_id | order_amount | order_status |
+----------+--------------+--------------+
| 1        |          100 | 待发货        |
+----------+--------------+--------------+
1 row in set (0.01 sec)

INSERT INTO

在所有的数据模型中,INSERT INTO在仅给定一部分列的场合,默认都是整行写入。为了防止误用,在Merge-on-Write实现中,INSERT INTO默认仍然保持整行UPSERT的语义,如果需要开启列更新的语义,需要设置会话变量如下。

set enable_unique_key_partial_update=true

使用INSERT INTO进行列更新的样例如下。

INSERT INTO order_tbl (order_id, order_status) values (1,'待发货');

控制Insert语句是否开启严格模式的会话变量enable_insert_strict的默认值为true,即insert语句默认开启严格模式,而在严格模式下进行列更新不允许更新不存在的Key。因此在使用insert语句进行列更新时,如果希望能插入不存在的Key,需要在enable_unique_key_partial_update设置为true的基础上同时将enable_insert_strict设置为false。

Flink Connector

如果使用Flink Connector,需要添加如下配置。

'sink.properties.partial_columns' = 'true',

同时在sink.properties.column中指定要导入的列(必须包含所有Key列,不然无法更新)。

Aggregate模型

建表

将需要进行列更新的字段对应的聚合函数设置为REPLACE_IF_NOT_NULL

CREATE TABLE `order_tbl` (
    `order_id` int(11) NULL,
    `order_amount` int(11) REPLACE_IF_NOT_NULL NULL,
    `order_status` varchar(100) REPLACE_IF_NOT_NULL NULL
) ENGINE=OLAP
AGGREGATE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
);

数据写入

无论是导入任务还是INSERT INTO, 直接写入需要更新的字段的数据即可。

通过Stream Load导入,对应的Stream Load命令如下(不需要额外的Header)。

curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns:order_id,order_status" -T /tmp/update.csv http://127.0.0.1:48037/api/db1/order_tbl/_stream_load

对应的INSERT INTO语句如下(不需要额外设置session variable)。

INSERT INTO order_tbl (order_id, order_status) values (1,'待发货');