Confluent Avro

本文为您介绍Confluent Avro格式的使用方法、配置选项和类型映射。

背景信息

Confluent Avro格式允许读取被io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及写入能被io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。

  • 读取记录时,将根据记录中编码的结构版本id从配置的Confluent Schema Registry中获取Avro写入结构,并从表结构中推断出读取结构。

  • 写入记录时,Avro结构是从表结构中推断出来的,并会被用来检索要与数据一起编码的结构id。我们会在配置的Confluent Schema Registry中配置的subject下,检索结构id。subject通过avro-confluent.subject参数来指定。

支持Confluent Avro格式的连接器包括消息队列KafkaUpsert Kafka

使用方法

利用Kafka和Upsert Kafka以及Confluent Avro格式构建表的示例如下。

  • 示例1:使用Kafka连接器,创建使用原始的UTF-8字符串作为Kafka的key,Schema Registry中注册的Avro记录作为Kafka的value的表。

CREATE TABLE user_created (
  -- 映射到Kafka key中的原始的UTF-8字符串的列。
  the_kafka_key STRING,
  
  -- 映射到Kafka value中的Avro字段的列。
  id STRING,
  name STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_example1',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- 使用UTF-8字符串作为Kafka的key的格式,并使用表中的the_kafka_key列。
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
);

实现将数据写入到Kafka表中的DDL语句如下:

INSERT INTO user_created
SELECT
  -- 将用户id复制至映射到kafka key的列中。
  id as the_kafka_key,

  -- 所有的value。
  id, name, email
FROM some_table
  • 示例2:使用Kafka连接器,创建使用Schema Registry中注册的Avro记录作为Kafka的key和value的表。

CREATE TABLE user_created (  
  -- 映射到Kafka key中的Avro字段id的列。
  kafka_key_id STRING,
  
  -- 映射到Kafka value中的Avro字段的列。
  id STRING,
  name STRING, 
  email STRING 
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events_example2',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- 注意:由于哈希分区,在Kafka key的上下文中,schema升级几乎从不向后也不向前兼容。
  'key.format' = 'avro-confluent',
  'key.avro-confluent.url' = 'http://localhost:8082',
  'key.fields' = 'kafka_key_id',

  -- 在本例中,我们希望Kafka的key和value的Avro类型都包含id字段。
  -- 给表中与Kafka key字段关联的列添加一个前缀来避免冲突。
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY',
   
  -- 自Flink 1.13起,subjects具有一个默认值,但是可以被覆盖。
  'key.avro-confluent.subject' = 'user_events_example2-key2',
  'value.avro-confluent.subject' = 'user_events_example2-value2'
);
  • 示例3:使用Upsert Kafka连接器,创建Schema Registry中注册的Avro记录作为Kafka的value的表。

CREATE TABLE user_created (  
  -- 映射到Kafka key中的原始的UTF-8字符串的列。
  kafka_key_id STRING,
  
  -- 映射到Kafka value中的Avro字段的列。
  id STRING, 
  name STRING, 
  email STRING, 
  
  -- Upsert-Kafka连接器需要一个主键来定义upsert行为。
  PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_events_example3',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- UTF-8字符串作为Kafka的keys。
  -- 在本例中我们不指定key.fields,因为它由表的主键决定。
  'key.format' = 'raw',
  
  -- 在本例中,我们希望Kafka的key和value的Avro类型都包含id字段。
  -- 给表中与Kafka key字段关联的列添加一个前缀来避免冲突。
  'key.fields-prefix' = 'kafka_key_',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8082',
  'value.fields-include' = 'EXCEPT_KEY'
);

配置选项

选项

是否必选

默认值

类型

说明

format

(none)

String

指定使用的格式。使用Confluent Avro格式时,参数取值为Confluent Avro。

avro-confluent.basic-auth.credentials-source

(none)

String

指定Schema Registry的基本认证资格source。

avro-confluent.basic-auth.user-info

(none)

String

指定Schema Registry的基本认证用户信息。

avro-confluent.bearer-auth.credentials-source

(none)

String

指定Schema Registry的持有者认证资格source。

avro-confluent.bearer-auth.token

(none)

String

指定Schema Registry的持有者认证令牌。

avro-confluent.properties

(none)

Map

属性映射,该映射被转发到Schema Registry。对于没有通过Flink配置选项正式公开的选项非常有用。

重要

Flink选项具有更高的优先级。

avro-confluent.ssl.keystore.location

(none)

String

指定SSL keystore的位置。

avro-confluent.ssl.keystore.password

(none)

String

指定SSL keystore的密码。

avro-confluent.ssl.truststore.location

(none)

String

指定SSL truststore的位置。

avro-confluent.ssl.truststore.password

(none)

String

指定SSL truststore的密码。

avro-confluent.subject

(none)

String

指定Confluent Schema Registry subject,在该subject下注册此格式在序列化期间使用的模式。默认情况下,如果使用kafka和upsert-kafka连接器作为值或键格式,则使用<Topname>-value或<topname>-key作为默认subject名称。对于filesystem连接器,当其作为结果表使用时,必须使用subject选项。

avro-confluent.url

(none)

String

指定用于获取或注册schemas的Confluent Schema Registry的URL。

类型映射

Flink与Confluent Avro的数据类型的映射关系可参照Flink与Avro的数据类型的映射关系,如下表所示。

Flink SQL类型

Avro类型

CHAR / VARCHAR / STRING

string

BOOLEAN

boolean

BINARY / VARBINARY

bytes

DECIMAL

fixed

说明

带有精度的十进制数。

TINYINT

int

SMALLINT

int

INT

int

BIGINT

long

FLOAT

float

DOUBLE

double

DATE

int

说明

日期。

TIME

int

说明

以毫秒为单位的时间。

TIMESTAMP

long

说明

以毫秒为单位的时间戳。

ARRAY

array

MAP

说明

元素必须是STRING、CHAR或VARCHAR类型。

map

MULTISET

说明

元素必须是STRING、CHAR或VARCHAR类型。

map

ROW

record

除了以上类型,Flink支持读取和写入nullable的类型。Flink将nullable的类型映射到Avro union(something, null),其中something是从Flink类型转换的Avro类型。

说明

关于Avro类型的信息,详情请参见Avro规范