本文介绍基于Flink创建Paimon DLF Catalog,读取MySQL CDC数据并写入OSS,进一步将元数据同步到DLF,进而使用MaxCompute的external schema进行数据湖联邦查询。
前提条件
使用限制
支持地域
地域名称
地域ID
华东1(杭州)
cn-hangzhou
华东2(上海)
cn-shanghai
华北2(北京)
cn-beijing
华北3(张家口)
cn-zhangjiakou
华南1(深圳)
cn-shenzhen
中国香港
cn-hongkong
新加坡
ap-southeast-1
德国(法兰克福)
eu-central-1
MaxCompute、OSS、DLF、Flink必须部署在同一地域。
操作步骤
步骤一:授予MaxCompute访问DLF和OSS的权限
操作MaxCompute项目的账号未经授权无法访问DLF和OSS服务,授权方式包含如下两种:
步骤二:准备MySQL测试数据
如有其他MySQL测试数据,可跳过此步骤。
登录到RDS MySQL控制台,左上角选择地域。
在左侧导航栏,选择实例列表。在实例列表页面,单击目标实例ID,进入实例详情页。
在左侧导航栏,单击数据库管理。
单击新建数据库。配置如下参数:
参数
是否必填
说明
示例
数据库(DB)名称
必填
长度为2~64个字符。
以字母开头,以字母或数字结尾。
由小写字母、数字、下划线或中划线组成。
数据库名称在实例内必须是唯一的。
数据库名称中如果包含
-
,创建出的数据库的文件夹的名字中的-
会变成@002d
。
mysql_paimon
支持字符集
必填
请按需选择字符集。
utf8
授权账号
选填
选中需要访问本数据库的账号。本参数可以留空,创建数据库后再绑定账号。
此处仅会显示普通账号。高权限账号拥有所有数据库的所有权限,无需授权。
默认
备注说明
选填
用于备注该数据库的相关信息,便于后续数据库管理,最多支持256个字符。
创建flink测试库。
单击数据库管理页面,右上角登录数据库,在左侧导航栏选择数据库实例,选中已创建的数据库,在右侧SQLConsole页面执行下列语句,创建测试表并写入测试数据。
-- 创建表 CREATE TABLE sales ( id INT NOT NULL AUTO_INCREMENT, year INT NOT NULL, amount DECIMAL(10,2) NOT NULL, product_name VARCHAR(100) NOT NULL, customer_name VARCHAR(100) NOT NULL, order_date DATE NOT NULL, region VARCHAR(50) NOT NULL, status VARCHAR(20) NOT NULL, PRIMARY KEY (id,year) ) PARTITION BY RANGE (year) ( PARTITION p2020 VALUES LESS THAN (2021), PARTITION p2021 VALUES LESS THAN (2022), PARTITION p2022 VALUES LESS THAN (2023), PARTITION p2023 VALUES LESS THAN (2024) ); -- 写入数据 INSERT INTO sales (year, amount, product_name, customer_name, order_date, region, status) VALUES (2020, 100.00, 'Product A', 'Customer 1', '2020-01-01', 'Region 1', 'Completed'), (2020, 200.00, 'Product B', 'Customer 2', '2020-02-01', 'Region 2', 'Pending'), (2021, 150.00, 'Product C', 'Customer 3', '2021-03-01', 'Region 3', 'Completed'), (2021, 300.00, 'Product D', 'Customer 4', '2021-04-01', 'Region 4', 'Pending'), (2022, 250.00, 'Product E', 'Customer 5', '2022-05-01', 'Region 5', 'Completed'), (2022, 400.00, 'Product F', 'Customer 6', '2022-06-01', 'Region 6', 'Pending'), (2023, 350.00, 'Product G', 'Customer 7', '2023-07-01', 'Region 7', 'Completed'), (2023, 500.00, 'Product H', 'Customer 8', '2023-08-01', 'Region 8', 'Pending'), (2020, 450.00, 'Product I', 'Customer 9', '2020-09-01', 'Region 1', 'Completed'), (2021, 600.00, 'Product J', 'Customer 10', '2021-10-01', 'Region 2', 'Pending');
查询测试表数据。
SELECT * FROM sales;
返回结果:
步骤三:准备DLF元数据库
登录OSS控制台,创建Bucket,本示例中Bucket名为
mc-lakehouse-dlf-oss
。详情请参见创建存储空间。在Bucket下新建目录
flink_paimon
。在左侧导航栏,选择元数据 > 元数据管理。在当前元数据管理页面,单击数据库页签,在default数据目录下单击新建数据库。配置如下参数:
参数
是否必填
说明
所属数据目录
必填
示例中是default数据目录。
数据库名称
必填
自定义数据库名称,以字母开头,1-128位,a-ZA-Z0-9_,例如db_dlf_oss。
数据库描述
选填
自定义描述。
选择路径
必填
数据库存储位置,例如
oss://mc-lakehouse-dlf-oss/flink_paimon/
。
步骤四:基于Flink创建Paimon、MySQL catalog
创建Paimon catalog:
登录Flink控制台,单击目标工作空间名称。
在左侧菜单栏,选择数据管理。
在右侧Catalog列表界面,单击创建Catalog,在弹出的创建 Catalog对话框里,选择Apache Paimon,单击下一步并配置如下参数:
参数
是否必填
说明
metastore
必填
元数据存储类型。本示例中选择
dlf
。catalog name
必填
选择需要关联版本的DLF Catalog。
warehouse
必填
OSS服务中所指定的数仓目录。本示例中
oss://mc-lakehouse-dlf-oss/flink_paimon/
。fs.oss.endpoint
必填
OSS服务的endpoint,例如杭州地域为
oss-cn-hangzhou-internal.aliyuncs.com
。fs.oss.accessKeyId
必填
访问OSS服务所需的Access Key ID。
fs.oss.accessKeySecret
必填
访问OSS服务所需的Access Key Secret。
dlf.catalog.accessKeyId
必填
访问DLF服务所需的Access Key ID。
dlf.catalog.accessKeySecret
必填
访问DLF服务所需的Access Key Secret。
创建MySQL catalog:
登录Flink控制台。
添加白名单。
单击目标工作空间的操作列详情,复制网段信息。
在RDS MySQL控制台,单击左侧导航栏,选择实例列表。在实例列表页面,单击目标实例ID,进入实例详情页。
单击左侧导航栏白名单与安全组,在右侧白名单设置页签,单击修改。
在弹出的修改白名单分组对话框,组内白名单位置添加
步骤ii
中复制的网段信息,单击确定。
在Flink控制台,单击目标工作空间名称,在左侧菜单栏,选择数据管理。
在右侧Catalog列表界面,单击创建catalog,在弹出的创建 Catalog对话框里,选择MySQL,单击下一步并配置如下参数:
参数
是否必填
说明
catalog name
必填
自定义MySQL Catalog名称。
hostname
必填
MySQL数据库的IP地址或者Hostname。跨VPC或公网访问时需要打通网络,详情请参见网络连通性。
port
默认
连接到服务器的端口,默认为3306。
default database
必填
默认数据库名称。例如
mysql_paimon
。username
必填
连接MySQL数据库服务器时使用的用户名。可在实例详情页,单击账号管理查看。
password
必填
连接MySQL数据库服务器时使用的密码。可在实例详情页,单击账号管理查看。
步骤五:基于Flink读MySQL写Paimon并同步元数据到DLF
登录Flink控制台,单击目标工作空间名称。
在左侧导航栏,选择
,在作业草稿页签,单击,新建文件夹。
右键文件夹,选择新建流作业,在弹出的新建作业草稿对话框,填写文件名称并选择引擎版本。
在文件中写入如下CREATE TABLE AS(CTAS)SQL语句。注意根据实际命名修改代码中的相关命名。
CREATE TABLE IF NOT EXISTS `paimon_catalog_name`.`flink_paimon`.`sales` AS TABLE `mysql_catalog_name`.`mysql_paimon`.`sales`;
(可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。
单击右上角部署,在弹出的部署新版本对话框中填写备注、作业标签和部署目标等信息,然后单击确定。
在左侧导航栏,,选择
,单击目标作业名称,进入作业部署详情页面。在目标作业部署详情页右上角,单击启动,选择无状态启动后,单击启动。
查询Paimon数据。在左侧导航栏,选择
页面的查询脚本页签,单击,新建查询脚本。运行如下代码:
SELECT * FROM `<paimon_catalog_name>`.`flink_paimon`.`sales`;
返回结果如下:
进入OSS控制台,查看
mc-lakehouse-dlf-oss/flink_paimon/
,生成sales/
文件夹,生成文件如图所示:登录数据湖构建(DLF)控制台。在左侧导航栏,选择元数据 > 元数据管理。单击数据库名
flink_paimon
,可查看到已生成的表,如图所示:
步骤六:MaxCompute创建DLF+OSS外部数据源
登录MaxCompute控制台,在左上角选择地域。
在左侧导航栏,选择管理配置 > 外部数据源。
在外部数据源页面,单击创建外部数据源。
在新增外部数据源对话框,根据界面提示配置相关参数。参数说明如下:
参数
说明
外部数据源类型
选择DLF+OSS。
外部数据源名称
可自定义命名。命名规则如下:
以字母开头,且只能包含小写字母、下划线和数字。
不能超过128个字符。
例如
mysql_paimon_dlf_mc_fs
。外部数据源描述
根据需要填写。
地域
默认为当前地域。
DLF Endpoint
默认为当前地域的DLF Endpoint。
OSS Endpoint
默认为当前地域的OSS Endpoint。
RoleARN
RAM角色的ARN信息。此角色需要包含能够同时访问DLF和OSS服务的权限。
您可以登录RAM访问控制台,在左侧导航栏选择身份管理>角色,单击对应的RAM角色名称,即可在基本信息区域获取ARN信息。
示例:
acs:ram::124****:role/aliyunodpsdefaultrole
。外部数据源补充属性
特殊声明的外部数据源补充属性。指定后,使用此外部数据源的任务可以按照参数定义的行为访问源系统。
说明支持的具体参数请关注后续官网文档更新说明,具体参数将随产品能力演进逐步放开。
单击确认,完成外部数据源的创建。
在外部数据源页面,单击数据源操作列的详情可查看数据源详细信息。
步骤七:创建外部schema
SET odps.namespace.schema=true;
CREATE EXTERNAL SCHEMA IF NOT EXISTS <external_schema>
WITH mc_dlf_oss_pt
ON '<dlf_data_catalogue>.dlf_database';
参数说明如下:
步骤八:使用SQL访问OSS数据
登录MaxCompute客户端,查询external schema内的表。
SET odps.namespace.schema=true;
SHOW tables IN es_mc_dlf_oss_paimon;
-- 返回结果:
ALIYUN$xxx:sales
OK
查询external schema内表数据。
SET odps.namespace.schema=true;
SELECT * FROM <project_name>.es_mc_dlf_oss_paimon.sales;
-- 返回结果如下:
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| id | year | amount | product_name | customer_name | order_date | region | status |
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| 1 | 2020 | 100 | Product A | Customer 1 | 2020-01-01 | Region 1 | Completed |
| 2 | 2020 | 200 | Product B | Customer 2 | 2020-02-01 | Region 2 | Pending |
| 3 | 2021 | 150 | Product C | Customer 3 | 2021-03-01 | Region 3 | Completed |
| 4 | 2021 | 300 | Product D | Customer 4 | 2021-04-01 | Region 4 | Pending |
| 5 | 2022 | 250 | Product E | Customer 5 | 2022-05-01 | Region 5 | Completed |
| 6 | 2022 | 400 | Product F | Customer 6 | 2022-06-01 | Region 6 | Pending |
| 7 | 2023 | 350 | Product G | Customer 7 | 2023-07-01 | Region 7 | Completed |
| 8 | 2023 | 500 | Product H | Customer 8 | 2023-08-01 | Region 8 | Pending |
| 9 | 2020 | 450 | Product I | Customer 9 | 2020-09-01 | Region 1 | Completed |
| 10 | 2021 | 600 | Product J | Customer 10 | 2021-10-01 | Region 2 | Pending |
+------------+------------+------------+--------------+---------------+------------+------------+------------+