通过在E-MapReduce集群中使用Spark2访问表格存储。对于批计算,Tablestore on Spark提供索引选择、分区裁剪、Projection列和Filter下推、动态指定分区大小等功能,利用表格存储的全局二级索引或者多元索引可以加速查询。

前提条件

  • 已创建E-MapReduce Hadoop集群。具体操作,请参见EMR快速入门

    创建集群时请参照选择以下配置项,其余配置项按实际情况选择即可。

    • 业务场景自定义集群

    • 可选服务:Spark2、Hive、YARN、Hadoop-Common、HDFS。

    • 元数据内置MySQL

    同时确保打开Master节点组下的挂载公网开关,其余配置项使用默认值即可。

    重要

    如果不开启挂载公网开关,则创建后只能通过内网访问。创建后如果您需要公网访问,请前往ECS挂载EIP。

  • 已使用阿里云账号对EMR服务授权。具体操作,请参见阿里云账号角色授权

  • 已创建RAM用户,并授予RAM用户管理表格存储服务的权限(AliyunOTSFullAccess)。具体操作,请参见通过RAM PolicyRAM用户授权

    重要

    由于配置时需要填写访问密钥AccessKey(AK)信息来执行授权,为避免阿里云账号泄露AccessKey带来的安全风险,建议您通过RAM用户来完成授权和AccessKey的创建。

  • 已获取AccessKey(包括AccessKey IDAccessKey Secret),用于进行签名认证。具体操作,请参见获取AccessKey

连接全局二级索引

Spark连接到表格存储数据表和全局二级索引后,通过Spark外表查询数据时,系统会根据查询条件中设置的列条件自动选择索引表进行查询。

步骤一:在表格存储侧创建数据表或全局二级索引

  1. 创建表格存储的数据表。具体操作,请参见概述

    本示例中数据表名称为tpch_lineitem_perf,主键列为l_orderkey(LONG类型)、l_linenumber(LONG类型),属性列分别为l_comment(STRING类型)、l_commitdate(STRING类型)、l_discount(DOUBLE类型)、l_extendedprice(DOUBLE类型)、l_linestatus(STRING类型)、l_partkey(LONG类型)、l_quantity(DOUBLE类型)、l_receiptdate(STRING类型)等14列,数据条数为384016850,数据样例如下图所示。

    fig_00a

  2. (可选)在数据表上创建全局二级索引。具体操作,请参见使用二级索引

    说明

    当查询条件中需要使用数据表的非主键列,建议创建全局二级索引加速查询。

    全局二级索引支持在指定列上建立索引,生成的索引表中数据按指定的索引列进行排序,数据表的每一个数据写入都将自动以异步方式同步到索引表。

    fig_00b

步骤二:EMR集群侧创建Spark外表

  1. 登录Master节点。

  2. 执行如下命令启动Spark SQL CLI,用于Spark外表创建和后续的SQL操作。

    spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*

    启动界面如下所示。

    image.png

  3. 创建Spark外表同时连接全局二级索引。

    • 参数

      参数

      说明

      endpoint

      表格存储实例访问地址,建议使用VPC地址。使用VPC时,请确保EMR集群和表格存储实例之间的网络可连通。更多信息,请参见服务地址

      access.key.id

      阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

      access.key.secret

      instance.name

      表格存储实例名称。

      table.name

      表格存储的数据表名称。

      split.size.mbs

      每个Split的大小,默认值为100 MB。

      max.split.count

      数据表计算出的最大Split数,并发数和SparkSplit个数对应,默认值为1000。

      catalog

      数据表的Schema定义。

    • 示例

      DROP TABLE IF EXISTS tpch_lineitem;
      CREATE TABLE tpch_lineitem
      USING tablestore
      OPTIONS(endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
              access.key.id="",
              access.key.secret="",
              instance.name="vehicle-test",
              table.name="tpch_lineitem_perf",
              split.size.mbs=10,
              max.split.count=1000,
              catalog='{"columns":{"l_orderkey":{"type":"long"},"l_partkey":{"type":"long"},"l_suppkey":{"type":"long"},"l_linenumber":{"type":"long"},"l_quantity":{"type":"double"},"l_extendedprice":{"type":"double"},"l_discount":{"type":"double"},"l_tax":{"type":"double"},"l_returnflag":{"type":"string"},"l_linestatus":{"type":"string"},"l_shipdate":{"type":"string"},"l_commitdate":{"type":"string"},"l_receiptdate":{"type":"string"},"l_shipinstruct":{"type":"string"},"l_shipmode":{"type":"string"},"l_comment":{"type":"string"}}}'
      );

步骤三:SQL查询

