Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询,详情请参见Apache Paimon。本文为您介绍如何在EMR Serverless Spark中实现Paimon表的读取与写入操作。
前提条件
已创建工作空间,详情请参见创建工作空间。
操作流程
步骤一:创建SQL会话
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话。
Spark对Paimon表的读写基于Catalog,根据不同场景可以有以下两种选择:
使用Paimon Catalog(仅限于查询和写入Paimon表,同时支持自定义元数据类型,目前支持以下三类)。
DLF
元数据保存在DLF中。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf
Hive
元数据保存在指定的Hive MetaStore中。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore hive spark.sql.catalog.paimon.uri thrift://<yourHMSUri>:<port>
参数
说明
thrift://<yourHMSUri>:<port>
Hive MetaStore的URI。格式为
thrift://<Hive metastore的IP地址>:9083
。<Hive metastore的IP地址>
为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见EMR Serverless Spark连接外部Hive Metastore。FileSystem
元数据保存在文件系统中。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore filesystem spark.sql.catalog.paimon.warehouse oss://<yourBucketName>/warehouse
使用spark_catalog(可以查询、写入Paimon表或者非Paimon表,元数据仅支持DLF)。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.spark_catalog org.apache.paimon.spark.SparkGenericCatalog
步骤二:读写Paimon表
进入SQL开发页面。
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
在新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定。
拷贝如下代码到新增的Spark SQL页签(users_task)中。
使用Paimon Catalog
此时,访问Paimon表需通过paimon.db.tbl,访问非Paimon表,需通过spark_catalog.db.tbl。
-- 创建数据库 CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db; CREATE DATABASE IF NOT EXISTS spark_catalog.ss_parquet_db; -- 创建Paimon表和Parquet表 CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon; CREATE TABLE spark_catalog.ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c"; -- 写入Paimon表 INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"); INSERT INTO paimon.ss_paimon_db.paimon_tbl SELECT * FROM spark_catalog.ss_parquet_db.parquet_tbl; -- 查询写入结果 SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id; -- 删除数据库 DROP DATABASE paimon.ss_paimon_db CASCADE; DROP DATABASE spark_catalog.ss_parquet_db CASCADE;
使用spark_catalog
在该情况下,无论是访问Paimon表还是非Paimon表,都可以通过spark_catalog.db.tbl进行访问(由于spark_catalog为默认Catalog,因此可以省略不写)。
-- 创建数据库 CREATE DATABASE IF NOT EXISTS ss_paimon_db; CREATE DATABASE IF NOT EXISTS ss_parquet_db; -- 创建Paimon表和Parquet表 CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon; CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c"; -- 写入Paimon表 INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b"); INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl; -- 查询写入结果 SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id; -- 删除数据库 DROP DATABASE ss_paimon_db CASCADE; DROP DATABASE ss_parquet_db CASCADE;
在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话。
单击运行,执行任务。返回信息如下所示。
相关文档
SQL任务和任务编排完整的开发流程示例,请参见SQL开发快速入门。
更多Paimon相关用法和配置,请参见Paimon官方文档。
如果需要指定外部Metastore服务,请参见EMR Serverless Spark连接外部Hive Metastore。