本文为您介绍如何使用Upsert Kafka连接器。

背景信息

Upsert Kafka连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。
  • 作为源表,此连接器可以将Kafka中存储的数据转换为changelog流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key,如果不存在相应的key,则该更新被视为INSERT。用表来类比,changelog流中的数据记录被解释为UPSERT,也称为INSERT或UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。
  • 作为结果表,此连接器可以消费上游计算逻辑产生的changelog流。它会将INSERT或UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入,表示对应key的消息被删除。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新或删除消息将落在同一分区中。
类别详情
支持类型源表和结果表
运行模式流模式和批模式
数据格式avro、avro-confluent、canal-json、csv、debezium-json、json、maxwell-json和raw
特有监控指标
  • 源表
    • numRecordsIn
    • numRecordsInPerSecond
    • numBytesIn
    • numBytesInPerScond
    • currentEmitEventTimeLag
    • currentFetchEventTimeLag
    • sourceIdleTime
    • pendingRecords
  • 结果表
    • numRecordsOut
    • numRecordsOutPerSecond
    • numBytesOut
    • numBytesOutPerSecond
    • currentSendTime
API种类SQL
是否支持更新或删除结果表数据

前提条件

使用限制

  • 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列Kafka连接器。
  • 仅支持读取和写入Apache Kafka 0.10及以上版本的数据。
  • 仅支持Apache Kafka 2.8版本的客户端配置项,详情请参见Apache Kafka消费者生产者配置项文档。
  • Upsert Kafka结果表在使用精确一次语义时,写入的Kafka集群必须开启事务功能,且仅支持Apache Kafka 0.11及以上版本的集群。

语法结构

CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector表类型。String固定值为upsert-kafka。
    properties.bootstrap.serversKafka broker地址。String格式为host:port,host:port,host:port,以英文逗号(,)分割。
    properties.*对Kafka客户端的直接配置。String

    后缀名必须是Kafka官方文档中定义的生产者消费者配置。

    Flink会将properties.前缀移除,并将剩余的配置传递给Kafka客户端。例如可以通过'properties.allow.auto.create.topics' = 'false' 来禁用自动创建topic。

    不能通过该方式修改以下配置,因为它们会被Kafka连接器覆盖:
    • key.deserializer
    • value.deserializer
    format读取或写入Kafka消息value部分时使用的格式。String
    参数取值如下:
    • csv
    • json
    • avro
    • debezium-json
    • canal-json
    • maxwell-json
    • avro-confluent
    • raw
    key.format读取或写入Kafka消息key部分时使用的格式。String当使用该配置时,key.fieldskey.fields-prefix配置是必填的。
    参数取值如下:
    • csv
    • json
    • avro
    • debezium-json
    • canal-json
    • maxwell-json
    • avro-confluent
    • raw
    key.fieldsKafka消息key部分对应的源表或结果表字段。String多个字段名以分号(;)分隔。例如field1;field2
    key.fields-prefix为所有Kafka消息key部分指定自定义前缀,以避免与消息value部分格式字段重名。String该配置项仅用于源表和结果表的列名区分,解析和生成Kafka消息key部分时,该前缀会被移除。
    说明 使用该配置时,value.fields-include必须配置为EXCEPT_KEY。
    value.format读取或写入Kafka消息value部分时使用的格式。String当使用该配置时,key.fieldskey.fields-prefix配置是必填的。该配置等同于format,因此formatvalue.format 只能配置其中一个,如果同时配置两个会产生冲突。
    value.fields-include在解析或生成Kafka消息value部分时,是否要包含消息key部分对应的字段。StringALL
    参数取值如下:
    • ALL(默认值):所有列都会作为Kafka消息value部分处理。
    • EXCEPT_KEY:除去key.fields定义的字段,剩余字段作为Kafka消息value部分处理。
    topic读取或写入topic名称。String无。
  • 结果表
    参数说明数据类型是否必填默认值备注
    sink.parallelismKafka结果表算子的并发数。Integer上游算子的并发,由框架决定无。
    sink.buffer-flush.max-rows缓存刷新前,最多能缓存多少条记录。Integer0(未开启)当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。
    说明 如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个选项为大于零的值。
    sink.buffer-flush.interval缓存刷新的间隔时间。Duration0(未开启)当结果表收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此结果表缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。
    说明 如果要开启结果表缓存,需要同时设置sink.buffer-flush.max-rowssink.buffer-flush.interval两个选项为大于零的值。

使用示例

  • 源表
    创建Kafka数据源表,源表中包含网站用户的浏览数据。
    CREATE TABLE pageviews (
      user_id BIGINT,
      page_id BIGINT,
      viewtime TIMESTAMP,
      user_region STRING,
      WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopicName>',
      'properties.bootstrap.servers' = '...',
      'format' = 'json'
    );
  • 结果表
    • 创建Upsert Kafka结果表。
      CREATE TABLE pageviews_per_region (
        user_region STRING,
        pv BIGINT,
        uv BIGINT,
        PRIMARY KEY (user_region) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = '<yourTopicName>',
        'properties.bootstrap.servers' = '...',
        'key.format' = 'avro',
        'value.format' = 'avro'
      );
    • 将统计网站用户的浏览数据写入结果表中。
      INSERT INTO pageviews_per_region 
      SELECT
        user_region,
        COUNT(*),
        COUNT(DISTINCT user_id)
      FROM pageviews
      GROUP BY user_region;

最佳实践

MySQL整库同步Kafka