本文为您介绍消息队列Kafka源表的DDL定义、WITH参数、元信息列和示例。

什么是Kafka源表

消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。

前提条件

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka Connector。
  • 消息队列Kafka Connector仅支持读取kafka 0.10及以上版本的数据。
  • 消息队列Kafka Connector仅支持Kafka 2.4版本的消费者配置项,详情请参见消费者配置项
  • 仅Flink计算引擎vvr-4.0.12-flink-1.13及以上版本支持Kafka作为CTAS的同步数据源。
  • 仅JSON format支持类型推导和Schema变更,其它format暂不支持。
  • 在Kafka作为CTAS语句的数据源时,仅支持将JSON变更同步到Hudi和Hologres结果表。
  • 仅支持Kafka中value部分的类型推导和表结构变更。如果您需要同步Kafka key部分的列,则需要您手动在DDL中进行指定。详情请参见示例三

DDL定义

以下为创建Kafka源表的DDL示例,消息格式为CSV,包含5个字段。
CREATE TABLE kafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `topic` STRING METADATA VIRTUAL,
  `partition` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_excellent_topic',
  'properties.bootstrap.servers' = 'mykafka:9092',
  'properties.group.id' = 'my_excellent_group'
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)
在实际使用中请根据实际情况配置字段名和WITH参数。

元信息列

您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据。
Key 数据类型 说明
topic STRING NOT NULL METADATA VIRTUAL Kafka消息所在的Topic名称。
partition INT NOT NULL METADATA VIRTUAL Kafka消息所在的Partition ID。
headers MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL Kafka消息的消息头(header)。
leader-epoch INT NOT NULL METADATA VIRTUAL Kafka消息的Leader epoch。
offset BIGINT NOT NULL METADATA VIRTUAL Kafka消息的偏移量(offset)。
timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL Kafka消息的时间戳。
timestamp-type STRING NOT NULL METADATA VIRTUAL Kafka消息的时间戳类型:
  • NoTimestampType:消息中没有定义时间戳。
  • CreateTime:消息产生的时间。
  • LogAppendTime:消息被添加到Kafka Broker的时间。
说明
  • 仅VVR 3.0.0及以后版本支持定义元信息列。
  • 如果您需要将消息队列Kafka作为结果表,且忽略只读元信息列时,则在源表中定义元信息列时,必须声明这些元信息列为VIRTUAL。

作为CTAS数据源

CTAS语句支持将消息队列Kafka,且format为JSON的表作为数据源。在数据同步过程中,如果某些字段并未出现在预定义的表结构中,Flink会尝试去自动推导该列的类型。如果自动推导的类型不能满足您的使用需求,您也可以通过辅助推导的方式对某些列的解析类型进行声明。关于类型推导和嵌套类型的处理方式,详情如下:
说明 关于JSON Format的详细描述,详情请参见JSON Format
在类型推导过程中,Flink默认只会展开JSON文本中的第一层数据,根据其类型和数值,按照基本规则进行类型推导。当然您也可以在DDL中声明特定的列的解析类型以满足您的特殊需要,即辅助推导。基本规则和辅助推导详情如下:
  • 基本规则
    类型映射基本规则如下表所示。
    JSON类型 Flink SQL类型
    BOOLEAN BOOLEAN
    STRING DATE、TIMESTAMP、TIMESTAMP_LTZ、TIME或STRING
    INT或LONG BIGINT
    BIGINT DECIMAL或STRING
    说明 由于Flink中DECIMAL的类型是有精度限制的。因此,如果整数的实际取值超过了DECIMAL类型最大精度,Flink会自动推导其类型为STRING,避免精度的损失。
    FLOAT、DOUBLE或BIG DECIMAL DOUBLE
    ARRAY STRING
    OBJECT STRING
    示例
    • JSON文本
      {
        "id": 101,
        "name": "VVP",
        "properties": {
          "owner": "阿里云",
          "engine": "Flink"
        }
          "type": ["大数据"]
      }
    • Flink写入到下游存储的表信息
      id name properties type
      101 "VVP"
      {
           "owner": "阿里云",
           "engine": "Flink"
      }
      ["大数据"]
  • 辅助推导
    如果您觉得以上基本规则不符合您实际需要,则您可以在源表的DDL中声明特定列的解析类型。通过该方式,Flink会优先使用您声明的列类型去解析目标字段。针对以下示例,Flink会使用DECIMAL的方式去解析price字段,而不是使用默认的基本规则将其转换为DOUBLE类型。
    CREATE TABLE evolvingKafkaSource (
      price DECIMAL(18, 2)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'topic' = 'evolving_kafka_demo',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );
    但是,如果您在DDL中指定的类型和实际数据中的类型不一致时,则可以按照以下方式进行处理:
    • 在声明的类型比实际类型更宽泛时,以声明的类型自动去解析。

      例如,声明为DOUBLE,遇到的数据类型为BIGINT,则会以DOUBLE类型去解析。

    • 在实际的类型比声明的类型更为宽泛或者两种类型不兼容时,由于当前CTAS不支持类型变更,因此会报错提示您相关信息,您需要重新启动作业并声明准确的类型去解析数据。
      对于类型的宽泛的程度以及兼容性,可以总结为下图。宽泛程度上图表示越靠近根节点,其类型越宽泛。如果两个类型在不同的分支上,则表示这两个类型不兼容。
      说明
      • 不支持辅助推导复杂类型,包括ROW、ARRAY、MAP和MULTISET。
      • 对于复杂类型,Flink在默认情况下会处理为STRING。
