全部产品
大数据开发套件

数据增量同步

更新时间:2017-08-22 09:58:17   分享:   

需要同步的两种数据

把需要同步的数据,根据数据写入后是否会发生变化,分为不会发生变化的数据(一般是日志数据)和会变化的数据(人员表,比如说人员的状态会发生变化)。

示例说明

针对以上两种数据场景,需要设计不同的同步策略。这里以把业务RDS数据库的数据同步到MaxCompute为例做一些说明,其他的数据源的道理是一样。

根据等幂性原则(一个任务多次运行的结果一样,则该任务支持重跑调度,因此该任务若出现错误,清理脏数据会比较容易),每次导入数据都是导入到一张单独的表/分区里,或者覆盖里面的历史记录。

本文定义任务测试时间是2016-11-14,全量同步是在14号做的,同步历史数据到ds=20161113这个分区里。至于本文涉及的增量同步的场景,配置了自动调度,把增量数据在15号凌晨同步到ds=20161114的分区里。数据里有一个时间字段optime,用来表示这条数据的修改时间,从而判断这条数据是否是增量数据。

不变的数据进行增量同步

这个场景,由于数据生成后就不会发生变化,因此可以很方便地根据数据的生成规律进行分区,较常见的是根据日期进行分区,比如每天一个分区。

数据准备

  1. drop table if exists oplog;
  2. create table if not exists oplog(
  3. optime DATETIME,
  4. uname varchar(50),
  5. action varchar(50),
  6. status varchar(10)
  7. );
  8. Insert into oplog values(str_to_date('2016-11-11','%Y-%m-%d'),'LiLei','SELECT','SUCCESS');
  9. Insert into oplog values(str_to_date('2016-11-12','%Y-%m-%d'),'HanMM','DESC','SUCCESS');

这里有2条数据,当成历史数据。需先做一次全量数据同步,将历史数据同步到昨天的分区。

操作步骤

步骤一:创建MaxCompute表

  1. --创建好MaxCompute表,按天进行分区
  2. create table if not exists ods_oplog(
  3. optime datetime,
  4. uname string,
  5. action string,
  6. status string
  7. ) partitioned by (ds string);

步骤二:配置同步历史数据的任务:

screenshot

因为只需要执行一次,所以只需操作一次测试即可。测试成功后,到”数据开发”模块把任务的状态改成暂停(最右边的调度配置里)并重新提交/发布,避免任务自动调度执行。

查看MaxCompute表的结果:

screenshot

步骤三:往RDS源头表里多写一些数据作为增量数据:

  1. insert into oplog values(CURRENT_DATE,'Jim','Update','SUCCESS');
  2. insert into oplog values(CURRENT_DATE,'Kate','Delete','Failed');
  3. insert into oplog values(CURRENT_DATE,'Lily','Drop','Failed');

步骤四:配置同步增量数据的任务。注意,通过配置“数据过滤”,在15号凌晨进行同步的时候,把14号源头表全天新增的数据查询出来,并同步到目标表增量分区里。

screenshot

任务设置调度周期为每天调度,提交/发布后,第二天任务将自动调度执行,执行成功后,可以查看到MaxCompute目标表的数据变成了:

screenshot

会变的数据进行增量同步

如人员表、订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进行全量同步。也就是说每天保存的都是数据的全量数据,这样历史的数据和当前的数据都可以很方便地获得。

真实场景中因为某些特殊情况,需要每天只做增量同步,又因为MaxCompute不支持Update语句进行修改数据,只能用其他方法来实现。两种同步策略(全量同步、增量同步)的具体方法如下文。

