Flink CDC Transform模块

本文向您介绍Flink CDC数据摄入作业的 Transform 模块支持的语法规则及内置函数。

Transform规则参数

Transform 模块支持用户直接操作数据列,可对现有列执行删除或扩展,同时支持在数据同步过程中过滤不需要的数据。您可以使用以下参数来定义Transform规则:

参数

含义

是否必填

备注

source-table

指定需要转换的上游表。

支持使用正则表达式。

projection

指定上游表的投影规则,决定了上游表转换后的所有数据列。

使用的句法与SQL SELECT语句类似。

不填则不追加或删除任何列。

详情请参见数据筛选,可使用的内置函数请参考内置函数文档。

filter

数据行过滤规则。

使用的句法与SQL WHERE语句类似。

不填则不过滤任何行。

primary-keys

指定转换后的主键列。

不填则保留原Schema的主键定义。主键列表使用英文逗号(,)分隔。

重要

自定义主键列时,您需要确保上游到来的数据符合主键约束。建议自定义主键列中包含上游表的主键列,以避免跨分区写入时的数据乱序问题。

partition-keys

指定转换后分区键列表。

不填则保留原Schema的分区键定义,分区键列表使用英文逗号(,)分隔。

重要

自定义分区列时,您需要确保上游到来的数据符合主键约束,以避免跨分区写入时的数据乱序问题。

table-options

传递给Sink的额外配置信息。

可选的属性列表,例如Paimon Sink的分桶数、注释等信息。

不同配置项通过,分割,配置项的键与值通过=分割。

配置示例:

key1=value1,key2=value2

description

转换规则的描述信息。

无。

converter-after-transform

在转换完成后对数据额外处理的转换器。

详情请见Transform 后转换器(Converter after Transform)

注意事项

  • 修改transform模块的语句后,需要无状态重新启动作业。

  • 通常情况下,projectionfilter语句无需使用引号包裹。

    transform:
      - projection: a, b, c
        # 等价于
      - projection: "a, b, c"

    然而,如果Projection表达式的第一个字符为*'等特殊字符,则整行表达式可能无法被作为合法的YAML字符串字面量解析。此时需要手动使用'"包裹整个表达式,或是使用\转义:

    transform:
      - projection: *, 42      # 不是合法的YAML
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  

字段筛选

数据摄入Transform模块采用类 SQL 语法来定义字段筛选(Projection)规则,可以完成选取部分列、添加计算列、添加元数据列等功能。

列裁剪

如果您希望取出源表中某些特定列并同步给下游,可以在projection规则中将需要同步的列写出,未被指定的列将不会被发送给下游:

transform:
  - source-table: db.tbl
    projection: col_1, col_3, col_4 # col_2 会被裁剪
重要

裁剪部分列可能导致上游表发生结构变更时,上下游表结构失去同步。

通配符

如果您希望将源表中的所有列以及后续追加的新列按原样发送给下游,则可以在projection规则中使用星号(*)通配符。

说明

如果一个projection规则中没有使用通配符(*),则其产生的Schema就是固定的,并且始终与projection规则中写出的版本保持一致。

例如,*, 'extras' AS extras表示会在上游Schema的列尾追加额外的列,并持续将上游的表结构变更发送给下游。

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

计算列

您可以在projection规则中使用<Expression> AS <ColName>句法来添加计算列,表达式将对上游的每条数据分别求值后填入相应列。

说明

计算列的表达式不能引用其他计算列的值,即使被引用的列出现在该计算列之前。例如a, b AS c, c AS d不是合法的表达式。

例如,在接收到来自上游db.tbl表的[+I, id = 1]数据记录时,将其转化为[+I, id = 1, inc_id = 2]数据行并发送给下游。

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

元数据列(Metadata Column)

在编写projection规则时,可以将以下预先定义的元数据列作为普通数据列使用:

重要

请勿定义与元数据列同名的普通数据列。

元数据列名称

数据类型

说明

__namespace_name__

String

这条数据变更记录对应源表的Namespace名称。

__schema_name__

String

这条数据变更记录对应源表的Schema名称。

__table_name__

String

这条数据变更记录对应源表的Table名称。

__data_event_type__

String

这条数据变更记录对应的操作类型(+I-U+U-D)。

重要

由于CDC Event总是将一次更新对应的Update BeforeUpdate After打包为一条事件,因此__data_event_type__的内容在同一条Update事件里分别为-U+U。请勿将其作为主键使用。

例如,将上游表的全限定名称写入计算列中,并发送给下游。

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

各个数据库连接器对Namespace、SchemaTable名称的映射关系如下表所示。

数据库类型

Namespace名称

Schema名称

Table名称

MySQL

Database

-

Table

Kafka

-

-

Topic

SLS

-

Project

LogStore

MongoDB

-

Database

Collection

Paimon

-

Database

Table

Hologres

-

Schema

Table

StarRocks

Database

-

Table

Doris

Database

-

Table

数据过滤

数据摄入Transform采用类 SQL 语法来定义行过滤规则。

Filter规则应当是一个可被求值为BOOLEAN类型的表达式,可以引用源表中的任意列及计算列。

如果某条数据变更记录匹配了一个Filter不为空的Transform规则,并且Filter表达式的求值结果为FALSE,那么该行数据将不会被发送给下游。

说明

如果您在Projection规则中使用计算列覆盖了上游已存在的某一列,那么在Filter表达式中引用的是计算列。

例如如下的这个Transform规则:

transform:
  - source-table: db.tbl
    projection: CAST(id AS VARCHAR) AS id
    filter: CHAR_LENGTH(id) > 5

是合法的,filter表达式中所引用的 id是已经被转换为 VARCHAR 类型的计算列。

Transform 后转换器(Converter after Transform)

converter-after-transform 用于在全部的transform规则后处理数据变更。可以使用英文,连接多个转换器使用,消息会按照转换器的顺序进行修改。目前支持配置值如下。

转换器名称

功能

支持版本

SOFT_DELETE

将删除变更转换为插入。

VVR 8.0.11及以上版本。

FIELD_NAME_LOWER_CASE

表的字段名全部转为小写。

VVR 11.1及以上版本。

逻辑删除

SOFT_DELETE转换器结合元数据列 __data_event_type__可以实现逻辑删除。例如如下的transform配置可以实现逻辑删除,删除数据不会在下游真正地删除数据。删除的数据会转化为插入,对应数据的op_type更新为-D来表示删除。

transform: 
  - source-table: db.tbl
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE