Upsert Kafka

本文为您介绍如何使用Upsert Kafka连接器。

背景信息

Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。

  • 作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。

  • 作为结果表或数据摄入目标端,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。

类别

详情

支持类型

源表和结果表,数据摄入目标端

运行模式

流模式

数据格式

avro、avro-confluent、csv、json和raw

特有监控指标

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • 结果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

API种类

SQL,数据摄入YAML作业

是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka连接器。

  • 仅支持读取和写入Apache Kafka 0.10及以上版本的数据。

  • 仅支持Apache Kafka 2.8版本的客户端配置项,详情请参见Apache Kafka消费者生产者配置项文档。

  • Upsert Kafka结果表在使用精确一次语义时,写入的Kafka集群必须开启事务功能,且仅支持Apache Kafka 0.11及以上版本的集群。

SQL

Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。

语法结构

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为upsert-kafka。

    properties.bootstrap.servers

    Kafka broker地址。

    String

    格式为host:port,host:port,host:port,以英文逗号(,)分割。

    properties.*

    对Kafka客户端的直接配置。

    String

    后缀名必须是Kafka官方文档中定义的生产者消费者配置。

    Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。

    不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:

    • key.deserializer

    • value.deserializer

    key.format

    读取或写入Kafka消息key部分时使用的格式。

    String

    当使用该配置时,key.fieldskey.fields-prefix配置是必填的。

    参数取值如下:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。

    String

    该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。

    说明

    使用该配置时,value.fields-include必须配置为EXCEPT_KEY。

    value.format

    读取或写入Kafka消息value部分时使用的格式。

    String

    该配置等同于format,因此formatvalue.format 只能配置其中一个,如果同时配置两个会产生冲突。

    value.fields-include

    在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。

    String

    ALL

    参数取值如下:

    • ALL(默认值):所有列都会作为Kafka消息value部分处理。

    • EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。

    topic

    读取或写入topic名称。

    String

    无。

  • 结果表

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    sink.parallelism

    Kafka结果表算子的并发数。

    Integer

    上游算子的并发,由框架决定

    无。

    sink.buffer-flush.max-rows

    缓存刷新前,最多能缓存多少条记录。

    Integer

    0(未开启)

    当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。

    说明

    如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个选项为大于零的值。

    sink.buffer-flush.interval

    缓存刷新的间隔时间。

    Duration

    0(未开启)

    单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='1 s'

    当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。

    说明

    如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个选项为大于零的值。

数据摄入

Upsert Kafka连接器可以用于数据摄入YAML作业开发,作为目标端写入。写入时使用JSON格式,主键字段也会放入消息体中。

语法结构

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # 阿里云消息队列 Kafka 版
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

配置项

参数

说明

数据类型

是否必填

默认值

备注

type

目标端类型。

STRING

固定值为upsert-kafka。

name

目标端名称。

STRING

无。

properties.bootstrap.servers

Kafka broker地址。

STRING

格式为host:port,host:port,host:port,以英文逗号(,)分割。

properties.*

对Kafka客户端的直接配置。

STRING

后缀名必须是Kafka官方文档中定义的生产者配置。

Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。

sink.delivery-guarantee

写入时的语义模式。

STRING

at-least-once

取值如下:

  • none:不保证任何语义,数据可能会丢失或重复。

  • at-least-once(默认值):保证数据不丢失,但可能会重复。

  • exactly-once:使用Kafka事务保证数据不会丢失和重复。

sink.add-tableId-to-header-enabled

是否将table信息写入header。

BOOLEAN

false

开启时,namespace、schemaName和tableName会分别写入header。

aliyun.kafka.accessKeyId

阿里云账号AccessKey ID。

STRING

详情请参见创建AccessKey

说明

同步数据到阿里云消息队列Kafka版时需要配置。

aliyun.kafka.accessKeySecret

阿里云账号AccessKey Secret。

STRING

详情请参见创建AccessKey

说明

同步数据到阿里云消息队列Kafka版时需要配置。

aliyun.kafka.instanceId

阿里云Kafka消息队列实例ID。

STRING

请在阿里云Kafka实例详情界面查看。

说明

同步数据到阿里云消息队列Kafka版时需要配置。

aliyun.kafka.endpoint

阿里云Kafka API服务接入地址。

STRING

详情请参见服务接入点

说明

同步数据到阿里云消息队列Kafka版时需要配置。

aliyun.kafka.regionId

Topic所在实例的地域ID。

STRING

详情请参见服务接入点

说明

同步数据到阿里云消息队列Kafka版时需要配置。

使用示例

  • 源表

    创建Kafka数据源表,源表中包含网站用户的浏览数据。

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • 结果表

    • 创建Upsert Kafka结果表。

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • 将统计网站用户的浏览数据写入结果表中。

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;
  • 数据摄入目标端

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

最佳实践