文档

Spark Connector

更新时间:

MaxCompute开放存储支持Spark通过Connector调用Storage API,直接读取MaxCompute的数据,简化了读取数据的过程,提高了数据访问性能。同时,Spark集成MaxCompute的数据存储能力,实现了高效、灵活和强大的数据处理和分析。

前提条件

已开通MaxCompute服务并创建MaxCompute项目,详情请参见开通MaxCompute创建MaxCompute项目

使用限制

  • 不支持读写JSON数据类型的数据。

  • 不支持读取MaxCompute的外部表、聚簇表、物化视图、以及视图数据。

操作步骤

  1. 部署Spark开发环境。

    本文的Spark开发环境搭建在Linux操作系统下,详情请参见搭建Linux开发环境。您也可以选择搭建在Windows操作系统下,详情请参见搭建Windows开发环境

    重要

    Spark包请使用Spark3.3.x版本,单击Spark下载并解压到本地目录。

  2. 下载并编译Spark Connector(当前只支持Spark 3.3.x版本)。

    ## 下载Spark Connector:
    git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git
    
    ## 切换到spark connector目录cd 
    cd aliyun-maxcompute-data-collectors/spark-connector 
    
    ## 编译
    mvn clean package
    
    ## Datasource Jar包位置
    datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar
    
    ## 将Datasource Jar包拷贝到 $SPARK_HOME/jars/目录下
    cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/
  3. 配置MaxCompute账号访问信息。

    在Spark的conf目录下创建spark-defaults.conf文件:

    cd $SPARK_HOME/conf
    vim spark-defaults.conf

    文件内容示例如下:

    ## 在spark-defaults.conf配置账号
    spark.hadoop.odps.project.name=doc_test
    spark.hadoop.odps.access.id=L********************
    spark.hadoop.odps.access.key=*******************
    spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api
    spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx
    ##配置MaxCompute Catalog
    spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog 
    spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions

    参数说明如下:

    分类

    参数

    是否必填

    说明

    MaxCompute实例信息

    spark.hadoop.odps.project.name

    MaxCompute项目(Project)名称。

    此处为MaxCompute项目名称,非工作空间名称。您可以登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>项目管理,查看具体的MaxCompute项目名称。

    spark.hadoop.odps.access.id

    具备目标MaxCompute项目访问权限的AccessKey ID。

    您可以进入AccessKey管理页面获取AccessKey ID。

    spark.hadoop.odps.access.key

    AccessKey ID对应的AccessKey Secret。

    您可以进入AccessKey管理页面获取AccessKey Secret。

    spark.hadoop.odps.end.point

    MaxCompute项目所属区域的外网Endpoint。

    各地域的外网Endpoint信息,请参见Endpoint

    spark.hadoop.odps.tunnel.quota.name

    访问MaxCompute使用的Quota名称。访问MaxCompute支持独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源,获取Quota名称的方式分别如下:

    • 独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见Quota管理(新版)

    • 开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储,开放存储资源名称默认为pay-as-you-go。

    Catalog相关配置

    spark.sql.catalog.odps

    指定Spark Catalog,值需要设置为org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog

    spark.sql.extensions

    指定Spark会话扩展,值需要设置为org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions

    spark.sql.defaultCatalog

    指定是否为默认Catalog,如果需要将MaxCompute设置为默认Catalog,需要将该参数值设置为odps

    spark.sql.catalog.odps.enableVectorizedReader

    指定是否开启向量化读,默认值为true

    spark.sql.catalog.odps.columnarReaderBatchSize

    指定向量化读每个批处理包含的行数,默认值为4096

    spark.sql.catalog.odps.splitSizeInMB

    指定表切片大小,决定读表并行度,单位为MB,默认值为256

    spark.sql.catalog.odps.enableNamespaceSchema

    如果MaxCompute项目开启三层模型,值需要设置为true

    说明

    Spark通过Connector调用Storage API读取MaxCompute的数据时,Spark作业并发数决定可使用的MaxCompute Tunnel并发数。一个Spark并发读或写对应一个Tunnel并发,而一个Spark并发可能同时读写,因此Tunnel并发数会介于Spark的并发数1~2倍之间。例如,Spark的任务拉起1000并发,那么Tunnel并发数会介于1000到2000之间。

  4. 通过Spark Connector使用MaxCompute。

    使用如下命令在Spark的bin目录下启动Spark SQL客户端:

    cd $SPARK_HOME/bin
    spark-sql

    使用Spark SQL客户端执行命令示例如下:

    • 查询MaxCompute项目中存在的表:

      SHOW tables in odps.doc_test;

      doc_test为示例MaxCompute项目名称,实际运行时请替换为您的MaxCompute项目名称。

    • 创建表:

      CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
    • 读取表中数据:

      SELECT * FROM odps.doc_test.mc_test_table;

      返回示例如下:

      test1   1
      test2   2
      Time taken: 1.279 seconds, Fetched 2 row(s)
    • 创建分区表:

       CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
    • 读取分区表中数据:

      SELECT * FROM odps.doc_test.mc_test_table_pt;

      返回结果示例如下:

      test1   1       2018    0601
      test2   2       2018    0601
      Time taken: 1.312 seconds, Fetched 2 row(s)
    • 删除表:

      DROP TABLE IF EXISTS odps.doc_test.mc_test_table;