大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案。通过数据传输服务DTS(Data Transmission Service),您可以将MySQL的数据同步至MaxCompute,帮助您快速搭建数据实时分析系统。

前提条件

注意事项

  • 仅支持表级别的数据同步。
  • 如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。

源库支持的实例类型

执行数据同步操作的MySQL数据库支持以下实例类型:

  • 有公网IP的自建数据库
  • ECS上的自建数据库
  • 通过专线/VPN网关/智能网关接入的自建数据库
  • 同一或不同阿里云账号下的RDS for MySQL实例
本文以RDS for MySQL实例为例介绍配置流程,当源库为其他实例类型时,配置流程与该案例类似。
说明 如果源库为自建MySQL数据库,您还需要为自建MySQL创建账号并设置binlog

同步过程介绍

  1. 结构初始化。
    DTS将源库中待同步表的结构定义信息同步至MaxCompute中,初始化时DTS会将表名变更为同步的目标表名_base,例如customer_base。
  2. 全量数据初始化。
    DTS将源库中待同步表的存量数据,全部同步至Maxcompute的同步的目标表名_base表中,作为后续增量同步数据的基线数据。
    说明 该表也被称为全量基线表。
  3. 增量数据同步。
    DTS在Maxcompute中创建一个增量日志表,表名为同步的目标表名_log,例如customer_log,然后将源库产生的增量数据实时同步到该表中。
    说明 关于增量日志表结构的详细信息,请参见增量日志表结构定义说明

操作步骤

  1. 购买数据同步作业
    说明 购买时,选择源实例为MySQL,目标实例为MaxCompute,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择数据同步实例所属地域。

    选择地域
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置同步通道的源实例及目标实例信息。

    源和目标实例配置
    配置项目 配置选项 配置说明
    同步作业名称 -
    • DTS为每个同步作业自动生成一个名称,该名称没有唯一性要求。
    • 您可以根据需要修改同步作业名称,建议配置具有业务意义的名称,便于后续的识别。
    源实例信息 实例类型 选择RDS实例
    实例地区 购买数据同步实例时选择的源实例地域信息,不可变更。
    实例ID 选择作为数据同步源的RDS实例ID。
    数据库账号 填入源RDS的数据库账号。
    说明 当源RDS实例的数据库类型为MySQL 5.5MySQL 5.6时,无需配置数据库账号数据库密码
    数据库密码 填入数据库账号对应的密码。
    连接方式 根据需求选择为非加密连接SSL安全连接,本案例选择为非加密连接
    说明 选择SSL安全连接时,需要提前开启RDS实例的SSL加密功能,详情请参见设置SSL加密
    目标实例信息 实例类型 固定为MaxCompute,不可变更。
    实例地区 购买数据同步实例时选择的目标实例地域信息,不可变更。
    Project 填入MaxCompute实例的Project,您可以在MaxCompute工作空间列表页面中查询。
    MaxCompute工作空间列表
  7. 单击页面右下角的授权白名单并进入下一步
    说明 此步骤会将DTS服务器的IP地址自动添加到RDS实例和MaxCompute实例的白名单中,用于保障DTS服务器能够正常连接源和目标实例。
  8. 单击页面右下角的下一步,允许将MaxCompute中项目的下述权限授予给DTS同步账号。


  9. 配置同步策略和同步对象。

    配置同步策略和对象
    配置项目 配置说明
    增量日志表分区定义 根据业务需求,选择分区名称。关于分区的相关介绍请参见分区
    同步初始化

    同步初始化类型细分为:结构初始化,全量数据初始化。

    此处同时勾选结构初始化全量数据初始化,DTS会在增量数据同步之前,将源数据库中待同步对象的结构和存量数据,同步到目标数据库。

    目标已存在表的处理模式
    • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步作业不会被启动。
      说明 如果目标库中同名的表不方便删除或重命名,您可以设置同步对象在目标实例中的名称来避免表名冲突。
    • 无操作:跳过目标数据库中是否有同名表的检查项。
      警告 选择为无操作,可能导致数据不一致,给业务带来风险,例如:
      • 表结构一致的情况下,如果在目标库遇到与源库主键的值相同的记录,在初始化阶段会保留目标库中的该条记录;在增量同步阶段则会覆盖目标库的该条记录。
      • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败。
    选择同步对象

    源库对象框中单击待同步的表,然后单击向右小箭头将其移动至已选择对象框。

    说明
    • 同步对象支持选择的粒度仅为表,您可以从多个库中选择表作为同步对象。
    • 默认情况下,同步对象的名称保持不变。如果您需要在目标实例上名称不同,那么需要使用DTS提供的对象名映射功能,详情请参见设置同步对象在目标实例中的名称
  10. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
    • 如果预检查失败,单击具体检查项后的,查看失败详情。根据提示修复后,重新进行预检查。
  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
  12. 等待同步作业的链路初始化完成,直至处于同步中状态。
    您可以在 数据同步页面,查看数据同步作业的状态。

