实现实时入湖

本文为您介绍使用数据摄入CDC YAML作业将实时数据写入阿里云数据湖构建的最佳实践。

阿里云数据湖构建(Data Lake Formation,简称DLF)是一款全托管的统一元数据和数据存储及管理平台,为客户提供元数据管理、权限管理和存储优化等功能。详情请参见什么是数据湖构建

数据摄入作业支持使用DLFPaimon Catalog作为目标端,您可以使用数据摄入作业完成大规模整库数据入湖需求。

MySQL整库同步数据湖DLF

MySQL整库同步数据到DLFCDC YAML作业如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  # Metastore类型,固定为rest
  catalog.properties.metastore: rest
  # Token提供方,固定为dlf
  catalog.properties.token.provider: dlf
  # 访问DLF Rest Catalog Server的URI,格式为http://[region-id]-vpc.dlf.aliyuncs.com,如http://cn-hangzhou-vpc.dlf.aliyuncs.com
  catalog.properties.uri: dlf_uri
  # DLF Catalog名称。
  catalog.properties.warehouse: your_warehouse
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true
说明
  • 【MySQL Source配置】更多参数详情请参见MySQL

  • catalog.properties.uri详情请参见服务接入点

  • 建议为不同作业设置不同的commit.user参数以避免冲突。

  • 在建表参数中添加deletion-vectors.enabled的配置,在不损失太大写入更新性能的同时,获得极大的读取性能提升,达到近实时更新与极速查询的效果。

  • DLF中已经提供了自动进行文件合并的功能,不建议在建表参数中添加文件合并和bucket相关参数,例如bucket、num-sorted-run.compaction-trigger等。

写入数据湖DLF分区表

数据摄入作业的源表通常不包含分区字段信息,如果希望写入的下游表为分区表,您需要通过Flink CDC数据摄入作业开发参考中的partition-keysFlink CDC数据摄入作业开发参考设置分区字段,配置示例如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true

transform:
  - source-table: mysql_test.tbl1
    #(可选)设置分区字段  
    partition-keys: id,pt
  - source-table: mysql_test.tbl2
    partition-keys: id,pt

写入数据湖DLF Append Only

数据摄入作业的源表包含完整的变更类型,如果希望写入的下游表为将删除操作转化为插入操作实现逻辑删除的功能,您可以通过Flink CDC数据摄入作业开发参考实现该需求,配置示例如下:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: mysql_test.\.*
  server-id: 8601-8604
  #(可选)同步增量阶段新创建的表的数据
  scan.binlog.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true
  
transform:
  - source-table: mysql_test.tbl1
    #(可选)设置分区字段
    partition-keys: id,pt
    #(可选)实现软删除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
  - source-table: mysql_test.tbl2
    #(可选)设置分区字段
    partition-keys: id,pt
    #(可选)实现软删除
    projection: \*, __data_event_type__ AS op_type
    converter-after-transform: SOFT_DELETE
说明
  • 通过在projection中添加__data_event_type,将变更类型作为新增字段写入到下游表中。同时设置converter-after-transformSOFT_DELETE,可以将删除操作转化为插入操作,使得下游能够完整记录全部变更操作。详见Flink CDC数据摄入作业开发参考

Kafka实时同步到数据湖DLF

实现实时分发能够将MySQL数据实时同步至Kafka,在此基础上,您可以配置CDC YAML作业同步Kafka数据到DLF存储。

假设Kafkainventory Topic中存储了两张表(customersproducts)的数据,且数据格式为Debezium JSON。以下示例作业可将这两张表的数据分别同步到DLF对应的目标表:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)提交用户名,建议为不同作业设置不同的提交用户以避免冲突。
  commit.user: your_job_name
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true

# debezium-json不包含主键信息,需要另外为表添加主键  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
说明
  • Kafka数据源读取的格式支持canal-json、debezium-json(默认)和json格式。

  • 当数据格式为debezium-json时,由于debezium-json消息不记录主键信息,需要通过transform规则手动为表添加主键:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tablescanal-json.distributed-tables设为true。

  • kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka