全部产品
阿里云办公

创建 MySQL 到 MaxCompute 数据实时同步作业

更新时间:2018-05-27 19:21:16

本小节介绍如何使用 数据传输 DTS 快速创建 MySQL 实例到 MaxCompute 实例间的实时同步作业,实现在线 (MySQL) 到离线系统 (MaxCompute) 的数据实时同步,进一步为数据实时分析奠定基础。

支持功能

数据源

  • 支持通过专线接入阿里云的自建 MySQL 到 MaxCompute 的数据实时同步
  • 支持 ECS 上的自建 MySQL 到 MaxCompute 的数据实时同步
  • 支持同一个阿里云账号下 RDS for MySQL 实例到 MaxCompute 的数据实时同步
  • 支持不同阿里云账号下的 RDS for MySQL 实例到 MaxCompute 的数据实时同步

同步对象

  • 只支持表的同步,不支持其他非表对象的同步。

同步原理

原理

如上图所示,整个同步过程分为两步:
(1) 全量初始化, 这个步骤将 MySQL 中已经存在的全量数据初始化到 MaxCompute 中。对于同步的每个表,全量初始化的数据都会独立存储在 MaxCompute 中的全量基线表中,这个表的默认格式为:源表名_base。例如表 t1,那么全量基线表在 MaxCompute 中存储的表名为:t1_dts_base。这个存储表名前缀可以根据需要变更,您可以在配置任务时,修改表在 MaxCompute 存储的名称。

(2) 增量数据同步,这个步骤将 MySQL 产生的增量数据数据实时同步到 MaxCompute 中。并存储在增量日志表中,每个同步表对应一个增量日志表。在增量数据同步时,使用合并多条记录到一个文件方式写入到 MaxCompute 中。增量日志表在 MaxCompute 中存储的表名的默认格式为:源表名_log。这个存储表名前缀可以根据需要变更,您可以在配置任务时,修改表在 MaxCompute 存储的名称。

增量日志表除了存储更新数据,它还会存储一些元信息,增量日志表的表结构定义如下:

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
1 I 1476258462 N Y 1 ….. JustInsert
2 U 1476258463 Y N 1 ….. JustInsert
2 U 1476258463 N Y 1 ….. JustUpdate
3 D 1476258464 Y N 1 ….. JustUpdate

其中:
record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 insert,一条 delete。那么这两条记录的 record_id 相同。
operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。
before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,before_flag=Y, 当后面的 column 值为更新后的值时,before_flag=N。
after_flag: 表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,after_flag=N,当后面的 column 值为更新后的值时,after_flag=Y。

对于不同的操作类型,增量日志中的 before_flag 和 after_flag 定义如下:

1) 操作类型为:insert

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
1 I 1476258462 N Y 1 ….. JustInsert

当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y。

2) 操作类型为:update

record_id operation_flag utc_timestamp before_flag after_flag col1 …. colN
2 U 1476258463 Y N 1 ….. JustInsert
2 U 1476258463 N Y 1 ….. JustUpdate

当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 record_id, operation_flag 及 dts_utc_timestamp 相同。
第一条日志记录了更新前的值,所以 before_flag=Y, after_flag=N。
第二条日志记录了更新后的值,所以 before_flag=N, after_flag=Y。

3) 操作类型为:delete

record_id operation_flag dts_utc_timestamp before_flag after_flag col1 …. colN
3 D 1476258464 Y N 1 ….. JustUpdate

当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 before_flag=Y, after_flag=N。

(3) MySQL->MaxCompute 数据同步,对于每个同步表,都会在 MaxCompute 中生成一个全量基线表和一个增量日志表,所以如果需要在 MaxCompute 中获取某个时刻某张表的全量数据,就需要 merge 这张表的全量基线表和增量日志表。具体实现方法后面会详细讲解。

下面详细介绍 MySQL 到 MaxCompute 数据实时同步作业的配置流程。

同步作业配置流程

下面详细介绍 MySQL 到 MaxCompute 数据实时同步作业的配置流程。

1.购买同步链路

进入数据传输 DTS 控制台,进入数据同步页面,点击控制台右上角“创建同步作业” 开始作业配置。

在链路配置之前需要购买一个同步链路。同步链路目前支持包年包月及按量付费两种付费模式,可以根据需要选择不同的付费模式。

在购买页面需要配置的参数包括:

  • 源实例
    同步作业的源实例类型,目前只支持MySQL。
  • 源地域
    如果为本地自建 MySQL,那么选择专线在阿里云上的接入点所在的地区。如果为 ECS 上的自建 MySQL,那么选择 ECS 实例所在的地区。如果为 RDS for MySQL,那么选择 RDS 实例所在的地区。
  • 目标实例
    目标实例为同步作业的目标实例类型,目前支持 MySQL、MaxCompute (原 ODPS)、分析型数据库 AnalyticDB、DataHub。配置 MySQL->MaxCompute 同步链路时,目标实例选择:MaxCompute 即可。
  • 目标地域
    选择目标地域,可选地域仅限于当前已经开通了 MaxCompute 的实例区域。
  • 实例规格
    实例规格影响了链路的同步性能,可以根据业务性能选择合适的规则。

当购买完同步实例,返回数据传输控制台,点击新购链路右侧的“配置同步作业” 开始链路配置。

2.同步链路连接信息配置

在这一步主要配置:

  • 同步作业名称
    同步作业名称没有唯一性要求,主要为了更方便识别具体的作业,建议选择一个有业务意义的作业名称,方便后续的链路查找及管理。

  • 数据源连接信息配置

在这个步骤中需要配置源实例的连接信息,及目标 MaxCompute 实例的 project。配置的 MaxCompute project 必须属于登录 DTS 的阿里云账号的资源。

源实例可以支持:通过专线接入阿里云的自建数据库、ECS 上的自建数据库、RDS。

如果源实例为通过专线接入阿里云的自建数据库,那么需要配置的连接信息如下:

  • 实例类型:选择 通过专线接入阿里云的本地 DB
  • 实例地区:选择 专线 接入阿里云的接入点,例如接入阿里云的北京,那么选择 华北 2 即可。
  • 对端专有网络:专线 接入的阿里云上的 专有网络的 VPC ID
  • 主机名或IP地址:配置本地 MySQL 数据库访问地址,这个地址为本地局域网访问地址
  • 端口:本地 MySQL 实例监听端口
  • 数据库账号:本地 MySQL 实例访问账号
  • 数据库密码:上面指定的 MySQL 访问账号对应的密码

本地自建DB

如果源实例为 ECS 上的自建数据库,那么需要配置的连接信息如下:

  • 实例类型:选择 ECS 上的自建数据库
  • ECS 实例 ID: 配置ECS实例的实例 ID
  • 端口:本地 MySQL 实例监听端口
  • 数据库账号:本地 MySQL 实例访问账号
  • 数据库密码:上面指定的 MySQL 访问账号对应的密码

ECS上的自建数据库

如果源实例为 RDS for MySQL,那么只需要配置 RDS 实例的实例 ID。

实例连接信息

当这些内容配置完成后,可以点击授权白名单并进入下一步

3.授权 RDS 实例白名单

这个步骤,主要是将给 DTS 服务账号授权 MaxCompute 写权限,让 DTS 能够将数据同步复制到 MaxCompute 中。

步骤2

授权权限包括对 project 的:
CreateTable
CreateInstance
CreateResource
CreateJob
List
为了保证同步作业的稳定性,在同步过程中,请勿将写权限回收。当白名单授权后,点击下一步,进入同步账号创建。

当授权完成后,即进入同步对象选择。

4.选择同步对象

当 MaxCompute 账号授权完成后,即进入同步表及同步初始化的相关配置。

步骤3

