云原生数据仓库 AnalyticDB MySQL 版在通过外表访问并导入MaxCompute数据时,默认使用Tunnel Record API方式。您也可以进一步选择Tunnel Arrow API方式,相较于Tunnel Record API方式,Tunnel Arrow API方式可以列式读取MaxCompute的数据,从而提高数据访问和处理的效率。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
MaxCompute项目与AnalyticDB for MySQL集群位于同一地域。
AnalyticDB for MySQL集群已开启ENI访问。
说明登录云原生数据仓库AnalyticDB MySQL控制台,在 的网络信息区域,打开ENI网络开关。
已添加AnalyticDB for MySQL的VPC网段到MaxCompute项目的白名单中。
说明登录云原生数据仓库AnalyticDB MySQL控制台,在集群信息页面查询VPC ID。然后登录专有网络控制台,在专有网络页面根据VPC ID查询网段。设置MaxCompute白名单的操作,请参见管理IP白名单。
使用Tunnel Arrow API方式访问并导入MaxCompute数据时,AnalyticDB for MySQL集群需为3.2.2.1及以上版本。
说明查看企业版或湖仓版集群的内核版本,请执行
SELECT adb_version();
。如需升级内核版本,请联系技术支持。
示例数据
本文示例中的MaxCompute项目为test_adb
,示例表person
。示例如下:
CREATE TABLE IF NOT EXISTS person (
id INT,
name VARCHAR(1023),
age INT)
partitioned BY (dt string);
在person
表中添加分区,示例如下:
ALTER TABLE person
ADD
PARTITION (dt='202207');
向分区中添加数据,示例如下:
INSERT INTO test_adb.person
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
开启Arrow API(可选)
默认情况下,AnalyticDB for MySQL集群会使用Tunnel Record API方式访问并导入MaxCompute数据。 若您需要通过Tunnel Arrow API方式访问并导入MaxCompute数据,请先开启Arrow API功能。开启后,AnalyticDB for MySQL集群会使用Tunnel Arrow API方式进行导入。
开启方法
您可以通过SET命令或Hint在集群级别和查询级别开启Arrow API:
集群级别开启Arrow API:
SET ADB_CONFIG <config_name>= <value>;
查询级别开启Arrow API:
/*<config_name>= <value>*/ SELECT * FROM table;
Arrow API相关配置参数
参数(config_name) | 说明 |
ODPS_TUNNEL_ARROW_ENABLED | 是否开启Arrow API。取值:
|
ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED | 是否开启动态Split切分。取值:
|
操作步骤
数据导入方式分为常规导入(默认)和弹性导入。常规导入在计算节点中读取源数据,然后在存储节点中构建索引,消耗计算资源和存储资源。弹性导入在Serverless Spark Job中读取源数据和构建索引,消耗Job型资源组的资源。仅内核版本3.1.10.0及以上且已创建Job型资源组的集群支持弹性导入数据。相较于常规导入,弹性导入可以大幅减少资源的消耗,降低导入过程中对在线读写业务的影响,提升资源隔离性和数据导入效率。更多内容,请参见数据导入方式介绍。
常规导入
进入SQL编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击 。
创建外部数据库。示例如下:
CREATE EXTERNAL DATABASE adb_external_db;
创建外表。本文示例为
test_adb
。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
说明AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。
外表的参数说明,请参见CREATE EXTERNAL TABLE。
查询数据。
SELECT * FROM adb_external_db.test_adb;
返回结果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
执行以下步骤将MaxCompute数据导入至AnalyticDB for MySQL。
在AnalyticDB for MySQL中创建数据库,示例如下:
CREATE DATABASE adb_demo;
在AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据,示例如下:
说明新表和步骤3中创建的外表的字段顺序和字段数量需要一致,字段类型兼容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');
向表中写入数据,示例如下:
方式一:执行INSERT INTO导入数据,当主键重复时会自动忽略当前写入数据,不做更新,作用等同于
INSERT IGNORE INTO
,详情请参见INSERT INTO。示例如下:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
如果需要将特定分区的数据导入
adb_demo.adb_import_test
,可以执行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';
方式二:执行INSERT OVERWRITE INTO导入数据,会覆盖表中原有的数据。示例如下:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
方式三:异步执行INSERT OVERWRITE INTO导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度,可以在写入任务前增加Hint(/*+ direct_batch_load=true*/
)加速写入任务。详情请参见异步写入。示例如下:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
返回结果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
关于异步提交任务详情,请参见异步提交导入任务。
弹性导入
进入SQL编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击 。
创建数据库。如果有已创建的数据库,可以忽略本步骤。示例如下:
CREATE DATABASE adb_demo;
创建外表。
说明AnalyticDB for MySQL外表的名称需要和MaxCompute项目的名称相同,否则创建外表会失败。
AnalyticDB for MySQL外表和MaxCompute中表的字段名称、字段数量、字段顺序需要一致,字段类型需要兼容。
弹性导入仅支持
CREATE TABLE
语句创建外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
外表支持设置的参数及参数说明,请参见参数说明。
查询数据。
SELECT * FROM adb_demo.test_adb;
返回结果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
在AnalyticDB for MySQL中创建表用于存储从MaxCompute中导入的数据。示例如下:
说明创建的内表和步骤3中创建的外表的字段名称、字段数量、字段顺序、字段类型必须相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
导入数据。
重要弹性导入仅支持通过
INSERT OVERWRITE INTO
语句导入数据。方法一:执行INSERT OVERWRITE INTO弹性导入数据,会覆盖表中原有的数据。示例如下:
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:异步执行INSERT OVERWRITE INTO弹性导入数据。通常使用
SUBMIT JOB
提交异步任务,由后台调度。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要异步提交弹性导入任务时,不支持设置优先级队列。
返回结果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB
提交异步任务后,返回结果仅表示异步任务提交成功。您可以通过job_id终止异步任务或查询异步任务状态,判断任务是否执行成功。具体操作,请参见异步提交导入任务。Hint参数说明:
elastic_load:是否使用弹性导入方式。取值:true或false(默认值)。
elastic_load_configs:弹性导入方式支持配置的参数。参数需使用方括号([ ])括起来,且多个参数之间以竖线(|)分隔,支持配置的参数如下表所示:
参数
是否必填
说明
adb.load.resource.group.name
是
执行弹性导入任务的Job资源组名称。
adb.load.job.max.acu
否
单个弹性导入任务最多使用的资源。单位为ACU,最小值为5 ACU。默认值为集群Shard个数+1。
执行如下语句可查询集群Shard个数:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的资源规格。默认值为small。取值范围,请参见Spark资源规格列表的型号列。
spark.executor.resourceSpec
否
Spark executor的资源规格。默认值为large。取值范围,请参见Spark资源规格列表的型号列。
spark.adb.executorDiskSize
否
Spark executor的磁盘容量,取值范围为(0,100],单位为GiB,默认值为10 Gi。更多信息,请参见指定Driver和Executor资源。
(可选)查看已提交的导入任务是否为弹性导入任务。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
返回结果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的返回值为1,表示已提交的导入任务是弹性导入任务;若为0,则表示已提交的导入任务是常规导入任务。