全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
数据传输服务DTS

MySQL 到 DataHub 数据实时同步

更新时间:2018-02-08 16:14:14

本小节介绍如何使用数据传输 DTS (以下简称 DTS) 快速配置MySQL到 DataHub 实例的数据实时同步,让数据可以实时进入流计算等大数据产品进行数据实时分析。

支持功能

  • 支持通过专线接入阿里云的自建MySQL到Datahub的数据实时同步
  • 支持ECS上的自建MySQL到Datahub的数据实时同步
  • 支持同一个阿里云账号下 RDS for MySQL 实例到 DataHub 实例的数据实时同步。
  • 支持不同阿里云账号下的 RDS for MySQL 实例到 DataHub 实例的数据实时同步。

同步对象

  • 只支持表的同步,不支持其他非表对象的同步。

同步原理

技术原理

如上图所示,MySQL->DataHub 数据实时同步,是将 MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。增量日志在 DataHub 中存储的表名默认同 MySQL 表同名。topic 名称可以根据业务需要修改,您可以在配置任务时,修改表在 DataHub 中对应的 topic 名称。

topic 除了存储更新数据,它还会存储一些元信息,topic 的结构定义如下:

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
1 234 db1 sbtest1 I 1476258462 N Y 1 ….. JustInsert
2 234 db1 sbtest1 U 1476258463 Y N 1 ….. JustInsert
2 234 db1 sbtest1 U 1476258463 N Y 1 ….. JustUpdate
3 234 db1 sbtest1 D 1476258464 Y N 1 ….. JustUpdate

其中:
dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。
dts_instance_id: 这条增量日志所对应的数据库的 server id。
dts_db_name: 这条增量更新日志更新的表所在的数据库库名。
dts_table_name:这条增量更新日志更新的表。
dts_operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。
dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.
dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

1) 操作类型为:insert

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
1 234 db1 sbtest1 I 1476258462 N Y 1 ….. JustInsert

当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y

2) 操作类型为:update

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
2 234 db1 sbtest1 I 1476258463 Y N 1 ….. JustInsert
2 234 db1 sbtest1 I 1476258463 N Y 1 ….. JustUpdate

当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y

3) 操作类型为:delete

dts_record_id dts_instance_id dts_db_name dts_table_name dts_operation_flag dts_utc_timestamp dts_before_flag dts_after_flag dts_col1 …. dts_colN
3 234 db1 sbtest1 D 1476258464 Y N 1 ….. JustUpdate

当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N

下面详细介绍 MySQL 到 DataHub 数据实时同步作业的配置流程。

同步作业配置流程

下面详细介绍 MySQL 到 DataHub 数据实时同步作业的配置流程。

1.购买同步链路

进入 数据传输 DTS 控制台,进入数据同步页面,点击控制台右上角“创建同步作业” 开始作业配置。

在链路配置之前需要购买一个同步链路。同步链路目前支持包年包月及按量付费两种付费模式,可以根据需要选择不同的付费模式。

在购买页面需要配置的参数包括:

  • 源实例
    同步作业的源实例类型,目前只支持MySQL
  • 源地域
    如果为本地自建MySQL,那么选择专线在阿里云上的接入点所在的地区。如果为ECS上的自建MySQL,那么选择ECS实例所在的地区。如果为RDS For MySQL,那么选择RDS实例所在的地区。
  • 目标实例
    目标实例为同步作业的目标实例类型,目前支持 MySQL、MaxCompute (原ODPS、,分析型数据库(AnalyticDB)、DataHub。配置 MySQL->DataHub 同步链路时,目标实例选择:Datahub 即可。
  • 目标地域
    由于 Datahub 目前暂时只在 华东1(杭州) 地区售卖,所以目标地域默认选择 华东1(杭州)。
  • 实例规格
    实例规格影响了链路的同步性能,可以根据业务性能选择合适的规格。对于数据同步链路规格的具体说明请参考:数据同步规格说明

当购买完同步实例,返回数据传输控制台,点击新购链路右侧的“配置同步作业” 开始链路配置。

2.同步链路连接信息配置。

在这一步主要配置:

  • 同步作业名称
    同步作业名称没有唯一性要求,主要为了更方便识别具体的作业,建议选择一个有业务意义的作业名称,方便后续的链路查找及管理。

  • 数据源连接信息配置

在这个步骤中需要配置源 MySQL 实例的连接信息,及目标 DatahHub 实例的 project。配置的 Datahub project 必须属于登录 DTS 所使用的阿里云账号的资源。

源实例可以支持:通过专线接入阿里云的自建数据库、ECS上的自建数据库、RDS。

如果源实例为通过专线接入阿里云的自建数据库,那么需要配置的连接信息如下:

  • 实例类型:选择 通过专线接入阿里云的本地 DB
  • 实例地区:选择 专线 接入阿里云的接入点,例如接入阿里云的北京,那么选择 华北2 即可。
  • 对端专有网络:专线 接入的阿里云上的 专有网络的VPC ID
  • 主机名或IP地址:配置本地 MySQL 数据库访问地址,这个地址为本地局域网访问地址
  • 端口:本地 MySQL 实例监听端口
  • 数据库账号:本地 MySQL 实例访问账号
  • 数据库密码:上面指定的 MySQL 访问账号对应的密码

本地自建DB

如果源实例为ECS上的自建数据库,那么需要配置的连接信息如下:

  • 实例类型:选择 ECS上的自建数据库
  • ECS实例ID: 配置ECS实例的实例ID
  • 端口:本地 MySQL 实例监听端口
  • 数据库账号:本地 MySQL 实例访问账号
  • 数据库密码:上面指定的 MySQL 访问账号对应的密码

ECS上的自建数据库

如果源实例为RDS,那么只需要配置RDS实例的实例ID。

步骤1

当这些内容配置完成后,可以点击授权白名单并进入下一步

2.选择同步对象

当连接信息配置完成后,即进入同步表及同步初始化的相关配置。

配置步骤2

在这个步骤中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化选项包括: 结构初始化
结构初始化是指对于待同步的表,在 DataHub 中创建对应的 topic,完成 topic schema 定义。建议选择结构初始化。

(2) 同步表选择

同步表只能选择某些表,不能直接选择整个库。对于同步的表,可以修改 topic 名称,选择对应的 shard key。如需修改,可以点击右边已选择对象后面的编辑按钮,进入修改界面。

当配置完同步对象后,进入同步启动前的预检查阶段。

3.预检查

当上面所有选项配置完成后,即进入启动之前的预检查。具体检查项内容详见本文最后的 预检查内容 一节
当同步作业配置完成后,数据传输服务会进行限制预检查,当预检查通过后,可以点击 确定 按钮,启动同步作业。

当同步作业启动之后,即进入同步作业列表。此时刚启动的作业处于同步初始化状态。初始化的时间依赖于同步表的数量大小。当初始化完成后同步链路即进入同步中的状态,此时源跟目标实例的同步链路才真正建立完成。

当同步任务进入 同步中 时,可以在 DataHub 中可以查询出同步表对应的 topic:

至此,完成 MySQL->Datahub 数据实时同步作业的配置。
当同步作业建立成功后,可以到流式计算中注册 DataHub topic,同时创建流式源表,对于同步到 DataHub 中的数据进行后续的实时数据计算分析。

本文导读目录