Hologres V2.0版本新增存储过程(hg_insert_overwrite),使用此存储过程帮助您简单、快捷地将MaxCompute或Hologres表数据导入Hologres内部表。本文为您介绍如何使用存储过程将MaxCompute或Hologres表数据导入Hologres内部表。
注意事项
Hologres从 V2.0.15版本开始,执行
set hg_experimental_hg_insert_overwrite_enable_view=on;
命令开启GUC后,hg_insert_overwrite
命令支持往有普通视图依赖的表导入数据;暂不支持使用hg_insert_overwrite
命令往有物化视图依赖的表中导入数据。如果选择部分字段导入,字段顺序需要与源表保持一致且一一对应。
对于Hologres V2.0.11以前的版本,导入失败需要手动清理临时;自 V2.0.11版本开始,系统会自动清理临时表。
因hg_insert_overwirte需要以表Owner的身份新建一张临时表,因此仅Superuser和表的Owner有权限执行hg_insert_overwrite操作。
使用说明
命令语法:
call hg_insert_overwrite('holo_table' regclass, ['partition_value' text|int|varchar|date], 'sql' text);
参数说明:
holo_table:Hologres的内部表,即数据目标存储表,表必须已经存在,如果是分区表,需要指定partition_value。
partition_value:分区表的分区值,holo_table为分区表时指定。支持将INT、TEXT、VARCHAR或DATE类型数据作为分区值。
sql:标准的
select
语句,可用来查询MaxCompute或者Hologres的表,需确保select
出来的分区字段的值必须完全等于partition_value。如果SQL语句中含有单引号(' '),需要通过$$sql$$
改写sql,以自动实现单引号转义。
命令示例:
分区表写入:将test1表数据写入parent_table表的20230421分区。
call hg_insert_overwrite('parent_table','20230421',$$select * from test1 where partition_value='20230421'$$);
非分区表写入:将test1表数据写入holo_table表。
call hg_insert_overwrite('holo_table','select * from test1');
使用示例
示例1:MaxCompute非分区表数据导入Hologres
准备MaxCompute的非分区表数据。
在MaxCompute中创建一张非分区源数据表,或直接选用已创建的非分区表。示例选用MaxCompute公告数据集public_data项目下的
customer
表数据,其表DDL以及数据如下。--MaxCompute公共数据集的表DDL CREATE TABLE IF NOT EXISTS public_data.customer( c_customer_sk BIGINT, c_customer_id STRING, c_current_cdemo_sk BIGINT, c_current_hdemo_sk BIGINT, c_current_addr_sk BIGINT, c_first_shipto_date_sk BIGINT, c_first_sales_date_sk BIGINT, c_salutation STRING, c_first_name STRING, c_last_name STRING, c_preferred_cust_flag STRING, c_birth_day BIGINT, c_birth_month BIGINT, c_birth_year BIGINT, c_birth_country STRING, c_login STRING, c_email_address STRING, c_last_review_date STRING, useless STRING); --在MaxCompute中查询表是否有数据 SELECT * FROM public_data.customer;
部分数据如下:
Hologres中创建一张外部表。
在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。示例如下:
CREATE FOREIGN TABLE customer ( "c_customer_sk" int8, "c_customer_id" text, "c_current_cdemo_sk" int8, "c_current_hdemo_sk" int8, "c_current_addr_sk" int8, "c_first_shipto_date_sk" int8, "c_first_sales_date_sk" int8, "c_salutation" text, "c_first_name" text, "c_last_name" text, "c_preferred_cust_flag" text, "c_birth_day" int8, "c_birth_month" int8, "c_birth_year" int8, "c_birth_country" text, "c_login" text, "c_email_address" text, "c_last_review_date" text, "useless" text ) SERVER odps_server OPTIONS (project_name 'public_data', table_name 'customer');
参数说明如下:
参数
说明
SERVER
您可以直接调用Hologres底层已创建的名为odps_server的外部表服务器。详细原理请参见Postgres FDW。
project_name
MaxCompute表所在的项目名称。
table_name
需要查询的MaxCompute表名称。
在Hologres中建立一张内部表
在Hologres中建立一张内部表,用于接收MaxCompute源头表数据。示例DDL如下:
说明本DDL仅是给初步的DDL示例,实际导入数据时建表请根据业务情况设置表结构并设置合适的索引,以达到更优的查询性能,表属性说明请参见建表概述。
--示例建一张列存表 BEGIN; CREATE TABLE public.holo_customer ( "c_customer_sk" int8, "c_customer_id" text, "c_current_cdemo_sk" int8, "c_current_hdemo_sk" int8, "c_current_addr_sk" int8, "c_first_shipto_date_sk" int8, "c_first_sales_date_sk" int8, "c_salutation" text, "c_first_name" text, "c_last_name" text, "c_preferred_cust_flag" text, "c_birth_day" int8, "c_birth_month" int8, "c_birth_year" int8, "c_birth_country" text, "c_login" text, "c_email_address" text, "c_last_review_date" text, "useless" text ); COMMIT;
导入数据至Hologres。
通过导入函数语句可以将MaxCompute源头表中的数据导入到Hologres内部表中。示例DDL如下:
说明如果MaxCompute的数据会定期更新,建议在导入之前使用IMPORT FOREIGN SCHEMA语句刷新外部表元数据,以获取最新的MaxCompute数据。
IMPORT FOREIGN SCHEMA <project_name> LIMIT to (customer) FROM server odps_server INTO PUBLIC options(if_table_exist 'update');--更新外部表 select pg_sleep(30);--等待一些时间再导入Hologres,以防Hologres meta信息更新缓存慢导致的数据不一致而同步不成功 call hg_insert_overwrite('holo_customer', 'SELECT * FROM customer where c_birth_year > 1980');
数据查询。
在Hologres中查询MaxCompute源表中的数据,示例如下:
SELECT * FROM holo_customer limit 10;
示例返回结果如下:
示例2:MaxCompute分区数据导入查询
准备MaxCompute的分区表数据。
在MaxCompute中创建一张分区表,其表DDL以及数据如下。
DROP TABLE IF EXISTS odps_sale_detail; CREATE TABLE IF NOT EXISTS odps_sale_detail ( shop_name STRING ,customer_id STRING ,total_price DOUBLE ) PARTITIONED BY ( sale_date STRING ) ; -- 向源表增加分区20210815 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210815') ; -- 向分区写入数据 INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210815') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3) ;
Hologres中创建一张外部表。
在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。示例如下:
DROP FOREIGN TABLE IF EXISTS odps_sale_detail; -- 创建外部表 IMPORT FOREIGN SCHEMA <maxcompute_project> LIMIT to ( odps_sale_detail ) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'error',if_unsupported_type 'error');
maxcompute_project为MaxCompute数据表所在的项目名称。
创建Hologres的内表。
在Hologres中建立一张内部表,用于接收MaxCompute源头表数据。示例DDL如下:
DROP TABLE IF EXISTS holo_sale_detail; -- 创建Hologres分区表(内部表) BEGIN ; CREATE TABLE IF NOT EXISTS holo_sale_detail ( shop_name TEXT ,customer_id TEXT ,total_price FLOAT8 ,sale_date TEXT ) PARTITION BY LIST(sale_date); COMMIT;
导入数据至Hologres。
通过导入函数语句可以MaxCompute源头表中的数据导入到Hologres内部表中。示例如下:
call hg_insert_overwrite('holo_sale_detail', '20210815', $$ SELECT * FROM public.odps_sale_detail WHERE sale_date='20210815' $$);
查询数据。
在Hologres中查询MaxCompute源表中的数据,示例如下:
SELECT * FROM holo_sale_detail;
返回结果如下:
示例3:Hologres内部表数据导入Hologres分区表
创建Hologres内部表并准备数据。
在Hologres中建立一张非分区表(内部表),示例DDL如下:
BEGIN; CREATE TABLE public.data( c_customer_id int, c_shop_id int, c_first_sales_date_sk timestamp, c_email_address text ); COMMIT; INSERT INTO public.data VALUES (1,101,'2023-04-03 10:00:00','test123@alibaba.com'), (2,102,'2023-04-03 11:00:00','test123@alibaba.com'), (2,102,'2023-04-03 17:00:00','test123@alibaba.com');
创建Hologres的分区表。
在Hologres中建立一张分区表(内部表),用于接收数据。示例DDL如下:
BEGIN; CREATE TABLE public.hologres_parent( c_customer_id int, c_shop_id int, c_first_sales_date_sk timestamp, c_email_address text ) PARTITION BY LIST(c_customer_id); CREATE TABLE public.hologres_child1 PARTITION OF public.hologres_parent FOR VALUES IN(1); CREATE TABLE public.hologres_child2 PARTITION OF public.hologres_parent FOR VALUES IN(2); CREATE TABLE public.hologres_child3 PARTITION OF public.hologres_parent FOR VALUES IN(3); COMMIT;
Hologres内表数据导入Hologres分区表。
call hg_insert_overwrite('public.hologres_parent','1', 'SELECT * FROM public.data where c_customer_id=1');
查询数据。
在Hologres中查询分区表,示例如下:
SELECT * FROM public.hologres_parent;
返回结果如下:
示例4:Hologres内部表数据导入Hologres非分区表
创建Hologres内部表并准备数据。
在Hologres中建立一张非分区表(内部表),示例DDL如下:
BEGIN; CREATE TABLE public.data( c_customer_id int, c_shop_id int, c_first_sales_date_sk timestamp, c_email_address text ); COMMIT; INSERT INTO public.data VALUES (1,101,'2023-04-03 10:00:00','test123@alibaba.com'), (2,102,'2023-04-03 11:00:00','test123@alibaba.com'), (2,102,'2023-04-03 17:00:00','test123@alibaba.com');
创建Hologres的非分区表。
在Hologres中建立一张非分区表(内部表),用于接收数据。示例DDL如下:
BEGIN; CREATE TABLE public.re_data( c_customer_id int, c_shop_id int, c_first_sales_date_sk timestamp, c_email_address text ); COMMIT;
Hologres内表数据导入Hologres非分区表。
call hg_insert_overwrite('public.re_data', $$SELECT * FROM public.data where c_first_sales_date_sk='2023-04-03 17:00:00' and c_email_address ='test123@alibaba.com'$$);
查询数据。
在Hologres中查询非分区表,示例如下:
SELECT * FROM public.re_data;
返回结果示例如下:
常见问题
导入数据失败如何解决?
问题现象:MaxCompute数据导入时发生OOM,提示超出内存限制异常。一般报错为:
Query executor exceeded total memory limitation xxxxx: yyyy bytes used
。解决方案:
排查步骤1:当导入Query包含查询,但部分表没有执行analyze,导致查询优化器决策Join Order有误,会引起内存开销增高。
解决方法:对所有参与的内表、外表执行
analyze <tablename>
命令,更新表的统计元信息,可以帮助查询优化器生成更优的执行计划。排查步骤2:当表的列数较多,单行数据量较大时,单次读取的数据量会更大,通过在SQL前加以下参数来控制单次读取数据行数,可以有效减少OOM情况。
set hg_experimental_query_batch_size = 1024;--默认为8192 call hg_insert_overwrite('xx','select xx from xx');
排查步骤3:降低导入的并发度,也会有效减少导入过程中的内存开销,并发度通过参数
hg_experimental_foreign_table_executor_max_dop
控制,默认为实例的Core数,可以在导入时为参数设置更小的值,降低导入的内存使用。set hg_experimental_foreign_table_executor_max_dop = 8; call hg_insert_overwrite('xx','select xx from xx');
排查步骤4:降低执行DML语句的并发度,该并发度通过参数
hg_foreign_table_executor_dml_max_dop
控制,默认为32,可以在导入时设置更小的值,避免导入操作占用过多资源。set hg_foreign_table_executor_dml_max_dop = 16; call hg_insert_overwrite('xx','select xx from xx');
后续操作:临时表清理。
外部表导入内部表的过程中,为了保持比较好的原子性,增加了创建临时表,数据先写入临时表,最后通过重命名操作来替换完成,以此可以保证对于查询的影响最小。如果导入数据失败,不再进行重试导入,请执行清理临时表的操作。
清理命令:
非分区表清理命令:
drop table if exists system_insert_overwrite_${<holo_table>}_temp;
holo_table需要替换为实际导入数据的内部表名称。
存在的分区表清理命令:
通过传递的参数
partition_values
找到分区子表,比如holo_table_chile。drop table if exists system_insert_overwrite_${holo_table_chile}_temp;
holo_table_chile需要替换为实际的分区子表名称。
不存在的分区表清理命令:
drop table if exists ${holo_table}_${partition_values};
holo_table需要替换为实际准备导入内部表的分区父表名称;partition_values需要替换为实际准备导入的分区值。
清理示例
非分区表清理。
执行语句:
call hg_insert_overwrite('ads_holo_table','select * from ads_holo_table_foreign');
执行语句对应的临时表清理语句:
drop table if exists system_insert_overwrite_ads_holo_table_temp;
导入数据至存在的分区表清理。
执行语句:
call hg_insert_overwrite('ads_holo_table','20221207','select * from ads_holo_table_foreign');
执行语句对应的临时表清理语句:
drop table if exists system_insert_overwrite_ads_holo_table_20221207_temp;
导入数据至不存在的分区表清理。
执行语句:
call hg_insert_overwrite('ads_holo_table','20221208','select * from ads_holo_table_foreign');
执行语句对应的临时表清理语句:
drop table if exists ads_holo_table_20221208;
常见报错
ERROR: can not InsertOverwrite table feedback_detail because materialized view feedback_day_top depends on it
可能原因:导入数据的Table有物化视图依赖的,目前不支持。
ERROR: InsertOverwrite insert select table data failed : new row for relation "xxx" violates partition constraint
可能原因:导入数据的表是分区表,目标分区里面写入了非分区值的数据。
- 本页导读 (1)