本文向您介绍基于Flink CDC的数据摄入作业的Source和Sink模块及其支持使用的连接器。
支持的连接器
连接器 | 支持类型 | |
Source | Sink | |
说明 支持连接RDS MySQL版、PolarDB MySQL版及自建MySQL。 | √ | × |
× | √ | |
× | √ 说明 仅实时计算引擎 VVR 11.4.0 及更高版本支持。 | |
√ 说明 仅实时计算引擎 VVR 8.0.10 及更高版本支持。 | √ | |
× | √ | |
× | √ | |
× | √ | |
√ 说明 仅实时计算引擎 VVR 11.1 及更高版本支持。 | × | |
√ 说明 仅实时计算引擎 VVR 11.2 及更高版本支持。 | × | |
× | √ 说明 仅实时计算引擎 VVR 11.1 及更高版本支持。 | |
× | √ 说明 仅实时计算引擎 VVR 11.1 及更高版本支持。 | |
√ 说明 仅实时计算引擎 VVR 11.4 及更高版本支持。 | × | |
× | √ | |
× | √ 说明 仅实时计算引擎 VVR 11.6 及更高版本支持。 | |
连接器参数配置
您可以在基于Flink CDC的数据摄入作业中配置 Source 及 Sink 连接器的参数。支持的连接器及相应的参数请参考下文。
# Source 模块
source:
type: mysql # 或其他连接器标识符
name: MySQL Source
# 其他参数。使用 key: value 表示。
# Sink 模块
sink:
type: paimon # 或其他连接器标识符
name: Paimon Sink
# 其他参数。使用 key: value 表示。通用配置
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
type | Source或Sink的连接器类型 | 是 | String | 无 | 无 |
name | 节点名称 | 否 | String | 无 | 无 |
using.built-in-catalog | 复用已有Catalog | 否 | String | 无 |
|
复用已有Catalog获取连接信息
自VVR 11.5版本起,您可以在Flink CDC数据摄入作业中直接引用“数据管理”页面中创建的内置Catalog,获取对应的连接属性,如URL、username、password等,减少手写连接属性工作量。
语法
source:
type: mysql
using.built-in-catalog: mysql_rds_catalog
sink:
type: paimon
using.built-in-catalog: paimon_dlf_catalog您可以在 source和 sink模块中使用using.built-in-catalog语法引用已创建的内置Catalog。
例如,在上面的例子中,mysql_rds_catalog的Catalog元数据中已经包含hostname、username、password等必填参数,因此无需在YAML作业中重复提供这些参数。
使用限制
下述连接器已经支持复用Catalog的连接信息:
MySQL(源端)
Kafka(源端)
Upsert Kafka(目标端)
StarRocks(目标端)
Hologres(目标端)
Paimon(目标端)
SLS(源端)
Iceberg(目标端)
与CDC YAML不兼容的Catalog参数不会生效,您可以参考各个连接器的参数列表查看详细信息。
Source配置
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
source-expand | 指定数据从source发出时的分发策略 | 否 | 请见对应语法部分进行配置 | 无 |
|
source-expand
source-expand配置用于在Tramsform模块和Route模块处理前,将数据进行分发或者多路复制。
语法
source:
type: mysql
host: localhost
port: 3306
username: admin
password: pass
tables: mydb.orders
source-expand:
# 将 mydb.orders 复制为三张表,分别经过不同处理后写入下游
- input-table: mydb.orders
output-table: [ dwd.orders_full, dws.orders_summary, ads.orders_report ]
# 对三张扩展后的表分别应用不同的 transform
transform:
# 明细层:保留全部字段,增加计算列
- source-table: dwd.orders_full
projection: "*, amount * discount as final_price"
# 汇总层:只保留关键字段,过滤小额订单
- source-table: dws.orders_summary
projection: order_id, user_id, amount, order_status
filter: amount > 100
primary-keys: order_id
# 报表层:只保留统计需要的字段,过滤已取消的订单
- source-table: ads.orders_report
projection: order_id, user_id, amount, TO_UPPER(order_status) as status
filter: order_status <> 'CANCELLED'
primary-keys: order_id
# 将三张扩展后的表路由到不同的下游目标表
route:
- source-table: dwd.orders_full
sink-table: starrocks_dwd.orders_full_detail
- source-table: dws.orders_summary
sink-table: starrocks_dws.orders_summary
- source-table: ads.orders_report
sink-table: starrocks_ads.orders_report
sink:
type: starrocks
name: sink-starrocks
jdbc-url: jdbc:mysql://localhost:9030
load-url: localhost:8030
username: root
password: pass使用如上所示格式进行配置,例子中mydb.orders 表分发复制为 dwd.orders_full, dws.orders_summary和ads.orders_report三张表,每个表经过不同的处理逻辑后分别写入到下游的三张表中。
注意事项
VVR 11.6 及以上版本可用。
数据分发后默认不保留原有表。以语法部分中例子为例,mydb.orders表按如上配置后,下发的数据中不再包含mydb.orders表。如果想保留mydb.orders表,需要将mydb.orders表添加到output-table 部分。
source-expand: - input-table: mydb.orders output-table: [ mydb.orders, db1.orders, db2.orders ]input-table 和 output-table 中不支持使用正则表达式。
Sink配置
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
include.schema.changes | 支持应用的Schema变更 | 否 | List<String> | 无 | 默认支持所有变更。 |
exclude.schema.changes | 不支持应用的Schema变更 | 否 | List<String> | 无 | 优先级高于 |
include.schema.changes和exclude.schema.changes的详细使用方式请参见表结构变更同步配置。