通过 Tablestore on Spark,在 E-MapReduce(EMR)集群中使用 Spark SQL 对表格存储数据表执行批量查询。支持索引选择、分区裁剪、投影列和过滤器下推、动态分片大小调整,并可利用全局二级索引或多元索引加速查询。
前提条件
开始前,确保已完成以下准备工作:
-
已创建 E-MapReduce Hadoop 集群。具体操作,请参见EMR快速入门。创建集群时,按如下方式配置:
-
业务场景:自定义集群。
-
可选服务:Spark2、Hive、YARN、Hadoop-Common、HDFS。
-
元数据:内置 MySQL。
在Master节点组下,开启挂载公网开关,其余配置项保持默认值。
重要不开启挂载公网开关时,集群创建后只能通过内网访问。如需公网访问,请前往ECS控制台为 Master 节点挂载弹性公网 IP(EIP)。
-
-
已完成EMR服务授权。具体操作,请参见阿里云账号角色授权。
-
已创建RAM用户,并授予RAM用户管理表格存储服务的权限(AliyunOTSFullAccess)。具体操作,请参见通过RAM Policy为RAM用户授权。
重要建议通过RAM用户的AccessKey完成授权,避免因主账号AccessKey泄露带来安全风险。
-
已获取 AccessKey(AccessKey ID和AccessKey Secret),用于签名认证。具体操作,请参见创建AccessKey。
连接全局二级索引
将 Spark 连接到表格存储数据表和全局二级索引后,通过 Spark 外表执行查询时,系统会根据查询条件中的列条件自动选择最优索引。
步骤一:在表格存储侧创建数据表或全局二级索引
-
创建表格存储数据表。具体操作,请参见创建数据表。
本示例中,数据表名称为 tpch_lineitem_perf,主键列为 l_orderkey(LONG 类型)和 l_linenumber(LONG 类型),共 14 个属性列,包括 l_comment(STRING)、l_commitdate(STRING)、l_discount(DOUBLE)、l_extendedprice(DOUBLE)、l_linestatus(STRING)、l_partkey(LONG)、l_quantity(DOUBLE)、l_receiptdate(STRING)等,数据总量 384,016,850 条。
-
(可选)在数据表上创建全局二级索引。具体操作,请参见使用二级索引。
说明查询条件涉及数据表非主键列时,建议创建全局二级索引加速查询。
全局二级索引在指定列上建立索引,生成的索引表按指定索引列排序。数据表的每次写入,都会以异步方式自动同步到索引表。
本示例中全局二级索引的主键包括
l_shipdate(STRING,分区键)、l_orderkey(INTEGER)和l_linenumber(INTEGER);预定义列包括l_returnflag(STRING)、l_quantity(DOUBLE)、l_linestatus(STRING)、l_extendedprice(DOUBLE)、l_discount(DOUBLE)和l_tax(DOUBLE)。
步骤二:在EMR集群侧创建Spark外表
-
登录 Master 节点。
-
执行如下命令,启动 Spark SQL CLI。
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/*启动界面如下所示。
23/07/19 14:42:23 INFO [main] metastore: Opened a connection to metastore, current connections: 1 23/07/19 14:42:24 INFO [main] metastore: Connected to metastore. 23/07/19 14:42:24 INFO [main] Hive: instanced a metaStoreClient with type: com.sun.proxy.$Proxy30 Spark master: yarn, Application Id: application_1689747666517_0003 23/07/19 14:42:24 INFO [main] SparkSQLCLIDriver: Spark master: yarn, Application Id: application_1689747666517_0003 spark-sql> -
创建 Spark 外表,连接全局二级索引。参数说明如下。
参数
说明
endpoint
表格存储实例访问地址,建议使用VPC地址。使用VPC时,请确保EMR集群和表格存储实例之间网络可连通。更多信息,请参见服务地址。
access.key.id
阿里云账号或RAM用户的AccessKey ID和AccessKey Secret。获取方式请参见创建AccessKey。
access.key.secret
instance.name
表格存储实例名称。
table.name
表格存储数据表名称。
split.size.mbs
每个数据切片(Split)的大小,单位为 MB。默认值:100。
max.split.count
数据表最大 Split 数,对应 Spark 查询并发度。默认值:1000。
catalog
数据表的 Schema 定义,JSON 格式。
示例:
DROP TABLE IF EXISTS tpch_lineitem;
CREATE TABLE tpch_lineitem
USING tablestore
OPTIONS(endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="vehicle-test",
table.name="tpch_lineitem_perf",
split.size.mbs=10,
max.split.count=1000,
catalog='{"columns":{"l_orderkey":{"type":"long"},"l_partkey":{"type":"long"},"l_suppkey":{"type":"long"},"l_linenumber":{"type":"long"},"l_quantity":{"type":"double"},"l_extendedprice":{"type":"double"},"l_discount":{"type":"double"},"l_tax":{"type":"double"},"l_returnflag":{"type":"string"},"l_linestatus":{"type":"string"},"l_shipdate":{"type":"string"},"l_commitdate":{"type":"string"},"l_receiptdate":{"type":"string"},"l_shipinstruct":{"type":"string"},"l_shipmode":{"type":"string"},"l_comment":{"type":"string"}}}'
);
步骤三:SQL查询
以下为不同查询场景的 SQL 示例,可根据实际业务组合使用。
-
全表扫描
SQL:
SELECT COUNT(*) FROM tpch_lineitem;平均耗时 35.237 s(三次耗时:36.199 s、34.711 s、34.801 s)。
-
主键查询
SQL:
SELECT COUNT(*) FROM tpch_lineitem WHERE l_orderkey = 1 AND l_linenumber = 1;表格存储服务端执行 GetRow 操作,平均耗时 0.585 ms。
-
非主键查询(未开启全局二级索引)
SQL:
SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';平均耗时 37.149 s(三次耗时:37.006 s、37.269 s、37.17 s)。
-
非主键查询(已开启全局二级索引)
SQL:
SELECT count(*) FROM tpch_lineitem WHERE l_shipdate = '1996-06-06';在 l_shipdate 列上创建全局二级索引后,平均耗时降至 1.707 s(三次耗时:1.686 s、1.651 s、1.784 s),查询速度提升约 22 倍。
连接多元索引
将 Spark 连接到表格存储数据表和多元索引后,通过 Spark 外表执行查询时,系统会自动使用指定的多元索引。
步骤一:在表格存储侧创建数据表和多元索引
-
创建数据表。具体操作,请参见创建数据表。
本示例中,数据表名称为 geo_table,主键列为 pk1(String 类型),属性列包括 val_keyword1、val_keyword2、val_keyword3(均为 String 类型)、val_bool(Boolean 类型)、val_double(Double 类型)、val_long1、val_long2(均为 Long 类型)、val_text(String 类型)和 val_geo(String 类型),数据总量 208,912,382 条。数据样例请参见数据表详情。
-
在数据表上创建多元索引。具体操作,请参见创建及使用多元索引。
创建多元索引时,根据字段的数据类型选择对应的字段类型映射(Mapping)。
说明地理位置字段需选择字段类型为地理位置(Geo-point),而非字符串类型。
本示例中索引字段与类型映射如下:pk1(字符串)、val_double(浮点数)、val_keyword3(字符串)、val_bool(布尔值)、val_geo(地理位置)、val_long1(长整型)、val_long2(长整型)、val_text(分词字符串,分词类型为最大数量语义分词)、val_keyword2(字符串)、val_keyword1(字符串),数组均为否。预排序方式为主键预排序(升序)。
多元索引创建后,会自动开始同步数据表中的数据。待多元索引进入增量状态时,表示索引构建完成。
在索引列表中,确认
geo_table表的多元索引geo_table_index的同步状态列显示为增量。
步骤二:在EMR集群侧创建Spark外表
-
登录 Master 节点。
-
创建 Spark 外表,连接多元索引。参数说明如下。
参数
说明
endpoint
表格存储实例访问地址,建议使用VPC地址。使用VPC时,请确保EMR集群和表格存储实例之间网络可连通。更多信息,请参见服务地址。
access.key.id
阿里云账号或RAM用户的AccessKey ID和AccessKey Secret。获取方式请参见创建AccessKey。
access.key.secret
instance.name
表格存储实例名称。
table.name
表格存储数据表名称。
search.index.name
多元索引名称。
max.split.count
多元索引 Parallel Scan 的查询并发度,对应 Spark 的 Split 数。
push.down.range.long
是否将 Long 类型列的范围谓词(>=、>、<、<=)下推。Boolean 类型,默认值为 true。更多信息,请参见批计算谓词下推配置。
-
true:将 Long 类型列的范围比较谓词下推。
-
false:不下推 Long 类型列的范围比较谓词。
push.down.range.string
是否将 String 类型列的范围谓词(>=、>、<、<=)下推。Boolean 类型,默认值为 true。更多信息,请参见批计算谓词下推配置。
-
true:将 String 类型列的范围比较谓词下推。
-
false:不下推 String 类型列的范围比较谓词。
示例:
DROP TABLE IF EXISTS geo_table; CREATE TABLE geo_table ( pk1 STRING, val_keyword1 STRING, val_keyword2 STRING, val_keyword3 STRING, val_bool BOOLEAN, val_double DOUBLE, val_long1 LONG, val_long2 LONG, val_text STRING, val_geo STRING COMMENT "geo stored in string format" ) USING tablestore OPTIONS(endpoint="https://sparksearchtest.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="sparksearchtest", table.name="geo_table", search.index.name="geo_table_index", max.split.count=64, push.down.range.long = false, push.down.range.string = false ); -
步骤三:SQL查询
以下为不同查询场景的 SQL 示例,可根据实际业务组合使用。
-
使用多元索引全表查询
SQL:
SELECT COUNT(*) FROM geo_table;测试数据 208,912,382 条,配置 64 个 Parallel Scan 并发,实际耗时 165.208 s,平均QPS约 126.45 万。
208912382 Time taken: 165.208 seconds, Fetched 1 row(s) 20/06/29 20:55:11 INFO [main] SparkSQLCLIDriver: Time taken: 165.208 seconds, Fetched 1 row(s) -
组合条件查询
SQL:
SELECT val_long1, val_long2, val_keyword1, val_double FROM geo_table WHERE (val_long1 > 17183057 AND val_long1 < 27183057) AND (val_long2 > 1000 AND val_long2 < 5000) LIMIT 100;Spark 将投影列和过滤器下推至多元索引,实际耗时 2.728 s,查询效率大幅提升。
21423964 4017 aaa 2501.9901650365096 21962236 2322 eio 2775.9021545044116 Time taken: 2.894 seconds, Fetched 100 row(s) 20/06/30 18:51:24 INFO [main] SparkSQLCLIDriver: Time taken: 2.894 second -
地理位置查询
地理位置查询支持三种方式。示例中 val_geo 为地理位置字段名,坐标格式均为"纬度,经度"。
-
地理距离查询——查询距中心点指定半径范围内的数据。
语法:
val_geo = '{"centerPoint":"<纬度,经度>", "distanceInMeter": <距离>}'SQL:
SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"centerPoint":"6.530045901643962,9.05358919674954", "distanceInMeter": 3000.0}'; -
地理长方形查询——查询矩形区域内的数据,由左上角和右下角坐标定义。
语法:
val_geo = '{"topLeft":"<纬度,经度>", "bottomRight": "<纬度,经度>"}'SQL:
SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"topLeft":"6.257664116603074,9.1595116589601", "bottomRight": "6.153593333442616,9.25968497923747"}'; -
地理多边形范围查询——查询多边形区域内的数据,由一组坐标点定义。
语法:
val_geo = '{"points":["<坐标1>", "<坐标2>", ..., "<坐标n>"]}'SQL:
SELECT COUNT(*) FROM geo_table WHERE val_geo = '{"points":["6.530045901643962,9.05358919674954", "6.257664116603074,9.1595116589601", "6.160393397574926,9.256517839929597", "6.16043846779313,9.257192872563525"]}';
-