本文为您介绍Maxwell格式的使用示例、配置选项和类型映射。
背景信息
Maxwell是一个CDC (Changelog Data Capture,变更数据获取)工具,可以把来自MySQL的更改实时流传输到Kafka、Kinesis和其他流连接器中。Maxwell为变更日志提供了统一的格式结构,并支持使用JSON序列化消息。支持Maxwell格式的连接器有消息队列Kafka和对象存储OSS。
Flink支持将Maxwell JSON消息解析为INSERT、UPDATE或DELETE消息到Flink SQL系统中。在很多情况下,利用这个特性非常的有用,例如:
将增量数据从数据库同步到其他系统
日志审计
数据库的实时物化视图
数据库表的temporal join变更历史
Flink还支持将Flink SQL中的INSERT、UPDATE或DELETE消息编码为Maxwell格式的JSON消息,输出到Kafka等存储中。
目前Flink还不支持将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Maxwell消息。
使用示例
假设MySQL products表有4列(id、name、description、weight),一个Maxwell格式的从MySQL products表捕获的更新操作的简单示例如下:
{
"database":"test",
"table":"e",
"type":"insert",
"ts":1477053217,
"xid":23396,
"commit":true,
"position":"master.000006:800911",
"server_id":23042,
"thread_id":108,
"primary_key": [1, "2016-10-21 05:33:37.523000"],
"primary_key_columns": ["id", "c"],
"data":{
"id":111,
"name":"scooter",
"description":"Big 2-wheel scooter",
"weight":5.15
},
"old":{
"weight":5.18,
}
}
示例中各字段的含义,详情请参见Maxwell。
上面的JSON消息是products表上的一条更新事件,其中id = 111的行的weight值从5.18更改为5.15。假设此消息已同步到名为products_binlog的Kafka topic,则可以使用以下DDL来使用此topic并解析更改事件。
CREATE TABLE topic_products (
--元数据与MySQL "products"表完全相同。
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
)
在将topic注册为Flink表之后,可以将Maxwell消息用作变更日志源。
--关于MySQL "products"表的实时物化视图。
--计算相同产品的最新平均重量。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
--将MySQL "products"表的所有数据和增量更改同步到。
--Elasticsearch "products"索引以供将来搜索。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
配置选项
选项 | 是否必填 | 默认 | 类型 | 描述 |
format | 是 | (none) | String | 指定要使用的格式,此处应为maxwell-json。 |
maxwell-json.ignore-parse-errors | 否 | false | Boolean | 参数取值如下:
|
maxwell-json.timestamp-format.standard | 否 | SQL | String | 指定输入和输出时间戳格式。参数取值如下:
|
maxwell-json.map-null-key.mode | 否 | FAIL | String | 指定处理Map中key值为空的方法。参数取值如下:
|
maxwell-json.map-null-key.literal | 否 | null | String | 当maxwell-json.map-null-key.mode的值是LITERAL时,指定字符串常量替换Map中的空key值。 |
maxwell-json.encode.decimal-as-plain-number | 否 | false | Boolean | 参数取值如下:
|
类型映射
目前,Maxwell使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息,请参见JSON Format。
可用的元数据
下面的格式元数据可以在DDL语句中声明为只读(VIRTUAL)列。
格式元数据字段只有在相应的连接器转发格式元数据时才可用。目前,只有Kafka连接器能够声明其值格式的元数据字段。
键 | 数据类型 | 说明 |
database | STRING NULL | 原始数据库。对应于Maxwell记录中的database字段。 |
table | STRING NULL | 原始数据库的表。对应于Maxwell记录中的table字段。 |
primary-key-columns | ARRAY<STRING> NULL | 主键名称数组。对应于Maxwell记录中的primary_key_columns字段。 |
ingestion-timestamp | TIMESTAMP_LTZ(3) NULL | 连接器处理事件时的时间戳。对应于Maxwell记录中的ts字段。 |
如何在Kafka中访问Maxwell元数据字段的diam示例如下。
CREATE TABLE KafkaTable (
origin_database STRING METADATA FROM 'value.database' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'maxwell-json'
);
常见问题
故障时投递重复的变更事件
在正常的操作环境下,Maxwell能够以exactly-once的语义投递每条变更事件,Flink能够正常消费Maxwell产生的变更事件。在非正常情况下(例如有故障发生),Maxwell只能保证at-least-once的投递语义。此时,Maxwell可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件,可能导致Flink query的运行得到错误的结果或者非预期的异常。因此,在这种情况下,建议将作业参数table.exec.source.cdc-events-duplicate设置成true,并在该源表上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子,使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。