全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
数据传输服务DTS

创建RDS到MaxCompute数据实时同步作业

更新时间:2017-06-07 13:26:11

本小节介绍如何使用数据传输服务快速创建RDS实例到MaxCompute实例间的实时同步作业,实现在线(RDS)到离线系统(MaxCompute)的数据实时同步,进一步为数据实时分析奠定基础。

支持功能

数据源

  • 支持同一个阿里云账号下RDS MySQL实例到MaxCompute实例的数据实时同步。
  • 支持不同阿里云账号下的RDS MySQL实例到MaxCompute实例的数据实时同步。
  • 支持的RDS实例包括,经典网络和VPC网络两种网络模式。

同步对象

  • 只支持表的同步,不支持其他非表对象的同步。

同步原理

原理

如上图所示,整个同步过程分为两步:
(1) 全量初始化, 这个步骤将RDS MySQL中已经存在的全量数据初始化到MaxCompute中。对于同步的每个表,全量初始化的数据都会独立存储在MaxCompute中的全量基线表中,这个表的默认格式为:源表名_base。例如表 t1,那么全量基线表在MaxCompute中存储的表名为:t1_dts_base。这个存储表名前缀可以根据需要变更,您可以在配置任务时,修改表在MaxCompute存储的名称。

(2) 增量数据同步,这个步骤将RDS MySQL产生的增量数据数据实时同步到MaxCompute中。并存储在增量日志表中,每个同步表对应一个增量日志表。增量日志表在MaxCompute中存储的表名的默认格式为:源表名_log。这个存储表名前缀可以根据需要变更,您可以在配置任务时,修改表在MaxCompute存储的名称。

增量日志表除了存储更新数据,它还会存储一些元信息,增量日志表的表结构定义如下:

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
1 I 1476258462 N Y 1 ….. JustInsert
2 U 1476258463 Y N 1 ….. JustInsert
2 U 1476258463 N Y 1 ….. JustUpdate
3 D 1476258464 Y N 1 ….. JustUpdate

其中:
record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为update,那么增量更新会被拆分成2条,一条Insert,一条Delete。那么这两条记录的record_id相同。
operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录binlog的时间戳。这个时间戳为UTC时间。
before_flag: 表示这条增量日志后面带的各个column值是否更新前的值。取值包括:Y 和 N。当后面的column为更新前的值时,before_flag=Y, 当后面的column值为更新后的值时,before_flag=N。
after_flag:表示这条增量日志后面带的各个column值是否更新后的值。取值包括:Y 和 N。 当后面的column为更新前的值时,after_flag=N,当后面的column值为更新后的值时,after_flag=Y。

对于不同的操作类型,增量日志中的before_flag和after_flag定义如下:

1) 操作类型为:insert

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
1 I 1476258462 N Y 1 ….. JustInsert

当操作类型为insert时,后面的所有column值为新插入的记录值,即为更新后的值。所以before_flag=N, after_flag=Y。

2) 操作类型为:update

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
2 U 1476258463 Y N 1 ….. JustInsert
2 U 1476258463 N Y 1 ….. JustUpdate

当操作类型为update时,会将update操作拆为2条增量日志。这两条增量日志的record_id ,operation_flag 及dts_utc_timestamp相同。
第一条日志记录了更新前的值,所以before_flag=Y, after_flag=N。
第二条日志记录了更新后的值,所以before_flag=N, after_flag=Y。

3) 操作类型为:delete

record_id operation_flag dts_utc_timestamp before_flag after_flag col1 …. colN
3 D 1476258464 Y N 1 ….. JustUpdate

当操作类型为delete时,后面的所有column值为被删除的记录值,即为更新前的值。所以before_flag=Y, after_flag=N。

(3) RDS->MaxCompute数据同步,对于每个同步表,都会在MaxCompute中生成一个全量基线表+一个增量日志表,所以如果需要在MaxCompute中获取某个时刻某张表的全量数据,就需要merge这张表的全量基线表和增量日志表。具体实现方法后面会详细讲解。

下面详细介绍RDS到MaxCompute数据实时同步作业的配置流程。

同步作业配置流程

下面详细介绍RDS到MaxCompute数据实时同步作业的配置流程。

1.购买同步链路

进入数据传输服务控制台,进入数据同步页面,点击控制台右上角“创建同步作业” 开始作业配置。

在链路配置之前需要购买一个同步链路。同步链路目前支持包年包月及按量付费两种付费模式,可以根据需要选择不同的付费模式。

在购买页面需要配置的参数包括:

  • 源实例
    同步作业的源实例类型,目前只支持RDS For MySQL。
  • 源地域
    源地域为同步实例的源RDS实例所在地域。
  • 目标实例
    目标实例为同步作业的目标实例类型,目前支持 RDS For MySQL, MaxCompute(原ODPS),DataHub。配置RDS->MaxCompute同步链路时,目标实例选择:MaxCompute 即可。
  • 目标地域
    由于MaxCompute目前只在上海地区售卖,所以目标地域选择上海。
  • 实例规格
    实例规格影响了链路的同步性能,可以根据业务性能选择合适的规则。
  • 网络类型
    RDS->MaxCompute支持通过公网、私网 同步数据。如果源RDS没有公网连接地址,那么网络类型只能选择 私网
  • 数量
    数量为一次性购买的同步链路的数量,如果购买的是按量付费实例,一次最多购买99条链路。

