文档

Flink+DLF数据入湖与分析实践

更新时间:

阿里云实时计算Flink版结合DLF Paimon Catalog,实现Flink作业结果到数据湖的高效写入和元数据同步,支持无缝对接多种计算引擎并优化数据湖管理,本文为您介绍具体的操作流程。

背景信息

阿里云实时计算Flink版是一套基于Apache Flink构建的实时大数据分析平台,支持多种数据源和结果表类型。Flink任务可以利用数据湖统一存储的优势,使用Paimon表,将作业的结果输出到数据湖中,实现数据湖分析。在写入数据湖的过程中,Flink可以通过设置DLF Catalog,将表的元数据同步到数据湖构建(DLF)中。依托数据湖构建产品(DLF)提供的企业级统一元数据能力,Flink+DLF方案可以实现写入的数据湖表无缝对接阿里云上的计算引擎,如EMR、MaxCompute、Hologres等。也可以通过DLF提供的丰富的数据湖管理能力,实现数据湖生命周期管理和湖格式的优化。

前提条件

  • 已创建Flink全托管工作空间,引擎版本须为VVR 8.0.9及以上。如未创建,详情请参见开通实时计算Flink版

  • 已创建DLF 2.0数据目录。如未创建,详情请参见创建数据目录

    说明

    如果是RAM用户,在进行数据操作之前,需要先授予相应的资源权限。详情请参见授权管理

  • 本文以MySQL数据源为例,需要创建RDS MySQL,详情请参见快速创建RDS MySQL实例

说明

创建的RDS MySQL需要和实时计算Flink版在同一个地域、同一个VPC内,RDS MySQL须为5.7及以上版本。

操作流程

步骤1:Flink创建DLF Paimon Catalog

  1. 登录Flink控制台

  2. 进入创建Catalog页面。

    1. Flink全托管页签,单击目标工作空间操作列下的控制台

    2. 在左侧导航栏,单击元数据管理

    3. 单击创建Catalog

  3. 创建DLF Paimon Catalog。

    1. 创建Catalog页面,类型选择Apache Paimon Catalog。

    2. metastore选择dlf,下拉列表中选择标有v2.0的Catalog,Catalog需要提前在DLF控制台创建。

    image

步骤2:准备MySQL数据

  1. 登录准备好的MySQL实例,详情请参见通过DMS登录RDS MySQL

  2. 执行如下命令,在已有数据库下创建一张表,并插入若干测试数据。

    CREATE TABLE `student` (
      `id` bigint(20) NOT NULL,
      `name` varchar(256) DEFAULT NULL,
      `age` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    );
    
    INSERT INTO student VALUES (1,'name1',10);
    INSERT INTO student VALUES (2,'name2',20);

步骤3:创建Flink入湖作业

  1. 登录Flink控制台

  2. 创建数据源表和目标表。

    1. Flink全托管页签,单击目标工作空间操作列下的控制台

    2. 在左侧导航栏,选择数据开发 > 数据查询

    3. 单击新建,创建一个新的查询脚本,执行如下SQL。

      说明

      关于MySQL源表的参数设置和使用条件,请参考MySQL的CDC源表

      -- Catalog名为步骤1创建的DLF Catalog Name,本例中填dlf_clg_test
      CREATE DATABASE IF NOT EXISTS dlf_clg_test.dlf_testdb;
      
      --创建目标表
      CREATE TABLE IF NOT EXISTS dlf_clg_test.dlf_testdb.student_paimon (
        id    BIGINT PRIMARY KEY NOT ENFORCED,
        name  STRING,
        age    BIGINT
      ) WITH(
        'merge-engine' = 'partial-update', -- 使用部分更新数据合并机制产生宽表
        'changelog-producer' = 'lookup', -- 使用lookup增量数据产生机制以低延时产出变更数据
        'bucket' = '3'
      );
  3. 创建Flink入湖任务。

    1. 在SQL开发页面,创建一个新的SQL流作业,执行如下SQL。

      -- 创建CDC源表
      CREATE TEMPORARY TABLE student_source (
        id INT,
        name VARCHAR (256),
        age INT,
        PRIMARY KEY (id) NOT ENFORCED
      )
      WITH (
        'connector' = 'mysql-cdc',
        -- hostname替换为RDS的连接地址
        'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = '<RDS username>',
        'password' = '<RDS password>',
        'database-name' = '<RDS database>',
        -- table-name为数据源表,本例中填步骤2创建的student表
        'table-name' = 'student'
      );
      
      INSERT INTO dlf_clg_test.dlf_testdb.student_paimon SELECT * FROM student_source;

    b. 单击保存

    c. 单击部署

  4. 在左侧导航栏,单击运维中心 > 作业运维。找到上面创建的SQL流作业,单击启动

  5. 启动成功后一段时间,可以看到作业的状态变成运行中

步骤4:使用Flink进行数据分析

  1. 执行数据查询和分析。

    1. Flink全托管页签,单击目标工作空间操作列下的控制台

    2. 在左侧导航栏,单击数据开发 > 数据查询

    3. 在SQL开发页面,创建一个新的查询脚本,执行如下SQL。

    SELECT count(1) FROM dlf_clg_test.dlf_testdb.student_paimon;
    SELECT * FROM dlf_clg_test.dlf_testdb.student_paimon;
  2. 单击运行,可以直接对Flink写入数据湖的数据进行查询和分析。