MaxCompute开放存储支持Spark通过Connector调用Storage API,直接读取MaxCompute的数据,简化了读取数据的过程,提高了数据访问性能。同时,Spark集成MaxCompute的数据存储能力,实现了高效、灵活和强大的数据处理和分析。
适用范围
第三方引擎访问MaxCompute时
支持读取普通表、分区表、聚簇表、Delta Table和物化视图;
不支持读取MaxCompute的外部表、逻辑视图。
不支持读JSON数据类型。
开放存储(按量付费)每个租户的请求并发数限制默认为1000个,并且每个并发传输速率为10 MB/s。
操作步骤
部署Spark开发环境
Spark包请使用
Spark 3.2.x - Spark 3.5.x版本,单击Spark下载并解压到本地目录。若Spark开发环境搭建在Linux操作系统下,详情请参见搭建Linux开发环境。
若Spark开发环境搭建在Windows操作系统下,详情请参见搭建Windows开发环境。
下载并编译Spark Connector(当前只支持Spark 3.2.x~Spark 3.5.x版本,下文以Spark 3.3.1版本为例)。
使用
git clone命令下载Spark Connector安装包,需确保该环境已安装Git,否则执行该命令将会报错。## 下载Spark Connector: git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git ## 切换到spark connector目录cd cd aliyun-maxcompute-data-collectors/spark-connector ## 编译 mvn clean package ## Datasource Jar包位置 datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar ## 将Datasource Jar包拷贝到 $SPARK_HOME/jars/目录下 cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/配置MaxCompute账号访问信息。
在Spark的
conf目录下创建spark-defaults.conf文件:cd $SPARK_HOME/conf vim spark-defaults.conf在
spark-defaults.conf文件中配置账号信息:## 在spark-defaults.conf配置账号 spark.hadoop.odps.project.name=doc_test spark.hadoop.odps.access.id=L******************** spark.hadoop.odps.access.key=******************* spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx ## 配置MaxCompute Catalog spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions通过Spark Connector使用MaxCompute。
使用如下命令在Spark的
bin目录下启动Spark SQL客户端:cd $SPARK_HOME/bin spark-sql查询MaxCompute项目中存在的表:
SHOW tables in odps.doc_test;doc_test为示例MaxCompute项目名称,实际运行时请替换为您的MaxCompute项目名称。创建表:
CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);读取表中数据:
SELECT * FROM odps.doc_test.mc_test_table;创建分区表:
CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);读取分区表中数据:
SELECT * FROM odps.doc_test.mc_test_table_pt;返回结果示例如下:
test1 1 2018 0601 test2 2 2018 0601 Time taken: 1.312 seconds, Fetched 2 row(s)删除表:
DROP TABLE IF EXISTS odps.doc_test.mc_test_table;