Flink CDC Source和Sink模块

更新时间:
复制为 MD 格式

本文向您介绍基于Flink CDC的数据摄入作业的SourceSink模块及其支持使用的连接器。

支持的连接器

连接器

支持类型

Source

Sink

MySQL

说明

支持连接RDS MySQL版、PolarDB MySQL版及自建MySQL。

×

流式数据湖仓Paimon

×

流存储Fluss(公测中)

×

说明

仅实时计算引擎 VVR 11.4.0 及更高版本支持。

消息队列Kafka

说明

仅实时计算引擎 VVR 8.0.10 及更高版本支持。

Upsert Kafka

×

StarRocks

×

实时数仓Hologres

×

日志服务SLS

说明

仅实时计算引擎 VVR 11.1 及更高版本支持。

×

MongoDB

说明

仅实时计算引擎 VVR 11.2 及更高版本支持。

×

大数据计算服务MaxCompute

×

说明

仅实时计算引擎 VVR 11.1 及更高版本支持。

SelectDB

×

说明

仅实时计算引擎 VVR 11.1 及更高版本支持。

Postgres CDC(公测中)

说明

仅实时计算引擎 VVR 11.4 及更高版本支持。

×

Print

×

Iceberg

×

说明

仅实时计算引擎 VVR 11.6 及更高版本支持。

连接器参数配置

您可以在基于Flink CDC的数据摄入作业中配置 Source 及 Sink 连接器的参数。支持的连接器及相应的参数请参考下文。

# Source 模块
source:
  type: mysql # 或其他连接器标识符
  name: MySQL Source
  # 其他参数。使用 key: value 表示。

# Sink 模块
sink:
  type: paimon # 或其他连接器标识符
  name: Paimon Sink
  # 其他参数。使用 key: value 表示。

通用配置

参数

说明

是否必填

数据类型

默认值

备注

type

SourceSink的连接器类型

String

name

节点名称

String

using.built-in-catalog

复用已有Catalog

String

复用已有Catalog获取连接信息

VVR 11.5版本起,您可以在Flink CDC数据摄入作业中直接引用“数据管理”页面中创建的内置Catalog,获取对应的连接属性,如URL、username、password等,减少手写连接属性工作量。

语法
source:
  type: mysql
  using.built-in-catalog: mysql_rds_catalog
  
sink:
  type: paimon
  using.built-in-catalog: paimon_dlf_catalog

您可以在 sourcesink模块中使用using.built-in-catalog语法引用已创建的内置Catalog。

例如,在上面的例子中,mysql_rds_catalogCatalog元数据中已经包含hostnameusernamepassword等必填参数,因此无需在YAML作业中重复提供这些参数。

使用限制

下述连接器已经支持复用Catalog的连接信息:

  • MySQL(源端)

  • Kafka(源端)

  • Upsert Kafka(目标端)

  • StarRocks(目标端)

  • Hologres(目标端)

  • Paimon(目标端)

  • SLS(源端)

  • Iceberg(目标端)

说明

CDC YAML不兼容的Catalog参数不会生效,您可以参考各个连接器的参数列表查看详细信息。

Source配置

参数

说明

是否必填

数据类型

默认值

备注

source-expand

指定数据从source发出时的分发策略

请见对应语法部分进行配置

  • VVR 11.6 及以上版本可用。

source-expand

source-expand配置用于在Tramsform模块和Route模块处理前,将数据进行分发或者多路复制。

语法
source:                                                                                                                                                                                                                                                                                                          
  type: mysql                                                                                                                                                                                                                                                                                                     
  host: localhost                                                                                                                                                                                                                                                                                                  
  port: 3306                                                                                                                                                                                                                                                                                                       
  username: admin
  password: pass
  tables: mydb.orders
  source-expand:
  # 将 mydb.orders 复制为三张表,分别经过不同处理后写入下游
    - input-table: mydb.orders
      output-table: [ dwd.orders_full, dws.orders_summary, ads.orders_report ]

# 对三张扩展后的表分别应用不同的 transform
transform:
  # 明细层:保留全部字段,增加计算列
  - source-table: dwd.orders_full
    projection: "*, amount * discount as final_price"
  # 汇总层:只保留关键字段,过滤小额订单
  - source-table: dws.orders_summary
    projection: order_id, user_id, amount, order_status
    filter: amount > 100
    primary-keys: order_id
  # 报表层:只保留统计需要的字段,过滤已取消的订单
  - source-table: ads.orders_report
    projection: order_id, user_id, amount, TO_UPPER(order_status) as status
    filter: order_status <> 'CANCELLED'
    primary-keys: order_id

# 将三张扩展后的表路由到不同的下游目标表
route:
  - source-table: dwd.orders_full
    sink-table: starrocks_dwd.orders_full_detail
  - source-table: dws.orders_summary
    sink-table: starrocks_dws.orders_summary
  - source-table: ads.orders_report
    sink-table: starrocks_ads.orders_report

sink:
  type: starrocks
  name: sink-starrocks
  jdbc-url: jdbc:mysql://localhost:9030
  load-url: localhost:8030
  username: root
  password: pass

使用如上所示格式进行配置,例子中mydb.orders 表分发复制为 dwd.orders_full, dws.orders_summaryads.orders_report三张表,每个表经过不同的处理逻辑后分别写入到下游的三张表中。

注意事项
  • VVR 11.6 及以上版本可用。

  • 数据分发后默认不保留原有表。以语法部分中例子为例,mydb.orders表按如上配置后,下发的数据中不再包含mydb.orders表。如果想保留mydb.orders表,需要将mydb.orders表添加到output-table 部分。

    source-expand:
      - input-table: mydb.orders
        output-table: [ mydb.orders, db1.orders, db2.orders ]
  • input-table 和 output-table 中不支持使用正则表达式。

Sink配置

参数

说明

是否必填

数据类型

默认值

备注

include.schema.changes

支持应用的Schema变更

List<String>

默认支持所有变更。

exclude.schema.changes

不支持应用的Schema变更

List<String>

优先级高于include.schema.changes

include.schema.changesexclude.schema.changes的详细使用方式请参见表结构变更同步配置