从RDS MySQL同步至DataHub

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务DTS(Data Transmission Service),您可以将RDS MySQL或RDS MySQL Serverless同步至DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。

前提条件

  • DataHub实例的地域为华东1、华东2、华北2或华南1。

  • DataHub实例中,已创建用作接收同步数据的项目(Project),详情请参见创建项目

  • RDS MySQL或RDS MySQL Serverless中待同步的表需具备主键或唯一约束。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

功能限制

  • 不支持全量数据初始化,即DTS不会将源RDS实例中同步对象的存量数据同步至目标DataHub实例。

  • 仅支持表级别的数据同步。

  • 数据同步的过程中,请勿对源库中待同步的表执行DDL变更,否则会导致同步失败。

支持同步的SQL操作

操作类型

SQL操作语句

DML

INSERT、UPDATE、DELETE

DDL

ADD COLUMN

操作步骤

  1. 购买数据同步作业,详情请参见购买流程

    说明

    购买时,选择源实例为MySQL、目标实例为DataHub,并选择同步拓扑为单向同步

  2. 登录数据传输控制台

    说明

    若数据传输控制台自动跳转至数据管理DMS控制台,您可以在右下角的jiqiren中单击返回旧版,返回至旧版数据传输控制台。

  3. 在左侧导航栏,单击数据同步

  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。

  5. 定位至已购买的数据同步实例,单击配置同步链路

  6. 配置同步作业的源实例及目标实例信息。

    DataHub同步源目信息配置

    类别

    配置

    说明

    同步作业名称

    DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。

    源实例信息

    实例类型

    根据源库的部署位置进行选择,本文以RDS实例为例介绍配置流程。

    说明

    如果源库为自建MySQL数据库,您还需要执行相应的准备工作,详情请参见准备工作概览

    实例地区

    购买数据同步实例时选择的源实例地域信息,不可变更。

    实例ID

    选择作为数据同步源的RDS实例ID。

    数据库账号

    填入源RDS的数据库账号。

    说明

    当源RDS实例的数据库类型为MySQL 5.5MySQL 5.6时,无需配置数据库账号数据库密码

    数据库密码

    填入数据库账号对应的密码。

    连接方式

    根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS实例的SSL加密功能,详情请参见设置SSL加密

    目标实例信息

    实例类型

    固定为DataHub,不可变更。

    实例地区

    购买数据同步实例时选择的目标实例地域信息,不可变更。

    Project

    选择DataHub实例的Project

  7. 单击页面右下角的授权白名单并进入下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  8. 配置同步策略和同步对象。

    配置同步对象

    配置

    说明

    同步初始化

    勾选结构初始化

    说明

    勾选结构初始化后,在数据同步作业的初始化阶段,DTS会将同步对象的结构信息(例如表结构)同步至目标DataHub实例。

    选择同步对象

    源库对象框中单击待迁移的对象,然后单击向右小箭头将其移动至已选择对象框。

    说明
    • 同步对象的选择粒度为表。

    • 默认情况下,同步对象的名称保持不变。如果您需要改变同步对象在目标实例中的名称,需要使用对象名映射功能,详情请参见设置同步对象在目标实例中的名称

    选择附加列规则

    DTS在将数据同步到DataHub时,会在同步的目标Topic中添加一些附加列。如果附加列和目标Topic中已有的列出现名称冲突将会导致数据同步失败。您需要根据业务需求选择是否启用新的附加列规则

    警告

    在选择附加列规则前,您需要评估附加列和目标Topic中已有的列是否会出现名称冲突,否则可能会导致任务失败或数据丢失。关于附加列的规则和定义说明,请参见附加列名称和定义说明

    映射名称更改

    如需更改同步对象在目标实例中的名称,请使用对象名映射功能,详情请参见库表列映射

    源表DMS_ONLINE_DDL过程中是否复制临时表到目标库

    如源库使用数据管理DMS(Data Management)执行Online DDL变更,您可以选择是否同步Online DDL变更产生的临时表数据。

    • :同步Online DDL变更产生的临时表数据。

      说明

      Online DDL变更产生的临时表数据过大,可能会导致同步任务延迟。

    • :不同步Online DDL变更产生的临时表数据,只同步源库的原始DDL数据。

      说明

      该方案会导致目标库锁表。

    源、目标库无法连接重试时间

    当源、目标库无法连接时,DTS默认重试720分钟(即12小时),您也可以自定义重试时间。如果DTS在设置的时间内重新连接上源、目标库,同步任务将自动恢复。否则,同步任务将失败。

    说明

    由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

  9. 可选:将鼠标指针放置在已选择对象框中待同步的Topic名上,单击对象后出现的编辑,然后在弹出的对话框中设置Shardkey(即用于分区的key)。

  10. 上述配置完成后,单击页面右下角的预检查并启动

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。

      • 您可以根据提示修复后重新进行预检查。

      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。

  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。

  12. 等待同步作业的链路初始化完成,直至处于同步中状态。

    您可以在数据同步页面,查看数据同步作业的状态。查看同步作业状态

Topic结构定义说明

DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。

说明

本案例中的业务字段为idnameaddress,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段(包含目标库中的源库原有的业字段)添加dts_的前缀。若您使用新版附加列规则,DTS不会给目标库中的源库原有业字段加前缀。

Topic定义

结构定义说明:

旧版附加列名称

新版附加列名称

数据类型

说明

dts_record_id

new_dts_sync_dts_record_id

String

增量日志的记录ID,为该日志唯一标识。

说明
  • 正常情况下是全局唯一自增的,在容灾的情况下会有回退且无法保证自增和唯一。

  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录(分别记录更新前和更新后的值),且dts_record_id的值相同。

dts_operation_flag

new_dts_sync_dts_operation_flag

String

操作类型,取值:

  • I:INSERT操作。

  • D:DELETE操作。

  • U:UPDATE操作。

  • F:全量数据同步。

dts_instance_id

new_dts_sync_dts_instance_id

String

数据库的server ID。

dts_db_name

new_dts_sync_dts_db_name

String

数据库名称。

dts_table_name

new_dts_sync_dts_table_name

String

表名。

dts_utc_timestamp

new_dts_sync_dts_utc_timestamp

String

操作时间戳,即日志的时间戳(UTC 时间)。

dts_before_flag

new_dts_sync_dts_before_flag

String

所有列的值是否更新前的值,取值:Y或N。

dts_after_flag

new_dts_sync_dts_after_flag

String

所有列的值是否更新后的值,取值:Y或N。

关于dts_before_flag和dts_after_flag的补充说明

对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    INSERT操作

  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的dts_record_iddts_operation_flagdts_utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以dts_before_flag取值为Y,dts_after_flag取值为N。第二条增量日志记录了更新后的值,所以 dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    UPDATE操作

  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以dts_before_flag取值为Y, dts_after_flag取值为N,示例如下。

    DELETE操作

后续操作

配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算