本文档为您介绍如何创建实时计算消息队列 Kafka结果表。

说明 本文档仅适用于独享模式。

什么是Kafka结果表

Apache Kafka(以下简称Kafka)是一个快速、可扩展、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。实时计算广泛采用Kafka作为流式数据的数据源表和结果表。
说明 Kafka结果表支持写入自建Kafka集群,但需注意版本对应关系,以及自建集群和实时计算集群的网络环境配置。

DDL定义

消息队列 Kafka结果表需要定义的DDL如下。
create table sink_kafka (
    messageKey VARBINARY,
    `message` VARBINARY,
    PRIMARY KEY (messageKey)
) with (
    type = 'kafka010',
    topic = '<yourTopicName>',
    bootstrap.servers = '<yourServerAddress>'
);
说明
  • 创建Kafka结果表时,必须明文指定PRIMARY KEY (messageKey)
  • 实时计算仅在2.2.6及以上版本中,支持阿里云Kafka或自建Kafka的TPS、RPS等指标信息的显示。

WITH参数

通用配置

参数 注释说明 是否必选 备注
type Kafka对应版本 必须是 Kafka08、Kafka09、Kafka010、Kafka011中的一种,版本对应关系参见Kafka版本对应关系
topic 写入的Topic Topic名称。
  • 必选配置
    • Kafka08必选配置:
      参数 注释说明 备注
      zookeeper.connect zk链接地址 zk连接ID
    • Kafka09/Kafka010/Kafka011必选配置:
      参数 注释说明 备注
      bootstrap.servers Kafka集群地址 Kafka集群地址
    说明 Kafka集群地址
  • 可选配置参数

    • consumer.id
    • socket.timeout.ms
    • fetch.message.max.bytes
    • num.consumer.fetchers
    • auto.commit.enable
    • auto.commit.interval.ms
    • queued.max.message.chunks
    • rebalance.max.retries
    • fetch.min.bytes
    • fetch.wait.max.ms
    • rebalance.backoff.ms
    • refresh.leader.backoff.ms
    • auto.offset.reset
    • consumer.timeout.ms
    • exclude.internal.topics
    • partition.assignment.strategy
    • client.id
    • zookeeper.session.timeout.ms
    • zookeeper.connection.timeout.ms
    • zookeeper.sync.time.ms
    • offsets.storage
    • offsets.channel.backoff.ms
    • offsets.channel.socket.timeout.ms
    • offsets.commit.max.retries
    • dual.commit.enabled
    • partition.assignment.strategy
    • socket.receive.buffer.bytes
    • fetch.min.bytes
    说明 其它可选配置项参考Kafka官方文档进行配置。

Kafka版本对应关系

type Kafka 版本
Kafka08 0.8.22
Kafka09 0.9.0.1
Kafka010 0.10.2.1
Kafka011 0.11.0.2及以上

示例

create table datahub_input (
id VARCHAR,
nm VARCHAR
) with (
type = 'datahub'
);

create table sink_kafka (
 messageKey VARBINARY,
`message` VARBINARY,
 PRIMARY KEY (messageKey)
) with (
    type = 'kafka010',
    topic = '<yourTopicName>',
    bootstrap.servers = '<yourServerAddress>'
);


INSERT INTO
    sink_kafka
SELECT
   cast(id as VARBINARY) as messageKey,
   cast(nm as VARBINARY) as `message`
FROM
    datahub_input;