在EMR Serverless Spark中使用Paimon

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询,详情请参见Apache Paimon。本文为您介绍如何在EMR Serverless Spark中实现Paimon表的读取与写入操作。

前提条件

已创建工作空间,详情请参见创建工作空间

操作流程

步骤一:创建SQL会话

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. SQL会话页面,单击创建SQL会话

  3. 创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话

    Spark对Paimon表的读写基于Catalog,根据不同场景可以有以下两种选择:

    • 使用Paimon Catalog(仅限于查询和写入Paimon表,同时支持自定义元数据类型,目前支持以下三类)。

      DLF

      元数据保存在DLF中。

      spark.sql.extensions                     org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon                 org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore       dlf

      Hive

      元数据保存在指定的Hive MetaStore中。

      spark.sql.extensions                     org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon                 org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore       hive
      spark.sql.catalog.paimon.uri             thrift://<yourHMSUri>:<port>

      参数

      说明

      thrift://<yourHMSUri>:<port>

      Hive MetaStore的URI。格式为thrift://<Hive metastore的IP地址>:9083

      <Hive metastore的IP地址>为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见EMR Serverless Spark连接外部Hive Metastore

      FileSystem

      元数据保存在文件系统中。

      spark.sql.extensions                     org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.paimon                 org.apache.paimon.spark.SparkCatalog
      spark.sql.catalog.paimon.metastore       filesystem
      spark.sql.catalog.paimon.warehouse       oss://<yourBucketName>/warehouse
    • 使用spark_catalog(可以查询、写入Paimon表或者非Paimon表,元数据仅支持DLF)。

      spark.sql.extensions               org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
      spark.sql.catalog.spark_catalog    org.apache.paimon.spark.SparkGenericCatalog

步骤二:读写Paimon表

  1. 进入SQL开发页面。

    EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定

  4. 拷贝如下代码到新增的Spark SQL页签(users_task)中。

    使用Paimon Catalog

    此时,访问Paimon表需通过paimon.db.tbl,访问非Paimon表,需通过spark_catalog.db.tbl。

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;
    CREATE DATABASE IF NOT EXISTS spark_catalog.ss_parquet_db;
    
    -- 创建Paimon表和Parquet表
    CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    CREATE TABLE spark_catalog.ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c";
    
    -- 写入Paimon表
    INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
    INSERT INTO paimon.ss_paimon_db.paimon_tbl SELECT * FROM spark_catalog.ss_parquet_db.parquet_tbl;
    
    -- 查询写入结果
    SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
    
    -- 删除数据库
    DROP DATABASE paimon.ss_paimon_db CASCADE;
    DROP DATABASE spark_catalog.ss_parquet_db CASCADE;

    使用spark_catalog

    在该情况下,无论是访问Paimon表还是非Paimon表,都可以通过spark_catalog.db.tbl进行访问(由于spark_catalog为默认Catalog,因此可以省略不写)。

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS ss_paimon_db;
    CREATE DATABASE IF NOT EXISTS ss_parquet_db;
    
    -- 创建Paimon表和Parquet表
    CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c";
    
    -- 写入Paimon表
    INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
    INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- 查询写入结果
    SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
    
    -- 删除数据库
    DROP DATABASE ss_paimon_db CASCADE;
    DROP DATABASE ss_parquet_db CASCADE;
  5. 在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话。

  6. 单击运行,执行任务。返回信息如下所示。

    image

相关文档