本文为您介绍如何使用Upsert Kafka连接器。
背景信息
Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。
作为结果表或数据摄入目标端,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。
类别 | 详情 |
支持类型 | 源表和结果表,数据摄入目标端 |
运行模式 | 流模式 |
数据格式 | avro、avro-confluent、csv、json和raw |
特有监控指标 |
|
API种类 | SQL,数据摄入YAML作业 |
是否支持更新或删除结果表数据 | 是 |
前提条件
您需要创建Kafka集群,详情请参见创建DataFlow Kafka集群或在Kafka创建资源。
您需要连接实时计算Flink与Kafka集群之间网络。Kafka on EMR可参见文档配置创建和管理专有网络和安全组概述,云消息队列 Kafka 版需要配置白名单。
使用限制
SQL
Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
语法结构
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为upsert-kafka。
properties.bootstrap.servers
Kafka broker地址。
String
是
无
格式为
host:port,host:port,host:port
,以英文逗号(,)分割。properties.*
对Kafka客户端的直接配置。
String
否
无
Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过
'properties.allow.auto.create.topics' = 'false'
来禁用自动创建topic。不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:
key.deserializer
value.deserializer
key.format
读取或写入Kafka消息key部分时使用的格式。
String
是
无
当使用该配置时,key.fields或key.fields-prefix配置是必填的。
参数取值如下:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
key.fields-prefix
为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。
String
否
无
该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。
说明使用该配置时,value.fields-include必须配置为EXCEPT_KEY。
value.format
读取或写入Kafka消息value部分时使用的格式。
String
是
无
该配置等同于format,因此format和value.format 只能配置其中一个,如果同时配置两个会产生冲突。
value.fields-include
在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。
String
是
ALL
参数取值如下:
ALL(默认值):所有列都会作为Kafka消息value部分处理。
EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。
topic
读取或写入topic名称。
String
是
无
无。
结果表
参数
说明
数据类型
是否必填
默认值
备注
sink.parallelism
Kafka结果表算子的并发数。
Integer
否
上游算子的并发,由框架决定
无。
sink.buffer-flush.max-rows
缓存刷新前,最多能缓存多少条记录。
Integer
否
0(未开启)
当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。
说明如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。
sink.buffer-flush.interval
缓存刷新的间隔时间。
Duration
否
0(未开启)
单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如
'sink.buffer-flush.interval'='1 s'
。当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。
说明如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。
数据摄入
Upsert Kafka连接器可以用于数据摄入YAML作业开发,作为目标端写入。写入时使用JSON格式,主键字段也会放入消息体中。
语法结构
sink:
type: upsert-kafka
name: upsert-kafka Sink
properties.bootstrap.servers: localhost:9092
# 阿里云消息队列 Kafka 版
aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
aliyun.kafka.instanceId: ${instancd-id}
aliyun.kafka.endpoint: ${endpoint}
aliyun.kafka.regionId: ${region-id}
配置项
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
type | 目标端类型。 | STRING | 是 | 无 | 固定值为upsert-kafka。 |
name | 目标端名称。 | STRING | 否 | 无 | 无。 |
properties.bootstrap.servers | Kafka broker地址。 | STRING | 是 | 无 | 格式为 |
properties.* | 对Kafka客户端的直接配置。 | STRING | 否 | 无 | 后缀名必须是Kafka官方文档中定义的生产者配置。 Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过 |
sink.delivery-guarantee | 写入时的语义模式。 | STRING | 否 | at-least-once | 取值如下:
|
sink.add-tableId-to-header-enabled | 是否将table信息写入header。 | BOOLEAN | 否 | false | 开启时,namespace、schemaName和tableName会分别写入header。 |
aliyun.kafka.accessKeyId | 阿里云账号AccessKey ID。 | STRING | 否 | 无 | 详情请参见创建AccessKey。 说明 同步数据到阿里云消息队列Kafka版时需要配置。 |
aliyun.kafka.accessKeySecret | 阿里云账号AccessKey Secret。 | STRING | 否 | 无 | 详情请参见创建AccessKey。 说明 同步数据到阿里云消息队列Kafka版时需要配置。 |
aliyun.kafka.instanceId | 阿里云Kafka消息队列实例ID。 | STRING | 否 | 无 | 请在阿里云Kafka实例详情界面查看。 说明 同步数据到阿里云消息队列Kafka版时需要配置。 |
aliyun.kafka.endpoint | 阿里云Kafka API服务接入地址。 | STRING | 否 | 无 | 详情请参见服务接入点。 说明 同步数据到阿里云消息队列Kafka版时需要配置。 |
aliyun.kafka.regionId | Topic所在实例的地域ID。 | STRING | 否 | 无 | 详情请参见服务接入点。 说明 同步数据到阿里云消息队列Kafka版时需要配置。 |
使用示例
源表
创建Kafka数据源表,源表中包含网站用户的浏览数据。
CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
结果表
创建Upsert Kafka结果表。
CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
将统计网站用户的浏览数据写入结果表中。
INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;
数据摄入目标端
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}