本文演示了使用实时计算Flink版和EMR Serverless Spark构建Paimon数据湖分析流程。该流程包括将数据写入OSS、进行交互式查询以及执行离线数据Compact操作。EMR Serverless Spark完全兼容Paimon,通过内置的DLF元数据与其他云产品(例如,实时计算Flink版)实现元数据互通,形成完整的流批一体化解决方案。它支持灵活的任务运行方式和参数配置,满足实时分析和生产调度的多种需求。
背景信息
实时计算Flink版
阿里云实时计算Flink版是一种全托管Serverless的Flink云服务,是一站式开发运维管理平台,开箱即用,计费灵活。具备作业开发、数据调试、运行与监控、自动调优、智能诊断等全生命周期能力。更多信息,请参见什么是阿里云实时计算Flink版。
Apache Paimon
Apache Paimon是一种统一的数据湖存储格式,结合Flink和Spark构建了流批处理的实时湖仓一体架构。Paimon创新地将湖格式与LSM(Log-structured merge-tree)技术结合,使数据湖具备了实时流更新和完整的流处理能力。更多信息,请参见Apache Paimon。
操作流程
步骤一:通过实时计算Flink创建Paimon Catalog
Paimon Catalog可以方便地管理同一个warehouse目录下的所有Paimon表,并与其它阿里云产品连通。创建并使用Paimon Catalog,详情请参见管理Paimon Catalog。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
创建Paimon Catalog。
在SQL开发页面,单击查询脚本页签。
单击,新建查询脚本。
填写SQL代码。
Catalog完整配置如下所示。
CREATE CATALOG `paimon` WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'warehouse' = '<warehouse>', 'dlf.catalog.id' = '<dlf.catalog.id>', 'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>', 'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>', 'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>', 'dlf.catalog.region' = '<dlf.catalog.region>', );
配置项
说明
是否必填
备注
paimon
Paimon Catalog名称。
是
请填写为自定义的英文名。
type
Catalog类型。
是
固定值为paimon。
metastore
元数据存储类型。
是
本文示例元数据存储类型选择dlf,通过DLF实现统一的元数据管理,实现多引擎无缝衔接。
warehouse
配置数据仓库的实际位置。
是
请根据实际情况修改。
dlf.catalog.id
DLF数据目录ID。
是
请在数据湖构建控制台上查看数据目录对应的ID,具体操作请参见数据目录。
dlf.catalog.accessKeyId
访问DLF服务所需的Access Key ID。
是
获取方法请参见创建AccessKey。
dlf.catalog.accessKeySecret
访问DLF服务所需的Access Key Secret。
是
获取方法请参见创建AccessKey。
dlf.catalog.endpoint
DLF服务的Endpoint。
是
详情请参见已开通的地域和访问域名。
说明如果Flink与DLF位于同一地域,则使用VPC网络Endpoint,否则使用公网Endpoint。
dlf.catalog.region
DLF所在区域。
是
详情请参见已开通的地域和访问域名。
说明请和dlf.catalog.endpoint选择的地域保持一致。
选择或创建Session集群。
单击页面右下角的执行环境,选择对应版本的Session集群(VVR 8.0.4及以上引擎版本)。如果没有Session集群,请参见步骤一:创建Session集群。
选中目标代码片段后,单击代码行左侧的运行。
创建Paimon表。
在查询脚本文本编辑区域输入如下命令后,选中代码后单击运行。
CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl` ( id STRING, data STRING, category INT, ts STRING, dt STRING, hh STRING ) PARTITIONED BY (dt, hh) WITH ( 'write-only' = 'true' );
创建流作业。
新建作业。
在SQL开发页面,单击作业草稿页签。
单击新建。
单击空白的流作业草稿。
单击下一步。
在新建作业草稿对话框,填写作业配置信息。
作业参数
说明
文件名称
作业的名称。
说明作业名称在当前项目中必须保持唯一。
存储位置
指定该作业的存储位置。
您还可以在现有文件夹右侧,单击图标,新建子文件夹。
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
单击创建。
编写代码。
在新建的作业草稿中,输入以下代码,通过datagen源源不断生成数据写入Paimon表中。
CREATE TEMPORARY TABLE datagen ( id string, data string, category int ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.category.kind' = 'random', 'fields.category.min' = '1', 'fields.category.max' = '10' ); INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl` SELECT id, data, category, cast(LOCALTIMESTAMP as string) as ts, cast(CURRENT_DATE as string) as dt, cast(hour(LOCALTIMESTAMP) as string) as hh FROM datagen;
单击部署,即可将数据发布至生产环境。
您可以在作业运维页面启动作业进入运行阶段,详情请参见作业启动。
步骤二:通过EMR Serverless Spark创建SQL会话
创建的SQL会话用于SQL开发和查询。有关会话的详细介绍,请参见Compute。
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
创建SQL会话。
在SQL会话页签,单击创建SQL会话。
在创建SQL会话页面,配置以下信息,其余参数无需配置,然后单击创建。
参数
说明
名称
自定义SQL会话的名称。例如,paimon_compute。
Spark配置
请填写以下Spark配置信息,以连接Paimon。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf spark.sql.catalog.paimon.warehouse <warehouse> spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>
请根据您的实际情况替换以下信息:
<warehouse>
:配置数据仓库的实际位置,请根据实际情况修改。<dlf.catalog.id>
:DLF数据目录ID,请根据实际情况修改。
单击操作列的启动。
步骤三:通过EMR Serverless Spark进行交互式查询或任务调度
EMR Serverless Spark提供了交互式查询和任务调度两种操作模式,以满足不同的使用需求。交互式查询适用于快速查询和调试,而任务调度则支持任务的开发、发布和运维,实现完整的生命周期管理。
在数据写入过程中,我们可以随时使用EMR Serverless Spark对Paimon表进行交互式查询,以便实时获取数据状态和执行快速分析。此外,通过发布开发好的任务并创建工作流,可以编排各项任务并完成工作流的发布。您可以配置调度策略,实现任务的定期调度,从而保证数据处理和分析的自动化与高效性。
交互式查询
创建SQL开发。
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
在弹出的对话框中,输入名称(例如,paimon_compact),类型选择为
,然后单击确定。在右上角选择数据目录、数据库和前一步骤中启动的SQL Compute。
在新建的任务编辑器中输入SQL语句。
示例1:查询
test_append_tbl
表中前10行的数据。SELECT * FROM paimon.test_paimon_db.test_append_tbl limit 10;
返回结果示例如下。
示例2:统计
test_append_tbl
表中满足特定条件的行数。SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';
返回结果示例如下。
运行并发布任务。
单击运行。
返回结果信息可以在下方的运行结果中查看。如果有异常,则可以在运行问题中查看。
确认运行无误后,单击右上角的发布。
在发布对话框中,可以输入发布信息,然后单击确定。
任务调度
查询Compact前文件信息。
在数据开发页面,新建SQL开发,查询Paimon的files系统表,快速地得到Compact前文件的数据。创建SQL开发的具体操作,请参见SQL开发。
SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';
在数据开发页面,编写Paimon Compact SQL(例如,paimon_compact),然后完成发布。
创建SQL开发的具体操作,请参见SQL开发。
CALL paimon.sys.compact ( table => 'test_paimon_db.test_append_tbl', partitions => 'dt=\"2024-06-24\",hh=\"19\"', order_strategy => 'zorder', order_by => 'category' );
创建工作流。
在EMR Serverless Spark页面,单击左侧导航栏中的任务编排。
在任务编排页面,单击创建工作流。
在创建工作流面板中,输入工作流名称(例如,paimon_workflow_task),然后单击下一步。
其他设置区域的参数,请根据您的实际情况配置,更多参数信息请参见管理工作流。
在新建的节点画布中,单击添加节点。
在来源文件路径下拉列表中选择已发布的SQL开发(paimon_compact),填写Spark配置参数,然后单击保存。
参数
说明
名称
自定义SQL会话的名称。例如,paimon_compute。
Spark配置
请填写以下Spark配置信息,以连接Paimon。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf spark.sql.catalog.paimon.warehouse <warehouse> spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>
请根据您的实际情况替换以下信息:
<warehouse>
:配置数据仓库的实际位置,请根据实际情况修改。<dlf.catalog.id>
:DLF数据目录ID,请根据实际情况修改。
在新建的节点画布中,单击发布工作流,然后单击确定。
运行工作流。
在任务编排页面,单击新建工作流(例如,paimon_workflow_task)的工作流名称。
在工作流实例列表页面,单击手动运行。
在触发运行对话框中,单击确定。
验证Compact效果。
工作流调度执行成功后,再次执行与开始相同的SQL查询,对比Compact前后文件的数量、记录数和大小,以验证Compact操作的效果。
SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';