Flink全托管产品提供丰富强大的日志数据实时入仓能力。本文为您介绍如何在Flink全托管控制台上快速构建一个从Kafka到Hologres的数据同步作业。
背景信息
假设消息队列Kafka实例中有一个名称为users的Topic,其中有100条JSON数据,代表通过日志文件采集工具或者应用写入Kafka的日志数据,其数据分布大致如下图所示。

此时,如果您希望创建一个数据同步的作业,将该Topic中的日志数据都同步到Hologres中,则可以按照以下步骤进行:
本文使用Flink全托管提供的CREATE TABLE AS(CTAS)语句,一键完成日志数据的同步,以及实时的表结构变更同步。
前提条件
- 已准备阿里云账号,并确保账户余额充足。
- 账号注册操作步骤,请参见账号注册。
- 阿里云账户余额不少于100.00元人民币或等值的代金券或优惠券。
- 已创建Flink全托管工作空间并完成角色授权,详情请参见开通流程和阿里云账号角色授权。
- 上下游存储
- 已创建消息队列Kafka实例,详情请参见步骤三:创建资源。
- 已创建Hologres实例,详情请参见购买Hologres。
说明 消息队列Kafka和Hologres需要与Flink全托管工作空间在相同地域相同VPC下,否则需要打通网络,详情请参见如何访问跨VPC的其他服务?或Flink全托管集群如何访问公网?。
步骤一:配置IP白名单
为了让Flink能访问Kafka和Hologres实例,您需要将Flink全托管工作空间的网段添加到在Kafka和Hologres的白名单中。
步骤二:准备Kafka测试数据
使用Flink全托管的模拟数据生成源表作为数据生成器,将数据写入到Kafka中。请按以下步骤使用Flink全托管开发控制台将数据写入至消息队列Kafka。
步骤三:创建Hologres Catalog
单表同步都需要依赖目标Catalog来创建目标表。因此,您需要通过控制台创建目标Catalog。本文将以目标Catalog为Hologres Catalog为例,为您进行介绍。
步骤四:创建并启动数据同步作业
步骤五:观察全量同步结果
步骤六:观察自动同步表结构变更
- 在Kafka控制台手动发送一条包含新增列的消息。
- 在Hologres控制台,查看sync_kafka_users表结构和数据的变化。
(可选)步骤七:调整作业资源配置
根据数据量的不同,我们往往需要调节不同节点的并发和资源,以达到更优的作业性能。您可以使用资源配置的基础模式简单配置作业并发度和CU数,也可以使用资源配置的专家模式细粒度地调整节点的并发和资源。
相关文档
- CREATE TABLE AS (CTAS) 语法功能介绍,请参见CREATE TABLE AS(CTAS)语句。
- 消息队列Kafka作为表结构变更数据源功能介绍,请参见消息队列Kafka源表。