逻辑分区表数据导入实践

本文介绍如何向逻辑分区表中导入数据。

导入原理

Hologres中导入数据时,数据会先被导入内存表(Memory Table),当内存数据足够多时,才会进一步Flush到文件并进行Compaction。详情请参见INSERT

对逻辑分区表而言,不同的分区数据,Hologres会存储在不同的文件中。因此,如果向逻辑分区表中同时导入多个分区的数据,可能会产生大量小文件。小文件数量过多会导致频繁Compaction,使实例负载压力增大,从而影响系统的性能和稳定性,详情请参见Compaction(Beta)

注意事项

基于上述原理,在数据导入至逻辑分区表时,需要注意:

  • 批量导入数据时,按照分区顺序依次导入,每次只支持导入1个分区。

  • 通过Fixed Plan导入时,支持同时导入的分区数量不超过5个。

  • 如果需要对历史分区进行回刷或批量更新,建议按分区错峰执行。

历史数据导入实践

通过Hologres内表/外表导入

对于可以直接通过Hologres访问的数据(Hologres内表、MaxCompute外表、OSS外表):

  • 推荐优先使用存储过程hg_insert_overwrite导入,详情请参见使用存储过程实现INSERT OVERWRITE功能。示例如下。

    -- 创建表A作为目标表
    CREATE TABLE public.tableA_lp(
      a TEXT, 
      b INT, 
      c TIMESTAMP, 
      d TEXT,
      ds TEXT,
      PRIMARY KEY(ds,b)
      )
      LOGICAL PARTITION BY LIST(ds);
    
    -- 创建表B作为数据输入
    BEGIN;
    CREATE TABLE public.tableB(
      a TEXT, 
      b INT, 
      c TIMESTAMP, 
      d TEXT,
      ds TEXT,
      PRIMARY KEY(ds,b)
      )
      PARTITION BY LIST(ds);
    CALL set_table_property('public.tableB', 'orientation', 'column');
    CREATE TABLE public.holo_child_3a PARTITION OF public.tableB FOR VALUES IN('20201215');
    CREATE TABLE public.holo_child_3b PARTITION OF public.tableB FOR VALUES IN('20201216');
    CREATE TABLE public.holo_child_3c PARTITION OF public.tableB FOR VALUES IN('20201217');
    COMMIT;
    
    INSERT INTO public.holo_child_3a VALUES('a',1,'2034-10-19','a','20201215');
    INSERT INTO public.holo_child_3b VALUES('b',2,'2034-10-20','b','20201216');
    INSERT INTO public.holo_child_3c VALUES('c',3,'2034-10-21','c','20201217');
    
    -- 使用存储过程hg_insert_overwrite将数据导入逻辑分区表
    CALL hg_insert_overwrite('public.tableA_lp' , '{20201215,20201216,20201217}'::text[],$$SELECT * FROM public.tableB$$);
  • 如果无法使用存储过程hg_insert_overwrite导入,则建议手动按分区顺序串行导入,避免一条SQL导入大量分区数据。

    CALL hg_internal_copy_data_before('public.tableA_lp');   -- 为逻辑分区表的导入任务设置文件合并优化参数
    INSERT INTO public.tableA_lp PARTITION (ds = '20250601') SELECT * FROM public.tableB WHERE ds = '20250601';
    INSERT INTO public.tableA_lp PARTITION (ds = '20250602') SELECT * FROM public.tableB WHERE ds = '20250602';
    ...
    CALL hg_internal_copy_data_after('public.tableA_lp');     -- 全部导完之后调用,还原相关参数
    说明

    建议使用hg_internal_copy_data_before参数,以优化逻辑分区表数据导入时的文件合并过程,提高导入稳定性和导入效率。

  • 若需灵活定义数据导入的分区范围且难以明确指定目标分区,推荐采用自定义存储过程结合原生INSERT OVERWRITE实现。以下为“导入最近x个分区数据”的使用示例。存储过程将自动按分区串行执行导入任务,兼顾了导入任务的灵活性与稳定性。

    --- target表, 有且仅有一个名为'ds', typedate的分区列. src表也有一个同名的ds列
    CREATE TABLE src(a INT, ds DATE);
    CREATE TABLE target(a INT, ds DATE NOT NULL) logical PARTITION BY list(ds);
    
    --- 将过去last_x_days(包括今日)的逻辑分区insert ovewrite.
    CREATE OR REPLACE PROCEDURE insert_overwrite_wrapper(target_table regclass, select_sql TEXT, last_x_days INT)
    LANGUAGE 'plpgsql'
    AS $$
    DECLARE
        insert_sql TEXT;
        range_days DATE[];
        element DATE;
    
    BEGIN
        IF last_x_days = 0 THEN
          RETURN;
        END IF;
        
        --- 计算目标分区, 从 current_date-last_x_days+1 到 current_date
        SELECT ARRAY_AGG(generate_series::DATE) INTO range_days FROM generate_series(CURRENT_DATE - last_x_days + 1, CURRENT_DATE, '1 day');
    
        --- 发起一个全新的txn
        COMMIT; 
        raise notice 'begin;';
        
        --- 开启dml transaction
        SET LOCAL hg_experimental_enable_transaction = on;
        
        --- 拼sql
        FOREACH element IN ARRAY range_days LOOP
            insert_sql = 'INSERT OVERWRITE ' || quote_ident(target_table::text)
            || ' PARTITION (ds = ' || quote_literal(element::text) || ')'
            || ' ' || select_sql || ' where ds = ' || quote_literal(element::text) || ';'; 
            EXECUTE insert_sql;
            raise notice '%', insert_sql;
        END LOOP;
        COMMIT;
        raise notice 'end;';
    END;
    $$;
    
    -- 导入最近30个分区
    CALL insert_overwrite_wrapper('target','select * from src', '30');