EMR Serverless Spark内置了基于Spark DataSource V2的MaxCompute DataSource,只需在开发时添加对应的配置即可连接MaxCompute。本文为您介绍在EMR Serverless Spark中实现MaxCompute的读取与写入操作。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute。
前提条件
已创建EMR Serverless Spark工作空间,详情请参见创建工作空间。
已创建MaxCompute项目,详情请参见创建MaxCompute项目。
操作流程
步骤一:创建会话连接MaxCompute
您可以创建SQL会话,或者Notebook会话来连接MaxCompute。关于会话更多介绍,请参见会话管理。
创建SQL会话连接MaxCompute
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建SQL会话页面,配置以下信息,单击创建。
参数
说明
名称
自定义SQL会话的名称。例如,mc_sql_compute。
Spark配置
填写Spark配置信息,以连接阿里云MaxCompute。
spark.sql.catalog.odps org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions spark.sql.catalog.odps.enableNamespaceSchema true spark.sql.sources.partitionOverwriteMode dynamic spark.hadoop.odps.project.name <project_name> spark.hadoop.odps.end.point http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api spark.hadoop.odps.access.id <accessId> spark.hadoop.odps.access.key <accessKey>
请根据您的实际情况替换以下信息:
<project_name>
:您的MaxCompute项目名称。http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
:您的MaxCompute的Endpoint信息,详情请参见Endpoint。<accessId>
:访问MaxCompute服务所使用阿里云账号的AccessKey ID。<accessKey>
:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。
创建Notebook会话连接MaxCompute
进入Notebook会话页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,选择左侧导航栏中的会话管理。
单击Notebook会话页签。
单击创建Notebook会话。
在创建Notebook会话页面,配置以下信息,单击创建。
参数
说明
名称
自定义Notebook会话的名称。例如,mc_notebook_compute。
Spark配置
填写Spark配置信息,以连接阿里云MaxCompute。
spark.sql.catalog.odps org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions spark.sql.catalog.odps.enableNamespaceSchema true spark.sql.sources.partitionOverwriteMode dynamic spark.hadoop.odps.project.name <project_name> spark.hadoop.odps.end.point http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api spark.hadoop.odps.access.id <accessId> spark.hadoop.odps.access.key <accessKey>
请根据您的实际情况替换以下信息:
<project_name>
:您的MaxCompute项目名称。http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
:您的MaxCompute的Endpoint信息,详情请参见Endpoint。<accessId>
:访问MaxCompute服务所使用阿里云账号的AccessKey ID。<accessKey>
:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。
步骤二:查询或向MaxCompute中写数据
创建SparkSQL或Notebook向MaxCompute中写数据
创建SparkSQL向MaxCompute中写数据
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
新建SparkSQL。
在弹出的对话框中,输入名称(例如,mc_load_task),类型使用默认的SparkSQL,然后单击确定。
拷贝如下代码到新增的Spark SQL页签(mc_load_task)中。
CREATE TABLE odps.default.customer_total_return AS ( SELECT sr_customer_sk AS ctr_customer_sk, sr_store_sk AS ctr_store_sk, sum(sr_return_amt) AS ctr_total_return FROM odps.bigdata_public_dataset.tpcds_10g.store_returns, odps.bigdata_public_dataset.tpcds_10g.date_dim WHERE sr_returned_date_sk = d_date_sk AND d_year = 2000 GROUP BY sr_customer_sk, sr_store_sk ); SELECT * FROM odps.default.customer_total_return;
在数据库下拉列表中选择一个数据库,在Compute下拉列表中选择步骤一:创建会话连接MaxCompute中创建的SQL会话(mc_sql_compute)。
单击运行,执行创建的SparkSQL。
查询执行成功后,在运行结果中输出了查询结果。
在MaxCompute控制台查看创建的表。
登录MaxCompute控制台,在左上角选择地域。
在项目管理页面,单击已创建项目操作列的管理。
单击Tables页签。
即可在MaxCompute控制台看到创建了一个名为
customer_total_return
的新表。
创建Notebook向MaxCompute中写数据
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
新建Notebook。
在弹出的对话框中,输入名称(例如,mc_load_task),类型使用
,然后单击确定。在会话下拉列表中选择步骤一:创建会话连接MaxCompute中创建的并已启动的Notebook会话(mc_notebook_compute)。
拷贝如下代码到新增的Notebook的Python单元格中。
spark.sql(""" CREATE TABLE odps.default.customer_total_return AS ( SELECT sr_customer_sk AS ctr_customer_sk, sr_store_sk AS ctr_store_sk, sum(sr_return_amt) AS ctr_total_return FROM odps.bigdata_public_dataset.tpcds_10g.store_returns, odps.bigdata_public_dataset.tpcds_10g.date_dim WHERE sr_returned_date_sk = d_date_sk AND d_year = 2000 GROUP BY sr_customer_sk, sr_store_sk ); """) spark.sql("SELECT * FROM odps.default.customer_total_return LIMIT 10").show()
单击单元格前面的图标,或者单击运行所有单元格,执行创建的Notebook。
查询执行成功后,在运行结果中输出了查询结果。
在MaxCompute控制台查看创建的表。
登录MaxCompute控制台,在左上角选择地域。
在项目管理页面,单击已创建项目操作列的管理。
单击Tables页签。
即可在MaxCompute控制台看到创建了一个名为
customer_total_return
的新表。
创建SparkSQL或Notebook查询MaxCompute中数据
本文以查询MaxCompute TPC-DS公开数据集中的数据为例。
创建SparkSQL查询MaxCompute中数据
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
新建SparkSQL。
在弹出的对话框中,输入名称(例如,mc_read_sql_task),类型使用默认的SparkSQL,然后单击确定。
拷贝如下代码到新增的Spark SQL页签(mc_read_sql_task)中。
WITH customer_total_return AS ( SELECT sr_customer_sk AS ctr_customer_sk, sr_store_sk AS ctr_store_sk, sum(sr_return_amt) AS ctr_total_return FROM odps.bigdata_public_dataset.tpcds_10g.store_returns, odps.bigdata_public_dataset.tpcds_10g.date_dim WHERE sr_returned_date_sk = d_date_sk AND d_year = 2000 GROUP BY sr_customer_sk, sr_store_sk ) SELECT c_customer_id FROM customer_total_return ctr1, odps.bigdata_public_dataset.tpcds_10g.store, odps.bigdata_public_dataset.tpcds_10g.customer WHERE ctr1.ctr_total_return > ( SELECT avg(ctr_total_return) * 1.2 FROM customer_total_return ctr2 WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk ) AND s_store_sk = ctr1.ctr_store_sk AND s_state = 'TN' AND ctr1.ctr_customer_sk = c_customer_sk ORDER BY c_customer_id LIMIT 100;
在数据库下拉列表中选择一个数据库,在Compute下拉列表中选择步骤一:创建会话连接MaxCompute中创建的SQL会话(mc_sql_compute)。
单击运行,执行创建的SparkSQL。
查询执行成功后,在运行结果中输出了正确的查询结果。
创建Notebook查询MaxCompute中数据
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
新建Notebook。
在弹出的对话框中,输入名称(例如,mc_read_notebook_task),类型使用默认的SparkSQL,然后单击确定。
在会话下拉列表中选择步骤一:创建会话连接MaxCompute中创建的并已启动的Notebook会话(mc_notebook_compute)。
拷贝如下代码到新增的Spark SQL页签(mc_read_notebook_task)中。
spark.sql(""" WITH customer_total_return AS ( SELECT sr_customer_sk AS ctr_customer_sk, sr_store_sk AS ctr_store_sk, sum(sr_return_amt) AS ctr_total_return FROM odps.bigdata_public_dataset.tpcds_10g.store_returns, odps.bigdata_public_dataset.tpcds_10g.date_dim WHERE sr_returned_date_sk = d_date_sk AND d_year = 2000 GROUP BY sr_customer_sk, sr_store_sk ) SELECT c_customer_id FROM customer_total_return ctr1, odps.bigdata_public_dataset.tpcds_10g.store, odps.bigdata_public_dataset.tpcds_10g.customer WHERE ctr1.ctr_total_return > ( SELECT avg(ctr_total_return) * 1.2 FROM customer_total_return ctr2 WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk ) AND s_store_sk = ctr1.ctr_store_sk AND s_state = 'TN' AND ctr1.ctr_customer_sk = c_customer_sk ORDER BY c_customer_id LIMIT 100; """).show()
单击单元格前面的图标,或者单击运行所有单元格,执行创建的Notebook。
查询执行成功后,可以在单元格下方看到查询结果。
相关文档
本文以SparkSQL和Notebook开发类型为例,如果您想通过其他方式向MaxCompute中读写数据,可以参见Application开发。