访问MaxCompute数据源

本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark SQL读写MaxCompute数据。

背景信息

大数据计算服务MaxCompute是一种快速、完全托管的TB/PB级数据仓库解决方案。您可以通过AnalyticDB for MySQLSpark SQL读写MaxCompute数据。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已创建数据库账号。

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建与AnalyticDB for MySQL集群同地域的MaxCompute项目空间。具体操作,请参见创建MaxCompute项目。本文示例为杭州地域创建的项目空间spark_on_maxcompute

操作步骤

您可以选择批处理或者交互式执行任意一种方式读写MaxCompute数据。两种方式的区别,请参见Spark SQL执行方式

批处理方式读写MaxCompute数据

步骤一:提交Spark配置

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和Job型资源组。

  4. SQLConsole窗口,按照SET Key=Value;的形式输入以下配置。

    SET spark.sql.catalog.odps = org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog;
    SET spark.hadoop.odps.access.id = LTAI5tMnzDS5EyqqMsT****;
    SET spark.hadoop.odps.access.key = A2kHFzEgFidOKrDKqAbJIPt8****;
    SET spark.hadoop.odps.endpoint = http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api;
    SET spark.hadoop.odps.project = spark_on_maxcompute;
    SET spark.adb.connectors = odps;

    参数名称

    取值说明

    spark.sql.catalog.<catalog_name>

    Spark SQL支持的配置数据源的方式。

    参数取值固定为org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog

    说明

    参数名称中的catalog_name可自定义,本文示例为odps

    spark.hadoop.odps.access.id

    阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey ID,请参见获取AccessKey信息

    spark.hadoop.odps.access.key

    阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey Secret,请参见获取AccessKey信息

    spark.hadoop.odps.endpoint

    MaxCompute所在地域对应VPC网络的Endpoint。

    查询各地域VPC网络的Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)

    spark.hadoop.odps.project

    MaxCompute的项目空间名称。

    spark.adb.connectors

    Spark作业连接的数据源,固定为odps

  5. 在弹窗中,选择继续执行

步骤二:使用Spark SQL读写MaxCompute数据

  1. 输入以下语句,单击执行SQL(F8),并在弹窗中,单击继续执行。创建MaxCompute表。

    CREATE TABLE odps.spark_on_maxcompute.spark_test(k1 int, k2 string) 
    partitioned by (part int);
    重要

    本示例中表名odps.spark_on_maxcompute.spark_test以三层Catalog形式命名。

    • 第一层为Catalog_name,需要与参数spark.sql.catalog.<catalog_name>中的Catalog_name一致,本文示例为odps

    • 第二层为MaxCompute项目空间名称。

    • 第三层为MaxCompute表名称。

    Spark SQL也支持使用两层Catalog形式命名<maxcompute_project_name>.<table_name>,需要在执行建表语句前,先执行USE <catalog_name>;。您还可以继续执行USE <maxcompute_project_name>;,建表时只写表名。

  2. 输入以下语句,单击执行SQL(F8),并在弹窗中,单击继续执行。向表中写入数据。

    INSERT INTO odps.spark_on_maxcompute.spark_test 
    values (1, 'aaa', 101), (2, 'bbb', 102);
  3. 输入以下语句,单击执行SQL(F8),并在弹窗中,单击继续执行。查询MaxCompute表数据。

    SELECT * FROM odps.spark_on_maxcompute.spark_test;
    说明

    您可以在DataWorks控制台查询对应的MaxCompute项目空间,确认在项目空间中表是否已经创建并且写入数据。

交互式执行方式读写MaxCompute数据

步骤一:提交Spark配置

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击集群管理 > 资源组管理

  3. 在资源组列表中,单击目标Job型资源组操作列的高级配置

  4. 单击其他配置后的1111,填写以下参数。

    参数名称

    取值说明

    spark.sql.catalog.<catalog_name>

    Spark SQL支持的配置数据源的方式。

    参数取值固定为org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog

    说明

    参数名称中的catalog_name可自定义,本文示例为odps

    spark.hadoop.odps.access.id

    阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey ID,请参见获取AccessKey信息

    spark.hadoop.odps.access.key

    阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey Secret,请参见获取AccessKey信息

    spark.hadoop.odps.endpoint

    MaxCompute所在地域对应VPC网络的Endpoint。

    查询各地域VPC网络的Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)

    spark.hadoop.odps.project

    MaxCompute的项目空间名称。

    spark.adb.connectors

    Spark作业连接的数据源,固定为odps

  5. 单击启动。当高级配置(Spark ThriftServer配置)页面显示运行中,则配置生效。

步骤二:使用Spark SQL读写MaxCompute数据

  1. 在左侧导航栏,单击作业开发 > SQL开发

  2. SQLConsole窗口,选择Spark引擎和Job型资源组。

  3. 输入以下语句,单击执行SQL(F8),创建MaxCompute表。

    CREATE TABLE odps.spark_on_maxcompute.spark_test(k1 int, k2 string) 
    partitioned by (part int);
    重要

    本示例中表名odps.spark_on_maxcompute.spark_test以三层Catalog形式命名。

    • 第一层为Catalog_name,需要与参数spark.sql.catalog.<catalog_name>中的Catalog_name一致,本文示例为odps

    • 第二层为MaxCompute项目空间名称。

    • 第三层为MaxCompute表名称。

    Spark SQL也支持使用两层Catalog形式命名<maxcompute_project_name>.<table_name>,需要在执行建表语句前,先执行USE <catalog_name>;。您还可以继续执行USE <maxcompute_project_name>;,建表时只写表名。

  4. 输入以下语句,单击执行SQL(F8),向表中写入数据。

    INSERT INTO odps.spark_on_maxcompute.spark_test 
    values (1, 'aaa', 101), (2, 'bbb', 102);
  5. 输入以下语句,单击执行SQL(F8),查询MaxCompute表数据。

    SELECT * FROM odps.spark_on_maxcompute.spark_test;
    说明

    您可以在DataWorks控制台查询对应的MaxCompute项目空间,确认在项目空间中表是否已经创建并且写入数据。

支持的操作和数据类型

支持的库表操作

  • SHOW DATABASES

  • AHOW TABLES

  • CREATE TABLE

  • DROP TABLE

  • SELECT

  • INSERT INTO

  • INSERT INTO PARTITION

重要

暂不支持INSERT OVERWRITE PARTITION。

支持的字段类型

  • BOOLEAN

  • DOUBLE

  • INT

  • BIGINT

  • STRING

  • DECIMAL

  • DATA

  • TIMESTAMP

  • FLOAT

  • SMALLINT

  • TINYINT

  • BINARY

  • ARRAY

  • MAP

  • STRUCT

支持的分区字段类型

  • INT

  • BIGINT

  • STRING

  • TINYINT

  • SMALLINT