当购买完同步实例,返回数据传输控制台,点击新购链路右侧的“配置同步作业” 开始链路配置。

2.同步链路连接信息配置。

在这一步主要配置:

  • 同步作业名称
    同步作业名称没有唯一性要求,主要为了更方便识别具体的作业,建议选择一个有业务意义的作业名称,方便后续的链路查找及管理。

  • 实例ID配置

在这个步骤中需要配置源RDS实例的实例ID,及目标MaxCompute实例的project。配置的MaxCompute project 必须属于登录DTS的阿里云账号的资源。

实例连接信息

当这些内容配置完成后,可以点击授权白名单并进入下一步

3.授权RDS实例白名单

这个步骤,主要是将给DTS服务账号授权MaxCompute写权限,让DTS能够将数据同步复制到MaxCompute中。

步骤2

授权权限包括对project的:
CreateTable
CreateInstance
CreateResource
CreateJob
List
为了保证同步作业的稳定性,在同步过程中,请勿将写权限回收。当白名单授权后,点击下一步,进入同步账号创建。

当授权完成后,即进入同步对象选择。

4.选择同步对象

当MaxCompute账号授权完成后,即进入同步表及同步初始化的相关配置。

步骤3

在这个步骤中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化选项包括: 结构初始化 和 全量数据初始化。
结构初始化是指对于待同步的表,在MaxCompute中创建对应的表,完成表结构定义。
全量数据初始化是指对于待同步的表,将历史数据初始化到MaxCompute中。
配置任务时,建议同时选择 结构初始化+全量数据初始化。

(2) 同步表选择

同步表只能选择某些表,不能直接选择整个库。对于同步的表,可以修改表在MaxCompute对应的全量基线表及增量日志表的表名前缀。如需修改,可以点击右边已选择对象后面的编辑按钮,进入修改界面。步骤4

当配置完同步对象后,进入同步初始化配置。

5.预检查

当上面所有选项配置完成后,即进入启动之前的预检查。具体检查项内容详见本文最后的 预检查内容 一节。
当同步作业配置完成后,数据传输服务会进行限制预检查,当预检查通过后,可以点击 确定 按钮,启动同步作业。

当同步作业启动之后,即进入同步作业列表。此时刚启动的作业处于同步初始化状态。初始化的时间长度依赖于源实例中同步对象的数据量大小。当初始化完成后同步链路即进入同步中的状态,此时源跟目标实例的同步链路才真正建立完成。

当同步任务进入 同步中 时,可以在MaxCompute中可以查询出jiangliu_test对应的全量基线表和增量日志表:
表信息

至此,完成RDS->MaxCompute数据实时同步作业的配置。

全量数据合并方案

本小节介绍,如何根据同步到MaxCompute中的全量基线表和增量日志数据得到某个时刻表的全量数据。
DTS提供通过MaxCompute SQL实现全量数据合并的能力。

通过MaxCompute SQL merge 全量基线数据 和 增量日志表 得到时刻t的全量数据。MaxCompute SQL的写法如下:

  1. insert overwrite table result_storage_table
  2. select col1,
  3. col2,
  4. colN
  5. from(
  6. select row_number() over(partition by t.primary_key_column
  7. order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, col1,col2,colN
  8. from(
  9. select incr.record_id, incr.operation_flag, incr.after_flag, incr.col1, incr.col2,incr.colN
  10. from table_log incr
  11. where utc_timestamp< timestmap
  12. union all
  13. select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.col1, base.col2,base.colN
  14. from table_base base) t) gt
  15. where record_num=1
  16. and after_flag='Y'

上面代码中的几个变量意义如下:
1) result_storage_table 表示全量merge结果集的存储表的表名
2) col1, col2,colN 表示同步表中列的列名
3) primary_key_column 表示同步表中的主键列的列名
4) table_log 表示增量日志表
5) table_base 表示全量基线表
6) timestmap 表示需要merge哪个时刻的全量数据

例如对于上面配置任务中的jiangliu_test表,jiangliu_test_20161010_base 为jiangliu_test对应的全量基线表,jiangliu_test_20161010_log 为jiangliu_test对应的增量日志表。

jiangliu_test的表结构定义为:
表结构

那么查询时间戳1476263486 时刻,jiangliu_test表的全量数据的MaxCompute SQL如下:

  1. insert overwrite table jiangliu_test_1476263486
  2. select id,
  3. name
  4. from(
  5. select row_number() over(partition by t.id
  6. order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, name
  7. from(
  8. select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.name
  9. from jiangliu_test_20161010_log incr
  10. where utc_timestamp< 1476263486
  11. union all
  12. select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.name
  13. from jiangliu_test_20161010_base base) t) gt
  14. where gt.row_number= 1
  15. and gt.after_flag= 'Y' ;

您也可以通过大数据开发套件,在后续的计算分析操作之前,添加全量数据Merge节点,当全量数据merge完成后,可自动调度起后续的计算分析节点。同时可以配置调度周期,进行周期性的数据离线分析。

至此完成RDS->MaxCompute数据同步任务配置及全量数据合并。

本文导读目录