通过外表导入MaxCompute数据

更新时间:

云原生数据仓库 AnalyticDB MySQL 版支持通过外表读取并导入MaxCompute数据。通过外表导入数据可以最大限度地利用集群资源,实现高性能数据导入。本文主要介绍如何通过外表将MaxCompute数据导入AnalyticDB for MySQL

功能介绍

AnalyticDB for MySQL产品系列

访问方式

AnalyticDB for MySQL内核版本

数据访问效率

企业版、基础版及湖仓版

Tunnel Record API方式

无限制

适合小规模数据访问,数据访问和导入速度慢。

Tunnel Arrow API方式

3.2.2.3及以上版本

使用列式读取数据,减少数据访问和导入时间,提供更快的数据传输速度。

数仓版

Tunnel Record API方式

无限制

使用公共数据传输服务资源组,该资源会被该地域所有项目共享使用,数据访问和导入速度慢。

前提条件

数据准备

本文示例中的MaxCompute项目为odps_project,示例表odps_nopart_import_test。示例如下:

CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
    id int,
    name string,
    age int)
partitioned by (dt string);

odps_nopart_import_test表中添加分区,示例如下:

ALTER TABLE odps_nopart_import_test 
ADD 
PARTITION (dt='202207');

向分区中添加数据,示例如下:

INSERT INTO odps_project.odps_nopart_import_test 
PARTITION (dt='202207') 
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);

操作步骤

企业版、基础版及湖仓版

默认情况下,AnalyticDB for MySQL集群会使用Tunnel Record API方式访问并导入MaxCompute数据。 若您需要通过Tunnel Arrow API方式访问并导入MaxCompute数据,请先开启Arrow API功能。开启后,AnalyticDB for MySQL集群会使用Tunnel Arrow API方式进行导入。

Tunnel Record API方式

数据导入方式分为常规导入(默认)和弹性导入。常规导入在计算节点中读取源数据,然后在存储节点中构建索引,消耗计算资源和存储资源。弹性导入在Serverless Spark Job中读取源数据和构建索引,消耗Job型资源组的资源。仅内核版本3.1.10.0及以上且已创建Job型资源组的集群支持弹性导入数据。相较于常规导入,弹性导入可以大幅减少资源的消耗,降低导入过程中对在线读写业务的影响,提升资源隔离性和数据导入效率。更多内容,请参见数据导入方式介绍

常规导入

  1. 进入SQL编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

    2. 在左侧导航栏,单击作业开发 > SQL开发

  2. 创建外部数据库。示例如下:

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 创建外表。本文示例为test_adb

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"yourAccessKeyID",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"yourAccessKeySecret",
    "partition_column":"dt",
    "project_name":"odps_project",
    "table_name":"odps_nopart_import_test"
    }';
    说明
    • AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。

    • 外表的参数说明,请参见CREATE EXTERNAL TABLE

  4. 查询数据。

    SELECT * FROM adb_external_db.test_adb;

    返回结果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
  5. 执行以下步骤将MaxCompute数据导入至AnalyticDB for MySQL

    1. AnalyticDB for MySQL中创建数据库,示例如下:

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据,示例如下:

      说明

      新表和步骤3中创建的外表的字段顺序和字段数量需要一致,字段类型兼容。

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string,
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTED BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. 向表中写入数据,示例如下:

      • 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于INSERT IGNORE INTO,详情请参见INSERT INTO。示例如下:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        如果需要将特定分区的数据导入adb_demo.adb_import_test,可以执行:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度,可以在写入任务前增加Hint(/*+ direct_batch_load=true*/)加速写入任务。详情请参见异步写入。示例如下:

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        返回结果如下:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        关于异步提交任务详情,请参见异步提交导入任务

