文档

使用存储过程将数据导入Hologres内部表(Beta)

更新时间:

Hologres V2.0版本新增存储过程(hg_insert_overwrite),使用此存储过程帮助您简单、快捷地将MaxCompute或Hologres表数据导入Hologres内部表。本文为您介绍如何使用存储过程将MaxCompute或Hologres表数据导入Hologres内部表。

注意事项

  • Hologres从 V2.0.15版本开始,执行set hg_experimental_hg_insert_overwrite_enable_view=on;命令开启GUC后,hg_insert_overwrite命令支持往有普通视图依赖的表导入数据;暂不支持使用hg_insert_overwrite命令往有物化视图依赖的表中导入数据。

  • 如果选择部分字段导入,字段顺序需要与源表保持一致且一一对应。

  • 对于Hologres V2.0.11以前的版本,导入失败需要手动清理临时;自 V2.0.11版本开始,系统会自动清理临时表。

  • 因hg_insert_overwirte需要以表Owner的身份新建一张临时表,因此仅Superuser和表的Owner有权限执行hg_insert_overwrite操作。

使用说明

  • 命令语法:

    call hg_insert_overwrite('holo_table' regclass, ['partition_value' text|int|varchar|date], 'sql' text);
  • 参数说明:

    • holo_table:Hologres的内部表,即数据目标存储表,表必须已经存在,如果是分区表,需要指定partition_value

    • partition_value:分区表的分区值,holo_table为分区表时指定。支持将INT、TEXT、VARCHAR或DATE类型数据作为分区值。

    • sql:标准的select语句,可用来查询MaxCompute或者Hologres的表,需确保select出来的分区字段的值必须完全等于partition_value。如果SQL语句中含有单引号(' '),需要通过$$sql$$改写sql,以自动实现单引号转义。

  • 命令示例:

    • 分区表写入:将test1表数据写入parent_table表的20230421分区。

      call hg_insert_overwrite('parent_table','20230421',$$select * from test1 where partition_value='20230421'$$);
    • 非分区表写入:将test1表数据写入holo_table表。

      call hg_insert_overwrite('holo_table','select * from test1');

使用示例

