基于Flink CDC实现实时分发

本文为您介绍使用数据摄入YAML作业将实时数据写入常用消息队列的最佳实践。

数据库全增量实时写入Kafka

通过数据摄入YAML作业将来自MySQL的数据导入到Kafka中后,您可以按照需要再将数据分发到下游的不同系统中,避免多个作业直接连接业务库,降低业务库处理压力。

MySQL Binlog数据同步到Kafka

在某些场景下,您希望存储原始的Binlog数据,方便后续的数据审计、数据重放等工作。数据摄入YAML支持同步MySQL原始Binlog数据到Kafka,方便您分布式读取Binlog数据,解决数据热点问题。

假设数据库kafka_test中有两张表customersproducts,下面的作业可以分别将表数据同步到customersproducts两个topic中。

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: kafka_test.\.*
  server-id: 8601-8604
  #(可选)同步新增表的全量和增量数据
  scan.newly-added-table.enabled: true
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true
  # 将数据的变更时间作为元数据下发
  metadata-column.include-list: op_ts

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  # 阿里云Kafka不支持幂等和事务写入,关闭幂等功能
  properties.enable.idempotence: false
  #(可选)设置上游表与Kafka topic的映射关系
  sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products

customers表的一条Update语句产生Kafka消息的消息体格式如下:

// debezium-json
{
  "before": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "2222",
    "age": 12
  },
  "after": {
    "id": 4,
    "name": "John",
    "address": "New York",
    "phone_number": "1234",
    "age": 12
  },
  "op": "u",
  "source": {
    "db": "kafka_test",
    "table": "customers",
    "ts_ms": 1728528674000
  }
}

// canal-json
{
  "old": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "2222",
      "age": 12
    }
  ],
  "data": [
    {
      "id": 4,
      "name": "John",
      "address": "New York",
      "phone_number": "1234",
      "age": 12
    }
  ],
  "type": "UPDATE",
  "database": "kafka_test",
  "table": "customers",
  "pkNames": [
    "id"
  ],
  "ts": 1728528674000,
  "es": 0
}
说明
  • 写入的Binlog格式支持json、canal-jsondebezium-json(默认),详情请参见消息队列Kafka

  • 如果不使用sink.tableId-to-topic.mapping参数,在Kafka中会使用database.table的格式创建topic。例如MySQLkafka_test.customersKafka中对应的topic名称为kafka_test.customers。使用sink.tableId-to-topic.mapping可以配置上游表与写入topic的映射关系,能够在写出Kafka消息保留源表表名的同时修改写出的topic。详情请参见消息队列Kafka

  • 默认所有数据写入Topic0号分区,可以使用partition.strategy配置进行调整,详情请参见消息队列Kafka。例如可使用如下配置:partition.strategy: hash-by-key,每个表的数据会根据主键的哈希值将数据写到多个分区,并保证同一个主键的数据在同一个分区并且有序。

  • 阿里云消息队列Kafka版不支持幂等和事务写入,作为数据摄入目标端时,需要在数据摄入目标端添加配置项properties.enable.idempotence: false以关闭幂等写入功能。

Kafka实时摄入到数据湖DLF

MySQL Binlog数据同步到Kafka为您提供了在Kafka中存储MySQL数据的方案,进一步的,您可以配置数据摄入YAML同步Kafka数据到DLF存储。

假设Kafka中名为inventorytopic中存有debezium-json格式的两张表customersproducts的数据,下面的作业可以将两张表的数据分别同步到DLF对应的两张表中:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  #(可选)开启删除向量,提升读取性能
  table.properties.deletion-vectors.enabled: true

# debezium-json不包含主键信息,需要另外为表添加主键  
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
说明
  • Kafka数据源读取的格式支持canal-jsondebezium-json(默认)。

  • 当数据格式为debezium-json时,需要通过transform规则手动为表添加主键:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tablescanal-json.distributed-tables设为true。

  • kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka