阿里云流式数据服务DataHub是流式数据(Streaming Data)的处理平台,提供对流式数据的发布、订阅和分发功能,让您可以轻松构建基于流式数据的分析和应用。通过数据传输服务(Data Transmission Service,简称DTS),您可以将POLARDB MySQL同步至DataHub,帮助您快速实现使用流计算等大数据产品对数据实时分析。

前提条件

  • DataHub实例的地域为华东1、华东2、华北2或华南1。
  • DataHub实例中,已创建用作接收同步数据的Project,详情请参见创建Project
  • 已开启POLARDB MySQL的Binlog,详情请参见如何开启Binlog
  • POLARDB MySQL中待同步的表需具备主键或唯一约束。

功能限制

  • 不支持全量数据初始化,即DTS不会将源RDS实例中同步对象的存量数据同步至目标DataHub实例中。
  • 仅支持表级别的数据同步。
  • 不支持新增列的数据同步,即源数据表新增了某个列,该列的数据不会同步至目标DataHub实例中。
  • 数据同步的过程中,请勿对源库中待同步的表执行DDL变更,否则会导致同步失败。

支持同步的SQL操作

INSERT、UPDATE、DELETE。

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,选择源实例为POLARDB、目标实例为DataHub,并选择同步拓扑为单向同步
  2. 登录数据传输控制台
  3. 在左侧导航栏,单击数据同步
  4. 同步作业列表页面顶部,选择数据同步实例所属地域。
    选择地域
  5. 定位至已购买的数据同步实例,单击配置同步链路
  6. 配置同步作业的源实例及目标实例信息。
    配置源和目标实例信息
    配置项目 配置选项 配置说明
    同步作业名称 - DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息 实例类型 固定为POLARDB,不可变更。
    实例地区 购买数据同步实例时选择的源实例地域信息,不可变更。
    POLARDB实例ID 选择源POLARDB集群ID。
    数据库账号 填入POLARDB集群的数据库账号。
    数据库密码 填入数据库账号对应的密码。
    目标实例信息 实例类型 固定为DataHub,不可变更。
    实例地区 购买数据同步实例时选择的目标实例地域信息,不可变更。
    Project 选择DataHub实例的Project
  7. 单击页面右下角的授权白名单并进入下一步
    说明 此步骤会将DTS服务器的IP地址自动添加到POLARDB集群的白名单中,用于保障DTS服务器能够正常连接POLARDB集群。
  8. 配置同步策略和同步对象。
    配置同步策略和同步对象
    配置 说明
    同步初始化 勾选结构初始化
    说明 勾选结构初始化后,在数据同步作业的初始化阶段,DTS会将同步对象的结构信息(例如表结构)同步至目标DataHub实例。
    选择同步对象

    源库对象框中单击待迁移的对象,然后单击向右小箭头将其移动至已选择对象框。

    说明
    • 同步对象的选择粒度为表。
    • 默认情况下,同步对象的名称保持不变。如果您需要改变同步对象在目标实例中的名称,需要使用DTS提供的对象名映射功能,详情请参见设置同步对象在目标实例中的名称
    选择附加列规则 DTS在将数据同步到DataHub时,会在同步的目标Topic中添加一些附加列。如果附加列和目标Topic中已有的列出现名称冲突将会导致数据同步失败。您需要根据业务需求选择是否启用新的附加列规则
    警告 在选择附加列规则前,您需要评估附加列和目标表中已有的列是否会出现名称冲突。关于附加列的规则和定义说明,请参见表 1
  9. 可选: 将鼠标指针放置在已选择对象框中待同步的表名上,单击对象后出现的编辑,然后在弹出的对话框中设置Shardkey(即用于分区的key)。
  10. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步作业。
    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。根据提示修复后,重新进行预检查。
  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,同步作业将正式开始。
  12. 等待同步作业的链路初始化完成,直至处于同步中状态。
    您可以在 数据同步页面,查看数据同步作业的状态。查看同步作业状态

Topic结构定义说明

DTS在将数据变更同步至DataHub实例的Topic时,目标Topic中除了存储变更数据外,还会新增一些附加列用于存储元信息,示例如下。
说明 本案例中的业务字段为idnameaddress,由于在配置数据同步时选用的是旧版附加列规则,DTS会为业务字段添加dts_的前缀。
Topic定义

结构定义说明:

旧版附加列名称 新版附加列名称 说明
dts_record_id new_dts_sync_dts_record_id 增量日志的记录id,为该日志唯一标识。
说明
  • id的值唯一且递增。
  • 如果增量日志的操作类型为UPDATE,那么增量更新会被拆分成两条记录,一条为DELETE,一条为INSERT,并且这两条记录的dts_record_id的值相同。
dts_operation_flag new_dts_sync_dts_operation_flag 操作类型,取值:
  • I:INSERT操作。
  • D:DELETE操作。
  • U:UPDATE操作。
dts_instance_id new_dts_sync_dts_instance_id 数据库的server id。
dts_db_name new_dts_sync_dts_db_name 数据库名称。
dts_table_name new_dts_sync_dts_table_name 表名。
dts_utc_timestamp new_dts_sync_dts_utc_timestamp 操作时间戳,即binlog的时间戳(UTC 时间)。
dts_before_flag new_dts_sync_dts_before_flag 所有列的值是否更新前的值,取值:Y或N。
dts_after_flag new_dts_sync_dts_after_flag 所有列的值是否更新后的值,取值:Y或N。

关于dts_before_flag和dts_after_flag的补充说明

对于不同的操作类型,增量日志中的dts_before_flagdts_after_flag定义如下:

  • INSERT

    当操作类型为INSERT时,所有列的值为新插入的值,即为更新后的值,所以dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    INSERT操作
  • UPDATE

    当操作类型为UPDATE时,DTS会将UPDATE操作拆为两条增量日志。这两条增量日志的dts_record_iddts_operation_flagdts_utc_timestamp对应的值相同。

    第一条增量日志记录了更新前的值,所以dts_before_flag取值为Y,dts_after_flag取值为N。第二条增量日志记录了更新后的值,所以 dts_before_flag取值为N,dts_after_flag取值为Y,示例如下。

    UPDATE操作
  • DELETE

    当操作类型为DELETE时,增量日志中所有的列值为被删除的值,即列值不变,所以dts_before_flag取值为Y, dts_after_flag取值为N,示例如下。

    DELETE操作

后续操作

配置完数据同步作业后,您可以对同步到DataHub实例中的数据执行计算分析。更多详情,请参见阿里云实时计算