在这个步骤中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化选项包括: 结构初始化 和 全量数据初始化。
结构初始化是指对于待同步的表,在 MaxCompute 中创建对应的表,完成表结构定义。
全量数据初始化是指对于待同步的表,将历史数据初始化到 MaxCompute 中。
配置任务时,建议同时选择 结构初始化+全量数据初始化。

(2) 同步表选择

同步表只能选择某些表,不能直接选择整个库。对于同步的表,可以修改表在 MaxCompute 对应的全量基线表及增量日志表的表名前缀。如需修改,可以点击右边已选择对象后面的编辑按钮,进入修改界面。步骤4

当配置完同步对象后,进入同步初始化配置。

5.预检查

当上面所有选项配置完成后,即进入启动之前的预检查。具体检查项内容详见本文最后的 预检查内容 一节。
当同步作业配置完成后,数据传输服务会进行限制预检查,当预检查通过后,可以点击 确定 按钮,启动同步作业。

当同步作业启动之后,即进入同步作业列表。此时刚启动的作业处于同步初始化状态。初始化的时间长度依赖于源实例中同步对象的数据量大小。当初始化完成后同步链路即进入同步中的状态,此时源跟目标实例的同步链路才真正建立完成。

当同步任务进入 同步中 时,可以在 MaxCompute 中可以查询出与之前的配置所对应的全量基线表和增量日志表:
表信息

至此,完成 RDS->MaxCompute 数据实时同步作业的配置。

全量数据合并方案

本小节介绍,如何根据同步到 MaxCompute 中的全量基线表和增量日志数据得到某个时刻表的全量数据。
DTS 提供通过 MaxCompute SQL 实现全量数据合并的能力。

通过 MaxCompute SQL merge 全量基线数据 和 增量日志表 得到时刻t的全量数据。MaxCompute SQL 的写法如下:

  1. insert overwrite table result_storage_table
  2. select col1,
  3. col2,
  4. colN
  5. from(
  6. select row_number() over(partition by t.primary_key_column
  7. order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, col1,col2,colN
  8. from(
  9. select incr.record_id, incr.operation_flag, incr.after_flag, incr.col1, incr.col2,incr.colN
  10. from table_log incr
  11. where utc_timestamp< timestmap
  12. union all
  13. select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.col1, base.col2,base.colN
  14. from table_base base) t) gt
  15. where record_num=1
  16. and after_flag='Y'

上面代码中的几个变量意义如下:
1) result_storage_table 表示全量 merge 结果集的存储表的表名
2) col1, col2,colN 表示同步表中列的列名
3) primary_key_column 表示同步表中的主键列的列名
4) table_log 表示增量日志表
5) table_base 表示全量基线表
6) timestmap 表示需要 merge 哪个时刻的全量数据

例如对于上面配置任务中的示例 jiangliu_test 表,jiangliu_test_20161010_base 为 jiangliu_test 对应的全量基线表,jiangliu_test_20161010_log 为 jiangliu_test 对应的增量日志表。

jiangliu_test 的表结构定义为:
表结构

那么查询时间戳 1476263486 时刻,示例中 jiangliu_test 表的全量数据的 MaxCompute SQL 如下:

  1. insert overwrite table jiangliu_test_1476263486
  2. select id,
  3. name
  4. from(
  5. select row_number() over(partition by t.id
  6. order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, name
  7. from(
  8. select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.name
  9. from jiangliu_test_20161010_log incr
  10. where utc_timestamp< 1476263486
  11. union all
  12. select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.name
  13. from jiangliu_test_20161010_base base) t) gt
  14. where gt.row_number= 1
  15. and gt.after_flag= 'Y' ;

您也可以通过大数据开发套件,在后续的计算分析操作之前,添加全量数据 Merge 节点,当全量数据 merge 完成后,可自动调度起后续的计算分析节点。同时可以配置调度周期,进行周期性的数据离线分析。

至此完成 MySQL->MaxCompute 数据同步任务配置及全量数据合并。

本文导读目录