配置Kafka JSON Catalog后,您可以在Flink全托管作业开发中直接访问Kafka集群中格式为JSON的Topic,无需再定义Schema。本文为您介绍如何在Flink全托管模式下配置、查看及删除Kafka JSON Catalog。

背景信息

Kafka JSON Catalog通过自动解析JSON格式的消息来推导Topic的Schema,您无需在Flink SQL中声明Kafka表的Schema便可以获取消息的具体字段信息。Kafka JSON Catalog具有以下功能特点:
  • Kafka JSON Catalog的表名对应Kafka Topic名,无需再通过DDL语句手动注册Kafka表,提升开发效率和正确性。
  • Kafka JSON Catalog提供的表可以直接作为Flink SQL作业中的源表使用。
  • Kafka JSON Catalog可以配合CREATE TABLE AS(CTAS)语句完成Schema变更的数据同步。
本文将从以下方面为您介绍如何管理Kafka JSON Catalog:

使用限制

  • Kafka JSON Catalog仅支持消息格式为JSON的Topic,暂不支持其它格式。
  • 仅Flink计算引擎VVR 6.0.2及以上版本支持配置Kafka JSON Catalog。
  • 不支持通过DDL语句修改已有的Kafka JSON Catalog。
  • 仅支持查询数据表,不支持创建、修改和删除数据库和表。
  • Kafka JSON Catalog不支持读取或写入开启了SSL或SASL认证的Kafka。
  • Kafka JSON Catalog提供的表可以直接作为Flink SQL作业中的源表,不支持作为结果表和Lookup维表。
说明 建议VVR 4.x版本用户升级作业至VVR 6.0.2及以上版本后使用Kafka JSON Catalog。

