Delta Lake是一个开源存储框架,旨在数据湖之上构建LakeHouse架构。Delta Lake提供了ACID事务支持、可扩展的元数据处理功能,并能够在现有的数据湖(如OSS、Amazon S3和HDFS)上整合流处理与批处理。此外,Delta Lake还支持多种引擎,如Spark、PrestoDB和Flink,以及多种编程语言的API,包括Scala、Java、Rust和Python,以便于访问。
前提条件
已创建工作空间,详情请参见创建工作空间。
操作流程
步骤一:创建SQL会话
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话。
元数据是当前工作空间的默认Catalog。如果您希望将默认Catalog修改为外部的Hive Metastore,可以参见连接外部Hive Metastore Service。
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
步骤二:读写Delta Lake表
进入SQL开发页面。
在EMR Serverless Spark页面,单击左侧导航栏中的开发。
在开发目录页签下,单击
图标。
在新建对话框中,输入名称(例如users_task),类型使用默认的SparkSQL,然后单击确定。
拷贝如下代码到新增的SparkSQL页签(users_task)中。
CREATE DATABASE IF NOT EXISTS ss_delta_db; CREATE TABLE ss_delta_db.delta_tbl (id INT, name STRING) USING delta; INSERT INTO ss_delta_db.delta_tbl VALUES (1, "a"), (2, "b"); SELECT id, name FROM ss_delta_db.delta_tbl ORDER BY id;
在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话。
单击运行,执行任务。返回信息如下所示。
步骤三:更新操作
Delta Lake支持多种数据更新操作,包括Update、Delete和Merge Into。以下是具体示例。
-- Update操作
UPDATE ss_delta_db.delta_tbl SET name = "a_v2" WHERE id = 1;
-- Delete操作
DELETE FROM ss_delta_db.delta_tbl WHERE id = 2;
-- Merge Into操作
-- 创建临时表并插入数据
CREATE TABLE ss_delta_db.tmp_tbl(id INT, name STRING) USING delta;
INSERT INTO ss_delta_db.tmp_tbl VALUES (1, "a_v3"), (3, "c");
-- 执行Merge Into操作
MERGE INTO ss_delta_db.delta_tbl AS target
USING ss_delta_db.tmp_tbl AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
--验证结果
SELECT * FROM ss_delta_db.delta_tbl ORDER BY id;
返回信息如下所示。
步骤四:清理操作
测试完成后,建议清理测试资源以避免占用存储空间。执行以下命令如下所示。
DROP TABLE ss_delta_db.delta_tbl;
DROP TABLE ss_delta_db.tmp_tbl;
DROP DATABASE ss_delta_db;
上述操作将永久删除表和数据库,请确保数据已备份或不再需要。
相关文档
SQL任务和任务编排完整的开发流程示例,请参见SparkSQL开发快速入门。
更多Delta Lake相关用法和配置,参见Delta Lake官方文档。