本文介绍如何在云数据库 SelectDB 版实例中,通过数据导入实现高效的数据更新。
概述
在SelectDB的Unique或Aggregate模型中,可以通过数据导入实现列更新能力。列更新支持高频的并发写入,写入成功后数据即可见,系统自动通过MVCC机制来保证并发写入的数据正确性。Unique模型和Aggregate模型详情请参见数据模型。
适用场景
列更新的适用场景如下。
实时的动态列更新,需要在表中实时的高频更新某些字段值。例如,T+1生成的用户标签表中有一些关于用户最新行为信息的字段需要实时更新,以实现广告或推荐等系统能够据其进行实时的分析和决策。
将多张源表拼接成一张大宽表。
数据修正。
实现方式
在Unique或Aggregate模型下,通过数据导入实现列更新。
Unique模型
SelectDB Unique表中,默认的数据写入语义是整行Upsert。在老版本中,想要更新某些行的一部分字段,只能通过UPDATE
命令,但是UPDATE
命令由于读写事务的锁粒度原因,并不适合高频的数据写入场景。因此SelectDB在Unique模型中引入了新版的列更新。当前Unique模型仅支持在Merge-on-Write实现上,通过导入进行列更新。
通过正常的导入方式将一部分列的数据写入SelectDB的Memtable,此时Memtable中并没有整行数据。在Memtable下刷的时候,会查找历史数据,用历史数据补齐一整行,并写入数据文件中,同时将历史数据文件中相同Key的数据行标记删除。
当出现并发导入时,SelectDB会利用MVCC机制来保证数据的正确性。如果两批数据导入都更新了一个相同Key的不同列,则其中系统版本较高的导入任务会在版本较低的导入任务成功后,再使用版本较低的导入任务写入的数据行重新进行补齐。
由于Merge-on-Write实现需要在数据写入的时候,进行整行数据的补齐以保证最优的查询性能,因此使用Merge-on-Write实现进行列更新会有较为明显的导入性能下降。为进一步优化写入性能,推荐
开启行存,开启行存将能够大大减少补齐数据时产生的IOPS,导入性能提升明显,可以在建表时通过如下属性(property)来开启行存。
"store_row_column" = "true"
Aggregate模型
Aggregate模型的表主要在预聚合场景使用而非数据更新的场景使用,但也可以通过将聚合函数设置为REPLACE_IF_NOT_NULL
,来实现通过导入进行列更新。
Aggregate模型在写入过程中不做任何额外处理,所以写入性能不受影响,与普通的数据导入相同。但是在查询时进行聚合的代价较大,典型的聚合查询性能相比Unique模型的Merge-on-Write实现会有5-10倍的下降。
无法通过聚合函数REPLACE_IF_NOT_NULL将某个字段由非NULL设置为NULL,写入的NULL值在REPLACE_IF_NOT_NULL
聚合函数的处理中会自动忽略。
模型选择建议
对写入性能要求较高、查询性能要求较低的用户,建议使用Aggregate模型。
对查询性能要求较高、写入性能要求不高(例如数据的写入和更新基本都在凌晨低峰期完成),或者写入频率不高的用户,建议使用Unique模型Merge-on-Write实现。
使用示例
Unique模型
步骤一:建表
建表时指定enable_unique_key_merge_on_write
属性,开启Merge-on-Write实现。设置订单ID为Key列,订单状态和订单金额为Value列。示例如下。
CREATE TABLE `order_tbl` (
`order_id` int(11) NULL,
`order_amount` int(11) NULL,
`order_status` varchar(100) NULL
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true"
);
步骤二:插入数据
向订单表order_tbl
插入初始数如下。
INSERT INTO order_tbl (order_id, order_amount, order_status) values (1,100,'待付款');
查询订单表order_tbl
数据如下。
+----------+--------------+--------------+
| order_id | order_amount | order_status |
+----------+--------------+--------------+
| 1 | 100 | 待付款 |
+----------+--------------+--------------+
1 row in set (0.01 sec)
步骤三:更新数据
当在用户点击付款后,SelectDB需要将订单ID为'1'的订单状态变更为'待发货',可采用如下三种数据导入方式来实现列更新。
StreamLoad/BrokerLoad/RoutineLoad
如果使用的是StreamLoad/BrokerLoad/RoutineLoad,导入时在Header中添加partial_columns:true
属性,具体操作如下:
新建文件
data.csv
,示例如下。1,待发货
导入数据。
在指令中的Header中添加
partial_columns:true
属性,并在columns
中指定要导入的列(必须包含所有Key列,否则无法更新),示例如下。$ curl --location-trusted -u root: -H "expect:100-continue" -H "partial_columns:true" -H "column_separator:," -H "columns:order_id,order_status" -T data.csv http://127.0.0.1:48037/api/db1/order_tbl/_stream_load
更新后,查询结果如下。
+----------+--------------+--------------+ | order_id | order_amount | order_status | +----------+--------------+--------------+ | 1 | 100 | 待发货 | +----------+--------------+--------------+ 1 row in set (0.01 sec)
INSERT INTO
在所有的数据模型中,INSERT INTO
在仅给定一部分列的场合,默认都是整行写入。为了防止误用,在Merge-on-Write实现中,INSERT INTO
默认仍然保持整行UPSERT的语义,如果需要开启列更新的语义,需要设置enable_unique_key_partial_update参数为true,设置语句如下所示。
set enable_unique_key_partial_update=true
此参数仅在表为Unique模型,且实现方式为写时合并(MOW,merge-on-write)时有效。
开启此参数后,如果控制INSERT语句是严格模式,即
enable_insert_strict
的值为true(默认值),则INSERT INTO语句仅具备更新功能,不具备插入新数据的功能。如果INSERT INTO语句中包含表内不存在的key,则会报错。开启此参数后,如果您期望在使用INSERT INTO语句进行部分列更新的同时,也能插入新数据,需要在
enable_unique_key_partial_update
设置为true的同时,将enable_insert_strict
设置为false。如何设置参数,请参见设置。
使用INSERT INTO
进行列更新的样例如下。
INSERT INTO order_tbl (order_id, order_status) values (1,'待发货');
Flink Connector
如果使用Flink Connector,需要添加如下配置。
'sink.properties.partial_columns' = 'true',
同时在sink.properties.column
中指定要导入的列(必须包含所有Key列,不然无法更新)。
Aggregate模型
建表
将需要进行列更新的字段对应的聚合函数设置为REPLACE_IF_NOT_NULL
。
CREATE TABLE `order_tbl` (
`order_id` int(11) NULL,
`order_amount` int(11) REPLACE_IF_NOT_NULL NULL,
`order_status` varchar(100) REPLACE_IF_NOT_NULL NULL
) ENGINE=OLAP
AGGREGATE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
数据写入
无论是导入任务还是INSERT INTO
, 直接写入需要更新的字段的数据即可。
通过Stream Load导入,对应的Stream Load命令如下(不需要额外的Header)。
curl --location-trusted -u root: -H "expect:100-continue" -H "column_separator:," -H "columns:order_id,order_status" -T /tmp/update.csv http://127.0.0.1:48037/api/db1/order_tbl/_stream_load
对应的INSERT INTO
语句如下(不需要额外设置session variable)。
INSERT INTO order_tbl (order_id, order_status) values (1,'待发货');