通过实时计算Flink和Paimon实现流批一体

本文演示了使用实时计算Flink版和EMR Serverless Spark构建Paimon数据湖分析流程。该流程包括将数据写入OSS、进行交互式查询以及执行离线数据Compact操作。EMR Serverless Spark完全兼容Paimon,通过内置的DLF元数据与其他云产品(例如,实时计算Flink版)实现元数据互通,形成完整的流批一体化解决方案。它支持灵活的任务运行方式和参数配置,满足实时分析和生产调度的多种需求。

背景信息

实时计算Flink

阿里云实时计算Flink版是一种全托管ServerlessFlink云服务,是一站式开发运维管理平台,开箱即用,计费灵活。具备作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。更多信息,请参见什么是阿里云实时计算Flink

Apache Paimon

Apache Paimon是一种统一的数据湖存储格式,结合FlinkSpark构建了流批处理的实时湖仓一体架构。Paimon创新地将湖格式与LSM(Log-structured merge-tree)技术结合,使数据湖具备了实时流更新和完整的流处理能力。更多信息,请参见Apache Paimon

操作流程

步骤一:通过实时计算Flink创建Paimon Catalog

Paimon Catalog可以方便地管理同一个warehouse目录下的所有Paimon表,并与其它阿里云产品连通。创建并使用Paimon Catalog,详情请参见管理Paimon Catalog

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 创建Paimon Catalog。

    1. 在左侧导航栏,选择数据开发 > 数据查询

    2. 单击image,新建查询脚本。

    3. 填写SQL代码。

      Catalog完整配置如下所示。

      CREATE CATALOG `paimon` WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'warehouse' = '<warehouse>',
        'dlf.catalog.id' = '<dlf.catalog.id>',
        'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
        'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
        'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
        'dlf.catalog.region' = '<dlf.catalog.region>',
      );

      配置项

      说明

      是否必填

      备注

      paimon

      Paimon Catalog名称。

      请填写为自定义的英文名。

      type

      Catalog类型。

      固定值为paimon。

      metastore

      元数据存储类型。

      本文示例元数据存储类型选择dlf,通过DLF实现统一的元数据管理,实现多引擎无缝衔接。

      warehouse

      配置数据仓库的实际位置。

      请根据实际情况修改。

      dlf.catalog.id

      DLF数据目录ID。

      请在数据湖构建控制台上查看数据目录对应的ID,具体操作请参见数据目录

      dlf.catalog.accessKeyId

      访问DLF服务所需的Access Key ID。

      获取方法请参见创建AccessKey

      dlf.catalog.accessKeySecret

      访问DLF服务所需的Access Key Secret。

      获取方法请参见创建AccessKey

      dlf.catalog.endpoint

      DLF服务的Endpoint。

      详情请参见已开通的地域和访问域名

      说明

      如果FlinkDLF位于同一地域,则使用VPC网络Endpoint,否则使用公网Endpoint。

      dlf.catalog.region

      DLF所在区域。

      详情请参见已开通的地域和访问域名

      说明

      请和dlf.catalog.endpoint选择的地域保持一致。

    4. 选择或创建Session集群。

      单击页面右下角的执行环境,选择对应版本的Session集群(VVR 8.0.4及以上引擎版本)。如果没有Session集群,请参见步骤一:创建Session集群

    5. 选中目标代码片段后,单击代码行左侧的运行

  4. 创建Paimon表。

    查询脚本文本编辑区域输入如下命令后,选中代码后单击运行

    CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl`
    (
        id       STRING,
        data     STRING,
        category INT,
        ts       STRING,
        dt       STRING,
        hh       STRING
    ) PARTITIONED BY (dt, hh)
    WITH (
        'write-only' = 'true'
    );
  5. 创建流作业。

    1. 新建作业。

      1. 在左侧导航栏,选择数据开发 > ETL

      2. 单击新建

      3. 单击空白的流作业草稿

      4. 单击下一步

      5. 新建作业草稿对话框中,填写作业配置信息。

        作业参数

        说明

        文件名称

        作业的名称。

        说明

        作业名称在当前项目中必须保持唯一。

        存储位置

        指定该作业的存储位置。

        您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

        引擎版本

        当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

      6. 单击创建

    2. 编写代码。

      在新建的作业草稿中,输入以下代码,通过datagen源源不断生成数据写入Paimon表中。

      CREATE TEMPORARY TABLE datagen
      (
          id        string,
          data      string,
          category  int
      )
      WITH (
          'connector' = 'datagen',
          'rows-per-second' = '100',
          'fields.category.kind' = 'random',
          'fields.category.min' = '1',
          'fields.category.max' = '10'
      );
      INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl`
      SELECT
          id,
          data,
          category,
          cast(LOCALTIMESTAMP as string) as ts,
          cast(CURRENT_DATE as string) as dt,
          cast(hour(LOCALTIMESTAMP) as string) as hh
      FROM datagen;
    3. 单击部署,即可将数据发布至生产环境。

    4. 您可以在作业运维页面启动作业进入运行阶段,详情请参见作业启动

