本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
通过数据传输服务DTS(Data Transmission Service),您可以将自建MySQL同步至云消息队列 Kafka 版,扩展消息处理能力。
前提条件
您已完成以下操作:
背景信息
云消息队列 Kafka 版是阿里云提供的分布式、高吞吐、可扩展的消息队列服务,针对开源的Apache Kafka提供全托管服务,彻底解决开源产品长期以来的痛点,您只需专注于业务开发,无需部署运维。云消息队列 Kafka 版广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。
注意事项
DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。
- 如果源数据库没有主键或唯一约束,且所有字段没有唯一性,可能会导致目标数据库中出现重复数据。
功能限制
同步对象仅支持数据表,不支持非数据表的对象。
不支持自动调整同步对象,如果对同步对象中的数据表进行重命名操作,且重命名后的名称不在同步对象中,那么这部分数据将不再同步到目标云消息队列 Kafka 版集群中。
支持同步的SQL操作
数据传输服务DTS支持同步的SQL操作包括INSERT、UPDATE、DELETE、REPLACE。
消息格式
同步到云消息队列 Kafka 版集群中的数据以avro格式存储,schema定义详情请参见DTS avro schema定义。
数据同步到云消息队列 Kafka 版集群后,您需要根据avro schema定义进行数据解析。
费用说明
数据传输服务DTS费用,请参见产品定价。
准备工作
操作步骤
进入目标地域的同步任务列表页面(二选一)。
通过DTS控制台进入
登录数据传输服务DTS控制台。
在左侧导航栏,单击数据同步。
在页面左上角,选择同步实例所属地域。
通过DMS控制台进入
说明实际操作可能会因DMS的模式和布局不同,而有所差异。更多信息,请参见极简模式控制台和自定义DMS界面布局与样式。
登录DMS数据管理服务。
在顶部菜单栏中,选择
。在同步任务右侧,选择同步实例所属地域。
单击创建任务,进入任务配置页面。
配置源库及目标库信息。
类别
配置
说明
无
任务名称
DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
源库信息
数据库类型
选择MySQL。
接入方式
请根据源库的部署位置进行选择,本示例选择ECS自建数据库。
说明自建数据库为源时,您还需要执行相应的准备工作。
实例地区
选择源MySQL数据库所属地域。
是否跨阿里云账号
本示例为同一阿里云账号间的同步,选择不跨账号。
ECS实例ID
选择源MySQL数据库所属ECS实例ID。
端口
填入源MySQL数据库的服务端口(需开放至公网),默认为3306。
数据库账号
填入源MySQL数据库的账号,权限要求请参见数据库账号的权限要求。
数据库密码
填入该数据库账号对应的密码。
连接方式
请根据实际情况选择非加密连接或SSL安全连接。
若自建MySQL未开启SSL加密,请选择非加密连接。
若自建MySQL已开启SSL加密,请选择SSL安全连接。同时,您还需要上传CA 证书并填写CA 密钥。
目标库信息
数据库类型
选择Kafka。
接入方式
选择云实例。
实例地区
选择目标阿里云消息队列Kafka版实例所属地域。
Kafka实例ID
选择目标阿里云消息队列Kafka版实例的ID。
连接方式
根据业务及安全需求,选择非加密连接或SCRAM-SHA-256。
Topic
在下拉框中选择接收数据的Topic。
是否使用Kafka Schema Registry
Kafka Schema Registry是元数据提供服务层,提供了一个RESTful接口,用于存储和检索Avro Schema。
否:不使用Kafka Schema Registry。
是:使用Kafka Schema Registry。您需要输入Avro Schema在Kafka Schema Registry注册的URL或IP。
配置完成后,在页面下方单击测试连接以进行下一步。
说明请确保DTS服务的IP地址段能够被自动或手动添加至源库和目标库的安全设置中,以允许DTS服务器的访问。更多信息,请参见添加DTS服务器的IP地址段。
若源库或目标库为自建数据库(接入方式不是云实例),则还需要在弹出的DTS服务器访问授权对话框单击测试连接。
配置任务对象。
在对象配置页面,配置待迁移的对象。
配置
说明
迁移类型
如果只需要进行全量迁移,请同时选中库表结构迁移和全量迁移。
如果需要进行不停机迁移,请同时选中库表结构迁移、全量迁移和增量迁移。
说明目标Kafka实例的接入方式为云实例时,不支持库表结构迁移。
如果未选择增量迁移,为保障数据一致性,数据迁移期间请勿在源实例中写入新的数据。
目标已存在表的处理模式
预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据迁移任务不会被启动。
说明如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射。
忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。
警告选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:
表结构一致的情况下,在目标库遇到与源库主键的值相同的记录:
全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会迁移至目标数据库中。
增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。
表结构不一致的情况下,可能导致只能迁移部分列的数据或迁移失败,请谨慎操作。
投递到Kafka的数据格式
根据需求选择迁移到Kafka实例中的数据存储格式。
如果您选择DTS Avro,需要根据DTS Avro的Schema定义进行数据解析,详情请参见DTS Avro的Schema定义和DTS Avro的反序列化示例。
如果您选择Canal JSON,Canal Json的参数说明和示例请参见Canal Json说明。
如果您选择Shareplex JSON,Shareplex Json的参数说明和示例请参见Shareplex Json。
Kafka压缩格式
根据需求选择Kafka压缩消息的压缩格式。
LZ4(默认):压缩率较低,压缩速率较高。
GZIP:压缩率较高,压缩速率较低。
说明对CPU的消耗较高。
Snappy:压缩率中等,压缩速率中等。
投递到Kafka Partition策略
根据业务需求选择策略。
消息确认机制
根据业务需求选择消息确认机制。
存储DDL的Topic
在下拉框中选择用于存储DDL信息的Topic。
说明若未选择,DDL信息默认存储在接收数据的Topic中。
目标库对象名称大小写策略
您可以配置目标实例中迁移对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略。
源库对象
在源库对象框中单击待迁移的对象,然后单击
将其移动到已选择对象框。
说明迁移对象选择粒度为表。
已选择对象
本示例无需额外配置。您可以使用映射功能,设置源表在目标Kafka实例中的Topic名称、Topic的Partition数量、Partition Key等信息。更多信息,请参见映射信息。
说明如果使用了对象名映射功能,可能会导致依赖这个对象的其他对象迁移失败。
如需选择增量迁移的SQL操作,请在已选择对象中右击待迁移对象,并在弹出的对话框中选择所需增量迁移的SQL操作。
单击下一步高级配置,进行高级参数配置。
配置
说明
选择调度该任务的专属集群
DTS默认将任务调度到共享集群上,您无需选择。若您希望任务更加稳定,可以购买专属集群来运行DTS迁移任务。
源库、目标库无法连接后的重试时间
在迁移任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认重试720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的时间内重新连接上源、目标库,迁移任务将自动恢复。否则,迁移任务将失败。
说明针对同源或者同目标的多个DTS实例,网络重试时间以后创建任务的设置为准。
由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。
源库、目标库出现其他问题后的重试时间
在迁移任务启动后,若源库或目标库出现非连接性的其他问题(如DDL或DML执行异常),则DTS会报错并会立即进行持续的重试操作,默认持续重试时间为10分钟,您也可以在取值范围(1~1440分钟)内自定义重试时间,建议设置10分钟以上。如果DTS在设置的重试时间内相关操作执行成功,迁移任务将自动恢复。否则,迁移任务将会失败。
重要源库、目标库出现其他问题后的重试时间的值需要小于源库、目标库无法连接后的重试时间的值。
是否限制全量迁移速率
在全量迁移阶段,DTS将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。您可以根据实际情况,选择是否对全量迁移任务进行限速设置(设置每秒查询源库的速率QPS、每秒全量迁移的行数RPS和每秒全量迁移的数据量(MB)BPS),以缓解目标库的压力。
说明仅当迁移类型选择了全量迁移,才有此配置项。
您也可以在迁移实例运行后,调整全量迁移的速率。
是否限制增量迁移速率
您也可以根据实际情况,选择是否对增量迁移任务进行限速设置(设置每秒增量迁移的行数RPS和每秒增量迁移的数据量(MB)BPS),以缓解目标库的压力。
说明仅当迁移类型选择了增量迁移,才有此配置项。
您也可以在迁移实例运行后,调整增量迁移的速率。
是否去除正反向任务的心跳表sql
根据业务需求选择是否在DTS实例运行时,在源库中写入心跳SQL信息。
是:不在源库中写入心跳SQL信息,DTS实例可能会显示有延迟。
否:在源库中写入心跳SQL信息,可能会影响源库的物理备份和克隆等功能。
环境标签
您可以根据实际情况,选择用于标识实例的环境标签。本示例无需选择。
配置ETL功能
监控告警
根据业务需求选择是否设置告警并接收告警通知。
不设置:不设置告警。
设置:设置告警。您还需要设置告警阈值和告警联系人,当迁移失败或延迟超过阈值后,系统将进行告警通知。
保存任务并进行预检查。
若您需要查看调用API接口配置该实例时的参数信息,请将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI参数。
若您无需查看或已完成查看API参数,请单击页面下方的下一步保存任务并预检查。
说明在迁移任务正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动迁移任务。
如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
如果预检查产生警告:
对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情、确认屏蔽、确定、重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。
购买实例。
预检查通过率显示为100%时,单击下一步购买。
在购买页面,选择数据迁移实例的链路规格,详细说明请参见下表。
类别
参数
说明
信息配置
资源组配置
选择实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理。
链路规格
DTS为您提供了不同性能的迁移规格,迁移链路规格的不同会影响迁移速率,您可以根据业务场景进行选择。更多信息,请参见数据迁移链路规格说明。
配置完成后,阅读并选中《数据传输(按量付费)服务条款》。
单击购买并启动,并在弹出的确认对话框,单击确定。
您可以在迁移任务列表页面,查看迁移实例的具体进度。
说明若迁移实例不包含增量迁移任务,则迁移实例会自动结束。迁移实例自动结束后,运行状态为已完成。
若迁移实例包含增量迁移任务,则迁移实例不会自动结束,增量迁移任务会持续进行。在增量迁移任务正常运行期间,迁移实例的运行状态为运行中。
映射信息
在已选择对象区域框中,将鼠标指针放置在目标Topic名(表级别)上。
单击目标Topic名后出现的编辑。
在弹出的编辑表对话框中,配置映射信息。
说明库级别是编辑Schema对话框,且支持配置的参数少;表级别是编辑表对话框。
若迁移对象的粒度不是整库,则不支持修改编辑Schema对话框的目标Topic名称和设置新建Topic的Partition数量。
配置
说明
目标Topic名称
源表迁移到的目标Topic名称,默认为源库及目标库配置阶段在目标库信息选择的Topic。
重要目标库为阿里云消息队列Kafka版实例时,填写的Topic名称必须在目标Kafka实例中真实存在,否则将会导致数据迁移失败。目标库为自建Kafka数据库,且迁移实例包含库表结构任务时,DTS会尝试在目标库中创建您填写的Topic。
若您修改了目标Topic名称,数据将会被写入到您填写的Topic中。
过滤条件
详情请参见设置过滤条件。
设置新建Topic的Partition数量
数据写入到目标Topic时的分区数。
Partition Key
当投递到Kafka Partition策略选择为按主键的hash值投递到不同Partition时,您可以配置本参数,指定单个或多个列作为Partition Key来计算Hash值,DTS将根据计算得到的Hash值将不同的行投递到目标Topic的各Partition中。
说明仅支持在编辑表对话框勾选Partition Key。
单击确定。