配置Kafka JSON Catalog

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在页面左上角,单击新建,文件类型选择流作业/SQL
  5. 在文本编辑区域,输入配置Kafka JSON Catalog的命令。

    自建Kafka集群或EMR Kafka集群

    CREATE CATALOG <catalogName> WITH(
     'type'='kafka',
     'properties.bootstrap.servers'='<brokers>',
     'format'='json',
     'default-database'='<dbName>',
     'key.fields-prefix'='<keyPrefix>',
     'value.fields-prefix'='<valuePrefix>',
     'timestamp-format.standard'='<timestampFormat>',
     'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
     'infer-schema.primitive-as-string'='<primitiveAsString>',
     'infer-schema.parse-key-error.field-name='<parseKeyErrorFieldName>',
     'infer-schema.compacted-topic-as-upsert-table'='true',
     'max.fetch.records'='100'
    );

    阿里云消息队列Kafka版

    CREATE CATALOG <catalogName> WITH(
     'type'='kafka',
     'properties.bootstrap.servers'='<brokers>',
     'format'='json',
     'default-database'='<dbName>',
     'key.fields-prefix'='<keyPrefix>',
     'value.fields-prefix'='<valuePrefix>',
     'timestamp-format.standard'='<timestampFormat>',
     'infer-schema.flatten-nested-columns.enable'='<flattenNestedColumns>',
     'infer-schema.primitive-as-string'='<primitiveAsString>',
     'infer-schema.parse-key-error.field-name='<parseKeyErrorFieldName>',
     'infer-schema.compacted-topic-as-upsert-table'='true',
     'max.fetch.records'='100',
     'aliyun.kafka.accessKeyId'='<aliyunAccessKeyId>',
     'aliyun.kafka.accessKeySecret'='<aliyunAccessKeySecret>',
     'aliyun.kafka.instanceId'='<aliyunKafkaInstanceId>',
     'aliyun.kafka.endpoint'='<aliyunKafkaEndpoint>',
     'aliyun.kafka.regionId'='<aliyunKafkaRegionId>'
    );
    参数 类型 说明 是否必填 备注
    catalogName String Kafka JSON Catalog名称。 请填写为自定义的英文名。
    type String Catalog类型。 固定值为kafka。
    properties.bootstrap.servers String Kafka Broker地址。 格式为host1:port1,host2:port2,host3:port3

    以英文逗号(,)分割。

    format String Kafka消息格式。 目前只支持配置为json。Flink会解析JSON格式的Kafka消息,来获取Schema。
    default-database String Kafka集群名称。 默认值为kafka。Catalog要求三层结构定位一张表,即catalog_name.db_name.table_name。此处是配置默认的db_name,由于Kafka没有Database的概念,您可以在此处使用任意字符串指代Kafka集群作为database的定义。
    key.fields-prefix String 自定义添加到消息键(Key)解析出字段名称的前缀,来避免Kafka消息键解析后的命名冲突问题。 默认值为key_。例如,如果您的key字段名为a,则系统默认解析key后的字段名称为key_a。
    说明 key.fields-prefix的配置值不可以是value.fields-prefix的配置值的前缀。例如value.fields-prefix配置为test1_value_,则key.fields-prefix不可以配置为test1_。
    value.fields-prefix String 自定义添加到消息体(Value)解析出字段名称的前缀,来避免Kafka消息体解析后的命名冲突问题。 默认值为value_。例如,如果您的value字段名为b,则系统默认解析value后的字段名称为value_b。
    说明 value.fields-prefix的配置值不可以是key.fields-prefix的配置值的前缀。例如key.fields-prefix配置为test2_value_,则value.fields-prefix不可以配置为test2_。
    timestamp-format.standard String 解析JSON格式消息中Timestamp类型字段的格式,首先尝试通过您配置的格式去解析,解析失败后再自动尝试使用其他格式解析。 可配置的值有以下两种:
    • SQL(默认值)
    • ISO-8601
    infer-schema.flatten-nested-columns.enable Boolean 解析JSON格式消息体(Value)时,是否递归式地展开JSON中的嵌套列。 参数取值如下:
    • true:递归式展开。

      对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于{"nested": {"col": true}} 中的列col,它展开后的名字为nested.col。

      说明 设置为true时,建议和CREATE TABLE AS(CTAS)语句配合使用,目前暂不支持其它DML语句自动展开嵌套列。
    • false(默认值):将嵌套类型当作String处理。
    infer-schema.primitive-as-string Boolean 解析JSON格式消息体(Value)时,是否推导所有基本类型为String类型。 参数取值如下:
    • true:推导所有基本类型为String。
    • false(默认值):按照基本规则进行推导。
    infer-schema.parse-key-error.field-name String 解析JSON格式消息键(Key)时,如果消息键不为空,且解析失败,会添加key.fields-prefix前缀拼接此配置项的值为列名,类型为VARBINARY的字段到表Schema,表示消息键部分的数据。 默认值为col。如:消息体解析出的字段为value_name,消息键不为空但解析失败,则默认返回的Schema包含两个字段:key_col,value_name。
    infer-schema.compacted-topic-as-upsert-table Boolean 当Kafka topic的日志清理策略为compact且消息键(Key)不为空时,是否作为Upsert Kafka表使用。 默认值为true。使用CTAS或CDAS语法同步数据到阿里云消息队列Kafka版时需要配置为true。
    说明 仅实时计算引擎VVR 6.0.2及以上版本支持该参数。
    max.fetch.records Int 解析JSON格式消息时,最多尝试消费的消息数量。 默认值为100。
    aliyun.kafka.accessKeyId String 阿里云账号AccessKey ID,详情请参见创建AccessKey 使用CTAS或CDAS语法同步数据到阿里云消息队列Kafka版时需要配置。
    说明 仅实时计算引擎VVR 6.0.2及以上版本支持该参数。
    aliyun.kafka.accessKeySecret String 阿里云账号AccessKey Secret,详情请参见创建AccessKey 使用CTAS或CDAS语法同步数据到阿里云消息队列Kafka版时需要配置。
    说明 仅实时计算引擎VVR 6.0.2及以上版本支持该参数。
    aliyun.kafka.instanceId String 阿里云Kafka消息队列实例ID,可在消息队列Kafka实例详情界面查看。 使用CTAS或CDAS语法同步数据到阿里云消息队列Kafka版时需要配置。
    说明 仅实时计算引擎VVR 6.0.2及以上版本支持该参数。
    aliyun.kafka.endpoint String 阿里云Kafka API服务接入地址,详情请参见服务接入点 使用CTAS或CDAS语法同步数据到阿里云消息队列Kafka版时需要配置。
    说明 仅实时计算引擎VVR 6.0.2及以上版本支持该参数。
    aliyun.kafka.regionId String Topic所在实例的地域ID,详情请参见服务接入点 使用CTAS或CDAS语法同步数据到阿里云消息队列Kafka版时需要配置。
    说明 仅实时计算引擎VVR 6.0.2及以上版本支持该参数。
  6. 单击执行
    执行完会提示Query has been executed

查看Kafka JSON Catalog

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在页面左上角,单击新建,文件类型选择流作业/SQL
  5. 在文本编辑区域,输入以下命令。
    DESCRIBE `${catalog_name}`.`${db_name}`.`${topic_name}`;
    参数 说明
    ${catalog_name} Kafka JSON Catalog名称。
    ${db_name} Kafka集群名称。
    ${topic_name} Kafka Topic名称。
  6. 单击执行
    执行成功后,可以在运行结果中查看表的具体信息。表信息

使用Kafka JSON Catalog

  • 作为源表,从Kafka Topic中读取数据。
    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM `${kafka_catalog}`.`${db_name}`.`${topic_name}` /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
    说明 如果Kafka JSON Catalog的表使用时需要指定其他WITH参数,则建议使用SQL Hints的方式来添加其他参数。例如,如上SQL使用了SQL Hints指定从最早的数据开始消费。其他参数详情请参见消息队列Kafka源表消息队列Kafka结果表
  • 作为源表,使用CREATE TABLE AS(CTAS)语句将Kafka Topic中的数据同步至目标表中。
    • 单表同步,实时同步数据。
      CREATE TABLE IF NOT EXISTS `${target_table_name}`
      WITH (...)
      AS TABLE `${kafka_catalog}`.`${db_name}`.`${topic_name}`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
    • 在一个作业中同步多张表。
      BEGIN STATEMENT SET;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table0`
      AS TABLE `kafka-catalog`.`kafka`.`topic0`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table1`
      AS TABLE `kafka-catalog`.`kafka`.`topic1`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      CREATE TABLE IF NOT EXISTS `some_catalog`.`some_database`.`some_table2`
      AS TABLE `kafka-catalog`.`kafka`.`topic2`
      /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
      
      END;
      结合Kafka JSON Catalog,您可以在同一个任务中同步多张Kafka表。但需要满足以下条件:
      • 所有Kafka表均未配置topic-pattern参数。
      • 每张表关于Kafka的配置必须完全相同,即properties.*配置的属性完全相同,包括properties.bootstrap.servers和properties.group.id。
      • 每张表的 scan.startup.mode配置必须完全相同,且只能配置为group-offsets、latest-offset或earliest-offset,不能配置为其他值。
      例如,下图中上面两张表满足条件,下面两张表违反了以上三个条件。示例
说明 完整的端到端的Kafka JSON Catalog使用示例详情请参见日志实时入仓快速入门使用Flink读取kafka Catalog源表数据后写入下游Hologres表

删除Kafka JSON Catalog

重要 删除Kafka JSON Catalog不会影响已运行的作业,但会导致使用该Catalog下表的作业,在上线或重启时报无法找到该表的错误,请您谨慎操作。
  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击作业开发
  4. 在页面左上角,单击新建,文件类型选择流作业/SQL
  5. 在文本编辑区域,输入以下命令。
    DROP CATALOG ${catalog_name};

    其中${catalog_name}为您要删除的目标Kafka JSON Catalog名称。

  6. 单击执行
    执行完会提示Query has been executed

从Kafka JSON Catalog获取的表信息详解

