Maxwell

本文为您介绍Maxwell格式的使用示例、配置选项和类型映射。

背景信息

Maxwell是一个CDC (Changelog Data Capture,变更数据获取)工具,可以把来自MySQL的更改实时流传输到Kafka、Kinesis和其他流连接器中。Maxwell为变更日志提供了统一的格式结构,并支持使用JSON序列化消息。支持Maxwell格式的连接器有消息队列Kafka对象存储OSS

Flink支持将Maxwell JSON消息解析为INSERT、UPDATEDELETE消息到Flink SQL系统中。在很多情况下,利用这个特性非常的有用,例如:

  • 将增量数据从数据库同步到其他系统

  • 日志审计

  • 数据库的实时物化视图

  • 数据库表的temporal join变更历史

Flink还支持将Flink SQL中的INSERT、UPDATEDELETE消息编码为Maxwell格式的JSON消息,输出到Kafka等存储中。

重要

目前Flink还不支持将UPDATE_BEFOREUPDATE_AFTER合并为一条UPDATE消息。因此,FlinkUPDATE_BEFOREUPDATE_AFTER分别编码为DELETEINSERT类型的Maxwell消息。

使用示例

假设MySQL products表有4列(id、name、description、weight),一个Maxwell格式的从MySQL products表捕获的更新操作的简单示例如下:

{
   "database":"test",
   "table":"e",
   "type":"insert",
   "ts":1477053217,
   "xid":23396,
   "commit":true,
   "position":"master.000006:800911",
   "server_id":23042,
   "thread_id":108,
   "primary_key": [1, "2016-10-21 05:33:37.523000"],
   "primary_key_columns": ["id", "c"],
   "data":{
     "id":111,
     "name":"scooter",
     "description":"Big 2-wheel scooter",
     "weight":5.15
   },
   "old":{
     "weight":5.18,
   }
}
说明

示例中各字段的含义,详情请参见Maxwell

上面的JSON消息是products表上的一条更新事件,其中id = 111的行的weight值从5.18更改为5.15。假设此消息已同步到名为products_binlogKafka topic,则可以使用以下DDL来使用此topic并解析更改事件。

CREATE TABLE topic_products (
  --元数据与MySQL "products"表完全相同。
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'maxwell-json'
)

在将topic注册为Flink表之后,可以将Maxwell消息用作变更日志源。

--关于MySQL "products"表的实时物化视图。
--计算相同产品的最新平均重量。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;

--将MySQL "products"表的所有数据和增量更改同步到。
--Elasticsearch "products"索引以供将来搜索。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

配置选项

选项

是否必填

默认

类型

描述

format

(none)

String

指定要使用的格式,此处应为maxwell-json。

maxwell-json.ignore-parse-errors

false

Boolean

参数取值如下:

  • true:当解析异常时,跳过当前字段或行。

  • false(默认值):抛出错误,作业启动失败。

maxwell-json.timestamp-format.standard

SQL

String

指定输入和输出时间戳格式。参数取值如下:

  • SQL:解析yyyy-MM-dd HH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30 12:13:14.123,并以相同格式输出时间戳。

  • ISO-8601:解析yyyy-MM-ddTHH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30T12:13:14.123,并以相同的格式输出时间戳。

maxwell-json.map-null-key.mode

FAIL

String

指定处理Mapkey值为空的方法。参数取值如下:

  • FAIL:在Mapkey值为空的时候抛出异常。

  • DROP:丢弃Mapkey值为空的数据项。

  • LITERAL:使用字符串常量来替换Map中的空key值。字符串常量的值由maxwell-json.map-null-key.literal定义。

maxwell-json.map-null-key.literal

null

String

maxwell-json.map-null-key.mode的值是LITERAL时,指定字符串常量替换Map中的空key值。

maxwell-json.encode.decimal-as-plain-number

false

Boolean

参数取值如下:

  • true:所有DECIMAL类型的数据保持原状,不使用科学计数法表示,例如0.000000027表示为0.000000027。

  • false:所有DECIMAL类型的数据,使用科学计数法表示,例如0.000000027表示为2.7E-8。

类型映射

目前,Maxwell使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息,请参见JSON Format。

可用的元数据

下面的格式元数据可以在DDL语句中声明为只读(VIRTUAL)列。

重要

格式元数据字段只有在相应的连接器转发格式元数据时才可用。目前,只有Kafka连接器能够声明其值格式的元数据字段。

数据类型

说明

database

STRING NULL

原始数据库。对应于Maxwell记录中的database字段。

table

STRING NULL

原始数据库的表。对应于Maxwell记录中的table字段。

primary-key-columns

ARRAY<STRING> NULL

主键名称数组。对应于Maxwell记录中的primary_key_columns字段。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件时的时间戳。对应于Maxwell记录中的ts字段。

如何在Kafka中访问Maxwell元数据字段的diam示例如下。

CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_primary_key_columns ARRAY<STRING> METADATA FROM 'value.primary-key-columns' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'maxwell-json'
);

常见问题

故障时投递重复的变更事件

在正常的操作环境下,Maxwell能够以exactly-once的语义投递每条变更事件,Flink能够正常消费Maxwell产生的变更事件。在非正常情况下(例如有故障发生),Maxwell只能保证at-least-once的投递语义。此时,Maxwell可能会投递重复的变更事件到Kafka中,当FlinkKafka中消费的时候就会得到重复的事件,可能导致Flink query的运行得到错误的结果或者非预期的异常。因此,在这种情况下,建议将作业参数table.exec.source.cdc-events-duplicate设置成true,并在该源表上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子,使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。