Paimon与Spark集成

E-MapReduce支持通过Spark SQL对Paimon进行读写操作。本文通过示例为您介绍如何通过Spark SQL对Paimon进行读写操作。

前提条件

已创建选择了Spark和Paimon的DataLake或Custom类型的集群,创建集群详情请参见创建集群

使用限制

  • EMR-3.46.0及后续版本、EMR-5.12.0及后续版本的集群,支持Spark SQL和Spark CLI对Paimon进行读写操作。

  • 仅Spark3的Spark SQL可以通过Catalog读写Paimon。

  • Spark CLI只能通过文件系统或对象存储的路径读取Paimon。

操作步骤

步骤一:创建Catalog

Paimon将数据和元数据都保存在文件系统(例如,HDFS)或对象存储(例如,OSS)中,存储的根路径由warehouse参数指定。如果指定的warehouse路径不存在,将会自动创建该路径;如果指定的warehouse路径存在,您可以通过该Catalog访问路径中已有的表。

您还可以将元数据同步到Hive或DLF中,方便其他服务访问Paimon。

创建Filesystem Catalog

Filesystem Catalog仅将元数据保存在文件系统或对象存储中。

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=filesystem \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
说明
  • spark.sql.catalog.paimon:定义了名为paimon的Catalog。

  • spark.sql.catalog.paimon.metastore:指定Catalog使用的元数据存储类型。设置为filesystem意味着元数据存储在本地文件系统。

  • spark.sql.catalog.paimon.warehouse:配置数据仓库的实际位置,请根据实际情况修改。

创建DLF Catalog

DLF Catalog会将元数据同步到DLF中。

重要

创建集群时,元数据必须是DLF统一元数据

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=dlf \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
说明
  • spark.sql.catalog.paimon:定义了名为paimon的Catalog。

  • spark.sql.catalog.paimon.metastore:指定Catalog使用的元数据存储类型。设置为dlf意味着将数据同步到DLF(Data Lake Formation)中。

  • spark.sql.catalog.paimon.warehouse:配置数据仓库的实际位置,请根据实际情况修改。

创建Hive Catalog

Hive Catalog会同步元数据到Hive MetaStore中。在Hive Catalog中创建的表可以直接在Hive中查询。

Hive查询Paimon,详情请参见Paimon与Hive集成

spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 \
--conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
说明
  • spark.sql.catalog.paimon:定义了名为paimon的Catalog。

  • spark.sql.catalog.paimon.metastore:指定Catalog使用的元数据存储类型。设置为hive意味着将元数据同步到Hive Metastore中。

  • spark.sql.catalog.paimon.uri:为Hive MetaStore Service的地址和端口。参数值为thrift://master-1-1:9083,这意味着Spark SQL将连接到这个运行在master-1-1主机上、监听9083端口的Hive Metastore服务以获取元数据信息。

  • spark.sql.catalog.paimon.warehouse:配置数据仓库的实际位置,请根据实际情况修改。

步骤二 :通过Spark SQL读写Paimon中的数据

执行以下Spark SQL语句,在Catalog中创建一张表,并读写表中的数据。

-- 切换到paimon catalog
USE paimon;

-- 在之前创建的paimon的Catalog中,创建并使用一个测试DATABASE。
CREATE DATABASE test_db;
USE test_db;

-- 创建Paimon表。
CREATE TABLE test_tbl (
    uuid int,
    name string,
    price double
) TBLPROPERTIES (
    'primary-key' = 'uuid'
);

-- 向Paimon表中写入数据。
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);

-- 读取表中的数据。
SELECT * FROM test_tbl;

查询结果如下。

1       apple   3.5
2       banana  4.0
3       cherry  20.5

步骤三:通过Spark CLI读Paimon中的数据

  1. 执行以下命令,启动Spark CLI。

    spark-shell
  2. 在Spark CLI中运行以下Scala代码,查询指定目录下存储的Paimon表。

    val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl")
    dataset.createOrReplaceTempView("test_tbl")
    spark.sql("SELECT * FROM test_tbl").show()
    说明
    • paimon:固定值。表明您正在使用Paimon作为数据存储格式来读取或写入数据。

    • oss://<yourBucketName>/warehouse/test_db.db/test_tbl:Paimon表所在路径,请根据实际情况进行替换。

    返回信息如下所示。

    +----+------+-----+                                                             
    |uuid|  name|price|
    +----+------+-----+
    |   1| apple|  3.5|
    |   2|banana|  4.0|
    |   3|cherry| 20.5|
    +----+------+-----+