示例1:MaxCompute非分区表数据导入Hologres

  1. 准备MaxCompute的非分区表数据。

    在MaxCompute中创建一张非分区源数据表,或直接选用已创建的非分区表。示例选用MaxCompute公告数据集public_data项目下的customer表数据,其表DDL以及数据如下。

    --MaxCompute公共数据集的表DDL
    CREATE TABLE IF NOT EXISTS public_data.customer(
      c_customer_sk BIGINT,
      c_customer_id STRING,
      c_current_cdemo_sk BIGINT,
      c_current_hdemo_sk BIGINT,
      c_current_addr_sk BIGINT,
      c_first_shipto_date_sk BIGINT,
      c_first_sales_date_sk BIGINT,
      c_salutation STRING,
      c_first_name STRING,
      c_last_name STRING,
      c_preferred_cust_flag STRING,
      c_birth_day BIGINT,
      c_birth_month BIGINT,
      c_birth_year BIGINT,
      c_birth_country STRING,
      c_login STRING,
      c_email_address STRING,
      c_last_review_date STRING,
      useless STRING);
    
    --在MaxCompute中查询表是否有数据
    SELECT * FROM public_data.customer;

    部分数据如下:

    image..png

  2. Hologres中创建一张外部表。

    在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。示例如下:

    CREATE FOREIGN TABLE customer (
        "c_customer_sk" int8,
        "c_customer_id" text,
        "c_current_cdemo_sk" int8,
        "c_current_hdemo_sk" int8,
        "c_current_addr_sk" int8,
        "c_first_shipto_date_sk" int8,
        "c_first_sales_date_sk" int8,
        "c_salutation" text,
        "c_first_name" text,
        "c_last_name" text,
        "c_preferred_cust_flag" text,
        "c_birth_day" int8,
        "c_birth_month" int8,
        "c_birth_year" int8,
        "c_birth_country" text,
        "c_login" text,
        "c_email_address" text,
        "c_last_review_date" text,
        "useless" text
    )
    SERVER odps_server
    OPTIONS (project_name 'public_data', table_name 'customer');
    

    参数说明如下:

    参数

    说明

    SERVER

    您可以直接调用Hologres底层已创建的名为odps_server的外部表服务器。详细原理请参见Postgres FDW

    project_name

    MaxCompute表所在的项目名称。

    table_name

    需要查询的MaxCompute表名称。

  3. 在Hologres中建立一张内部表

    在Hologres中建立一张内部表,用于接收MaxCompute源头表数据。示例DDL如下:

    说明

    本DDL仅是给初步的DDL示例,实际导入数据时建表请根据业务情况设置表结构并设置合适的索引,以达到更优的查询性能,表属性说明请参见建表概述

    --示例建一张列存表
    BEGIN;
    CREATE TABLE public.holo_customer (
     "c_customer_sk" int8,
     "c_customer_id" text,
     "c_current_cdemo_sk" int8,
     "c_current_hdemo_sk" int8,
     "c_current_addr_sk" int8,
     "c_first_shipto_date_sk" int8,
     "c_first_sales_date_sk" int8,
     "c_salutation" text,
     "c_first_name" text,
     "c_last_name" text,
     "c_preferred_cust_flag" text,
     "c_birth_day" int8,
     "c_birth_month" int8,
     "c_birth_year" int8,
     "c_birth_country" text,
     "c_login" text,
     "c_email_address" text,
     "c_last_review_date" text,
     "useless" text
    );
    COMMIT;
  4. 导入数据至Hologres。

    通过导入函数语句可以将MaxCompute源头表中的数据导入到Hologres内部表中。示例DDL如下:

    说明

    如果MaxCompute的数据会定期更新,建议在导入之前使用IMPORT FOREIGN SCHEMA语句刷新外部表元数据,以获取最新的MaxCompute数据。

    IMPORT FOREIGN SCHEMA <project_name> LIMIT to
    (customer) FROM server odps_server INTO PUBLIC options(if_table_exist 'update');--更新外部表
    select pg_sleep(30);--等待一些时间再导入Hologres,以防Hologres meta信息更新缓存慢导致的数据不一致而同步不成功
    
    call  hg_insert_overwrite('holo_customer', 'SELECT * FROM customer where c_birth_year > 1980');
  5. 数据查询。

    在Hologres中查询MaxCompute源表中的数据,示例如下:

    SELECT * FROM holo_customer limit 10;

    示例返回结果如下:

    image..png

示例2:MaxCompute分区数据导入查询

  1. 准备MaxCompute的分区表数据。

    在MaxCompute中创建一张分区表,其表DDL以及数据如下。

    DROP TABLE IF EXISTS odps_sale_detail;
    
    CREATE TABLE IF NOT EXISTS odps_sale_detail 
    (
        shop_name STRING
        ,customer_id STRING
        ,total_price DOUBLE
    )
    PARTITIONED BY 
    (
        sale_date STRING
    )
    ;
    
    -- 向源表增加分区20210815
    ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210815')
    ;
    
    -- 向分区写入数据
    INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210815') VALUES 
    ('s1','c1',100.1),
    ('s2','c2',100.2),
    ('s3','c3',100.3)
    ;
  2. Hologres中创建一张外部表。

    在Hologres中创建一张外部表,用于映射MaxCompute中的源头数据表。示例如下:

    DROP FOREIGN TABLE IF EXISTS odps_sale_detail;
    
    -- 创建外部表
    IMPORT FOREIGN SCHEMA <maxcompute_project> LIMIT to
    (
        odps_sale_detail
    ) 
    FROM SERVER odps_server INTO public 
    OPTIONS(if_table_exist 'error',if_unsupported_type 'error');

    maxcompute_project为MaxCompute数据表所在的项目名称。

  3. 创建Hologres的内表。

    在Hologres中建立一张内部表,用于接收MaxCompute源头表数据。示例DDL如下:

    DROP TABLE IF EXISTS holo_sale_detail;
    
    -- 创建Hologres分区表(内部表)
    BEGIN ;
    CREATE TABLE IF NOT EXISTS holo_sale_detail
    (
        shop_name TEXT
        ,customer_id TEXT 
        ,total_price FLOAT8
        ,sale_date TEXT
    )
    PARTITION BY LIST(sale_date);
    COMMIT;
  4. 导入数据至Hologres。

    通过导入函数语句可以MaxCompute源头表中的数据导入到Hologres内部表中。示例如下:

    call hg_insert_overwrite('holo_sale_detail', '20210815', $$ SELECT * FROM public.odps_sale_detail  WHERE sale_date='20210815' $$);
  5. 查询数据。

    在Hologres中查询MaxCompute源表中的数据,示例如下:

    SELECT * FROM holo_sale_detail;

    返回结果如下:

    image

