全部产品

    常见问题与数据倾斜处理

    更新时间:2020-09-14 14:33:40

    常见问题与数据倾斜处理

    常见问题与数据倾斜处理

    案例一:如何判断资源不足导致的作业停止

    现象:作业出现进度停滞,迟迟没有新的进度进展。

    现象

    原因分析:一般判断为资源不足。此时,可以通过Logview判断作业资源情况 ,即Task的Instance状态 。

    • Ready:表示等待任务调度系统资源分配,一旦得到资源就能启动起来。
    • Wait: 表示等待依赖的Task完成 。

    例如,在下图中的状态表明,目前没有资源来运行这些随时待命的task实例。一旦实例获得资源,就会转变为Running状态开始运行。

    解决方案:

    • 如果是作业的运行高峰导致的资源不足,可以让任务错峰运行,避免高峰 。
    • 如果是计算配额不足,确认该Project所用的Quota组,资源是否足够。
    • 如果是整个集群的计算资源长时间饱和,需要制定计算配额的分配策略,进行扩容。
    • 避免长时间占用资源不释放,导致作业堆积的异常的/不合理的大作业。
    • 启动SQL加速,对小作业运行有很大帮助,可避免走任务调度系统资源申请过程。
    • 抢占方式 or 先进先出。

    案例二:作业处理时间超长

    现象:MaxCompute作业进度长时间处于99%,运行了很久才完成。

    原因分析:MaxCompute作业中有几个fuxi Instance运行时间比其他fuxi Instance明显长 。

    原因分析

    进一步分析:通过logview分析作业job summary,查看慢的task的input records、output records的max和avg差值,如果max和avg相差几个数量级,可初步判断为作业数据倾斜。

    进一步分析

    解决方案:如果发现运行的慢的fuxi Instance集中在特定机器上,可以分析机器是否有硬件异常。

    案例三:MaxCompute并发度不够

    问题定位:对于Map task来说,并发度取决于以下两个规则:

    • split size和merge limit。

      Map的输入是一系列存储了数据的文件,对于较大的文件来说,我们会根据odps.sql.mapper.split.size的配置量来对大文件进行分片,默认该值是256M,有多少个分片就会起多少个instance。但是启动一个instance是需要耗费资源和时间的,所以为了提升单个instance的利用率,我们在处理较小的文件时,会根据odps.sql.mapper.merge.limit.size的配置量来合并小文件,默认该值是64M,即我们会合并大量小文件来由一个instance处理,但是合并总量的上限不会超过该值。

    • instance处理的数据不能跨分区。

      一个分区对应了底层盘古的一个文件夹,而目前对于一个分区的数据,我们至少需要一个instance来运行,即Instance处理的数据不能跨越分区。而在一个分区内,又必须遵照上一个规则来执行。

    对于Reduce来说,通常会根据上级Map任务的instance数1/4来启动instance,Join任务的instance数则与上级Map任务相同但不会超过1111。

    可以通过以下两种方法,配置加大reduce和join的并发instance数。

    set odps.sql.reducer.instances = xxx

    set odps.sql.joiner.instances = xxx

    需要提升并发度的场景:

    • 单条记录数据量较小的情况。

      由于单条记录数的数据量很小,导致同样大小的文件包含的记录数就会较多,如果还是按照256M的split size分片的话,那么单个Map instance需要处理的记录数就会较大,导致各条记录间的处理并发度降低。

    • Map/Reduce/Join阶段出现dump。

      通过前面对job summary的分析讲解,我们知道如果出现了dump信息,说明单个instance的内存已经无法一次性处理shuffle期间的排序。如果能提升并发度,就可以降低单个instance的数据处理量到内存可承受的范围内,从而省去磁盘的IO耗时提升处理速度。

    • 使用了处理耗时的UDF。

      由于UDF的处理非常耗时,如果能提升并发度,就可以并发地执行UDF,从而减少单个instance的UDF的处理时间。

    解决方案:

    • 提升map的并发度,可以通过降低以下两个参数的取值达到。

      odps.sql.mapper.split.size = xxx
      odps.sql.mapper.merge.limit.size = xxx

    • 提升reduce和join的并发度,可以通过加大以下两个参数的值达到。

      odps.sql.reducer.instances = xxx
      odps.sql.joiner.instances = xxx

    需要注意,并发度的提升是把双刃剑,它是需要消耗更多资源的,所以请做好资源成本控制与并发度的平衡,通常优化到instance平均时间为10分钟,这样整体资源利用率会较优,当然关键路径上的job可以优化到更短的时间。

    案例四:具体倾斜优化

    SQL中不同类型的数据倾斜可以采用不同的方式来处理。

    • group by倾斜

      由于group by的key分布不均匀,从而导致reduce端的数据倾斜。可以在SQL执行前设置防group倾斜的参数。

      set odps.sql.groupby.skewindata=true

      一旦该参数设置为true,系统会在进行Shuffle hash算法时自动加入随机因素,并通过引入一个新增task来防止数据倾斜问题。

    • distribute by倾斜

      例如想对全表做全排序,而采用常量来进行distribute by,从而导致reduce端的数据倾斜。通常需要避免此类做法。

    • join倾斜

      造成join倾斜的主要原因是join on所在的key分布不均匀,例如有个别key在join的个多表中有大量重复,从而导致个别join instance中的数据量以接近笛卡尔积式的数据量暴增。视场景有三种方案来解决join倾斜:

      • 如果join的两边有一个是小表,可以把join改成map join来处理。
      • 倾斜的key用单独的逻辑来处理,例如经常出现的一种情况是两边表on的key里有大量null数据导致了倾斜。则需要在join前先过滤掉null的数据或者通过case when将null值替换为某种随机值,然后再进行join。
      • 如果不想更改SQL,可以通过设置如下参数来让MaxCompute自动做优化。
        set odps.sql.skewinfo=tab1:(col1,col2)[(v1,v2),(v3,v4),...]
        set odps.sql.skewjoin=true;

    • muti-distinct倾斜

      多个distinct会放大group by数据倾斜问题,通常避免使用muti-distinct,可以采用两层group by来平缓数据倾斜问题。

    • UDF OOM

      一些job在运行时会报OOM的问题,报错信息如:FAILED: ODPS-0123144: Fuxi job failed - WorkerRestart errCode:9,errMsg:SigKill(OOM), usually caused by OOM(out of memory)。此时可以尝试通过设置UDF运行时参数解决,示例如下,仅供参考。

      odps.sql.mapper.memory=3072;
      set odps.sql.udf.jvm.memory=2048;
      set odps.sql.udf.python.memory=1536;

    相关的数据倾斜设置如下所示。

    set odps.sql.groupby.skewindata=true/false
    作用:开启group by优化。

    set odps.sql.skewjoin=true/false
    作用:开启join优化,必须设置odps.sql.skewinfo才有效。

    set odps.sql.skewinfo
    作用:设置join优化具体信息,格式如下。
    set odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")]
    src a join src_skewjoin1 b on a.key = b.key;
    相关示例如下,仅供参考。
    set odps.sql.skewinfo=src_skewjoin1:(key)[("0")]
    -- 针对单个字段单个倾斜数值,输出结果为explain select a.key c1, a.value c2, b.key c3, b.value c4 from src a join src_skewjoin1 b on a.key = b.key;
    set odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")]
    -- 针对单个字段多个倾斜数值,输出结果为explain select a.key c1, a.value c2, b.key c3, b.value c4 from src a join src_skewjoin1 b on a.key = b.key;

    案例五:常用SQL参数设置

    Map设置

    set odps.sql.mapper.cpu=100
    作用:设定处理map task每个instance的cpu数目,默认为100。[50,800]之间调整。

    set odps.sql.mapper.memory=1024
    作用:设定map task每个instance的memory大小,单位M,默认1024M。[256,12288]之间调整。

    set odps.sql.mapper.merge.limit.size=64
    作用:设定控制文件被合并的最大阈值,单位M,默认64M。用户可以通过控制这个变量,从而达到对map端输入的控制。[0,Integer.MAX_VALUE]之间调整。

    set odps.sql.mapper.split.size=256
    作用:设定一个map的最大数据输入量,单位M,默认256M。用户可以通过控制这个变量,从而达到对map端输入的控制。[1,Integer.MAX_VALUE]之间调整。

    Join设置

    set odps.sql.joiner.instances=-1
    作用:设定Join task的instance数量,默认为-1。[0,2000]之间调整。

    set odps.sql.joiner.cpu=100
    作用:设定Join task每个instance的cpu数目,默认为100。[50,800]之间调整。

    set odps.sql.joiner.memory=1024
    作用:设定Join task每个instance的memory大小,单位为M,默认为1024M。[256 ,12288]之间调整。

    Reduce设置

    set odps.sql.reducer.instances=-1
    作用:设定reduce task的instance数量,默认为-1。[0,2000]之间调整。

    set odps.sql.reducer.cpu=100
    作用:设定处理reduce task每个instance的cpu数目,默认为100。[50,800]之间调整。

    set odps.sql.reducer.memory=1024
    作用:设定reduce task每个instance的memory大小,单位M, 默认1024M。[256 ,12288]之间调整。

    UDF设置

    set odps.sql.udf.jvm.memory=1024
    作用:设定UDF jvm heap使用的最大内存,单位M,默认1024M。[256,12288]之间调整。

    set odps.sql.udf.timeout=600
    作用:设置UDF超时时间,默认为600 秒,单位秒。[0,3600]之间调整。

    set odps.sql.udf.python.memory=256
    作用:设定UDF python使用的最大内存,单位M,默认256M。[64,3072]之间调整。

    set odps.sql.udf.optimize.reuse=true/false
    作用:开启后,相同的UDF函数表达式,只计算一次,可以提高性能,默认为true。

    set odps.sql.udf.strict.mode=false/true
    作用:控制有些函数在遇到脏数据时是返回NULL还是报错,true是报错,flase是返回null。

    MapJoin设置

    set odps.sql.mapjoin.memory.max=512
    作用:设置mapjoin时小表的最大内存,默认512,单位M,[128,2048]之间调整动态分区设置。

    set odps.sql.reshuffle.dynamicpt=true/false
    作用:
    • 动态分区某些场景很慢,关闭可以加快SQL速度。
    • 如果动态分区值很少,关闭后可以避免出现数据倾斜。

    案例六:如何查看单个project的存储使用情况

    以project owner的身份打开MaxComputeConsole,运行desc project <project_name>-extended;,可以看到如下信息。

    存储信息
    从上图中可以看到该project中与容量相关的存储信息,其中相关指标的物理值与逻辑值之间的关系为:某指标物理值 = 某指标逻辑值 * 副本数。