为了更好地融入大数据生态,MaxCompute开放了存储组件(Storage API),通过调用Storage API直接访问MaxCompute底层存储,有助于提高第三方引擎访问MaxCompute数据的速度与效率。本文为您介绍如何使用第三方计算引擎Spark通过Spark Connector调用Storage API来访问MaxCompute数据。
背景信息
使用Spark引擎调用MaxCompute的Storage API处理MaxCompute数据,满足您数据开放、多引擎使用场景,同时Spark结合MaxCompute的数据存储能力,可以实现高效、灵活和强大的数据处理和分析能力。阿里云提供了Spark Connector来简化Spark与MaxCompute之间的集成,MaxCompute支持通过Spark Connector调用Storage API直接访问MaxCompute底层存储,并且支持高并发的数据读写操作,而不通过MaxCompute前端服务层,提供更直接访问和操作数据的能力,提高了数据访问和处理的效率。架构图如下:
前提条件
已开通MaxCompute服务并创建MaxCompute项目,详情请参见开通MaxCompute和创建MaxCompute项目。
使用限制
目前Spark Connector只能使用独享Tunnel并发资源组,不能使用共享资源组,详情请参见购买与使用独享数据传输服务资源组。
不支持读写JSON数据类型的数据。
不支持读写外部表、聚簇表、物化视图和视图的数据。
操作步骤
部署Spark开发环境。
本文的Spark开发环境搭建在Linux操作系统下,详情请参见搭建Linux开发环境。您也可以选择搭建在Windows操作系统下,详情请参见搭建Windows开发环境。
重要Spark包请使用Spark3.3.x版本,单击Spark下载并解压到本地目录。
下载并编译Spark Connector(当前只支持Spark 3.3.x版本)。
说明如果您编译时遇到网络等原因导致编译失败,可以单击spark-odps-datasource-3.3.1-odps0.43.0.jar,下载已经编译成功的JAR包,然后将JAR包放在
$SPARK_HOME/jars/
目录下。## 下载Spark Connector: git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git ## 切换到spark connector目录cd cd aliyun-maxcompute-data-collectors/spark-connector ## 编译 mvn clean package ## Datasource Jar包位置 datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar ## 将Datasource Jar包拷贝到 $SPARK_HOME/jars/目录下 cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/
配置MaxCompute账号访问信息。
在Spark的
conf
目录下创建spark-defaults.conf
文件:cd $SPARK_HOME/conf vim spark-defaults.conf
文件内容示例如下:
## 在spark-defaults.conf配置账号 spark.hadoop.odps.project.name=doc_test spark.hadoop.odps.access.id=L******************** spark.hadoop.odps.access.key=******************* spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx ##配置MaxCompute Catalog spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.sources.partitionOverwriteMode=dynamic spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
参数说明如下:
分类
参数
是否必填
说明
MaxCompute实例信息
spark.hadoop.odps.project.name
是
MaxCompute项目(Project)名称。
此处为MaxCompute项目名称,非工作空间名称。您可以登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>项目管理,查看具体的MaxCompute项目名称。
spark.hadoop.odps.access.id
是
具备目标MaxCompute项目访问权限的AccessKey ID。
您可以进入AccessKey管理页面获取AccessKey ID。
spark.hadoop.odps.access.key
是
AccessKey ID对应的AccessKey Secret。
您可以进入AccessKey管理页面获取AccessKey Secret。
spark.hadoop.odps.end.point
是
MaxCompute项目所属区域的外网Endpoint。
各地域的外网Endpoint信息,请参见Endpoint。
spark.hadoop.odps.tunnel.quota.name
否
访问MaxCompute使用的Quota名称。
访问MaxCompute支持独享Tunnel资源或者开放存储按量模式。两种模式获取Quota名称的方式如下:
独享Tunnel模式:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见查看Quota。
开放存储按量模式:
spark.hadoop.odps.tunnel.quota.name=pay-as-you-go
Catalog相关配置
spark.sql.catalog.odps
是
指定Spark Catalog,值需要设置为
org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
。spark.sql.sources.partitionOverwriteMode
是
指定INSERT OVERWRITE一个分区的数据源表时的模式,值需要设置为
dynamic
,即Spark不会提前删除分区,只覆盖在运行时写入了数据的分区。spark.sql.extensions
是
指定Spark会话扩展,值需要设置为
org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
。spark.sql.defaultCatalog
否
指定是否为默认Catalog,如果需要将MaxCompute设置为默认Catalog,需要将该参数值设置为
odps
。spark.sql.catalog.odps.enableVectorizedReader
否
指定是否开启向量化读,默认值为
true
。spark.sql.catalog.odps.columnarReaderBatchSize
否
指定向量化读每个批处理包含的行数,默认值为
4096
。spark.sql.catalog.odps.enableVectorizedWriter
否
指定是否开启向量化写,默认值为
true
。spark.sql.catalog.odps.columnarWriterBatchSize
否
指定向量化写每个批处理包含的行数,默认值为
4096
。spark.sql.catalog.odps.splitSizeInMB
否
指定表切片大小,决定读表并行度,单位MB,默认值为
256
。spark.sql.catalog.odps.enableNamespaceSchema
否
如果MaxCompute项目开启三层模型,值需要设置为
true
。通过Spark Connector使用MaxCompute。
使用如下命令在Spark的
bin
目录下启动Spark SQL客户端:cd $SPARK_HOME/bin spark-sql
使用Spark SQL客户端执行命令示例如下:
查询MaxCompute项目中存在的表:
SHOW tables in odps.doc_test;
doc_test
为示例MaxCompute项目名称,实际运行时请替换为您的MaxCompute项目名称。创建表:
CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
表中写入数据:
INSERT INTO TABLE odps.doc_test.mc_test_table VALUES ('test1', 1),('test2',2);
读取表中数据:
SELECT * FROM odps.doc_test.mc_test_table;
返回示例如下:
test1 1 test2 2 Time taken: 1.279 seconds, Fetched 2 row(s)
创建分区表:
CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
分区表中写入数据:
INSERT OVERWRITE TABLE odps.doc_test.mc_test_table_pt PARTITION (pt1='2018', pt2='0601') SELECT * FROM odps.doc_test.mc_test_table;
读取分区表中数据:
SELECT * FROM odps.doc_test.mc_test_table_pt;
返回结果示例如下:
test1 1 2018 0601 test2 2 2018 0601 Time taken: 1.312 seconds, Fetched 2 row(s)
删除表:
DROP TABLE IF EXISTS odps.doc_test.mc_test_table;
- 本页导读 (1)