本文介绍如何使用数据传输服务DTS(Data Transmission Service),将PolarDB MySQL版集群同步至阿里云消息队列Kafka版,扩展消息处理能力。
前提条件
- 已创建源PolarDB MySQL版集群,详情请参见购买按量付费集群和购买包年包月集群。
- 已创建目标阿里云消息队列Kafka版实例,
- 目标实例阿里云消息队列Kafka版中已创建用于接收同步数据的Topic,请参见步骤一:创建Topic。
注意事项
类型 | 说明 |
---|---|
源库限制 |
|
其他限制 |
|
其他注意事项 | DTS会在源库定时执CREATE DATABASE IF NOT EXISTS `test`命令以推进Binlog位点。 |
单条记录大小限制
由于写入Kafka单条记录的大小是10MB,因此当源端一行数据超过10MB时,DTS由于无法成功写入Kafka会导致任务中断。在该场景下建议您不要同步该表,如果一定要同步,也只能同步部分列,即配置DTS任务时,过滤掉这些大字段的记录。如果已经是在同步中的任务,则需要修改同步对象,将该表移出,再次点击修改同步对象,加入该表,并将该表的大字段列过滤,不做同步。
支持的同步架构
- 一对一单向同步。
- 一对多单向同步。
- 多对一单向同步。
- 级联单向同步。
支持同步的SQL操作
操作类型 | SQL操作语句 |
---|---|
DML | INSERT、UPDATE、DELETE |
DDL |
|
数据库账号的权限要求
数据库 | 权限要求 |
---|---|
源PolarDB MySQL版集群 | 待同步对象的读权限。 |
操作步骤
- 进入同步任务的列表页面。
- 登录DMS数据管理服务。
- 在顶部菜单栏中,单击集成与开发(DTS)。
- 在左侧导航栏,选择 。
说明 您也可以登录新版DTS同步任务的列表页面。 - 在同步任务右侧,选择同步实例所属地域。说明 新版DTS同步任务列表页面,需要在页面左上角选择同步实例所属地域。
- 单击创建任务,配置源库及目标库信息。警告 选择源和目标实例后,建议您仔细阅读页面上方显示的使用限制,否则可能会导致任务失败或数据不一致。
类别 配置 说明 无 任务名称 DTS会自动生成一个任务名称,建议配置具有业务意义的名称(无唯一性要求),便于后续识别。
源库信息 选择已有的DMS数据库实例 您可以按实际需求,选择是否使用已有实例。- 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。
- 如不使用已有实例,您需要输入下方的数据库信息。
数据库类型 选择PolarDB for MySQL。 接入方式 选择云实例。 实例地区 选择源PolarDB MySQL版集群所属地域。 是否跨阿里云账号 本场景为同一阿里云账号间迁移,选择不跨账号。 PolarDB实例ID 选择源PolarDB MySQL版集群ID。 数据库账号 填入源PolarDB MySQL版集群的数据库账号,权限要求请参见数据库账号的权限要求。 数据库密码 填入该数据库账号对应的密码。
连接方式 根据需求选择非加密连接或SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见设置SSL加密。
目标库信息 选择已有的DMS数据库实例 您可以按实际需求,选择是否使用已有实例。- 如使用已有实例,下方数据库信息将自动填入,您无需重复输入。
- 如不使用已有实例,您需要输入下方的数据库信息。
数据库类型 选择Kafka。 接入方式 选择专线/VPN网关/智能网关。 说明 由于DTS暂时不支持直接选择阿里云消息队列Kafka版,此处将其作为自建Kafka来配置数据同步。实例地区 选择目标阿里云消息队列Kafka版实例所属地域。 已和目标端数据库联通的VPC 选择目标阿里云消息队列Kafka版实例所属的专有网络ID。您可以在Kafka实例的基本信息页面中查看到专有网络ID。 主机名或IP地址 填入阿里云消息队列Kafka版实例默认接入点中的任意一个IP地址。 说明 您可以在阿里云消息队列Kafka版实例的基本信息页面中,获取默认接入点对应的IP地址。端口 阿里云消息队列Kafka版实例的服务端口,默认为9092。 数据库账号 填入目标阿里云消息队列Kafka版实例的数据库账号。 说明 如果阿里云消息队列Kafka版实例的实例类型为VPC实例,无需配置数据库账号和数据库密码。数据库密码 填入该数据库账号对应的密码。
Kafka版本 根据Kafka实例版本,选择对应的版本信息。 连接方式 根据业务及安全需求,选择非加密连接或SCRAM-SHA-256。 Topic 在下拉框中选择具体的Topic。 存储DDL的Topic 在下拉框中选择具体的Topic,用于存储DDL信息。如果未指定,DDL信息默认存储在Topic选择的Topic中。 是否使用Kafka Schema Registry Kafka Schema Registry是元数据提供服务层,提供了一个RESTful接口,用于存储和检索Avro Schema。 - 否:不使用Kafka Schema Registry。
- 是:使用Kafka Schema Registry。您需要输入Avro Schema在Kafka Schema Registry注册的URL或IP。
- 如果您的自建数据库具备白名单安全设置,您需要复制弹跳框中的DTS服务器IP地址,并加入自建数据库的白名单安全设置中。然后单击测试连接以进行下一步。说明 DTS服务器IP地址的更多说明,请参见迁移、同步或订阅本地数据库时需添加的IP白名单。
- 配置任务对象及高级配置。
配置 说明 同步类型 固定选中增量同步。默认情况下,您还需要同时选中库表结构同步和全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。
目标已存在表的处理模式 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。
说明 如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射 。- 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。 警告 选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:
- 表结构一致的情况下,如在目标库遇到与源库主键的值相同的记录:
- 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。
- 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。
- 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。
- 表结构一致的情况下,如在目标库遇到与源库主键的值相同的记录:
投递到Kafka的数据格式 根据需求选择同步到Kafka实例中的数据存储格式。 - 如果您选择DTS Avro,根据DTS Avro的schema定义进行数据解析,schema定义详情请参见DTS Avro的schema定义。
- 如果您选择Canal Json,Canal Json的参数说明和示例请参见Canal Json说明。
投递到Kafka Partition策略 根据业务需求选择同步的策略,详细介绍请参见Kafka Partition迁移策略说明。 目标库对象名称大小写策略 您可以配置目标实例中同步对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略。
源库对象 在源库对象框中单击待同步对象,然后单击
将其移动至已选择对象框。
说明 同步对象的选择粒度为表。已选择对象 - 如需更改单个同步对象在目标实例中的名称,请右击已选择对象中的同步对象,设置方式,请参见库表列名单个映射。
- 如需批量更改同步对象在目标实例中的名称,请单击已选择对象方框右上方的批量编辑,设置方式,请参见库表列名批量映射。
说明- 如需按库或表级别选择同步的SQL操作,请在已选择对象中右击待同步对象,并在弹出的对话框中选择所需同步的SQL操作。支持的操作请参见支持同步的SQL操作。
- 如需设置WHERE条件过滤数据,请在已选择对象中右击待同步的表,在弹出的对话框中设置过滤条件。设置方法请参见通过SQL条件过滤任务数据。
- 单击下一步高级配置,进行高级配置。
配置 说明 设置告警 是否设置告警,当同步失败或延迟超过阈值后,将通知告警联系人。- 不设置:不设置告警。
- 设置:设置告警,您还需要设置告警阈值和告警联系人。更多信息,请参见在配置任务过程中配置监控报警。
源库、目标库无法连接后的重试时间 在同步任务启动后,若源库或目标库连接失败则DTS会报错,并会立即进行持续的重试连接,默认持续重试时间为720分钟,您也可以在取值范围(10~1440分钟)内自定义重试时间,建议设置30分钟以上。如果DTS在设置的重试时间内重新连接上源库、目标库,同步任务将自动恢复。否则,同步任务将会失败。说明- 针对同源或者同目标的多个DTS实例,如DTS实例A和DTS实例B,设置网络重试时间时A设置30分钟,B设置60分钟,则重试时间以低的30分钟为准。
- 由于连接重试期间,DTS将收取任务运行费用,建议您根据业务需要自定义重试时间,或者在源和目标库实例释放后尽快释放DTS实例。
配置ETL功能 选择是否配置ETL功能。关于ETL的更多信息,请参见什么是ETL。- 是:配置ETL功能,并在文本框中填写数据处理语句,详情请参见在DTS迁移或同步任务中配置ETL。
- 否:不配置ETL功能。
是否去除正反向任务的心跳表sql 根据业务需求选择是否在DTS实例运行时,在源库中写入心跳SQL信息。- 是:不在源库中写入心跳SQL信息,DTS实例可能会显示有延迟。
- 否:在源库中写入心跳SQL信息,可能会影响源库的物理备份和克隆等功能。
- 可选:在已选择对象区域框中,将鼠标指针放置在目标Topic名上,然后右击Topic名后出现编辑,在弹出的对话框中设置源表在目标Kafka实例中的Topic名称、Topic的Partition数量、Partition Key等信息。
配置 说明 表名称 设置源表同步到的目标Topic名称。 警告 设置的Topic名称必须在目标Kafka实例中真实存在,否则将导致数据同步失败。过滤条件 - 过滤条件支持标准的SQL WHERE语句(仅支持
=
、!=
、<
和>
操作符),只有满足WHERE条件的数据才会被同步到目标Topic。本案例填入p_id>1
。 - 过滤条件中如需使用引号,请使用单引号('),例如
address in('hangzhou','shanghai')
。
设置新Topic的Partition数量 本场景中,目标Kafka为消息队列Kafka实例,暂不支持该功能,无需配置本参数。 设置Partition Key 当您在步骤5中选择同步策略为按主键的hash值投递到不同Partition时,您可以配置本参数,指定单个或多个列作为Partition Key来计算Hash值,DTS将根据计算得到的Hash值将不同的行投递到目标Topic的各Partition中。 - 过滤条件支持标准的SQL WHERE语句(仅支持
- 上述配置完成后,单击页面下方的下一步保存任务并预检查。
您可以将鼠标光标移动至下一步保存任务并预检查按钮上,然后单击气泡中的预览OpenAPI调用,查看该实例使用API接口配置时的参数信息。
说明- 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。
- 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
- 如果预检查产生警告:
- 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
- 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情、确认屏蔽、确定、重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。
- 预检查通过率显示为100%时,单击下一步购买。
- 在购买页面,选择数据同步实例的计费方式、链路规格,详细说明请参见下表。
类别 参数 说明 信息配置 链路规格 DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择,详情请参见数据同步链路规格说明。 - 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》。
- 单击购买并启动,同步任务正式开始,您可在数据同步界面查看具体任务进度。