为了方便使用Kafka JSON Catalog获取的表,Kafka JSON Catalog会在推导的表上添加默认的配置参数、元数据和主键信息。Kafka JSON Catalog获取的表的详细信息如下:
  • Kafka表的Schema推导
    Kafka JSON Catalog在解析JSON格式消息获取Topic的Schema时,Catalog会尝试消费最多max.fetch.records条消息,解析每条数据的Schema,解析规则与Kafka作为CTAS数据源时的基本规则相同,再将这些Schema合并作为最终的Schema。Schema主要包含以下几个部分:
    • 推导的物理列(Physical Columns)

      Kafka JSON Catalog会从Kafka消息的消息键(Key)和消息体(Value)推导出消息的物理列,列名添加对应的前缀。

      如果消息键不为空但解析失败,会返回列名为key.fields-prefix前缀和infer-schema.parse-key-error.field-name参数配置值的拼接结果,类型为VARBINARY的列。

      当拉取到一组Kafka消息后,Catalog会逐条解析Kafka消息并按以下规则合并解析出的物理列,从而作为整个Topic的Schema。合并规则如下:
      • 如果解析出的物理列中包含结果Schema中没有的字段,则Kafka JSON Catalog会自动将这些字段加入到结果Schema。
      • 如果两者出现了同名列,则按照以下场景进行处理:
        • 当类型相同且精度不同时,会取两者中较大的精度的类型。
        • 当类型不同时,会按照如下图的树型结构找到最小父节点,作为该同名列的类型。但当Decimal和Float类型合并时,为了保留精度会合并为Double类型。Schema合并
      例如,对于下面包含三条数据的一个Kafka topic,Kafka JSON Catalog得到的Schema如下图所示。Schema
    • 默认添加的元数据列(Metadata Column)
      Kafka JSON Catalog会默认添加partition,offset和timestamp三个有用的元数据列。详情如下表所示。
      元数据名 列名称 类型 说明
      partition partition INT NOT NULL 分区值。
      offset offset BIGINT NOT NULL 偏移量。
      timestamp timestamp TIMESTAMP_LTZ(3) NOT NULL 消息时间戳。
    • 默认添加的主键约束

      从Kafka JSON Catalog获取的表,在作为源表消费时,会默认把元数据列partition和offset列作为主键,确保数据不重复。

  • 默认添加的表参数
    参数 说明 备注
    connector Connector类型。 固定值为kafka或upsert-kafka。
    topic 对应的Topic名称。 声明的表名。
    properties.bootstrap.servers Kafka Broker地址。 对应Catalog的properties.bootstrap.servers参数配置值。
    value.format Flink Kafka Connector在序列化或反序列化Kafka的消息体(Value)时使用的格式。 固定值为json。
    value.fields-prefix 为所有Kafka消息体(Value)指定自定义前缀,以避免与消息键(Key)或Metadata字段重名。 对应Catalog的value.fields-prefix参数配置值。
    value.json.infer-schema.flatten-nested-columns.enable Kafka消息体(Value)是否递归式地展开JSON中的嵌套列。 对应Catalog的infer-schema.flatten-nested-columns.enable参数配置值。
    value.json.infer-schema.primitive-as-string Kafka消息体(Value)是否推导所有基本类型为String类型。 对应Catalog的infer-schema.primitive-as-string参数配置值。
    value.fields-include 定义消息体在处理消息键字段时的策略。 固定值为EXCEPT_KEY。表示消息体中不包含消息键的字段。

    消息键(Key)不为空时配置该参数,消息键(Key)为空时不配置该参数。

    key.format Flink Kafka Connector在序列化/反序列化Kafka的消息键(Key)时使用的格式。 固定值为json或raw。

    消息键(Key)不为空时配置该参数,消息键(Key)为空时不配置该参数。

    当消息键(Key)不为空但解析失败时,配置为raw;解析成功时,配置为json。

    key.fields-prefix 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 对应Catalog的key.fields-prefix参数配置值。

    消息键(Key)不为空时配置该参数,消息键(Key)为空时不配置该参数。

    key.fields Kafka消息键(Key)解析出来的数据存放的字段。 自动填写解析出来的Key字段列表。

    消息键(Key)不为空且不是Upsert Kafka表时配置该参数,否则不配置该参数。