使用Databricks Delta优化Spark作业性能

本文介绍如何使用Databricks Delta进行Spark作业的优化。

前提条件

已创建集群,详情请参见创建集群

集群应满足以下配置:

区域

详情

地域(Region)

华北2(北京)

集群规模

1个Master节点,5个Worker节点

ECS实例配置

配置如下:

  • CPU:32核

  • 内存:128GiB

  • ECS规格:ecs.g6.8xlarge

  • 数据盘配置:ESSD云盘300GB X 4块

  • 系统盘配置:ESSD云盘120GB X 1块

说明

ECS实例会因库存等原因和实际售卖页有出入。此处参数仅供参考,具体请您根据实际情况选择相应的实例规格进行测试。

OSS宽带

10Gbps

背景信息

Databricks数据洞察内置了Databricks商业版引擎,您可以利用Databricks数据洞察创建集群,实现在秒级响应时间内处理PB级别的数据。本文示例制造100亿条数据,利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,对Spark作业进行改造,达到优化性能的目的。Databricks Delta详情请参见Processing Petabytes of Data in Seconds with Databricks Delta

配置Spark

  1. 使用阿里云账号登录Databricks数据洞察控制台

  2. 在Databricks数据洞察控制台页面,选择所在的地域(Region)。

    创建的集群将会在对应的地域内,一旦创建后不能修改。

  3. 在左侧导航栏中,单击集群

  4. 单击待配置集群所在行的详情

  5. 集群详情页面,单击上方的Spark配置

  6. 配置以下参数。

    1. 修改以下配置。

      参数

      描述

      spark.driver.cores

      4

      spark.driver.memory

      8G

      spark.executor.memory

      23G

    2. 新增以下配置。

      1. 在配置区域,单击spark-defaults页签。

      2. 单击右侧的自定义配置。

      参数

      描述

      spark.executor.cores

      3

      spark.executor.instances

      22

      spark.yarn.executor.memoryOverhead

      default

示例

  1. 准备数据。

    • 准备测试数据和query脚本。

      在集群中生成数据预计需要5小时,生成测试数据详情请参见Processing Petabytes of Data in Seconds with Databricks Delta

    • 准备五张表:

      • conn_random:delta格式表

      • conn_random_parquet:parquet格式表

      • conn_optimize:经过OPTIMIZE的表,主要是Compaction

      • conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)

      • conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)

  2. 使用OPTIMIZE命令进行优化。

    详细代码如下:

    import spark.implicits._
    
    val seed = 0
    val numRecords = 10*1000*1000*1000L
    val numFiles = 1000*1000
    
    val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/"
    
    val dbName = s"mdc_random_$numFiles"
    val connRandom = "conn_random"
    val connRandomParquet = "conn_random_parquet"
    // val connSorted = "conn_sorted"
    val connOptimize = "conn_optimize"
    val connZorderOnlyIp = "conn_zorder_only_ip"
    val connZorder = "conn_zorder"
    
    spark.conf.set("spark.sql.shuffle.partitions", numFiles)
    spark.conf.get("spark.sql.shuffle.partitions")
    
    sql(s"drop database if exists $dbName cascade")
    sql(s"create database if not exists $dbName")
    sql(s"use $dbName")
    sql(s"show tables").show(false)
    
    import scala.util.Random
    
    case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
    
    // 生成数据
    def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
    def randomPort(r: Random) = r.nextInt(65536)
    
    def randomConnRecord(r: Random) = ConnRecord(
      src_ip = randomIPv4(r), src_port = randomPort(r),
      dst_ip = randomIPv4(r), dst_port = randomPort(r))
    
    val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
      val partitionID = it.toStream.head
      val r = new Random(seed = partitionID)
      Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
    }
    
    // 生成数据表
    df.write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connRandom)
    .saveAsTable(connRandom)
    
    df.write
    .mode("overwrite")
    .format("parquet")
    .option("path", baseLocation + connRandomParquet)
    .saveAsTable(connRandomParquet)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connOptimize)
    .saveAsTable(connOptimize)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connZorderOnlyIp)
    .saveAsTable(connZorderOnlyIp)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connZorder)
    .saveAsTable(connZorder)
    
    spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false")
    
    // OPTIMIZE优化命令
    sql(s"OPTIMIZE '${baseLocation + connOptimize}'")
    sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)")
    sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")

  3. 验证Spark SQL。

    select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';

测试结论

本示例各表情况如下。

表名称

时间(s)

conn_random_parquet

2504

conn_random

2324

conn_optimize

112

conn_zorder

65

conn_zorder_only_ip

46

说明

通过以上示例,可以发现:

  • 经过OPTIMIZE的表,文件大小会在1G左右,而且进行了delta元数据的优化,提高了data-skipping的效率,在性能上提升约20倍(2504/112=22X)。

  • Zorder使得data-skipping的优化效果进一步深化,性能提升约40倍(2504/65=38X)。

  • 当Zorder列是查询列时,优化效果会更加明显,实验显示性能提升约50倍(2504/46=54X)。

问题反馈

您在使用阿里云Databricks数据洞察过程中有任何疑问,欢迎用钉钉扫描下面的二维码加入钉钉群进行反馈。

产品钉钉群