Paimon与Spark集成
E-MapReduce支持通过Spark SQL对Paimon进行读写操作。本文通过示例为您介绍如何通过Spark SQL对Paimon进行读写操作。
使用限制
EMR-3.46.0及后续版本、EMR-5.12.0及后续版本的集群,支持Spark SQL和Spark CLI对Paimon进行读写操作。
仅Spark3的Spark SQL可以通过Catalog读写Paimon。
Spark CLI只能通过文件系统或对象存储的路径读取Paimon。
操作步骤
步骤一:创建Catalog
Paimon将数据和元数据都保存在文件系统(例如,HDFS)或对象存储(例如,OSS)中,存储的根路径由warehouse参数指定。如果指定的warehouse路径不存在,将会自动创建该路径;如果指定的warehouse路径存在,您可以通过该Catalog访问路径中已有的表。
您还可以同步元数据到Hive或DLF中,方便其它服务访问Paimon。
创建Filesystem Catalog
Filesystem Catalog仅将元数据保存在文件系统或对象存储中。
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.metastore=filesystem --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse
创建Hive Catalog
Hive Catalog会同步元数据到Hive MetaStore中。在Hive Catalog中创建的表可以直接在Hive中查询。
Hive查询Paimon,详情请参见Paimon与Hive集成。
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.metastore=hive --conf spark.sql.catalog.paimon.uri=thrift://master-1-1:9083 --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse
说明
spark.sql.catalog.paimon.uri
为Hive MetaStore Service的地址。
创建DLF Catalog
DLF Catalog会同步元数据到DLF中。
重要
创建集群时,元数据须为DLF统一元数据。
spark-sql --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.metastore=dlf --conf spark.sql.catalog.paimon.warehouse=oss://<yourBucketName>/warehouse
步骤二 :通过Spark SQL读写Paimon中的数据
执行以下Spark SQL语句,在Catalog中创建一张表,并读写表中的数据。
-- 在之前创建的catalog中,创建并使用一个测试database。
CREATE DATABASE paimon.test_db;
USE paimon.test_db;
-- 创建Paimon表。
CREATE TABLE test_tbl (
uuid int,
name string,
price double
) TBLPROPERTIES (
'primary-key' = 'uuid'
);
-- 向Paimon中写入数据。
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
-- 读取表中的数据。
SELECT * FROM test_tbl;
步骤三:通过Spark CLI读Paimon中的数据
执行以下命令,启动Spark CLI。
spark-shell
在Spark CLI中运行以下Scala代码,查询指定目录下存储的Paimon表。
val dataset = spark.read.format("paimon").load("oss://<yourBucketName>/warehouse/test_db.db/test_tbl") dataset.createOrReplaceTempView("test_tbl") spark.sql("SELECT * FROM test_tbl").show()