阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务DTS(Data Transmission Service),您可以将RDS MySQL同步至阿里云流式数据服务DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。

前提条件

注意事项

类型 说明
源库限制
  • 待同步的表需具备主键或唯一约束,且字段具有唯一性,否则可能会导致目标数据库中出现重复数据。
  • 如同步对象为表级别,且需进行编辑(如表列名映射),则单次同步任务仅支持同步至多1000张表。当超出数量限制,任务提交后会显示请求报错,此时建议您拆分待同步的表,分批配置多个任务,或者配置整库的同步任务。
  • Binlog日志:
    • 需开启Binlog,开启方法请参见设置实例参数,并且binlog_row_image为full。否则预检查阶段提示报错,且无法成功启动数据同步任务。
      注意
      • 如源实例为自建MySQL,则需开启Binlog,并且binlog_format为row、binlog_row_image为full。
      • 如源实例自建MySQL是双主集群(两者互为主从),为保障DTS能获取全部的Binlog日志,则您需开启参数log_slave_updates。具体操作请参见为自建MySQL创建账号并设置binlog
    • 如为增量同步任务,DTS要求源数据库的本地Binlog日志保存24小时以上,如为库表结构同步和增量同步任务,DTS要求源数据库的本地Binlog日志至少保留7天以上(您可在库表结构同步完成后将Binlog保存时间设置为24小时以上),否则DTS可能因无法获取Binlog而导致任务失败,极端情况下甚至可能会导致数据不一致或丢失。由于您所设置的Binlog日志保存时间低于DTS要求的时间进而导致的问题,不在DTS的SLA保障范围内。如源为RDS MySQL,具体操作请参见本地日志(Binlog)

其他限制
  • 不支持全量数据初始化,即DTS不会将源RDS实例中同步对象的存量数据同步至目标DataHub实例。
  • 在DTS同步期间,不允许有除DTS外的数据写入目标库,否则会导致源库与目标库数据不一致。例如,有除DTS外的数据写入目标库时,使用DMS执行在线DDL变更,可能引起目标库数据丢失。
特殊情况
当源库为自建MySQL时
  • 在同步时,如果源库进行主备切换,将会导致同步任务失败。
  • 由于DTS的延迟时间是根据同步到目标库最后一条数据的时间戳和当前时间戳对比得出,源库长时间未执行DML操作可能导致延迟信息不准确。如果任务显示的延迟时间过大,您可以在源库执行一个DML操作来更新延迟信息。
    说明 如果同步对象选择为整库,您还可以创建心跳表,心跳表每秒定期更新或者写入数据。

支持的同步架构

  • 一对一单向同步。
  • 一对多单向同步。
  • 多对一单向同步。
关于各类同步架构的介绍及注意事项,请参见数据同步拓扑介绍

支持同步的SQL操作

操作类型 SQL操作语句
DML INSERT、UPDATE、DELETE
注意 如需使用DTS同步新增列的数据(即源数据表新增了某个列,该列的数据同步至目标DataHub),您需要先在目标表中新增列,然后相应地在源表新增列。

操作步骤

说明 本文以新版DTS操作为例,与DMS操作有一些差异,具体操作请以DMS的实际界面为准。
  1. 登录新版DTS同步任务的列表页面
    说明 您也可以登录DMS数据管理服务。在顶部菜单栏中,选择集成与开发(DTS),在左侧导航栏选择数据传输(DTS) > 数据同步
  2. 在页面左上角,选择同步实例所属地域。
    地域
  3. 单击创建任务,配置源库及目标库信息。
    警告 选择源和目标实例后,建议您仔细阅读页面上方显示的使用限制,以成功创建并执行同步任务。
    源目库配置
    类别 配置 说明
    任务名称

    DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源库信息 选择已有的实例
    您可以按实际需求,选择是否使用已有实例。
    • 如使用已有实例,数据库信息将自动填入,您无需重复输入。
    • 如不使用已有实例,您需要输入下方的数据库信息。
    数据库类型 选择MySQL
    接入方式 选择阿里云实例
    实例地区 选择源RDS MySQL实例所属地域。
    是否跨阿里云账号 本场景为同一阿里云账号间迁移,选择不跨账号
    RDS实例ID 选择源RDS MySQL实例ID。
    数据库账号 填入源RDS MySQL实例的数据库账号,需具备待同步对象的读权限。
    数据库密码

    填入该数据库账号对应的密码。

    连接方式

    根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见设置SSL加密

    目标库信息 选择已有的实例
    您可以按实际需求,选择是否使用已有实例。
    • 如使用已有实例,数据库信息将自动填入,您无需重复输入。
    • 如不使用已有实例,您需要输入下方的数据库信息。
    数据库类型 选择DataHub
    接入方式 选择阿里云实例
    实例地区 选择DataHub实例所属地域。
    Project 选择目标DataHubProject
  4. 配置完成后,单击页面右下角的测试连接以进行下一步
    说明
    • 如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等)或ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单或ECS的安全规则中,您无需手动添加,请参见DTS服务器的IP地址段
    • DTS任务完成或释放后,建议您手动删除添加的DTS服务器IP地址段。
  5. 配置任务对象及高级配置。
    • 基础配置基础配置
      配置 说明
      任务步骤
      固定选中增量同步。仅支持选中库表结构同步,不支持全量同步
      说明 在数据同步任务的初始化阶段,DTS会将同步对象的结构定义(例如表结构)同步至目标DataHub实例。
      目标已存在表的处理模式
      • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

        说明 如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射
      • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。
        警告 选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:
        • 表结构一致的情况下,如在目标库遇到与源库主键的值相同的记录:
          • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。
          • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。
        • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败。
      是否启用新的附加列规则
      DTS在将数据同步到DataHub时,会在同步的目标Topic中添加一些附加列。如果附加列和目标Topic中已有的列出现名称冲突将会导致数据同步失败。您需要根据业务需求选择是否启用新的附加列规则
      警告 在选择附加列规则前,您需要评估附加列和目标Topic中已有的列是否会出现名称冲突。关于附加列的规则和定义说明,请参见附加列名称和定义说明
      同步对象

      源库对象框中单击待同步对象,然后单击向右小箭头将其移动至已选择对象框。

      说明 同步对象的选择粒度为表。
      映射名称更改
      • 如需更改单个同步对象在目标实例中的名称,请单击已选择对象中的同步对象,设置方式,请参见库表列名单个映射
      • 如需批量更改同步对象在目标实例中的名称,请单击已选择对象方框右上方的批量编辑,设置方式,请参见库表列名批量映射
      过滤待同步数据

      支持设置WHERE条件过滤数据,请参见通过SQL条件过滤任务数据

      同步的SQL操作 请右击已选择对象中的同步对象,在弹跳框中选择所需同步的DML操作。支持的操作,请参见支持同步的SQL操作
    • 高级配置高级配置
      配置 说明
      设置告警
      是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。
      • 不设置:不设置告警。
      • 设置:设置告警,您还需要设置告警阈值和告警联系人。
      目标库对象名称大小写策略

      您可以配置目标实例中迁移对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略

      源库、目标库无法连接后的重试时间
      在同步任务连接失败时,DTS会立即进行持续的重试连接,默认持续重试时间为120分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将失败。
      说明
      • 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。
      • 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。
  6. 可选:将鼠标指针放置在已选择对象框中待同步的Topic名上,鼠标右键单击对象后出现编辑,然后在弹出的对话框中设置Shardkey(即用于分区的key)。
    设置Shardkey
  7. 上述配置完成后,单击页面下方的下一步保存任务并预检查
    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。
    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。
      • 您可以根据提示修复后重新进行预检查。
      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。
  8. 预检查通过率显示为100%时,单击下一步购买
  9. 购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。
    类别 参数 说明
    信息配置 计费方式
    • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。
    • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。
    链路规格 DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择,详情请参见数据同步链路规格说明
    订购时长 在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1~3年。
    说明 该选项仅在付费类型为预付费时出现。
  10. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》
  11. 单击购买并启动,同步任务正式开始,您可在数据同步界面查看具体任务进度。

Topic结构定义说明

DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。
说明 本案例中的业务字段为idnameaddress,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加dts_的前缀。
Topic定义

结构定义说明:

旧版附加列名称 新版附加列名称 数据类型 说明
dts_record_id new_dts_sync_dts_record_id String 增量日志的记录ID,为该日志唯一标识。
说明
  • 正常情况下是全局唯一自增的,在容灾的情况下会有回退且无法保证自增和唯一。
  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录(分别记录更新前和更新后的值),且dts_record_id的值相同。
dts_operation_flag new_dts_sync_dts_operation_flag String 操作类型,取值:
  • I:INSERT操作。
  • D:DELETE操作。
  • U:UPDATE操作。
dts_instance_id new_dts_sync_dts_instance_id String 数据库的server ID。暂不支持显示实际的值,目前固定为null
dts_db_name new_dts_sync_dts_db_name String 数据库名称。
dts_table_name new_dts_sync_dts_table_name String 表名。
dts_utc_timestamp new_dts_sync_dts_utc_timestamp String 操作时间戳,即binlog的时间戳(UTC 时间)。
dts_before_flag new_dts_sync_dts_before_flag String 所有列的值是否更新前的值,取值:Y或N。
dts_after_flag new_dts_sync_dts_after_flag String 所有列的值是否更新后的值,取值:Y或N。

关于dts_before_flag和dts_after_flag的补充说明

对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    INSERT操作
  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的dts_record_iddts_operation_flagdts_utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以dts_before_flag取值为Y,dts_after_flag取值为N。第二条增量日志记录了更新后的值,所以 dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    UPDATE操作
  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以dts_before_flag取值为Y, dts_after_flag取值为N,示例如下。

    DELETE操作