示例3:Hologres内部表数据导入Hologres分区表

  1. 创建Hologres内部表并准备数据。

    在Hologres中建立一张非分区表(内部表),示例DDL如下:

    BEGIN;
    CREATE TABLE public.data(
      c_customer_id int, 
      c_shop_id int, 
      c_first_sales_date_sk timestamp, 
      c_email_address text
    ); 
    COMMIT;
    
    INSERT INTO public.data VALUES 
    (1,101,'2023-04-03 10:00:00','test123@alibaba.com'), 
    (2,102,'2023-04-03 11:00:00','test123@alibaba.com'), 
    (2,102,'2023-04-03 17:00:00','test123@alibaba.com');
  2. 创建Hologres的分区表。

    在Hologres中建立一张分区表(内部表),用于接收数据。示例DDL如下:

    BEGIN;
    CREATE TABLE public.hologres_parent(
      c_customer_id int, 
      c_shop_id int, 
      c_first_sales_date_sk timestamp, 
      c_email_address text
    ) 
      PARTITION BY LIST(c_customer_id);
    CREATE TABLE public.hologres_child1 PARTITION OF public.hologres_parent FOR VALUES IN(1);
    CREATE TABLE public.hologres_child2 PARTITION OF public.hologres_parent FOR VALUES IN(2);
    CREATE TABLE public.hologres_child3 PARTITION OF public.hologres_parent FOR VALUES IN(3);
    COMMIT;
  3. Hologres内表数据导入Hologres分区表。

    call hg_insert_overwrite('public.hologres_parent','1', 'SELECT * FROM public.data where c_customer_id=1');
  4. 查询数据。

    在Hologres中查询分区表,示例如下:

    SELECT * FROM public.hologres_parent;

    返回结果如下:

    image..png

示例4:Hologres内部表数据导入Hologres非分区表

  1. 创建Hologres内部表并准备数据。

    在Hologres中建立一张非分区表(内部表),示例DDL如下:

    BEGIN;
    CREATE TABLE public.data(
      c_customer_id int, 
      c_shop_id int, 
      c_first_sales_date_sk timestamp, 
      c_email_address text
    ); 
    COMMIT;
    
    INSERT INTO public.data VALUES 
    (1,101,'2023-04-03 10:00:00','test123@alibaba.com'), 
    (2,102,'2023-04-03 11:00:00','test123@alibaba.com'), 
    (2,102,'2023-04-03 17:00:00','test123@alibaba.com');
  2. 创建Hologres的非分区表。

    在Hologres中建立一张非分区表(内部表),用于接收数据。示例DDL如下:

    BEGIN;
    CREATE TABLE public.re_data(
      c_customer_id int, 
      c_shop_id int, 
      c_first_sales_date_sk timestamp, 
      c_email_address text
    ); 
    COMMIT;
  3. Hologres内表数据导入Hologres非分区表。

    call hg_insert_overwrite('public.re_data', $$SELECT * FROM public.data where 
                             c_first_sales_date_sk='2023-04-03 17:00:00' and c_email_address ='test123@alibaba.com'$$);
  4. 查询数据。

    在Hologres中查询非分区表,示例如下:

    SELECT * FROM public.re_data;

    返回结果示例如下:

    image..png

