本文介绍如何将MaxCompute数据导入至AnalyticDB MySQL

前提条件

示例数据说明

本文示例中的MaxCompute项目为odps_project1,示例表odps_nopart_import_test。示例如下:
CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
    id int,
    name string,
    age int)
partitioned by (dt string);
odps_nopart_import_test表中添加分区,示例如下:
ALTER TABLE odps_nopart_import_test 
ADD 
PARTITION (dt='202207');
向分区中添加数据,示例如下:
INSERT INTO odps_project1.odps_nopart_import_test 
PARTITION (dt='202207') 
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);

操作步骤

本示例将odps_nopart_import_test表中的数据导入AnalyticDB MySQLtest_adb数据库中。

  1. 连接AnalyticDB MySQL集群,进入目标数据库test_adb
  2. 通过CREATE TABLE,在test_adb数据库中创建外部映射表odps_nopart_import_test_external_table
    CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table
    (
        id int,
        name string,
        age int,
        dt string
    )
     ENGINE='ODPS'
     TABLE_PROPERTIES='{
     "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api",
     "accessid":"L*******FsE",
     "accesskey":"CcwF********iWjv",
     "partition_column":"dt",
     "project_name":"odps_project1",
     "table_name":"odps_nopart_import_test"
     }';                 
    参数说明
    ENGINE=’ODPS’外表的存储引擎。读写MaxCompute数据时,取值为ODPS。
    endpointMaxCompute的EndPoint(域名节点)
    说明 目前仅支持AnalyticDB MySQL通过MaxCompute的VPC网络Endpoint访问MaxCompute。

    查询各地域VPC网络的Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)

    accessid阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。

    如何获取AccessKey ID和AccessKey Secret,请参见获取AccessKey信息

    accesskey阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。

    如何获取AccessKey ID和AccessKey Secret,请参见获取AccessKey信息

    partition_column本文使用的示例是创建分区表的示例,所以需要配置partition_column。如果MaxCompute的表是非分区表,那么AnalyticDB MySQL中也需要创建非分区表,此时无需配置partition_column
    project_nameMaxCompute中的工作空间名称。
    table_nameMaxCompute中的数据源表名。
  3. 通过CREATE TABLE,在test_adb数据库中创建表adb_nopart_import_test用于存储从MaxCompute中导入的数据。
    CREATE TABLE IF NOT EXISTS adb_nopart_import_test
    (   id int,
        name string,
        age int,
        dt string,
        PRIMARY KEY(id,dt)
    )
    DISTRIBUTE BY HASH(id)
    PARTITION BY VALUE('dt') LIFECYCLE 30;  
  4. 通过外表odps_nopart_import_test_external_table将MaxCompute数据导入AnalyticDB MySQL的表adb_nopart_import_test
    • 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于INSERT IGNORE INTO,详情请参见INSERT INTO。示例如下:
      INSERT INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table;   
      通过SELECT查询写入表中的数据,示例如下:
      SELECT * FROM adb_nopart_import_test;
      返回结果如下:
      +------+-------+------+---------+
      | id   | name  | age  |   dt    |
      +------+-------+------+---------+
      |    1 | james |   10 |  202207 |
      |    2 | bond  |   20 |  202207 |
      |    3 | jack  |   30 |  202207 |
      |    4 | lucy  |   40 |  202207 |
      +------+-------+------+---------+
      如果需要将特定分区的数据导入adb_nopart_import_test,可以执行:
      INSERT INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table 
      WHERE dt = '202207'; 
    • 方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:
      INSERT OVERWRITE INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table;
    • 方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用SUBMIT JOB提交异步任务,由后台调度,可以在写入任务前增加Hint(/* direct_batch_load=true*/)加速写入任务。详情请参见异步写入。示例如下:
      SUBMIT job 
      INSERT OVERWRITE INTO adb_nopart_import_test
      SELECT * FROM odps_nopart_import_test_external_table;  
      返回结果如下:
      +---------------------------------------+
      | job_id                                |
      +---------------------------------------+
      | 2020112122202917203100908203303****** |

      关于异步提交任务详情请参见异步提交导入导出任务