Hologres V2.0版本开始支持INSERT OVERWRITE存储过程,方便用户进行大批量数据的全量写入或者分区级数据批量写入。本文为您介绍在Hologres中如何使用存储过程实现INSERT OVERWRITE功能,如您实例版本低于V2.0,请升级实例,详情请参见实例升级。如您暂时不方便升级,也可使用临时表方式实现INSERT OVERWRITE功能。
功能说明
Hologres V3.0版本增强了hg_insert_overwrite能力,支持通过INSERT OVERWRITE命令直接导入数据至分区父表。
Hologres从V2.0.15版本开始,支持通过
set hg_experimental_hg_insert_overwrite_enable_view=on;
命令开启GUC,实现向有视图依赖的表中导入数据;暂不支持向有物化视图依赖的表中导入数据。V3.0版本起,无需设置上述GUC,即可支持向有视图依赖的表中导入数据;暂不支持向有物化视图依赖的表中导入数据。
对于Hologres V2.0.11以前的版本,导入失败需要手动清理临时表;自V2.0.11版本开始,系统会自动清理临时表。
使用限制
如果选择部分字段导入,字段顺序需要与源表保持一致且一一对应。
由于hg_insert_overwrite需要以表Owner的身份新建一张临时表,因此仅Superuser和表的Owner有权限执行hg_insert_overwrite操作。
目标表的分区键支持INT、TEXT或VARCHAR类型。
Hologres V3.0版本起,明确要求不能在事务中使用hg_insert_overwrite,执行会报错。
说明在旧版本的事务中使用hg_insert_overwrite也会有潜在问题,但很多时候可以正常正确执行,新版本将更加严格。
行为变更
Hologres V3.0版本起,hg_insert_overwrite有如下行为变更:
仅有target_table和sql两个入参时,如果目标表是分区父表,则V3.0版本前会直接报错,V3.0版本起可能写入成功(当select_query执行结果对应的分区子表都已存在时),也可能报错(当select_query执行结果对应的分区子表不存在时)。
如果hg_insert_overwrite执行中途被Cancel,则V3.0版本起需要执行如下SQL清理临时表,V3.0版本前不需要清理临时表。
--- 删除before_time之前系统创建的临时表 CALL hg_clean_insert_overwrite_tmp_tables(before_time::timestamptz);
使用存储过程实现INSERT OVERWRITE功能
命令格式
-- V3.0版本前的hg_insert_overwrite语法
CALL hg_insert_overwrite('<target_table>' regclass, ['<partition_value>' text], '<sql>' text);
-- V3.0及以上版本的hg_insert_overwrite语法
CALL hg_insert_overwrite('<target_table>' regclass, ['<partition_value>' array], '<sql>' text, ['<auto_create_partition>' bool]);
参数说明
Hologres V3.0版本起,hg_insert_overwrite语句中的partition_value数据类型改为ARRAY类型,即支持写入分区父表并指定多个分区子表。您仍可使用TEXT类型作为partition_value的入参,但此时只支持写入一张分区子表。
参数 | 说明 |
target_table | Hologres的内部表。 即数据目标存储表,表必须已经存在。 |
partition_value | 分区表的分区值。
|
sql | 标准的SELECT语句。 可用来查询MaxCompute或者Hologres的表,需确保SELECT出来的分区字段值必须完全等于
|
auto_create_partition | 是否自动创建分区。仅V3.0及以上版本支持该参数。
|
V3.0版本起,针对INSERT OVERWRITE分区父表,即target_table
为分区父表的情况,不同参数设置的行为如下:
不指定
partition_value
参数时:auto_create_partition
值说明
TRUE
sql
执行结果对应的target_table
分区,全部执行数据覆写。如果有不存在的分区子表,则先自动创建分区。与
sql
执行结果无关的target_table
分区,忽略。
FALSE
sql
执行结果中,如果对应的target_table
分区都存在:执行结果对应的
target_table
分区,全部执行数据覆写。与执行结果无关的
target_table
分区,忽略。
sql
执行结果中,如果有对应的target_table
分区不存在,则直接报错,其余已存在的分区也不执行覆写。
指定
partition_value
参数时:auto_create_partition
值说明
TRUE
对于
partition_value
指定的target_table
分区:如果分区实际不存在:自动创建分区。
sql
执行结果对应的分区:执行数据覆写。与
sql
执行结果无关的分区:直接清空。
对于
partition_value
未指定的target_table
分区:sql
执行结果如果包含未指定的分区:不处理。与
sql
执行结果无关的分区:不处理。
FALSE
对于
partition_value
指定的target_table
分区:如果分区实际不存在:直接报错,其余分区也不执行覆写。
sql
执行结果对应的分区:执行数据覆写。与
sql
执行结果无关的分区:直接清空。
对于
partition_value
未指定的target_table
分区:sql
执行结果如果包含未指定的分区:不处理。与
sql
执行结果无关的分区:不处理。
使用示例
场景一:使用存储过程将Hologres内部表数据导入Hologres非分区表
-- 创建表A作为目标表
BEGIN;
CREATE TABLE public.tablea (
cid integer NOT NULL,
cname text,
code integer
,PRIMARY KEY (cid)
);
CALL set_table_property('public.tablea', 'orientation', 'column');
CALL set_table_property('public.tablea', 'storage_format', 'orc');
CALL set_table_property('public.tablea', 'bitmap_columns', 'cname');
CALL set_table_property('public.tablea', 'dictionary_encoding_columns', 'cname:auto');
CALL set_table_property('public.tablea', 'distribution_key', 'cid');
CALL set_table_property('public.tablea', 'time_to_live_in_seconds', '3153600000');
COMMIT;
-- 创建表B作为数据输入
CREATE TABLE public.tableb (
cid integer NOT NULL,
cname text,
code integer
,PRIMARY KEY (cid)
);
INSERT INTO public.tableb VALUES(1,'aaa',10001),(2,'bbb','10002');
-- 使用hg_insert_overwrite 将表B数据插入表A
CALL hg_insert_overwrite('public.tablea' , 'SELECT * FROM public.tableb');
场景二:使用存储过程将Hologres内部表数据导入Hologres分区表
-- 创建表A作为目标表
BEGIN;
CREATE TABLE public.tableA(
a text ,
b int,
c timestamp,
d text,
ds text,
PRIMARY key(ds,b)
)
PARTITION BY LIST(ds);
CALL set_table_property('public.tableA', 'orientation', 'column');
CREATE TABLE public.holo_child_1 PARTITION OF public.tableA FOR VALUES IN('20201215');
CREATE TABLE public.holo_child_2 PARTITION OF public.tableA FOR VALUES IN('20201216');
CREATE TABLE public.holo_child_3 PARTITION OF public.tableA FOR VALUES IN('20201217');
COMMIT;
-- 创建表B作为数据输入
BEGIN;
CREATE TABLE public.tableB(
a text ,
b int,
c timestamp,
d text,
ds text,
PRIMARY key(ds,b)
)
PARTITION BY LIST(ds);
CALL set_table_property('public.tableB', 'orientation', 'column');
CREATE TABLE public.holo_child_3a PARTITION OF public.tableB FOR VALUES IN('20201215');
CREATE TABLE public.holo_child_3b PARTITION OF public.tableB FOR VALUES IN('20201216');
CREATE TABLE public.holo_child_3c PARTITION OF public.tableB FOR VALUES IN('20201217');
COMMIT;
INSERT INTO public.holo_child_3a VALUES('a',1,'2034-10-19','a','20201215');
INSERT INTO public.holo_child_3b VALUES('b',2,'2034-10-20','b','20201216');
INSERT INTO public.holo_child_3c VALUES('c',3,'2034-10-21','c','20201217');
-- 使用insert overwrite 将表B数据插入表A
CALL hg_insert_overwrite('public.tableA' , '20201215',$$SELECT * FROM public.tableB WHERE ds='20201215'$$);
场景三:使用存储过程将MaxCompute非分区表数据导入Hologres
-- 在MaxCompute中创建一张非分区表。示例选用MaxCompute公告数据集public_data项目下的customer表数据,其表DDL如下。
CREATE TABLE IF NOT EXISTS public_data.customer(
c_customer_sk BIGINT,
c_customer_id STRING,
c_current_cdemo_sk BIGINT,
c_current_hdemo_sk BIGINT,
c_current_addr_sk BIGINT,
c_first_shipto_date_sk BIGINT,
c_first_sales_date_sk BIGINT,
c_salutation STRING,
c_first_name STRING,
c_last_name STRING,
c_preferred_cust_flag STRING,
c_birth_day BIGINT,
c_birth_month BIGINT,
c_birth_year BIGINT,
c_birth_country STRING,
c_login STRING,
c_email_address STRING,
c_last_review_date STRING,
useless STRING);
-- 在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。
CREATE FOREIGN TABLE customer (
"c_customer_sk" int8,
"c_customer_id" text,
"c_current_cdemo_sk" int8,
"c_current_hdemo_sk" int8,
"c_current_addr_sk" int8,
"c_first_shipto_date_sk" int8,
"c_first_sales_date_sk" int8,
"c_salutation" text,
"c_first_name" text,
"c_last_name" text,
"c_preferred_cust_flag" text,
"c_birth_day" int8,
"c_birth_month" int8,
"c_birth_year" int8,
"c_birth_country" text,
"c_login" text,
"c_email_address" text,
"c_last_review_date" text,
"useless" text
)
SERVER odps_server
OPTIONS (project_name 'public_data', table_name 'customer');
-- 在Hologres中建立一张内部表(以列存表为例),用于接收MaxCompute源头表数据。
BEGIN;
CREATE TABLE public.holo_customer (
"c_customer_sk" int8,
"c_customer_id" text,
"c_current_cdemo_sk" int8,
"c_current_hdemo_sk" int8,
"c_current_addr_sk" int8,
"c_first_shipto_date_sk" int8,
"c_first_sales_date_sk" int8,
"c_salutation" text,
"c_first_name" text,
"c_last_name" text,
"c_preferred_cust_flag" text,
"c_birth_day" int8,
"c_birth_month" int8,
"c_birth_year" int8,
"c_birth_country" text,
"c_login" text,
"c_email_address" text,
"c_last_review_date" text,
"useless" text
);
COMMIT;
-- 导入数据至Hologres。
IMPORT FOREIGN SCHEMA <project_name> LIMIT TO
(customer) FROM server odps_server INTO PUBLIC options(if_table_exist 'update');--更新外部表
SELECT pg_sleep(30);--等待一些时间再导入Hologres,以防Hologres meta信息更新缓存慢导致的数据不一致而同步不成功
CALL hg_insert_overwrite('holo_customer', 'SELECT * FROM customer where c_birth_year > 1980');
-- 在Hologres中查询MaxCompute源表中的数据。
SELECT * FROM holo_customer limit 10;
场景四:使用存储过程将MaxCompute分区表数据导入Hologres
-- 在MaxCompute中创建一张分区表。
DROP TABLE IF EXISTS odps_sale_detail;
CREATE TABLE IF NOT EXISTS odps_sale_detail
(
shop_name STRING
,customer_id STRING
,total_price DOUBLE
)
PARTITIONED BY
(
sale_date STRING
)
;
-- 向源表增加分区20210815
ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210815')
;
-- 向分区写入数据
INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210815') VALUES
('s1','c1',100.1),
('s2','c2',100.2),
('s3','c3',100.3)
;
-- 在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。
DROP FOREIGN TABLE IF EXISTS odps_sale_detail;
-- 创建外部表
IMPORT FOREIGN SCHEMA <maxcompute_project> LIMIT TO
(
odps_sale_detail
)
FROM SERVER odps_server INTO public
OPTIONS(if_table_exist 'error',if_unsupported_type 'error');
-- 在Hologres中建立一张内部表,用于接收MaxCompute源头表数据。
DROP TABLE IF EXISTS holo_sale_detail;
-- 创建Hologres分区表(内部表)
BEGIN ;
CREATE TABLE IF NOT EXISTS holo_sale_detail
(
shop_name TEXT
,customer_id TEXT
,total_price FLOAT8
,sale_date TEXT
)
PARTITION BY LIST(sale_date);
COMMIT;
-- 导入数据至Hologres。
CALL hg_insert_overwrite('holo_sale_detail', '20210815', $$SELECT * FROM public.odps_sale_detail WHERE sale_date='20210815'$$);
-- 在Hologres中查询MaxCompute源表中的数据。
SELECT * FROM holo_sale_detail;
场景五:使用存储过程将MaxCompute分区表数据导入Hologres分区父表
-- 在MaxCompute中创建一张分区表。
DROP TABLE IF EXISTS odps_sale_detail;
CREATE TABLE IF NOT EXISTS odps_sale_detail
(
shop_name STRING
,customer_id STRING
,total_price DOUBLE
)
PARTITIONED BY
(
sale_date STRING
)
;
-- 向源表增加分区20210815和20210816
ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210815')
;
ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210816')
;
-- 向分区写入数据
INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210815') VALUES
('s1','c1',100.1),
('s2','c2',100.2),
('s3','c3',100.3)
;
INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210816') VALUES
('s1','c1',100.1),
('s2','c2',100.2),
('s3','c3',100.3)
;
-- 在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。
DROP FOREIGN TABLE IF EXISTS odps_sale_detail;
-- 创建外部表
IMPORT FOREIGN SCHEMA <maxcompute_project> LIMIT TO
(
odps_sale_detail
)
FROM SERVER odps_server INTO public
OPTIONS(if_table_exist 'error',if_unsupported_type 'error');
-- 在Hologres中建立一张内部表,用于接收MaxCompute源头表数据。
DROP TABLE IF EXISTS holo_sale_detail;
-- 创建Hologres分区表(内部表)
BEGIN ;
CREATE TABLE IF NOT EXISTS holo_sale_detail
(
shop_name TEXT
,customer_id TEXT
,total_price FLOAT8
,sale_date TEXT
)
PARTITION BY LIST(sale_date);
COMMIT;
-- 导入数据至Hologres。不指定分区子表且auto_create_partition为TRUE,系统会自动创建两个分区子表并导入数据
CALL hg_insert_overwrite ('holo_sale_detail', $$SELECT * FROM public.odps_sale_detail$$, TRUE);
-- 在Hologres中查询数据。
SELECT * FROM holo_sale_detail;
参数说明:
maxcompute_project:MaxCompute分区表所在的项目名称。
使用临时表实现INSERT OVERWRITE功能
命令格式
您可以使用如下SQL语句实现INSERT OVERWRITE的功能。
BEGIN ;
-- 清理潜在的临时表
DROP TABLE IF EXISTS <table_new>;
-- 创建临时表
SET hg_experimental_enable_create_table_like_properties=on;
CALL HG_CREATE_TABLE_LIKE ('<table_new>', 'select * from <table>');
COMMIT ;
-- 向临时表插入数据
INSERT INTO <table_new> [( <column> [, ...] )]
VALUES ( {<expression>} [, ...] )
[, ...] | <query>}
ANALYZE <table_new>;
BEGIN ;
-- 删除旧表
DROP TABLE IF EXISTS <table>;
-- 临时表改名
ALTER TABLE <table_new> RENAME TO <table>;
COMMIT ;
参数说明
参数 | 说明 |
table_new | 新创建的临时表名称。 表名称也可以使用 |
table | 已存在的表名称。 表名称也可以使用 |
临时表DDL | 创建临时表有如下两种方式。
|
使用示例
场景一:MaxCompute向Hologres的非分区表导入数据
在MaxCompute向Hologres导入数据的场景中,希望将数据全量覆盖,常见于离线加工后的结果表导出为线上服务表。此场景使用示例如下所示,将MaxCompute中的odps_region_10g表的数据写入Hologres的region表中,且将Hologres中region表的数据全量覆盖。
BEGIN ;
-- 清理潜在的临时表
DROP TABLE IF EXISTS public.region_new;
-- 创建临时表
SET hg_experimental_enable_create_table_like_properties=on;
CALL HG_CREATE_TABLE_LIKE ('public.region_new', 'select * from public.region');
COMMIT ;
-- 向临时表插入数据
INSERT INTO public.region_new
SELECT *
FROM public.odps_region_10g;
ANALYZE public.region_new;
BEGIN ;
-- 删除旧表
DROP TABLE IF EXISTS public.region;
-- 临时表改名
ALTER TABLE IF EXISTS public.region_new RENAME TO region;
COMMIT ;
场景二:MaxCompute向Hologres的分区表导入数据
在每天定期更新MaxCompute分区表的数据,且需要将MaxCompute分区表向Hologres的分区表导入数据的场景中,希望将数据全量覆盖,实现离线数据对实时数据的修正。此场景使用示例如下所示,将MaxCompute中的odps_lineitem_10g表的数据写入Hologres的lineitem表中,且全量覆盖Hologres中lineitem表的数据,两个表都是按照ds字段按天分区。
BEGIN ;
-- 清理潜在的临时表
DROP TABLE IF EXISTS public.lineitem_new_20210101;
-- 创建临时表
SET hg_experimental_enable_create_table_like_properties=on;
CALL HG_CREATE_TABLE_LIKE ('public.lineitem_new_20210101', 'select * from public.lineitem');
COMMIT ;
-- 向临时表插入数据
INSERT INTO public.lineitem_new_20210101
SELECT *
FROM public.odps_lineitem_10g
WHERE DS = '20210101'
ANALYZE public.lineitem_new_20210101;
BEGIN ;
-- 删除旧分区
DROP TABLE IF EXISTS public.lineitem_20210101;
-- 临时表改名
ALTER TABLE public.lineitem_new_20210101 RENAME TO lineitem_20210101;
-- 将临时表绑定至指定分区表
ALTER TABLE public.lineitem ATTACH PARTITION lineitem_20210101 FOR VALUES IN ('20210101');
COMMIT ;
场景三:Hologres向MaxCompute的非分区表导入数据
如果您需要从Hologres向MaxCompute的非分区表导入数据,建议采用临时表导入的方式,导入完成后将临时表改名为正式表即可。此场景使用示例如下所示,将Hologres中holotable表的数据写入MaxCompute的mc_holotable表中,且将MaxCompute的mc_holotable表数据全量覆盖。
-- 在MC中创建目标表的临时表
CREATE TABLE if not exists mc_holotable_temp(
age int,
job string,
name string
);
-- 在Hologres中创建临时表的映射
CREATE FOREIGN TABLE "public"."mapping_holotable_temp" (
"age" int,
"job" text,
"name" text
)
SERVER odps_server
OPTIONS (project_name 'DLF_test',table_name 'mc_holotable_temp');
-- 在Hologres中更新原始表
UPDATE holotable SET "job" = 'president' WHERE "name" = 'Lily';
-- 将更新后的数据写入临时表的映射
INSERT INTO mapping_holotable_temp SELECT * FROM holotable;
-- 在MaxCompute中删除旧的目标表
DROP TABLE IF EXISTS mc_holotable;
-- 临时表更名为目标表即可
ALTER TABLE mc_holotable_temp RENAME TO mc_holotable;
导入数据支持部分导入和全表导入两种方式:
导出部分字段示例:
INSERT INTO mapping_holotable_temp SELECT x,x,x FROM holotable; --x,x,x可以替换为您需要导出的字段名
导出全部字段示例:
INSERT INTO mapping_holotable_temp SELECT * FROM holotable;