本文为您介绍Confluent Avro格式的使用方法、配置选项和类型映射。
背景信息
Confluent Avro格式允许读取被io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及写入能被io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。
读取记录时,将根据记录中编码的结构版本id从配置的Confluent Schema Registry中获取Avro写入结构,并从表结构中推断出读取结构。
写入记录时,Avro结构是从表结构中推断出来的,并会被用来检索要与数据一起编码的结构id。我们会在配置的Confluent Schema Registry中配置的subject下,检索结构id。subject通过avro-confluent.subject参数来指定。
支持Confluent Avro格式的连接器包括消息队列Kafka和Upsert Kafka。
使用方法
利用Kafka和Upsert Kafka以及Confluent Avro格式构建表的示例如下。
示例1:使用Kafka连接器,创建使用原始的UTF-8字符串作为Kafka的key,Schema Registry中注册的Avro记录作为Kafka的value的表。
CREATE TABLE user_created (
-- 映射到Kafka key中的原始的UTF-8字符串的列。
the_kafka_key STRING,
-- 映射到Kafka value中的Avro字段的列。
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
-- 使用UTF-8字符串作为Kafka的key的格式,并使用表中的the_kafka_key列。
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
);
实现将数据写入到Kafka表中的DDL语句如下:
INSERT INTO user_created
SELECT
-- 将用户id复制至映射到kafka key的列中。
id as the_kafka_key,
-- 所有的value。
id, name, email
FROM some_table
示例2:使用Kafka连接器,创建使用Schema Registry中注册的Avro记录作为Kafka的key和value的表。
CREATE TABLE user_created (
-- 映射到Kafka key中的Avro字段id的列。
kafka_key_id STRING,
-- 映射到Kafka value中的Avro字段的列。
id STRING,
name STRING,
email STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
-- 注意:由于哈希分区,在Kafka key的上下文中,schema升级几乎从不向后也不向前兼容。
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = 'http://localhost:8082',
'key.fields' = 'kafka_key_id',
-- 在本例中,我们希望Kafka的key和value的Avro类型都包含id字段。
-- 给表中与Kafka key字段关联的列添加一个前缀来避免冲突。
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',
-- 自Flink 1.13起,subjects具有一个默认值,但是可以被覆盖。
'key.avro-confluent.subject' = 'user_events_example2-key2',
'value.avro-confluent.subject' = 'user_events_example2-value2'
);
示例3:使用Upsert Kafka连接器,创建Schema Registry中注册的Avro记录作为Kafka的value的表。
CREATE TABLE user_created (
-- 映射到Kafka key中的原始的UTF-8字符串的列。
kafka_key_id STRING,
-- 映射到Kafka value中的Avro字段的列。
id STRING,
name STRING,
email STRING,
-- Upsert-Kafka连接器需要一个主键来定义upsert行为。
PRIMARY KEY (kafka_key_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',
-- UTF-8字符串作为Kafka的keys。
-- 在本例中我们不指定key.fields,因为它由表的主键决定。
'key.format' = 'raw',
-- 在本例中,我们希望Kafka的key和value的Avro类型都包含id字段。
-- 给表中与Kafka key字段关联的列添加一个前缀来避免冲突。
'key.fields-prefix' = 'kafka_key_',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
);
配置选项
选项 | 是否必选 | 默认值 | 类型 | 说明 |
format | 是 | (none) | String | 指定使用的格式。使用Confluent Avro格式时,参数取值为Confluent Avro。 |
avro-confluent.basic-auth.credentials-source | 否 | (none) | String | 指定Schema Registry的基本认证资格source。 |
avro-confluent.basic-auth.user-info | 否 | (none) | String | 指定Schema Registry的基本认证用户信息。 |
avro-confluent.bearer-auth.credentials-source | 否 | (none) | String | 指定Schema Registry的持有者认证资格source。 |
avro-confluent.bearer-auth.token | 否 | (none) | String | 指定Schema Registry的持有者认证令牌。 |
avro-confluent.properties | 否 | (none) | Map | 属性映射,该映射被转发到Schema Registry。对于没有通过Flink配置选项正式公开的选项非常有用。 重要 Flink选项具有更高的优先级。 |
avro-confluent.ssl.keystore.location | 否 | (none) | String | 指定SSL keystore的位置。 |
avro-confluent.ssl.keystore.password | 否 | (none) | String | 指定SSL keystore的密码。 |
avro-confluent.ssl.truststore.location | 否 | (none) | String | 指定SSL truststore的位置。 |
avro-confluent.ssl.truststore.password | 否 | (none) | String | 指定SSL truststore的密码。 |
avro-confluent.subject | 否 | (none) | String | 指定Confluent Schema Registry subject,在该subject下注册此格式在序列化期间使用的模式。默认情况下,如果使用kafka和upsert-kafka连接器作为值或键格式,则使用<Topname>-value或<topname>-key作为默认subject名称。对于filesystem连接器,当其作为结果表使用时,必须使用subject选项。 |
avro-confluent.url | 是 | (none) | String | 指定用于获取或注册schemas的Confluent Schema Registry的URL。 |
类型映射
Flink与Confluent Avro的数据类型的映射关系可参照Flink与Avro的数据类型的映射关系,如下表所示。
Flink SQL类型 | Avro类型 |
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | bytes |
DECIMAL | fixed 说明 带有精度的十进制数。 |
TINYINT | int |
SMALLINT | int |
INT | int |
BIGINT | long |
FLOAT | float |
DOUBLE | double |
DATE | int 说明 日期。 |
TIME | int 说明 以毫秒为单位的时间。 |
TIMESTAMP | long 说明 以毫秒为单位的时间戳。 |
ARRAY | array |
MAP 说明 元素必须是STRING、CHAR或VARCHAR类型。 | map |
MULTISET 说明 元素必须是STRING、CHAR或VARCHAR类型。 | map |
ROW | record |
除了以上类型,Flink支持读取和写入nullable的类型。Flink将nullable的类型映射到Avro union(something, null),其中something是从Flink类型转换的Avro类型。
关于Avro类型的信息,详情请参见Avro规范。