本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
警告首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
说明DDI支持免密访问OSS路径,结构为:oss://BucketName/Object
BucketName为您的存储空间名称;
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档 val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")
如何使用Databricks Delta更新和查询商品库存信息
当执行UPSERT操作时,一般情况下,您需要执行两个不同的任务 - UPdate操作和inSERT操作。为了优化性能,Databricks Delta能够执行“upsert”或MERGE操作,从而简化业务逻辑。
在这个notebook中,我们将会演示2个Use Case:
DML: MERGE/DELETE/UPDATE。
如何通过`OPTIMIZE`` and `ZORDER`对查询性能进行优化。
步骤一:创建集群并通过knox账号访问Notebook
创建集群参考:创建集群,需注意要设置RAM子账号及保存好knox密码,登录WebUI时候需要用到。
步骤二:创建Notebook、导入数据、进行数据分析
%pyspark
# 将csv文件转化为parquet格式
# 注意文件读取和保存的路径请按照您的oss路径进行配置
spark.read.option("header", "true") \
.csv("oss://databricks-demo/online_retail.csv") \
.select("StockCode","Description","Quantity","UnitPrice","Country") \
.write.format("parquet").mode("overwrite") \
.save("oss://databricks-demo/parquet_online_retail/inventory")
# 从parquet文件导入DataFrame并查看
df = spark.read.parquet("oss://databricks-demo/parquet_online_retail/inventory")
df.show()
Case 1: DML MERGE/DELETE/UPDATE
%spark.sql
-- 创建库 DB_Demo_Inventory_OSS
DROP DATABASE if EXISTS DB_Demo_Inventory_OSS CASCADE;
-- 建表
CREATE DATABASE IF NOT EXISTS DB_Demo_Inventory_OSS LOCATION 'oss://databricks-demo/parquet_online_retail/inventory_database';
USE DB_Demo_Inventory_OSS;
-- 从parquet数据建表current_inventory
CREATE TABLE IF NOT EXISTS current_inventory USING PARQUET LOCATION 'oss://databricks-demo/parquet_online_retail/inventory';
-- 查看是否建表成功,简单SELECT语句验证
SHOW TABLES;
SELECT * FROM current_inventory LIMIT 10;
INSERT 或 UPDATE parquet文件的七个步骤
在使用Databricks Delta之前,我们先来看下,在通常情况下如何插入或更新表记录。
确定需要插入的新记录。
确定将会被替换的记录 (例如,更新updated)。
确定不会被INSERT或UPDATE操作影响的记录。
基于前面3步的结果,创建一张临时表。
将原表删除(包括所有相关的数据文件)。
对临时表重命名。
删除临时表。
查看当前Parquet表的数据
%spark.sql
-- 查看某个StockCode下的数据
SELECT * FROM current_inventory WHERE StockCode IN ('21877', '21876')
步骤1:向Parquet表中插入记录
%pyspark
# 创建2条记录,准备插入到表中并转换为DataFrame
items = [('2187709', 'RICE COOKER', 30, 50.04, 'United Kingdom'), ('2187631', 'PORCELAIN BOWL - BLACK', 10, 8.33, 'United Kingdom')]
cols = ['StockCode', 'Description', 'Quantity', 'UnitPrice', 'Country']
insert_rows = spark.createDataFrame(items, cols)
insert_rows.show()
步骤2:更新Parquet表中的记录
%pyspark
# 创建2条记录,准备更新到表中并转换为DataFrame
items = [('21877', 'HOME SWEET HOME MUG', 300, 26.04, 'United Kingdom'), ('21876', 'POTTERING MUG', 1000, 48.33, 'United Kingdom')]
cols = ['StockCode', 'Description', 'Quantity', 'UnitPrice', 'Country']
update_rows = spark.createDataFrame(items, cols)
update_rows.show()
步骤3:找到Parquet表中没有被变更过的记录
%pyspark
# 展示不会被以上数据更新的数据
unchanged_rows = spark.sql("select * from current_inventory where StockCode !='21877' and StockCode != '21876'")
unchanged_rows.show()
步骤4:创建临时表
%pyspark
temp_current_inventory = insert_rows.union(update_rows).union(spark.sql("select * from current_inventory").filter("StockCode !='21877' and StockCode != '21876'"))
temp_current_inventory.show()
步骤5:删除原始表
%spark.sql
USE DB_Demo_Inventory_OSS;
DROP TABLE current_inventory;
步骤6:使用临时表的数据,生产新原始表current_inventory
%pyspark
temp_current_inventory.write.format("parquet").mode("overwrite").saveAsTable("current_inventory")
spark.sql("select * from current_inventory").show()
步骤7:删除临时表
%sql
drop table if exists temp_current_inventory;
使用Databricks Delta更新表数据仅需2步:
确定需要插入或更新的记录。
使用MERGE。
创建一个delta表
%sql
DROP TABLE IF EXISTS current_inventory_delta;
CREATE TABLE current_inventory_delta
USING delta
AS SELECT * FROM current_inventory;
SHOW TABLES;
SELECT * FROM current_inventory_delta;
步骤1:将目标数据更新至delta表
%pyspark
spark.read.option("header","True").csv("oss://databricks-demo/online_retail_mergetable.csv").createOrReplaceTempView("merge_table")
%sql
SELECT * FROM merge_table
步骤2:使用MERGE插入或更新delta表
%sql
MERGE INTO current_inventory_delta as d
USING merge_table as m
on d.StockCode = m.StockCode and d.Country = m.Country
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
MERGE语句执行成功!
我们可以看到所有数据均被更新,新增数据也插入成功。
%sql
select * from current_inventory_delta where StockCode in ('2187709', '2187631', '21877', '21876') and Country = 'United Kingdom'
DELETE
同样,我们可以轻松删除Delta表中的记录。
创建redItem
%pyspark
redItem = Row({'StockCode':'33REDff', 'Description':'ADDITIONAL RED ITEM', 'Quantity': '8', 'UnitPrice': '3.53', 'Country':'United Kingdom'})
redItemDF = spark.createDataFrame(redItem)
redItemDF.printSchema()
分别创建PARQUET表和DELTA表
%pyspark
redItemDF.write.format("delta").mode("append").saveAsTable("current_inventory_delta")
redItemDF.write.format("parquet").mode("append").saveAsTable("current_inventory_pq")
spark.sql("""select * from current_inventory_delta where StockCode = '33REDff'""").show()
删除PARQUET表中的记录将会报错 – 与DELTA的不同之处
%sql
delete from current_inventory_pq where StockCode = '33REDff';
删除DELTA表中的记录 – 成功
%sql
delete from current_inventory_delta where StockCode = '33REDff';
select * from current_inventory_delta where StockCode = '33REDff';
Case 2:OPTIMIZE 和 ZORDER
通过OPTIMIZE和ZORDERZORDER
对数据文件进行优化,以提高查询性能。
查询DELTA表
%sql
select * from current_inventory_delta where Country = 'United Kingdom' and StockCode like '21%' and UnitPrice > 10
%sql
OPTIMIZE current_inventory_delta
ZORDER by Country, StockCode;
select * from current_inventory_delta
执行相同的查询 – 查询时间更短
实际执行时间与集群ECS规格的选择有关,与标准PARQUET表相比,通常会有5-10X
性能提升,最快会有50X性能提升。
%sql
select * from current_inventory_delta where Country = 'United Kingdom' and StockCode like '21%' and UnitPrice > 10
%sql
-- delta的历史回溯功能
describe history current_inventory_delta;