通过DataWorks周期性导入MaxCompute数据最佳实践

DataWorks调度任务,可以按照需求设置数据传输的时间和频率,并确保数据在传输和导入过程中的完整性和准确性。将MaxCompute分区表数据通过DataWorks导入Hologres分区表,实现两个平台的优势相互结合,从而提高数据处理效率和可靠性。

前提条件

注意事项

请确保MaxCompute租户级别或项目级别未开通Schema服务。关于Schema详细介绍,请参见Schema操作

操作步骤

  1. MaxCompute数据准备。

    1. 登录MaxCompute控制台

    2. 单击左侧导航栏DataWorks > 数据分析

    3. 数据分析页面,单击左侧导航栏SQL

    4. SQL查询页面,输入SQL语句创建分区表,单击运行

      本文以MaxCompute公共数据集public_data中的分区表dwd_ product_movie_basic_info为例。dwd_ product_movie_basic_info表结构示例如下。

      --MaxCompute分区表DDL
      CREATE TABLE IF NOT EXISTS public_data.dwd_product_movie_basic_info(
        movie_name STRING COMMENT '电影名称',
        director STRING COMMENT '导演',
        scriptwriter STRING COMMENT '编剧',
        area STRING COMMENT '制片地区/国家',
        actors STRING COMMENT '主演',
        `type` STRING COMMENT '类型',
        movie_length STRING COMMENT '电影长度',
        movie_date STRING COMMENT '上映日期',
        movie_language STRING COMMENT '语言',
        imdb_url STRING COMMENT 'imdb号'
      ) 
      PARTITIONED BY (ds STRING) STORED AS ALIORC;
    5. SQL查询页面,输入如下SQL语句用于查看分区表中的数据,单击运行

      数据准备查看分区表20170112分区的数据。

      SELECT * FROM public_data.dwd_product_movie_basic_info WHERE ds = '20170112';
  2. Hologres中新建外部表。

    新建一张Hologres外部表,用于映射MaxCompute源头表数据。外表的字段顺序和字段类型需要和MaxCompute表的一一对应。

    1. 登录Hologres管理控制台,进入HoloWeb开发页面新建SQL查询,详情请参见新建SQL查询

    2. 在新增的临时Query查询页面,选择已创建的实例名数据库后,请您在SQL查询的编辑框输入如下语句,单击运行

      如下语句使用import foreign schema命令,创建名称为dwd_product_movie_basic_info的Hologres外部表。

      import foreign schema public_data limit to (dwd_product_movie_basic_info) from server odps_server into public options(if_table_exist 'update');
  3. Hologres中新建真实存储表(内部表)。

    在Hologres中新建一张内部表,用于接收并存储数据。

    1. HoloWeb开发页面,单击新增SQL窗口

    2. 在新增的临时Query查询页面,选择已创建的实例名数据库后,请您在SQL查询的编辑框输入如下语句,单击运行

      本次示例是将MaxCompute分区表导入Hologres,因此需要在Hologres中创建的内部表为分区表。

      说明

      如下建表语句仅是简单示例,实际建表DDL请根据实时业务需要创建,并给表设置合理的索引,以达到更优的查询性能。

      BEGIN;
      CREATE TABLE "public"."holo_dwd_product_movie_basic_info" (
       "movie_name" text,
       "director" text,
       "scriptwriter" text,
       "area" text,
       "actors" text,
       "type" text,
       "movie_length" text,
       "movie_date" text,
       "movie_language" text,
       "imdb_url" text,
       "ds" text
      )
      PARTITION BY LIST (ds);
      CALL SET_TABLE_PROPERTY('"public"."holo_dwd_product_movie_basic_info"', 'orientation', 'column');
      comment on column "public"."holo_dwd_product_movie_basic_info"."movie_name" is '电影名称';
      comment on column "public"."holo_dwd_product_movie_basic_info"."director" is '导演';
      comment on column "public"."holo_dwd_product_movie_basic_info"."scriptwriter" is '编剧';
      comment on column "public"."holo_dwd_product_movie_basic_info"."area" is '制片地区/国家';
      comment on column "public"."holo_dwd_product_movie_basic_info"."actors" is '主演';
      comment on column "public"."holo_dwd_product_movie_basic_info"."type" is '类型';
      comment on column "public"."holo_dwd_product_movie_basic_info"."movie_length" is '电影长度';
      comment on column "public"."holo_dwd_product_movie_basic_info"."movie_date" is '上映日期';
      comment on column "public"."holo_dwd_product_movie_basic_info"."movie_language" is '语言';
      comment on column "public"."holo_dwd_product_movie_basic_info"."imdb_url" is 'imdb号';
      COMMIT;
  4. 新建分区子表数据开发。

    此步骤是一个Hologres SQL模块,用于分区表跑调度。

    1. 登录DataWorks控制台,进入数据开发页面,创建Hologres SQL节点,详情请参见Hologres SQL节点

    2. 在节点的编辑页面,输入如下语句。

      在Hologres中不支持直接将分区数据直接写入分区父表,因此需要在Hologres中创建对应MaxCompute分区表中分区键值的分区子表,然后将分区数据导入对应的分区子表。分区键值由参数${bizdate}控制,在调度系统中自动赋值完成周期性调度,调度参数的更多内容,请参见调度参数支持的格式

      说明

      导入的分区数据必须和分区键值(本文示例使用的是ds)保持一致,否则会出现报错。

      导入分区数据的逻辑场景比较多,下面有两个场景供参考,请您根据实际业务逻辑两者选其中一个。

      • 场景一:导入新的分区数据。

        --创建临时分区子表
        BEGIN;
        CREATE TABLE IF NOT EXISTS "public".tmp_holo_dwd_product_movie_basic_info_${bizdate}  (
         "movie_name" text,
         "director" text,
         "scriptwriter" text,
         "area" text,
         "actors" text,
         "type" text,
         "movie_length" text,
         "movie_date" text,
         "movie_language" text,
         "imdb_url" text,
         "ds" text
        );
        COMMIT;
        
        --更新外表数据
        import foreign schema public_data limit to (dwd_product_movie_basic_info) from server odps_server into public options(if_table_exist 'update');
        
        --等待30s再导入Hologres,以防Hologres meta信息更新缓存慢导致的数据不一致而同步不成功
        select pg_sleep(30); 
        
        --将MaxCompute数据导入临时分区子表
        INSERT INTO "public".tmp_holo_dwd_product_movie_basic_info_${bizdate} 
        SELECT 
            "movie_name",
            "director",
            "scriptwriter",
            "area",
            "actors",
            "type",
            "movie_length",
            "movie_date",
            "movie_language",
            "imdb_url",
            "ds"
        FROM "public".dwd_product_movie_basic_info
        WHERE ds='${bizdate}';
        
        --导入新的分区数据
        BEGIN;
        
        ALTER TABLE tmp_holo_dwd_product_movie_basic_info_${bizdate} RENAME TO holo_dwd_product_movie_basic_info_${bizdate};
        
        --将临时分区子表绑定在分区父表上
        ALTER TABLE holo_dwd_product_movie_basic_info ATTACH PARTITION holo_dwd_product_movie_basic_info_${bizdate} FOR VALUES in ('${bizdate}');
        
        COMMIT;
                                            
      • 场景二:重新对历史分区数据刷新。

        --创建临时分区子表
        BEGIN;
        CREATE TABLE IF NOT EXISTS "public".tmp_holo_dwd_product_movie_basic_info_${bizdate}  (
         "movie_name" text,
         "director" text,
         "scriptwriter" text,
         "area" text,
         "actors" text,
         "type" text,
         "movie_length" text,
         "movie_date" text,
         "movie_language" text,
         "imdb_url" text,
         "ds" text
        );
        COMMIT;
        
        --更新外表数据
        import foreign schema public_data limit to (dwd_product_movie_basic_info) from server odps_server into public options(if_table_exist 'update');
        
        --等待30s再导入Hologres,以防Hologres meta信息更新缓存慢导致的数据不一致而同步不成功
        select pg_sleep(30); 
        
        --将MaxCompute数据导入临时分区子表
        INSERT INTO "public".tmp_holo_dwd_product_movie_basic_info_${bizdate} 
        SELECT 
            "movie_name",
            "director",
            "scriptwriter",
            "area",
            "actors",
            "type",
            "movie_length",
            "movie_date",
            "movie_language",
            "imdb_url",
            "ds"
        FROM "public".dwd_product_movie_basic_info
        WHERE ds='${bizdate}';
        
        重新对历史分区数据刷新
        BEGIN;
        
        ALTER TABLE IF EXISTS holo_dwd_product_movie_basic_info DETACH PARTITION holo_dwd_product_movie_basic_info_${bizdate};
        
        DROP TABLE IF EXISTS holo_dwd_product_movie_basic_info_${bizdate};
        
        ALTER TABLE tmp_holo_dwd_product_movie_basic_info_${bizdate} RENAME TO holo_dwd_product_movie_basic_info_${bizdate};
        
        --将分区子表绑定在分区父表上
        ALTER TABLE holo_dwd_product_movie_basic_info ATTACH PARTITION holo_dwd_product_movie_basic_info_${bizdate} FOR VALUES in ('${bizdate}');
        
        COMMIT;
  5. 调度配置。

    Hologres SQL编辑页面,单击节点编辑区域右侧的调度配置,配置节点的调度属性。

    说明

    需要更改的参数如下,未提及参数请保持默认值。

    • 基础属性

      参数

      参数

      bizdate=${yyyymmdd}

    • 时间属性时间属性

      参数

      生成实例方式

      发布后即时生成

      重跑属性

      运行成功后不可重跑,运行失败后可以重跑

      定时调度时间

      00:05

    • 调度依赖

      调度依赖为root节点即可(也可以根据业务逻辑选择已有的父节点)。请先将代码解析选择为,然后单击代码解析,会自动解析出root节点,最后再将代码解析选择为

  6. 发布调度。

    1. Hologres SQL编辑页面,单击工具栏中的保存图标,保存节点。

    2. 单击工具栏中的提交图标,提交节点。

    3. 提交新版本对话框中,输入变更描述

    4. 单击确认

  7. 运维中心发布。

    1. Hologres SQL编辑页面,单击工具栏中最右侧的运维

    2. 进入运维中心页面,单击左侧菜单栏周期任务运维>周期任务

    3. 周期任务页面,右键单击节点,选择补数据>当前节点

      补数据

    4. 选择左侧菜单栏周期任务运维 > 补数据实例,查看正在运行的任务以及任务状态。

  8. 查看数据。

    任务执行成功之后,将会在Hologres中自动创建对应分区数据的分区子表。

    1. 进入DataStudio数据开发页面,创建Hologres SQL节点。

    2. 在节点的编辑页面,输入如下语句,进行数据查询。

      • 查看分区子表数据。

        select * from holo_dwd_product_movie_basic_info_20170112;
      • 查看分区父表总数据。

        select count (*) from holo_dwd_product_movie_basic_info;