MaxCompute开放存储支持Spark通过Connector调用Storage API,直接读取MaxCompute的数据,简化了读取数据的过程,提高了数据访问性能。同时,Spark集成MaxCompute的数据存储能力,实现了高效、灵活和强大的数据处理和分析。
前提条件
已开通MaxCompute服务并创建MaxCompute项目,详情请参见开通MaxCompute和创建MaxCompute项目。
使用限制
第三方引擎访问MaxCompute时,支持读取分区表、聚簇表、物化视图;不支持读取MaxCompute的外部表、逻辑视图、Delta Table。
不支持读JSON数据类型。
开放存储(按量付费)每个租户的请求并发数限制默认为1000个,并且每个并发传输速率为10 MB/s。
操作步骤
部署Spark开发环境。
本文的Spark开发环境搭建在Linux操作系统下,详情请参见搭建Linux开发环境。您也可以选择搭建在Windows操作系统下,详情请参见搭建Windows开发环境。
重要Spark包请使用Spark3.3.x版本,单击Spark下载并解压到本地目录。
下载并编译Spark Connector(当前只支持Spark 3.3.x版本)。
## 下载Spark Connector: git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git ## 切换到spark connector目录cd cd aliyun-maxcompute-data-collectors/spark-connector ## 编译 mvn clean package ## Datasource Jar包位置 datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar ## 将Datasource Jar包拷贝到 $SPARK_HOME/jars/目录下 cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/
配置MaxCompute账号访问信息。
在Spark的
conf
目录下创建spark-defaults.conf
文件:cd $SPARK_HOME/conf vim spark-defaults.conf
文件内容示例如下:
## 在spark-defaults.conf配置账号 spark.hadoop.odps.project.name=doc_test spark.hadoop.odps.access.id=L******************** spark.hadoop.odps.access.key=******************* spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx ##配置MaxCompute Catalog spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
参数说明如下:
分类
参数
是否必填
说明
MaxCompute实例信息
spark.hadoop.odps.project.name
是
MaxCompute项目(Project)名称。
此处为MaxCompute项目名称,非工作空间名称。您可以登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>项目管理,查看具体的MaxCompute项目名称。
spark.hadoop.odps.access.id
是
具备目标MaxCompute项目访问权限的AccessKey ID。
您可以进入AccessKey管理页面获取AccessKey ID。
spark.hadoop.odps.access.key
是
AccessKey ID对应的AccessKey Secret。
spark.hadoop.odps.end.point
是
MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见Endpoint。
说明当前仅支持使用阿里云VPC网络。
spark.hadoop.odps.tunnel.quota.name
否
访问MaxCompute使用的Quota名称。访问MaxCompute支持独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源,获取Quota名称的方式分别如下:
独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见计算资源-Quota管理。
开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储并进行授权操作。具体操作,请参见使用开放存储(按量付费)。
开放存储资源名称默认为
pay-as-you-go
。
Catalog相关配置
spark.sql.catalog.odps
是
指定Spark Catalog,值需要设置为
org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
。spark.sql.extensions
是
指定Spark会话扩展,值需要设置为
org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
。spark.sql.defaultCatalog
否
指定是否为默认Catalog,如果需要将MaxCompute设置为默认Catalog,需要将该参数值设置为
odps
。spark.sql.catalog.odps.enableVectorizedReader
否
指定是否开启向量化读,默认值为
true
。spark.sql.catalog.odps.columnarReaderBatchSize
否
指定向量化读每个批处理包含的行数,默认值为
4096
。spark.sql.catalog.odps.splitSizeInMB
否
指定表切片大小,决定读表并行度,单位为MB,默认值为
256
。spark.sql.catalog.odps.enableNamespaceSchema
否
如果MaxCompute项目开启三层模型,值需要设置为
true
。说明Spark通过Connector调用Storage API读取MaxCompute的数据时,Spark作业并发数决定可使用的MaxCompute Tunnel并发数。一个Spark并发读或写对应一个Tunnel并发,而一个Spark并发可能同时读写,因此Tunnel并发数会介于Spark的并发数1~2倍之间。例如,Spark的任务拉起1000并发,那么Tunnel并发数会介于1000到2000之间。
通过Spark Connector使用MaxCompute。
使用如下命令在Spark的
bin
目录下启动Spark SQL客户端:cd $SPARK_HOME/bin spark-sql
使用Spark SQL客户端执行命令示例如下:
查询MaxCompute项目中存在的表:
SHOW tables in odps.doc_test;
doc_test
为示例MaxCompute项目名称,实际运行时请替换为您的MaxCompute项目名称。创建表:
CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
读取表中数据:
SELECT * FROM odps.doc_test.mc_test_table;
返回示例如下:
test1 1 test2 2 Time taken: 1.279 seconds, Fetched 2 row(s)
创建分区表:
CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
读取分区表中数据:
SELECT * FROM odps.doc_test.mc_test_table_pt;
返回结果示例如下:
test1 1 2018 0601 test2 2 2018 0601 Time taken: 1.312 seconds, Fetched 2 row(s)
删除表:
DROP TABLE IF EXISTS odps.doc_test.mc_test_table;