本文介绍如何使用Databricks Delta进行Spark作业的优化。
前提条件
已创建集群,详情请参见创建集群。
集群应满足以下配置:
区域 | 详情 |
---|---|
地域(Region) | 华北2(北京) |
集群规模 | 1个Master节点,5个Worker节点 |
ECS实例配置 | 配置如下:
说明 ECS实例会因库存等原因和实际售卖页有出入。此处参数仅供参考,具体请您根据实际情况选择相应的实例规格进行测试。 |
OSS宽带 | 10Gbps |
背景信息
Databricks数据洞察内置了Databricks商业版引擎,您可以利用Databricks数据洞察创建集群,实现在秒级响应时间内处理PB级别的数据。本文示例制造100亿条数据,利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,对Spark作业进行改造,达到优化性能的目的。Databricks Delta详情请参见Processing Petabytes of Data in Seconds with Databricks Delta。
配置Spark
使用阿里云账号登录Databricks数据洞察控制台。
在Databricks数据洞察控制台页面,选择所在的地域(Region)。
创建的集群将会在对应的地域内,一旦创建后不能修改。
在左侧导航栏中,单击集群。
单击待配置集群所在行的详情。
在集群详情页面,单击上方的Spark配置。
配置以下参数。
修改以下配置。
参数
描述
spark.driver.cores
4
spark.driver.memory
8G
spark.executor.memory
23G
新增以下配置。
在配置区域,单击spark-defaults页签。
单击右侧的自定义配置。
参数
描述
spark.executor.cores
3
spark.executor.instances
22
spark.yarn.executor.memoryOverhead
default
示例
准备数据。
准备测试数据和query脚本。
在集群中生成数据预计需要5小时,生成测试数据详情请参见Processing Petabytes of Data in Seconds with Databricks Delta。
准备五张表:
conn_random:delta格式表
conn_random_parquet:parquet格式表
conn_optimize:经过OPTIMIZE的表,主要是Compaction
conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)
conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)
使用OPTIMIZE命令进行优化。
详细代码如下:
import spark.implicits._ val seed = 0 val numRecords = 10*1000*1000*1000L val numFiles = 1000*1000 val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/" val dbName = s"mdc_random_$numFiles" val connRandom = "conn_random" val connRandomParquet = "conn_random_parquet" // val connSorted = "conn_sorted" val connOptimize = "conn_optimize" val connZorderOnlyIp = "conn_zorder_only_ip" val connZorder = "conn_zorder" spark.conf.set("spark.sql.shuffle.partitions", numFiles) spark.conf.get("spark.sql.shuffle.partitions") sql(s"drop database if exists $dbName cascade") sql(s"create database if not exists $dbName") sql(s"use $dbName") sql(s"show tables").show(false) import scala.util.Random case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int) // 生成数据 def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") def randomPort(r: Random) = r.nextInt(65536) def randomConnRecord(r: Random) = ConnRecord( src_ip = randomIPv4(r), src_port = randomPort(r), dst_ip = randomIPv4(r), dst_port = randomPort(r)) val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it => val partitionID = it.toStream.head val r = new Random(seed = partitionID) Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r)) } // 生成数据表 df.write .mode("overwrite") .format("delta") .option("path", baseLocation + connRandom) .saveAsTable(connRandom) df.write .mode("overwrite") .format("parquet") .option("path", baseLocation + connRandomParquet) .saveAsTable(connRandomParquet) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connOptimize) .saveAsTable(connOptimize) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorderOnlyIp) .saveAsTable(connZorderOnlyIp) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorder) .saveAsTable(connZorder) spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false") // OPTIMIZE优化命令 sql(s"OPTIMIZE '${baseLocation + connOptimize}'") sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)") sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")
验证Spark SQL。
select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';
测试结论
本示例各表情况如下。
表名称 | 时间(s) |
---|---|
conn_random_parquet | 2504 |
conn_random | 2324 |
conn_optimize | 112 |
conn_zorder | 65 |
conn_zorder_only_ip | 46 |
通过以上示例,可以发现:
经过OPTIMIZE的表,文件大小会在1G左右,而且进行了delta元数据的优化,提高了data-skipping的效率,在性能上提升约20倍(2504/112=22X)。
Zorder使得data-skipping的优化效果进一步深化,性能提升约40倍(2504/65=38X)。
当Zorder列是查询列时,优化效果会更加明显,实验显示性能提升约50倍(2504/46=54X)。
问题反馈
您在使用阿里云Databricks数据洞察过程中有任何疑问,欢迎用钉钉扫描下面的二维码加入钉钉群进行反馈。