全部产品
数据传输服务DTS

RDS到DataHub数据实时同步

更新时间:2017-06-07 13:26:11   分享:   

本小节介绍如何使用数据传输服务DTS快速配置RDS实例到DataHub实例间的数据实时同步作业,让数据可以进入流计算等大数据产品进行数据实时分析。

支持功能

  • 支持同一个阿里云账号下RDS MySQL实例到DataHub实例的数据实时同步。
  • 支持不同阿里云账号下的RDS MySQL实例到DataHub实例的数据实时同步。
  • 支持的RDS实例包括,经典网络和VPC网络两种网络模式。

同步对象

  • 只支持表的同步,不支持其他非表对象的同步。

同步原理

技术原理

如上图所示,RDS->DataHub数据实时同步,是将RDS MySQL产生的增量数据数据实时同步到Datahub中的topic。增量日志DataHub中存储的表名默认同RDS表同名。topic名称可以根据业务需要修改,您可以在配置任务时,修改表在DataHub中对应的topic名称。

topic除了存储更新数据,它还会存储一些元信息,topic的结构定义如下:

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
1 234 db1 sbtest1 I 1476258462 N Y 1 ….. JustInsert
2 234 db1 sbtest1 U 1476258463 Y N 1 ….. JustInsert
2 234 db1 sbtest1 U 1476258463 N Y 1 ….. JustUpdate
3 234 db1 sbtest1 D 1476258464 Y N 1 ….. JustUpdate

其中:
dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为update,那么增量更新会被拆分成2条,一条Insert,一条Delete。那么这两条记录的record_id相同。
dts_instance_id: 这条增量日志所对应的数据库的server id。
dts_db_name: 这条增量更新日志更新的表所在的数据库库名。
dts_table_name:这条增量更新日志更新的表。
dts_operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录binlog的时间戳。这个时间戳为UTC时间。
dts_before_flag: 表示这条增量日志后面带的各个column值是否更新前的值。取值包括:Y 和 N。当后面的column为更新前的值时,dts_before_flag=Y, 当后面的column值为更新后的值时,dts_before_flag=N.
dts_after_flag:表示这条增量日志后面带的各个column值是否更新后的值。取值包括:Y 和 N。 当后面的column为更新前的值时,dts_after_flag=N,当后面的column值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的dts_before_flag和dts_after_flag定义如下:

1) 操作类型为:insert

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
1 234 db1 sbtest1 I 1476258462 N Y 1 ….. JustInsert

当操作类型为insert时,后面的所有column值为新插入的记录值,即为更新后的值。所以before_flag=N, after_flag=Y

2) 操作类型为:update

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
2 234 db1 sbtest1 I 1476258463 Y N 1 ….. JustInsert
2 234 db1 sbtest1 I 1476258463 N Y 1 ….. JustUpdate

当操作类型为update时,会将update操作拆为2条增量日志。这两条增量日志的dts_record_id ,dts_operation_flag 及dts_utc_timestamp相同。
第一条日志记录了更新前的值,所以dts_before_flag=Y, dts_after_flag=N
第二条日志记录了更新后的值,所以dts_before_flag=N, dts_after_flag=Y

3) 操作类型为:delete

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
3 234 db1 sbtest1 D 1476258464 Y N 1 ….. JustUpdate

当操作类型为delete时,后面的所有column值为被删除的记录值,即为更新前的值。所以dts_before_flag=Y, dts_after_flag=N

下面详细介绍RDS到Datahub数据实时同步作业的配置流程。

同步作业配置流程

下面详细介绍RDS到Datahub数据实时同步作业的配置流程。

1.购买同步链路

进入数据传输服务控制台,进入数据同步页面,点击控制台右上角“创建同步作业” 开始作业配置。

在链路配置之前需要购买一个同步链路。同步链路目前支持包年包月及按量付费两种付费模式,可以根据需要选择不同的付费模式。

在购买页面需要配置的参数包括:

  • 源实例
    同步作业的源实例类型,目前只支持RDS For MySQL.
  • 源地域
    源地域为同步实例的源RDS实例所在地域。
  • 目标实例
    目标实例为同步作业的目标实例类型,目前支持 RDS For MySQL, MaxCompute(原ODPS),DataHub。配置RDS->MaxCompute同步链路时,目标实例选择:Datahub 即可。
  • 目标地域
    由于Datahub目前只在上海地区售卖,所以目标地域选择杭州。
  • 实例规格
    实例规格影响了链路的同步性能,可以根据业务性能选择合适的规则。
  • 网络类型
    RDS->Datahub支持通过公网、私网 同步数据。如果源RDS没有公网连接地址,那么网络类型只能选择 私网
  • 数量
    数量为一次性购买的同步链路的数量,如果购买的是按量付费实例,一次最多购买99条链路。

当购买完同步实例,返回数据传输控制台,点击新购链路右侧的“配置同步作业” 开始链路配置。

2.同步链路连接信息配置。

在这一步主要配置:

  • 同步作业名称
    同步作业名称没有唯一性要求,主要为了更方便识别具体的作业,建议选择一个有业务意义的作业名称,方便后续的链路查找及管理。

  • 实例ID配置

在这个步骤中需要配置源RDS实例的实例ID,及目标Datahub实例的project。配置的Datahub project 必须属于登录DTS的阿里云账号的资源。

步骤1

当这些内容配置完成后,可以点击授权白名单并进入下一步

2.授权RDS实例白名单。

这个步骤,主要是将给DTS服务器IP地址添加到RDS实例白名单中,让DTS能够访问RDS实例。
当授权完成后,即进入同步对象选择。

3.选择同步对象

当RDS实例白名单添加完成后,即进入同步表及同步初始化的相关配置。

配置步骤2

在这个步骤中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化选项包括: 结构初始化
结构初始化是指对于待同步的表,在Datahub中创建对应的topic,完成topic schema定义。建议选择结构初始化。

(2) 同步表选择

同步表只能选择某些表,不能直接选择整个库。对于同步的表,可以修改topic名称,选择对应的shard key。如需修改,可以点击右边已选择对象后面的编辑按钮,进入修改界面。

当配置完同步对象后,进入同步启动前的预检查阶段。

5.预检查

当上面所有选项配置完成后,即进入启动之前的预检查。具体检查项内容详见本文最后的 预检查内容 一节
当同步作业配置完成后,数据传输服务会进行限制预检查,当预检查通过后,可以点击 确定 按钮,启动同步作业。

当同步作业启动之后,即进入同步作业列表。此时刚启动的作业处于同步初始化状态。初始化的时间依赖于同步表的数量大小。当初始化完成后同步链路即进入同步中的状态,此时源跟目标实例的同步链路才真正建立完成。

当同步任务进入 同步中 时,可以在Datahub中可以查询出同步表对应的topic:

至此,完成RDS->Datahub数据实时同步作业的配置。
当同步作业建立成功后,可以到流式计算中注册Datahub topic,同时创建流式源表,对于同步到Datahub中的数据进行后续的实时数据计算分析。

本文导读目录
本文导读目录
以上内容是否对您有帮助?