本文为您介绍常见的SQL问题以及优化示例。

并行度优化

并行度是衡量并行计算程度的一个指标,从执行计划上来看,例如ID为M1的任务,使用1000个Instance来执行,我们就说M1的并行度是1000。合理地设置并调整任务并行度,可以使任务执行效率更高。

并行度优化场景如下。

强制一个Instance执行

某些操作强制一个Instance执行任务,例如:

  • 做聚合的时候,没有进行group by或者group by一个常量。

  • 窗口函数的over语句里指定partition by一个常量。

  • SQL中指定distribute bycluster by一个常量。

解决方案:针对此情形,建议您检查这些操作是否必要,能否去掉,尽量取消类似操作,避免强制一个Instance执行任务。

Instance数量过多或过少

重要

调整并行度不一定是越多越好,Instance数量过多会从如下两个方面影响执行速度:

  • Instance越多,等待资源的时间越长,排队次数也更多。

  • 每个Instance初始化需要时间,并行度越高,初始化占用的总时间就越长,有效的执行时间占比就越低。

以下情形会导致使用很多Instance:

  • 需要读取很多小分区的数据,例如一个数据查询SQL语句读取10000个分区,那么系统会强制使用10000个Instance。

    解决方案:您需要设计SQL,减少分区的数量,可以从进行分区裁剪、筛除不需要读的分区、将大作业拆分成小作业方面进行考虑。

  • 每次读取256 MB数据太少,导致Instance的执行时间太短,而由于输入数据很大,反而导致了并行度过大,使Instance大多数时间在排队等资源。

    解决方案:使用如下命令调大单个并发处理的数据大小,从而减少Instance数量。

    SET odps.stage.mapper.split.size=<256>;
    SET odps.stage.reducer.num=<并发数>;

Instance数量设置方法

  • 读表的Task

    • 方法1:通过设置参数调整并行度。

      -- 设定一个map的最大数据输入量,单位MB
      -- 默认256,区间[1,Integer.MAX_VALUE]
      SET odps.sql.mapper.split.size=<value>;
    • 方法2:MaxCompute提供split size hint方式,可以针对单个读表操作来调整并行度。

      --设置split size大小为1MB,此hint会在读src表时,按照1MB的大小来切分task
      SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
    • 方法3:在表级别按照大小、行数或指定并行度进行切分。

    由于方法1中odps.sql.mapper.split.size只支持Mapper Stage的整体设置,且最低为1 MB,必要时,您可以根据表的维度调整并行度,尤其是在表中每行数据的size较小,而后续计算负担较重的情况下,可以减少并行处理的行数,从而提高任务的并行度。

    调整并行度的方式如下:

    • 设置表级别单个并行处理的分片数据大小。

      SET odps.sql.split.size = {"table1": 1024, "table2": 512};
    • 设置表级别单个并行处理的行数。

      SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
    • 设置表级别的并行度。

      SET odps.sql.split.dop = {"table1": 1, "table2": 5};
    说明

    odps.sql.split.row.countodps.sql.split.dop只能用于内部表、非事务表和非聚簇表。

  • 非读表的Task

    主要有如下三种方式调整并行度:

    • 方法1:调整odps.stage.reducer.num值。使用如下命令强制设置Reducer并行度,该设置将影响所有相关的Task。

      -- 设定Reduce Task的instance数量
      -- 可调整区间为[1,99999]
      SET odps.stage.reducer.num=<value>;
    • 方法2:调整odps.stage.joiner.num值。使用如下命令强制设置Reducer并行度,会影响所有相关的Task。

      -- 设定Joiner Task的instance数量
      -- 可调整区间为[1,99999]
      SET odps.stage.joiner.num=<value>;
    • 方法3:调整odps.sql.mapper.split.size值。

      非读表Task的并行度会受到输入Task的并行度影响,通过调整读表Task的并行度间接调整非读表Task的并行度。

窗口函数优化

如果SQL语句中使用了窗口函数,通常每个窗口函数会形成一个Reduce作业。如果窗口函数较多,会消耗过多的资源。您可以对符合下述条件的窗口函数进行优化:

  • 窗口函数在OVER关键字后面要完全相同,要有相同的分组和排序条件。

  • 多个窗口函数在同一层SQL中执行。

符合上述2个条件的窗口函数会合并为一个Reduce执行。SQL示例如下所示。

SELECT
RANK() OVER (PARTITION BY A ORDER BY B desc) AS RANK,
ROW_NUMBER() OVER (PARTITION BY A ORDER BY B desc) AS row_num
FROM MyTable;

子查询优化

子查询如下所示。

SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);

当此语句中的table_b子查询返回的col1的个数超过9999个时,系统会报错为records returned from subquery exceeded limit of 9999。此时您可以使用Join语句来代替,如下所示。

SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
说明
  • 如果没有使用DISTINCT关键字,而子查询表c返回的结果中有相同的col1的值,可能会导致a表的结果数变多。

  • DISTINCT关键字会导致查询在同一个Worker中执行。如果子查询数据量较大,会导致查询比较慢。

  • 如果业务上已经确保子查询中col1列值无重复,您可以删除DISTINCT关键字,以提高性能。

Join语句优化

当两个表进行Join操作时,建议在如下位置使用WHERE子句:

  • 主表的分区限制条件可以写在WHERE子句中(建议先用子查询过滤)。

  • 主表的WHERE子句建议写在SQL语句最后。

  • 从表分区限制条件不要写在WHERE子句中,建议写在ON条件或者子查询中。

示例如下。

SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301)B ON B.id=A.id WHERE A.dt=20150301;
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301; --不建议使用。此语句会先执行Join操作后进行分区裁剪,导致数据量变大,性能下降。
SELECT * FROM (SELECT * FROM A WHERE dt=20150301)A JOIN (SELECT * FROM B WHERE dt=20150301)B ON B.id=A.id;

聚合函数优化

使用wm_concat函数替代collect_list函数,实现聚合函数的优化,使用示例如下。

-- collect_list实现
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- wm_concat实现更优
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;


-- collect_list实现
SELECT array_join(collect_list(key), ',') FROM src;
-- wm_concat实现更优
SELECT wm_concat(',', key) FROM src;