批计算

更新时间:
复制为 MD 格式

通过 Tablestore on Spark,在 E-MapReduce(EMR)集群中使用 Spark SQL 对表格存储数据表执行批量查询。支持索引选择、分区裁剪、投影列和过滤器下推、动态分片大小调整,并可利用全局二级索引或多元索引加速查询。

前提条件

开始前,确保已完成以下准备工作:

  • 已创建 E-MapReduce Hadoop 集群。具体操作,请参见EMR快速入门。创建集群时,按如下方式配置:

    • 业务场景自定义集群

    • 可选服务:Spark2、Hive、YARN、Hadoop-Common、HDFS。

    • 元数据内置 MySQL

    Master节点组下,开启挂载公网开关,其余配置项保持默认值。

    重要

    不开启挂载公网开关时,集群创建后只能通过内网访问。如需公网访问,请前往ECS控制台为 Master 节点挂载弹性公网 IP(EIP)。

  • 已完成EMR服务授权。具体操作,请参见阿里云账号角色授权

  • 已创建RAM用户,并授予RAM用户管理表格存储服务的权限(AliyunOTSFullAccess)。具体操作,请参见通过RAM PolicyRAM用户授权

    重要

    建议通过RAM用户的AccessKey完成授权,避免因主账号AccessKey泄露带来安全风险。

  • 已获取 AccessKey(AccessKey IDAccessKey Secret),用于签名认证。具体操作,请参见创建AccessKey

连接全局二级索引

将 Spark 连接到表格存储数据表和全局二级索引后,通过 Spark 外表执行查询时,系统会根据查询条件中的列条件自动选择最优索引。

步骤一:在表格存储侧创建数据表或全局二级索引

  1. 创建表格存储数据表。具体操作,请参见创建数据表

    本示例中,数据表名称为 tpch_lineitem_perf,主键列为 l_orderkey(LONG 类型)和 l_linenumber(LONG 类型),共 14 个属性列,包括 l_comment(STRING)、l_commitdate(STRING)、l_discount(DOUBLE)、l_extendedprice(DOUBLE)、l_linestatus(STRING)、l_partkey(LONG)、l_quantity(DOUBLE)、l_receiptdate(STRING)等,数据总量 384,016,850 条。

  2. (可选)在数据表上创建全局二级索引。具体操作,请参见使用二级索引

    说明

    查询条件涉及数据表非主键列时,建议创建全局二级索引加速查询。

    全局二级索引在指定列上建立索引,生成的索引表按指定索引列排序。数据表的每次写入,都会以异步方式自动同步到索引表。

    本示例中全局二级索引的主键包括 l_shipdate(STRING,分区键)、l_orderkey(INTEGER)和 l_linenumber(INTEGER);预定义列包括 l_returnflag(STRING)、l_quantity(DOUBLE)、l_linestatus(STRING)、l_extendedprice(DOUBLE)、l_discount(DOUBLE)和 l_tax(DOUBLE)。

步骤二:EMR集群侧创建Spark外表

  1. 登录 Master 节点。

  2. 执行如下命令,启动 Spark SQL CLI。

    spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*

    启动界面如下所示。

    23/07/19 14:42:23 INFO [main] metastore: Opened a connection to metastore, current connections: 1
    23/07/19 14:42:24 INFO [main] metastore: Connected to metastore.
    23/07/19 14:42:24 INFO [main] Hive: instanced a metaStoreClient with type: com.sun.proxy.$Proxy30
    Spark master: yarn, Application Id: application_1689747666517_0003
    23/07/19 14:42:24 INFO [main] SparkSQLCLIDriver: Spark master: yarn, Application Id: application_1689747666517_0003
    spark-sql>
  3. 创建 Spark 外表,连接全局二级索引。参数说明如下。

    参数

    说明

    endpoint

    表格存储实例访问地址,建议使用VPC地址。使用VPC时,请确保EMR集群和表格存储实例之间网络可连通。更多信息,请参见服务地址

    access.key.id

    阿里云账号或RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

    access.key.secret

    instance.name

    表格存储实例名称。

    table.name

    表格存储数据表名称。

    split.size.mbs

    每个数据切片(Split)的大小,单位为 MB。默认值:100。

    max.split.count

    数据表最大 Split 数,对应 Spark 查询并发度。默认值:1000。

    catalog

    数据表的 Schema 定义,JSON 格式。

示例:

DROP TABLE IF EXISTS tpch_lineitem;
CREATE TABLE tpch_lineitem
USING tablestore
OPTIONS(endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
        access.key.id="",
        access.key.secret="",
        instance.name="vehicle-test",
        table.name="tpch_lineitem_perf",
        split.size.mbs=10,
        max.split.count=1000,
        catalog='{"columns":{"l_orderkey":{"type":"long"},"l_partkey":{"type":"long"},"l_suppkey":{"type":"long"},"l_linenumber":{"type":"long"},"l_quantity":{"type":"double"},"l_extendedprice":{"type":"double"},"l_discount":{"type":"double"},"l_tax":{"type":"double"},"l_returnflag":{"type":"string"},"l_linestatus":{"type":"string"},"l_shipdate":{"type":"string"},"l_commitdate":{"type":"string"},"l_receiptdate":{"type":"string"},"l_shipinstruct":{"type":"string"},"l_shipmode":{"type":"string"},"l_comment":{"type":"string"}}}'
);

步骤三:SQL查询

