从PolarDB MySQL版同步至Datahub

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务(Data Transmission Service,简称DTS),您可以将PolarDB MySQL同步至DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。

前提条件

  • DataHub实例的地域为华东1、华东2、华北2或华南1。
  • DataHub实例中,已创建用作接收同步数据的项目(Project),详情请参见创建项目
  • PolarDB MySQL集群已开启Binlog,详情请参见如何开启Binlog
  • PolarDB MySQL中待同步的表需具备主键或唯一约束。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

功能限制

  • 不支持全量数据初始化,即DTS不会将源PolarDB MySQL集群中同步对象的存量数据同步至目标DataHub实例。
  • 仅支持表级别的数据同步。
  • 不支持新增列的数据同步,即源数据表新增了某个列,该列的数据不会同步至目标DataHub实例。
  • 数据同步的过程中,请勿对源库中待同步的表执行DDL变更,否则会导致同步失败。

支持同步的SQL操作

INSERT、UPDATE、DELETE。

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,选择源实例为PolarDB、目标实例为DataHub,并选择同步拓扑为单向同步
  2. 登录数据传输控制台

    说明

    若数据传输控制台自动跳转至数据管理DMS控制台,您可以在右下角的jiqiren中单击返回旧版,返回至旧版数据传输控制台。

  3. 在左侧导航栏,单击数据同步

  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。

  5. 定位至已购买的数据同步实例,单击配置同步链路

  6. 配置同步作业的源实例及目标实例信息。
    配置源和目标实例信息
    类别配置说明
    同步作业名称DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息实例类型固定为PolarDB实例,不可变更。
    实例地区购买数据同步实例时选择的源实例地域信息,不可变更。
    PolarDB实例ID选择源PolarDB MySQL集群ID。
    数据库账号填入PolarDB MySQL集群的数据库账号。
    数据库密码填入数据库账号对应的密码。
    目标实例信息实例类型固定为DataHub,不可变更。
    实例地区购买数据同步实例时选择的目标实例地域信息,不可变更。
    Project选择DataHub实例的Project
  7. 单击页面右下角的授权白名单并进入下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  8. 配置同步策略和同步对象。
    配置同步对象

    配置

    说明

    同步初始化

    勾选结构初始化

    说明

    勾选结构初始化后,在数据同步作业的初始化阶段,DTS会将同步对象的结构信息(例如表结构)同步至目标DataHub实例。

    选择同步对象

    源库对象框中单击待迁移的对象,然后单击向右小箭头将其移动至已选择对象框。

    说明
    • 同步对象的选择粒度为表。

    • 默认情况下,同步对象的名称保持不变。如果您需要改变同步对象在目标实例中的名称,需要使用对象名映射功能,详情请参见设置同步对象在目标实例中的名称

    选择附加列规则

    DTS在将数据同步到DataHub时,会在同步的目标Topic中添加一些附加列。如果附加列和目标Topic中已有的列出现名称冲突将会导致数据同步失败。您需要根据业务需求选择是否启用新的附加列规则

    警告

    在选择附加列规则前,您需要评估附加列和目标Topic中已有的列是否会出现名称冲突,否则可能会导致任务失败或数据丢失。关于附加列的规则和定义说明,请参见附加列名称和定义说明

    映射名称更改

    如需更改同步对象在目标实例中的名称,请使用对象名映射功能,详情请参见库表列映射

    源表DMS_ONLINE_DDL过程中是否复制临时表到目标库

    如源库使用数据管理DMS(Data Management)执行Online DDL变更,您可以选择是否同步Online DDL变更产生的临时表数据。

    • :同步Online DDL变更产生的临时表数据。

      说明

      Online DDL变更产生的临时表数据过大,可能会导致同步任务延迟。

    • :不同步Online DDL变更产生的临时表数据,只同步源库的原始DDL数据。

      说明

      该方案会导致目标库锁表。

    源、目标库无法连接重试时间

    当源、目标库无法连接时,DTS默认重试720分钟(即12小时),您也可以自定义重试时间。如果DTS在设置的时间内重新连接上源、目标库,同步任务将自动恢复。否则,同步任务将失败。

    说明

    由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

  9. 可选:将鼠标指针放置在已选择对象框中待同步的Topic名上,单击对象后出现的编辑,然后在弹出的对话框中设置Shardkey(即用于分区的key)。

  10. 上述配置完成后,单击页面右下角的预检查并启动

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。

      • 您可以根据提示修复后重新进行预检查。

      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。

  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。

  12. 等待同步作业的链路初始化完成,直至处于同步中状态。

    您可以在数据同步页面,查看数据同步作业的状态。查看同步作业状态

Topic结构定义说明

DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。

说明

本案例中的业务字段为idnameaddress,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段(包含目标库中的源库原有的业字段)添加dts_的前缀。若您使用新版附加列规则,DTS不会给目标库中的源库原有业字段加前缀。

Topic定义

结构定义说明:

旧版附加列名称

新版附加列名称

数据类型

说明

dts_record_id

new_dts_sync_dts_record_id

String

增量日志的记录ID,为该日志唯一标识。

说明
  • 正常情况下是全局唯一自增的,在容灾的情况下会有回退且无法保证自增和唯一。

  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录(分别记录更新前和更新后的值),且dts_record_id的值相同。

dts_operation_flag

new_dts_sync_dts_operation_flag

String

操作类型,取值:

  • I:INSERT操作。

  • D:DELETE操作。

  • U:UPDATE操作。

dts_instance_id

new_dts_sync_dts_instance_id

String

数据库的server ID。

dts_db_name

new_dts_sync_dts_db_name

String

数据库名称。

dts_table_name

new_dts_sync_dts_table_name

String

表名。

dts_utc_timestamp

new_dts_sync_dts_utc_timestamp

String

操作时间戳,即日志的时间戳(UTC 时间)。

dts_before_flag

new_dts_sync_dts_before_flag

String

所有列的值是否更新前的值,取值:YN。

dts_after_flag

new_dts_sync_dts_after_flag

String

所有列的值是否更新后的值,取值:YN。

关于dts_before_flagdts_after_flag的补充说明

对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    INSERT操作
  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的dts_record_iddts_operation_flagdts_utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以dts_before_flag取值为Y,dts_after_flag取值为N。第二条增量日志记录了更新后的值,所以 dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    UPDATE操作
  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以dts_before_flag取值为Y, dts_after_flag取值为N,示例如下。

    DELETE操作

后续操作

配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算