本文为您介绍Avro格式的使用示例、配置选项和类型映射。
背景信息
Avro格式允许基于Avro的结构读写Avro数据。当前,Avro结构是基于表结构推导而来的。支持Avro格式的连接器包括消息队列Kafka、Upsert Kafka和对象存储OSS。
使用示例
利用Kafka以及Avro格式构建表的示例如下。
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'avro'
)配置选项
参数  | 是否必选  | 默认值  | 类型  | 描述  | 
format | 是  | (none)  | String  | 声明使用的格式。使用Avro格式时,参数取值为avro。  | 
avro.codec | 否  | (none)  | String  | 指定Avro压缩的编解码器,仅适用于连接器为Filesystem的情况。参数取值如下: 
  | 
类型映射
Flink与Avro的数据类型的映射关系如下。
Flink SQL类型  | Avro类型  | 
CHAR / VARCHAR / STRING  | string  | 
BOOLEAN  | boolean  | 
BINARY / VARBINARY  | bytes  | 
DECIMAL  | bytes 说明  带有精度的十进制数。  | 
TINYINT  | int  | 
SMALLINT  | int  | 
INT  | int  | 
BIGINT  | long  | 
FLOAT  | float  | 
DOUBLE  | double  | 
DATE  | int 说明  日期。  | 
TIME  | int 说明  以毫秒为单位的时间。  | 
TIMESTAMP  | long 说明  以毫秒为单位的时间戳。  | 
ARRAY  | array  | 
MAP 说明  元素必须是STRING、CHAR或VARCHAR类型。  | map  | 
MULTISET 说明  元素必须是STRING、CHAR或VARCHAR类型。  | map  | 
ROW  | record  | 
除了以上类型,Flink支持读取和写入nullable的类型。Flink将nullable的类型映射到Avro union(something, null),其中something是从Flink类型转换的Avro类型。
说明 
关于Avro类型的信息,详情请见Avro 规范。
该文章对您有帮助吗?