读写MaxCompute

EMR Serverless Spark内置了基于Spark DataSource V2MaxCompute DataSource,只需在开发时添加对应的配置即可连接MaxCompute。本文为您介绍在EMR Serverless Spark中实现MaxCompute的读取与写入操作。

背景信息

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute

前提条件

使用限制

  • 本文仅适用于引擎版本Spark引擎版本号为3.3.x的版本。例如,esr-2.5.1 (Spark 3.3.1, Scala 2.12)。

  • 本文操作需要MaxCompute开启开放存储功能,请参见开放存储使用开放存储并完成相应配置。

  • 本文操作所使用的MaxCompute Endpoint需支持存储API功能。如果不支持,请切换至支持存储APIEndpoint,详情请参见数据传输资源

注意事项

使用开放存储(按量付费),超过1 TB的部分按照实际读写数据的逻辑大小进行计费,详情请参见开放存储(按量计费)

操作流程

步骤一:创建会话以连接MaxCompute

您可以创建SQL会话,或者Notebook会话来连接MaxCompute。关于会话更多介绍,请参见会话管理

创建SQL会话以连接MaxCompute

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. SQL会话页面,单击创建SQL会话

  3. 在创建SQL会话页面,配置以下信息,单击创建

    参数

    说明

    名称

    自定义SQL会话的名称。例如,mc_sql_compute。

    Spark配置

    填写Spark配置信息,以连接阿里云MaxCompute。

    重要

    如果需要访问开启了三层模型的MaxCompute项目,还需在Spark配置信息中配置spark.sql.catalog.odps.enableNamespaceSchema参数为true。更多参数信息,请参见Spark Connector。关于Schema详情,请参见Schema操作

    spark.sql.catalog.odps                        org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
    spark.sql.extensions                          org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
    spark.sql.sources.partitionOverwriteMode      dynamic
    spark.hadoop.odps.tunnel.quota.name           pay-as-you-go
    spark.hadoop.odps.project.name                <project_name>
    spark.hadoop.odps.end.point                   https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
    spark.hadoop.odps.access.id                   <accessId>
    spark.hadoop.odps.access.key                  <accessKey>

    请根据您的实际情况替换以下信息:

    • <project_name>:您的MaxCompute项目名称。

    • https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api:您的MaxComputeEndpoint信息,详情请参见Endpoint

    • <accessId>:访问MaxCompute服务所使用阿里云账号的AccessKey ID。

    • <accessKey>:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。

创建Notebook会话以连接MaxCompute

  1. 进入Notebook会话页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,选择左侧导航栏中的会话管理

    5. 单击Notebook会话页签。

  2. 单击创建Notebook会话

  3. 在创建Notebook会话页面,配置以下信息,单击创建

    参数

    说明

    名称

    自定义Notebook会话的名称。例如,mc_notebook_compute。

    Spark配置

    填写Spark配置信息,以连接阿里云MaxCompute。

    重要

    如果需要访问开启了Schema功能的MaxCompute项目,还需在Spark配置信息中配置spark.sql.catalog.odps.enableNamespaceSchema参数为true。更多参数信息,请参见Spark Connector。关于Schema详情,请参见Schema操作

    spark.sql.catalog.odps                        org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
    spark.sql.extensions                          org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
    spark.sql.sources.partitionOverwriteMode      dynamic
    spark.hadoop.odps.tunnel.quota.name           pay-as-you-go
    spark.hadoop.odps.project.name                <project_name>
    spark.hadoop.odps.end.point                    https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
    spark.hadoop.odps.access.id                   <accessId>
    spark.hadoop.odps.access.key                  <accessKey>

    请根据您的实际情况替换以下信息:

    • <project_name>:您的MaxCompute项目名称。

    • https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api:您的MaxComputeEndpoint信息,详情请参见Endpoint

    • <accessId>:访问MaxCompute服务所使用阿里云账号的AccessKey ID。

    • <accessKey>:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。

步骤二:在MaxCompute中查询或写入数据

使用SparkSQL写入并查询MaxCompute中的数据

  1. EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建SparkSQL。

    1. 在弹出的对话框中,输入名称(例如,mc_load_task),类型使用默认的SparkSQL,然后单击确定

    2. 复制如下代码到新增的Spark SQL页签(mc_load_task)中。

      CREATE TABLE odps.default.mc_table (name STRING, num BIGINT);
      
      INSERT INTO odps.default.mc_table (name, num) VALUES ('Alice', 100),('Bob', 200);
      
      SELECT * FROM odps.default.mc_table;
    3. 在数据库下拉列表中选择一个数据库,在Compute下拉列表中选择步骤一:创建会话以连接MaxCompute中创建的SQL会话(mc_sql_compute)。

    4. 单击运行,执行创建的SparkSQL。

      查询执行成功后,在运行结果中输出了查询结果。

      image

  4. MaxCompute控制台查看创建的表。

    1. 登录MaxCompute控制台,在左上角选择地域。

    2. 项目管理页面,单击已创建项目操作列的管理

    3. 单击Tables页签。

      即可在MaxCompute控制台看到创建了一个名为mc_table的新表。

      image

使用Notebook写入并查询MaxCompute中的数据

  1. EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建Notebook。

    1. 在弹出的对话框中,输入名称(例如,mc_load_task),类型使用Python > Notebook,然后单击确定

    2. 在会话下拉列表中选择步骤一:创建会话以连接MaxCompute中创建的并已启动的Notebook会话(mc_notebook_compute)。

    3. 编写代码并执行。

      1. Python单元格中,输入以下命令创建表。

        spark.sql("""
        CREATE TABLE odps.default.mc_table (name STRING, num BIGINT);
        """)
        
      2. 在一个新的Python单元格中,输入以下命令插入数据。

        spark.sql("INSERT INTO odps.default.mc_table (name, num) VALUES ('Alice', 100),('Bob', 200);")
      3. 在一个新的Python单元格中,输入以下命令查询数据。

        spark.sql("SELECT * FROM odps.default.mc_table;").show()

        查询执行成功后,在运行结果中输出了查询结果。

        image

  4. MaxCompute控制台查看创建的表。

    1. 登录MaxCompute控制台,在左上角选择地域。

    2. 项目管理页面,单击已创建项目操作列的管理

    3. 单击Tables页签。

      即可在MaxCompute控制台看到创建了一个名为mc_table的新表。

      image

常见问题

为什么查询MaxCompute表时,报错Access Denied?

  • 问题现象:查询MaxCompute表时,出现以下错误信息。

    Access Denied - Not allowed to use storage api service on current endpoint
  • 问题原因:该错误表明当前用户未获得使用MaxCompute存储API服务的授权,或者所使用的Endpoint不支持存储API功能。

  • 处理方法:

    • 检查是否已开启开放存储功能。

      MaxCompute控制台选择租户管理>租户属性查看,如果未开启,请参见使用开放存储开启并完成配置。

    • 检查当前使用的Endpoint是否支持存储API功能。如果不支持,请切换至支持存储API的 Endpoint,详情请参见支持地域

相关文档

本文以SparkSQLNotebook开发类型为例,如果您想通过其他方式读写MaxCompute数据,可以参见批任务或流任务开发