Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。本文通过Paimon Catalog和MySQL连接器,将云数据库RDS中的订单数据和表结构变更导入Paimon表中,并使用Flink对Paimon表进行简单分析。
背景信息
Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。目前阿里云实时计算Flink版,以及开源大数据平台E-MapReduce上常见的计算引擎(例如Spark、Hive或Trino)都与Paimon有着较为完善的集成度。您可以借助Apache Paimon快速地在HDFS或者OSS上构建自己的数据湖存储服务,并接入计算引擎实现数据湖的分析。
前提条件
- 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。 
- 已创建Flink工作空间,详情请参见开通实时计算Flink版。 
步骤一:准备数据源
- 说明RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性。 创建名称为orders的数据库,并创建高权限账号或具有数据库orders读写权限的普通账号。 
- 第二步:连接RDS MySQL实例,在orders数据库中创建表orders_1和orders_2。 - CREATE TABLE `orders_1` ( orderkey BIGINT NOT NULL, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, comment VARCHAR(100), PRIMARY KEY (orderkey) ); CREATE TABLE `orders_2` ( orderkey BIGINT NOT NULL, custkey BIGINT, order_status VARCHAR(100), total_price DOUBLE, order_date DATE, order_priority VARCHAR(100), clerk VARCHAR(100), ship_priority INT, comment VARCHAR(100), PRIMARY KEY (orderkey) );
- 插入如下测试数据。 - INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among '); INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot'); INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos'); INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro'); INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly'); INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia'); INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests '); INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep'); INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request'); INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe'); INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio'); INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu'); INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never'); INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re'); INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir'); INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin'); INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit'); INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate'); INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron'); INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
步骤二:创建Catalog
- 进入数据管理页面。 - 登录实时计算控制台。 
- 单击目标工作空间操作列下的控制台。 
- 单击数据管理。 
 
- 创建Paimon Catalog。 - 单击创建Catalog,在内置Catalog页签,选择Apache Paimon后,单击下一步。 
- 填写配置信息。  - 配置项 - 说明 - 备注 - catalog name - 填写自定义的Paimon Catalog名称。 - 在本例中为paimon-catalog。 - metastore - Paimon表的元数据存储类型: - filesystem:仅将元数据存储在OSS中。 
- dlf:除了将元数据存储在OSS上外,还会将元数据同步到阿里云数据湖构建服务DLF中。 
 - 本文选择filesystem。 - warehouse - Paimon Catalog的存储根目录,是一个OSS目录。可以选择创建实时计算Flink版时使用的OSS Bucket,也可以使用同一账号同一地域下的其他OSS Bucket。 - 格式为oss://<bucket>/<object>。其中: - bucket:表示您创建的OSS Bucket名称。 
- object:表示您存放数据的路径。 
 - 您可以在OSS管理控制台上查看您的bucket和object名称。 - fs.oss.endpoint - OSS服务的连接地址。 - 如果Flink与DLF位于同一地域,则使用VPC网络Endpoint,否则使用公网Endpoint。获取方法请参见OSS地域和访问域名。 - fs.oss.accessKeyId - 拥有读写OSS权限的阿里云账号或RAM账号的Accesskey ID。 - 如果您没有Accesskey ID,详情请参见创建AccessKey。 - fs.oss.accessKeySecret - 拥有读写OSS权限的阿里云账号或RAM账号的Accesskey secret。 - 本文使用变量的方式填写AccessKey Secret取值,避免明文泄露的风险,详情请参见项目变量。 
- 单击确定。 
 
- 创建MySQL Catalog。 - 单击创建Catalog,在内置Catalog页签,选择MySQL后,单击下一步。 
- 填写配置信息。  - 参数 - 说明 - 备注 - catalogname - MySQL Catalog名称。 - 本示例填写为mysql-catalog。 - hostname - MySQL数据库的IP地址或者Hostname。 - 本示例填写为RDS实例的内网地址。 - port - MySQL数据库服务的端口号。 - 默认值为3306。 - default-database - 默认的MySQL数据库名称。 - 本示例填写步骤一:准备数据源中创建的orders数据库。 - username - MySQL数据库服务的用户名。 - 填写您的数据库用户名。 - password - MySQL数据库服务的密码。 - 本文使用变量的方式填写AccessKey Secret取值,避免明文泄露的风险,详情请参见项目变量。 
- 单击确定。 
 
步骤三:创建Flink作业
- 在页面,单击新建。 
- 选择空白的流作业草稿,单击下一步。 
- 在新建作业草稿对话框,填写作业配置信息。 - 作业参数 - 说明 - 文件名称 - 作业的名称。 说明- 作业名称在当前项目中必须保持唯一。 - 存储位置 - 指定该作业的存储位置。 - 您还可以在现有文件夹右侧,单击  图标,新建子文件夹。 图标,新建子文件夹。- 引擎版本 - 当前作业使用的Flink的引擎版本。 
- 单击创建。 
- 输入以下语句,实时捕获orders数据库中相关表的变化,并同步到Paimon表中。 - -- 捕获表名符合正则表达式orders_\d+的MySQL表,将MySQL表的变化同步到Paimon的默认数据库orders表中 CREATE TABLE IF NOT EXISTS `paimon-catalog`.`default`.`orders` AS TABLE `mysql-catalog`.`orders`.`orders_\d+`;- CREATE TABLE AS语法的使用,详情请参见CREATE TABLE AS(CTAS)语句。 
- (可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。 
- 单击右上方的部署,单击确定。 
- 在左侧导航栏,单击,单击目标作业名称,进入作业部署详情页面。 
- 单击运行参数配置区域右侧的编辑。 - 本文为了更快观察到任务运行的结果,将系统检查点间隔和两次系统检查点之间的最短时间间隔均改为10s,单击保存。  
- 在目标作业部署详情页顶部,单击启动,选择无状态启动后,单击启动。  
- 查询Paimon数据。 - 在页面的查询脚本页签,将如下代码拷贝到查询脚本。 - select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
- 选中目标片段后,单击左侧代码行上的运行。  
 
步骤四:更新MySQL表结构
本部分将演示MySQL表结构变更同步到Paimon表的功能。
- 登录云数据库RDS控制台。 
- 在orders数据库,输入如下SQL语句,然后单击执行,为两张数据表添加一列,并填充一些数据。 - ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5; UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
- 在实时计算控制台页面的查询脚本页签,将如下代码拷贝到查询脚本。 - select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;- 选中目标片段后单击左侧代码行上的运行。  
相关文档
- 流式数据湖仓Paimon连接器可以配合Paimon Catalog使用,使用方法、功能等详情请参见流式数据湖仓Paimon。 
- Paimon Catalog的使用,详情请参见管理Paimon Catalog。