常见问题

导入数据失败如何解决?

  • 问题现象:MaxCompute数据导入时发生OOM,提示超出内存限制异常。一般报错为:Query executor exceeded total memory limitation xxxxx: yyyy bytes used

  • 解决方案:

    • 排查步骤1:当导入Query包含查询,但部分表没有执行analyze,导致查询优化器决策Join Order有误,会引起内存开销增高。

      解决方法:对所有参与的内表、外表执行analyze <tablename>命令,更新表的统计元信息,可以帮助查询优化器生成更优的执行计划。

    • 排查步骤2:当表的列数较多,单行数据量较大时,单次读取的数据量会更大,通过在SQL前加以下参数来控制单次读取数据行数,可以有效减少OOM情况。

      set hg_experimental_query_batch_size = 1024;--默认为8192
      call  hg_insert_overwrite('xx','select xx from xx');
    • 排查步骤3:降低导入的并发度,也会有效减少导入过程中的内存开销,并发度通过参数hg_experimental_foreign_table_executor_max_dop控制,默认为实例的Core数,可以在导入时为参数设置更小的值,降低导入的内存使用。

      set hg_experimental_foreign_table_executor_max_dop = 8;
      call  hg_insert_overwrite('xx','select xx from xx');
    • 排查步骤4:降低执行DML语句的并发度,该并发度通过参数hg_foreign_table_executor_dml_max_dop控制,默认为32,可以在导入时设置更小的值,避免导入操作占用过多资源。

      set hg_foreign_table_executor_dml_max_dop = 16;
      call  hg_insert_overwrite('xx','select xx from xx');
  • 后续操作:临时表清理。

    外部表导入内部表的过程中,为了保持比较好的原子性,增加了创建临时表,数据先写入临时表,最后通过重命名操作来替换完成,以此可以保证对于查询的影响最小。如果导入数据失败,不再进行重试导入,请执行清理临时表的操作。

    • 清理命令:

      • 非分区表清理命令:

        drop table if  exists system_insert_overwrite_${<holo_table>}_temp;

        holo_table需要替换为实际导入数据的内部表名称。

      • 存在的分区表清理命令:

        通过传递的参数partition_values找到分区子表,比如holo_table_chile。

        drop table if  exists system_insert_overwrite_${holo_table_chile}_temp;

        holo_table_chile需要替换为实际的分区子表名称。

      • 不存在的分区表清理命令:

        drop table if  exists ${holo_table}_${partition_values};

        holo_table需要替换为实际准备导入内部表的分区父表名称;partition_values需要替换为实际准备导入的分区值。

    • 清理示例

      • 非分区表清理。

        执行语句:

        call  hg_insert_overwrite('ads_holo_table','select * from ads_holo_table_foreign');

        执行语句对应的临时表清理语句:

        drop table if exists system_insert_overwrite_ads_holo_table_temp;
      • 导入数据至存在的分区表清理。

        执行语句:

        call  hg_insert_overwrite('ads_holo_table','20221207','select * from ads_holo_table_foreign');

        执行语句对应的临时表清理语句:

        drop table if  exists system_insert_overwrite_ads_holo_table_20221207_temp;
      • 导入数据至不存在的分区表清理。

        执行语句:

        call  hg_insert_overwrite('ads_holo_table','20221208','select * from ads_holo_table_foreign');

        执行语句对应的临时表清理语句:

        drop table if exists ads_holo_table_20221208;

常见报错

ERROR: can not InsertOverwrite table feedback_detail because materialized view feedback_day_top depends on it

可能原因:导入数据的Table有物化视图依赖的,目前不支持。

ERROR: InsertOverwrite insert select table data failed : new row for relation "xxx" violates partition constraint

可能原因:导入数据的表是分区表,目标分区里面写入了非分区值的数据。

  • 本页导读 (1)
文档反馈