基于DLF1.0 +OSS读取湖上Paimon数据

本文介绍基于Flink创建Paimon DLF Catalog,读取MySQL CDC数据并写入OSS,进一步将元数据同步到DLF,进而使用MaxComputeexternal schema进行数据湖联邦查询。

前提条件

使用限制

  • 支持地域

    地域名称

    地域ID

    华东1(杭州)

    cn-hangzhou

    华东2(上海)

    cn-shanghai

    华北2(北京)

    cn-beijing

    华北3(张家口)

    cn-zhangjiakou

    华南1(深圳)

    cn-shenzhen

    中国香港

    cn-hongkong

    新加坡

    ap-southeast-1

    德国(法兰克福)

    eu-central-1

  • MaxCompute、OSS、DLF、Flink必须部署在同一地域。

操作步骤

步骤一:授予MaxCompute访问DLFOSS的权限

操作MaxCompute项目的账号未经授权无法访问DLFOSS服务,授权方式包含如下两种:

  • 一键授权:当创建MaxCompute项目的账号和部署DLF的账号相同时,推荐直接单击授权DLF一键授权。

  • 自定义授权:无论创建MaxCompute项目的账号和部署DLF的账号是否相同,都可以使用自定义授权DLF方式。

步骤二:准备MySQL测试数据

如有其他MySQL测试数据,可跳过此步骤。

  1. 登录到RDS MySQL控制台,左上角选择地域。

  2. 在左侧导航栏,选择实例列表。在实例列表页面,单击目标实例ID,进入实例详情页。

  3. 在左侧导航栏,单击数据库管理

  4. 单击新建数据库。配置如下参数:

    参数

    是否必填

    说明

    示例

    数据库(DB)名称

    必填

    • 长度为2~64个字符。

    • 以字母开头,以字母或数字结尾。

    • 由小写字母、数字、下划线或中划线组成。

    • 数据库名称在实例内必须是唯一的。

    • 数据库名称中如果包含-,创建出的数据库的文件夹的名字中的-会变成@002d

    mysql_paimon

    支持字符集

    必填

    请按需选择字符集。

    utf8

    授权账号

    选填

    • 选中需要访问本数据库的账号。本参数可以留空,创建数据库后再绑定账号

    • 此处仅会显示普通账号。高权限账号拥有所有数据库的所有权限,无需授权。

    默认

    备注说明

    选填

    用于备注该数据库的相关信息,便于后续数据库管理,最多支持256个字符。

    创建flink测试库。

  5. 单击数据库管理页面,右上角登录数据库,在左侧导航栏选择数据库实例,选中已创建的数据库,在右侧SQLConsole页面执行下列语句,创建测试表并写入测试数据。

    -- 创建表
    CREATE TABLE sales (
        id INT NOT NULL AUTO_INCREMENT,
        year INT NOT NULL,
        amount DECIMAL(10,2) NOT NULL,
        product_name VARCHAR(100) NOT NULL,
        customer_name VARCHAR(100) NOT NULL,
        order_date DATE NOT NULL,
        region VARCHAR(50) NOT NULL,
        status VARCHAR(20) NOT NULL,
        PRIMARY KEY (id,year)
    ) PARTITION BY RANGE (year) (
        PARTITION p2020 VALUES LESS THAN (2021),
        PARTITION p2021 VALUES LESS THAN (2022),
        PARTITION p2022 VALUES LESS THAN (2023),
        PARTITION p2023 VALUES LESS THAN (2024)
    );
    
    -- 写入数据
    INSERT INTO sales (year, amount, product_name, customer_name, order_date, region, status) VALUES
    (2020, 100.00, 'Product A', 'Customer 1', '2020-01-01', 'Region 1', 'Completed'),
    (2020, 200.00, 'Product B', 'Customer 2', '2020-02-01', 'Region 2', 'Pending'),
    (2021, 150.00, 'Product C', 'Customer 3', '2021-03-01', 'Region 3', 'Completed'),
    (2021, 300.00, 'Product D', 'Customer 4', '2021-04-01', 'Region 4', 'Pending'),
    (2022, 250.00, 'Product E', 'Customer 5', '2022-05-01', 'Region 5', 'Completed'),
    (2022, 400.00, 'Product F', 'Customer 6', '2022-06-01', 'Region 6', 'Pending'),
    (2023, 350.00, 'Product G', 'Customer 7', '2023-07-01', 'Region 7', 'Completed'),
    (2023, 500.00, 'Product H', 'Customer 8', '2023-08-01', 'Region 8', 'Pending'),
    (2020, 450.00, 'Product I', 'Customer 9', '2020-09-01', 'Region 1', 'Completed'),
    (2021, 600.00, 'Product J', 'Customer 10', '2021-10-01', 'Region 2', 'Pending');
  6. 查询测试表数据。

    SELECT * FROM sales;

    返回结果:

    image

