Delta Lake 快速开始一

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

该Notebook展示了如何将JSON数据转换为Delta Lake格式,创建Delta表,在Delta表中Append数据,最后使用Delta Lake元数据命令来显示表的历史记录、格式和详细信息。

前提条件

  1. 通过主账号登录阿里云 Databricks控制台

  2. 已创建集群,具体请参见创建集群

  3. 已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间

    警告

    首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。

    说明

    DDI支持免密访问OSS路径,结构为:oss://BucketName/Object

    • BucketName为您的存储空间名称;

    • Object为上传到OSS上的文件的访问路径。

    例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件

    // 从oss地址读取文本文档
    val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")

详情可参考Databricks官网Blog文章

步骤一:创建集群并通过knox账号访问Notebook

创建集群参考:创建集群,需注意要设置RAM子账号及保存好knox密码,登录WebUI时候需要用到。

步骤二:创建Notebook、导入数据、进行数据分析

  1. 定义Notebook中使用的路径path

    %pyspark
    
    # 注意需要将数据文件events.json上传至您的OSS对应bucket下,events.json数据来源Databricks站点的open/close数据
    inputPath = "oss://databricks-huhehaote/delta-demo/events.json"
    deltaPath = "/delta/events"
    database = "Delta_QuickStart_Database"

  2. 导入数据到Dataframe中 & 打印数据data

    %pyspark
    
    from pyspark.sql.functions import expr
    from pyspark.sql.functions import from_unixtime
    
    # spark.read读取json数据,并将表头time转换为date格式
    events = spark.read \
      .option("inferSchema", "true") \
      .json(inputPath) \
      .withColumn("date", expr("time")) \
      .drop("time") \
      .withColumn("date", from_unixtime("date", 'yyyy-MM-dd'))
    events.show()
    运行结果
  3. 将数据使用Delta格式写入

    %pyspark
    events.write.format("delta").mode("overwrite").partitionBy("date").save(deltaPath)

  4. 再次读取数据查看是否成功保存

    %pyspark
    events_delta = spark.read.format("delta").load(deltaPath)
    events_delta.printSchema()
  5. 重置数据库

    %pyspark
    
    spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database)) # 注意{}是在pyspark里spark.sql()中使用的变量,参数在.format中指定 (参考:https://stackoverflow.com/questions/44582450/how-to-pass-variables-in-spark-sql-using-python)
    spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(database))

  6. 使用Delta创建表

    %pyspark
    
    spark.sql("USE {}".format(database))
    spark.sql("CREATE TABLE events USING DELTA LOCATION \"{}\"".format(deltaPath))

  7. 查看表中的数据

    %sql
    select * from events limit 10;
    查看表数据
  8. 对数据执行一个简单的count

    %pyspark
    
    events_delta.count()
  9. 查看events详情

    %sql
    DESCRIBE DETAIL events;

  10. 查看表历史,这个功能只有Delta表中可用

    %sql
    
    DESCRIBE HISTORY events
    查看表历史

  11. 做一个聚合(aggregation)操作并将结果保存在一个临时表中

    %pyspark
    
    from pyspark.sql.functions import count
    events_delta.groupBy("action","date").agg(count("action").alias("action_count")).orderBy("date", "action").createOrReplaceTempView("tempDisplay")

  12. 可视化该临时表

    %sql
    
    select * from tempDisplay
    可视化表

  13. 我们可以很方便的使用append模式追加新数据

    %pyspark
    
    # 该操作主要将数据中时间前移2天(172800秒)
    historical_events = spark.read \
      .option("inferSchema", "true") \
      .json(inputPath) \
      .withColumn("date", expr("time-172800")) \
      .drop("time") \
      .withColumn("date", from_unixtime("date", 'yyyy-MM-dd'))
      
    historical_events.write.format("delta").mode("append").partitionBy("date").save(deltaPath)
    
    historical_events.show()
    追加数据
  14. 聚合(aggregate)并查看新数据

    %pyspark
    
    events_delta.groupBy("action","date").agg(count("action").alias("action_count")).orderBy("date", "action").createOrReplaceTempView("tempDisplay2")

  15. 可视化新表

    %sql
    
    select * from tempDisplay2
    可视化新表
  16. 执行count操作查看新append数据行

    %pyspark 
    
    events_delta.count()

  17. Describe表来展示表详情,并能够看到表中有6个文件

    %sql
    DESCRIBE DETAIL events
    详情展示
  18. Databricks支持优化(OPTIMIZE)合并文件以提升性能

    %pyspark
    
    spark.sql("OPTIMIZE events")

  19. 可以看到优化(OPTIMIZE)命令也在事务日志中增加了日志(z-order)

    %sql
    
    DESCRIBE HISTORY events
    优化

  20. 优化后,文件被自动合并做性能优化,表中只有5个文件

    %sql
    
    DESCRIBE DETAIL events
    优化后
  21. Describe formatted 命令也在DDI中支持

    %sql
    
    DESCRIBE FORMATTED events
    Decribe formatted

附录

清理数据库

%sql
/*
DROP DATABASE IF EXISTS Delta_QuickStart_Database1 CASCADE;
*/