本文介绍如何将MaxCompute数据导入至AnalyticDB MySQL。
前提条件
- 已创建MaxCompute项目和表。本文示例的MaxCompute项目、表和数据,请参见示例数据说明。
- 已在MaxCompute项目所在的同一地域中创建AnalyticDB MySQL集群。具体操作,请参见创建数仓版(3.0)集群。
- 已创建AnalyticDB MySQL目标库。
- 已添加AnalyticDB MySQL的VPC网段到MaxCompute项目的白名单中。
登录云原生数据仓库AnalyticDB MySQL控制台,在集群信息页面查询VPC ID。然后登录专有网络控制台,在专有网络页面根据VPC ID查询网段。设置MaxCompute白名单的操作,请参见管理IP白名单。
示例数据说明
本文示例中的MaxCompute项目为
odps_project1
,示例表odps_nopart_import_test
。示例如下:CREATE TABLE IF NOT EXISTS odps_nopart_import_test (
id int,
name string,
age int)
partitioned by (dt string);
在
odps_nopart_import_test
表中添加分区,示例如下:ALTER TABLE odps_nopart_import_test
ADD
PARTITION (dt='202207');
向分区中添加数据,示例如下:
INSERT INTO odps_project1.odps_nopart_import_test
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
操作步骤
本示例将odps_nopart_import_test
表中的数据导入AnalyticDB MySQL的test_adb
数据库中。
- 连接AnalyticDB MySQL集群,进入目标数据库
test_adb
。 - 通过CREATE TABLE,在
test_adb
数据库中创建外部映射表odps_nopart_import_test_external_table
。CREATE TABLE IF NOT EXISTS odps_nopart_import_test_external_table ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn.maxcompute.aliyun-inc.com/api", "accessid":"L*******FsE", "accesskey":"CcwF********iWjv", "partition_column":"dt", "project_name":"odps_project1", "table_name":"odps_nopart_import_test" }';
参数 说明 ENGINE=’ODPS’
外表的存储引擎。读写MaxCompute数据时,取值为ODPS。 endpoint
MaxCompute的EndPoint(域名节点)。 说明 目前仅支持AnalyticDB MySQL通过MaxCompute的VPC网络Endpoint访问MaxCompute。查询各地域VPC网络的Endpoint,请参见各地域Endpoint对照表(阿里云VPC网络连接方式)。
accessid
阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey ID。 如何获取AccessKey ID和AccessKey Secret,请参见获取AccessKey信息。
accesskey
阿里云账号或者具备MaxCompute访问权限的RAM用户的AccessKey Secret。 如何获取AccessKey ID和AccessKey Secret,请参见获取AccessKey信息。
partition_column
本文使用的示例是创建分区表的示例,所以需要配置 partition_column
。如果MaxCompute的表是非分区表,那么AnalyticDB MySQL中也需要创建非分区表,此时无需配置partition_column
。project_name
MaxCompute中的工作空间名称。 table_name
MaxCompute中的数据源表名。 - 通过CREATE TABLE,在
test_adb
数据库中创建表adb_nopart_import_test
用于存储从MaxCompute中导入的数据。CREATE TABLE IF NOT EXISTS adb_nopart_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
- 通过外表
odps_nopart_import_test_external_table
将MaxCompute数据导入AnalyticDB MySQL的表adb_nopart_import_test
。- 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于
INSERT IGNORE INTO
,详情请参见INSERT INTO。示例如下:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;
通过SELECT查询写入表中的数据,示例如下:
返回结果如下:SELECT * FROM adb_nopart_import_test;
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+
如果需要将特定分区的数据导入adb_nopart_import_test
,可以执行:INSERT INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table WHERE dt = '202207';
- 方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:
INSERT OVERWRITE INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;
- 方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度,可以在写入任务前增加Hint(/* direct_batch_load=true*/
)加速写入任务。详情请参见异步写入。示例如下:SUBMIT job INSERT OVERWRITE INTO adb_nopart_import_test SELECT * FROM odps_nopart_import_test_external_table;
返回结果如下:+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** |
关于异步提交任务详情请参见异步提交导入导出任务。
- 方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于