配置Kafka JSON Catalog后,您可以在Flink全托管作业开发中直接访问Kafka集群中格式为JSON的Topic,无需再定义Schema。本文为您介绍如何在Flink全托管模式下配置、查看及删除Kafka JSON Catalog。
背景信息
Kafka JSON Catalog通过自动解析JSON格式的消息来推导Topic的Schema,您无需在Flink SQL中声明Kafka表的Schema便可以获取消息的具体字段信息。Kafka
JSON Catalog具有以下功能特点:
- Kafka JSON Catalog的表名对应Kafka Topic名,无需再通过DDL语句手动注册Kafka表,提升开发效率和正确性。
- Kafka JSON Catalog提供的表可以直接作为Flink SQL作业中的源表使用。
- Kafka JSON Catalog可以配合CREATE TABLE AS(CTAS)语句完成Schema变更的数据同步。
本文将从以下方面为您介绍如何管理Kafka JSON Catalog:
使用限制
- Kafka JSON Catalog仅支持消息格式为JSON的Topic,暂不支持其它格式。
- 仅Flink计算引擎VVR 6.0.2及以上版本支持配置Kafka JSON Catalog。
- 不支持通过DDL语句修改已有的Kafka JSON Catalog。
- 仅支持查询数据表,不支持创建、修改和删除数据库和表。
- Kafka JSON Catalog不支持读取或写入开启了SSL或SASL认证的Kafka。
- Kafka JSON Catalog提供的表可以直接作为Flink SQL作业中的源表,不支持作为结果表和Lookup维表。
说明 建议VVR 4.x版本用户升级作业至VVR 6.0.2及以上版本后使用Kafka JSON Catalog。
配置Kafka JSON Catalog
查看Kafka JSON Catalog
使用Kafka JSON Catalog
- 作为源表,从Kafka Topic中读取数据。
INSERT INTO ${other_sink_table} SELECT ... FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
说明 如果Kafka JSON Catalog的表使用时需要指定其他WITH参数,则建议使用SQL Hints的方式来添加其他参数。例如,如上SQL使用了SQL Hints指定从最早的数据开始消费。其他参数详情请参见消息队列Kafka源表和消息队列Kafka结果表。 - 作为源表,使用CREATE TABLE AS(CTAS)语句将Kafka Topic中的数据同步至目标表中。
- 单表同步,实时同步数据。
CREATE TABLE IF NOT EXISTS `${target_table_name}` WITH (...) AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
- 在一个作业中同步多张表。
BEGIN STATEMENT SET; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0` AS TABLE `kafka-catalog`.`kafka`.`topic0` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1` AS TABLE `kafka-catalog`.`kafka`.`topic1` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2` AS TABLE `kafka-catalog`.`kafka`.`topic2` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */; END;
结合Kafka JSON Catalog,您可以在同一个任务中同步多张Kafka表。但需要满足以下条件:- 所有Kafka表均未配置topic-pattern参数。
- 每张表关于Kafka的配置必须完全相同,即properties.*配置的属性完全相同,包括properties.bootstrap.servers和properties.group.id。
- 每张表的 scan.startup.mode配置必须完全相同,且只能配置为group-offsets、latest-offset或earliest-offset,不能配置为其他值。
例如,下图中上面两张表满足条件,下面两张表违反了以上三个条件。
- 单表同步,实时同步数据。
说明 完整的端到端的Kafka JSON Catalog使用示例详情请参见日志实时入仓快速入门和使用Flink读取kafka Catalog源表数据后写入下游Hologres表。
删除Kafka JSON Catalog
重要 删除Kafka JSON Catalog不会影响已运行的作业,但会导致使用该Catalog下表的作业,在上线或重启时报无法找到该表的错误,请您谨慎操作。
从Kafka JSON Catalog获取的表信息详解
为了方便使用Kafka JSON Catalog获取的表,Kafka JSON Catalog会在推导的表上添加默认的配置参数、元数据和主键信息。Kafka JSON
Catalog获取的表的详细信息如下:
- Kafka表的Schema推导
Kafka JSON Catalog在解析JSON格式消息获取Topic的Schema时,Catalog会尝试消费最多max.fetch.records条消息,解析每条数据的Schema,解析规则与Kafka作为CTAS数据源时的基本规则相同,再将这些Schema合并作为最终的Schema。Schema主要包含以下几个部分:
- 推导的物理列(Physical Columns)
Kafka JSON Catalog会从Kafka消息的消息键(Key)和消息体(Value)推导出消息的物理列,列名添加对应的前缀。
如果消息键不为空但解析失败,会返回列名为key.fields-prefix前缀和infer-schema.parse-key-error.field-name参数配置值的拼接结果,类型为VARBINARY的列。
当拉取到一组Kafka消息后,Catalog会逐条解析Kafka消息并按以下规则合并解析出的物理列,从而作为整个Topic的Schema。合并规则如下:- 如果解析出的物理列中包含结果Schema中没有的字段,则Kafka JSON Catalog会自动将这些字段加入到结果Schema。
- 如果两者出现了同名列,则按照以下场景进行处理:
- 当类型相同且精度不同时,会取两者中较大的精度的类型。
- 当类型不同时,会按照如下图的树型结构找到最小父节点,作为该同名列的类型。但当Decimal和Float类型合并时,为了保留精度会合并为Double类型。
例如,对于下面包含三条数据的一个Kafka topic,Kafka JSON Catalog得到的Schema如下图所示。 - 默认添加的元数据列(Metadata Column)
Kafka JSON Catalog会默认添加partition,offset和timestamp三个有用的元数据列。详情如下表所示。
元数据名 列名称 类型 说明 partition partition INT NOT NULL 分区值。 offset offset BIGINT NOT NULL 偏移量。 timestamp timestamp TIMESTAMP_LTZ(3) NOT NULL 消息时间戳。 - 默认添加的主键约束
从Kafka JSON Catalog获取的表,在作为源表消费时,会默认把元数据列partition和offset列作为主键,确保数据不重复。
- 推导的物理列(Physical Columns)
- 默认添加的表参数
参数 说明 备注 connector Connector类型。 固定值为kafka或upsert-kafka。 topic 对应的Topic名称。 声明的表名。 properties.bootstrap.servers Kafka Broker地址。 对应Catalog的properties.bootstrap.servers参数配置值。 value.format Flink Kafka Connector在序列化或反序列化Kafka的消息体(Value)时使用的格式。 固定值为json。 value.fields-prefix 为所有Kafka消息体(Value)指定自定义前缀,以避免与消息键(Key)或Metadata字段重名。 对应Catalog的value.fields-prefix参数配置值。 value.json.infer-schema.flatten-nested-columns.enable Kafka消息体(Value)是否递归式地展开JSON中的嵌套列。 对应Catalog的infer-schema.flatten-nested-columns.enable参数配置值。 value.json.infer-schema.primitive-as-string Kafka消息体(Value)是否推导所有基本类型为String类型。 对应Catalog的infer-schema.primitive-as-string参数配置值。 value.fields-include 定义消息体在处理消息键字段时的策略。 固定值为EXCEPT_KEY。表示消息体中不包含消息键的字段。 消息键(Key)不为空时配置该参数,消息键(Key)为空时不配置该参数。
key.format Flink Kafka Connector在序列化/反序列化Kafka的消息键(Key)时使用的格式。 固定值为json或raw。 消息键(Key)不为空时配置该参数,消息键(Key)为空时不配置该参数。
当消息键(Key)不为空但解析失败时,配置为raw;解析成功时,配置为json。
key.fields-prefix 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 对应Catalog的key.fields-prefix参数配置值。 消息键(Key)不为空时配置该参数,消息键(Key)为空时不配置该参数。
key.fields Kafka消息键(Key)解析出来的数据存放的字段。 自动填写解析出来的Key字段列表。 消息键(Key)不为空且不是Upsert Kafka表时配置该参数,否则不配置该参数。