阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务(Data Transmission Service,简称DTS),您可以将PolarDB MySQL版同步至DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。

前提条件

  • DataHub实例的地域为华东1、华东2、华北2或华南1。
  • DataHub实例中,已创建用作接收同步数据的项目(Project),详情请参见创建项目
  • PolarDB MySQL版集群已开启Binlog,详情请参见如何开启Binlog
  • PolarDB MySQL版中待同步的表需具备主键或唯一约束。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

功能限制

  • 不支持全量数据初始化,即DTS不会将源PolarDB MySQL版集群中同步对象的存量数据同步至目标DataHub实例。
  • 仅支持表级别的数据同步。
  • 不支持新增列的数据同步,即源数据表新增了某个列,该列的数据不会同步至目标DataHub实例。
  • 数据同步的过程中,请勿对源库中待同步的表执行DDL变更,否则会导致同步失败。

支持同步的SQL操作

INSERT、UPDATE、DELETE。

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,选择源实例为PolarDB、目标实例为DataHub,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
    说明 若数据传输控制台自动跳转至数据管理DMS控制台,您可以在右下角的jiqiren中单击返回旧版,返回至旧版数据传输控制台。
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置同步作业的源实例及目标实例信息。
    配置源和目标实例信息
    类别配置说明
    同步作业名称DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息实例类型固定为PolarDB实例,不可变更。
    实例地区购买数据同步实例时选择的源实例地域信息,不可变更。
    PolarDB实例ID选择源PolarDB MySQL版集群ID。
    数据库账号填入PolarDB MySQL版集群的数据库账号。
    数据库密码填入数据库账号对应的密码。
    目标实例信息实例类型固定为DataHub,不可变更。
    实例地区购买数据同步实例时选择的目标实例地域信息,不可变更。
    Project选择DataHub实例的Project
  7. 单击页面右下角的授权白名单并进入下一步
    说明
    • 如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等)或ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单或ECS的安全规则中,您无需手动添加,请参见DTS服务器的IP地址段
    • DTS任务完成或释放后,建议您手动删除添加的DTS服务器IP地址段。
  8. 配置同步策略和同步对象。
    配置同步对象
    配置说明
    同步初始化勾选结构初始化
    说明 勾选结构初始化后,在数据同步作业的初始化阶段,DTS会将同步对象的结构信息(例如表结构)同步至目标DataHub实例。
    选择同步对象

    源库对象框中单击待迁移的对象,然后单击向右小箭头将其移动至已选择对象框。

    说明
    • 同步对象的选择粒度为表。
    • 默认情况下,同步对象的名称保持不变。如果您需要改变同步对象在目标实例中的名称,需要使用对象名映射功能,详情请参见设置同步对象在目标实例中的名称
    选择附加列规则DTS在将数据同步到DataHub时,会在同步的目标Topic中添加一些附加列。如果附加列和目标Topic中已有的列出现名称冲突将会导致数据同步失败。您需要根据业务需求选择是否启用新的附加列规则
    警告 在选择附加列规则前,您需要评估附加列和目标Topic中已有的列是否会出现名称冲突。关于附加列的规则和定义说明,请参见附加列名称和定义说明
    映射名称更改

    如需更改同步对象在目标实例中的名称,请使用对象名映射功能,详情请参见库表列映射

    源表DMS_ONLINE_DDL过程中是否复制临时表到目标库
    如源库使用数据管理DMS(Data Management Service)执行Online DDL变更,您可以选择是否同步Online DDL变更产生的临时表数据。
    • :同步Online DDL变更产生的临时表数据。
      说明 Online DDL变更产生的临时表数据过大,可能会导致同步任务延迟。
    • :不同步Online DDL变更产生的临时表数据,只同步源库的原始DDL数据。
      说明 该方案会导致目标库锁表。
    源、目标库无法连接重试时间
    当源、目标库无法连接时,DTS默认重试720分钟(即12小时),您也可以自定义重试时间。如果DTS在设置的时间内重新连接上源、目标库,同步任务将自动恢复。否则,同步任务将失败。
    说明 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。
  9. 可选:将鼠标指针放置在已选择对象框中待同步的Topic名上,单击对象后出现的编辑,然后在弹出的对话框中设置Shardkey(即用于分区的key)。
  10. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。
    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。
      • 您可以根据提示修复后重新进行预检查。
      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。
  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
  12. 等待同步作业的链路初始化完成,直至处于同步中状态。
    您可以在数据同步页面,查看数据同步作业的状态。查看同步作业状态

Topic结构定义说明

DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。
说明 本案例中的业务字段为idnameaddress,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加dts_的前缀。
Topic定义

结构定义说明:

旧版附加列名称新版附加列名称数据类型说明
dts_record_idnew_dts_sync_dts_record_idString增量日志的记录ID,为该日志唯一标识。
说明
  • 正常情况下是全局唯一自增的,在容灾的情况下会有回退且无法保证自增和唯一。
  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录(分别记录更新前和更新后的值),且dts_record_id的值相同。
dts_operation_flagnew_dts_sync_dts_operation_flagString操作类型,取值:
  • I:INSERT操作。
  • D:DELETE操作。
  • U:UPDATE操作。
dts_instance_idnew_dts_sync_dts_instance_idString数据库的server ID。
dts_db_namenew_dts_sync_dts_db_nameString数据库名称。
dts_table_namenew_dts_sync_dts_table_nameString表名。
dts_utc_timestampnew_dts_sync_dts_utc_timestampString操作时间戳,即日志的时间戳(UTC 时间)。
dts_before_flagnew_dts_sync_dts_before_flagString所有列的值是否更新前的值,取值:Y或N。
dts_after_flagnew_dts_sync_dts_after_flagString所有列的值是否更新后的值,取值:Y或N。

关于dts_before_flag和dts_after_flag的补充说明

对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    INSERT操作
  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的dts_record_iddts_operation_flagdts_utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以dts_before_flag取值为Y,dts_after_flag取值为N。第二条增量日志记录了更新后的值,所以 dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    UPDATE操作
  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以dts_before_flag取值为Y, dts_after_flag取值为N,示例如下。

    DELETE操作

后续操作

配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算