实时数据入仓

本文为您介绍使用数据摄入CDC YAML作业将实时数据写入常用数据仓库的最佳实践。

Flink CDC实时同步至Hologres数仓

使用数据摄入CDC YAML同步数据到Hologres搭建实时数仓,可以充分利用Flink强大的实时处理能力和Hologres提供的Binlog、行列共存和资源强隔离等能力,实现高效、可扩展的实时数据处理和分析。

MySQL实时同步至Hologres数仓

最基本的MySQL整库同步Hologres的数据摄入CDC YAML作业如下所示:

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true  

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: BROADEN
说明
  • 建议在作业最开始启动时就在MySQL Source中设置include-comments.enabled以同步表注释和字段注释。详情请参见MySQL

  • 建议在作业最开始启动时就在MySQL Source中设置scan.incremental.snapshot.unbounded-chunk-first.enabled以避免可能出现的TaskManager OutOfMemory问题。详情请参见MySQL

使用更宽容的类型映射

Hologres连接器无法处理列类型变更事件,但支持了多种类型映射关系。为了更好地支持数据源的变更,您可以通过将多个MySQL数据类型映射到更宽的Hologres类型,跳过不必要的类型变更事件,从而让作业正常运行。您可以通过配置项sink.type-normalize-strategy进行更改,默认值为STANDARD,详情请见数据摄入YAML作业Hologres连接器类型映射

例如,可以使用ONLY_BIGINT_OR_TEXT让类型只对应到Hologresint8text类型。此时如果MySQL某个列的类型从INT改为BIGINT,Hologres将这两种MySQL类型对应到int8类型,作业不会因为无法处理类型转换而报错。

source:
  type: mysql
  name: MySQL Source
  hostname: localhost
  port: 3306
  username: username
  password: password
  tables: holo_test.\.*
  server-id: 8601-8604
  #(可选)同步表注释和字段注释
  include-comments.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  #(可选)开启解析过滤,加速读取
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT

分区表写入

数据摄入YAML在使用Hologres连接器作为目标端时支持写入分区表,详情请参见分区表写入

Kafka实时同步至Hologres数仓

实现实时分发为您提供了在Kafka中存储MySQL数据的方案。进一步的,数据摄入YAML支持同步Kafka数据到Hologres搭建实时数仓。

假设Kafka中名为inventorytopic中存有debezium-json格式的两张表customersproducts的数据,下面的作业可以将两张表的数据分别同步到Hologres对应的两张表中。

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstrap.servers}
  topic: inventory
  scan.startup.mode: earliest-offset
  value.format: debezium-json
  debezium-json.distributed-tables: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: ****.hologres.aliyuncs.com:80
  dbname: cdcyaml_test
  username: ${secret_values.holo-username}
  password: ${secret_values.holo-password}
  sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
 
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: id
说明
  • Kafka数据源读取的格式支持json、canal-jsondebezium-json(默认)。

  • 当数据格式为debezium-json时,需要通过transform规则手动为表添加主键:

    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
  • 当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tablescanal-json.distributed-tables设为true。

  • kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka