在EMR Serverless Spark中实现MaxCompute读写操作

EMR Serverless Spark内置了基于Spark DataSource V2的MaxCompute DataSource,只需在开发时添加对应的配置即可连接MaxCompute。本文为您介绍在EMR Serverless Spark中实现MaxCompute的读取与写入操作。

背景信息

大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute

前提条件

操作流程

步骤一:创建会话连接MaxCompute

您可以创建SQL会话,或者Notebook会话来连接MaxCompute。关于会话更多介绍,请参见会话管理

创建SQL会话连接MaxCompute

  1. 进入会话管理页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的会话管理

  2. SQL会话页面,单击创建SQL会话

  3. 在创建SQL会话页面,配置以下信息,单击创建

    参数

    说明

    名称

    自定义SQL会话的名称。例如,mc_sql_compute。

    Spark配置

    填写Spark配置信息,以连接阿里云MaxCompute。

    spark.sql.catalog.odps                        org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
    spark.sql.extensions                          org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
    spark.sql.catalog.odps.enableNamespaceSchema  true
    spark.sql.sources.partitionOverwriteMode      dynamic
    spark.hadoop.odps.project.name                <project_name>
    spark.hadoop.odps.end.point                   http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
    spark.hadoop.odps.access.id                   <accessId>
    spark.hadoop.odps.access.key                  <accessKey>

    请根据您的实际情况替换以下信息:

    • <project_name>:您的MaxCompute项目名称。

    • http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api:您的MaxCompute的Endpoint信息,详情请参见Endpoint

    • <accessId>:访问MaxCompute服务所使用阿里云账号的AccessKey ID。

    • <accessKey>:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。

创建Notebook会话连接MaxCompute

  1. 进入Notebook会话页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,选择左侧导航栏中的会话管理

    5. 单击Notebook会话页签。

  2. 单击创建Notebook会话

  3. 在创建Notebook会话页面,配置以下信息,单击创建

    参数

    说明

    名称

    自定义Notebook会话的名称。例如,mc_notebook_compute。

    Spark配置

    填写Spark配置信息,以连接阿里云MaxCompute。

    spark.sql.catalog.odps                        org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
    spark.sql.extensions                          org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
    spark.sql.catalog.odps.enableNamespaceSchema  true
    spark.sql.sources.partitionOverwriteMode      dynamic
    spark.hadoop.odps.project.name                <project_name>
    spark.hadoop.odps.end.point                   http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
    spark.hadoop.odps.access.id                   <accessId>
    spark.hadoop.odps.access.key                  <accessKey>

    请根据您的实际情况替换以下信息:

    • <project_name>:您的MaxCompute项目名称。

    • http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api:您的MaxCompute的Endpoint信息,详情请参见Endpoint

    • <accessId>:访问MaxCompute服务所使用阿里云账号的AccessKey ID。

    • <accessKey>:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。

步骤二:查询或向MaxCompute中写数据

创建SparkSQL或Notebook向MaxCompute中写数据

创建SparkSQL向MaxCompute中写数据

  1. 在EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建SparkSQL。

    1. 在弹出的对话框中,输入名称(例如,mc_load_task),类型使用默认的SparkSQL,然后单击确定

    2. 拷贝如下代码到新增的Spark SQL页签(mc_load_task)中。

      CREATE TABLE odps.default.customer_total_return AS (
        SELECT
          sr_customer_sk AS ctr_customer_sk,
          sr_store_sk AS ctr_store_sk,
          sum(sr_return_amt) AS ctr_total_return
        FROM
          odps.bigdata_public_dataset.tpcds_10g.store_returns,
          odps.bigdata_public_dataset.tpcds_10g.date_dim
        WHERE
          sr_returned_date_sk = d_date_sk
          AND d_year = 2000
        GROUP BY
          sr_customer_sk,
          sr_store_sk
      );
      
      SELECT * FROM odps.default.customer_total_return;
    3. 在数据库下拉列表中选择一个数据库,在Compute下拉列表中选择步骤一:创建会话连接MaxCompute中创建的SQL会话(mc_sql_compute)。

    4. 单击运行,执行创建的SparkSQL。

      查询执行成功后,在运行结果中输出了查询结果。

      image

  4. 在MaxCompute控制台查看创建的表。

    1. 登录MaxCompute控制台,在左上角选择地域。

    2. 项目管理页面,单击已创建项目操作列的管理

    3. 单击Tables页签。

      即可在MaxCompute控制台看到创建了一个名为customer_total_return的新表。

      image

