文档

Spark Connector

更新时间:

为了更好地融入大数据生态,MaxCompute开放了存储组件(Storage API),通过调用Storage API直接访问MaxCompute底层存储,有助于提高第三方引擎访问MaxCompute数据的速度与效率。本文为您介绍如何使用第三方计算引擎Spark通过Spark Connector调用Storage API来访问MaxCompute数据。

背景信息

使用Spark引擎调用MaxCompute的Storage API处理MaxCompute数据,满足您数据开放、多引擎使用场景,同时Spark结合MaxCompute的数据存储能力,可以实现高效、灵活和强大的数据处理和分析能力。阿里云提供了Spark Connector来简化Spark与MaxCompute之间的集成,MaxCompute支持通过Spark Connector调用Storage API直接访问MaxCompute底层存储,并且支持高并发的数据读写操作,而不通过MaxCompute前端服务层,提供更直接访问和操作数据的能力,提高了数据访问和处理的效率。架构图如下:

image

前提条件

已开通MaxCompute服务并创建MaxCompute项目,详情请参见开通MaxCompute创建MaxCompute项目

使用限制

  • 目前Spark Connector只能使用独享Tunnel并发资源组,不能使用共享资源组,详情请参见购买与使用独享数据传输服务资源组

  • 不支持读写JSON数据类型的数据。

  • 不支持读写外部表、聚簇表、物化视图和视图的数据。

操作步骤

  1. 部署Spark开发环境。

    本文的Spark开发环境搭建在Linux操作系统下,详情请参见搭建Linux开发环境。您也可以选择搭建在Windows操作系统下,详情请参见搭建Windows开发环境

    重要

    Spark包请使用Spark3.3.x版本,单击Spark下载并解压到本地目录。

  2. 下载并编译Spark Connector(当前只支持Spark 3.3.x版本)。

    说明

    如果您编译时遇到网络等原因导致编译失败,可以单击spark-odps-datasource-3.3.1-odps0.43.0.jar,下载已经编译成功的JAR包,然后将JAR包放在$SPARK_HOME/jars/目录下。

    ## 下载Spark Connector:
    git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git
    
    ## 切换到spark connector目录cd 
    cd aliyun-maxcompute-data-collectors/spark-connector 
    
    ## 编译
    mvn clean package
    
    ## Datasource Jar包位置
    datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar
    
    ## 将Datasource Jar包拷贝到 $SPARK_HOME/jars/目录下
    cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar $SPARK_HOME/jars/
  3. 配置MaxCompute账号访问信息。

    在Spark的conf目录下创建spark-defaults.conf文件:

    cd $SPARK_HOME/conf
    vim spark-defaults.conf

    文件内容示例如下:

    ## 在spark-defaults.conf配置账号
    spark.hadoop.odps.project.name=doc_test
    spark.hadoop.odps.access.id=L********************
    spark.hadoop.odps.access.key=*******************
    spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api
    spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx
    ##配置MaxCompute Catalog
    spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog 
    spark.sql.sources.partitionOverwriteMode=dynamic
    spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions

    参数说明如下:

    分类

    参数

    是否必填

    说明

    MaxCompute实例信息

    spark.hadoop.odps.project.name

    MaxCompute项目(Project)名称。

    此处为MaxCompute项目名称,非工作空间名称。您可以登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>项目管理,查看具体的MaxCompute项目名称。

    spark.hadoop.odps.access.id

    具备目标MaxCompute项目访问权限的AccessKey ID。

    您可以进入AccessKey管理页面获取AccessKey ID。

    spark.hadoop.odps.access.key

    AccessKey ID对应的AccessKey Secret。

    您可以进入AccessKey管理页面获取AccessKey Secret。

    spark.hadoop.odps.end.point

    MaxCompute项目所属区域的外网Endpoint。

    各地域的外网Endpoint信息,请参见Endpoint

    spark.hadoop.odps.tunnel.quota.name

    访问MaxCompute使用的Quota名称。

    访问MaxCompute支持独享Tunnel资源或者开放存储按量模式。两种模式获取Quota名称的方式如下:

    • 独享Tunnel模式:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见查看Quota

    • 开放存储按量模式:spark.hadoop.odps.tunnel.quota.name=pay-as-you-go

    Catalog相关配置

    spark.sql.catalog.odps

    指定Spark Catalog,值需要设置为org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog

    spark.sql.sources.partitionOverwriteMode

    指定INSERT OVERWRITE一个分区的数据源表时的模式,值需要设置为dynamic,即Spark不会提前删除分区,只覆盖在运行时写入了数据的分区。

    spark.sql.extensions

    指定Spark会话扩展,值需要设置为org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions

    spark.sql.defaultCatalog

    指定是否为默认Catalog,如果需要将MaxCompute设置为默认Catalog,需要将该参数值设置为odps

    spark.sql.catalog.odps.enableVectorizedReader

    指定是否开启向量化读,默认值为true

    spark.sql.catalog.odps.columnarReaderBatchSize

    指定向量化读每个批处理包含的行数,默认值为4096

    spark.sql.catalog.odps.enableVectorizedWriter

    指定是否开启向量化写,默认值为true

    spark.sql.catalog.odps.columnarWriterBatchSize

    指定向量化写每个批处理包含的行数,默认值为4096

    spark.sql.catalog.odps.splitSizeInMB

    指定表切片大小,决定读表并行度,单位MB,默认值为256

    spark.sql.catalog.odps.enableNamespaceSchema

    如果MaxCompute项目开启三层模型,值需要设置为true

  4. 通过Spark Connector使用MaxCompute。

    使用如下命令在Spark的bin目录下启动Spark SQL客户端:

    cd $SPARK_HOME/bin
    spark-sql

    使用Spark SQL客户端执行命令示例如下:

    • 查询MaxCompute项目中存在的表:

      SHOW tables in odps.doc_test;

      doc_test为示例MaxCompute项目名称,实际运行时请替换为您的MaxCompute项目名称。

    • 创建表:

      CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
    • 表中写入数据:

      INSERT INTO TABLE odps.doc_test.mc_test_table VALUES ('test1', 1),('test2',2);
    • 读取表中数据:

      SELECT * FROM odps.doc_test.mc_test_table;

      返回示例如下:

      test1   1
      test2   2
      Time taken: 1.279 seconds, Fetched 2 row(s)
    • 创建分区表:

       CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
    • 分区表中写入数据:

      INSERT OVERWRITE TABLE odps.doc_test.mc_test_table_pt PARTITION (pt1='2018', pt2='0601') SELECT * FROM odps.doc_test.mc_test_table;
    • 读取分区表中数据:

      SELECT * FROM odps.doc_test.mc_test_table_pt;

      返回结果示例如下:

      test1   1       2018    0601
      test2   2       2018    0601
      Time taken: 1.312 seconds, Fetched 2 row(s)
    • 删除表:

      DROP TABLE IF EXISTS odps.doc_test.mc_test_table;
  • 本页导读 (1)
文档反馈