全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网
数据传输服务DTS

RDS 到 DataHub 数据实时同步

更新时间:2017-11-06 17:07:10

本小节介绍如何使用数据传输 DTS (以下简称 DTS) 快速配置 RDS for MySQL 实例到 DataHub 实例间的数据实时同步作业,让数据可以实时进入流计算等大数据产品进行数据实时分析。

支持功能

  • 支持同一个阿里云账号下 RDS for MySQL 实例到 DataHub 实例的数据实时同步。
  • 支持不同阿里云账号下的 RDS for MySQL 实例到 DataHub 实例的数据实时同步。
  • 支持的 RDS for MySQL 实例包括,经典网络和 VPC 网络两种网络模式。

同步对象

  • 只支持表的同步,不支持其他非表对象的同步。

同步原理

技术原理

如上图所示,RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。增量日志在 DataHub 中存储的表名默认同 RDS 表同名。topic 名称可以根据业务需要修改,您可以在配置任务时,修改表在 DataHub 中对应的 topic 名称。

topic 除了存储更新数据,它还会存储一些元信息,topic 的结构定义如下:

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
1 234 db1 sbtest1 I 1476258462 N Y 1 ….. JustInsert
2 234 db1 sbtest1 U 1476258463 Y N 1 ….. JustInsert
2 234 db1 sbtest1 U 1476258463 N Y 1 ….. JustUpdate
3 234 db1 sbtest1 D 1476258464 Y N 1 ….. JustUpdate

其中:
dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。
dts_instance_id: 这条增量日志所对应的数据库的 server id。
dts_db_name: 这条增量更新日志更新的表所在的数据库库名。
dts_table_name:这条增量更新日志更新的表。
dts_operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。
dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.
dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

1) 操作类型为:insert

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
1 234 db1 sbtest1 I 1476258462 N Y 1 ….. JustInsert

当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y

2) 操作类型为:update

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
2 234 db1 sbtest1 I 1476258463 Y N 1 ….. JustInsert
2 234 db1 sbtest1 I 1476258463 N Y 1 ….. JustUpdate

当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y

3) 操作类型为:delete

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
3 234 db1 sbtest1 D 1476258464 Y N 1 ….. JustUpdate

当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N

下面详细介绍 RDS for MySQL 到 DataHub 数据实时同步作业的配置流程。

同步作业配置流程

下面详细介绍 RDS for MySQL 到 DataHub 数据实时同步作业的配置流程。

1.购买同步链路

进入 数据传输 DTS 控制台,进入数据同步页面,点击控制台右上角“创建同步作业” 开始作业配置。

在链路配置之前需要购买一个同步链路。同步链路目前支持包年包月及按量付费两种付费模式,可以根据需要选择不同的付费模式。

在购买页面需要配置的参数包括:

  • 源实例
    同步作业的源实例类型,选择 RDS for MySQL。
  • 源地域
    源地域为同步实例的源 RDS for MySQL 实例所在地域。
  • 目标实例
    目标实例为同步作业的目标实例类型,目前支持 RDS for MySQL, MaxCompute (原ODPS),分析型数据库(AnalyticDB),DataHub。配置 RDS->DataHub 同步链路时,目标实例选择:Datahub 即可。
  • 目标地域
    由于 Datahub 目前暂时只在 华东1(杭州) 地区售卖,所以目标地域默认选择 华东1(杭州)。
  • 实例规格
    实例规格影响了链路的同步性能,可以根据业务性能选择合适的规格。对于数据同步链路规格的具体说明请参考:数据同步规格说明
  • 网络类型
    RDS->DataHub 支持通过公网或者私网同步数据。如果源 RDS for MySQL 没有公网连接地址,那么网络类型只能选择 私网
  • 数量
    数量为一次性购买的同步链路的数量,如果购买的是按量付费实例,一次最多购买 99 条链路。

当购买完同步实例,返回数据传输控制台,点击新购链路右侧的“配置同步作业” 开始链路配置。

2.同步链路连接信息配置。

在这一步主要配置:

  • 同步作业名称
    同步作业名称没有唯一性要求,主要为了更方便识别具体的作业,建议选择一个有业务意义的作业名称,方便后续的链路查找及管理。

  • 实例 ID 配置

在这个步骤中需要配置源 RDS for MySQL 实例的实例 ID,及目标 DatahHub 实例的 project。配置的 Datahub project 必须属于登录 DTS 所使用的阿里云账号的资源。

步骤1

当这些内容配置完成后,可以点击授权白名单并进入下一步

2.授权 RDS 实例白名单。

这个步骤 DTS 会将 DTS 服务器的 IP 地址添加到目标 RDS 实例的白名单中,让 DTS 能够访问 RDS 实例。
当授权完成后,即进入同步对象选择。

3.选择同步对象

当 RDS 实例白名单添加完成后,即进入同步表及同步初始化的相关配置。

配置步骤2

在这个步骤中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化选项包括: 结构初始化
结构初始化是指对于待同步的表,在 DataHub 中创建对应的 topic,完成 topic schema 定义。建议选择结构初始化。

(2) 同步表选择

同步表只能选择某些表,不能直接选择整个库。对于同步的表,可以修改 topic 名称,选择对应的 shard key。如需修改,可以点击右边已选择对象后面的编辑按钮,进入修改界面。

当配置完同步对象后,进入同步启动前的预检查阶段。

5.预检查

当上面所有选项配置完成后,即进入启动之前的预检查。具体检查项内容详见本文最后的 预检查内容 一节
当同步作业配置完成后,数据传输服务会进行限制预检查,当预检查通过后,可以点击 确定 按钮,启动同步作业。

当同步作业启动之后,即进入同步作业列表。此时刚启动的作业处于同步初始化状态。初始化的时间依赖于同步表的数量大小。当初始化完成后同步链路即进入同步中的状态,此时源跟目标实例的同步链路才真正建立完成。

当同步任务进入 同步中 时,可以在 DataHub 中可以查询出同步表对应的 topic:

至此,完成 RDS->Datahub 数据实时同步作业的配置。
当同步作业建立成功后,可以到流式计算中注册 DataHub topic,同时创建流式源表,对于同步到 DataHub 中的数据进行后续的实时数据计算分析。

本文导读目录