本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark SQL读写MaxCompute数据。
背景信息
大数据计算服务MaxCompute是一种快速、完全托管的TB/PB级数据仓库解决方案。您可以通过AnalyticDB for MySQLSpark SQL读写MaxCompute数据。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已创建数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建与AnalyticDB for MySQL集群同地域的MaxCompute项目空间。具体操作,请参见创建MaxCompute项目。本文示例为杭州地域创建的项目空间
spark_on_maxcompute
。
操作步骤
您可以选择批处理或者交互式执行任意一种方式读写MaxCompute数据。两种方式的区别,请参见Spark SQL执行方式。
批处理方式读写MaxCompute数据
步骤一:提交Spark配置
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击作业开发 > SQL开发。
在SQLConsole窗口,选择Spark引擎和Job型资源组。
在SQLConsole窗口,按照
SET Key=Value;
的形式输入以下配置。SET spark.sql.catalog.odps = org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog; SET spark.hadoop.odps.access.id = LTAI5tMnzDS5EyqqMsT****; SET spark.hadoop.odps.access.key = A2kHFzEgFidOKrDKqAbJIPt8****; SET spark.hadoop.odps.endpoint = http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api; SET spark.hadoop.odps.project = spark_on_maxcompute; SET spark.adb.connectors = odps;
参数名称
取值说明
spark.sql.catalog.<catalog_name>
Spark SQL支持的配置数据源的方式。
参数取值固定为
org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
。说明参数名称中的
catalog_name
可自定义,本文示例为odps
。spark.hadoop.odps.access.id
阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。
如何获取AccessKey ID,请参见获取AccessKey信息。
spark.hadoop.odps.access.key
阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。
如何获取AccessKey Secret,请参见获取AccessKey信息。
spark.hadoop.odps.endpoint
MaxCompute所在地域对应VPC网络的Endpoint。
查询各地域VPC网络的Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)。
spark.hadoop.odps.project
MaxCompute的项目空间名称。
spark.adb.connectors
Spark作业连接的数据源,固定为
odps
。在弹窗中,选择继续执行。
步骤二:使用Spark SQL读写MaxCompute数据
输入以下语句,单击执行SQL(F8),并在弹窗中,单击继续执行。创建MaxCompute表。
CREATE TABLE odps.spark_on_maxcompute.spark_test(k1 int, k2 string) partitioned by (part int);
重要本示例中表名
odps.spark_on_maxcompute.spark_test
以三层Catalog形式命名。第一层为Catalog_name,需要与参数
spark.sql.catalog.<catalog_name>
中的Catalog_name一致,本文示例为odps
。第二层为MaxCompute项目空间名称。
第三层为MaxCompute表名称。
Spark SQL也支持使用两层Catalog形式命名
<maxcompute_project_name>.<table_name>
,需要在执行建表语句前,先执行USE <catalog_name>;
。您还可以继续执行USE <maxcompute_project_name>;
,建表时只写表名。输入以下语句,单击执行SQL(F8),并在弹窗中,单击继续执行。向表中写入数据。
INSERT INTO odps.spark_on_maxcompute.spark_test values (1, 'aaa', 101), (2, 'bbb', 102);
输入以下语句,单击执行SQL(F8),并在弹窗中,单击继续执行。查询MaxCompute表数据。
SELECT * FROM odps.spark_on_maxcompute.spark_test;
说明您可以在DataWorks控制台查询对应的MaxCompute项目空间,确认在项目空间中表是否已经创建并且写入数据。
交互式执行方式读写MaxCompute数据
步骤一:提交Spark配置
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击集群管理 > 资源组管理。
在资源组列表中,单击目标Job型资源组操作列的高级配置。
单击其他配置后的,填写以下参数。
参数名称
取值说明
spark.sql.catalog.<catalog_name>
Spark SQL支持的配置数据源的方式。
参数取值固定为
org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog
。说明参数名称中的
catalog_name
可自定义,本文示例为odps
。spark.hadoop.odps.access.id
阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。
如何获取AccessKey ID,请参见获取AccessKey信息。
spark.hadoop.odps.access.key
阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。
如何获取AccessKey Secret,请参见获取AccessKey信息。
spark.hadoop.odps.endpoint
MaxCompute所在地域对应VPC网络的Endpoint。
查询各地域VPC网络的Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)。
spark.hadoop.odps.project
MaxCompute的项目空间名称。
spark.adb.connectors
Spark作业连接的数据源,固定为
odps
。单击启动。当高级配置(Spark ThriftServer配置)页面显示运行中,则配置生效。
步骤二:使用Spark SQL读写MaxCompute数据
在左侧导航栏,单击作业开发 > SQL开发。
在SQLConsole窗口,选择Spark引擎和Job型资源组。
输入以下语句,单击执行SQL(F8),创建MaxCompute表。
CREATE TABLE odps.spark_on_maxcompute.spark_test(k1 int, k2 string) partitioned by (part int);
重要本示例中表名
odps.spark_on_maxcompute.spark_test
以三层Catalog形式命名。第一层为Catalog_name,需要与参数
spark.sql.catalog.<catalog_name>
中的Catalog_name一致,本文示例为odps
。第二层为MaxCompute项目空间名称。
第三层为MaxCompute表名称。
Spark SQL也支持使用两层Catalog形式命名
<maxcompute_project_name>.<table_name>
,需要在执行建表语句前,先执行USE <catalog_name>;
。您还可以继续执行USE <maxcompute_project_name>;
,建表时只写表名。输入以下语句,单击执行SQL(F8),向表中写入数据。
INSERT INTO odps.spark_on_maxcompute.spark_test values (1, 'aaa', 101), (2, 'bbb', 102);
输入以下语句,单击执行SQL(F8),查询MaxCompute表数据。
SELECT * FROM odps.spark_on_maxcompute.spark_test;
说明您可以在DataWorks控制台查询对应的MaxCompute项目空间,确认在项目空间中表是否已经创建并且写入数据。
支持的操作和数据类型
支持的库表操作
SHOW DATABASES
AHOW TABLES
CREATE TABLE
DROP TABLE
SELECT
INSERT INTO
INSERT INTO PARTITION
暂不支持INSERT OVERWRITE PARTITION。
支持的字段类型
BOOLEAN
DOUBLE
INT
BIGINT
STRING
DECIMAL
DATA
TIMESTAMP
FLOAT
SMALLINT
TINYINT
BINARY
ARRAY
MAP
STRUCT
支持的分区字段类型
INT
BIGINT
STRING
TINYINT
SMALLINT