RDS增量数据同步至MaxCompute

本文以同步业务RDS数据库的数据至MaxCompute为例,为您介绍如何对不同场景的数据进行增量同步。

背景信息

根据需要同步的数据在写入后是否发生变化,分为恒定的存量数据(通常是日志数据)和持续更新的数据(例如人员表中,人员的状态会发生变化)。

根据幂等性原则(一个任务多次运行的结果一致,则该任务支持重跑调度。如果该任务出现错误,脏数据较容易清理),每次导入数据都是导入至一张单独的表或分区中,或者覆盖历史记录。

本文定义任务测试时间是2016年11月14日,在14日进行增量同步,同步历史数据至分区ds=20161113中。增量同步的场景配置了自动调度,把增量数据在15日凌晨同步至分区ds=20161114中。数据中的时间字段optime用来表示该数据的修改时间,从而判断这条数据是否为增量数据。

使用说明

  • 部分数据源暂无增量同步方案,例如HBase、OTSStream数据源等。具体数据源是否支持增量同步可以看具体的Reader插件文档。

  • 每个插件实现增量同步的所配置的参数可能不同,具体参数配置可以参考对应的Reader插件文档,详情可参考:支持的数据源与读写插件

  • 增量同步配置相关说明详情请参见:场景:配置增量数据离线同步任务

对恒定的存量数据进行增量同步

由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区。较常见的是根据日期进行分区,例如每天1个分区。

  1. 在RDS数据库中,执行下述语句准备数据。

    drop table if exists oplog;
    create table if not exists oplog(
    optime DATETIME,
    uname varchar(50),
    action varchar(50),
    status varchar(10)
     );
    Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    上述的两条数据作为历史数据,需要先进行一次全量数据同步,将历史数据同步至昨天的分区。

  2. 数据开发页面,右键单击业务流程下的,选择新建表

  3. 新建表对话框中,输入表名(ods_oplog),单击提交

  4. 双击ods_oplog表,在右侧的编辑页面单击DDL模式,输入下述建表语句。

    --创建好MaxCompute表,按天进行分区。
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  5. 配置同步历史数据的任务,详情请参见通过向导模式配置离线同步任务

    测试同步任务成功后,单击节点编辑页面右侧的调度配置,勾选暂停调度并重新提交或发布,避免任务自动调度执行。

  6. 执行下述语句,向RDS源头表中插入数据作为增量数据。

    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  7. 配置同步增量数据的任务。

    数据来源中设置数据过滤为date_format(optime,'%Y%m%d')=${bdp.system.bizdate},在数据去向中设置分区信息为${bdp.system.bizdate}

    说明

    通过配置数据过滤,在15日凌晨进行同步时,您可以查询14日源头表全天新增的数据,并同步至目标表的增量分区中。

  8. 查看同步结果。

    单击节点编辑页面右侧的调度配置,设置任务的调度周期为天调度。提交或发布任务后,第2天任务将自动调度执行。执行成功后,即可查看MaxCompute目标表的数据。

对持续更新的数据进行增量同步

根据数据仓库反映历史变化的特点,建议每天对人员表、订单表等会发生变化的数据进行全量同步,即每天保存的都是全量数据,方便您获取历史数据和当前数据。

真实场景中因为某些特殊情况,需要每天只进行增量同步。但MaxCompute不支持Update语句修改数据,只能通过其它方式实现。下文将为您介绍两种同步策略(全量同步、增量同步)的具体操作。

  1. 执行下述语句准备数据。

    drop table if exists user ;
    create table if not exists user(
        uid int,
        uname varchar(50),
        deptno int,
        gender VARCHAR(1),
        optime DATETIME
        );
    --历史数据
    insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
    insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
    insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
    --增量数据
    update user set deptno=101,optime=CURRENT_TIME  where uid = 2; --null改成非null
    update user set deptno=104,optime=CURRENT_TIME  where uid = 3; --非null改成非null
    update user set deptno=null,optime=CURRENT_TIME  where uid = 4; --非null改成null
    delete from user where uid = 5;
    insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
  2. 进行数据同步。

    • 每天全量同步

      1. 执行下述语句创建MaxCompute表,新建表的详情请参见新建表

        --全量同步
        create table ods_user_full(
            uid bigint,
            uname string,
            deptno bigint,
            gender string,
            optime DATETIME 
        ) partitioned by (ds string);
      2. 配置全量同步任务。

        说明

        需要每天都全量同步,因此任务的调度周期需要配置为天调度。

      3. 运行任务,并查看同步后MaxCompute目标表的结果。

        因为每天都是全量同步,没有全量和增量的区别,所以第2天任务自动调度执行成功后,即可看到数据结果。

    • 每天增量同步

      不推荐使用该方式,只有在不支持Delete语句、无法通过SQL语句查看被删除的数据等场景才会考虑。虽然实际上大多使用逻辑删除数据,将Delete转化为Update进行处理。但仍会限制一些特殊的业务场景不能实现,导致数据不一致。并且同步后需要合并新增数据和历史数据。

      准备数据

      需要创建两张表,一张写当前的最新数据,一张写增量数据。

      --结果表
      create table dw_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      );
      --增量记录表
      create table ods_user_inc(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      )
      1. 配置同步任务,将全量数据直接写入结果表。

        说明

        只需要执行一次,执行成功后需要单击页面右侧的调度配置,勾选暂停调度

      2. 配置同步任务,将增量数据写入到增量表。设置数据过滤,即where参数配置为date_format(optime,'%Y%m%d')=${bdp.system.bizdate}

      3. 执行下述语句合并数据。

        insert overwrite table dw_user_inc 
        select 
        --所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准。
        case when b.uid is not null then b.uid else a.uid end as uid,
        case when b.uid is not null then b.uname else a.uname end as uname,
        case when b.uid is not null then b.deptno else a.deptno end as deptno,
        case when b.uid is not null then b.gender else a.gender end as gender,
        case when b.uid is not null then b.optime else a.optime end as optime
        from 
        dw_user_inc a 
        full outer join ods_user_inc b
        on a.uid  = b.uid ;

        查看执行结果会发现Delete的记录没有同步成功。

    每天增量同步的优点是同步的增量数据量较小,但可能出现数据不一致的情况,并且需要通过额外的计算进行数据合并。

    如果不是必要情况,对持续更新的数据进行每天全量同步即可。如果希望历史数据仅保留一定的时间,自动删除超出保留时间的数据,您可以设置Lifecycle