使用DLF

本文介绍了如何在EMR Serverless Spark中开发并运行一个基于数据湖构建(DLF)的Paimon表写入任务。通过上传测试文件、创建任务并运行,最终可以通过日志探查或控制台查看结果,验证数据写入和查询的正确性。

前提条件

操作流程

步骤一:准备测试文件

创建一个简单的DataFrame,并将该DataFrame写入名为pyspark_test的表中,存储格式为Paimon。随后,通过Spark SQL对表数据进行查询,以验证写入结果的正确性。

Java

Java代码示例如下。单击SparkExample-1.0-SNAPSHOT.jar,直接下载测试JAR包。

Maven依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.5.2</version>
    <scope>provided</scope>
</dependency>

代码示例

package org.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class DlfAccess {
    public static void main(String[] args) {
        // 创建 SparkSession 实例
        SparkSession spark = SparkSession.builder()
                .appName("DLF Example")
                .enableHiveSupport()
                .getOrCreate();

        // 构造 DataFrame
        List<Row> data = Arrays.asList(
                RowFactory.create(1, "Alice"),
                RowFactory.create(2, "Bob"),
                RowFactory.create(3, "Charlie")
        );

        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("id",   DataTypes.IntegerType, false),
                DataTypes.createStructField("name", DataTypes.StringType,  false)
        });
        // 创建 DataFrame 并写表

        Dataset<Row> df = spark.createDataFrame(data, schema);

        spark.sql("drop table if exists pyspark_test");
        df.write().format("paimon").mode("overwrite").saveAsTable("pyspark_test");

        // 执行查询,获取前 10 条数据
        Dataset<Row> tableDf = spark.sql("select * from pyspark_test limit 10");

        tableDf.show();

        spark.stop();
    }
}

Python

如果您的工作空间绑定的是DLF 1.0(旧版),需要在Spark配置中添加spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions配置。

代码示例

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DLF test") \
    .enableHiveSupport() \
    .getOrCreate()

# 测试数据
data = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Charlie")
]

spark.sql("drop table if exists pyspark_test")
# 创建 DataFrame 并写表
df = spark.createDataFrame(data, schema='id int, name string')
df.write.format('paimon').mode("overwrite").saveAsTable("pyspark_test")

# 查表验证
spark.sql("select * from pyspark_test").show()

步骤二:上传测试文件

  1. 进入资源上传页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的文件管理

  2. 文件管理页面,单击上传文件

  3. 上传文件对话框中,单击待上传文件区域选择Python文件或者JAR包,或直接拖拽Python或文件者JAR包到待上传文件区域。

步骤三:创建批任务并运行

  1. EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击image(新建)图标。

  3. 在弹出的对话框中,输入名称,根据实际需求在批任务中选择PySpark或者JAR类型,然后单击确定

  4. 在右上角选择目标队列。

    添加队列的具体操作,请参见管理资源队列

  5. 在新建的开发页签中,配置以下信息,其余参数无需配置,然后单击运行

    JAR

    参数

    说明

    jar资源

    选择上传步骤一中编译的JAR包。

    Main Class

    提交Spark任务时所指定的主类。本文示例填写为org.example.DlfAccess

    PySpark

    参数

    说明

    Python资源

    选择工作空间资源,然后选择在资源上传页面上传的Python文件。

步骤四:查看结果

任务运行完成后,您可以选择以下方式查看结果。

方式一:通过日志探查直接查看

  1. 运行任务后,在下方的运行记录区域,单击任务操作列的日志探查

  2. 日志探查页签,您可以查看相关的日志信息。

    image

方式二:通过数据湖构建控制台查看

  1. 登录数据湖构建控制台

  2. 进入对应的Catalog和数据库,即可看到创建的测试表。

    image

方式三:通过SQL开发查看

数据开发中创建SQL开发,执行查询语句,以验证数据写入的正确性,详情请参见SparkSQL开发

image