步骤三:准备DLF元数据库

  1. 登录OSS控制台,创建Bucket,本示例中Bucket名为mc-lakehouse-dlf-oss。详情请参见创建存储空间

  2. Bucket下新建目录flink_paimon

  3. 登录数据湖构建(DLF)控制台

  4. 在左侧导航栏,选择元数据 > 元数据管理。在当前元数据管理页面,单击数据库页签,在default数据目录下单击新建数据库。配置如下参数:

    参数

    是否必填

    说明

    所属数据目录

    必填

    示例中是default数据目录。

    数据库名称

    必填

    自定义数据库名称,以字母开头,1-128位,a-ZA-Z0-9_,例如db_dlf_oss。

    数据库描述

    选填

    自定义描述。

    选择路径

    必填

    数据库存储位置,例如oss://mc-lakehouse-dlf-oss/flink_paimon/

步骤四:基于Flink创建Paimon、MySQL catalog

  1. 创建Paimon catalog

    1. 登录Flink控制台,单击目标工作空间名称。

    2. 在左侧菜单栏,选择数据管理

    3. 在右侧Catalog列表界面,单击创建Catalog,在弹出的创建 Catalog对话框里,选择Apache Paimon,单击下一步并配置如下参数:

      参数

      是否必填

      说明

      metastore

      必填

      元数据存储类型。本示例中选择dlf

      catalog name

      必填

      选择需要关联版本的DLF Catalog。

      warehouse

      必填

      OSS服务中所指定的数仓目录。本示例中oss://mc-lakehouse-dlf-oss/flink_paimon/

      fs.oss.endpoint

      必填

      OSS服务的endpoint,例如杭州地域为oss-cn-hangzhou-internal.aliyuncs.com

      fs.oss.accessKeyId

      必填

      访问OSS服务所需的Access Key ID。

      fs.oss.accessKeySecret

      必填

      访问OSS服务所需的Access Key Secret。

      dlf.catalog.accessKeyId

      必填

      访问DLF服务所需的Access Key ID。

      dlf.catalog.accessKeySecret

      必填

      访问DLF服务所需的Access Key Secret。

  2. 创建MySQL catalog

    1. 登录Flink控制台

    2. 添加白名单。

      1. 单击目标工作空间的操作列详情,复制网段信息。

      2. RDS MySQL控制台,单击左侧导航栏,选择实例列表。在实例列表页面,单击目标实例ID,进入实例详情页。

      3. 单击左侧导航栏白名单与安全组,在右侧白名单设置页签,单击修改

      4. 在弹出的修改白名单分组对话框,组内白名单位置添加步骤ii中复制的网段信息,单击确定

    3. Flink控制台,单击目标工作空间名称,在左侧菜单栏,选择数据管理

    4. 在右侧Catalog列表界面,单击创建catalog,在弹出的创建 Catalog对话框里,选择MySQL,单击下一步并配置如下参数:

      参数

      是否必填

      说明

      catalog name

      必填

      自定义MySQL Catalog名称。

      hostname

      必填

      MySQL数据库的IP地址或者Hostname。VPC公网访问时需要打通网络,详情请参见网络连通性

      port

      默认

      连接到服务器的端口,默认为3306。

      default database

      必填

      默认数据库名称。例如mysql_paimon

      username

      必填

      连接MySQL数据库服务器时使用的用户名。可在实例详情页,单击账号管理查看。

      password

      必填

      连接MySQL数据库服务器时使用的密码。可在实例详情页,单击账号管理查看。