创建Notebook向MaxCompute中写数据

  1. 在EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建Notebook。

    1. 在弹出的对话框中,输入名称(例如,mc_load_task),类型使用Python > Notebook,然后单击确定

    2. 在会话下拉列表中选择步骤一:创建会话连接MaxCompute中创建的并已启动的Notebook会话(mc_notebook_compute)。

    3. 拷贝如下代码到新增的Notebook的Python单元格中。

      spark.sql("""
      CREATE TABLE odps.default.customer_total_return AS (
        SELECT
          sr_customer_sk AS ctr_customer_sk,
          sr_store_sk AS ctr_store_sk,
          sum(sr_return_amt) AS ctr_total_return
        FROM
          odps.bigdata_public_dataset.tpcds_10g.store_returns,
          odps.bigdata_public_dataset.tpcds_10g.date_dim
        WHERE
          sr_returned_date_sk = d_date_sk
          AND d_year = 2000
        GROUP BY
          sr_customer_sk,
          sr_store_sk
      );
      """)
      
      spark.sql("SELECT * FROM odps.default.customer_total_return LIMIT 10").show()
      
    4. 单击单元格前面的image图标,或者单击运行所有单元格,执行创建的Notebook。

      查询执行成功后,在运行结果中输出了查询结果。

      image

  4. 在MaxCompute控制台查看创建的表。

    1. 登录MaxCompute控制台,在左上角选择地域。

    2. 项目管理页面,单击已创建项目操作列的管理

    3. 单击Tables页签。

      即可在MaxCompute控制台看到创建了一个名为customer_total_return的新表。

      image

创建SparkSQL或Notebook查询MaxCompute中数据

本文以查询MaxCompute TPC-DS公开数据集中的数据为例。

创建SparkSQL查询MaxCompute中数据

  1. 在EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建SparkSQL。

    1. 在弹出的对话框中,输入名称(例如,mc_read_sql_task),类型使用默认的SparkSQL,然后单击确定

    2. 拷贝如下代码到新增的Spark SQL页签(mc_read_sql_task)中。

      WITH customer_total_return AS (
        SELECT
          sr_customer_sk AS ctr_customer_sk,
          sr_store_sk AS ctr_store_sk,
          sum(sr_return_amt) AS ctr_total_return
        FROM
          odps.bigdata_public_dataset.tpcds_10g.store_returns,
          odps.bigdata_public_dataset.tpcds_10g.date_dim
        WHERE
          sr_returned_date_sk = d_date_sk
          AND d_year = 2000
        GROUP BY
          sr_customer_sk,
          sr_store_sk
      )
      SELECT
        c_customer_id
      FROM
        customer_total_return ctr1,
        odps.bigdata_public_dataset.tpcds_10g.store,
        odps.bigdata_public_dataset.tpcds_10g.customer
      WHERE
        ctr1.ctr_total_return > (
          SELECT
            avg(ctr_total_return) * 1.2
          FROM
            customer_total_return ctr2
          WHERE
            ctr1.ctr_store_sk = ctr2.ctr_store_sk
        )
        AND s_store_sk = ctr1.ctr_store_sk
        AND s_state = 'TN'
        AND ctr1.ctr_customer_sk = c_customer_sk
      ORDER BY
        c_customer_id
      LIMIT
        100;
    3. 在数据库下拉列表中选择一个数据库,在Compute下拉列表中选择步骤一:创建会话连接MaxCompute中创建的SQL会话(mc_sql_compute)。

    4. 单击运行,执行创建的SparkSQL。

      查询执行成功后,在运行结果中输出了正确的查询结果。

      image

创建Notebook查询MaxCompute中数据

  1. 在EMR Serverless Spark页面,单击左侧导航栏中的数据开发

  2. 开发目录页签下,单击新建

  3. 新建Notebook。

    1. 在弹出的对话框中,输入名称(例如,mc_read_notebook_task),类型使用默认的SparkSQL,然后单击确定

    2. 在会话下拉列表中选择步骤一:创建会话连接MaxCompute中创建的并已启动的Notebook会话(mc_notebook_compute)。

    3. 拷贝如下代码到新增的Spark SQL页签(mc_read_notebook_task)中。

      spark.sql("""
      WITH customer_total_return AS (
          SELECT
          sr_customer_sk AS ctr_customer_sk,
          sr_store_sk AS ctr_store_sk,
          sum(sr_return_amt) AS ctr_total_return
      FROM
      odps.bigdata_public_dataset.tpcds_10g.store_returns,
      odps.bigdata_public_dataset.tpcds_10g.date_dim
      WHERE
      sr_returned_date_sk = d_date_sk
      AND d_year = 2000
      GROUP BY
      sr_customer_sk,
      sr_store_sk
      )
      SELECT
      c_customer_id
      FROM
      customer_total_return ctr1,
      odps.bigdata_public_dataset.tpcds_10g.store,
      odps.bigdata_public_dataset.tpcds_10g.customer
      WHERE
      ctr1.ctr_total_return > (
          SELECT
          avg(ctr_total_return) * 1.2
          FROM
      customer_total_return ctr2
      WHERE
      ctr1.ctr_store_sk = ctr2.ctr_store_sk
      )
      AND s_store_sk = ctr1.ctr_store_sk
      AND s_state = 'TN'
      AND ctr1.ctr_customer_sk = c_customer_sk
      ORDER BY
      c_customer_id
      LIMIT
      100;
      """).show()
    4. 单击单元格前面的image图标,或者单击运行所有单元格,执行创建的Notebook。

    5. 查询执行成功后,可以在单元格下方看到查询结果。

      image

相关文档

本文以SparkSQL和Notebook开发类型为例,如果您想通过其他方式向MaxCompute中读写数据,可以参见Application开发