通过外表导入至湖仓版

云原生数据仓库 AnalyticDB MySQL 版在通过外表访问并导入MaxCompute数据时,默认使用Tunnel Record API方式。您也可以进一步选择Tunnel Arrow API方式,相较于Tunnel Record API方式,Tunnel Arrow API方式可以列式读取MaxCompute的数据,从而提高数据访问和处理的效率。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • MaxCompute项目与AnalyticDB for MySQL集群位于同一地域。

  • AnalyticDB for MySQL集群已开启ENI访问。

    说明

    登录云原生数据仓库AnalyticDB MySQL控制台,在集群管理 > 集群信息网络信息区域,打开ENI网络开关。

  • 已添加AnalyticDB for MySQL的VPC网段到MaxCompute项目的白名单中。

    说明

    登录云原生数据仓库AnalyticDB MySQL控制台,在集群信息页面查询VPC ID。然后登录专有网络控制台,在专有网络页面根据VPC ID查询网段。设置MaxCompute白名单的操作,请参见管理IP白名单

  • 使用Tunnel Arrow API方式访问并导入MaxCompute数据时,AnalyticDB for MySQL集群需为3.2.2.1及以上版本。

    说明

    查看企业版湖仓版集群的内核版本,请执行SELECT adb_version();。如需升级内核版本,请联系技术支持。

示例数据

本文示例中的MaxCompute项目为test_adb,示例表person。示例如下:

CREATE TABLE IF NOT EXISTS person (
    id INT,
    name VARCHAR(1023),
    age INT)
partitioned BY (dt string);

person表中添加分区,示例如下:

ALTER TABLE person 
ADD 
PARTITION (dt='202207');

向分区中添加数据,示例如下:

INSERT INTO test_adb.person 
PARTITION (dt='202207') 
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);

开启Arrow API(可选)

默认情况下,AnalyticDB for MySQL集群会使用Tunnel Record API方式访问并导入MaxCompute数据。 若您需要通过Tunnel Arrow API方式访问并导入MaxCompute数据,请先开启Arrow API功能。开启后,AnalyticDB for MySQL集群会使用Tunnel Arrow API方式进行导入。

开启方法

您可以通过SET命令或Hint在集群级别和查询级别开启Arrow API:

  • 集群级别开启Arrow API:

    SET ADB_CONFIG <config_name>= <value>;
  • 查询级别开启Arrow API:

    /*<config_name>= <value>*/ SELECT * FROM table;

Arrow API相关配置参数

参数(config_name)

说明

ODPS_TUNNEL_ARROW_ENABLED

是否开启Arrow API。取值:

  • true:是。

  • false(默认值):否。

ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED

是否开启动态Split切分。取值:

  • true:是。

  • false(默认值):否。

操作步骤

数据导入方式分为常规导入(默认)和弹性导入。常规导入在计算节点中读取源数据,然后在存储节点中构建索引,消耗计算资源和存储资源。弹性导入在Serverless Spark Job中读取源数据和构建索引,消耗Job型资源组的资源。仅内核版本3.1.10.0及以上且已创建Job型资源组的集群支持弹性导入数据。相较于常规导入,弹性导入可以大幅减少资源的消耗,降低导入过程中对在线读写业务的影响,提升资源隔离性和数据导入效率。更多内容,请参见数据导入方式介绍

常规导入

  1. 进入SQL编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发 > SQL开发

  2. 创建外部数据库。示例如下:

    CREATE EXTERNAL DATABASE adb_external_db;
  3. 创建外表。本文示例为test_adb

    CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb (
        id int,
        name varchar(1023),
        age int,
        dt string
        ) ENGINE='ODPS'
    TABLE_PROPERTIES='{
    "accessid":"LTAILd4****",
    "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api",
    "accesskey":"4A5Q7ZVzcYnWMQPysX****",
    "partition_column":"dt",
    "project_name":"test_adb",
    "table_name":"person"
    }';
    说明
    • AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。

    • 外表的参数说明,请参见CREATE EXTERNAL TABLE

  4. 查询数据。

    SELECT * FROM adb_external_db.test_adb;

    返回结果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
    4 rows in set (0.35 sec)
  5. 执行以下步骤将MaxCompute数据导入至AnalyticDB for MySQL

    1. AnalyticDB for MySQL中创建数据库,示例如下:

      CREATE DATABASE adb_demo; 
    2. AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据,示例如下:

      说明

      新表和步骤3中创建的外表的字段顺序和字段数量需要一致,字段类型兼容。

      CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test(
          id int,
          name string,
          age int,
          dt string
          PRIMARY KEY(id,dt)
      )
      DISTRIBUTED BY HASH(id)  
      PARTITION BY VALUE('dt'); 
    3. 向表中写入数据,示例如下:

      • 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于INSERT IGNORE INTO,详情请参见INSERT INTO。示例如下:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        如果需要将特定分区的数据导入adb_demo.adb_import_test,可以执行:

        INSERT INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb 
        WHERE dt = '202207'; 
      • 方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:

        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;
      • 方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度,可以在写入任务前增加Hint(/*+ direct_batch_load=true*/)加速写入任务。详情请参见异步写入。示例如下:

        SUBMIT job 
        INSERT OVERWRITE INTO adb_demo.adb_import_test
        SELECT * FROM adb_external_db.test_adb;

        返回结果如下:

        +---------------------------------------+
        | job_id                                |
        +---------------------------------------+
        | 2020112122202917203100908203303****** |
        +---------------------------------------+

        关于异步提交任务详情,请参见异步提交导入任务

