本文为您介绍使用数据摄入YAML作业将实时数据写入常用消息队列的最佳实践。
数据库全增量实时写入Kafka
通过数据摄入YAML作业将来自MySQL的数据导入到Kafka中后,您可以按照需要再将数据分发到下游的不同系统中,避免多个作业直接连接业务库,降低业务库处理压力。
MySQL Binlog数据同步到Kafka
在某些场景下,您希望存储原始的Binlog数据,方便后续的数据审计、数据重放等工作。数据摄入YAML支持同步MySQL原始Binlog数据到Kafka,方便您分布式读取Binlog数据,解决数据热点问题。
假设数据库kafka_test中有两张表customers和products,下面的作业可以分别将表数据同步到customers和products两个topic中。
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: kafka_test.\.*
server-id: 8601-8604
#(可选)同步新增表的全量和增量数据
scan.newly-added-table.enabled: true
#(可选)同步表注释和字段注释
include-comments.enabled: true
#(可选)优先分发无界的分片以避免可能出现的TaskManager OutOfMemory问题
scan.incremental.snapshot.unbounded-chunk-first.enabled: true
#(可选)开启解析过滤,加速读取
scan.only.deserialize.captured.tables.changelog.enabled: true
# 将数据的变更时间作为元数据下发
metadata-column.include-list: op_ts
sink:
type: kafka
name: Kafka Sink
properties.bootstrap.servers: ${kafka.bootstraps.server}
# 阿里云Kafka不支持幂等和事务写入,关闭幂等功能
properties.enable.idempotence: false
#(可选)设置上游表与Kafka topic的映射关系
sink.tableId-to-topic.mapping: kafka_test.customers:customers;kafka_test.products:products
customers表的一条Update语句产生Kafka消息的消息体格式如下:
// debezium-json
{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": "kafka_test",
"table": "customers",
"ts_ms": 1728528674000
}
}
// canal-json
{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": "kafka_test",
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000,
"es": 0
}
写入的Binlog格式支持json、canal-json和debezium-json(默认),详情请参见消息队列Kafka。
如果不使用sink.tableId-to-topic.mapping参数,在Kafka中会使用database.table的格式创建topic。例如MySQL表
kafka_test.customers
在Kafka中对应的topic名称为kafka_test.customers
。使用sink.tableId-to-topic.mapping可以配置上游表与写入topic的映射关系,能够在写出Kafka消息保留源表表名的同时修改写出的topic。详情请参见消息队列Kafka。默认所有数据写入Topic的0号分区,可以使用
partition.strategy
配置进行调整,详情请参见消息队列Kafka。例如可使用如下配置:partition.strategy: hash-by-key,每个表的数据会根据主键的哈希值将数据写到多个分区,并保证同一个主键的数据在同一个分区并且有序。阿里云消息队列Kafka版不支持幂等和事务写入,作为数据摄入目标端时,需要在数据摄入目标端添加配置项
properties.enable.idempotence: false
以关闭幂等写入功能。
Kafka实时摄入到数据湖DLF
MySQL Binlog数据同步到Kafka为您提供了在Kafka中存储MySQL数据的方案,进一步的,您可以配置数据摄入YAML同步Kafka数据到DLF存储。
假设Kafka中名为inventory的topic中存有debezium-json格式的两张表customers和products的数据,下面的作业可以将两张表的数据分别同步到DLF对应的两张表中:
source:
type: kafka
name: Kafka Source
properties.bootstrap.servers: ${kafka.bootstrap.servers}
topic: inventory
scan.startup.mode: earliest-offset
value.format: debezium-json
debezium-json.distributed-tables: true
sink:
type: paimon
catalog.properties.metastore: rest
catalog.properties.uri: dlf_uri
catalog.properties.warehouse: your_warehouse
catalog.properties.token.provider: dlf
#(可选)开启删除向量,提升读取性能
table.properties.deletion-vectors.enabled: true
# debezium-json不包含主键信息,需要另外为表添加主键
transform:
- source-table: \.*.\.*
projection: \*
primary-keys: id
Kafka数据源读取的格式支持canal-json和debezium-json(默认)。
当数据格式为debezium-json时,需要通过transform规则手动为表添加主键:
transform: - source-table: \.*.\.* projection: \* primary-keys: id
当单表的数据分布在多个分区中,或数据位于不同分区中的表需要进行分库分表合并时,需要将配置项debezium-json.distributed-tables或canal-json.distributed-tables设为true。
kafka数据源支持多种Schema推导策略,可以通过配置项schema.inference.strategy设置,Schema推导和变更同步策略详情请参见消息队列Kafka。