增量日志表结构定义说明

DTS在将MySQL产生的增量数据同步至MaxCompute的增量日志表时,除了存储增量数据,还会存储一些元信息,示例如下。


增量日志表结构
说明 示例中的modifytime_year stringmodifytime_month stringmodifytime_day stringmodifytime_hour string为分区字段,是在配置同步策略和同步对象步骤中指定的。

结构定义说明

字段 说明
record_id 增量日志的记录id,为该日志唯一标识。
说明
  • id的值唯一且递增。
  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录,一条为DELETE,一条为INSERT,并且这两条记录的record_id的值相同。
operation_flag 操作类型,取值:
  • I:INSERT操作。
  • D:DELETE操作。
  • U:UPDATE操作。
utc_timestamp 操作时间戳,即binlog的时间戳(UTC 时间)。
before_flag 所有列的值是否为更新前的值,取值:YN
after_flag 所有列的值是否为更新后的值,取值:YN

关于before_flag和after_flag的补充说明

对于不同的操作类型,增量日志中的before_flagafter_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的记录值,即为更新后的值,所以before_flag取值为N,after_flag取值为Y,示例如下。


    INSERT操作示例
  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的record_id、operation_flag及utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以before_flag取值为Y,after_flag取值为N。第二条增量日志记录了更新后的值,所以before_flag取值为N,after_flag取值为Y,示例如下。


    UPDATE操作示例
  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以before_flag取值为Y,after_flag取值为N,示例如下。


    DELETE操作示例

全量数据合并示例

执行数据同步的操作后,DTS会在MaxCompute中分别创建该表的全量基线表和增量日志表。您可以通过MaxCompute的SQL命令,对这两个表执行合并操作,得到某个时间点的全量数据。

本案例以customer表为例(表结构如下),介绍操作流程。


customer表结构
  1. 根据源库中待同步表的结构,在Maxcompute中创建用于存储合并结果的表。

    例如,需要获取customer表在1565944878时间点的全量数据。为方便业务识别,创建如下数据表:

    CREATE TABLE `customer_1565944878` (
        `id` bigint NULL,
        `register_time` datetime NULL,
        `address` string);
    说明
  2. 在MaxCompute中执行如下SQL命令,合并全量基线表和增量日志表,获取该表在某一时间点的全量数据。

    SQL命令

    set odps.sql.allow.fullscan=true;
    insert overwrite table <result_storage_table>
    select <col1>,
           <col2>,
           <colN>
      from(
    select row_number() over(partition by t.<primary_key_column>
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN>
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>,incr.<colN>
      from <table_log> incr
     where utc_timestamp< <timestmap>
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>,base.<colN>
      from <table_base> base) t) gt
    where record_num=1 
      and after_flag='Y'
    说明
    • <result_storage_table>:存储全量merge结果集的表名。
    • <col1>/<col2>/<colN>:同步表中的列名。
    • <primary_key_column:同步表中的主键列名。
    • <table_log>:增量日志表名。
    • <table_base>:全量基线表名。
    • <timestmap>:需要获取全量数据的时间点。

    合并数据表,获取customer表在1565944878时间点的全量数据,示例如下:

    set odps.sql.allow.fullscan=true;
    insert overwrite table customer_1565944878
    select id,
           register_time,
           address
      from(
    select row_number() over(partition by t.id
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address
      from customer_log incr
     where utc_timestamp< 1565944878
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address
      from customer_base base) t) gt
     where gt.row_number= 1
       and gt.after_flag= 'Y';
  3. 上述命令执行完成后,可在customer_1565944878表中查询合并后的数据。

    查询merge后的数据