在EMR Serverless Spark中使用Paimon

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询,详情请参见Apache Paimon。本文为您介绍如何在EMR Serverless Spark中实现Paimon表的读取与写入操作。

前提条件

已创建工作空间,详情请参见创建工作空间

操作流程

步骤一:创建SQL会话

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. SQL会话页面,单击创建SQL会话

  3. 创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话

    SparkPaimon的读写基于Catalog,您可以根据具体场景进行选择。

    • Catalog类型介绍

      类型

      描述

      Paimon Catalog

      用于管理Paimon格式的元数据,只能用于查询和写入Paimon表。

      • 支持自定义元数据类型(DLF 1.0、Hive MetaStore、FileSystem)。

      • 访问Paimon表时应使用格式 <catalogName>.<数据库名>.<表名>

        重要

        本文配置中的<catalogName>Catalog名称,您可以自定义。如无特殊需求时建议保持默认Catalog名称 paimon

      spark_catalog

      Spark默认内置Catalog,通常用于管理Spark SQL内部表的元数据,可以用于查询和写入Paimon表或者非Paimon表。

      • 元数据是当前工作空间的默认Catalog。

        如果您希望将默认Catalog修改为外部的Hive Metastore,可以参见EMR Serverless Spark连接外部Hive Metastore

      • 访问表时可直接使用格式 <数据库名>.<表名>

    • Catalog配置

      • 使用Paimon Catalog

        DLF 1.0

        元数据保存在DLF 1.0中。

        spark.sql.extensions                            org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
        spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
        spark.sql.catalog.<catalogName>.metastore       dlf

        Hive MetaStore

        元数据保存在指定的Hive MetaStore中。

        spark.sql.extensions                            org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
        spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
        spark.sql.catalog.<catalogName>.metastore       hive
        spark.sql.catalog.<catalogName>.uri             thrift://<yourHMSUri>:<port>

        参数

        说明

        thrift://<yourHMSUri>:<port>

        Hive MetaStoreURI。格式为thrift://<Hive metastoreIP地址>:9083

        <Hive metastoreIP地址>HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见EMR Serverless Spark连接外部Hive Metastore

        FileSystem

        元数据保存在文件系统中。

        spark.sql.extensions                            org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
        spark.sql.catalog.<catalogName>                 org.apache.paimon.spark.SparkCatalog
        spark.sql.catalog.<catalogName>.metastore       filesystem
        spark.sql.catalog.<catalogName>.warehouse       oss://<yourBucketName>/warehouse
      • 使用spark_catalog

        spark.sql.extensions               org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
        spark.sql.catalog.spark_catalog    org.apache.paimon.spark.SparkGenericCatalog

步骤二:基于Paimon Catalogspark_catalog的表读写操作

  1. 进入SQL开发页面。

    EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击image图标。

  3. 新建对话框中,输入名称(例如users_task),类型使用默认的SQL > SparkSQL,然后单击确定

  4. 复制如下代码到新增的Spark SQL页签(users_task)中。

    使用Paimon Catalog

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS paimon.ss_paimon_db;             
    
    -- 创建Paimon表
    CREATE TABLE paimon.ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    
    -- 写入Paimon表
    INSERT INTO paimon.ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
    
    -- 查询 Paimon 表的写入结果
    SELECT * FROM paimon.ss_paimon_db.paimon_tbl ORDER BY id;
    
    -- 删除数据库
    DROP DATABASE paimon.ss_paimon_db CASCADE;

    使用spark_catalog

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS ss_paimon_db; 
    CREATE DATABASE IF NOT EXISTS ss_parquet_db;
    
    -- 创建Paimon表和Parquet表
    CREATE TABLE ss_paimon_db.paimon_tbl (id INT, name STRING) USING paimon;
    CREATE TABLE ss_parquet_db.parquet_tbl USING parquet AS SELECT 3, "c";
    
    -- 写入数据
    INSERT INTO ss_paimon_db.paimon_tbl VALUES (1, "a"), (2, "b");
    INSERT INTO ss_paimon_db.paimon_tbl SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- 查询写入结果
    SELECT * FROM ss_paimon_db.paimon_tbl ORDER BY id;
    SELECT * FROM ss_parquet_db.parquet_tbl;
    
    -- 删除数据库
    DROP DATABASE ss_paimon_db CASCADE;
    DROP DATABASE ss_parquet_db CASCADE;
  5. 在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话。

  6. 单击运行,执行任务。返回信息如下所示。

    image

常见问题

对表执行DELETEUPDATEMERGE等操作时报错,应该如何处理?

  • 问题现象:执行 DELETEUPDATEMERGE 操作时,出现以下类似错误信息。

    Caused by: org.apache.spark.sql.AnalysisException: Table does not support deletes/updates/merge: <tableName>.
        at org.apache.spark.sql.errors.QueryCompilationErrors$.tableDoesNotSupportError(QueryCompilationErrors.scala:1391)
  • 问题原因:该表的存储格式不支持行级别更新操作,或缺少必要的Spark配置。

  • 解决方法:

    1. 检查表类型。

      执行以下命令,确认该表是否为Paimon表。

      SHOW CREATE TABLE <tableName>;

      如果输出结果中包含 USING PAIMON,则表为 Paimon 表。如果输出中显示其他存储格式(如 USING hive),则需要确认该格式是否支持行级别更新操作。

    2. 确认Spark配置。

      如果所涉及的表为Paimon表,则应检查Spark配置信息,以确保已添加以下配置,从而启用对Paimon的支持。

      spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

      如果没有添加此配置,请在Spark配置中添加。

相关文档