从自建MySQL同步至阿里云消息队列Kafka版

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

通过数据传输服务DTS(Data Transmission Service),您可以将自建MySQL同步至消息队列Kafka版,扩展消息处理能力。

前提条件

  • 自建MySQL的数据库版本为5.1、5.5、5.6、5.7或8.0版本。
  • 目标Kafka实例的版本为0.10.1.0-2.x版本。
  • 目标Kafka实例中已创建用于接收同步数据的Topic,详情请参见创建Topic

背景信息

消息队列Kafka版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务,针对开源的Apache Kafka提供全托管服务,彻底解决开源产品长期以来的痛点,您只需专注于业务开发,无需部署运维。消息队列Kafka版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。

注意事项

  • DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。

  • 如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。

费用说明

同步类型链路配置费用
库表结构同步和全量数据同步不收费。
增量数据同步收费,详情请参见计费概述

功能限制

  • 同步对象仅支持数据表,不支持非数据表的对象。
  • 不支持自动调整同步对象,如果对同步对象中的数据表进行重命名操作,且重命名后的名称不在同步对象中,那么这部分数据将不再同步到目标Kafka集群中。如需将修改后的数据表继续数据同步至目标Kafka集群中,您需要进行修改同步对象操作,详情请参见新增同步对象

准备工作

为自建MySQL创建账号并设置binlog

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,选择源实例为MySQL,目标实例为Kafka,选择同步拓扑为单向同步
  2. 登录数据传输控制台

    说明

    若数据传输控制台自动跳转至数据管理DMS控制台,您可以在右下角的jiqiren中单击返回旧版,返回至旧版数据传输控制台。

  3. 在左侧导航栏,单击数据同步

  4. 同步作业列表页面顶部,选择同步的目标实例所属地域。

  5. 定位至已购买的数据同步实例,单击配置同步链路

  6. 配置同步作业的源实例及目标实例信息。
    配置源和目标实例信息
    类别配置说明
    同步作业名称DTS会自动生成一个同步作业名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
    源实例信息实例类型根据源库部署位置选择,本文以ECS上的自建数据库为例介绍配置流程,其他类型的配置流程与该案例类似。
    实例地区购买数据同步实例时选择的源实例地域信息,不可变更。
    ECS实例ID选择自建MySQL所属的ECS实例ID。
    数据库类型固定为MySQL,不可变更。
    端口填入自建MySQL的数据库服务端口。
    数据库账号填入自建MySQL的数据库账号,该账号需具备REPLICATION CLIENT、REPLICATION SLAVE、SHOW VIEW和所有同步对象的SELECT权限。
    数据库密码填入该数据库账号对应的密码。
    目标实例信息实例类型选择通过专线/VPN网关/智能接入网关接入的自建数据库
    说明 由于DTS暂时不支持直接选择消息队列Kafka版,此处将其作为自建Kafka来配置数据同步。
    实例地区购买数据同步实例时选择的目标实例地域信息,不可变更。
    对端专有网络选择目标Kafka实例所属的专有网络ID。您可以在Kafka实例的基本信息页面中查看到专有网络ID。
    数据库类型选择为Kafka
    IP地址填入Kafka实例默认接入点中的任意一个IP地址。
    说明 您可以在Kafka实例的基本信息页面中,获取默认接入点对应的IP地址。
    端口Kafka实例的服务端口,默认为9092。
    数据库账号填入Kafka实例的用户名。
    说明 如果Kafka实例的实例类型为VPC实例,无需配置数据库账号数据库密码
    数据库密码填入该用户名对应的密码。
    Topic单击右侧的获取Topic列表,然后在下拉框中选择具体的Topic。
    Kafka版本根据Kafka实例版本,选择对应的版本信息。
    连接方式根据业务及安全需求,选择非加密连接SCRAM-SHA-256
  7. 单击页面右下角的授权白名单并进入下一步

    如果源或目标数据库是阿里云数据库实例(例如RDS MySQL云数据库MongoDB版等),DTS会自动将对应地区DTS服务的IP地址添加到阿里云数据库实例的白名单中;如果源或目标数据库是ECS上的自建数据库,DTS会自动将对应地区DTS服务的IP地址添到ECS的安全规则中,您还需确保自建数据库没有限制ECS的访问(若数据库是集群部署在多个ECS实例,您需要手动将DTS服务对应地区的IP地址添到其余每个ECS的安全规则中);如果源或目标数据库是IDC自建数据库或其他云数据库,则需要您手动添加对应地区DTS服务的IP地址,以允许来自DTS服务器的访问。DTS服务的IP地址,请参见DTS服务器的IP地址段

    警告

    DTS自动添加或您手动添加DTS服务的公网IP地址段可能会存在安全风险,一旦使用本产品代表您已理解和确认其中可能存在的安全风险,并且需要您做好基本的安全防护,包括但不限于加强账号密码强度防范、限制各网段开放的端口号、内部各API使用鉴权方式通信、定期检查并限制不需要的网段,或者使用通过内网(专线/VPN网关/智能网关)的方式接入。

  8. 配置同步策略和同步对象信息。
    配置同步对象
    配置说明
    投递到kafka的数据格式同步到Kafka集群中的数据以avro格式或者Canal Json格式存储,定义详情请参见Kafka集群的数据存储格式
    同步到Kafka Partition策略根据业务需求选择同步的策略,详细介绍请参见Kafka Partition同步策略说明
    同步对象源库对象区域框中,选择需要同步的对象(选择的粒度为表),然后单击向右箭头图标将其移动到已选对象区域框中。
    说明 DTS会自动将表名映射为步骤6选择的Topic名称。如果需要更换同步的目标Topic,请参见步骤9
    映射名称更改

    如需更改同步对象在目标实例中的名称,请使用对象名映射功能,详情请参见库表列映射

    源、目标库无法连接重试时间

    当源、目标库无法连接时,DTS默认重试720分钟(即12小时),您也可以自定义重试时间。如果DTS在设置的时间内重新连接上源、目标库,同步任务将自动恢复。否则,同步任务将失败。

    说明

    由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。

  9. 可选:已选择对象区域框中,将鼠标指针放置在目标Topic名上,然后单击Topic名后出现的编辑,在弹出的对话框中设置源表在目标Kafka实例中的Topic名称、Topic的Partition数量、Partition Key等信息。
    设置topic
    配置说明
    数据库表名称设置源表同步到的目标Topic名称。
    警告 设置的Topic名称必须在目标Kafka实例中真实存在,否则将导致数据同步失败。
    过滤条件
    • 过滤条件支持标准的SQL WHERE语句(仅支持=!=<>操作符),只有满足WHERE条件的数据才会被同步到目标Topic。本案例填入id>1000
    • 过滤条件中如需使用引号,请使用单引号('),例如address in('hangzhou','shanghai')
    设置新Topic的Partition数量本场景中,目标Kafka为消息队列Kafka实例,暂不支持该功能,无需配置本参数。
    设置Partition Key当您在步骤8中选择同步策略为按主键的hash值投递到不同Partition时,您可以配置本参数,指定单个或多个列作为Partition Key来计算Hash值,DTS将根据计算得到的Hash值将不同的行投递到目标Topic的各Partition中。
  10. 上述配置完成后单击页面右下角的下一步
  11. 配置同步初始化的高级配置信息。
    Kafka同步初始化高级配置
    配置说明
    同步初始化默认选择结构初始化全量数据初始化,DTS会在增量数据同步之前,将源库中待同步对象的结构和存量数据,同步到目标库。
    过滤选项默认选择忽略增量同步阶段的 DDL,即增量同步阶段源库执行的DDL操作不会被DTS同步至目标库。
  12. 上述配置完成后,单击页面右下角的预检查并启动

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,单击具体检查项后的提示,查看失败详情。

      • 您可以根据提示修复后重新进行预检查。

      • 如无需修复告警检测项,您也可以选择确认屏蔽忽略告警项并重新进行预检查,跳过告警检测项重新进行预检查。

  13. 预检查对话框中显示预检查通过后,关闭预检查对话框,数据同步作业正式开始。
    您可以在数据同步页面,查看数据同步状态。查看数据同步状态