本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
该Notebook展示了如何将JSON数据转换为Delta Lake格式,创建Delta表,在Delta表中Append数据,最后使用Delta Lake元数据命令来显示表的历史记录、格式和详细信息。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
警告首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
说明DDI支持免密访问OSS路径,结构为:oss://BucketName/Object
BucketName为您的存储空间名称;
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档 val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")
详情可参考Databricks官网Blog文章
步骤一:创建集群并通过knox账号访问Notebook
创建集群参考:创建集群,需注意要设置RAM子账号及保存好knox密码,登录WebUI时候需要用到。
步骤二:创建Notebook、导入数据、进行数据分析
定义Notebook中使用的路径path
%pyspark # 注意需要将数据文件events.json上传至您的OSS对应bucket下,events.json数据来源Databricks站点的open/close数据 inputPath = "oss://databricks-huhehaote/delta-demo/events.json" deltaPath = "/delta/events" database = "Delta_QuickStart_Database"
导入数据到Dataframe中 & 打印数据data
%pyspark from pyspark.sql.functions import expr from pyspark.sql.functions import from_unixtime # spark.read读取json数据,并将表头time转换为date格式 events = spark.read \ .option("inferSchema", "true") \ .json(inputPath) \ .withColumn("date", expr("time")) \ .drop("time") \ .withColumn("date", from_unixtime("date", 'yyyy-MM-dd')) events.show()
将数据使用Delta格式写入
%pyspark events.write.format("delta").mode("overwrite").partitionBy("date").save(deltaPath)
再次读取数据查看是否成功保存
%pyspark events_delta = spark.read.format("delta").load(deltaPath) events_delta.printSchema()
重置数据库
%pyspark spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database)) # 注意{}是在pyspark里spark.sql()中使用的变量,参数在.format中指定 (参考:https://stackoverflow.com/questions/44582450/how-to-pass-variables-in-spark-sql-using-python) spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(database))
使用Delta创建表
%pyspark spark.sql("USE {}".format(database)) spark.sql("CREATE TABLE events USING DELTA LOCATION \"{}\"".format(deltaPath))
查看表中的数据
%sql select * from events limit 10;
对数据执行一个简单的count
%pyspark events_delta.count()
查看events详情
%sql DESCRIBE DETAIL events;
查看表历史,这个功能只有Delta表中可用
%sql DESCRIBE HISTORY events
做一个聚合(aggregation)操作并将结果保存在一个临时表中
%pyspark from pyspark.sql.functions import count events_delta.groupBy("action","date").agg(count("action").alias("action_count")).orderBy("date", "action").createOrReplaceTempView("tempDisplay")
可视化该临时表
%sql select * from tempDisplay
我们可以很方便的使用append模式追加新数据
%pyspark # 该操作主要将数据中时间前移2天(172800秒) historical_events = spark.read \ .option("inferSchema", "true") \ .json(inputPath) \ .withColumn("date", expr("time-172800")) \ .drop("time") \ .withColumn("date", from_unixtime("date", 'yyyy-MM-dd')) historical_events.write.format("delta").mode("append").partitionBy("date").save(deltaPath) historical_events.show()
聚合(aggregate)并查看新数据
%pyspark events_delta.groupBy("action","date").agg(count("action").alias("action_count")).orderBy("date", "action").createOrReplaceTempView("tempDisplay2")
可视化新表
%sql select * from tempDisplay2
执行count操作查看新append数据行
%pyspark events_delta.count()
Describe表来展示表详情,并能够看到表中有6个文件
%sql DESCRIBE DETAIL events
Databricks支持优化(OPTIMIZE)合并文件以提升性能
%pyspark spark.sql("OPTIMIZE events")
可以看到优化(OPTIMIZE)命令也在事务日志中增加了日志(z-order)
%sql DESCRIBE HISTORY events
优化后,文件被自动合并做性能优化,表中只有5个文件
%sql DESCRIBE DETAIL events
Describe formatted 命令也在DDI中支持
%sql DESCRIBE FORMATTED events
附录
清理数据库
%sql
/*
DROP DATABASE IF EXISTS Delta_QuickStart_Database1 CASCADE;
*/