EMR Serverless Spark内置了基于Spark DataSource V2的MaxCompute DataSource,只需在开发时添加对应的配置即可连接MaxCompute。本文为您介绍在EMR Serverless Spark中实现MaxCompute的读取与写入操作。
背景信息
大数据计算服务MaxCompute(原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案,致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务。MaxCompute的详情请参见什么是MaxCompute。
前提条件
已在EMR Serverless Spark中创建工作空间。
已在MaxCompute中创建MaxCompute项目并使用开放存储。
本文示例使用的是开放存储(按量付费)。
使用限制
注意事项
使用开放存储(按量付费),超过1 TB的部分按照实际读写数据的逻辑大小进行计费,详情请参见开放存储(按量计费)。
操作流程
步骤一:创建会话以连接MaxCompute
您可以创建SQL会话,或者Notebook会话来连接MaxCompute。关于会话更多介绍,请参见会话管理。
创建SQL会话以连接MaxCompute
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建SQL会话页面,配置以下信息,单击创建。
参数
说明
名称
自定义SQL会话的名称。例如,mc_sql_compute。
Spark配置
填写Spark配置信息,以连接阿里云MaxCompute。
重要如果需要访问开启了三层模型的MaxCompute项目,还需在Spark配置信息中配置
spark.sql.catalog.odps.enableNamespaceSchema
参数为true
。更多参数信息,请参见Spark Connector。关于Schema详情,请参见Schema操作。spark.sql.catalog.odps org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions spark.sql.sources.partitionOverwriteMode dynamic spark.hadoop.odps.tunnel.quota.name pay-as-you-go spark.hadoop.odps.project.name <project_name> spark.hadoop.odps.end.point https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api spark.hadoop.odps.access.id <accessId> spark.hadoop.odps.access.key <accessKey>
请根据您的实际情况替换以下信息:
<project_name>
:您的MaxCompute项目名称。https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
:您的MaxCompute的Endpoint信息,详情请参见Endpoint。<accessId>
:访问MaxCompute服务所使用阿里云账号的AccessKey ID。<accessKey>
:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。
创建Notebook会话以连接MaxCompute
进入Notebook会话页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,选择左侧导航栏中的会话管理。
单击Notebook会话页签。
单击创建Notebook会话。
在创建Notebook会话页面,配置以下信息,单击创建。
参数
说明
名称
自定义Notebook会话的名称。例如,mc_notebook_compute。
Spark配置
填写Spark配置信息,以连接阿里云MaxCompute。
重要如果需要访问开启了Schema功能的MaxCompute项目,还需在Spark配置信息中配置
spark.sql.catalog.odps.enableNamespaceSchema
参数为true
。更多参数信息,请参见Spark Connector。关于Schema详情,请参见Schema操作。spark.sql.catalog.odps org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog spark.sql.extensions org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions spark.sql.sources.partitionOverwriteMode dynamic spark.hadoop.odps.tunnel.quota.name pay-as-you-go spark.hadoop.odps.project.name <project_name> spark.hadoop.odps.end.point https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api spark.hadoop.odps.access.id <accessId> spark.hadoop.odps.access.key <accessKey>
请根据您的实际情况替换以下信息:
<project_name>
:您的MaxCompute项目名称。https://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api
:您的MaxCompute的Endpoint信息,详情请参见Endpoint。<accessId>
:访问MaxCompute服务所使用阿里云账号的AccessKey ID。<accessKey>
:访问MaxCompute服务所使用阿里云账号的AccessKey Secret。
步骤二:在MaxCompute中查询或写入数据
使用SparkSQL写入并查询MaxCompute中的数据
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
新建SparkSQL。
在弹出的对话框中,输入名称(例如,mc_load_task),类型使用默认的SparkSQL,然后单击确定。
复制如下代码到新增的Spark SQL页签(mc_load_task)中。
CREATE TABLE odps.default.mc_table (name STRING, num BIGINT); INSERT INTO odps.default.mc_table (name, num) VALUES ('Alice', 100),('Bob', 200); SELECT * FROM odps.default.mc_table;
在数据库下拉列表中选择一个数据库,在Compute下拉列表中选择步骤一:创建会话以连接MaxCompute中创建的SQL会话(mc_sql_compute)。
单击运行,执行创建的SparkSQL。
查询执行成功后,在运行结果中输出了查询结果。
在MaxCompute控制台查看创建的表。
登录MaxCompute控制台,在左上角选择地域。
在项目管理页面,单击已创建项目操作列的管理。
单击Tables页签。
即可在MaxCompute控制台看到创建了一个名为
mc_table
的新表。
使用Notebook写入并查询MaxCompute中的数据
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击新建。
新建Notebook。
在弹出的对话框中,输入名称(例如,mc_load_task),类型使用
,然后单击确定。在会话下拉列表中选择步骤一:创建会话以连接MaxCompute中创建的并已启动的Notebook会话(mc_notebook_compute)。
编写代码并执行。
在Python单元格中,输入以下命令创建表。
spark.sql(""" CREATE TABLE odps.default.mc_table (name STRING, num BIGINT); """)
在一个新的Python单元格中,输入以下命令插入数据。
spark.sql("INSERT INTO odps.default.mc_table (name, num) VALUES ('Alice', 100),('Bob', 200);")
在一个新的Python单元格中,输入以下命令查询数据。
spark.sql("SELECT * FROM odps.default.mc_table;").show()
查询执行成功后,在运行结果中输出了查询结果。
在MaxCompute控制台查看创建的表。
登录MaxCompute控制台,在左上角选择地域。
在项目管理页面,单击已创建项目操作列的管理。
单击Tables页签。
即可在MaxCompute控制台看到创建了一个名为
mc_table
的新表。
常见问题
相关文档
本文以SparkSQL和Notebook开发类型为例,如果您想通过其他方式读写MaxCompute数据,可以参见批任务或流任务开发。