本文为您介绍使用数据摄入CDC YAML作业将实时数据写入阿里云数据湖构建的最佳实践。
阿里云数据湖构建(Data Lake Formation,简称DLF)是一款全托管的统一元数据和数据存储及管理平台,为客户提供元数据管理、权限管理和存储优化等功能。详情请参见什么是数据湖构建。
数据摄入作业支持使用DLF的Paimon Catalog作为目标端,您可以使用数据摄入作业完成大规模整库数据入湖需求。
MySQL整库同步数据湖DLF
MySQL整库同步数据到DLF的CDC YAML作业如下所示:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
# Metastore类型,固定为rest
catalog.properties.metastore: rest
# Token提供方,固定为dlf
catalog.properties.token.provider: dlf
# 访问DLF Rest Catalog Server的URI,格式为http://[region-id]-vpc.dlf.aliyuncs.com,如http://cn-hangzhou-vpc.dlf.aliyuncs.com
catalog.properties.uri: dlf_uri
# DLF Catalog名称。
catalog.properties.warehouse: your_warehouse
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
写入数据湖DLF分区表
数据摄入作业的源表通常不包含分区字段信息,如果希望写入的下游表为分区表,您需要通过Flink CDC数据摄入作业开发参考中的partition-keysFlink CDC数据摄入作业开发参考设置分区字段,配置示例如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
#(可选)设置分区字段
partition-keys: id,pt
- source-table: mysql_test.tbl2
partition-keys: id,pt
写入数据湖DLF Append Only表
数据摄入作业的源表包含完整的变更类型,如果希望写入的下游表为将删除操作转化为插入操作实现逻辑删除的功能,您可以通过Flink CDC数据摄入作业开发参考实现该需求,配置示例如下:
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: mysql_test.\.*
server-id: 8601-8604
#(可选)同步增量阶段新创建的表的数据
scan.binlog.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
transform:
- source-table: mysql_test.tbl1
#(可选)设置分区字段
partition-keys: id,pt
#(可选)实现软删除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
- source-table: mysql_test.tbl2
#(可选)设置分区字段
partition-keys: id,pt
#(可选)实现软删除
projection: \*, __data_event_type__ AS op_type
converter-after-transform: SOFT_DELETE
通过在projection中添加__data_event_type,将变更类型作为新增字段写入到下游表中。同时设置converter-after-transform为SOFT_DELETE,可以将删除操作转化为插入操作,使得下游能够完整记录全部变更操作。详见Flink CDC数据摄入作业开发参考。
Kafka实时同步到数据湖DLF
实现实时分发能够将MySQL数据实时同步至Kafka,在此基础上,您可以配置CDC YAML作业同步Kafka数据到DLF存储。
假设Kafka的inventory Topic中存储了两张表(customers和products)的数据,且数据格式为Debezium JSON。以下示例作业可将这两张表的数据分别同步到DLF对应的目标表:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
commit.user: your_job_name
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
# debezium-json不包含主键信息,需要另外为表添加主键
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
Kafka数据源读取的格式支持canal-json、debezium-json(默认)和json格式。
当数据格式为debezium-json时,由于debezium-json消息不记录主键信息,需要通过transform规则手动为表添加主键:
transform: - source-table: \.*.\.* projection: \* primary-keys: id
当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tables或canal-json.distributed-tables设为true。
kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka。