本文为您介绍Delta Table支持对部分列进行更新的使用场景和参数配置,以及Flink Connector设计的两种部分列更新模式的介绍与相关配置。
背景信息
UPSERT操作:结合了插入(INSERT)和更新(UPDATE)特性的数据库功能,它通过确保每个经过UPSERT处理的记录(或行)都必须包含主键列,实现了高效的数据操作。
UPSERT行为:取决于表中是否存在指定主键的数据。
插入语义:当表中不存在指定主键的数据时,UPSERT将执行插入操作,将新记录添加到表中。
更新语义:当表中已存在指定主键的记录时,UPSERT将执行更新操作,用提供的新数据更新现有数据。
UPSERT场景:流处理的多表连接中,涉及两个不同数据流的更新操作影响同一表内的不同列。
数据流
StreamA
负责更新列ColumnX
。数据流
StreamB
负责更新列ColumnY
。
UPSERT形式比较:
传统UPSERT:
StreamB
的更新可能会覆盖StreamA
对数据所做的修改,从而导致数据一致性问题。部分列更新功能:确保了在执行并发更新时,各个流之间不会发生冲突。它们只更新各自负责的列,同时保留同一行中所有流的更新结果。
使用场景
场景一:更新同行不同列,互不干扰
假设存在一个用户信息管理系统,该系统需要实时处理和更新用户数据。这些数据被两个独立的服务流处理,它们分别从不同的数据源接收信息。
数据流
StreamA
负责处理用户的个人信息,如姓名、年龄和性别。数据流
StreamB
负责处理用户的联系信息,如邮箱和电话号码。
在实际业务中,用户的个人信息和联系信息可能会几乎同时发生变化。我们需要确保这些更新能够即时反映在用户信息管理系统中,并不会相互覆盖。
操作流程
用户在不同的平台更新了姓名和电话号码。
StreamA
接收到了姓名的更新,StreamB
接收到了电话号码的更新。StreamA
和StreamB
均将更新发送到用户信息管理系统。
最终结果
不使用部分列更新:若
StreamB
的更新在StreamA
之后到达并处理,它将覆盖StreamA
刚刚更新的姓名信息(如果StreamB
以全行更新的方式进行操作),从而导致姓名恢复为旧值。使用部分列更新:
StreamA
进行更新时,只针对姓名列进行操作,而不会触及联系信息列。StreamB
进行更新时,只针对电话号码列进行操作,而不会触及个人信息列。
最终的结果是用户的姓名被更新为最新的信息,电话号码也被更新为最新的信息。而且这些更新是独立进行的,互不干扰,确保了用户信息的完整性和准确性。
在实际应用中,部分列更新功能对于处理用户信息等数据至关重要。这一功能不仅保证了数据更新的实时性,还有效避免了数据不一致的问题。
场景二:更新行内部分字段,其他不变
假设存在一个用户信息管理系统,该系统需要实时处理和更新用户数据。这些数据被两个独立的服务流处理,它们分别从不同的数据源接收信息。
StreamA
负责更新用户个人信息,如姓名、年龄、性别,以及用户的联系信息,如邮箱和电话号码。StreamB
负责更新用户个人信息,如姓名、年龄、性别,以及用户的联系信息,如邮箱和电话号码。与StreamA
任务一致。
操作流程
StreamA
只希望更新用户的年龄,命令可能是:INSERT INTO table (pk, age) VALUES (1, 3) ;
。同时,
StreamB
只希望更新用户的性别,命令可能是:INSERT INTO table (pk, sex) VALUES (1, 'male') ;
。
最终结果
不使用部分列更新:主键为1的记录若接收到上述更新命令,则除了待更新的字段外,其他所有字段将被设置为NULL。这将导致原有有效数据的丢失。
使用动态更新:
当插入操作被触发时,系统会识别到只有部分字段包含数据。
部分列更新机制会确保只有这些包含数据的字段被更新。
同时,那些在插入操作中没有提供数据的字段将保持其原有的值不变。
通过实现这种自动识别和更新策略,用户信息管理系统能够在不丢失任何已有有效数据的前提下,精确地更新仅需变更的字段。这显著提升了数据管理的灵活性和准确性,为维护数据完整性提供了坚实的保障。
Flink Connector模式概述
根据Delta Table部分列更新的使用场景,Flink Connector精心设计了两种部分列更新模式,以满足不同的数据更新需求。
静态模式
在静态模式下,用户需要提前指定将被数据流更新的列。这些被指定的列将遵循正常的UPSERT逻辑进行操作:
如果主键存在,则更新数据。
如果主键不存在,则插入新数据。
同时,未被指定更新的列将保持其现有值不变。这种模式适用于那些预期将会经常更改的列。
动态模式
动态模式赋予系统更高的智能化和自适应能力。在此模式下,系统能够自动检测数据流中哪些列包含非NULL值,并仅对这些值存在的列进行更新操作。这意味着数据流中未携带值(即值为NULL)的列将保持原样不变。动态模式尤其适用于无法预先确定哪些列会发生变化的情况,确保了数据流每次更新的准确性和高效性。
通过引入这两种更新模式,Flink Connector为用户提供了更加灵活和强大的数据处理能力,允许他们根据实际情况选择最合适的数据更新策略,从而保障了数据的准确性和完整性。
以下是不同模式每次更新同样数据后的结果:
本示例数据的第一列a
为主键。静态模式下,主键列默认被选中。
模式 | 初始数据 | 第一步:更新(a, b, c)后数据 | 第二步:更新(a, d, null)后数据 | 第三步:更新(a, null, e)后最终数据 |
常规模式 | (null, null, null) | (a, b, c) | (a, d, null) | (a, null, e) |
动态模式 | (null, null, null) | (a, b, c) | (a, d, c) | (a, d, e) |
静态模式(指定第二列更新) | (null, null, null) | (a, b, null) | (a, d, null) | (a, null, null) |
使用方法
创建Delta Table并开启部分列更新
具体方式是在tblproperties
配置如下参数:acid.partial.fields.update.enable=true
。详情请参见Delta Table表参数。
语法示例如下:
CREATE TABLE IF NOT EXISTS partial_upsert_test
(pk INT NOT NULL,
c1 STRING,
c2 STRING,
c3 STRING,
primary key(pk))
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');
Flink Connector配置示例
配置参数说明
为了配置部分列更新模式,MaxCompute引入了以下两个配置参数:
参数 | 参数说明 |
upsert.partial-column.enable | 此参数用于启动部分列更新功能。若不指定列名(即 |
upsert.partial-column.name | 此参数用于指定需要更新的列名。如果设置了此参数,系统将仅更新列出的字段,其他字段保持原值不变。 说明 主键列默认已选中,目前不能将分区列名加入到该参数中。 |
动态部分列更新配置示例
创建一个开启动态部分列更新的表,示例如下。
CREATE TABLE partialtable (
pk INT,
c1 STRING,
c2 STRING,
c3 STRING,
PRIMARY KEY(pk) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC网络连接
'odps.project.name' = 'project_name',
'odps.namespace.schema' = 'true', //支持三层模型
'table.name' = 'project.schema.tablename',
'sink.operation' = 'upsert',
'upsert.write.bucket.num' = '1',
'upsert.partial-column.enable' = 'true',
'odps.access.id' = 'yourAccessId',
'odps.access.key' = 'yourAccessKey'
);
后续对表的操作及结果示例如下。
向表中插入数据 [1,a, b, c],其中第一列为主键,初始数据为
[1, a, b, c]
。INSERT INTO partialtable VALUES (1, 'a', 'b', 'c');
仅更新主键为
1
的记录的第二列c2
为d
,更新后数据为[1, a, d, c]
。INSERT INTO partialtable(pk, c2) VALUES (1, 'd');
仅更新主键为
1
的记录的第三列c3
为e
,更新后数据为[1, a, d, e]
。INSERT INTO partialtable(pk, c3) VALUES (1, 'e');
静态部分列更新配置示例
创建一个只对c2
列进行更新的表,接下来,对该表的操作将只影响 'c2' 列,其他列将保持不变。示例如下。
CREATE TABLE PartialTable2 (
pk INT,
c1 STRING,
c2 STRING,
c3 STRING,
PRIMARY KEY(pk) NOT ENFORCED
) WITH (
'connector' = 'maxcompute',
'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC网络连接
'odps.project.name' = 'project_name',
'odps.namespace.schema' = 'true', //支持三层模型
'table.name' = 'project.schema.tablename',
'sink.operation' = 'upsert',
'upsert.write.bucket.num' = '1',
'upsert.partial-column.enable' = 'true',
'upsert.partial-column.name' = 'c2', // 指定只更新 'c2' 列
'odps.access.id' = 'yourAccessId',
'odps.access.key' = 'yourAccessKey'
);
在配置upsert.partial-column.name
参数时,应使用MaxCompute中表对应的列名,而不是Flink内部表的列名。这确保Flink能够正确识别并更新存储系统中的对应列。
相关文档
关于通过Flink写入数据至Delta Table的操作实践,请参见使用Flink写入数据到Delta Table。您可以参考实践过程进行部分列更新的参数配置来实现业务需求。
关于部分列更新的相关介绍,请参见部分列更新。