需要同步的数据根据数据写入后是否会发生变化,分为不会发生变化的数据(通常是日志数据)和会变化的数据(人员表,例如人员的状态会发生变化)。

示例说明

针对以上两种数据场景,需要设计不同的同步策略,本文以把业务RDS数据库的数据同步至MaxCompute为例。

根据等幂性原则(1个任务多次运行的结果一致,则该任务支持重跑调度。如果该任务出现错误,脏数据较容易清理),每次导入数据都是导入到一张单独的表/分区中,或者覆盖历史记录。

本文定义任务测试时间是2016年11月14日,在14日进行增量同步,同步历史数据至分区ds=20161113中。增量同步的场景配置了自动调度,把增量数据在15日凌晨同步至分区ds=20161114中。数据中的时间字段optime,用来表示该数据的修改时间,从而判断这条数据是否为增量数据。

不变的数据进行增量同步

由于数据生成后不会发生变化,因此可以很方便地根据数据的生成规律进行分区,较常见的是根据日期进行分区,例如每天1个分区。

  1. 执行下述语句准备数据。
    drop table if exists oplog;
    create table if not exists oplog(
    optime DATETIME,
    uname varchar(50),
    action varchar(50),
    status varchar(10)
     );
    Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
    Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

    上述的两条数据作为历史数据,需先做一次全量数据同步,将历史数据同步到昨天的分区。

  2. 执行下述语句创建MaxCompute表。
    --创建好MaxCompute表,按天进行分区
    create table if not exists ods_oplog(
     optime datetime,
     uname string,
     action string,
     status string
    ) partitioned by (ds string);
  3. 配置同步历史数据的任务,详情请参见创建同步任务
    测试同步任务成功后,单击右侧的调度配置,勾选暂停调度并重新提交/发布,避免任务自动调度执行。
    调度配置
  4. 执行下述语句,向RDS源头表中插入数据作为增量数据。
    insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
    insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed'); 
    insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');
  5. 配置同步增量数据的任务。

    在数据来源中设置数据过滤为date_format(optime,'%Y%m%d')=${bdp.system.bizdate},在数据去向中设置分区信息为${bdp.system.bizdate}

    说明 通过配置数据过滤,在15日凌晨进行同步时,可以查询14日源头表全天新增的数据,并同步至目标表的增量分区中。
  6. 查看同步结果。

    单击右侧的调度配置,设置任务的调度周期为天调度。提交/发布任务后,第2天任务将自动调度执行。执行成功后,即可查看MaxCompute目标表的数据。

会变的数据进行增量同步

根据数据仓库反映历史变化的特点,建议每天对人员表、订单表等会发生变化的数据进行全量同步,即每天保存的都是全量数据,方便您获取历史数据和当前数据。

真实场景中因为某些特殊情况,需要每天只做增量同步。但MaxCompute不支持Update语句修改数据,只能用其它方法实现。下文将为您介绍两种同步策略(全量同步、增量同步)的具体操作。

数据准备

drop table if exists user ;
create table if not exists user(
    uid int,
    uname varchar(50),
    deptno int,
    gender VARCHAR(1),
    optime DATETIME
    );
--历史数据
insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
--增量数据
update user set deptno=101,optime=CURRENT_TIME  where uid = 2; --null改成非null
update user set deptno=104,optime=CURRENT_TIME  where uid = 3; --非null改成非null
update user set deptno=null,optime=CURRENT_TIME  where uid = 4; --非null改成null
delete from user where uid = 5;
insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);
  • 每天全量同步
    1. 执行下述语句创建MaxCompute表,新建表的详情请参见新建表
      --全量同步
      create table ods_user_full(
          uid bigint,
          uname string,
          deptno bigint,
          gender string,
          optime DATETIME 
      ) partitioned by (ds string);ring);
    2. 配置全量同步任务。
      说明 需要每天都全量同步,因此任务的调度周期需要配置为天调度。
    3. 运行任务,并查看同步后MaxCompute目标表的结果。

      因为每天都是全量同步,没有全量和增量的区别,所以第2天任务自动调度执行成功后,即可看到数据结果。

  • 每天增量同步

    不推荐使用此方式,只有如不支持Delete语句,无法通过SQL语句查看被删除的数据等场景才会考虑。虽然实际上很少直接删除数据,都是使用逻辑删除,将Delete转化为Update进行处理。但仍会限制一些特殊的业务场景不能实现,导致数据不一致。并且同步后需要合并新增数据和历史数据。

    数据准备

    需要创建两张表,一张写当前的最新数据,一张写增量数据。
    --结果表
    create table dw_user_inc(
        uid bigint,
        uname string,
        deptno bigint,
        gender string,
        optime DATETIME 
    );
    --增量记录表
    create table ods_user_inc(
        uid bigint,
        uname string,
        deptno bigint,
        gender string,
        optime DATETIME 
    )
    1. 配置同步任务,将全量数据直接写入结果表。
      说明 只需执行一次,执行成功后需单击页面右侧的调度配置,勾选暂停调度
    2. 配置同步任务,将增量数据写入到增量表。设置数据过滤为date_format(optime,'%Y%m%d')=${bdp.system.bizdate}
    3. 合并数据。
      insert overwrite table dw_user_inc 
      select 
      --所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准。
      case when b.uid is not null then b.uid else a.uid end as uid,
      case when b.uid is not null then b.uname else a.uname end as uname,
      case when b.uid is not null then b.deptno else a.deptno end as deptno,
      case when b.uid is not null then b.gender else a.gender end as gender,
      case when b.uid is not null then b.optime else a.optime end as optime
      from 
      dw_user_inc a 
      full outer join ods_user_inc b
      on a.uid  = b.uid ;

      查看执行结果会发现Delete的记录没有同步成功。

每天增量同步的优点是同步的增量数据量较小,但可能出现数据不一致的,并且需要用额外的计算进行数据合并。

如果不是必要情况,会变化的数据进行每天全量同步即可。如果对历史数据希望只保留一定的时间,超出时间的进行自动删除,可以设置Lifecycle。