本文为您介绍使用数据摄入CDC YAML作业将实时数据写入常用数据仓库的最佳实践。
Flink CDC实时同步至Hologres数仓
使用数据摄入CDC YAML同步数据到Hologres搭建实时数仓,可以充分利用Flink强大的实时处理能力和Hologres提供的Binlog、行列共存和资源强隔离等能力,实现高效、可扩展的实时数据处理和分析。
MySQL实时同步至Hologres数仓
最基本的MySQL整库同步Hologres的数据摄入CDC YAML作业如下所示:
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
使用更宽容的类型映射
Hologres连接器无法处理列类型变更事件,但支持了多种类型映射关系。为了更好地支持数据源的变更,您可以通过将多个MySQL数据类型映射到更宽的Hologres类型,跳过不必要的类型变更事件,从而让作业正常运行。您可以通过配置项sink.type-normalize-strategy
进行更改,默认值为STANDARD,详情请见数据摄入YAML作业Hologres连接器类型映射。
例如,可以使用ONLY_BIGINT_OR_TEXT让类型只对应到Hologres的int8和text类型。此时如果MySQL某个列的类型从INT改为BIGINT,Hologres将这两种MySQL类型对应到int8类型,作业不会因为无法处理类型转换而报错。
source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test.\.*
server-id: 8601-8604
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
分区表写入
数据摄入YAML在使用Hologres连接器作为目标端时支持写入分区表,详情请参见分区表写入。
Kafka实时同步至Hologres数仓
实现实时分发为您提供了在Kafka中存储MySQL数据的方案。进一步的,数据摄入YAML支持同步Kafka数据到Hologres搭建实时数仓。
假设Kafka中名为inventory的topic中存有debezium-json格式的两张表customers和products的数据,下面的作业可以将两张表的数据分别同步到Hologres对应的两张表中。
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: ${secret_values.holo-username}
password: ${secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
Kafka数据源读取的格式支持json、canal-json和debezium-json(默认)。
当数据格式为debezium-json时,需要通过transform规则手动为表添加主键:
transform: - source-table: \.*.\.* projection: \* primary-keys: id
当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tables或canal-json.distributed-tables设为true。
kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka。