数据摄入开发参考

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

本文为您介绍数据摄入有关的source、sink、transform、route和pipeline模块的开发参考。

支持的连接器

连接器

支持类型

Source

Sink

消息队列Kafka

说明

仅实时计算引擎VVR 8.0.10及以上版本支持。

实时数仓Hologres

×

MySQL

说明

支持连接RDS MySQL版、PolarDB MySQL版及自建MySQL。

×

Upsert Kafka

×

Print

×

StarRocks

×

流式数据湖仓Paimon

×

source模块

source模块定义数据摄入的数据源端,目前支持的系统包括消息队列KafkaMySQL

语法结构

source:
  type: mysql
  name: mysql source
  xxx: ...

具体配置请查看对应连接器的数据摄入部分。

sink模块

sink模块定义数据摄入的目标端,目前支持的系统包括消息队列KafkaUpsert Kafka实时数仓Hologres流式数据湖仓PaimonStarRocksPrint

语法结构

sink:
  type: hologres
  name: hologres sink
  xxx: ...

具体配置请查看对应连接器的数据摄入部分。

transform模块

您可以在YAML作业的transform语句块中填写若干规则信息,从而实现源表中数据的投影、计算和过滤等功能。

语法结构

transform:
  - source-table: db.tbl1
    projection: ...
    filter: ...
  - source-table: db.tbl2
    projection: ...
    filter: ...

配置项

参数

含义

是否必填

备注

source-table

指定生效上游表。

支持使用正则表达式。

projection

指定用于保留部分上游列的投影规则。

使用的句法与SQL SELECT语句类似。

不填则不追加或删除任何列。

说明

VVR 8.0.9版本,如需将上游表结构变更同步到下游,则仍需手动定义 projection: \*规则。详情请参见注意事项

filter

行过滤规则。

使用的句法与SQL WHERE语句类似。

不填则不过滤任何行。

primary-keys

设定transform后Schema的主键列表。

不填则保留原Schema的主键定义。主键列表使用英文逗号(,)分隔。

partition-keys

设定transform的分区键列表。

不填则保留原Schema的分区键定义,分区键列表使用英文逗号(,)分隔。

table-options

需要传递给Sink的额外配置信息。

Options选项,例如Paimon Sink的分桶数、注释等信息。

不同配置项通过,分割,配置项的键与值通过=分割。

配置示例:

key1=value1,key2=value2

description

该transform块的描述信息。

无。

计算列

您可以在projection规则中使用<Expression> AS <ColName>句法来定义计算列,表达式将对上游的每条数据分别求值后填入相应列。

警告

计算列的表达式不可以引用其他计算列的值,即使被引用的列出现在该计算列之前。例如a, b AS c, c AS d不是合法的projection表达式。

例如,在接收到来自上游db.tbl表的[+I, id = 1]数据记录时,将其转化为[+I, id = 1, inc_id = 2]数据行并发送给下游。

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

通配符

如果您希望将源表中的所有列以及后续追加的新列按原样发送给下游,则可以在projection规则中使用星号(*)通配符。

说明

如果一个projection规则中没有使用通配符(*),则其产生的Schema就是固定的,并且始终与projection规则中写出的版本保持一致。

例如,*, 'extras' AS extras表示会在上游Schema的列尾追加额外的列,并持续将上游的表结构变更发送给下游。

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

元数据列

在编写projection规则时,可以将以下预先定义的元数据列作为普通数据列使用:

重要

请勿定义与元数据列同名的普通数据列。

元数据列名称

数据类型

说明

__namespace_name__

String

这条数据变更记录对应源表的Namespace名称。

__schema_name__

String

这条数据变更记录对应源表的Schema名称。

__table_name__

String

这条数据变更记录对应源表的Table名称。

__data_event_type__

String

这条数据变更记录对应的操作类型(+I-U+U-D)。

重要

由于CDC Event总是将一次更新对应的Update Before和Update After打包为一条事件,因此__data_event_type__的内容在同一条Update事件里分别为-U+U。请勿将其作为主键使用。

例如,将上游表的全限定名称写入计算列中,并发送给下游。

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

各个数据库连接器对Namespace、Schema和Table名称的映射关系如下表所示。

数据库类型

Namespace名称

Schema名称

Table名称

JDBC

Catalog

Schema

Table

Debezium

Catalog

Schema

Table

MySQL

Database

-

Table

Postgres

Database

Schema

Table

Oracle

-

Schema

Table

Microsoft SQL Server

Database

Schema

Table

StarRocks

Database

-

Table

Doris

Database

-

Table

注意事项

  • 修改transform模块的语句后,不能从已有的状态恢复,需要进行无状态启动。

  • 通常情况下,projection和filter语句无需使用引号包裹。

    transform:
      - projection: a, b, c
        # 等价于
      - projection: "a, b, c"

    然而,如果Projection表达式的第一个字符为*'等特殊字符,则整行表达式可能无法被作为合法的YAML字符串字面量解析。此时需要手动使用引号包裹整个表达式,或是使用\转义:

    transform:
      - projection: *, 42      # 不是合法的YAML
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  
  • 使用VVR 8.0.9版本编写Transform规则时,不提供Projection表达式无法将表结构变更同步到下游。可以使用以下方法编写等价的规则:

    transform:
      - source-table: db.\.*
        projection: \*        # 将上游全部列及后续结构变更同步到下游

route模块

您可以在YAML作业的route模块中定义包含若干条route规则的语句块,描述上游表到下游表的复杂拓扑结构。

语法结构

route:
  - source-table: db.tbl1
    sink-table: sinkdb.tbl1
  - source-table: db.tbl2
    sink-table: sinkdb.tbl2

配置项

参数

含义

是否必填

备注

source-table

指定生效上游表。

支持使用正则表达式。

sink-table

指定数据路由的目标位置。

无。

replace-symbol

在使用模式匹配功能时,用于指代上游表名的字符串。

例如,当replace-symbol设置为<>时,可以将sink-table配置为sinkdb.<>。这样,来自上游的表table1会被写入到sinkdb.table1表中。

description

该route块的描述信息。

无。

使用方法

一对一路由

将上游表mydb.web_order中的数据路由到下游表mydb.ods_web_order

route:
  - source-table: mydb.web_order
    sink-table: mydb.ods_web_order
    description: sync table to one destination table with given prefix ods_

合并分库分表

将上游mydb数据库中的所有表合并到下游mydb.merged表中。

route:
  - source-table: mydb.\.*
    sink-table: mydb.merged
    description: sync sharding tables to one destination table

多路由规则

可以在一个route块中使用YAML列表符号(-)定义多条规则,它们会同时生效。

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
    description: sync orders table to orders
  - source-table: mydb.shipments
    sink-table: ods_db.ods_shipments
    description: sync shipments table to ods_shipments
  - source-table: mydb.products
    sink-table: ods_db.ods_products
    description: sync products table to ods_products

模式匹配

source_db数据库中的全部表一一对应地同步到sink_db中,并保持表名不变。

route:
  - source-table: source_db.\.*
    sink-table: sink_db.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

使用replace-symbol参数定义的<>特殊字符串会被表名替代,从而实现源表到汇表的一一对应。

数据分发

将同一张表的数据分发给多个下游表,只需定义多条路由规则即可。例如将mydb.orders的数据会被同时分发到sink_dbbackup_sink_db两个数据库中。

route:
  - source-table: mydb.orders
    sink-table: sink_db.orders
  - source-table: mydb.orders
    sink-table: backup_sink_db.orders

注意事项

修改route模块的语句后,不能从已有的状态恢复,需要进行无状态启动。

pipeline模块

您可以在pipeline模块配置数据摄入YAML作业的整体配置。

语法结构

pipeline:
  name: CDC YAML job
  schema.change.behavior: LENIENT

配置项

参数

说明

是否必填

数据类型

默认值

备注

name

数据摄入YAML名称。

STRING

Flink CDC Pipeline Job

无。

schema.change.behavior

Schema变更行为配置。

STRING

LENIENT

可配置的值如下,详见Schema变更行为配置

  • LENIENT(默认值)

  • EXCEPTION

  • EVOLVE

  • TRY_EVOLVE

  • IGNORE

Schema变更行为配置

数据摄入YAML作业支持将数据源的Schema变更同步到下游目标端,例如创建表、添加列、重命名列、更改列类型、删除列和删除表等。下游目标端可能不支持全部的Schema变更,您可以通过schema.change.behavior配置来修改Schema变更发生时目标端的处理方式。

Schema变更模式

模式

说明

LENIENT(默认)

数据摄入YAML作业会对Schema变更进行转换成目标端可处理的变更并发送,遵循以下规则:

  • 不发送Drop table和Truncate table变更。

  • 列重命名时,改为发送更改列类型和新增列两个事件。原有的列不删除,更改列类型为nullable,同时新增一个列名为新名称,数据类型改为nullable的列。

  • 删除列时,改为发送更改列类型事件,将对应字段类型变为nullable。

  • 新增列时仍发送新增列事件,但字段类型会变为nullable。

EXCEPTION

不允许任何Schema变更行为。

当目标端不支持处理Schema变更时,可以使用此模式。收到Schema变更事件时,数据摄入YAML作业会抛出异常。

EVOLVE

数据摄入YAML作业会将所有Schema更改应用于目标端。

如果Schema变更在目标端应用失败,数据摄入YAML作业会抛出异常并触发故障重启。

TRY_EVOLVE

数据摄入YAML作业会尝试将Schema变更应用到目标端,如果目标端不支持处理发送的Schema变更,数据摄入YAML作业不会失败重启,尝试通过转换后续数据方式进行处理。

警告

TRY_EVOLVE模式下,如果发生Schema变更应用失败,可能导致上游后续到来的数据出现部分列丢失、被截断等情况。

IGNORE

所有Schema变更都不会应用于目标端。

当您的目标端尚未准备好进行任何Schema变更,想要继续从未更改的列中接收数据时,可以使用此模式。

控制目标端接收的Schema变更

在某些场景下,不需要所有Schema变更同步到目标端。例如,允许新增列但禁止删除列来避免删除已有的数据。

您可以通过在sink模块中设置include.schema.changesexclude.schema.changes选项来控制。

参数

说明

是否必填

数据类型

默认值

备注

include.schema.changes

支持应用的Schema变更。

List<String>

默认支持所有变更。

exclude.schema.changes

不支持应用的Schema变更。

List<String>

优先级高于include.schema.changes

以下是可配置架构变更事件类型的完整列表:

事件类型

说明

add.column

新增列。

alter.column.type

变更列类型。

create.table

创建表。

drop.column

删除列。

drop.table

删除表。

rename.column

修改列名。

truncate.table

清空数据。

说明

Schema变更支持部分匹配。例如,传入drop相当于同时传入drop.columndrop.table

代码示例

  • 示例1:Schema变更行为配置为EVOLVE

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.pipeline: EVOLVE
  • 示例2:支持创建表和列相关事件,不支持删除列

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
  exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
  
pipeline:
  name: mysql to print job
  schema.change.pipeline: EVOLVE

函数

内置函数

CDC YAML提供了丰富的内置函数,可以直接在transform模块中的projection和filter表达式中使用。

比较函数

说明

除非特别说明,否则以下内置函数在输入参数包含NULL时均返回NULL。

函数

说明

value1 = value2

如果value1等于value2,则返回TRUE;否则返回FALSE。

value1 <> value2

如果value1不等于value2,则返回TRUE;否则返回FALSE。

value1 > value2

如果value1大于value2,则返回TRUE;否则返回FALSE。

value1 >= value2

如果value1大于或等于value2,则返回TRUE;否则返回FALSE。

value1 < value2

如果value1小于value2,则返回TRUE;否则返回FALSE。

value1 <= value2

如果value1小于或等于value2,则返回TRUE;否则返回FALSE。

value IS NULL

如果value是NULL,则返回TRUE;否则返回FALSE。

value IS NOT NULL

如果value不是NULL,则返回TRUE;否则返回FALSE。

value1 BETWEEN value2 AND value3

如果value1的值介于value2和value3之间,则返回TRUE;否则返回FALSE。

value1 NOT BETWEEN value2 AND value3

如果value1的值并非介于value2和value3之间,则返回TRUE;否则返回FALSE。

string1 LIKE string2

如果string1的值与string2定义的模式匹配,则返回TRUE;否则返回FALSE。

string1 NOT LIKE string2

如果string1的值与string2定义的模式不匹配,则返回TRUE;否则返回FALSE。

value1 IN (value2 [, value3]* )

如果value1的值存在于[value2, value3, ...]列表中,则返回TRUE;否则返回FALSE。

value1 NOT IN (value2 [, value3]* )

如果value1的值不存在于[value2, value3, ...]列表中,则返回TRUE;否则返回FALSE。

逻辑函数

函数

说明

boolean1 OR boolean2

如果boolean1和boolean2至少有一个为TRUE,则返回TRUE。

boolean1 AND boolean2

如果boolean1和boolean2均为TRUE,则返回TRUE。

NOT boolean

如果boolean为TRUE,则返回FALSE;如果boolean是FALSE,则返回TRUE。

boolean IS FALSE

如果boolean为TRUE,则返回FALSE;如果boolean是FALSE,则返回TRUE。

boolean IS NOT FALSE

如果boolean为TRUE,则返回TRUE;如果boolean是FALSE,则返回FALSE。

boolean IS TRUE

如果boolean为TRUE,则返回TRUE;如果boolean是FALSE,则返回FALSE。

boolean IS NOT TRUE

如果boolean为TRUE,则返回FALSE;如果boolean是FALSE,则返回TRUE。

算数函数

函数

说明

numeric1 + numeric2

返回numeric1加上numeric2的值。

numeric1 - numeric2

返回numeric1减去numeric2的值。

numeric1 * numeric2

返回numeric1乘以numeric2的值。

numeric1 / numeric2

返回numeric1除以numeric2的值。

numeric1 % numeric2

返回numeric1对numeric2取模的值。

ABS(numeric)

返回numeric的绝对值。

CEIL(numeric)

返回numeric向上取整的值。

FLOOR(numeric)

返回numeric向下取整的值。

ROUND(numeric, int)

返回numeric四舍五入到小数点后n位的值。

UUID()

生成一个全局唯一ID(UUID)字符串(例如“3d3c68f7-f608-473f-b60c-b0c44ad4cc4e”)。

使用RFC 4122 type 4方法伪随机生成。

字符串函数

函数

说明

string1 || string2

返回string1和string2拼接而成的字符串。

重要

请勿将其与逻辑或运算符混淆。

CHAR_LENGTH(string)

返回string字符串中的字符数。

UPPER(string)

返回string的大写形式字符串。

LOWER(string)

返回string的小写形式字符串。

TRIM(string1)

删除string两侧的空白字符。

REGEXP_REPLACE(string1, string2, string3)

将string1中所有满足string2模式的子串替换为string3。

例如,REGEXP_REPLACE('foobar', 'oo|ar', '__')求值的结果为 f__b__

SUBSTRING(string FROM integer1 [ FOR integer2 ])

返回string从第integer1到第integer2个字符的子串。

说明

在不提供FOR integer2语句时,默认提取到字符串尾部。

CONCAT(string1, string2,…)

返回将string1、string2、…拼接在一起形成的新字符串。

例如,CONCAT('AA', 'BB', 'CC')求值的结果为AABBCC

时间函数

函数

说明

LOCALTIME

返回当前时区下的本地时间,返回类型为TIME(0)

LOCALTIMESTAMP

返回当前时区下的本地时间戳,返回类型为TIMESTAMP(3)

CURRENT_TIME

返回当前时区下的本地时间,与LOCAL_TIME相同。

CURRENT_DATE

返回当前时区下的本地日期。