通常,Kafka Topic中的JSON文本带有嵌套结构。如果您需要提取JSON文本中的嵌套列,则可以通过以下两种方式:
  • 在源表DDL中声明'json.infer-schema.flatten-nested-columns.enable'='true',来展开嵌套列中的所有元素至顶层。
    通过该方式,所有的嵌套列都会被依次展开。为了避免列名冲突,Flink采用索引到该列的路径作为展开后列名字。
    说明 目前不支持解决列名冲突。如果发生列名冲突,请在源表的DDL中声明json.ignore-parse-errors为true,来忽略存在冲突的数据。
    示例
    • JSON文本
      {
        "nested": {
          "inner": {
            "col": true
          }
        }
      }
    • Flink写入到下游存储的表信息
      neseted.inner.col
      true
  • 在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。

    详情请参见示例四

WITH参数

参数 说明 是否必选 数据类型 备注
connector 源表类型。 String 固定值为kafka
topic topic名称。 String 以分号 (;) 分隔多个topic名称,例如topic-1;topic-2
注意 topic和topic-pattern两个选项只能指定其中一个。
topic-pattern 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 String
注意
  • 仅VVR 3.0.0及以后版本支持该参数。
  • topic和topic-pattern两个选项只能指定其中一个。
properties.bootstrap.servers Kafka Broker地址。 String 格式为host:port,host:port,host:port,以英文逗号(,)分割。
properties.group.id Kafka消费组ID。 String 无。
properties.* Kafka配置。 String 后缀名必须匹配为Kafka官方文档中定义的配置。Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。

不建议通过以上方式修改'key.deserializer''value.deserializer'参数,因为它们会被kafka配置覆盖。

format Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 String 取值如下:
  • csv
  • json
  • avro
  • debezium-json
  • canal-json
  • maxwell-json
  • avro-confluent
  • raw
说明
  • 仅VVR 3.0.0及以后版本支持maxwell-json、avro-confluent和raw格式。
  • format和value.format只能配置其中一个,如果同时配置两个,则会有冲突。
value.format Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 String 取值如下:
  • csv
  • json
  • avro
  • debezium-json
  • canal-json
  • maxwell-json
  • avro-confluent
  • raw
说明
  • 仅VVR 3.0.0及以后版本支持maxwell-json、avro-confluent和raw格式。
  • format和value.format只能配置其中一个,如果同时配置两个,则会有冲突。
key.format 反序列化Kafka消息键(key)时使用的格式。 String 取值如下:
  • csv
  • json
  • avro
  • debezium-json
  • canal-json
  • maxwell-json
  • avro-confluent
  • raw
说明
  • 仅VVR 3.0.0及以后版本支持maxwell-json、avro-confluent和raw格式。
  • 如果指定了key.format参数,则也必须指定key.fields参数。
key.fields Kafka消息键(key)解析出来的数据存放的字段。 String 多个字段名以分号(;)分隔。例如field1;field2。默认不配置该参数,因此key不会被解析,key数据将被丢弃。
说明 仅VVR 3.0.0及以后版本支持该参数。
key.fields-prefix 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 String 默认情况下前缀为空。如果定义了前缀,表结构和配置项key.fields都需要使用带前缀的名称。

当构建消息键字段时,前缀会被移除,将使用无前缀的名称。

注意
  • 仅VVR 3.0.0及以后版本支持该参数。
  • 该配置项要求必须将value.fields-include配置为EXCEPT_KEY。
value.fields-include 在解析消息体时,是否要包含消息键字段。 String 取值如下:
  • ALL(默认值):所有定义的字段都存放消息体(Value)的解析出来的数据。
  • EXCEPT_KEY:除去key.fields定义字段,剩余的定义字段可以用来存放消息体(Value)解析出来的数据。