如下是不同查询需求的SQL查询样例,请根据实际业务组合使用SQL查询。

  • 全表查询

    • SQL语句:SELECT COUNT(*) FROM tpch_lineitem;

    • SQL总耗时:36.199s、34.711s、34.801s,平均耗时35.237s。

  • 主键查询

    • SQL语句SELECT COUNT(*) FROM tpch_lineitem WHERE l_orderkey = 1 AND l_linenumber = 1;

    • 表格存储服务端:GetRow操作,平均耗时为0.585 ms。

  • 非主键查询,未开启全局二级索引

    • SQL语句:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';

    • SQL总耗时:37.006s、37.269s、37.17s,平均耗时37.149s。

  • 非主键查询,开启全局二级索引

    • SQL语句:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';

    • SQL总耗时(开启l_shipdate列的全局二级索引):1.686s、1.651s、1.784s,平均耗时1.707s。

连接多元索引

Spark连接到表格存储数据表和多元索引后,通过Spark外表查询数据时,系统会自动使用设置的多元索引进行查询。

步骤一:在表格存储侧创建数据表和多元索引

  1. 创建数据表。具体操作,请参见概述

    本示例中数据表名称为geo_table,主键列为pk1(String类型),属性列分别为val_keyword1(String类型)、val_keyword2(String类型)、val_keyword3(String类型)、val_bool(Boolean类型)、val_double(Double类型)、val_long1(Long类型)、val_long2(Long类型)、val_text(String类型)和val_geo(String类型),数据条数为208912382,数据样例如下图所示。

    fig_002

  2. 在数据表上创建多元索引。具体操作,请参见创建及使用多元索引

    创建多元索引时,根据字段类型选择对应的多元索引Mapping。

    说明

    创建多元索引时,地理位置字段需选择字段类型为地理位置而非字符串类型。

    fig_001

    创建多元索引后,多元索引会自动开始同步数据表中的数据,待多元索引进入增量状态时,表示多元索引完成构建。

    fig_searchindex_batch_list

步骤二:EMR集群侧创建Spark外表

  1. 登录Master节点。

  2. 创建Spark外表同时连接多元索引。

    • 参数

      参数

      说明

      endpoint

      表格存储实例访问地址,建议使用VPC地址。使用VPC时,请确保EMR集群和表格存储实例之间的网络可连通。更多信息,请参见服务地址

      access.key.id

      阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

      access.key.secret

      instance.name

      表格存储实例名称。

      table.name

      表格存储的数据表名称。

      search.index.name

      多元索引名称。

      max.split.count

      多元索引Parallel Scan的查询并发度,并发数和SparkSplit数对应。

      push.down.range.long

      Long类型的列做大小(>=、>、<、<=)比较的谓词是否下推。Boolean型,默认值为true。更多信息,请参见批计算谓词下推配置

      • 设置为true时,表示与Long类型的列做大小比较的谓词下推。

      • 设置为false时,表示与Long类型的列做大小比较的谓词不下推。

      push.down.range.string

      String类型的列做大小(>=、>、<、<=)比较的谓词是否下推。Boolean型,默认值为true。更多信息,请参见批计算谓词下推配置

      • 设置为true时,表示与String类型的列做大小比较的谓词下推。

      • 设置为false时,表示与String类型的列做大小比较的谓词不下推。

    • 示例

      DROP TABLE IF EXISTS geo_table;
      CREATE TABLE geo_table (
          pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING, 
          val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
          val_text STRING, val_geo STRING COMMENT "geo stored in string format"
      )
      USING tablestore
      OPTIONS(endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
              access.key.id="",
              access.key.secret="",
              instance.name="sparksearchtest",
              table.name="geo_table",
              search.index.name="geo_table_index",
              max.split.count=64,
              push.down.range.long = false,
              push.down.range.string = false
      );

步骤三:SQL查询

如下是不同查询需求的SQL查询样例,请根据实际业务组合使用SQL查询。

  • 使用多元索引全表查询

    • SQL语句:SELECT COUNT(*) FROM geo_table;

    • SQL耗时:测试数据208912382条,配置64Parallel Scan并发,实际耗时165.208s,平均QPS126.45万。

      208912382
      Time taken: 165.208 seconds, Fetched 1 row(s)
      20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s)
  • 组合条件查询

    • SQL语句:SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;

    • SQL耗时:Spark会将Projection列和Filter下推到多元索引,实际耗时2.728s,极大加快查询效率。

      21423964        4017    aaa     2501.9901650365096
      21962236        2322    eio     2775.9021545044116
      Time taken: 2.894 seconds, Fetched 100 row(s)
      20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 second
  • 地理位置查询

    地理位置查询包括地理距离查询、地理长方形查询和地理多边形范围查询三种地理位置查询方式。示例中val_geo为地理位置字段名,地理坐标的格式都为"纬度,经度"。

    • 地理距离查询

      语法为val_geo = '{"centerPoint":"中心点坐标", "distanceInMeter": 距离中心点的距离}'。

      SQL语句:SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}';

    • 地理长方形查询

      语法为val_geo = '{"topLeft":"矩形框的左上角的坐标", "bottomRight": "矩形框的右下角的坐标"}'。

      SQL语句:SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight": "6.153593333442616,9.25968497923747"}';

    • 地理多边形范围查询

      语法为val_geo = '{"points":["坐标1", "坐标2", .... "坐标n-1", "坐标n"]}'。

      SQL语句:SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}';