本文为您介绍如何使用Upsert Kafka连接器。
背景信息
Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
- 作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。
- 作为结果表,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。
类别 | 详情 |
---|---|
支持类型 | 源表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | avro、avro-confluent、canal-json、csv、debezium-json、json、maxwell-json和raw |
特有监控指标 |
|
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
- 您需要创建Kafka集群,详情请参见创建DataFlow Kafka集群或在Kafka创建资源。
- 您需要连接实时计算Flink与Kafka集群之间网络。Kafka on EMR可参见文档配置创建和管理专有网络和安全组概述,阿里云消息队列Kafka版需要配置白名单。
使用限制
语法结构
CREATE TABLE upsert_kafka_sink (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = '<yourTopicName>',
'properties.bootstrap.servers' = '...',
'key.format' = 'avro',
'value.format' = 'avro'
);
WITH参数
- 通用
参数 说明 数据类型 是否必填 默认值 备注 connector 表类型。 String 是 无 固定值为upsert-kafka。 properties.bootstrap.servers Kafka broker地址。 String 是 无 格式为 host:port,host:port,host:port
,以英文逗号(,)分割。properties.* 对Kafka客户端的直接配置。 String 否 无 Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过
'properties.allow.auto.create.topics' = 'false'
来禁用自动创建topic。不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:- key.deserializer
- value.deserializer
format 读取或写入Kafka消息value部分时使用的格式。 String 否 无 参数取值如下:- csv
- json
- avro
- debezium-json
- canal-json
- maxwell-json
- avro-confluent
- raw
key.format 读取或写入Kafka消息key部分时使用的格式。 String 是 无 当使用该配置时,key.fields或key.fields-prefix配置是必填的。 参数取值如下:- csv
- json
- avro
- debezium-json
- canal-json
- maxwell-json
- avro-confluent
- raw
key.fields Kafka消息key部分对应的源表或结果表字段。 String 否 无 多个字段名以分号(;)分隔。例如 field1;field2
key.fields-prefix 为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。 String 否 无 该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。 说明 使用该配置时,value.fields-include必须配置为EXCEPT_KEY。value.format 读取或写入Kafka消息value部分时使用的格式。 String 是 无 当使用该配置时,key.fields或key.fields-prefix配置是必填的。该配置等同于format,因此format和value.format 只能配置其中一个,如果同时配置两个会产生冲突。 value.fields-include 在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。 String 是 ALL 参数取值如下:- ALL(默认值):所有列都会作为Kafka消息value部分处理。
- EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。
topic 读取或写入topic名称。 String 是 无 无。 - 结果表
参数 说明 数据类型 是否必填 默认值 备注 sink.parallelism Kafka结果表算子的并发数。 Integer 否 上游算子的并发,由框架决定 无。 sink.buffer-flush.max-rows 缓存刷新前,最多能缓存多少条记录。 Integer 否 0(未开启) 当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 说明 如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。sink.buffer-flush.interval 缓存刷新的间隔时间。 Duration 否 0(未开启) 当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 说明 如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。
使用示例
- 源表创建Kafka数据源表,源表中包含网站用户的浏览数据。
CREATE TABLE pageviews ( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopicName>', 'properties.bootstrap.servers' = '...', 'format' = 'json' );
- 结果表
- 创建Upsert Kafka结果表。
CREATE TABLE pageviews_per_region ( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY (user_region) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '<yourTopicName>', 'properties.bootstrap.servers' = '...', 'key.format' = 'avro', 'value.format' = 'avro' );
- 将统计网站用户的浏览数据写入结果表中。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCT user_id) FROM pageviews GROUP BY user_region;
- 创建Upsert Kafka结果表。