大数据计算服务(MaxCompute,原名ODPS)是一种快速、完全托管的EB级数据仓库解决方案。通过数据传输服务DTS(Data Transmission Service),您可以将MySQL的数据同步至MaxCompute,帮助您快速搭建数据实时分析系统。

前提条件

注意事项

  • 仅支持表级别的数据同步。
  • 在数据同步时,请勿对源库的同步对象使用gh-ost或pt-online-schema-change等类似工具执行在线DDL变更,否则会导致同步失败。
  • 如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。

源库支持的实例类型

执行数据同步操作的MySQL数据库支持以下实例类型:

  • 有公网IP的自建数据库
  • ECS上的自建数据库
  • 通过专线/VPN网关/智能网关接入的自建数据库
  • 同一或不同阿里云账号下的RDS for MySQL实例
本文以RDS for MySQL实例为例介绍配置流程,当源库为其他实例类型时,配置流程与该案例类似。
说明 如果源库为自建MySQL数据库,您还需要为自建MySQL创建账号并设置binlog

同步过程介绍

  1. 结构初始化。
    DTS将源库中待同步表的结构定义信息同步至MaxCompute中,初始化时DTS会将表名变更为同步的目标表名_base,例如customer_base。
  2. 全量数据初始化。
    DTS将源库中待同步表的存量数据,全部同步至Maxcompute的同步的目标表名_base表中,作为后续增量同步数据的基线数据。
    说明 该表也被称为全量基线表。
  3. 增量数据同步。
    DTS在Maxcompute中创建一个增量日志表,表名为同步的目标表名_log,例如customer_log,然后将源库产生的增量数据实时同步到该表中。
    说明 关于增量日志表结构的详细信息,请参见增量日志表结构定义说明

操作步骤

  1. 购买数据同步作业
    说明 购买时,选择源实例为MySQL,目标实例为MaxCompute,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择数据同步实例所属地域。
    选择地域
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置同步通道的源实例及目标实例信息。
    配置源和目标实例信息
    配置项目 配置选项 配置说明
    同步作业名称 - DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息 实例类型 选择RDS实例
    实例地区 购买数据同步实例时选择的源实例地域信息,不可变更。
    实例ID 选择作为数据同步源的RDS实例ID。
    数据库账号 填入源RDS的数据库账号。
    说明 当源RDS实例的数据库类型为MySQL 5.5MySQL 5.6时,无需配置数据库账号数据库密码
    数据库密码 填入数据库账号对应的密码。
    连接方式 根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS实例的SSL加密功能,详情请参见设置SSL加密
    目标实例信息 实例类型 固定为MaxCompute,不可变更。
    实例地区 购买数据同步实例时选择的目标实例地域信息,不可变更。
    Project 填入MaxCompute实例的Project,您可以在MaxCompute工作空间列表页面中查询。MaxCompute工作空间列表
  7. 单击页面右下角的授权白名单并进入下一步
    说明 此步骤会将DTS服务器的IP地址自动添加到RDS实例和MaxCompute实例的白名单中,用于保障DTS服务器能够正常连接源和目标实例。
  8. 单击页面右下角的下一步,允许将MaxCompute中项目的下述权限授予给DTS同步账号。
    账号授权
  9. 配置同步策略和同步对象。
    配置同步策略和对象
    配置项目 配置说明
    增量日志表分区定义 根据业务需求,选择分区名称。关于分区的相关介绍请参见分区
    同步初始化

    同步初始化类型细分为:结构初始化,全量数据初始化。

    此处同时勾选结构初始化全量数据初始化,DTS会在增量数据同步之前,将源数据库中待同步对象的结构和存量数据,同步到目标数据库。

    目标已存在表的处理模式
    • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步作业不会被启动。
      说明 如果目标库中同名的表不方便删除或重命名,您可以设置同步对象在目标实例中的名称来避免表名冲突。
    • 无操作:跳过目标数据库中是否有同名表的检查项。
      警告 选择为无操作,可能导致数据不一致,给业务带来风险,例如:
      • 表结构一致的情况下,如果在目标库遇到与源库主键的值相同的记录,在初始化阶段会保留目标库中的该条记录;在增量同步阶段则会覆盖目标库的该条记录。
      • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败。
    选择同步对象

    源库对象框中单击待同步的表,然后单击向右小箭头将其移动至已选择对象框。

    说明
    • 同步对象支持选择的粒度仅为表,您可以从多个库中选择表作为同步对象。
    • 默认情况下,同步对象的名称保持不变。如果您需要在目标实例上名称不同,那么需要使用DTS提供的对象名映射功能,详情请参见设置同步对象在目标实例中的名称
  10. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。根据提示修复后,重新进行预检查。
  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
  12. 等待同步作业的链路初始化完成,直至处于同步中状态。
    您可以在 数据同步页面,查看数据同步作业的状态。查看同步作业状态

增量日志表结构定义说明

DTS在将MySQL产生的增量数据同步至MaxCompute的增量日志表时,除了存储增量数据,还会存储一些元信息,示例如下。

增量日志表结构
说明 示例中的modifytime_year stringmodifytime_month stringmodifytime_day stringmodifytime_hour string为分区字段,是在配置同步策略和同步对象步骤中指定的。

结构定义说明

字段 说明
record_id 增量日志的记录id,为该日志唯一标识。
说明
  • id的值唯一且递增。
  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录,一条为DELETE,一条为INSERT,并且这两条记录的record_id的值相同。
operation_flag 操作类型,取值:
  • I:INSERT操作。
  • D:DELETE操作。
  • U:UPDATE操作。
utc_timestamp 操作时间戳,即binlog的时间戳(UTC 时间)。
before_flag 所有列的值是否为更新前的值,取值:Y或N。
after_flag 所有列的值是否为更新后的值,取值:Y或N。

关于before_flag和after_flag的补充说明

对于不同的操作类型,增量日志中的before_flagafter_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的记录值,即为更新后的值,所以before_flag取值为N,after_flag取值为Y,示例如下。

    INSERT操作示例
  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的record_id、operation_flag及utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以before_flag取值为Y,after_flag取值为N。第二条增量日志记录了更新后的值,所以before_flag取值为N,after_flag取值为Y,示例如下。

    UPDATE操作示例
  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以before_flag取值为Y,after_flag取值为N,示例如下。

    DELETE操作示例

全量数据合并示例

执行数据同步的操作后,DTS会在MaxCompute中分别创建该表的全量基线表和增量日志表。您可以通过MaxCompute的SQL命令,对这两个表执行合并操作,得到某个时间点的全量数据。

本案例以customer表为例(表结构如下),介绍操作流程。

customer表结构
  1. 根据源库中待同步表的结构,在Maxcompute中创建用于存储合并结果的表。

    例如,需要获取customer表在1565944878时间点的全量数据。为方便业务识别,创建如下数据表:

    CREATE TABLE `customer_1565944878` (
        `id` bigint NULL,
        `register_time` datetime NULL,
        `address` string);
    说明
  2. 在MaxCompute中执行如下SQL命令,合并全量基线表和增量日志表,获取该表在某一时间点的全量数据。

    SQL命令

    set odps.sql.allow.fullscan=true;
    insert overwrite table <result_storage_table>
    select <col1>,
           <col2>,
           <colN>
      from(
    select row_number() over(partition by t.<primary_key_column>
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, <col1>, <col2>, <colN>
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.<col1>, incr.<col2>,incr.<colN>
      from <table_log> incr
     where utc_timestamp< <timestmap>
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.<col1>, base.<col2>,base.<colN>
      from <table_base> base) t) gt
    where record_num=1 
      and after_flag='Y'
    说明
    • <result_storage_table>:存储全量merge结果集的表名。
    • <col1>/<col2>/<colN>:同步表中的列名。
    • <primary_key_column:同步表中的主键列名。
    • <table_log>:增量日志表名。
    • <table_base>:全量基线表名。
    • <timestmap>:需要获取全量数据的时间点。

    合并数据表,获取customer表在1565944878时间点的全量数据,示例如下:

    set odps.sql.allow.fullscan=true;
    insert overwrite table customer_1565944878
    select id,
           register_time,
           address
      from(
    select row_number() over(partition by t.id
     order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, register_time, address
      from(
    select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.register_time, incr.address
      from customer_log incr
     where utc_timestamp< 1565944878
     union all
    select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.register_time, base.address
      from customer_base base) t) gt
     where gt.row_number= 1
       and gt.after_flag= 'Y';
  3. 上述命令执行完成后,可在customer_1565944878表中查询合并后的数据。
    查询merge后的数据