全部产品

Databricks Delta商品库存示例并使用OPTIMIZE和Z-ORDER进行查询性能优化

前提条件

  1. 通过主账号登录阿里云 Databricks控制台

  2. 已创建集群,具体请参见创建集群

  3. 已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间

    警告

    首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。

    说明

    DDI支持免密访问OSS路径,结构为:oss://BucketName/Object

    • BucketName为您的存储空间名称;

    • Object为上传到OSS上的文件的访问路径。

    例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件

    // 从oss地址读取文本文档
    val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")

如何使用Databricks Delta更新和查询商品库存信息

当执行UPSERT操作时,一般情况下,您需要执行两个不同的任务 - UPdate操作和inSERT操作。为了优化性能,Databricks Delta能够执行“upsert”或MERGE操作,从而简化业务逻辑。

在这个notebook中,我们将会演示2个Use Case:

  1. DML: MERGE/DELETE/UPDATE

  2. 如何通过`OPTIMIZE`` and `ZORDER`对查询性能进行优化

步骤一:创建集群并通过knox账号访问Notebook

创建集群参考: https://help.aliyun.com/document_detail/167621.html,需注意要设置ram子账号及保存好knox密码,登录WebUI时候需要用到。

步骤二:创建Notebook、导入数据、进行数据分析

示例Note下载:CASE6-Delta商品库存案例.zpln

示例数据下载一:online_retail_mergetable.csv

示例数据下载二:online_retail.csv

%pyspark

# 将csv文件转化为parquet格式
# 注意文件读取和保存的路径请按照您的oss路径进行配置
spark.read.option("header", "true") \
    .csv("oss://databricks-demo/online_retail.csv") \
    .select("StockCode","Description","Quantity","UnitPrice","Country") \
    .write.format("parquet").mode("overwrite") \
    .save("oss://databricks-demo/parquet_online_retail/inventory")

# 从parquet文件导入DataFrame并查看
df = spark.read.parquet("oss://databricks-demo/parquet_online_retail/inventory")
df.show()
运行结果图片

Case 1: DML MERGE/DELETE/UPDATE

%spark.sql

-- 创建库 DB_Demo_Inventory_OSS
DROP DATABASE if EXISTS DB_Demo_Inventory_OSS CASCADE;

-- 建表
CREATE DATABASE IF NOT EXISTS DB_Demo_Inventory_OSS LOCATION 'oss://databricks-demo/parquet_online_retail/inventory_database';
USE DB_Demo_Inventory_OSS;

-- 从parquet数据建表current_inventory
CREATE TABLE IF NOT EXISTS current_inventory USING PARQUET LOCATION 'oss://databricks-demo/parquet_online_retail/inventory';

-- 查看是否建表成功,简单SELECT语句验证
SHOW TABLES;
SELECT * FROM current_inventory LIMIT 10;

运行结果图片运行结果图片

INSERT 或 UPDATE parquet文件的七个步骤

在使用Databricks Delta之前,我们先来看下,在通常情况下如何插入或更新表记录:

  1. 确定需要插入的新记录

  2. 确定将会被替换的记录 (例如,更新updated)

  3. 确定不会被INSERT或UPDATE操作影响的记录

  4. 基于前面3步的结果,创建一张临时表

  5. 将原表删除(包括所有相关的数据文件)

  6. 对临时表重命名

  7. 删除临时表

merge-into-legacy

查看当前Parquet表的数据

%spark.sql

-- 查看某个StockCode下的数据
SELECT * FROM current_inventory WHERE StockCode IN ('21877', '21876')
运行结果图片

步骤1:向Parquet表中插入记录

%pyspark

# 创建2条记录,准备插入到表中并转换为DataFrame
items = [('2187709', 'RICE COOKER', 30, 50.04, 'United Kingdom'), ('2187631', 'PORCELAIN BOWL - BLACK', 10, 8.33, 'United Kingdom')]
cols = ['StockCode', 'Description', 'Quantity', 'UnitPrice', 'Country']

insert_rows = spark.createDataFrame(items, cols)
insert_rows.show()
运行结果图片

步骤2:更新Parquet表中的记录

%pyspark

# 创建2条记录,准备更新到表中并转换为DataFrame
items = [('21877', 'HOME SWEET HOME MUG', 300, 26.04, 'United Kingdom'), ('21876', 'POTTERING MUG', 1000, 48.33, 'United Kingdom')]
cols = ['StockCode', 'Description', 'Quantity', 'UnitPrice', 'Country']

update_rows = spark.createDataFrame(items, cols)
update_rows.show()
运行结果图片

步骤3:找到Parquet表中没有被变更过的记录

%pyspark

# 展示不会被以上数据更新的数据
unchanged_rows = spark.sql("select * from current_inventory where StockCode !='21877' and StockCode != '21876'")
unchanged_rows.show()
运行结果图片

步骤4:创建临时表

%pyspark

temp_current_inventory = insert_rows.union(update_rows).union(spark.sql("select * from current_inventory").filter("StockCode !='21877' and StockCode != '21876'"))
temp_current_inventory.show()
运行结果图片

步骤5:删除原始表

%spark.sql 

USE DB_Demo_Inventory_OSS;
DROP TABLE current_inventory;

步骤6:使用临时表的数据,生产新原始表current_inventory

%pyspark 

temp_current_inventory.write.format("parquet").mode("overwrite").saveAsTable("current_inventory")
spark.sql("select * from current_inventory").show()
运行结果

步骤7:删除临时表

%sql
drop table if exists temp_current_inventory;

使用Databricks Delta更新表数据仅需2步:

  1. 确定需要插入或更新的记录

  2. 使用MERGE

创建一个delta表

%sql 
DROP TABLE IF EXISTS current_inventory_delta;
CREATE TABLE current_inventory_delta
USING delta
AS SELECT * FROM current_inventory;

SHOW TABLES;

SELECT * FROM current_inventory_delta;

运行结果图片运行结果图片

步骤1:将目标数据更新至delta表

%pyspark

spark.read.option("header","True").csv("oss://databricks-demo/online_retail_mergetable.csv").createOrReplaceTempView("merge_table")
%sql

SELECT * FROM merge_table
运行结果图片

步骤2:使用MERGE插入或更新delta表

%sql
MERGE INTO current_inventory_delta as d
USING merge_table as m
on d.StockCode = m.StockCode and d.Country = m.Country
WHEN MATCHED THEN 
  UPDATE SET *
WHEN NOT MATCHED 
  THEN INSERT *

MERGE语句执行成功!

我们可以看到所有数据均被更新,新增数据也插入成功

%sql
select * from current_inventory_delta where StockCode in ('2187709', '2187631', '21877', '21876') and Country = 'United Kingdom'
运行结果图片

DELETE

同样,我们可以轻松删除Delta表中的记录

创建redItem

%pyspark

redItem = Row({'StockCode':'33REDff', 'Description':'ADDITIONAL RED ITEM', 'Quantity': '8', 'UnitPrice': '3.53', 'Country':'United Kingdom'})
redItemDF = spark.createDataFrame(redItem)
redItemDF.printSchema()

分别创建PARQUET表和DELTA表

%pyspark

redItemDF.write.format("delta").mode("append").saveAsTable("current_inventory_delta")
redItemDF.write.format("parquet").mode("append").saveAsTable("current_inventory_pq")

spark.sql("""select * from current_inventory_delta where StockCode = '33REDff'""").show()
运行结果图片

删除PARQUET表中的记录将会报错 – 与DELTA的不同之处

%sql 
delete from current_inventory_pq where StockCode = '33REDff';
运行结果图片

删除DELTA表中的记录 – 成功

%sql 

delete from current_inventory_delta where StockCode = '33REDff';
select * from current_inventory_delta where StockCode = '33REDff';
运行结果图片

Case 2:OPTIMIZE 和 ZORDER

通过OPTIMIZE和ZORDERZORDER对数据文件进行优化,以提高查询性能

查询DELTA表

%sql

select * from current_inventory_delta where Country = 'United Kingdom' and StockCode like '21%' and UnitPrice > 10
运行结果图片
%sql
OPTIMIZE current_inventory_delta
ZORDER by Country, StockCode;

select * from current_inventory_delta

运行结果图片运行结果图片

执行相同的查询 – 查询时间更短

实际执行时间与集群ECS规格的选择有关,与标准PARQUET表相比,通常会有5-10X性能提升,最快会有50X性能提升

%sql
select * from current_inventory_delta where Country = 'United Kingdom' and StockCode like '21%' and UnitPrice > 10
运行结果图片
%sql

-- delta的历史回溯功能
describe history current_inventory_delta;
运行结果图片