以下为不同查询场景的 SQL 示例,可根据实际业务组合使用。

  • 全表扫描

    SQL:SELECT COUNT(*) FROM tpch_lineitem;

    平均耗时 35.237 s(三次耗时:36.199 s、34.711 s、34.801 s)。

  • 主键查询

    SQL:SELECT COUNT(*) FROM tpch_lineitem WHERE l_orderkey = 1 AND l_linenumber = 1;

    表格存储服务端执行 GetRow 操作,平均耗时 0.585 ms。

  • 非主键查询(未开启全局二级索引)

    SQL:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';

    平均耗时 37.149 s(三次耗时:37.006 s、37.269 s、37.17 s)。

  • 非主键查询(已开启全局二级索引)

    SQL:SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';

    在 l_shipdate 列上创建全局二级索引后,平均耗时降至 1.707 s(三次耗时:1.686 s、1.651 s、1.784 s),查询速度提升约 22 倍。

连接多元索引

将 Spark 连接到表格存储数据表和多元索引后,通过 Spark 外表执行查询时,系统会自动使用指定的多元索引。

步骤一:在表格存储侧创建数据表和多元索引

  1. 创建数据表。具体操作,请参见创建数据表

    本示例中,数据表名称为 geo_table,主键列为 pk1(String 类型),属性列包括 val_keyword1、val_keyword2、val_keyword3(均为 String 类型)、val_bool(Boolean 类型)、val_double(Double 类型)、val_long1、val_long2(均为 Long 类型)、val_text(String 类型)和 val_geo(String 类型),数据总量 208,912,382 条。数据样例请参见数据表详情。

  2. 在数据表上创建多元索引。具体操作,请参见创建及使用多元索引

    创建多元索引时,根据字段的数据类型选择对应的字段类型映射(Mapping)。

    说明

    地理位置字段需选择字段类型为地理位置(Geo-point),而非字符串类型。

    本示例中索引字段与类型映射如下:pk1(字符串)、val_double(浮点数)、val_keyword3(字符串)、val_bool(布尔值)、val_geo(地理位置)、val_long1(长整型)、val_long2(长整型)、val_text(分词字符串,分词类型为最大数量语义分词)、val_keyword2(字符串)、val_keyword1(字符串),数组均为否。预排序方式为主键预排序(升序)。

    多元索引创建后,会自动开始同步数据表中的数据。待多元索引进入增量状态时,表示索引构建完成。

    在索引列表中,确认 geo_table 表的多元索引 geo_table_index同步状态列显示为增量

步骤二:EMR集群侧创建Spark外表

  1. 登录 Master 节点。

  2. 创建 Spark 外表,连接多元索引。参数说明如下。

    参数

    说明

    endpoint

    表格存储实例访问地址,建议使用VPC地址。使用VPC时,请确保EMR集群和表格存储实例之间网络可连通。更多信息,请参见服务地址

    access.key.id

    阿里云账号或RAM用户的AccessKey IDAccessKey Secret。获取方式请参见创建AccessKey

    access.key.secret

    instance.name

    表格存储实例名称。

    table.name

    表格存储数据表名称。

    search.index.name

    多元索引名称。

    max.split.count

    多元索引 Parallel Scan 的查询并发度,对应 Spark 的 Split 数。

    push.down.range.long

    是否将 Long 类型列的范围谓词(>=、>、<、<=)下推。Boolean 类型,默认值为 true。更多信息,请参见批计算谓词下推配置

    • true:将 Long 类型列的范围比较谓词下推。

    • false:不下推 Long 类型列的范围比较谓词。

    push.down.range.string

    是否将 String 类型列的范围谓词(>=、>、<、<=)下推。Boolean 类型,默认值为 true。更多信息,请参见批计算谓词下推配置

    • true:将 String 类型列的范围比较谓词下推。

    • false:不下推 String 类型列的范围比较谓词。

    示例:

    DROP TABLE IF EXISTS geo_table;
    CREATE TABLE geo_table (
        pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING, 
        val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG,
        val_text STRING, val_geo STRING COMMENT "geo stored in string format"
    )
    USING tablestore
    OPTIONS(endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com",
            access.key.id="",
            access.key.secret="",
            instance.name="sparksearchtest",
            table.name="geo_table",
            search.index.name="geo_table_index",
            max.split.count=64,
            push.down.range.long = false,
            push.down.range.string = false
    );

步骤三:SQL查询

以下为不同查询场景的 SQL 示例,可根据实际业务组合使用。

  • 使用多元索引全表查询

    SQL:SELECT COUNT(*) FROM geo_table;

    测试数据 208,912,382 条,配置 64 个 Parallel Scan 并发,实际耗时 165.208 s,平均QPS约 126.45 万。

    208912382
    Time taken: 165.208 seconds, Fetched 1 row(s)
    20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s)
  • 组合条件查询

    SQL:SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;

    Spark 将投影列和过滤器下推至多元索引,实际耗时 2.728 s,查询效率大幅提升。

    21423964        4017    aaa     2501.9901650365096
    21962236        2322    eio     2775.9021545044116
    Time taken: 2.894 seconds, Fetched 100 row(s)
    20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 second
  • 地理位置查询

    地理位置查询支持三种方式。示例中 val_geo 为地理位置字段名,坐标格式均为"纬度,经度"。

    • 地理距离查询——查询距中心点指定半径范围内的数据。

      语法:val_geo = '{"centerPoint":"<纬度,经度>", "distanceInMeter": <距离>}'

      SQL:SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}';

    • 地理长方形查询——查询矩形区域内的数据,由左上角和右下角坐标定义。

      语法:val_geo = '{"topLeft":"<纬度,经度>", "bottomRight": "<纬度,经度>"}'

      SQL:SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight": "6.153593333442616,9.25968497923747"}';

    • 地理多边形范围查询——查询多边形区域内的数据,由一组坐标点定义。

      语法:val_geo = '{"points":["<坐标1>", "<坐标2>", ..., "<坐标n>"]}'

      SQL:SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}';