说明 仅VVR 3.0.0及以后版本支持该参数。
scan.startup.mode Kafka读取数据的启动位点。 String 取值如下:
  • earliest-offset:从Kafka最早分区开始读取。
  • latest-offset:从Kafka最新位点开始读取。
  • group-offsets(默认值):根据Group读取。
  • timestamp:从Kafka指定时间点读取。

    需要在WITH参数中指定scan.startup.timestamp-millis参数。

  • specific-offsets:从Kafka指定分区指定偏移量读取。

    需要在WITH参数中指定scan.startup.specific-offsets参数。

scan.startup.specific-offsets specific-offsets启动模式下,指定每个分区的启动偏移量。 String 例如:partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis timestamp启动模式下,指定启动位点时间戳。 Long 单位为毫秒。
value.fields-prefix 为所有Kafka消息体(Value)指定自定义前缀,以避免与消息键(Key)或Metadata字段重名。 String 如果定义了前缀,表结构需要使用带前缀的名称。当构建消息键字段时,前缀会被移除,将使用无前缀的名称。
说明 仅Flink计算引擎VVR 4.0.12及之后版本支持该参数。

默认情况下前缀为空。

json.infer-schema.flatten-nested-columns.enable 是否递归式地展开JSON中的嵌套列。 Boolean 参数取值如下:
  • true:递归式展开。

    对于被展开的列,Flink使用索引该值的路径作为名字。例如,对于JSON{"nested": {"col": true}} 中的列col,它展开后的名字为nested.col。

  • false(默认值):将嵌套类型当作String处理。
说明 该参数仅在Kafka作为CTAS数据同步的数据源时生效。
json.infer-schema.primitive-as-string 是否推导所有基本类型为String类型。 Boolean 参数取值如下:
  • true:推导所有基本类型为String。
  • false(默认值):按照基本规则进行推导。
说明 该参数仅在Kafka作为CTAS数据同步的数据源时生效。
Kafka Consumer配置参数详情请参见Kafka官网消费者配置项列表。如果您还需要直接配置Connector使用的Kafka Consumer,可以在Kafka Consumer配置参数前添加properties前缀,并将该Kafka Consumer配置信息追加至WITH参数。例如Kafka集群需要SASL(Simple Authentication and Security Layer)认证。
CREATE TABLE kafkaTable (
    ...
) WITH (
    ...
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
说明 仅Per-Job集群支持Kafka认证。

代码示例

  • 示例一:从Kafka中读取数据后插入Kafka。
    从名称为source的Topic中读取Kafka数据,再写入名称为sink的Topic,数据使用CSV格式。
    CREATE TEMPORARY TABLE Kafka_source (
      id INT,
      name STRING,
      age INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'source',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE Kafka_sink (
      id INT,
      name STRING,
      age INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'sink',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    INSERT INTO Kafka_sink SELECT id, name, age FROM Kafka_source;
  • 示例二:同步表结构以及数据
    将Kafka Topic中的消息实时同步到Hologres中。在该情况下,您可以将Kafka消息的offset和partition id作为主键,从而保证在Failover时,Hologres中不会有重复消息。
    CREATE TEMPORARY TABLE kafkaTable (
      `offset` INT NOT NULL METADATA,
      `part` BIGINT NOT NULL METADATA FROM 'partition',
      PRIMARY KEY (`part`, `offset`) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'topic' = 'kafka_evolution_demo',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true'
        -- 可选,将嵌套列全部展开。
    );
    
    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
    WITH (
      'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable;
  • 示例三:同步表结构以及Kafka消息的key和value数据。
    Kafka消息中的key部分已经存储了相关信息,您可以同时同步Kafka中的key和value。
    CREATE TEMPORARY TABLE kafkaTable (
      `key_id` INT NOT NULL,
      `val_name` VARCHAR(200)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'topic' = 'kafka_evolution_demo',
      'scan.startup.mode' = 'earliest-offset',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields' = 'key_id',
      'key.fields-prefix' = 'key_',
      'value.fields-prefix' = 'val_',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    
    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
    WITH (
      'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable;
    说明 Kafka消息中的key部分不支持表结构变更和类型推导,需要您手动声明。
  • 示例四:同步表结构和数据并进行计算。
    在同步Kafka数据到Hologres时,往往需要一些轻量级的计算。
    CREATE TEMPORARY TABLE kafkaTable (
      `distinct_id` INT NOT NULL,
      `properties` STRING,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE)
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'topic' = 'kafka_evolution_demo',
      'scan.startup.mode' = 'earliest-offset',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.fields' = 'key_id',
      'key.fields-prefix' = 'key_'
    );
    
    CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
       'connector' = 'hologres'
    ) AS TABLE vvp.`default`.kafkaTable
    ADD COLUMN
      `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
    -- 使用COALESCE处理空值情况。

常见问题