步骤五:基于FlinkMySQLPaimon并同步元数据到DLF

  1. 登录Flink控制台,单击目标工作空间名称。

  2. 在左侧导航栏,选择数据开发 > ETL,在作业草稿页签,单击image,新建文件夹。

  3. 右键文件夹,选择新建流作业,在弹出的新建作业草稿对话框,填写文件名称并选择引擎版本

  4. 在文件中写入如下CREATE TABLE AS(CTAS)SQL语句。注意根据实际命名修改代码中的相关命名。

    CREATE TABLE IF NOT EXISTS `paimon_catalog_name`.`flink_paimon`.`sales` 
    AS TABLE `mysql_catalog_name`.`mysql_paimon`.`sales`;
    1. (可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。

    2. 单击右上角部署,在弹出的部署新版本对话框中填写备注、作业标签和部署目标等信息,然后单击确定

  5. 在左侧导航栏,,选择运维中心 > 作业运维,单击目标作业名称,进入作业部署详情页面。

  6. 在目标作业部署详情页右上角,单击启动,选择无状态启动后,单击启动

  7. 查询Paimon数据。在左侧导航栏,选择数据开发 > 数据查询页面的查询脚本页签,单击image,新建查询脚本。运行如下代码:

    SELECT * FROM `<paimon_catalog_name>`.`flink_paimon`.`sales`;

    返回结果如下:

    image

  8. 进入OSS控制台,查看mc-lakehouse-dlf-oss/flink_paimon/,生成sales/文件夹,生成文件如图所示:

    image

  9. 登录数据湖构建(DLF)控制台。在左侧导航栏,选择元数据 > 元数据管理。单击数据库名flink_paimon,可查看到已生成的表,如图所示:

    image

步骤六:MaxCompute创建DLF+OSS外部数据源

  1. 登录MaxCompute控制台,在左上角选择地域。

  2. 在左侧导航栏,选择管理配置 > 外部数据源

  3. 外部数据源页面,单击创建外部数据源

  4. 新增外部数据源对话框,根据界面提示配置相关参数。参数说明如下:

    参数

    说明

    外部数据源类型

    选择DLF+OSS

    外部数据源名称

    可自定义命名。命名规则如下:

    • 以字母开头,且只能包含小写字母、下划线和数字。

    • 不能超过128个字符。

    例如mysql_paimon_dlf_mc_fs

    外部数据源描述

    根据需要填写。

    地域

    默认为当前地域。

    DLF Endpoint

    默认为当前地域的DLF Endpoint。

    OSS Endpoint

    默认为当前地域的OSS Endpoint。

    RoleARN

    RAM角色的ARN信息。此角色需要包含能够同时访问DLFOSS服务的权限。

    您可以登录RAM访问控制台,在左侧导航栏选择身份管理>角色,单击对应的RAM角色名称,即可在基本信息区域获取ARN信息。

    示例:acs:ram::124****:role/aliyunodpsdefaultrole

    外部数据源补充属性

    特殊声明的外部数据源补充属性。指定后,使用此外部数据源的任务可以按照参数定义的行为访问源系统。

    说明

    支持的具体参数请关注后续官网文档更新说明,具体参数将随产品能力演进逐步放开。

  5. 单击确认,完成外部数据源的创建。

  6. 外部数据源页面,单击数据源操作列的详情可查看数据源详细信息。

步骤七:创建外部schema

SET odps.namespace.schema=true;

CREATE EXTERNAL SCHEMA IF NOT EXISTS <external_schema>
WITH mc_dlf_oss_pt
ON '<dlf_data_catalogue>.dlf_database';

参数说明如下:

  • external_schema:自定义外部Schema名称。例如es_mc_dlf_oss_paimon

  • external_data_source:上述已创建的外部数据源名称,外部Schema归属的项目必须与外部数据源处于同一地域。例如mysql_paimon_dlf_mc_fs

  • dlf_data_catalogueDLF数据目录ID。数据目录创建方法请参见新建数据目录。例如122****

  • dlf_database:DLF中指定数据目录下的数据库名称。详情请参见数据库表及函数。例如flink_paimon

步骤八:使用SQL访问OSS数据

  1. 登录MaxCompute客户端,查询external schema内的表。

SET odps.namespace.schema=true;
SHOW tables IN es_mc_dlf_oss_paimon;

-- 返回结果:
ALIYUN$xxx:sales

OK
  1. 查询external schema内表数据。

SET odps.namespace.schema=true;
SELECT * FROM <project_name>.es_mc_dlf_oss_paimon.sales;

-- 返回结果如下:
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| id         | year       | amount     | product_name | customer_name | order_date | region     | status     | 
+------------+------------+------------+--------------+---------------+------------+------------+------------+
| 1          | 2020       | 100        | Product A    | Customer 1    | 2020-01-01 | Region 1   | Completed  | 
| 2          | 2020       | 200        | Product B    | Customer 2    | 2020-02-01 | Region 2   | Pending    | 
| 3          | 2021       | 150        | Product C    | Customer 3    | 2021-03-01 | Region 3   | Completed  | 
| 4          | 2021       | 300        | Product D    | Customer 4    | 2021-04-01 | Region 4   | Pending    | 
| 5          | 2022       | 250        | Product E    | Customer 5    | 2022-05-01 | Region 5   | Completed  | 
| 6          | 2022       | 400        | Product F    | Customer 6    | 2022-06-01 | Region 6   | Pending    | 
| 7          | 2023       | 350        | Product G    | Customer 7    | 2023-07-01 | Region 7   | Completed  | 
| 8          | 2023       | 500        | Product H    | Customer 8    | 2023-08-01 | Region 8   | Pending    | 
| 9          | 2020       | 450        | Product I    | Customer 9    | 2020-09-01 | Region 1   | Completed  | 
| 10         | 2021       | 600        | Product J    | Customer 10   | 2021-10-01 | Region 2   | Pending    | 
+------------+------------+------------+--------------+---------------+------------+------------+------------+