实时计算Flink版基于Flink CDC YAML作业提供了将整库数据从源端同步到目标端的数据摄入功能。本文为您介绍如何通过阿里云实时计算Flink版将实时数据摄入至DLF Paimon Catalog,实现高效的数据湖存储与管理。
前提条件
已创建Flink全托管工作空间。如未创建,详情请参见开通实时计算Flink版。
使用限制
仅实时计算引擎VVR 11.1.0及以上版本支持对接DLF Paimon Catalog。
创建DLF Paimon Catalog
详情请参见DLF 快速入门。
Flink CDC对接Paimon Catalog配置参数
创建数据摄入作业的操作流程,请参见Flink CDC数据摄入作业开发(公测中)。
Flink中数据摄入作业的Sink使用以下配置:
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
配置项说明如下:
配置项 | 描述 | 是否必填 | 示例 |
| Metastore类型,固定为rest。 | 是 | rest |
| Token提供方,固定为dlf。 | 是 | dlf |
| 访问DLF Rest Catalog Server的URI,格式为 | 是 | http://cn-hangzhou-vpc.dlf.aliyuncs.com |
| DLF Catalog名称。 | 是 | dlf_test |
配置示例
下面为您介绍几种典型的通过Flink CDC YAML作业将数据同步到数据湖DLF的配置方案:
MySQL整库同步数据湖DLF
MySQL整库同步数据到DLF的CDC YAML作业如下所示:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
【MySQL Source配置】,建议设置下列配置项,详情请参见MySQL。
参数:scan.binlog.newly-added-table.enabled
作用:同步增量阶段新创建的表的数据。
参数:include-comments.enabled
作用:同步表注释和字段注释。
参数:scan.incremental.snapshot.unbounded-chunk-first.enabled
作用:避免可能出现的TaskManager OutOfMemory问题。
参数:scan.only.deserialize.captured.tables.changelog.enabled: true
作用:仅对作业匹配的表的数据进行解析,加速读取。
【Paimon Sink配置】
Catalog连接参数
参数前缀:catalog.properties
作用:catalog连接信息
建表参数
参数前缀:table.properties
作用:建表信息
配置建议:在建表参数中添加deletion-vectors.enabled的配置,在不损失太大写入更新性能的同时,获得极大的读取性能提升,达到近实时更新与极速查询的效果
补充说明:在DLF中已经提供了自动进行文件合并的功能,不建议在建表参数中添加文件合并和bucket相关参数,例如bucket、num-sorted-run.compaction-trigger等。
提交用户
参数名称:commit.user
作用:写入Paimon的文件提交用户
配置建议:为不同的作业设置不同的提交用户,可以设置为作业名
补充说明:默认的提交用户为admin,在多个作业写入同一张表时可能出现提交冲突和不一致的问题。
写入数据湖DLF分区表
数据摄入作业的源表通常不包含分区字段信息,如果希望写入的下游表为分区表,您需要通过Flink CDC数据摄入作业开发参考中的partition-keys设置分区字段,配置示例如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
#(可选)设置分区字段
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,pt
写入数据湖DLF Append Only表
数据摄入作业的源表包含完整的变更类型,如果希望写入的下游表为将删除操作转化为插入操作实现逻辑删除的功能,您可以通过Flink CDC数据摄入作业开发参考实现该需求,配置示例如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
#(可选)设置分区字段
partition-keys: id,pt
#(可选)实现软删除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
#(可选)设置分区字段
partition-keys: id,pt
#(可选)实现软删除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
通过在projection中添加__data_event_type,将变更类型作为新增字段写入到下游表中。同时设置converter-after-transform为SOFT_DELETE,可以将删除操作转化为插入操作,使得下游能够完整记录全部变更操作。详见Flink CDC数据摄入作业开发参考。
Kafka实时同步到数据湖DLF
假设Kafka的inventory Topic中存储了两张表(customers和products)的数据,且数据格式为Debezium JSON。以下示例作业可将这两张表的数据分别同步到DLF对应的目标表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
# debezium-json不包含主键信息,需要另外为表添加主键
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
Kafka数据源读取的格式支持canal-json、debezium-json(默认)和json格式。
当数据格式为debezium-json时,由于debezium-json消息不记录主键信息,需要通过transform规则手动为表添加主键:
transform: - source-table: \.*.\.* projection: \* primary-keys: id
当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tables或canal-json.distributed-tables设为true。
kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka。
如果您希望添加进行更精细的作业配置,可以查询Flink CDC数据摄入作业开发参考。