弹性导入

  1. 进入SQL编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

    2. 在左侧导航栏,单击作业开发 > SQL开发

  2. 创建数据库。如果有已创建的数据库,可以忽略本步骤。示例如下:

    CREATE DATABASE adb_demo; 
  3. 创建外表。

    说明
    • AnalyticDB for MySQL外表的名称需要和MaxCompute项目的名称相同,否则创建外表会失败。

    • AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。

    • 弹性导入仅支持CREATE TABLE语句创建外表。

    CREATE TABLE IF NOT EXISTS test_adb
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api",
     "accessid":"yourAccessKeyID",
     "accesskey":"yourAccessKeySecret",
     "partition_column":"dt",
     "project_name":"odps_project",
     "table_name":"odps_nopart_import_test"
     }';                 

    外表支持设置的参数及参数说明,请参见参数说明

  4. 查询数据。

    SELECT * FROM adb_demo.test_adb;

    返回结果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
  5. AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据。示例如下:

    说明

    创建的内表和步骤3中创建的外表的字段名称、字段数量、字段顺序、字段类型必须相同。

    CREATE TABLE IF NOT EXISTS adb_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTED BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;  
  6. 导入数据。

    重要

    弹性导入仅支持通过INSERT OVERWRITE INTO语句导入数据。

    • 方法一:执行INSERT OVERWRITE INTO弹性导入数据,会覆盖表中原有的数据。示例如下:

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/
      INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
    • 方法二:异步执行INSERT OVERWRITE INTO弹性导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度。

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group]*/
      SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
      重要

      异步提交弹性导入任务时,不支持设置优先级队列。

      返回结果如下:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2023081517192220291720310090151****** |
      +---------------------------------------+

    使用SUBMIT JOB提交异步任务后,返回结果仅表示异步任务提交成功。您可以通过job_id终止异步任务或查询异步任务状态,判断任务是否执行成功。具体操作,请参见异步提交导入任务

    Hint参数说明:

    • elastic_load:是否使用弹性导入方式。取值:truefalse(默认值)。

    • elastic_load_configs:弹性导入方式支持配置的参数。参数需使用方括号([ ])括起来,且多个参数之间以竖线(|)分隔,支持配置的参数如下表所示:

      参数

      是否必填

      说明

      adb.load.resource.group.name

      执行弹性导入任务的Job资源组名称。

      adb.load.job.max.acu

      单个弹性导入任务最多使用的资源。单位为ACU,最小值为5 ACU。默认值为集群Shard个数+1。

      执行如下语句可查询集群Shard个数:

      SELECT count(1) FROM information_schema.kepler_meta_shards;

      spark.driver.resourceSpec

      Spark driver的资源规格。默认值为small。取值范围,请参见Spark应用配置参数说明的型号列。

      spark.executor.resourceSpec

      Spark executor资源规格。默认值为large。取值范围,请参见Spark应用配置参数说明的型号列。

      spark.adb.executorDiskSize

      Spark executor的磁盘容量,取值范围为(0,100],单位为GiB,默认值为10 Gi。更多信息,请参见指定DriverExecutor资源

  7. (可选)查看已提交的导入任务是否为弹性导入任务。

    SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";

    返回结果如下:

    +---------------------------------------+------------------+
    | job_name                              | is_elastic_load  |
    +---------------------------------------+------------------+
    | 2023081517195203101701907203151****** |       1          |
    +---------------------------------------+------------------+

    is_elastic_load的返回值为1,表示已提交的导入任务是弹性导入任务;若为0,则表示已提交的导入任务是常规导入任务。

Tunnel Arrow API方式

步骤一:开启Arrow API

开启方法

您可以通过SET命令或Hint在集群级别和查询级别开启Arrow API:

  • 集群级别开启Arrow API:

    SET ADB_CONFIG <config_name>= <value>;
  • 查询级别开启Arrow API:

    /*<config_name>= <value>*/ SELECT * FROM table;

Arrow API相关配置参数

参数(config_name)

说明

ODPS_TUNNEL_ARROW_ENABLED

是否开启Arrow API。取值:

  • true:是。

  • false(默认值):否。

ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED

是否开启动态Split切分。取值:

  • true:是。

  • false(默认值):否。

