您可以通过调整内存、CPU和Task个数等,实现对Hive作业的调优。本文为您介绍如何调优Hive作业。
作业调优方案
作业调优方向 | 调优方案 |
参数调优 | |
代码优化 |
代码优化
数据清洗
读取表时分区过滤,避免全表扫描。
数据过滤之后再JOIN。
重复使用数据时,避免重复计算,构建中间表,重复使用中间表。
多Distinct优化
优化前代码
多个Distinct时,数据会出现膨胀。
select k,count(distinct case when a > 1 then user_id end) user1, count(distinct case when a > 2 then user_id end) user2, count(distinct case when a > 3 then user_id end) user3, count(distinct case when a > 4 then user_id end) user4 from t group by k;
优化后代码
通过两次Group By的方式代替Distinct操作,通过内层的Group By去重并降低数据量,通过外层的Group By取sum,即可实现Distinct的效果。
select k,sum(case when user1 > 0 then 1 else 0 end) as user1, sum(case when user2 > 0 then 1 else 0 end) as user2, sum(case when user3 > 0 then 1 else 0 end) as user3, sum(case when user4 > 0 then 1 else 0 end) as user4 from (select k,user_id,count(case when a > 1 then user_id end) user1, count(case when a > 2 then user_id end) user2, count(case when a > 3 then user_id end) user3, count(case when a > 4 then user_id end) user4 from t group by k,user_id ) tmp group by k;
数据倾斜
热点Key处理方式如下:
如果是Group By出现热点,请按照以下方法操作:
先开启Map端聚合。
set hive.map.aggr=true; set hive.groupby.mapaggr.checkinterval=100000;(用于设定Map端进行聚合操作的条目数)
可以对Key随机化打散,多次聚合,或者直接设置。
set hive.groupby.skewindata=true;
当选项设定为true时,生成的查询计划有两个MapReduce任务。在第一个MapReduce中,Map的输出结果集合会随机分布到Reduce中, 每个部分进行聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的Reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key分布到同一个Reduce中),最后完成最终的聚合操作。
如果两个大表进行JOIN操作时,出现热点,则使用热点Key随机化。
例如,log表存在大量user_id为null的记录,但是表bmw_users中不会存在user_id为空,则可以把null随机化再关联,这样就避免null值都分发到一个Reduce Task上。代码示例如下。
SELECT * FROM log a LEFT OUTER JOIN bmw_users b ON CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;
如果大表和小表进行JOIN操作时,出现热点,则使用MAP JOIN。
内存参数
您可以通过设置以下参数,对Map和Reduce阶段的内存进行调优:
Map阶段
参数
描述
示例
mapreduce.map.java.opts
默认参数,表示JVM堆内存。
-Xmx2048m
mapreduce.map.memory.mb
默认参数,表示整个JVM进程占用的内存,计算方法为
堆内存+堆外内存=2048+256
。2304
Reduce阶段
参数
描述
示例
mapreduce.reduce.java.opts
默认参数,表示JVM堆内存。
-Xmx2048m
mapreduce.reduce.memory.mb
默认参数,表示整个JVM进程占用的内存,计算方法为
堆内存+堆外内存=2048+256
。2304
CPU参数
您可以通过设置以下参数,对Map和Reduce任务的CPU进行调优。
参数 | 描述 |
mapreduce.map.cpu.vcores | 每个Map任务可用的最多的CPU Core数目。 |
mapreduce.reduce.cpu.vcores | 每个Reduce任务可用的最多的CPU Core数目。 说明 此设置在公平队列是不生效的,通常vCores用于较大的集群,以限制不同用户或应用程序的CPU。 |
Task数量优化
Map Task数量优化
在分布式计算系统中,决定Map数量的一个因素就是原始数据,在不加干预的情况下,原始数据有多少个块,就可能有多少个起始的Task,因为每个Task对应读取一个块的数据;当然这个也不是绝对的,当文件数量特别多,并且每个文件的大小特别小时,您就可以限制减少初始Map对相应的Task的数量,以减少计算资源的浪费,如果文件数量较少,但是单个文件较大,您可以增加Map的Task的数量,以减小单个Task的压力。
通常,Map Task数量是由mapred.map.tasks、mapred.min.split.size和dfs.block.size决定的。
Hive的文件基本上都是存储在HDFS上,而HDFS上的文件,都是分块的,所以具体的Hive数据文件在HDFS上分多少块,可能对应的是默认Hive起始的Task的数量,使用default_mapper_num参数表示。使用数据总大小除以dfs默认的最大块大小来决定初始默认数据分区数。
初始默认的Map Task数量,具体公式如下。
default_mapper_num = total_size/dfs.block.size
计算Split的size,具体公式如下。
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
上述公式中的mapred.min.split.size和mapred.max.split.size,分别为Hive计算的时Split的最小值和最大值。
将数据按照计算出来的size划分为数据块,具体公式如下。
split_num = total_size/default_split_size;
计算的Map Task数量,具体公式如下。
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
从上面的过程来看,Task的数量受各个方面的限制,不至于Task的数量太多,也不至于Task的数量太少。如果需要提高Task数量,就要降低mapred.min.split.size的数值,在一定的范围内可以减小default_split_size的数值,从而增加split_num的数量,也可以增大mapred.map.tasks的数量。
重要Hive on TEZ和Hive on MR使用是有差异的。例如,在Hive中执行一个Query时,可以发现Hive的执行引擎在使用Tez与MR时,两者生成的mapper数量差异较大。主要原因在于Tez中对inputSplit做了grouping操作,可以将多个inputSplit组合成更少的groups,然后为每个group生成一个mapper任务,而不是为每个inputSplit生成一个mapper任务。
Reduce Task数量优化
通过hive.exec.reducers.bytes.per.reducer参数控制单个Reduce处理的字节数。
Reduce的计算方法如下。
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。
通过mapred.reduce.tasks参数来设置Reduce Task的数量。
说明在TEZ引擎模式下,通过命令
set hive.tez.auto.reducer.parallelism = true;
,TEZ将会根据vertice的输出大小动态预估调整Reduce Task的数量。同Map一样,启动和初始化Reduce也会消耗时间和资源。另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,并且这些小文件作为下一个任务的输入,则会出现小文件过多的问题。
并行运行
并行运行表示同步执行Hive的多个阶段。Hive在执行过程中,将一个查询转化成一个或者多个阶段。某个特定的Job可能包含多个阶段,而这些阶段可能并非完全相互依赖的,也就是可以并行运行的,这样可以使得整个Job的运行时间缩短。
您可以通过设置以下参数,控制不同的作业是否可以同时运行。
参数 | 描述 |
hive.exec.parallel | 默认值为false。设置true时,表示允许任务并行运行。 |
hive.exec.parallel.thread.number | 默认值为8。表示允许同时运行线程的最大值。 |
Fetch task
您可以通过设置以下参数,在执行查询等语句时,不执行MapReduce程序,以减少等待时间。
参数 | 描述 |
hive.fetch.task.conversion | 默认值为none。参数取值如下:
|
开启向量化
您可以通过设置以下参数,在执行查询时启用向量化执行,以提升查询性能。
参数 | 描述 |
hive.vectorized.execution.enabled | 默认值为true。开启向量化查询的开关。 |
hive.vectorized.execution.reduce.enabled | 默认值为true。表示是否启用Reduce任务的向量化执行模式。 |
合并小文件
大量小文件容易在文件存储端造成瓶颈,影响处理效率。对此,您可以通过合并Map和Reduce的结果文件来处理。
您可以通过设置以下参数,合并小文件。
参数 | 描述 |
hive.merge.mapfiles | 默认值为true。表示是否合并Map输出文件。 |
hive.merge.mapredfiles | 默认值为false。表示是否合并Reduce输出文件。 |
hive.merge.size.per.task | 默认值为256000000,单位字节。表示合并文件的大小。 |