本文为您介绍如何使用Upsert Kafka连接器。
背景信息
Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
- 作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。 
- 作为结果表或数据摄入目标端,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。 
| 类别 | 详情 | 
| 支持类型 | 源表和结果表,数据摄入目标端 | 
| 运行模式 | 流模式 | 
| 数据格式 | avro、avro-confluent、csv、json和raw | 
| 特有监控指标 | 
 | 
| API种类 | SQL,数据摄入YAML作业 | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
- 您需要创建Kafka集群,详情请参见创建DataFlow Kafka集群或在Kafka创建资源。 
- 您需要连接实时计算Flink与Kafka集群之间网络。Kafka on EMR可参见文档配置创建和管理专有网络和安全组概述,云消息队列 Kafka 版需要配置白名单。 
使用限制
SQL
Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
语法结构
CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);WITH参数
- 通用 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - connector - 表类型。 - String - 是 - 无 - 固定值为upsert-kafka。 - properties.bootstrap.servers - Kafka broker地址。 - String - 是 - 无 - 格式为 - host:port,host:port,host:port,以英文逗号(,)分割。- properties.* - 对Kafka客户端的直接配置。 - String - 否 - 无 - Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过 - 'properties.allow.auto.create.topics' = 'false'来禁用自动创建topic。- 不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖: - key.deserializer 
- value.deserializer 
 - key.format - 读取或写入Kafka消息key部分时使用的格式。 - String - 是 - 无 - 当使用该配置时,key.fields或key.fields-prefix配置是必填的。 - 参数取值如下: - csv 
- json 
- avro 
- debezium-json 
- canal-json 
- maxwell-json 
- avro-confluent 
- raw 
 - key.fields-prefix - 为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。 - String - 否 - 无 - 该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。 说明- 使用该配置时,value.fields-include必须配置为EXCEPT_KEY。 - value.format - 读取或写入Kafka消息value部分时使用的格式。 - String - 是 - 无 - 该配置等同于format,因此format和value.format 只能配置其中一个,如果同时配置两个会产生冲突。 - value.fields-include - 在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。 - String - 是 - ALL - 参数取值如下: - ALL(默认值):所有列都会作为Kafka消息value部分处理。 
- EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。 
 - topic - 读取或写入topic名称。 - String - 是 - 无 - 无。 
- 结果表 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - sink.parallelism - Kafka结果表算子的并发数。 - Integer - 否 - 上游算子的并发,由框架决定 - 无。 - sink.buffer-flush.max-rows - 缓存刷新前,最多能缓存多少条记录。 - Integer - 否 - 0(未开启) - 当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 说明- 如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。 - sink.buffer-flush.interval - 缓存刷新的间隔时间。 - Duration - 否 - 0(未开启) - 单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如 - 'sink.buffer-flush.interval'='1 s'。- 当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 说明- 如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rows和sink.buffer-flush.interval两个选项为大于零的值。 
数据摄入
Upsert Kafka连接器可以用于数据摄入YAML作业开发,作为目标端写入。写入时使用JSON格式,主键字段也会放入消息体中。
语法结构
sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # 阿里云消息队列 Kafka 版
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}配置项
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| type | 目标端类型。 | STRING | 是 | 无 | 固定值为upsert-kafka。 | 
| name | 目标端名称。 | STRING | 否 | 无 | 无。 | 
| properties.bootstrap.servers | Kafka broker地址。 | STRING | 是 | 无 | 格式为 | 
| properties.* | 对Kafka客户端的直接配置。 | STRING | 否 | 无 | 后缀名必须是Kafka官方文档中定义的生产者配置。 Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过 | 
| sink.delivery-guarantee | 写入时的语义模式。 | STRING | 否 | at-least-once | 取值如下: 
 | 
| sink.add-tableId-to-header-enabled | 是否将table信息写入header。 | BOOLEAN | 否 | false | 开启时,namespace、schemaName和tableName会分别写入header。 | 
| aliyun.kafka.accessKeyId | 阿里云账号AccessKey ID。 | STRING | 否 | 无 | 详情请参见创建AccessKey。 说明  同步数据到阿里云消息队列Kafka版时需要配置。 | 
| aliyun.kafka.accessKeySecret | 阿里云账号AccessKey Secret。 | STRING | 否 | 无 | 详情请参见创建AccessKey。 说明  同步数据到阿里云消息队列Kafka版时需要配置。 | 
| aliyun.kafka.instanceId | 阿里云Kafka消息队列实例ID。 | STRING | 否 | 无 | 请在阿里云Kafka实例详情界面查看。 说明  同步数据到阿里云消息队列Kafka版时需要配置。 | 
| aliyun.kafka.endpoint | 阿里云Kafka API服务接入地址。 | STRING | 否 | 无 | 详情请参见服务接入点。 说明  同步数据到阿里云消息队列Kafka版时需要配置。 | 
| aliyun.kafka.regionId | Topic所在实例的地域ID。 | STRING | 否 | 无 | 详情请参见服务接入点。 说明  同步数据到阿里云消息队列Kafka版时需要配置。 | 
支持的类型变更
数据摄入Upsert Kafka连接器支持全部类型的变更操作,但是在读取时需要使用Schema固定的Flink Upsert Kafka SQL连接器进行读取。
使用示例
- 源表 - 创建Kafka数据源表,源表中包含网站用户的浏览数据。 - CREATE TABLE pageviews( user_id BIGINT, page_id BIGINT, viewtime TIMESTAMP, user_region STRING, WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND )WITH( 'connector'='kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'format'='json' );
- 结果表 - 创建Upsert Kafka结果表。 - CREATE TABLE pageviews_per_region( user_region STRING, pv BIGINT, uv BIGINT, PRIMARY KEY(user_region) NOT ENFORCED )WITH( 'connector'='upsert-kafka', 'topic'='<yourTopicName>', 'properties.bootstrap.servers'='...', 'key.format'='avro', 'value.format'='avro' );
- 将统计网站用户的浏览数据写入结果表中。 - INSERT INTO pageviews_per_region SELECT user_region, COUNT(*), COUNT(DISTINCTuser_id) FROM pageviews GROUP BY user_region;
 
- 数据摄入目标端 - source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: upsert-kafka name: Upsert Kafka Sink properties.bootstrap.servers: ${upsert.kafka.bootstraps.server} aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak} aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk} aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid} aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint} aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid} route: - source-table: ${mysql.source.table} sink-table: ${upsert.kafka.topic}