Flink CDC对接DLF Paimon Catalog

实时计算Flink版基于Flink CDC YAML作业提供了将整库数据从源端同步到目标端的数据摄入功能。本文为您介绍如何通过阿里云实时计算Flink版将实时数据摄入至DLF Paimon Catalog,实现高效的数据湖存储与管理。

前提条件

已创建Flink全托管工作空间。如未创建,详情请参见开通实时计算Flink

使用限制

仅实时计算引擎VVR 11.1.0及以上版本支持对接DLF Paimon Catalog。

创建DLF Paimon Catalog

详情请参见DLF 快速入门

Flink CDC对接Paimon Catalog配置参数

创建数据摄入作业的操作流程,请参见Flink CDC数据摄入作业开发(公测中)

Flink中数据摄入作业的Sink使用以下配置:

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true

配置项说明如下:

配置项

描述

是否必填

示例

catalog.properties.metastore

Metastore类型,固定为rest。

rest

catalog.properties.token.provider

Token提供方,固定为dlf。

dlf

catalog.properties.uri

访问DLF Rest Catalog ServerURI,格式为http://[region-id]-vpc.dlf.aliyuncs.com。详见服务接入点中的Region ID。

http://cn-hangzhou-vpc.dlf.aliyuncs.com

catalog.properties.warehouse

DLF Catalog名称。

dlf_test

配置示例

下面为您介绍几种典型的通过Flink CDC YAML作业将数据同步到数据湖DLF的配置方案:

MySQL整库同步数据湖DLF

MySQL整库同步数据到DLFCDC YAML作业如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true
说明

【MySQL Source配置】,建议设置下列配置项,详情请参见MySQL

  1. 参数:scan.binlog.newly-added-table.enabled

    作用:同步增量阶段新创建的表的数据。

  2. 参数:include-comments.enabled

    作用:同步表注释和字段注释。

  3. 参数:scan.incremental.snapshot.unbounded-chunk-first.enabled

    作用:避免可能出现的TaskManager OutOfMemory问题。

  4. 参数:scan.only.deserialize.captured.tables.changelog.enabled: true

    作用:仅对作业匹配的表的数据进行解析,加速读取。

说明

【Paimon Sink配置】

  1. Catalog连接参数

  • 参数前缀:catalog.properties

  • 作用:catalog连接信息

  1. 建表参数

  • 参数前缀:table.properties

  • 作用:建表信息

  • 配置建议:在建表参数中添加deletion-vectors.enabled的配置,在不损失太大写入更新性能的同时,获得极大的读取性能提升,达到近实时更新与极速查询的效果

  • 补充说明:在DLF中已经提供了自动进行文件合并的功能,不建议在建表参数中添加文件合并和bucket相关参数,例如bucket、num-sorted-run.compaction-trigger等。

  1. 提交用户

  • 参数名称:commit.user

  • 作用:写入Paimon的文件提交用户

  • 配置建议:为不同的作业设置不同的提交用户,可以设置为作业名

  • 补充说明:默认的提交用户为admin,在多个作业写入同一张表时可能出现提交冲突和不一致的问题。

写入数据湖DLF分区表

数据摄入作业的源表通常不包含分区字段信息,如果希望写入的下游表为分区表,您需要通过Flink CDC数据摄入作业开发参考中的partition-keys设置分区字段,配置示例如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    #(可选)设置分区字段  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

写入数据湖DLF Append Only

数据摄入作业的源表包含完整的变更类型,如果希望写入的下游表为将删除操作转化为插入操作实现逻辑删除的功能,您可以通过Flink CDC数据摄入作业开发参考实现该需求,配置示例如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    #(可选)设置分区字段
    partition-keys: id,pt
    #(可选)实现软删除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    #(可选)设置分区字段
    partition-keys: id,pt
    #(可选)实现软删除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
说明
  • 通过在projection中添加__data_event_type,将变更类型作为新增字段写入到下游表中。同时设置converter-after-transformSOFT_DELETE,可以将删除操作转化为插入操作,使得下游能够完整记录全部变更操作。详见Flink CDC数据摄入作业开发参考

Kafka实时同步到数据湖DLF

假设Kafkainventory Topic中存储了两张表(customersproducts)的数据,且数据格式为Debezium JSON。以下示例作业可将这两张表的数据分别同步到DLF对应的目标表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true

# debezium-json不包含主键信息,需要另外为表添加主键  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
说明
  • Kafka数据源读取的格式支持canal-json、debezium-json(默认)和json格式。

  • 当数据格式为debezium-json时,由于debezium-json消息不记录主键信息,需要通过transform规则手动为表添加主键:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tablescanal-json.distributed-tables设为true。

  • kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka

如果您希望添加进行更精细的作业配置,可以查询Flink CDC数据摄入作业开发参考