Flink全托管产品提供丰富强大的日志数据实时入仓能力。本文为您介绍如何在Flink全托管控制台上快速构建一个从Kafka到Hologres的数据同步作业。
背景信息
假设消息队列Kafka实例中有一个名称为users的Topic,其中有100条JSON数据,代表通过日志文件采集工具或者应用写入Kafka的日志数据,其数据分布大致如下图所示。

此时,如果您希望创建一个数据同步的作业,将该Topic中的日志数据都同步到Hologres中,则可以按照以下步骤进行:
本文使用Flink全托管提供的CREATE TABLE AS(CTAS)语句,一键完成日志数据的同步,以及实时的表结构变更同步。
前提条件
- 已准备阿里云账号,并确保账户余额充足。
- 账号注册操作步骤,请参见账号注册。
- 阿里云账户余额不少于100.00元人民币或等值的代金券或优惠券。
- 已创建Flink全托管工作空间并完成角色授权,详情请参见开通Flink全托管和阿里云账号角色授权。
- 上下游存储
- 已创建消息队列Kafka实例,详情请参见步骤三:创建资源。
- 已创建Hologres实例,详情请参见购买Hologres。
说明 消息队列Kafka和Hologres需要与Flink全托管工作空间在相同地域相同VPC下,否则需要打通网络,详情请参见如何访问跨VPC的其他服务?或Flink全托管集群如何访问公网?。
步骤一:配置IP白名单
为了让Flink能访问Kafka和Hologres实例,您需要将Flink全托管工作空间的网段添加到在Kafka和Hologres的白名单中。
步骤二:准备Kafka测试数据
使用Flink全托管的模拟数据生成源表作为数据生成器,将数据写入到Kafka中。请按以下步骤使用Flink全托管开发控制台将数据写入至消息队列Kafka。
- 在Kafka控制台创建一个名称为users的Topic。操作详情请参见步骤一:创建Topic。
- 创建将数据写入到Kafka的作业。
- 启动作业。
步骤三:创建Hologres Catalog
单表同步都需要依赖目标Catalog来创建目标表。因此,您需要通过控制台创建目标Catalog。本文将以目标Catalog为Hologres Catalog为例,为您进行介绍。
- 创建名称为holo的Hologres Catalog。
- 在元数据页签,确认已创建名为holo的Catalog。
步骤四:创建并启动数据同步作业
- 登录Flink全托管开发控制台,创建数据同步作业。
- 将以下作业代码拷贝到作业文本编辑区,修改代码中的参数配置信息。将消息队列Kafka中名称为users的Topic数据同步至Hologres的flink_test_db数据库的sync_kafka_users表中。您可以通过以下任意一种方式进行:说明 在同步过程中,建议声明Kafka的Metadata partition和offset作为Hologres表中的主键。这样可以避免由于作业Failover,数据重发导致下游存储多份相同数据。
- 通过CATS语句同步该方式无需您手动在Hologres中创建该表,也无需指明对应的列类型为JSON或JSONB。
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country'), PRIMARY KEY (`partition`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users WITH ( 'connector' = 'hologres' ) AS TABLE kafka_users;
说明 为了避免作业Failover后,作业重启将重复数据写入到Hologres中,您可以添加相关主键从而唯一地标识数据。当数据重发时,Hologres将会保证相同partition和offset的数据只会保留一份。 - 通过INSERT INTO语句同步
考虑到Hologres中对于JSON和JSONB类型的数据会进行特殊的优化,您也可以通过INSERT INTO语句将嵌套JSON写入到Hologres中。
该方式需要您手动在Hologres中创建该表并指明需要对应的列类型为JSON或JSONB,然后通过下文的SQL,会将address数据写入到 Hologres中类型为JSON的列。CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, -- 该列对应的数据为嵌套JSON。 `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = 'LTAI5tE572UJ44Xwhx6i****', 'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****', 'dbname' = 'flink_test_db', 'tablename' = 'sync_kafka_users' ); INSERT INTO holo SELECT * FROM kafka_users;
在上述作业代码中配置如下参数信息。参数 示例值 说明 properties.bootstrap.servers alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000 Kafka Broker地址。 格式为host:port,host:port,host:port,以英文逗号(,)分割。
topic users Kafka Topic名称。 endpoint hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80 Hologres端点。 格式为<ip>:<port>。
username LTAI5tE572UJ44Xwhx6i**** Hologres用户名,请填写阿里云账号的AccessKey ID。 password KtyIXK3HIDKA9VzKX4tpct9xTm**** Hologres密码,请填写阿里云账号的AccessKey Secret。 dbname flink_test_db Hologres数据库名称。 tablename sync_kafka_users Hologres表名称。 说明- 如果您通过INSERT INTO方式同步数据,则需要提前在目标实例的数据库中创建sync_kafka_users表和字段。
- 如果Schema不为Public时,则tablename需要填写为schema.tableName。
json.infer-schema.flatten-nested-columns.enable true 该参数取值说明如下。 - true:Flink会自动展平新增的嵌套列,并用访问该列的路径作为展开后的列的名字。
- false:禁用该功能。
- 通过CATS语句同步
- 在SQL开发页面,单击部署后,在弹出的对话框中单击确认。
- 在作业运维页面,单击目标作业名称操作列中的启动,配置详情请参见作业启动。
- 单击启动。作业启动后,您可以在作业运维界面观察作业的运行信息和状态。
步骤五:观察全量同步结果
- 登录Hologres管理控制台。
- 在实例列表页面,单击目标实例名称。
- 在页面右上角,单击登录实例。
- 在元数据管理页签,查看users数据库中同步的sync_kafka_users表结构和数据。同步后的表结构和数据如下图所示。
- 表结构
双击sync_kafka_users表名称,查看表结构。
- 表数据在sync_kafka_users表信息页面右上角,单击查询表后,输入如下命令,单击运行。
SELECT * FROM public.sync_kafka_users order by partition, "offset";
表数据结果如下图所示。
- 表结构
步骤六:观察自动同步表结构变更
- 在Kafka控制台手动发送一条包含新增列的消息。
- 在Hologres控制台,查看sync_kafka_users表结构和数据的变化。
(可选)步骤七:调整作业资源配置
根据数据量的不同,往往需要调节不同节点的并发和资源,以达到更优的作业性能。您可以使用资源配置的基础模式简单配置作业并发度和CU数,也可以使用资源配置的专家模式细粒度地调整节点的并发和资源。
- 登录Flink全托管开发控制台,进入作业详情页面。
- 登录实时计算控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击作业运维。
- 单击目标作业名称,在部署详情页签下,单击资源配置区域右上角的编辑。
- 修改资源配置。
- 单击目标作业名称,在作业总览页面,查看调整效果。
相关文档
- CREATE TABLE AS (CTAS) 语法功能介绍,请参见CREATE TABLE AS(CTAS)语句。
- 消息队列Kafka作为表结构变更数据源功能介绍,请参见消息队列Kafka源表。