步骤二:访问并导入MaxCompute数据

  1. 进入SQL编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

    2. 在左侧导航栏,单击作业开发 > SQL开发

  2. 创建外部数据库。示例如下:

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 创建外表。本文示例为test_adb

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"yourAccessKeyID",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"yourAccessKeySecret",
    "partition_column":"dt",
    "project_name":"odps_project",
    "table_name":"odps_nopart_import_test"
    }';
    说明
    • AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。

    • 外表的参数说明,请参见CREATE EXTERNAL TABLE

  4. 查询数据。

    SELECT * FROM adb_external_db.test_adb;

    返回结果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
  5. 执行以下步骤将MaxCompute数据导入至AnalyticDB for MySQL

    1. AnalyticDB for MySQL中创建数据库,示例如下:

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据,示例如下:

      说明

      新表和步骤3中创建的外表的字段顺序和字段数量需要一致,字段类型兼容。

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTED BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. 向表中写入数据,示例如下:

      • 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于INSERT IGNORE INTO,详情请参见INSERT INTO。示例如下:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        如果需要将特定分区的数据导入adb_demo.adb_import_test,可以执行:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度,可以在写入任务前增加Hint(/*+ direct_batch_load=true*/)加速写入任务。详情请参见异步写入。示例如下:

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        返回结果如下:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        关于异步提交任务详情,请参见异步提交导入任务

数仓版

  1. 连接目标AnalyticDB for MySQL集群。详细操作步骤,请参见连接集群

  2. 创建目标数据库。

    CREATE database test_adb;
  3. 创建MaxCompute外表。本文以odps_nopart_import_test_external_table为例。

    CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api",
     "accessid":"yourAccessKeyID",
     "accesskey":"yourAccessKeySecret",
     "partition_column":"dt",
     "project_name":"odps_project1",
     "table_name":"odps_nopart_import_test"
     }';                 

    参数

    说明

    ENGINE=’ODPS’

    外表的存储引擎。读写MaxCompute数据时,取值为ODPS。

    endpoint

    MaxComputeEndPoint(域名节点)

    说明

    目前仅支持AnalyticDB for MySQL通过MaxComputeVPC网络Endpoint访问MaxCompute。

    查询各地域VPC网络的Endpoint,请参见VPC Endpoint

    accessid

    阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey IDAccessKey Secret,请参见账号与权限

    accesskey

    阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey IDAccessKey Secret,请参见账号与权限

    partition_column

    本文使用的示例是创建分区表的示例,所以需要配置partition_column。如果MaxCompute的表是非分区表,那么AnalyticDB for MySQL中也需要创建非分区表,此时无需配置partition_column

    project_name

    MaxCompute中的工作空间名称。

    table_name

    MaxCompute中的数据源表名。

  4. test_adb数据库中创建表adb_nopart_import_test,用于存储从MaxCompute中导入的数据。

    CREATE TABLE IF NOT EXISTS adb_nopart_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTED BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;
  5. 导入数据。

    • 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于INSERT IGNORE INTO,详情请参见INSERT INTO。示例如下:

      INSERT INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table; 

      通过SELECT查询写入表中的数据,示例如下:

      SELECT * FROM adb_nopart_import_test;

      返回结果如下:

      +------+-------+------+---------+
      | id   | name  | age  |   dt    |
      +------+-------+------+---------+
      |    1 | james |   10 |  202207 |
      |    2 | bond  |   20 |  202207 |
      |    3 | jack  |   30 |  202207 |
      |    4 | lucy  |   40 |  202207 |
      +------+-------+------+---------+

      如果需要将特定分区的数据导入adb_nopart_import_test,可以执行:

      INSERT INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table
      WHERE dt = '202207';
    • 方式二:执行INSERT OVERWRITE导入数据,会覆盖表中原有的数据。示例如下:

      INSERT OVERWRITE adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table;
    • 方式三:异步执行INSERT OVERWRITE导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度,可以在写入任务前增加Hint加速写入任务。详情请参见异步写入。示例如下:

      SUBMIT JOB 
      INSERT OVERWRITE adb_nopart_import_test 
      SELECT * FROM odps_nopart_import_test_external_table;  

      返回结果如下:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2020112122202917203100908203303****** |

      关于异步提交任务详情请参见异步提交导入任务