本文主要为您介绍聚合算子下推功能及使用场景。

背景

在数据扫描过程中,越来越多用户提出数据统计的需求。其中典型场景是对一个数据范围内的数据列值进行count或sum等操作。如果没有统计函数,用户需要先将数据从表格中读取到客户端,然后对数据进行二次统计,该方案有以下两个主要缺陷。
  • 拉取数据产生的带宽开销大。
  • 数据在用户端做聚合,计算并发度低,计算能力受限。

基于上述场景,表格存储提供Count/Sum/Max/Min/GroupBy的算子下推,在后端服务器提供聚合统计功能,直接将计算结果返回到客户端。聚合算子下推不仅可以极大化计算并行,还避免了原始数据返回到用户端所产生的额外带宽消耗。

接口说明

下推的算子,包含两个部分:
  • 聚合算子(Count/Sum/Max/Min)
  • 分组函数group-by

在API使用上,用户可以同时传入多个聚合函数算子,并且可以根据业务场景,给出一个分组聚合函数。

message ScanOperation {
    repeated BuiltInAggFunctionPB aggregation_function = 1;
    optional BuiltInGroupByFunctionPB groupby_function = 2;
}

目前表格存储提供行数统计、列数统计、列值求和、列值最值统计。

说明 在统计行数时,不能指定列名,但其他基于列值的统计,都需要指定列名。
  • 聚合函数的定义
    enum BuiltInAggFunctionType {
        COUNT_ROW_FUNC = 1;
        COUNT_COLUMN_FUNC = 2;
        MIN_FUNC = 3;
        MAX_FUNC = 4;
        SUM_FUNC = 5;
    }
    
    message BuiltInAggFunctionPB {
        required BuiltInAggFunctionType type = 1;
        optional bytes column_name = 2;
    }
  • 分组函数的定义
    enum BuiltInGroupByFunctionType {
        GROUP_BY_COLUMN_NAME = 1;
    }
    
    message BuiltInGroupByColumnNamePB {
        repeated bytes column_name = 1;
    }
    
    message BuiltInGroupByFunctionPB {
        required BuiltInGroupByFunctionType type = 1;
        optional bytes function = 2;
    }

    列的分组函数可以指定单列分组键,也能指定多列分组键。

  • 返回结果说明
    • 返回格式
      引入了聚合函数后,scan操作不会返回原始数据。此时计算结果仍然使用PlainBuffer格式返回,包含以下两种情况。
      • 无group-by的计算,返回一个结果行。schema为:主键为空,属性列值为各个AggFn的计算结果,顺序与请求的AggFn参数位置一致。
      • 带group-by子句,返回多个结果行。schema为:主键是group-by key,属性列值为各个AggFn的计算结果,顺序与请求的AggFn参数位置一致。
      说明
      • 如果scan对应的范围区域过大,需要跟scan操作类似,使用scan token进行多次分段返回。
      • 返回结果列中的时间戳字段为空。
    • 返回结果的列名命名规则
      • 统计行数:COUNT_ROW_FUNC#
      • 列值求和:SUM_FUNC#列名
      • 列数统计:COUNT_COLUMN_FUNC#列名
      • 列值最小值统计:MIN_FUNC#列名
      • 列值最小值统计:MAX_FUNC#列名

使用场景

假设用户使用表格table-1来存储不同时间点不同机器上的cpu使用状况,表格schema如下。
主键1 = host_name(string)| 主键2 = time(int64_t) | 属性列 = cpu_usage(int64_t)