弹性导入

  1. 进入SQL编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发 > SQL开发

  2. 创建数据库。如果有已创建的数据库,可以忽略本步骤。示例如下:

    CREATE DATABASE adb_demo; 
  3. 创建外表。

    说明
    • AnalyticDB for MySQL外表的名称需要和MaxCompute项目的名称相同,否则创建外表会失败。

    • AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。

    • 弹性导入仅支持CREATE TABLE语句创建外表。

    CREATE TABLE IF NOT EXISTS test_adb
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api",
     "accessid":"LTAILd4****",
     "accesskey":"4A5Q7ZVzcYnWMQPysX****",
     "partition_column":"dt",
     "project_name":"test_adb",
     "table_name":"person"
     }';                 

    外表支持设置的参数及参数说明,请参见参数说明

  4. 查询数据。

    SELECT * FROM adb_demo.test_adb;

    返回结果如下:

    +------+-------+------+---------+
    | id   | name  | age  |   dt    |
    +------+-------+------+---------+
    |    1 | james |   10 |  202207 |
    |    2 | bond  |   20 |  202207 |
    |    3 | jack  |   30 |  202207 |
    |    4 | lucy  |   40 |  202207 |
    +------+-------+------+---------+
    4 rows in set (0.35 sec)
  5. AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据。示例如下:

    说明

    创建的内表和步骤3中创建的外表的字段名称、字段数量、字段顺序、字段类型必须相同。

    CREATE TABLE IF NOT EXISTS adb_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTED BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;  
  6. 导入数据。

    重要

    弹性导入仅支持通过INSERT OVERWRITE INTO语句导入数据。

    • 方法一:执行INSERT OVERWRITE INTO弹性导入数据,会覆盖表中原有的数据。示例如下:

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/
      INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
    • 方法二:异步执行INSERT OVERWRITE INTO弹性导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度。

      /*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/
      SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
      重要

      异步提交弹性导入任务时,不支持设置优先级队列。

      返回结果如下:

      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2023081517192220291720310090151****** |
      +---------------------------------------+

    使用SUBMIT JOB提交异步任务后,返回结果仅表示异步任务提交成功。您可以通过job_id终止异步任务或查询异步任务状态,判断任务是否执行成功。具体操作,请参见异步提交导入任务

    Hint参数说明:

    • elastic_load:是否使用弹性导入方式。取值:truefalse(默认值)。

    • elastic_load_configs:弹性导入方式支持配置的参数。参数需使用方括号([ ])括起来,且多个参数之间以竖线(|)分隔,支持配置的参数如下表所示:

      参数

      是否必填

      说明

      adb.load.resource.group.name

      执行弹性导入任务的Job资源组名称。

      adb.load.job.max.acu

      单个弹性导入任务最多使用的资源。单位为ACU,最小值为5 ACU。默认值为集群Shard个数+1。

      执行如下语句可查询集群Shard个数:

      SELECT count(1) FROM information_schema.kepler_meta_shards;

      spark.driver.resourceSpec

      Spark driver的资源规格。默认值为small。取值范围,请参见Spark资源规格列表的型号列。

      spark.executor.resourceSpec

      Spark executor的资源规格。默认值为large。取值范围,请参见Spark资源规格列表的型号列。

      spark.adb.executorDiskSize

      Spark executor的磁盘容量,取值范围为(0,100],单位为GiB,默认值为10 Gi。更多信息,请参见指定Driver和Executor资源

  7. (可选)查看已提交的导入任务是否为弹性导入任务。

    SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";

    返回结果如下:

    +---------------------------------------+------------------+
    | job_name                              | is_elastic_load  |
    +---------------------------------------+------------------+
    | 2023081517195203101701907203151****** |       1          |
    +---------------------------------------+------------------+

    is_elastic_load的返回值为1,表示已提交的导入任务是弹性导入任务;若为0,则表示已提交的导入任务是常规导入任务。