本文将向您介绍基于Flink CDC的数据摄入作业的基本结构及重要参数。
作业结构示例
一个典型的Flink CDC数据摄入作业由以下模块组成:
例如,一个从MySQL写入Paimon的作业结构如下所示:
# MySQL Source 模块
source:
type: mysql
name: MySQL Source
host: localhost
port: 3306
username: admin
password: <yourPassword>
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*, mydb.\\.*
# Paimon Sink 模块
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
# Transform 模块
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
primary-keys: id
partition-keys: product_name
table-options: comment=app order
description: project fields from source table
converter-after-transform: SOFT_DELETE
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
# Route 模块
route:
- source-table: mydb.default.app_order_.*
sink-table: odsdb.default.app_order
description: sync all table shards to one
- source-table: mydb.default.web_order
sink-table: odsdb.default.ods_web_order
description: sync table to with given prefix ods_
# Pipeline 模块
pipeline:
name: source-database-sync-pipe
schema.change.behavior: evolve
Source(源端)模块
Source模块定义Flink CDC数据摄入作业的数据源端,目前支持的连接器包括消息队列Kafka、MySQL、MongoDB、日志服务SLS。
语法结构
source:
type: mysql
name: mysql source
xxx: ...
各连接器支持的具体配置请查看对应连接器的文档。
Sink(目标端)模块
Sink模块定义Flink CDC数据摄入作业的目标端,目前支持的连接器包括消息队列Kafka、Upsert Kafka、实时数仓Hologres、流式数据湖仓Paimon、StarRocks、大数据计算服务MaxCompute和Print。
语法结构
sink:
type: hologres
name: hologres sink
xxx: ...
各连接器支持的具体配置请查看对应连接器的文档。
Transform模块
您可以在Flink CDC数据摄入作业的Transform模块中定义若干规则,从而实现源表中数据的投影、计算和过滤等功能。
语法结构
transform:
- source-table: db.tbl1
projection: ...
filter: ...
- source-table: db.tbl2
projection: ...
filter: ...
具体表达式语法请参考Transform模块文档。
Route模块
您可以在Flink CDC数据摄入作业的Route模块中定义若干规则,从而实现上游分库分表的合并、广播等。
语法结构
route:
- source-table: db.tbl1
sink-table: sinkdb.tbl1
- source-table: db.tbl2
sink-table: sinkdb.tbl2
具体表达式语法请参考Route模块文档。
Pipeline模块
您可以在Flink CDC数据摄入作业的Pipeline模块中配置一些全局参数。
语法结构
pipeline:
name: CDC YAML job
schema.change.behavior: LENIENT
可配置的选项请参考Pipeline模块文档。
该文章对您有帮助吗?