本文介绍了如何在EMR Serverless Spark中开发并运行一个基于数据湖构建(DLF)的Paimon表写入任务。通过上传测试文件、创建任务并运行,最终可以通过日志探查或控制台查看结果,验证数据写入和查询的正确性。
前提条件
操作流程
步骤一:准备测试文件
创建一个简单的DataFrame,并将该DataFrame写入名为pyspark_test的表中,存储格式为Paimon。随后,通过Spark SQL对表数据进行查询,以验证写入结果的正确性。
Java
Java代码示例如下。单击SparkExample-1.0-SNAPSHOT.jar,直接下载测试JAR包。
Maven依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.5.2</version>
<scope>provided</scope>
</dependency>
代码示例
package org.example;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
public class DlfAccess {
public static void main(String[] args) {
// 创建 SparkSession 实例
SparkSession spark = SparkSession.builder()
.appName("DLF Example")
.enableHiveSupport()
.getOrCreate();
// 构造 DataFrame
List<Row> data = Arrays.asList(
RowFactory.create(1, "Alice"),
RowFactory.create(2, "Bob"),
RowFactory.create(3, "Charlie")
);
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});
// 创建 DataFrame 并写表
Dataset<Row> df = spark.createDataFrame(data, schema);
spark.sql("drop table if exists pyspark_test");
df.write().format("paimon").mode("overwrite").saveAsTable("pyspark_test");
// 执行查询,获取前 10 条数据
Dataset<Row> tableDf = spark.sql("select * from pyspark_test limit 10");
tableDf.show();
spark.stop();
}
}
Python
如果您的工作空间绑定的是DLF 1.0(旧版),需要在Spark配置中添加spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
配置。
代码示例
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DLF test") \
.enableHiveSupport() \
.getOrCreate()
# 测试数据
data = [
(1, "Alice"),
(2, "Bob"),
(3, "Charlie")
]
spark.sql("drop table if exists pyspark_test")
# 创建 DataFrame 并写表
df = spark.createDataFrame(data, schema='id int, name string')
df.write.format('paimon').mode("overwrite").saveAsTable("pyspark_test")
# 查表验证
spark.sql("select * from pyspark_test").show()
步骤二:上传测试文件
进入资源上传页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的文件管理。
在文件管理页面,单击上传文件。
在上传文件对话框中,单击待上传文件区域选择Python文件或者JAR包,或直接拖拽Python或文件者JAR包到待上传文件区域。
步骤三:创建批任务并运行
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击
(新建)图标。
在弹出的对话框中,输入名称,根据实际需求在批任务中选择PySpark或者JAR类型,然后单击确定。
在右上角选择目标队列。
添加队列的具体操作,请参见管理资源队列。
在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行。
JAR
参数
说明
主jar资源
选择上传步骤一中编译的JAR包。
Main Class
提交Spark任务时所指定的主类。本文示例填写为
org.example.DlfAccess
。PySpark
参数
说明
主Python资源
选择工作空间资源,然后选择在资源上传页面上传的Python文件。
步骤四:查看结果
任务运行完成后,您可以选择以下方式查看结果。
方式一:通过日志探查直接查看
运行任务后,在下方的运行记录区域,单击任务操作列的日志探查。
在日志探查页签,您可以查看相关的日志信息。
方式二:通过数据湖构建控制台查看
登录数据湖构建控制台。
进入对应的Catalog和数据库,即可看到创建的测试表。
方式三:通过SQL开发查看
在数据开发中创建SQL开发,执行查询语句,以验证数据写入的正确性,详情请参见SparkSQL开发。