以host_name和time作为主键,数据按时间分布。属性列主要记录这个时序数据对应的cpu使用率。

  • 场景一:统计某个时间段内host-1的最大的cpu使用率
    • 等效SQL示例
      select max(cpu_usage) from 'cpu_table' where time >= 100 and time < 200 and host_name = 'host-1';
    • 示例代码
          PrimaryKey inclusiveStartPK;
          PrimaryKey exclusiveEndPK;
          inclusiveStartPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-1"));
          inclusiveStartPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(100));
          exclusiveEndPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-1"));
          exclusiveEndPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(200));
      
          // 设置max func
          ScanOperation scanOperation;
          list<AggFunction> aggFunctions;
          {
              AggFunction aggFunction(MAX_FUNC, "cpu_usage");
              aggFunctions.push_back(aggFunction);
          }
          scanOperation.SetAggFunctions(aggFunctions);
      
          // 发起请求,获取结果
          while (inclusiveStartPK.GetPrimaryKeyColumnsSize() > 0) {
              RangeRowQueryCriteria queryCriteria("cpu_table");
              queryCriteria.SetInclusiveStartPrimaryKey(inclusiveStartPK);
              queryCriteria.SetExclusiveEndPrimaryKey(exclusiveEndPK);
              queryCriteria.SetDirection(RangeRowQueryCriteria::FORWARD);
              queryCriteria.SetMaxVersions(1);
              queryCriteria.SetScanOperation(scanOperation);
      
              GetRangeRequestPtr getRangeRequestPtr(new GetRangeRequest(queryCriteria));
              GetRangeResultPtr getRangeResultPtr = gClient->GetRange(getRangeRequestPtr);
              const list<RowPtr>& rowPtrs = getRangeResultPtr->GetRows();
              typeof(rowPtrs.begin()) iter = rowPtrs.begin();
              for (; iter != rowPtrs.end(); ++iter) {// 在客户端聚合结果
                  const Column& col = ((*iter)->GetColumns()).front();
                  colName = col.GetName();
                  int64_t v = col.GetValue().AsInteger();
                  maxCpuUsage = maxCpuUsage < v ? v: maxCpuUsage;
              }
      
              if (getRangeResultPtr->HasNextStartPrimaryKey()) {
                  inclusiveStartPK = getRangeResultPtr->GetNextStartPrimaryKey();
              } else {
                  inclusiveStartPK = PrimaryKey();
              }
          }
          std::cout << "AggName: " << colName << ", AggValue: " << maxCpuUsage << std::endl;
      							
    • 输出结果
      AggName: MAX_FUNC#cpu_usage, AggValue: 75
  • 场景二:统计某个时间段内host-1最小的cpu使用率
    • 等效SQL示例
      select min(cpu_usage) from 'cpu_table' where time >= 100 and time < 200 and host_name = 'host-1';
    • 示例代码
          PrimaryKey inclusiveStartPK;
          PrimaryKey exclusiveEndPK;
          inclusiveStartPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-1"));
          inclusiveStartPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(100));
          exclusiveEndPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-1"));
          exclusiveEndPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(200));
      
          // 设置min func
          ScanOperation scanOperation;
          list<AggFunction> aggFunctions;
          {
              AggFunction aggFunction(MIN_FUNC, "cpu_usage");
              aggFunctions.push_back(aggFunction);
          }
          scanOperation.SetAggFunctions(aggFunctions);
      
          // 发起请求,获取结果
          int64_t minCpuUsage = INT64_MAX;
          std::string colName;
          while (inclusiveStartPK.GetPrimaryKeyColumnsSize() > 0) {
              RangeRowQueryCriteria queryCriteria("cpu_table");
              queryCriteria.SetInclusiveStartPrimaryKey(inclusiveStartPK);
              queryCriteria.SetExclusiveEndPrimaryKey(exclusiveEndPK);
              queryCriteria.SetDirection(RangeRowQueryCriteria::FORWARD);
              queryCriteria.SetMaxVersions(1);
              queryCriteria.SetScanOperation(scanOperation);
      
              GetRangeRequestPtr getRangeRequestPtr(new GetRangeRequest(queryCriteria));
              GetRangeResultPtr getRangeResultPtr = gClient->GetRange(getRangeRequestPtr);
              const list<RowPtr>& rowPtrs = getRangeResultPtr->GetRows();
              typeof(rowPtrs.begin()) iter = rowPtrs.begin();
              for (; iter != rowPtrs.end(); ++iter) {// 在客户端聚合结果
                  const Column& col = ((*iter)->GetColumns()).front();
                  colName = col.GetName();
                  int64_t v = col.GetValue().AsInteger();
                  minCpuUsage = minCpuUsage > v ? v: minCpuUsage;
              }
      
              if (getRangeResultPtr->HasNextStartPrimaryKey()) {
                  inclusiveStartPK = getRangeResultPtr->GetNextStartPrimaryKey();
              } else {
                  inclusiveStartPK = PrimaryKey();
              }
          }
          std::cout << "AggName: " << colName << ", AggValue: " << minCpuUsage << std::endl;
    • 输出结果
      AggName: MIN_FUNC#cpu_usage, AggValue: 9
  • 场景三:统计某个时间段内host-1采集的数据记录数
    • 等效SQL示例
      select count(*) from 'cpu_table' where time >= 100 and time < 200 and host_name = 'host-1';
    • 示例代码
          PrimaryKey inclusiveStartPK;
          PrimaryKey exclusiveEndPK;
          inclusiveStartPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-1"));
          inclusiveStartPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(100));
          exclusiveEndPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-1"));
          exclusiveEndPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(200));
      
          // 设置count func
          ScanOperation scanOperation;
          list<AggFunction> aggFunctions;
          {
              AggFunction aggFunction(COUNT_ROW_FUNC);
              aggFunctions.push_back(aggFunction);
          }
          scanOperation.SetAggFunctions(aggFunctions);
      
          // 发起请求,获取结果
          int64_t cpuRowCount = 0;
          std::string colName;
          while (inclusiveStartPK.GetPrimaryKeyColumnsSize() > 0) {
              RangeRowQueryCriteria queryCriteria("cpu_table");
              queryCriteria.SetInclusiveStartPrimaryKey(inclusiveStartPK);
              queryCriteria.SetExclusiveEndPrimaryKey(exclusiveEndPK);
              queryCriteria.SetDirection(RangeRowQueryCriteria::FORWARD);
              queryCriteria.SetMaxVersions(1);
              queryCriteria.SetScanOperation(scanOperation);
      
              GetRangeRequestPtr getRangeRequestPtr(new GetRangeRequest(queryCriteria));
              GetRangeResultPtr getRangeResultPtr = gClient->GetRange(getRangeRequestPtr);
              const list<RowPtr>& rowPtrs = getRangeResultPtr->GetRows();
              typeof(rowPtrs.begin()) iter = rowPtrs.begin();
              for (; iter != rowPtrs.end(); ++iter) {// 在客户端聚合结果
                  const Column& col = ((*iter)->GetColumns()).front();
                  colName = col.GetName();
                  cpuRowCount += col.GetValue().AsInteger();
              }
      
              if (getRangeResultPtr->HasNextStartPrimaryKey()) {
                  inclusiveStartPK = getRangeResultPtr->GetNextStartPrimaryKey();
              } else {
                  inclusiveStartPK = PrimaryKey();
              }
          }
          std::cout << "AggName: " << colName << ", AggValue: " << cpuRowCount << std::endl;
    • 输出结果
      AggName: COUNT_ROW_FUNC#, AggValue: 7
  • 场景四:某个时间段内各个机器的平均cpu使用率
    • 等效SQL示例
      select host_name, avg(cpu_usage) from cpu_table 
          where time >= 100 and time < 200 and 
              host_name >= 'host-100' and host_name < 'host-200'
          group by host_name
    • 示例代码
      这个场景中,返回数据较多,统计结果需要分批次返回。因此计算平均值时,需要累计多轮sum(cpu_usage)count(cpu_usage)的结果,再进行求均值。整个计算分为两步:
      1. 获取各个机器在这个时间段内的cpu使用值总和,以及对应的cpu记录总数。
            PrimaryKey inclusiveStartPK;
            PrimaryKey exclusiveEndPK;
            inclusiveStartPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-100"));
            inclusiveStartPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(100));
            exclusiveEndPK.AddPrimaryKeyColumn("host_name", PrimaryKeyValue("host-200"));
            exclusiveEndPK.AddPrimaryKeyColumn("time", PrimaryKeyValue(200));
        
            // 设置sum和count func
            ScanOperation scanOperation;
            list<AggFunction> aggFunctions;
            {
                AggFunction aggFunction(SUM_FUNC, "cpu_usage");
                aggFunctions.push_back(aggFunction);
            }
            {
                AggFunction aggFunction(COUNT_COL_FUNC, "cpu_usage");
                aggFunctions.push_back(aggFunction);
            }
            scanOperation.SetAggFunctions(aggFunctions);
        
            // 设置分组函数,按host_name分组
            {
                list<string> groupByKeys;
                groupByKeys.push_back("host_name");
                GroupByFunction groupByFunction(GROUP_BY_COLUMN_NAME, groupByKeys);
                scanOperation.SetGroupByFunction(groupByFunction);
            }
        
            // 发起请求,获取结果
            std::map<
                std::string /* host name*/,
                std::pair<int64_t /* cpu sum */, int64_t /*cpu count*/>
            > result;
            while (inclusiveStartPK.GetPrimaryKeyColumnsSize() > 0) {
                RangeRowQueryCriteria queryCriteria("cpu_table");
                queryCriteria.SetInclusiveStartPrimaryKey(inclusiveStartPK);
                queryCriteria.SetExclusiveEndPrimaryKey(exclusiveEndPK);
                queryCriteria.SetDirection(RangeRowQueryCriteria::FORWARD);
                queryCriteria.SetMaxVersions(1);
                queryCriteria.SetScanOperation(scanOperation);
        
                GetRangeRequestPtr getRangeRequestPtr(new GetRangeRequest(queryCriteria));
                GetRangeResultPtr getRangeResultPtr = gClient->GetRange(getRangeRequestPtr);
                const list<RowPtr>& rowPtrs = getRangeResultPtr->GetRows();
                typeof(rowPtrs.begin()) iter = rowPtrs.begin();
                for (; iter != rowPtrs.end(); ++iter) {// 在客户端聚合结果
                    const std::string& hostName = ((*iter)->GetPrimaryKey()).GetColumn(0).GetValue().AsString();
                    const Column& sumCol = ((*iter)->GetColumns()).front();
                    const Column& countCol = ((*iter)->GetColumns()).back();
                    int64_t cpuSum = sumCol.GetValue().AsInteger();
                    int64_t cpuCount = countCol.GetValue().AsInteger();
                    if (result.find(hostName) != result.end()) {
                        result[hostName].first +=cpuSum;
                        result[hostName].second += cpuCount;
                    } else {
                        result[hostName] = std::make_pair<int64_t, int64_t>(cpuSum, cpuCount);
                    }
                }
        
                // 获取多轮结果
                if (getRangeResultPtr->HasNextStartPrimaryKey()) {
                    inclusiveStartPK = getRangeResultPtr->GetNextStartPrimaryKey();
                } else {
                    inclusiveStartPK = PrimaryKey();
                }
            }
      2. 对每一个机器host1,在客户端统计平均值:avg(host1) = sum(host1-cpu) /count (host1-cpu)
            typeof(result.begin()) rIt = result.begin();
            for (; rIt != result.end(); ++rIt) {
                double cpuAverage = double(rIt->second.first) / rIt->second.second;
                std::cout << "HostName: " << rIt->first << ", CpuAverage: " << cpuAverage << std::endl;
            }
    • 输出结果
      HostName: host-100, CpuAverage: 38.7667
      HostName: host-101, CpuAverage: 50.4333
      HostName: host-102, CpuAverage: 48.0667
      HostName: host-103, CpuAverage: 46.5333
      HostName: host-104, CpuAverage: 43.8333
      HostName: host-105, CpuAverage: 40.5333
      HostName: host-106, CpuAverage: 48.2667
      HostName: host-107, CpuAverage: 54.5667
      HostName: host-108, CpuAverage: 42.9667
      HostName: host-109, CpuAverage: 47.5667

使用限制

  • 聚合函数
    • COUNT_ROW_FUNC不能指定列名,否则抛异常。
    • COUNT_COLUMN/MIN/MAX/SUM_FUNC必须指定列名,否则抛异常。
    • COUNT行或列的返回值为Integer类型。
    • MIN/MAX/SUM仅支持Integer/Double类型。
    • COUNT/SUM的计算结果(int64/double)溢出时,抛异常。
    • COUNT行或列的初始值为0;MIN/MAX/SUM初始值为NULL;读取列不存在时,返回初始值。
    • COUNT_COLUMN/MIN/MAX/SUM运行过程中,发现列值类型出现不一致时,抛异常。
    • 在计费方面,聚合统计与Scan接口一致。
  • 分组函数
    • 分组列值仅能是Integer/String/Blob/Boolean类型,否则抛异常。
    • 分组列值类型出现不一致时,抛异常。