Spark Connector

MaxCompute开放存储支持Spark通过Connector调用Storage API,直接读取MaxCompute的数据,简化了读取数据的过程,提高了数据访问性能。同时,Spark集成MaxCompute的数据存储能力,实现了高效、灵活和强大的数据处理和分析。

适用范围

  • 第三方引擎访问MaxCompute

    • 支持读取普通表、分区表、聚簇表、Delta Table和物化视图;

    • 不支持读取MaxCompute的外部表、逻辑视图。

  • 不支持读JSON数据类型。

  • 开放存储(按量付费)每个租户的请求并发数限制默认为1000个,并且每个并发传输速率为10 MB/s。

操作步骤

  1. 开通MaxCompute创建MaxCompute项目

  2. 安装Git

  3. 购买独享数据传输服务资源组(包年包月)开通开放存储(按量付费)资源

  4. 部署Spark开发环境

    Spark包请使用Spark 3.2.x - Spark 3.5.x版本,单击Spark下载并解压到本地目录。

    1. Spark开发环境搭建在Linux操作系统下,详情请参见搭建Linux开发环境

    2. Spark开发环境搭建在Windows操作系统下,详情请参见搭建Windows开发环境

  5. 下载并编译Spark Connector(当前只支持Spark 3.2.x~Spark 3.5.x版本,下文以Spark 3.3.1版本为例)。

    使用git clone命令下载Spark Connector安装包,需确保该环境已安装Git,否则执行该命令将会报错。

    ## 下载Spark Connector:
    git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git
    
    ## 切换到spark connector目录cd 
    cd aliyun-maxcompute-data-collectors/spark-connector 
    
    ## 编译
    mvn clean package
    
    ## Datasource Jar包位置
    datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar
    
    ## 将Datasource Jar包拷贝到 $SPARK_HOME/jars/目录下
    cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/
  6. 配置MaxCompute账号访问信息。

    Sparkconf目录下创建spark-defaults.conf文件:

    cd $SPARK_HOME/conf
    vim spark-defaults.conf

    spark-defaults.conf文件中配置账号信息:

    ## 在spark-defaults.conf配置账号
    spark.hadoop.odps.project.name=doc_test
    spark.hadoop.odps.access.id=L********************
    spark.hadoop.odps.access.key=*******************
    spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api
    spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx
    ## 配置MaxCompute Catalog
    spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog 
    spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
  7. 通过Spark Connector使用MaxCompute。

    1. 使用如下命令在Sparkbin目录下启动Spark SQL客户端:

      cd $SPARK_HOME/bin
      spark-sql
    2. 查询MaxCompute项目中存在的表:

      SHOW tables in odps.doc_test;

      doc_test为示例MaxCompute项目名称,实际运行时请替换为您的MaxCompute项目名称。

    3. 创建表:

      CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
    4. 读取表中数据:

      SELECT * FROM odps.doc_test.mc_test_table;
    5. 创建分区表:

       CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
    6. 读取分区表中数据:

      SELECT * FROM odps.doc_test.mc_test_table_pt;

      返回结果示例如下:

      test1   1       2018    0601
      test2   2       2018    0601
      Time taken: 1.312 seconds, Fetched 2 row(s)
    7. 删除表:

      DROP TABLE IF EXISTS odps.doc_test.mc_test_table;