长尾问题是分布式计算中最常见的问题之一,也是典型的疑难杂症。究其原因,是因为数据分布不均,导致各个节点的工作量不同,整个任务需要等最慢的节点完成才能结束。

处理这类问题的思路就是把工作分给多个 Worker 去执行,而不是一个 Worker 单独运行最重的那份工作。本文将为您介绍平时工作中遇到的一些典型的长尾问题的场景及其解决方案。

Join 长尾

问题原因

Join 出现长尾,是因为 Join 时出现某个 Key 里的数据特别多的情况。

解决办法

您可以从以下四方面进行考虑:

  • 排除两张表都是小表的情况,若两张表里有一张大表和一张小表,可以考虑使用 mapjoin,对小表进行缓存,具体的语法和说明请参见Select语法介绍。如果是 MapReduce 作业,可以使用资源表的功能,对小表进行缓存。

  • 但是如果两张表都比较大,就需要先尽量去重。

  • 若还是不能解决,就需要从业务上考虑,为什么会有这样的两个大数据量的 Key 要做笛卡尔积,直接考虑从业务上进行优化。

  • 小表leftjoin大表,odps直接leftjoin较慢。此时可以先小表和大表mapjoin,这样能拿到小表和大表的交集中间表,且这个中间表一定是不大于大表的(只要不是有很大的key倾斜就不会膨胀的很大)。然后小表再和这个中间表进行leftjoin,这样效果等于小表leftjoin大表。

Group By 长尾

问题原因

Group By Key 出现长尾,是因为某个 Key 内的计算量特别大。

解决办法

您可以通过以下两种方法解决:

  • 可对 SQL 进行改写,添加随机数,把长 Key 进行拆分。如下所示:

    SELECT Key,COUNT(*) AS Cnt FROM TableName GROUP BY Key;

    不考虑 Combiner,M 节点会 Shuffle 到 R 上,然后 R 再做 Count 操作。对应的执行计划是 M > R。但是如果对长尾的 Key 再做一次工作再分配,就变成如下语句:

    -- 假设长尾的Key已经找到是KEY001
    SELECT a.Key
      , SUM(a.Cnt) AS Cnt
    FROM (
      SELECT Key
        , COUNT(*) AS Cnt
    FROM TableName
    GROUP BY Key, 
      CASE 
        WHEN Key = 'KEY001' THEN Hash(Random()) % 50
        ELSE 0
       END
    ) a
    GROUP BY a.Key;

    由上可见,这次的执行计划变成了 M > R > R。虽然执行的步骤变长了,但是长尾的 Key 经过 2 个步骤的处理,整体的时间消耗可能反而有所减少。

    说明

    若数据的长尾并不严重,用这种方法人为地增加一次 R 的过程,最终的时间消耗可能反而更大。

  • 使用通用的优化策略 — 系统参数,设置如下:

    set odps.sql.groupby.skewindata=true。

    但是通用性的优化策略无法针对具体的业务进行分析,得出的结果不总是最优的。您可以根据实际的数据情况,用更加高效的方法来改写 SQL。

Distinct 长尾

对于 Distinct,上述 Group By 长尾时,把长 Key 进行拆分的策略已经不生效了。对这种场景,您可以考虑通过其他方式解决。

解决办法

--原始SQL,不考虑Uid为空
SELECT COUNT(uid) AS Pv
    , COUNT(DISTINCT uid) AS Uv
FROM UserLog;

可以改写成如下语句:

SELECT SUM(PV) AS Pv
    , COUNT(*) AS UV
FROM (
    SELECT COUNT(*) AS Pv
      , uid
    FROM UserLog
    GROUP BY uid
) a;

该解法是把 Distinct 改成了普通的 Count,这样的计算压力不会落到同一个 Reducer 上。而且这样改写后,既能支持前面提到的 Group By 优化,系统又能做 Combiner,性能会有较大的提升。

动态分区长尾

问题原因

  • 动态分区功能为了整理小文件,会在最后启一个 Reduce,对数据进行整理,所以如果使用动态分区写入数据时若有倾斜,就会发生长尾。

  • 一般情况下,滥用动态分区的功能也是产生这类长尾的一个常见原因。

解决办法

若写入的数据已经确定需要把数据写入某个具体分区,那可以在 Insert 的时候指定需要写入的分区,而不是使用动态分区。

通过 Combiner 解决长尾

对于 MapRedcuce 作业,使用 Combiner 是一种常见的长尾优化策略。在 WordCount 的示例中,已提到这种做法。通过 Combiner,减少 Mapper Shuffle 往 Reducer 的数据,可以大大减少网络传输的开销。对于 MaxCompute SQL,这种优化会由系统自动完成。

说明

Combiner 只是 Map 端的优化,需要保证是否执行 Combiner 的结果是一样的。以 WordCount 为例,传 2 个 (KEY,1) 和传 1 个 (KEY,2) 的结果是一样的。但是比如在做平均值时,便不能直接在 Combiner 里把 (KEY,1) 和 (KEY,2) 合并成 (KEY,1.5)。

通过系统优化解决长尾

针对长尾这种场景,除了前面提到的 Local Combiner,MaxCompute 系统本身还做了一些优化。比如在跑任务的时候,日志里突然打出如下的内容(+N backups 部分):

M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047[100%] 
M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047[100%] 
M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047(+1 backups)[100%] 
M1_Stg1_job0:0/521/521[100%] M2_Stg1_job0:0/1/1[100%] J9_1_2_Stg5_job0:0/523/523[100%] J3_1_2_Stg1_job0:0/523/523[100%] R6_3_9_Stg2_job0:1/1046/1047(+1 backups)[100%]

可以看到 1047 个 Reducer,有 1046 个已经完成了,但是最后一个一直没完成。系统识别出这种情况后,自动启动了一个新的 Reducer,跑一样的数据,然后看两个哪个快,取快的数据归并到最后的结果集里。

通过业务优化解决长尾

虽然前面的优化策略有很多,但仍然不能解决所有问题。有时碰到的长尾问题,还需要从业务角度上去考虑是否有更好的解决方法,示例如下:

  • 实际数据可能包含非常多的噪音。比如:需要根据访问者的 ID 进行计算,看每个用户的访问记录的行为。需要先去掉爬虫的数据(现在的爬虫已越来越难识别),否则爬虫数据很容易长尾计算的长尾。类似的情况还有根据 xxid 进行关联的时候,需要考虑这个关联字段是否存在为空的情况。

  • 一些业务特殊情况。比如:ISV 的操作记录,在数据量、行为方式上都会和普通的个人会有很大的区别。那么可以考虑针对大客户,使用特殊的分析方式进行单独处理。

  • 数据分布不均匀的情况下,不要使用常量字段做 Distribute by 字段来实现全排序。