AnalyticDB for MySQL支持通过外表读取并导入MaxCompute数据。通过外表导入数据可以最大限度地利用集群资源,实现高性能数据导入。您也可以进一步选择通过Tunnel方式和Storage API两种方式访问并导入MaxCompute数据,相较于Tunnel方式,Storage API方式可以提高数据访问和处理的效率。本文介绍如何通过外表将MaxCompute数据导入至AnalyticDB MySQL湖仓版(3.0)。
前提条件
MaxCompute项目AnalyticDB for MySQL湖仓版(3.0)集群位于同一地域。具体操作,请参见创建集群。
AnalyticDB MySQL湖仓版(3.0)已开启ENI访问。
已添加AnalyticDB MySQL的VPC网段到MaxCompute项目的白名单中。
登录云原生数据仓库AnalyticDB MySQL控制台,在集群信息页面查询VPC ID。然后登录专有网络控制台,在专有网络页面根据VPC ID查询网段。设置MaxCompute白名单的操作,请参见管理IP白名单。
使用Storage API方式访问并导入MaxCompute数据时,还需满足以下条件:
已提交Storage API功能的试用申请,并获得试用资格。详情请参见试用申请。
集群内版本需为3.1.10.2及以上版本。
集群所在地域为华东1(杭州)、华东2(上海)、华南1(深圳)或华北2(北京)。
功能介绍
AnalyticDB for MySQL集群默认使用Tunnel方式访问MaxCompute数据,3.1.10.2及以上内核版本集群支持使用Storage API方式访问MaxCompute数据。相较于Tunnel方式,Storage API方式可以缩短生成查询计划的耗时和将MaxCompute外表数据导入AnalyticDB for MySQL内表的耗时,提升OLAP查询的查询性能和数据导入性能。
Storage API方式与Tunnel方式的区别如下:
访问方式 | 集群版本 | 地域 | 费用 | 数据访问效率 |
Tunnel方式 | 无限制 | 无限制 | 免费 | 使用公共数据传输服务资源组,该资源会被该地域所有项目共享使用,数据访问和导入速度慢。 |
Storage API方式 | 3.1.10.2及以上版本 |
| 需购买独享数据传输服务资源组,会产生费用。独享数据传输服务资源组的计费规则请参见独享数据传输服务资源组定价。 | 使用独享数据传输服务资源组,为项目配置后,该资源仅供当前项目使用,减少数据访问和导入时间,提供更高的数据传输速度。 |
示例数据
本文示例中的MaxCompute项目为test_adb
,示例表person
。示例如下:
CREATE TABLE IF NOT EXISTS person (
id int,
name varchar(1023),
age int)
partitioned by (dt string);
在person
表中添加分区,示例如下:
ALTER TABLE person
ADD
PARTITION (dt='202207');
向分区中添加数据,示例如下:
INSERT INTO test_adb.person
partition (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
Tunnel方式访问MaxCompute
数据导入方式分为常规导入(默认)和弹性导入。常规导入在计算节点中读取源数据,然后在存储节点中构建索引,消耗计算资源和存储资源。弹性导入在Serverless Spark Job中读取源数据和构建索引,消耗Job型资源组的资源。仅内核版本3.1.10.0及以上且已创建Job型资源组的集群支持弹性导入数据。相较于常规导入,弹性导入可以大幅减少资源的消耗,降低导入过程中对在线读写业务的影响,提升资源隔离性和数据导入效率。更多内容,请参见数据导入方式介绍。
常规导入
进入SQL编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
- 在左侧导航栏,单击 。
创建外部数据库。示例如下:
CREATE EXTERNAL DATABASE adb_external_db;
创建外表。本文示例为
test_adb
。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
说明AnalyticDB MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。
外表的参数说明,请参见CREATE EXTERNAL TABLE。
查询数据。
SELECT * FROM adb_external_db.test_adb;
返回结果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
执行以下步骤将MaxCompute数据导入至AnalyticDB MySQL。
在AnalyticDB MySQL中创建数据库,示例如下:
在AnalyticDB MySQL中创建表用于存储从MaxCompute中导入的数据,示例如下:
向表中写入数据,示例如下:
方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于
INSERT IGNORE INTO
,详情请参见INSERT INTO。示例如下:方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:
方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度,可以在写入任务前增加Hint(/* direct_batch_load=true*/
)加速写入任务。详情请参见异步写入。示例如下:关于异步提交任务详情,请参见异步提交导入任务。
CREATE DATABASE adb_demo;
说明新表和步骤3中创建的外表的字段顺序和字段数量需要一致,字段类型兼容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt');
INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
如果需要将特定分区的数据导入
adb_demo.adb_import_test
,可以执行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
返回结果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
弹性导入
进入SQL编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
- 在左侧导航栏,单击 。
创建数据库。如果有已创建的数据库,可以忽略本步骤。示例如下:
CREATE DATABASE adb_demo;
创建外表。
说明AnalyticDB MySQL外表的名称需要和MaxCompute项目的名称相同,否则创建外表会失败。
AnalyticDB MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。
弹性导入仅支持
CREATE TABLE
语句创建外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
外表支持设置的参数及参数说明,请参见参数说明。
查询数据。
SELECT * FROM adb_demo.test_adb;
返回结果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
在AnalyticDB MySQL中创建表用于存储从MaxCompute中导入的数据。示例如下:
说明创建的内表和步骤3中创建的外表的字段名称、字段数量、字段顺序、字段类型必须相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
导入数据。
重要弹性导入仅支持通过
INSERT OVERWRITE INTO
语句导入数据。方法一:执行INSERT OVERWRITE INTO弹性导入数据,会覆盖表中原有的数据。示例如下:
/*elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:异步执行INSERT OVERWRITE INTO弹性导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度。/*elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要异步提交弹性导入任务时,不支持设置优先级队列。
返回结果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB
提交异步任务后,返回结果仅表示异步任务提交成功。您可以通过job_id终止异步任务或查询异步任务状态,判断任务是否执行成功。具体操作,请参见异步提交导入任务。Hint参数说明:
elastic_load:是否使用弹性导入方式。取值:true或false(默认值)。
elastic_load_configs:弹性导入方式支持配置的参数。参数需使用方括号([ ])括起来,且多个参数之间以竖线(|)分隔,支持配置的参数如下表所示:
参数
是否必填
说明
adb.load.resource.group.name
是
执行弹性导入任务的Job资源组名称。
adb.load.job.max.acu
否
单个弹性导入任务最多使用的资源。单位为ACU,最小值为5 ACU。默认值为集群Shard个数+1。
执行如下语句可查询集群Shard个数:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的资源规格。默认值为small。取值范围,请参见Spark资源规格列表的型号列。
spark.executor.resourceSpec
否
Spark executor的资源规格。默认值为large。取值范围,请参见Spark资源规格列表的型号列。
spark.adb.executorDiskSize
否
Spark executor的磁盘容量,取值范围为(0,100],单位为GiB,默认值为10 Gi。更多信息,请参见指定Driver和Executor资源。
(可选)查看已提交的导入任务是否为弹性导入任务。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
返回结果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的返回值为1,表示已提交的导入任务是弹性导入任务;若为0,则表示已提交的导入任务是常规导入任务。
Storage API方式访问MaxCompute
使用Storage API方式访问MaxCompute数据时,您需确保:
已提交Storage API功能的试用申请,并获得试用资格。详情请参见试用申请。
集群内版本需为3.1.10.2及以上版本。
集群所在地域为华东1(杭州)、华东2(上海)、华南1(深圳)或华北2(北京)。
费用说明
使用Storage API访问MaxCompute外表数据需购买独享数据传输服务资源组,会产生费用。独享数据传输服务资源组的计费规则请参见独享数据传输服务资源组定价。
购买并配置独享数据传输服务资源组
登录MaxCompute控制台购买独享数据传输服务资源组。具体操作,请参见购买独享数据服务资源组。
在项目管理页面,选择目标MaxCompute项目并单击操作列的管理。
单击基础属性区域的编辑,在数据传输服务下拉列表中选择您购买的独享资源组。
访问MaxCompute数据
数据导入方式分为常规导入(默认)和弹性导入。常规导入在计算节点中读取源数据,然后在存储节点中构建索引,消耗计算资源和存储资源。弹性导入在Serverless Spark Job中读取源数据和构建索引,消耗Job型资源组的资源。仅内核版本3.1.10.0及以上且已创建Job型资源组的集群支持弹性导入数据。相较于常规导入,弹性导入可以大幅减少资源的消耗,降低导入过程中对在线读写业务的影响,提升资源隔离性和数据导入效率。更多内容,请参见数据导入方式介绍。
常规导入
进入SQL编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
- 在左侧导航栏,单击 。
创建外部数据库。示例如下:
CREATE EXTERNAL DATABASE adb_external_demo;
创建外表。本文示例为
test
。说明AnalyticDB MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。
CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_demo.test ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person", "odps_quota_name":ot_42854300324284****, "odps_compression_code":ZSTD }';
外表的参数说明,请参见CREATE EXTERNAL TABLE。
查询数据。
/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD*/ SELECT * FROM adb_external_demo.test;
Hint参数说明:
参数
是否必填
说明
storage_api_enabled
是
是否使用Storage API访问MaxCompute数据。取值:
true:是。
false(默认值):否,使用Tunnel方式访问MaxCompute数据。
本示例需配置为true。
说明您也可以使用SET ADB_CONFIG命令配置该参数。详情请参见常见配置参数。
odps_quota_name
是
购买独享数据传输服务资源组的名称。本文示例为ot_42854300324284****。独享数据传输服务资源组的名称查看方法,请参见查看Quota名称。
说明您也可以使用SET ADB_CONFIG命令或在TABLE_PROPERTIES中配置该参数。详情请参见常见配置参数。
odps_compression_code
否
压缩MaxCompute数据的方式。默认值为
""
,即不压缩MaxCompute数据。取值:ZSTD
LZ4_FRAME
说明您也可以使用SET ADB_CONFIG命令或在TABLE_PROPERTIES中配置该参数。详情请参见常见配置参数。
返回结果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+
导入数据。
在AnalyticDB for MySQL中创建数据库,示例如下:
在AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据,示例如下:
向表中写入数据,示例如下:
方式一:执行
INSERT INTO
导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于INSERT IGNORE INTO
,详情请参见INSERT INTO。示例如下:方式二:执行
INSERT OVERWRITE INTO
导入数据,会覆盖表中原有的数据。示例如下:方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度,可以在写入任务前增加Hint(/* direct_batch_load=true*/
)加速写入任务。详情请参见异步写入。示例如下:关于异步提交任务详情,请参见异步提交导入任务。
CREATE DATABASE adb_demo;
说明新表和步骤3中创建的外表的字段顺序和字段数量需要一致,字段类型兼容。
CREATE TABLE IF NOT EXISTS adb_demo.import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt');
/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD*/ INSERT INTO adb_demo.import_test SELECT * FROM adb_external_demo.test;
如果需要将特定分区的数据导入
adb_external_demo.test
,可以执行:/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD*/ INSERT INTO adb_demo.import_test SELECT * FROM adb_external_demo.test WHERE dt = '202207';
/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD*/ INSERT OVERWRITE INTO adb_demo.import_test SELECT * FROM adb_external_demo.test;
/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD*/ SUBMIT job INSERT OVERWRITE INTO adb_demo.import_test SELECT * FROM adb_external_demo.test;
返回结果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
Hint参数说明,请参见Hint参数说明。
(可选)查看是否已使用Storage API方式访问MaxCompute数据。具体操作,请参见资源观测。
在表访问热度(并发数)和访问来源IP(B/S)指标区域,将使用方式选择为Storage API读或Storage API写,若指标区域有数据,则说明使用了Storage API方式访问MaxCompute数据;若无,则说明未使用Storage API方式,而默认使用了Tunnel方式访问MaxCompute数据。
弹性导入
进入SQL编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。
- 在左侧导航栏,单击 。
创建数据库。如果有已创建的数据库,可以忽略本步骤。示例如下:
CREATE DATABASE adb_demo1;
创建外表。
说明AnalyticDB MySQL外表的名称需要和MaxCompute项目的名称相同,否则创建外表会失败。
AnalyticDB MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。
弹性导入仅支持
CREATE TABLE
语句创建外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person", "odps_quota_name":ot_42854300324284****, "odps_compression_code":ZSTD }';
外表支持设置的参数及参数说明,请参见参数说明。
查询数据。
/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD*/ SELECT * FROM adb_demo1.test_adb;
Hint参数说明,请参见Hint参数说明。
返回结果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
在AnalyticDB MySQL中创建表用于存储从MaxCompute中导入的数据。示例如下:
说明创建的内表和步骤3中创建的外表的字段名称、字段数量、字段顺序、字段类型必须相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTE BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
导入数据。
重要弹性导入仅支持通过
INSERT OVERWRITE INTO
语句导入数据。方法一:执行INSERT OVERWRITE INTO弹性导入数据,会覆盖表中原有的数据。示例如下:
/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD,elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:异步执行INSERT OVERWRITE INTO弹性导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度。/*storage_api_enabled=true,odps_quota_name=ot_42854300324284****,odps_compression_code=ZSTD,elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要异步提交弹性导入任务时,不支持设置优先级队列。
返回结果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB
提交异步任务后,返回结果仅表示异步任务提交成功。您可以通过job_id终止异步任务或查询异步任务状态,判断任务是否执行成功。具体操作,请参见异步提交导入任务。Hint参数说明:
Storage API Hint参数说明,请参见Hint参数说明。
弹性导入Hint参数说明如下:
elastic_load:是否使用弹性导入方式。取值:true或false(默认值)。
elastic_load_configs:弹性导入方式支持配置的参数。参数需使用方括号([ ])括起来,且多个参数之间以竖线(|)分隔,支持配置的参数如下表所示:
参数
是否必填
说明
adb.load.resource.group.name
是
执行弹性导入任务的Job资源组名称。
adb.load.job.max.acu
否
单个弹性导入任务最多使用的资源。单位为ACU,最小值为5 ACU。默认值为集群Shard个数+1。
执行如下语句可查询集群Shard个数:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的资源规格。默认值为small。取值范围,请参见Spark资源规格列表的型号列。
spark.executor.resourceSpec
否
Spark executor的资源规格。默认值为large。取值范围,请参见Spark资源规格列表的型号列。
spark.adb.executorDiskSize
否
Spark executor的磁盘容量,取值范围为(0,100],单位为GiB,默认值为10 Gi。更多信息,请参见指定Driver和Executor资源。
(可选)查看已提交的导入任务是否为弹性导入任务。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
返回结果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的返回值为1,表示已提交的导入任务是弹性导入任务;若为0,则表示已提交的导入任务是常规导入任务。(可选)查看是否已使用Storage API方式访问MaxCompute数据。具体操作,请参见资源观测。
在表访问热度(并发数)和访问来源IP(B/S)指标区域,将使用方式选择为Storage API读或Storage API写,若指标区域有数据,则说明使用了Storage API方式访问MaxCompute数据;若无,则说明未使用Storage API方式,而默认使用了Tunnel方式访问MaxCompute数据。
- 本页导读 (1)