CURRENT_TIMESTAMP

返回当前时区下的本地时间戳。返回类型为TIMESTAMP_LTZ(3)

NOW()

返回当前时区下的本地时间戳,与CURRENT_TIMESTAMP相同。

DATE_FORMAT(timestamp, string)

将传入的时间戳按指定的格式化字符串string进行格式化。

说明

格式化字符串与Java中的SimpleDateFormat格式兼容。

TIMESTAMP_DIFF(timepointunit, timepoint1, timepoint2)

计算timepoint1和timepoint2之间差距多少timepointunit单位。

timepointunit可被指定为SECOND、MINUTE、HOUR、DAY、MONTH或YEAR。

TO_DATE(string1[, string2])

将传入的日期字符串string1按string2指定的格式转化为DATE类型。

说明

在不指定格式化字符串string2时,默认采用yyyy-MM-dd格式。

TO_TIMESTAMP(string1[, string2])

将传入的时间戳字符串string1按string2指定的格式转化为不带时区信息的TIMESTAMP类型。

说明

在不指定格式化字符串string2时,默认采用yyyy-MM-ddHH:mm:ss格式。

说明

在进行projection和filter表达式求值时,可以保证其中每个子表达式所得到的时间点都一致。例如,NOW() AS t1, NOW() AS t2, NOW() AS t3得到的t1t2t3一定对应同一个时间戳,无论其求值时间和顺序如何。

条件函数

函数

说明

CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END

依次检查value值是否等于WHEN子句给出的值,并返回第一个相等子句的RESULT值。

如果没有任何子句满足条件,则返回ELSE子句指定的值。如果没有指定ELSE子句,则返回NULL。

CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END

依次检查value值是否满足每个WHEN子句给出的条件,并返回第一个满足条件子句的RESULT值。

如果没有任何子句满足条件,则返回ELSE子句指定的值。如果没有指定ELSE子句,则返回NULL。

COALESCE(value1 [, value2]*)

返回[value1、value2、……]列表中第一个不为NULL的元素。如果列表中所有元素均为NULL,则返回NULL。

IF(condition, true_value, false_value)

如果condition子句对应的条件为真,则返回true_value;否则返回false_value。

UDF函数

CDC YAML也支持使用Java语言编写自定义UDF函数,并像内置函数一样调用。

UDF函数定义

满足以下要求的Java类可以作为CDC YAML UDF函数使用:

  • 实现了org.apache.flink.cdc.common.udf.UserDefinedFunction接口。

  • 拥有一个公共无参构造器。

  • 至少含有一个名为eval的公共方法。

UDF函数类可以通过@Override以下接口来实现更精细的语义控制:

  • 重写getReturnType方法来手动指定方法的返回类型。

  • 重写openclose方法来插入生命周期函数。

例如,将传入的整型参数增加1后返回的UDF函数定义如下。

public class AddOneFunctionClass implements UserDefinedFunction {
    
    public Object eval(Integer num) {
        return num + 1;
    }
    
    @Override
    public DataType getReturnType() {
        // 由于eval函数的返回类型不明确,需要
        // 使用getReturnType写明确指定类型
        return DataTypes.INT();
    }
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

UDF函数注册

通过在CDC YAMLpipeline块中加入如下所示的定义即可注册UDF函数:

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
说明
  • 此处类路径对应的JAR包需要作为外部依赖上传。

  • UDF函数名称可以在此处任意调整,无需与UDF类名一致。

UDF函数使用

在完成UDF函数注册后,即可在projection和filter语句块中,像内置函数一样直接调用UDF函数。代码示例如下。

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Flink ScalarFucntion兼容性

继承自ScalarFunction的Flink UDF函数也可以直接作为CDC YAML UDF函数注册并使用,但存在以下限制:

  • 不支持带参数的ScalarFunction

  • Flink风格的TypeInformation类型标注会被忽略。

  • openclose生命周期钩子函数不会被调用。

相关文档

数据摄入YAML作业开发的操作步骤,详情请参见数据摄入YAML作业开发(公测中)