本文为您介绍消息队列Kafka源表的DDL定义、WITH参数、元信息列和示例。
什么是Kafka源表
消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。
前提条件
使用限制
- 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka Connector。
- 消息队列Kafka Connector仅支持读取kafka 0.10及以上版本的数据。
- 消息队列Kafka Connector仅支持Kafka 2.4版本的消费者配置项,详情请参见消费者配置项。
- 仅Flink计算引擎vvr-4.0.12-flink-1.13及以上版本支持Kafka作为CTAS的同步数据源。
- 仅JSON format支持类型推导和Schema变更,其它format暂不支持。
- 在Kafka作为CTAS语句的数据源时,仅支持将JSON变更同步到Hudi和Hologres结果表。
- 仅支持Kafka中value部分的类型推导和表结构变更。如果您需要同步Kafka key部分的列,则需要您手动在DDL中进行指定。详情请参见示例三。
DDL定义
以下为创建Kafka源表的DDL示例,消息格式为CSV,包含5个字段。
CREATE TABLE kafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`topic` STRING METADATA VIRTUAL,
`partition` BIGINT METADATA VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'my_excellent_topic',
'properties.bootstrap.servers' = 'mykafka:9092',
'properties.group.id' = 'my_excellent_group'
'format' = 'csv',
'scan.startup.mode' = 'earliest-offset'
)
在实际使用中请根据实际情况配置字段名和WITH参数。元信息列
您可以在源表中定义元信息列,以获取Kafka消息的元信息。例如,当WITH参数中定义了多个topic时,如果在Kafka源表中定义了元信息列,那么Flink读取到的数据就会被标识是从哪个topic中读取的数据。
Key | 数据类型 | 说明 |
---|---|---|
topic | STRING NOT NULL METADATA VIRTUAL | Kafka消息所在的Topic名称。 |
partition | INT NOT NULL METADATA VIRTUAL | Kafka消息所在的Partition ID。 |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Kafka消息的消息头(header)。 |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Kafka消息的Leader epoch。 |
offset | BIGINT NOT NULL METADATA VIRTUAL | Kafka消息的偏移量(offset)。 |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Kafka消息的时间戳。 |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Kafka消息的时间戳类型:
|
说明
- 仅VVR 3.0.0及以后版本支持定义元信息列。
- 如果您需要将消息队列Kafka作为结果表,且忽略只读元信息列时,则在源表中定义元信息列时,必须声明这些元信息列为VIRTUAL。
作为CTAS数据源
CTAS语句支持将消息队列Kafka,且format为JSON的表作为数据源。在数据同步过程中,如果某些字段并未出现在预定义的表结构中,Flink会尝试去自动推导该列的类型。如果自动推导的类型不能满足您的使用需求,您也可以通过辅助推导的方式对某些列的解析类型进行声明。关于类型推导和嵌套类型的处理方式,详情如下:
说明 关于JSON Format的详细描述,详情请参见JSON Format。
在类型推导过程中,Flink默认只会展开JSON文本中的第一层数据,根据其类型和数值,按照基本规则进行类型推导。当然您也可以在DDL中声明特定的列的解析类型以满足您的特殊需要,即辅助推导。基本规则和辅助推导详情如下:
- 基本规则
类型映射基本规则如下表所示。
JSON类型 Flink SQL类型 BOOLEAN BOOLEAN STRING DATE、TIMESTAMP、TIMESTAMP_LTZ、TIME或STRING INT或LONG BIGINT BIGINT DECIMAL或STRING 说明 由于Flink中DECIMAL的类型是有精度限制的。因此,如果整数的实际取值超过了DECIMAL类型最大精度,Flink会自动推导其类型为STRING,避免精度的损失。FLOAT、DOUBLE或BIG DECIMAL DOUBLE ARRAY STRING OBJECT STRING 示例- JSON文本
{ "id": 101, "name": "VVP", "properties": { "owner": "阿里云", "engine": "Flink" } "type": ["大数据"] }
- Flink写入到下游存储的表信息
id name properties type 101 "VVP" { "owner": "阿里云", "engine": "Flink" }
["大数据"]
- JSON文本
- 辅助推导
如果您觉得以上基本规则不符合您实际需要,则您可以在源表的DDL中声明特定列的解析类型。通过该方式,Flink会优先使用您声明的列类型去解析目标字段。针对以下示例,Flink会使用DECIMAL的方式去解析price字段,而不是使用默认的基本规则将其转换为DOUBLE类型。
CREATE TABLE evolvingKafkaSource ( price DECIMAL(18, 2) ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'localhost:9092', 'topic' = 'evolving_kafka_demo', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );
但是,如果您在DDL中指定的类型和实际数据中的类型不一致时,则可以按照以下方式进行处理:- 在声明的类型比实际类型更宽泛时,以声明的类型自动去解析。
例如,声明为DOUBLE,遇到的数据类型为BIGINT,则会以DOUBLE类型去解析。
- 在实际的类型比声明的类型更为宽泛或者两种类型不兼容时,由于当前CTAS不支持类型变更,因此会报错提示您相关信息,您需要重新启动作业并声明准确的类型去解析数据。
对于类型的宽泛的程度以及兼容性,可以总结为下图。
上图表示越靠近根节点,其类型越宽泛。如果两个类型在不同的分支上,则表示这两个类型不兼容。
说明- 不支持辅助推导复杂类型,包括ROW、ARRAY、MAP和MULTISET。
- 对于复杂类型,Flink在默认情况下会处理为STRING。
- 在声明的类型比实际类型更宽泛时,以声明的类型自动去解析。
通常,Kafka Topic中的JSON文本带有嵌套结构。如果您需要提取JSON文本中的嵌套列,则可以通过以下两种方式:
- 在源表DDL中声明'json.infer-schema.flatten-nested-columns.enable'='true',来展开嵌套列中的所有元素至顶层。
通过该方式,所有的嵌套列都会被依次展开。为了避免列名冲突,Flink采用索引到该列的路径作为展开后列名字。说明 目前不支持解决列名冲突。如果发生列名冲突,请在源表的DDL中声明json.ignore-parse-errors为true,来忽略存在冲突的数据。示例
- JSON文本
{ "nested": { "inner": { "col": true } } }
- Flink写入到下游存储的表信息
neseted.inner.col true
- JSON文本
- 在DDL中CTAS语法中添加计算列 `rowkey` AS JSON_VALUE(`properties`, `$.rowkey`),来指定要展开的列。
详情请参见示例四。
WITH参数
参数 | 说明 | 是否必选 | 数据类型 | 备注 |
---|---|---|---|---|
connector | 源表类型。 | 是 | String | 固定值为kafka 。
|
topic | topic名称。 | 是 | String | 以分号 (;) 分隔多个topic名称,例如topic-1;topic-2 。
注意 topic和topic-pattern两个选项只能指定其中一个。
|
topic-pattern | 匹配读取topic名称的正则表达式。所有匹配该正则表达式的topic在作业运行时均会被订阅。 | 否 | String |
注意
|
properties.bootstrap.servers | Kafka Broker地址。 | 是 | String | 格式为host:port,host:port,host:port ,以英文逗号(,)分割。 |
properties.group.id | Kafka消费组ID。 | 是 | String | 无。 |
properties.* | Kafka配置。 | 否 | String | 后缀名必须匹配为Kafka官方文档中定义的配置。Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。
不建议通过以上方式修改'key.deserializer'和'value.deserializer'参数,因为它们会被kafka配置覆盖。 |
format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String | 取值如下:
说明
|
value.format | Flink Kafka Connector在反序列化来自Kafka的消息体(value)时使用的格式。 | 是 | String | 取值如下:
说明
|
key.format | 反序列化Kafka消息键(key)时使用的格式。 | 否 | String | 取值如下:
说明
|
key.fields | Kafka消息键(key)解析出来的数据存放的字段。 | 否 | String | 多个字段名以分号(;)分隔。例如field1;field2 。默认不配置该参数,因此key不会被解析,key数据将被丢弃。
说明 仅VVR 3.0.0及以后版本支持该参数。
|
key.fields-prefix | 为所有Kafka消息键(Key)指定自定义前缀,以避免与消息体(Value)格式字段重名。 | 否 | String | 默认情况下前缀为空。如果定义了前缀,表结构和配置项key.fields都需要使用带前缀的名称。
当构建消息键字段时,前缀会被移除,将使用无前缀的名称。 注意
|
value.fields-include | 在解析消息体时,是否要包含消息键字段。 | 否 | String | 取值如下:
说明 仅VVR 3.0.0及以后版本支持该参数。
|
scan.startup.mode | Kafka读取数据的启动位点。 | 否 | String | 取值如下:
|
scan.startup.specific-offsets | 在specific-offsets启动模式下,指定每个分区的启动偏移量。 | 否 | String | 例如:partition:0,offset:42;partition:1,offset:300 |
scan.startup.timestamp-millis | 在timestamp启动模式下,指定启动位点时间戳。 | 否 | Long | 单位为毫秒。 |
value.fields-prefix | 为所有Kafka消息体(Value)指定自定义前缀,以避免与消息键(Key)或Metadata字段重名。 | 否 | String | 如果定义了前缀,表结构需要使用带前缀的名称。当构建消息键字段时,前缀会被移除,将使用无前缀的名称。
说明 仅Flink计算引擎VVR 4.0.12及之后版本支持该参数。
默认情况下前缀为空。 |
json.infer-schema.flatten-nested-columns.enable | 是否递归式地展开JSON中的嵌套列。 | 否 | Boolean | 参数取值如下:
说明 该参数仅在Kafka作为CTAS数据同步的数据源时生效。
|
json.infer-schema.primitive-as-string | 是否推导所有基本类型为String类型。 | 否 | Boolean | 参数取值如下:
说明 该参数仅在Kafka作为CTAS数据同步的数据源时生效。
|
Kafka Consumer配置参数详情请参见Kafka官网消费者配置项列表。如果您还需要直接配置Connector使用的Kafka Consumer,可以在Kafka Consumer配置参数前添加
properties
前缀,并将该Kafka Consumer配置信息追加至WITH参数。例如Kafka集群需要SASL(Simple Authentication and Security
Layer)认证。CREATE TABLE kafkaTable (
...
) WITH (
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);
说明 仅Per-Job集群支持Kafka认证。
代码示例
- 示例一:从Kafka中读取数据后插入Kafka。
从名称为source的Topic中读取Kafka数据,再写入名称为sink的Topic,数据使用CSV格式。
CREATE TEMPORARY TABLE Kafka_source ( id INT, name STRING, age INT ) WITH ( 'connector' = 'kafka', 'topic' = 'source', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE Kafka_sink ( id INT, name STRING, age INT ) WITH ( 'connector' = 'kafka', 'topic' = 'sink', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); INSERT INTO Kafka_sink SELECT id, name, age FROM Kafka_source;
- 示例二:同步表结构以及数据
将Kafka Topic中的消息实时同步到Hologres中。在该情况下,您可以将Kafka消息的offset和partition id作为主键,从而保证在Failover时,Hologres中不会有重复消息。
CREATE TEMPORARY TABLE kafkaTable ( `offset` INT NOT NULL METADATA, `part` BIGINT NOT NULL METADATA FROM 'partition', PRIMARY KEY (`part`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'topic' = 'kafka_evolution_demo', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true' -- 可选,将嵌套列全部展开。 ); CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH ( 'connector' = 'hologres' ) AS TABLE vvp.`default`.kafkaTable;
- 示例三:同步表结构以及Kafka消息的key和value数据。
Kafka消息中的key部分已经存储了相关信息,您可以同时同步Kafka中的key和value。
CREATE TEMPORARY TABLE kafkaTable ( `key_id` INT NOT NULL, `val_name` VARCHAR(200) ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'topic' = 'kafka_evolution_demo', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'val_', 'value.fields-include' = 'EXCEPT_KEY' ); CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`( WITH ( 'connector' = 'hologres' ) AS TABLE vvp.`default`.kafkaTable;
说明 Kafka消息中的key部分不支持表结构变更和类型推导,需要您手动声明。 - 示例四:同步表结构和数据并进行计算。
在同步Kafka数据到Hologres时,往往需要一些轻量级的计算。
CREATE TEMPORARY TABLE kafkaTable ( `distinct_id` INT NOT NULL, `properties` STRING, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE) ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'topic' = 'kafka_evolution_demo', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_' ); CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH ( 'connector' = 'hologres' ) AS TABLE vvp.`default`.kafkaTable ADD COLUMN `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default'); -- 使用COALESCE处理空值情况。