阿里云实时计算Flink版结合DLF Paimon Catalog,实现Flink作业结果到数据湖的高效写入和元数据同步,支持无缝对接多种计算引擎并优化数据湖管理,本文为您介绍具体的操作流程。
背景信息
阿里云实时计算Flink版是一套基于Apache Flink构建的实时大数据分析平台,支持多种数据源和结果表类型。Flink任务可以利用数据湖统一存储的优势,使用Paimon表,将作业的结果输出到数据湖中,实现数据湖分析。在写入数据湖的过程中,Flink可以通过设置DLF Catalog,将表的元数据同步到数据湖构建(DLF)中。依托数据湖构建产品(DLF)提供的企业级统一元数据能力,Flink+DLF方案可以实现写入的数据湖表无缝对接阿里云上的计算引擎,如EMR、MaxCompute、Hologres等。也可以通过DLF提供的丰富的数据湖管理能力,实现数据湖生命周期管理和湖格式的优化。
前提条件
已创建Flink全托管工作空间,引擎版本须为VVR 8.0.9及以上。如未创建,详情请参见开通实时计算Flink版。
已创建DLF 2.0数据目录。如未创建,详情请参见创建数据目录。
说明如果是RAM用户,在进行数据操作之前,需要先授予相应的资源权限。详情请参见授权管理。
本文以MySQL数据源为例,需要创建RDS MySQL,详情请参见快速创建RDS MySQL实例。
创建的RDS MySQL需要和实时计算Flink版在同一个地域、同一个VPC内,RDS MySQL须为5.7及以上版本。
操作流程
步骤1:Flink创建DLF Paimon Catalog
登录Flink控制台。
进入创建Catalog页面。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击元数据管理。
单击创建Catalog。
创建DLF Paimon Catalog。
在创建Catalog页面,类型选择Apache Paimon Catalog。
metastore选择dlf,下拉列表中选择标有v2.0的Catalog,Catalog需要提前在DLF控制台创建。
步骤2:准备MySQL数据
登录准备好的MySQL实例,详情请参见通过DMS登录RDS MySQL。
执行如下命令,在已有数据库下创建一张表,并插入若干测试数据。
CREATE TABLE `student` ( `id` bigint(20) NOT NULL, `name` varchar(256) DEFAULT NULL, `age` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ); INSERT INTO student VALUES (1,'name1',10); INSERT INTO student VALUES (2,'name2',20);
步骤3:创建Flink入湖作业
登录Flink控制台。
创建数据源表和目标表。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,选择数据开发 > 数据查询。
单击新建,创建一个新的查询脚本,执行如下SQL。
说明关于MySQL源表的参数设置和使用条件,请参考MySQL的CDC源表。
-- Catalog名为步骤1创建的DLF Catalog Name,本例中填dlf_clg_test CREATE DATABASE IF NOT EXISTS dlf_clg_test.dlf_testdb; --创建目标表 CREATE TABLE IF NOT EXISTS dlf_clg_test.dlf_testdb.student_paimon ( id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, age BIGINT ) WITH( 'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表 'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据 'bucket' = '3' );
创建Flink入湖任务。
在SQL开发页面,创建一个新的SQL流作业,执行如下SQL。
-- 创建CDC源表 CREATE TEMPORARY TABLE student_source ( id INT, name VARCHAR (256), age INT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', -- hostname替换为RDS的连接地址 'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = '<RDS username>', 'password' = '<RDS password>', 'database-name' = '<RDS database>', -- table-name为数据源表,本例中填步骤2创建的student表 'table-name' = 'student' ); INSERT INTO dlf_clg_test.dlf_testdb.student_paimon SELECT * FROM student_source;
b. 单击保存。
c. 单击部署。
在左侧导航栏,单击运维中心 > 作业运维。找到上面创建的SQL流作业,单击启动。
启动成功后一段时间,可以看到作业的状态变成运行中。
步骤4:使用Flink进行数据分析
执行数据查询和分析。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击数据开发 > 数据查询。
在SQL开发页面,创建一个新的查询脚本,执行如下SQL。
SELECT count(1) FROM dlf_clg_test.dlf_testdb.student_paimon; SELECT * FROM dlf_clg_test.dlf_testdb.student_paimon;
单击运行,可以直接对Flink写入数据湖的数据进行查询和分析。