Flink近实时部分列更新写入Delta Table

本文为您介绍Delta Table支持对部分列进行更新的使用场景和参数配置,以及Flink Connector设计的两种部分列更新模式的介绍与相关配置。

背景信息

  • UPSERT操作:结合了插入(INSERT)和更新(UPDATE)特性的数据库功能,它通过确保每个经过UPSERT处理的记录(或行)都必须包含主键列,实现了高效的数据操作。

  • UPSERT行为:取决于表中是否存在指定主键的数据。

    • 插入语义:当表中不存在指定主键的数据时,UPSERT将执行插入操作,将新记录添加到表中。

    • 更新语义:当表中已存在指定主键的记录时,UPSERT将执行更新操作,用提供的新数据更新现有数据。

  • UPSERT场景:流处理的多表连接中,涉及两个不同数据流的更新操作影响同一表内的不同列。

    • 数据流StreamA负责更新列ColumnX

    • 数据流StreamB负责更新列ColumnY

  • UPSERT形式比较:

    • 传统UPSERT:StreamB的更新可能会覆盖StreamA对数据所做的修改,从而导致数据一致性问题。

    • 部分列更新功能:确保了在执行并发更新时,各个流之间不会发生冲突。它们只更新各自负责的列,同时保留同一行中所有流的更新结果。

使用场景

场景一:更新同行不同列,互不干扰

假设存在一个用户信息管理系统,该系统需要实时处理和更新用户数据。这些数据被两个独立的服务流处理,它们分别从不同的数据源接收信息。

  • 数据流StreamA负责处理用户的个人信息,如姓名、年龄和性别。

  • 数据流StreamB负责处理用户的联系信息,如邮箱和电话号码。

在实际业务中,用户的个人信息和联系信息可能会几乎同时发生变化。我们需要确保这些更新能够即时反映在用户信息管理系统中,并不会相互覆盖。

操作流程

  1. 用户在不同的平台更新了姓名和电话号码。StreamA接收到了姓名的更新,StreamB接收到了电话号码的更新。

  2. StreamAStreamB均将更新发送到用户信息管理系统。

最终结果

  • 不使用部分列更新:若StreamB的更新在StreamA之后到达并处理,它将覆盖StreamA刚刚更新的姓名信息(如果StreamB以全行更新的方式进行操作),从而导致姓名恢复为旧值。

  • 使用部分列更新

    • StreamA进行更新时,只针对姓名列进行操作,而不会触及联系信息列。

    • StreamB进行更新时,只针对电话号码列进行操作,而不会触及个人信息列。

    最终的结果是用户的姓名被更新为最新的信息,电话号码也被更新为最新的信息。而且这些更新是独立进行的,互不干扰,确保了用户信息的完整性和准确性。

在实际应用中,部分列更新功能对于处理用户信息等数据至关重要。这一功能不仅保证了数据更新的实时性,还有效避免了数据不一致的问题。

场景二:更新行内部分字段,其他不变

假设存在一个用户信息管理系统,该系统需要实时处理和更新用户数据。这些数据被两个独立的服务流处理,它们分别从不同的数据源接收信息。

  • StreamA负责更新用户个人信息,如姓名、年龄、性别,以及用户的联系信息,如邮箱和电话号码。

  • StreamB负责更新用户个人信息,如姓名、年龄、性别,以及用户的联系信息,如邮箱和电话号码。与StreamA任务一致。

操作流程

  1. StreamA只希望更新用户的年龄,命令可能是:INSERT INTO table (pk, age) VALUES (1, 3) ;

  2. 同时,StreamB只希望更新用户的性别,命令可能是:INSERT INTO table (pk, sex) VALUES (1, 'male') ;

最终结果

  • 不使用部分列更新:主键为1的记录若接收到上述更新命令,则除了待更新的字段外,其他所有字段将被设置为NULL。这将导致原有有效数据的丢失。

  • 使用动态更新

    • 当插入操作被触发时,系统会识别到只有部分字段包含数据。

    • 部分列更新机制会确保只有这些包含数据的字段被更新。

    • 同时,那些在插入操作中没有提供数据的字段将保持其原有的值不变。

通过实现这种自动识别和更新策略,用户信息管理系统能够在不丢失任何已有有效数据的前提下,精确地更新仅需变更的字段。这显著提升了数据管理的灵活性和准确性,为维护数据完整性提供了坚实的保障。

Flink Connector模式概述

根据Delta Table部分列更新的使用场景,Flink Connector精心设计了两种部分列更新模式,以满足不同的数据更新需求。

静态模式

在静态模式下,用户需要提前指定将被数据流更新的列。这些被指定的列将遵循正常的UPSERT逻辑进行操作:

  • 如果主键存在,则更新数据。

  • 如果主键不存在,则插入新数据。

同时,未被指定更新的列将保持其现有值不变。这种模式适用于那些预期将会经常更改的列。

动态模式

动态模式赋予系统更高的智能化和自适应能力。在此模式下,系统能够自动检测数据流中哪些列包含非NULL值,并仅对这些值存在的列进行更新操作。这意味着数据流中未携带值(即值为NULL)的列将保持原样不变。动态模式尤其适用于无法预先确定哪些列会发生变化的情况,确保了数据流每次更新的准确性和高效性。

通过引入这两种更新模式,Flink Connector为用户提供了更加灵活和强大的数据处理能力,允许他们根据实际情况选择最合适的数据更新策略,从而保障了数据的准确性和完整性。

以下是不同模式每次更新同样数据后的结果:

说明

本示例数据的第一列a为主键。静态模式下,主键列默认被选中。

模式

初始数据

第一步:更新(a, b, c)后数据

第二步:更新(a, d, null)后数据

第三步:更新(a, null, e)后最终数据

常规模式

(null, null, null)

(a, b, c)

(a, d, null)

(a, null, e)

动态模式

(null, null, null)

(a, b, c)

(a, d, c)

(a, d, e)

静态模式(指定第二列更新)

(null, null, null)

(a, b, null)

(a, d, null)

(a, null, null)

使用方法

创建Delta Table并开启部分列更新

具体方式是在tblproperties配置如下参数:acid.partial.fields.update.enable=true。详情请参见Delta Table表参数

语法示例如下:

CREATE TABLE IF NOT EXISTS partial_upsert_test
  (pk INT NOT NULL, 
   c1 STRING, 
   c2 STRING, 
   c3 STRING, 
   primary key(pk)) 
TBLPROPERTIES('transactional'='true', 'acid.partial.fields.update.enable'='true');

Flink Connector配置示例

配置参数说明

为了配置部分列更新模式,MaxCompute引入了以下两个配置参数:

参数

参数说明

upsert.partial-column.enable

此参数用于启动部分列更新功能。若不指定列名(即upsert.partial-column.name参数为空),则系统将采用动态模式(更新非NULL字段)进行更新。

upsert.partial-column.name

此参数用于指定需要更新的列名。如果设置了此参数,系统将仅更新列出的字段,其他字段保持原值不变。

说明

主键列默认已选中,目前不能将分区列名加入到该参数中

动态部分列更新配置示例

创建一个开启动态部分列更新的表,示例如下。

CREATE TABLE partialtable (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC网络连接
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //支持三层模型
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);

后续对表的操作及结果示例如下。

  1. 向表中插入数据 [1,a, b, c],其中第一列为主键,初始数据为[1, a, b, c]

    INSERT INTO partialtable VALUES (1, 'a', 'b', 'c'); 
  2. 仅更新主键为1的记录的第二列 c2d,更新后数据为[1, a, d, c]

    INSERT INTO partialtable(pk, c2) VALUES (1, 'd'); 
  3. 仅更新主键为1的记录的第三列 c3e,更新后数据为[1, a, d, e]

    INSERT INTO partialtable(pk, c3) VALUES (1, 'e'); 

静态部分列更新配置示例

创建一个只对c2列进行更新的表,接下来,对该表的操作将只影响 'c2' 列,其他列将保持不变。示例如下。

CREATE TABLE PartialTable2 (
  pk INT,
  c1 STRING, 
  c2 STRING, 
  c3 STRING,
  PRIMARY KEY(pk) NOT ENFORCED
) WITH (
  'connector' = 'maxcompute',
  'odps.end.point' = 'https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api', //VPC网络连接
  'odps.project.name' = 'project_name',
  'odps.namespace.schema' = 'true', //支持三层模型
  'table.name' = 'project.schema.tablename',
  'sink.operation' = 'upsert',
  'upsert.write.bucket.num' = '1',
  'upsert.partial-column.enable' = 'true', 
  'upsert.partial-column.name' = 'c2', // 指定只更新 'c2' 列
  'odps.access.id' = 'yourAccessId',
  'odps.access.key' = 'yourAccessKey'
);
说明

在配置upsert.partial-column.name参数时,应使用MaxCompute中表对应的列名,而不是Flink内部表的列名。这确保Flink能够正确识别并更新存储系统中的对应列。

相关文档

  • 关于通过Flink写入数据至Delta Table的操作实践,请参见使用Flink写入数据到Delta Table。您可以参考实践过程进行部分列更新的参数配置来实现业务需求。

  • 关于部分列更新的相关介绍,请参见部分列更新