数据准备

  1. drop table if exists user ;
  2. create table if not exists user(
  3. uid int,
  4. uname varchar(50),
  5. deptno int,
  6. gender VARCHAR(1),
  7. optime DATETIME
  8. );
  9. --历史数据
  10. insert into user values (1,'LiLei',100,'M',str_to_date('2016-11-13','%Y-%m-%d'));
  11. insert into user values (2,'HanMM',null,'F',str_to_date('2016-11-13','%Y-%m-%d'));
  12. insert into user values (3,'Jim',102,'M',str_to_date('2016-11-12','%Y-%m-%d'));
  13. insert into user values (4,'Kate',103,'F',str_to_date('2016-11-12','%Y-%m-%d'));
  14. insert into user values (5,'Lily',104,'F',str_to_date('2016-11-11','%Y-%m-%d'));
  15. --增量数据
  16. update user set deptno=101,optime=CURRENT_TIME where uid = 2; --null改成非null
  17. update user set deptno=104,optime=CURRENT_TIME where uid = 3; --非null改成非null
  18. update user set deptno=null,optime=CURRENT_TIME where uid = 4; --非null改成null
  19. delete from user where uid = 5;
  20. insert into user(uid,uname,deptno,gender,optime) values (6,'Lucy',105,'F',CURRENT_TIME);

每天全量同步

每天全量同步同步比较简单。

操作步骤

步骤一:创建MaxCompute目标表。

  1. --全量同步
  2. create table ods_user_full(
  3. uid bigint,
  4. uname string,
  5. deptno bigint,
  6. gender string,
  7. optime DATETIME
  8. ) partitioned by (ds string);

步骤二:配置全量同步任务。

screenshot

注意,需要每天都全量同步,因此任务的调度周期需要配置为天调度。

步骤三:测试任务,并查看同步后MaxCompute目标表结果。

screenshot

因为每天都是全量同步,没有全量和增量的区别,所以第二天任务自动调度执行成功后,就能看到数据结果为:

screenshot

如果需要查询的话,就用where ds =‘20161114’来取全量数据即可。

每天增量同步

不推荐用这种方式,只有在极特殊的场景下才考虑。首先这种场景不支持delete语句,因为被删除的数据无法通过SQL语句的过滤条件查到。当然实际上公司里的代码很少直接有直接删除数据的,都是使用逻辑删除,那delete就转化成update来处理了。但是这里毕竟限制了一些特殊的业务场景不能做了,当出现特殊情况可能导致数据不一致。另外还有一个缺点就是同步后要对新增的数据和历史数据做合并。具体的做法如下文。

数据准备

需要创建2张表,一张写当前的最新数据,一张写增量数据。

  1. --结果表
  2. create table dw_user_inc(
  3. uid bigint,
  4. uname string,
  5. deptno bigint,
  6. gender string,
  7. optime DATETIME
  8. );
  9. --增量记录表
  10. create table ods_user_inc(
  11. uid bigint,
  12. uname string,
  13. deptno bigint,
  14. gender string,
  15. optime DATETIME
  16. )

操作步骤

步骤一:配置任务将全量数据直接写入结果表。注意,这个只需执行一次,执行成功后需要到“数据开发”将任务设置暂停。

screenshot

结果如下:

screenshot

步骤二:配置任务将增量数据写入到增量表。

screenshot

结果如下

screenshot

步骤三:将数据做一次合并。

  1. insert overwrite table dw_user_inc
  2. select
  3. --所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准
  4. case when b.uid is not null then b.uid else a.uid end as uid,
  5. case when b.uid is not null then b.uname else a.uname end as uname,
  6. case when b.uid is not null then b.deptno else a.deptno end as deptno,
  7. case when b.uid is not null then b.gender else a.gender end as gender,
  8. case when b.uid is not null then b.optime else a.optime end as optime
  9. from
  10. dw_user_inc a
  11. full outer join ods_user_inc b
  12. on a.uid = b.uid ;

最终结果是:

screenshot

可以看到,delete的那条记录没有同步成功。

对比以上两种同步方式,可以很清楚看到两种同步方法的区别和优劣。第二种方法的优点是同步的增量数据量比较小,但是带来的缺点有可能有数据不一致的风险,而且还需要用额外的计算进行数据合并。如无必要,会变化的数据就使用方法一即可。如果对历史数据希望只保留一定的时间,超出时间的做自动删除,可以设置Lifecycle。

本文导读目录
本文导读目录
以上内容是否对您有帮助?