步骤二:通过EMR Serverless Spark创建SQL会话

创建的SQL会话用于SQL开发和查询。有关会话的详细介绍,请参见会话管理

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. 创建SQL会话。

    1. SQL会话页签,单击创建SQL会话

    2. 在创建SQL会话页面,配置以下信息,其余参数无需配置,然后单击创建

      参数

      说明

      名称

      自定义SQL会话的名称。例如,paimon_compute。

      Spark配置

      请填写以下Spark配置信息,以连接Paimon。

      spark.sql.extensions                org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore  dlf
      spark.sql.catalog.paimon.warehouse  <warehouse>
      spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

      请根据您的实际情况替换以下信息:

      • <warehouse>:配置数据仓库的实际位置,请根据实际情况修改。

      • <dlf.catalog.id>:DLF数据目录ID,请根据实际情况修改。

    3. 单击操作列的启动

步骤三:通过EMR Serverless Spark进行交互式查询或任务调度

EMR Serverless Spark提供了交互式查询和任务调度两种操作模式,以满足不同的使用需求。交互式查询适用于快速查询和调试,而任务调度则支持任务的开发、发布和运维,实现完整的生命周期管理。

在数据写入过程中,我们可以随时使用EMR Serverless SparkPaimon表进行交互式查询,以便实时获取数据状态和执行快速分析。此外,通过发布开发好的任务并创建工作流,可以编排各项任务并完成工作流的发布。您可以配置调度策略,实现任务的定期调度,从而保证数据处理和分析的自动化与高效性。

交互式查询

  1. 创建SQL开发。

    1. EMR Serverless Spark页面,单击左侧导航栏中的数据开发

    2. 开发目录页签下,单击新建

    3. 在弹出的对话框中,输入名称(例如,paimon_compact),类型选择为SQL > SparkSQL,然后单击确定

    4. 在右上角选择数据目录、数据库和前一步骤中启动的SQL会话。

    5. 在新建的任务编辑器中输入SQL语句。

      • 示例1:查询test_append_tbl表中前10行的数据。

        SELECT * FROM paimon.test_paimon_db.test_append_tbl limit 10;

        返回结果示例如下。

        image

      • 示例2:统计test_append_tbl表中满足特定条件的行数。

        SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';

        返回结果示例如下。

        image

  2. 运行并发布任务。

    1. 单击运行

      返回结果信息可以在下方的运行结果中查看。如果有异常,则可以在运行问题中查看。

    2. 确认运行无误后,单击右上角的发布

    3. 发布对话框中,可以输入发布信息,然后单击确定

任务调度

  1. 查询Compact前文件信息。

    数据开发页面,新建SQL开发,查询Paimonfiles系统表,快速地得到Compact前文件的数据。创建SQL开发的具体操作,请参见SQL开发

    SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';

    image

  2. 数据开发页面,编写Paimon Compact SQL(例如,paimon_compact),然后完成发布。

    创建SQL开发的具体操作,请参见SQL开发

    CALL paimon.sys.compact (
      table => 'test_paimon_db.test_append_tbl',
      partitions => 'dt=\"2024-06-24\",hh=\"19\"',
      order_strategy => 'zorder',
      order_by => 'category'
    );
  3. 创建工作流。

    1. EMR Serverless Spark页面,单击左侧导航栏中的任务编排

    2. 任务编排页面,单击创建工作流

    3. 创建工作流面板中,输入工作流名称(例如,paimon_workflow_task),然后单击下一步

      其他设置区域的参数,请根据您的实际情况配置,更多参数信息请参见管理工作流

    4. 在新建的节点画布中,单击添加节点

    5. 来源文件路径下拉列表中选择已发布的SQL开发(paimon_compact),填写Spark配置参数,然后单击保存

      参数

      说明

      名称

      自定义SQL会话的名称。例如,paimon_compute。

      Spark配置

      请填写以下Spark配置信息,以连接Paimon。

      spark.sql.extensions                org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon            org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore  dlf
      spark.sql.catalog.paimon.warehouse  <warehouse>
      spark.sql.catalog.paimon.dlf.catalog.id  <dlf.catalog.id>

      请根据您的实际情况替换以下信息:

      • <warehouse>:配置数据仓库的实际位置,请根据实际情况修改。

      • <dlf.catalog.id>:DLF数据目录ID,请根据实际情况修改。

    6. 在新建的节点画布中,单击发布工作流,然后单击确定

  4. 运行工作流。

    1. 任务编排页面,单击新建工作流(例如,paimon_workflow_task)的工作流名称

    2. 工作流实例列表页面,单击手动运行

    3. 触发运行对话框中,单击确定

  5. 验证Compact效果。

    工作流调度执行成功后,再次执行与开始相同的SQL查询,对比Compact前后文件的数量、记录数和大小,以验证Compact操作的效果。

    SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';

    image