This topic describes how to use compute pushdown to speed up slow queries on Tablestore data.

Background

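In Presto, a query usually pulls data from the data source and performs the computation on the Presto side. When a large amount of data has to be pulled, query performance suffers badly. Compute pushdown moves part of the computation to the data source: the source computes the result and returns only that result to Presto. In some scenarios, such as aggregation, this greatly reduces the amount of data transferred between the data source and Presto and therefore improves query performance.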
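The following sketch illustrates the idea with Python's built-in sqlite3 module standing in for the data source; it is not Tablestore or Presto, and the table contents are made up. It runs the same filtered SUM twice, once pulling every matching row to the "engine" and aggregating there, and once letting the source aggregate, and counts how many rows cross the boundary in each case.

```python
import sqlite3

def make_source(n_rows: int) -> sqlite3.Connection:
    """In-memory table that stands in for the data source (not Tablestore)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE lineitem (l_orderkey INTEGER, l_quantity REAL)")
    conn.executemany(
        "INSERT INTO lineitem VALUES (?, ?)",
        [(i, float(i % 50)) for i in range(n_rows)],
    )
    return conn

def sum_without_pushdown(conn: sqlite3.Connection):
    """Pull every matching row to the engine, then aggregate on the engine side."""
    rows = conn.execute("SELECT l_quantity FROM lineitem WHERE l_orderkey > 100").fetchall()
    return sum(r[0] for r in rows), len(rows)

def sum_with_pushdown(conn: sqlite3.Connection):
    """Let the source filter and aggregate; only one result row is transferred."""
    rows = conn.execute("SELECT SUM(l_quantity) FROM lineitem WHERE l_orderkey > 100").fetchall()
    return rows[0][0], len(rows)

conn = make_source(10_000)
total_a, moved_a = sum_without_pushdown(conn)
total_b, moved_b = sum_with_pushdown(conn)
print(total_a == total_b, moved_a, moved_b)  # True 9899 1
```

Both paths produce the same total, but without pushdown 9,899 rows are transferred instead of a single result row.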

Supported operators

Operator    Supported forms
FILTER
  • OR/AND conditions
  • LIKE/NOT LIKE
  • IS NULL/IS NOT NULL
  • BETWEEN x AND y
  • Comparison expressions: =, <, <=, >, >=, <>
  • IN/NOT IN
LIMIT
  • LIMIT
  • ORDER BY col LIMIT N
AGGREGATE
  • MIN(COL)
  • MAX(COL)
  • SUM(COL)
  • COUNT(*)/COUNT(COL)/COUNT(DISTINCT COL)
  • AVG(COL)
  • Aggregation with GROUP BY
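The following plain-Python sketch illustrates why ORDER BY col LIMIT N can be evaluated at the source without changing the answer: if each source partition returns only its own top N rows, the engine can still compute the exact global top N from those partial results. The partition layout is hypothetical; this does not describe how the Tablestore connector actually splits a scan.

```python
import heapq
from typing import Iterable, List

def source_top_n(rows: Iterable[int], n: int) -> List[int]:
    """What a source returns for ORDER BY x DESC LIMIT n: at most n rows."""
    return heapq.nlargest(n, rows)

def engine_merge_top_n(partials: List[List[int]], n: int) -> List[int]:
    """Engine side: merge the per-source partial results and re-apply LIMIT n."""
    return heapq.nlargest(n, (x for part in partials for x in part))

# Hypothetical data split across three source partitions.
partitions = [[5, 42, 17, 8], [99, 3, 61], [23, 77, 1, 50, 64]]
n = 3
partials = [source_top_n(p, n) for p in partitions]
pushed = engine_merge_top_n(partials, n)
full = heapq.nlargest(n, (x for p in partitions for x in p))
print(pushed, full)  # [99, 77, 64] [99, 77, 64]
```

Each partition ships at most n rows, yet the merged result equals the top n computed over all rows.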

Limitations

Compute pushdown has the following limitations.

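  • Expressions must be simple. For example, a>10, sum(a), and avg(a) can be pushed down; a+b>10, abs(a)>10, and sum(a+b) cannot.
  • All columns involved must be in the same Tablestore index. Otherwise, only some of the operators, or none of them, are pushed down.
  • A LIKE pattern can be at most 20 characters long, including the % characters. For example, in a like '%123456789123456789%', the pattern '%123456789123456789%' is exactly 20 characters.
  • GROUP BY can be pushed down for at most 4 columns, for example group by a1,a2,a3,a4. With more than 4 columns, GROUP BY is not pushed down.
  • GROUP BY pushdown is disabled by default. Once GROUP BY is pushed down, Tablestore returns at most 2,000 rows, so if the aggregation produces more groups than that, the result is incorrect. For how to enable it, see Enable compute pushdown.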
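The limits above can be checked on the client side before you rely on pushdown. The following helpers are a hypothetical sketch and are not part of DLA or the Tablestore connector: the numeric limits come from the list above, the "simple expression" check is a rough regex heuristic (a bare column compared with a literal), and the same-index requirement is not checked at all. The connector makes the actual pushdown decision.

```python
import re
from typing import List

# Hypothetical client-side pre-check. The limits mirror the list above; the
# helpers are illustrations and are not part of DLA or the Tablestore connector.
MAX_LIKE_PATTERN_LEN = 20
MAX_GROUP_BY_COLUMNS = 4

# Heuristic for a "simple" filter: a bare column compared with a literal.
SIMPLE_PREDICATE = re.compile(
    r"^\s*[A-Za-z_]\w*\s*(=|<>|<=|>=|<|>)\s*('[^']*'|-?\d+(\.\d+)?)\s*$"
)

def like_pattern_ok(pattern: str) -> bool:
    """True if the LIKE pattern, % signs included, has at most 20 characters."""
    return len(pattern) <= MAX_LIKE_PATTERN_LEN

def group_by_ok(columns: List[str]) -> bool:
    """True if the GROUP BY list has at most 4 columns."""
    return len(columns) <= MAX_GROUP_BY_COLUMNS

def is_simple_predicate(predicate: str) -> bool:
    """True for predicates like a>10; False for a+b>10 or abs(a)>10."""
    return SIMPLE_PREDICATE.match(predicate) is not None

print(like_pattern_ok("%123456789123456789%"))      # True (exactly 20 characters)
print(group_by_ok(["a1", "a2", "a3", "a4", "a5"]))  # False
print(is_simple_predicate("a+b>10"))                 # False
```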

Enable compute pushdown

Compute pushdown for Tablestore data sources is disabled by default. To enable it, add the hint ots-index-first=auto,ots-pushdown-enabled=true to the query, for example:

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 order by l_commitdate desc limit 10;

/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select count(l_orderkey),sum(l_partkey), avg(l_linenumber) from lineitem;
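If queries are generated by application code, the hint can be prepended programmatically. The sketch below is a hypothetical helper, not a DLA SDK function; it only assumes what the examples above show, namely that the hint is a /*+...*/ comment placed before the statement with comma-separated key=value options.

```python
from typing import Dict

def build_hint(options: Dict[str, str]) -> str:
    """Render options as a hint comment, e.g. /*+k1=v1,k2=v2*/."""
    return "/*+" + ",".join(f"{k}={v}" for k, v in options.items()) + "*/"

# The two options the text above says are needed to enable pushdown.
PUSHDOWN_OPTIONS = {"ots-index-first": "auto", "ots-pushdown-enabled": "true"}

def with_pushdown_hint(sql: str) -> str:
    """Prefix a query with the pushdown hint (hypothetical helper)."""
    return f"{build_hint(PUSHDOWN_OPTIONS)} {sql.strip()}"

print(with_pushdown_hint("select * from lineitem where l_orderkey > 100 limit 100;"))
# /*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;
```

Keeping the options in a dictionary (insertion-ordered in Python 3.7+) makes it easy to add further options such as the GROUP BY switch without string surgery.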

GROUP BY pushdown

To push down GROUP BY, add the option ots-groupby-pushdown-enabled=true to the hint in addition to the options that enable compute pushdown, for example:

/*+ots-index-first=auto,ots-pushdown-enabled=true,ots-groupby-pushdown-enabled=true*/ 
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
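Because a pushed-down GROUP BY returns at most 2,000 rows and supports at most 4 grouping columns, it is only safe to add ots-groupby-pushdown-enabled=true when the query is known to produce few groups. The following hypothetical helper encodes that rule; expected_groups is an estimate the caller must supply (for example from a previous count(distinct ...) query), which is an assumption, not something the connector provides.

```python
from typing import List

# Limits taken from the limitations list; the helper itself is hypothetical.
GROUP_BY_ROW_CAP = 2000      # rows Tablestore returns for a pushed-down GROUP BY
MAX_GROUP_BY_COLUMNS = 4

BASE_OPTIONS = "ots-index-first=auto,ots-pushdown-enabled=true"
GROUP_BY_OPTION = "ots-groupby-pushdown-enabled=true"

def pushdown_hint(group_by_columns: List[str], expected_groups: int) -> str:
    """Build the hint; add the GROUP BY option only when the result fits the cap."""
    safe = (
        0 < len(group_by_columns) <= MAX_GROUP_BY_COLUMNS
        and expected_groups <= GROUP_BY_ROW_CAP
    )
    options = f"{BASE_OPTIONS},{GROUP_BY_OPTION}" if safe else BASE_OPTIONS
    return f"/*+{options}*/"

# expected_groups is an estimate supplied by the caller (assumption).
print(pushdown_hint(["l_returnflag", "l_linestatus"], expected_groups=4))
print(pushdown_hint(["l_orderkey"], expected_groups=1_500_000))
```

The second call falls back to plain compute pushdown, so the GROUP BY itself runs in Presto and is not truncated.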

Check whether a query is pushed down

You can use EXPLAIN to check whether a SQL statement is pushed down to Tablestore, for example:

explain select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= '1998-12-01'
group by
    l_returnflag,
    l_linestatus;

The output is as follows:

- Output[l_returnflag, l_linestatus, sum_qty, sum_base_price, avg_qty, avg_price, avg_disc, count_order] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
        sum_qty := sum
        sum_base_price := sum_7
        avg_qty := avg
        avg_price := avg_8
        avg_disc := avg_9
        count_order := count
    - RemoteStreamingMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
        - LocalMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
            - PartialSort[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                - RemoteStreamingExchange[REPARTITION] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                        Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
                    - TableScan[TableHandle {connectorId='ots', connectorHandle='table=ots20201208_mzl.lineitem', layout='Optional[table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]]'}] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
                            Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
                            LAYOUT: table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]
                            avg_8 := OtsColumnHandle{columnName=avg(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
                            avg := OtsColumnHandle{columnName=avg(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
                            sum := OtsColumnHandle{columnName=sum(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
                            avg_9 := OtsColumnHandle{columnName=avg(l_discount), mappedName=L_DISCOUNT, primaryKey=false, columnType=double}
                            count := OtsColumnHandle{columnName=count(*), mappedName=_ID, primaryKey=false, columnType=bigint}
                            l_returnflag := OtsColumnHandle{columnName=l_returnflag, mappedName=L_RETURNFLAG, primaryKey=false, columnType=varchar}
                            sum_7 := OtsColumnHandle{columnName=sum(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
                            l_linestatus := OtsColumnHandle{columnName=l_linestatus, mappedName=L_LINESTATUS, primaryKey=false, columnType=varchar}

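The execution plan above no longer contains an Aggregate node, and the OtsQueryGeneratorContext in the TableScan node carries the pushed-down filter, aggregations (aggregationLists), and grouping columns (groupByColumns). This shows that the query has been pushed down to Tablestore.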
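When checking many queries, the same two signals can be read from the EXPLAIN text automatically. The following is a heuristic sketch that matches the text format shown above (the "- Aggregate" node prefix and the OtsQueryGeneratorContext fields); it is not a DLA API, and the format may differ between versions. SAMPLE_PLAN is a shortened excerpt of the plan above.

```python
import re
from typing import Dict, List

def pushdown_summary(plan: str) -> Dict[str, object]:
    """Heuristically summarise what an EXPLAIN plan shows was pushed to Tablestore."""
    # A remaining Aggregate node means aggregation still runs on the Presto side.
    aggregate_node = re.search(r"^\s*- Aggregate", plan, re.MULTILINE) is not None
    # The connector prints its pushdown state inside OtsQueryGeneratorContext{...}.
    ots_context = "OtsQueryGeneratorContext{" in plan
    # The context appears twice (TableScan handle and LAYOUT), so de-duplicate, keeping order.
    aggregations: List[str] = list(dict.fromkeys(re.findall(r"aggName=(.+?), fieldName=", plan)))
    group_by: List[str] = []
    match = re.search(r"groupByColumns=\{([^}]*)\}", plan)
    if match and match.group(1).strip():
        group_by = [pair.split("=")[0].strip() for pair in match.group(1).split(",")]
    return {
        "aggregate_node": aggregate_node,
        "ots_context": ots_context,
        "aggregations": aggregations,
        "group_by": group_by,
        "aggregation_pushed_down": ots_context and not aggregate_node and bool(aggregations),
    }

# Shortened excerpt of the plan shown above.
SAMPLE_PLAN = """- Output[l_returnflag, l_linestatus, sum_qty, count_order]
    - TableScan[TableHandle {connectorId='ots', layout='Optional[OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]]'}]"""

print(pushdown_summary(SAMPLE_PLAN))
```

A plan that still contains an Aggregate node above a plain TableScan reports aggregation_pushed_down as False, which usually means a hint is missing or one of the limitations applies.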