本文为您介绍如何使用Kafka数据表。

建表语法

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING kafka
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

配置参数说明

参数名 说明 是否必选
kafka.schema.registry.url kafka SchemaRegistry 地址
subscribe 关联的 Kafka topic 名
kafka.bootstrap.servers Kafka 集群连接地址
kafka.schema.record.name Avro 数据格式 RecordName 定义
kafka.schema.record.namespace Avro 数据格式 RecordNameSpace 定义

Table Schema

创建 Kafka 表时, 字段定义是可选的
场景 Kafka SchemaRegistry 存在对应 Topic 数据的 Schema 定义 Kafka SchemaRegistry 不存在对应 Topic 数据的 Schema 定义
不提供表的字段定义 根据此 Topic Schema 来定义表的 Schema。 使用默认的Kafka数据源Schema
提供表的字段定义 检查表的 Schema 是否和 Topic 的 Schema 兼容。若不兼容,则报错。若兼容,则按照自定义字段类型来定义表的 Schema。 自动将Kafka数据的 Avro 数据Schema注册到Kafka SchemaRegistry 中。此时,需要提供 kafka.schema.record.namekafka.schema.record.namespace
当使用 Kafka SchemaRegistry 时,请注意:
  • Schema兼容性检查:当我们向Kafka中写入数据时,会进行输出数据的schema和Kafka Topic schema兼容性检查。Kafka Schema Registry默认创建的schema兼容性策略是BACKWARD。 如果您没有schema兼容性要求,可以选择将兼容性策略设置为NONE。
  • 如果您在使用过程中出现schema兼容性错误时,请关注错误日志提示。