Kafka是应用较为广泛的分布式、高吞吐量、高可扩展性消息队列服务,普遍用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,是大数据生态中不可或缺的产品之一。通过数据传输服务DTS(Data Transmission Service),您可以将RDS MySQL的数据库同步至自建Kafka集群,扩展消息处理能力。

前提条件

  • Kafka集群的版本为0.10或1.0版本。
  • 源RDS实例的数据库类型为MySQL
  • 确保数据同步的源RDS实例已存在,如不存在请创建RDS实例

注意事项

  • DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。
  • 如果同步对象为单个或多个表(非整库),那么在数据同步时,请勿对源库的同步对象使用gh-ost或pt-online-schema-change等类似工具执行在线DDL变更,否则会导致同步失败。
    说明 为避免同步失败,您可使用DMS企业版提供的相关功能来执行在线DDL变更,详情请参见不锁表结构变更
  • 如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。

数据同步功能限制

  • 同步对象仅支持数据表,不支持非数据表的对象。
  • 不支持DDL操作的同步。
  • 不支持自动调整同步对象,如果对同步对象中的数据表进行重命名操作,且重命名后的名称不在同步对象中,那么这部分数据将不再同步到到目标Kafka集群中。如需将修改后的数据表继续数据同步至目标Kafka集群中,您需要进行修改同步对象操作,详情请参见修改同步对象

支持同步的SQL操作

DML操作:INSERT、UPDATE、DELETE、REPLACE。

支持的同步架构

  • 1对1单向同步。
  • 1对多单向同步。
  • 多对1单向同步。
  • 级联同步。

消息格式

同步到Kafka集群中的数据以avro格式存储,schema定义详情请参见DTS avro schema定义

在数据同步到Kafka集群后,您需要根据avro schema定义进行数据解析。

费用说明

详情请参见产品定价

操作步骤

  1. 购买数据同步作业,详情请参见购买流程
    说明 购买时,选择源实例为MySQL、目标实例为Kafka,并选择同步拓扑为单向同步
  2. 登录数据传输服务DTS控制台
  3. 在左侧导航栏,单击数据同步
  4. 定位至已购买的数据同步实例,单击配置同步链路
  5. 配置同步通道的源实例及目标实例信息。同步通道的源和目标实例配置
    配置项目 配置选项 配置说明
    任务名称 -
    • DTS为每个任务自动生成一个任务名称,任务名称没有唯一性要求。
    • 您可以根据需要修改任务名称,建议为任务配置具有业务意义的名称,便于后续的任务识别。
    源实例信息 实例类型 选择RDS实例
    实例地区 购买数据同步实例时选择的源实例地域信息,不可变更。
    实例ID 选择源RDS实例的实例ID。
    数据库账号 填入连接源RDS实例数据库的账号,需要具备 Replication slave、 Replication client 及所有同步对象的 Select 权限。
    数据库密码 填入连接源RDS实例数据库账号对应的密码。
    连接方式 根据需求选择非加密连接SSL安全连接,本案例选择为非加密连接
    说明 选择SSL安全连接时,需要提前开启RDS实例的SSL加密功能,详情请参见设置SSL加密
    目标实例信息 实例类型
    • Kafka集群部署在ECS上时,选择ECS上的自建数据库
    • Kafka集群部署在本地服务器时,选择通过专线/VPN网关/智能网关接入的自建数据库
      说明 选择通过专线/VPN网关/智能网关接入的自建数据库时,您需要配置VPC IDIP地址端口
    实例地区 购买数据同步实例时选择的目标实例地域信息,不可变更。
    ECS实例ID 选择部署了Kafka集群的ECS实例ID。
    数据库类型 选择为Kafka
    端口 Kafka集群对外提供服务的端口,默认为9092。
    数据库账号 填入Kafka集群的用户名,如Kafka集群未开启验证可不填写。
    数据库密码 填入Kafka集群用户名对应的密码,如Kafka集群未开启验证可不填写。
    Topic
    1. 单击击右侧的获取Topic列表
    2. 下拉选择具体的Topic名称。
    Kafka版本 根据目标Kafka集群版本,选择对应的版本信息。
  6. 单击页面右下角的授权白名单并进入下一步
  7. 配置同步对象信息。配置同步对象
    配置 说明
    同步对象 源库对象区域框中,选择需要同步的对象(仅支持选择数据表),然后单击向右箭头将其移动到已选对象区域框中。
    说明 DTS会自动将表名映射为配置同步的源和目标实例信息时选择的Topic名称。如果需要更换同步的目标Topic(更换后的Topic需是Kafka实例中真实存在的),您可以将鼠标指针放置在要进行名称映射的表上,并单击出现的编辑进行调整。
  8. 上述配置完成后单击页面右下角的下一步
  9. 配置同步初始化的高级配置信息。配置同步初始化
    说明
    • 此步骤会将源实例中已经存在同步对象的结构及数据在目标实例中初始化,作为后续增量同步数据的基线数据。
    • 同步初始化类型细分为:结构初始化,全量数据初始化。默认情况下,需要选择结构初始化全量数据初始化
  10. 上述配置完成后,单击页面右下角的预检查并启动
    说明
    • 在数据同步任务正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动数据同步任务。
    • 如果预检查失败,单击具体检查项后的提示,查看具体的失败详情。根据失败原因修复后,重新进行预检查。
  11. 预检查对话框中显示预检查通过后,关闭预检查对话框,数据同步任务正式开始。
    您可以在数据同步页面,查看数据同步状态。查看数据同步状态