本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
如果您需要将关系型数据库中的数据实时同步到阿里云Elasticsearch Serverless(简称ES Serverless)中进行搜索分析,可通过数据传输服务DTS快速创建源库到目标库ES Serverless的实时同步作业,适用于对实时同步要求较高的同步场景。本文以源库为RDS MySQL为例,介绍全量和增量同步数据到ES Serverless的方法。
背景信息
数据传输服务DTS是一种集数据迁移、数据订阅和数据实时同步于一体的数据传输服务。同时,支持同步
Insert
、Delete
和Update
等SQL操作。说明支持同步到ES Serverless的源库包括MySQL、PolarDB for MySQL、PolarDB-X 1.0、PolarDB-X 2.0。
支持同步的数据源版本请参见同步方案概览。
通过在DTS中配置从源(MySQL)同步到目标(ES Serverless),可实现全量、增量数据同步。适用于对实时同步要求较高的关系型数据库的数据同步场景,或需要将关系型数据库中的全量或增量数据同步至ES Serverless的场景。
使用限制
将RDS MySQL同步至ES Serverless,源库MySQL的相关限制要求,请参见RDS MySQL同步至Elasticsearch。
注意事项
同步数据需要您自行控制DTS数据同步速率,否则可能出现同步任务执行失败的情况。
DTS不支持同步DDL操作。若源库中待同步的表在同步过程中已执行DDL操作,您需先移除同步对象,然后在ES Serverless应用中移除该表对应的索引,最后新增同步对象。
若源库中待同步的表需执行增加列操作,您需先在Serverless应用中修改对应表的Mapping,然后在源库中执行相应DDL操作,最后暂停并启动DTS增量数据同步任务。
DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升。在数据库性能较差、规格较低或业务量较大的情况下(例如,源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此,您需在执行数据同步前评估源库和目标库的性能,同时,建议您在业务低峰期执行数据同步(例如,源库和目标库的CPU负载在
30%
以下)。在业务高峰期全量同步数据,可能造成全量数据同步失败,重启全量同步任务即可。
在业务高峰期增量同步数据,可能出现数据同步延迟的情况。
操作步骤
步骤一:准备待同步数据
文本以将RDS MySQL 8.0版本实例中的数据同步到阿里云ES Serverless应用中为例。
在数据库
test_mysql
中,新建表es_test
并插入数据。使用的建表语句及数据如下:-- create table CREATE TABLE `es_test` ( `id` bigint(32) NOT NULL, `name` varchar(32) NULL, `age` bigint(32) NULL, `hobby` varchar(32) NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; -- insert data INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run'); INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');
步骤二:创建同步任务
进入同步任务创建页面。
在顶部菜单栏,选择目标应用所在地域。
在左侧导航栏,单击应用管理。
在应用列表中单击应用名称。
在左侧导航栏,单击数据迁移与同步。
单击前往同步。
在数据同步页签,单击创建同步任务。
在配置源库及目标库向导页面,配置源库及目标库。
重要参数说明如下:
类别
参数
描述
源库信息
数据库类型
选择源数据库的数据库类型。本文示例选择MySQL。
接入方式
接入方式支持:
云实例
专线/VPN网关/智能网关
ECS自建数据库
云企业网CEN
数据库网关DG
若您的源数据库为RDS MySQL,接入方式选择云实例。
实例地区
选择源RDS MySQL实例所在地域。
RDS实例ID
选择源RDS MySQL实例。
数据库账号
输入源RDS MySQL实例的数据库账号。
数据库密码
输入数据库账号对应的密码。
连接方式
根据需求选择非加密连接或SSL安全连接。若设置为SSL安全连接,需提前开启RDS MySQL实例的SSL加密功能。
目标库信息
数据库账号
Serverless应用的用户名,在应用详情页面的基本信息区域查看、复制。
数据库密码
Serverless应用的用户密码,创建应用时输入的密码。如果您忘记了密码,可以在应用详情页面的基本信息区域,单击用户密码后的修改,修改应用密码。
单击测试连接以进行下一步,在对象配置向导页面,配置任务对象及高级配置。
对象配置
参数
描述
同步类型
固定选中增量同步。默认情况下,您还需要同时选中库表结构同步和全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。
索引名称
表名:在目标ES Serverless应用中创建的索引名称和表名一致。
库名_表名:在目标ES Serverless应用中创建的索引名称由库名、下划线(_)和表名按顺序拼接而成。
说明索引名称映射配置会在所有表中生效。
目标已存在表的处理模式
预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。
说明如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射。
忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。
警告选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:
表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:
全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。
增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。
表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。
目标库对象名称大小写策略
您可以配置目标实例中同步对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略。
源库对象
在源库对象框中单击待同步对象,然后单击
将其移动至已选择对象框。
说明同步对象选择的粒度为库、表。若选择的同步对象为表,其他对象(如视图、触发器、存储过程)不会被同步至目标库。
已选择对象
如果您需要修改同步后的字段名称,可在已选择对象区域框中,右键单击对应的表名,设置该表在目标ES Serverless中的索引名称、Type名称等信息,然后单击确认。更多信息,请参见库表列名单个映射。
说明您可设置SQL过滤条件,过滤待同步的数据,只有满足过滤条件的数据才会被同步到目标实例。
高级配置
单击高级配置,可按需设置同步任务的同步速率限制、失败后重试时长、告警策略等高级配置。部分参数说明如下表,全量参数说明请参见高级配置。
参数
描述
是否限制全量同步速率
全量同步时,DTS将占用源库和目标库的部分读写资源,可能导致数据库的负载上升。您可按需选择是否对全量同步任务进行同步速率限制,以缓解目标库的压力。支持设限的配置如下:
每秒查询源库的速率QPS:每秒对源库进行数据查询的次数。
每秒全量迁移的行数RPS:每秒同步到目标端的行数。
每秒全量迁移的数据量(MB)BPS:每秒同步到目标端的数据量,单位为MB。建议配置为
10MB
。说明同步数据时,ES Serverless的写入吞吐量受CU(计算单元)配额限制。当每秒迁移数据量超过当前CU所允许的阈值时,系统将触发写入限流,可能导致同步任务失败。每秒迁移的数据量推荐配置为
10MB
,能确保同步任务的成功率及稳定性。若需提升同步性能,可在评估CU资源承载能力后适当提高迁移速率。若出现同步失败,可参考常见问题处理。
是否限制增量同步速率
您可按需选择是否对增量同步任务进行同步速率限制,以缓解目标库的压力。支持设限的配置如下:
每秒增量同步的行数RPS:每秒同步到目标端的行数。
每秒增量同步的数据量(MB)BPS):每秒同步到目标端的数据量,单位为MB。建议配置为
10MB
。说明同步数据时,ES Serverless的写入吞吐量受CU(计算单元)配额限制。当每秒迁移数据量超过当前CU所允许的阈值时,系统将触发写入限流,可能导致同步任务失败。每秒迁移的数据量推荐配置为
10MB
,能确保同步任务的成功率及稳定性。若需提升同步性能,可在评估CU资源承载能力后适当提高迁移速率。若出现同步失败,可参考常见问题处理。
分片配置
可设置主分片及分片副本数,默认主分片数为
3
,分片副本数为1
。ES Serverless默认单个分片的存储用量上限为20GB
,同时,分片数也与吞吐量相关,默认单个分片的吞吐量上限为20MB/s
,您可根据具体存储量及同步任务的吞吐量调整主分片数。
单击下一步配置库表字段,在库表列配置向导页面,配置库表字段。
单击全部设置为非_routing策略,在弹出的对话框中单击确认,表示用_id进行路由。关于_routing的更多信息,请参见_routing。
单击下一步保存任务并预检查。
说明在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。
如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
如果预检查产生警告:
对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。
对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情、确认屏蔽、确定、重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。
单击下一步购买,在购买页面,选择数据同步实例的计费方式、链路规格。
类别
参数
说明
信息配置
计费方式
预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。
后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。
资源组配置
实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理。
链路规格
DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明。
订购时长
在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。
说明仅当付费类型为预付费时需配置该参数。
配置完成后,阅读并勾选相关服务条款。
单击购买并启动,同步任务正式开始,您可在数据同步界面查看任务进度。
步骤三:验证数据同步结果
在Kibana页面的左上角,选择
在Console中,执行如下命令查看全量数据同步结果。
GET /es_test/_search
说明es_test
需要替换为同步后的索引名称。预期结果如下:
{ "took" : 3, "timed_out" : false, "_shards" : { "total" : 12, "successful" : 12, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 5, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "es_test", "_type" : "es_test", "_id" : "5", "_score" : 1.0, "_source" : { "id" : 5, "name" : "user5", "age" : 42, "hobby" : "basketball" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "2", "_score" : 1.0, "_source" : { "id" : 2, "name" : "user2", "age" : 23, "hobby" : "sport" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "3", "_score" : 1.0, "_source" : { "id" : 3, "name" : "user3", "age" : 43, "hobby" : "game" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "4", "_score" : 1.0, "_source" : { "id" : 4, "name" : "user4", "age" : 24, "hobby" : "run" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "1", "_score" : 1.0, "_source" : { "id" : 1, "name" : "user1", "age" : 22, "hobby" : "music" } } ] } }
在RDS MySQL中插入一条数据,在Serverless应用中查看增量数据同步结果。
例如,通过以下SQL语句在RDS MySQL中插入一条数据。
INSERT INTO `test_mysql`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');
预期结果如下:
{ "took" : 8, "timed_out" : false, "_shards" : { "total" : 12, "successful" : 12, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 6, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "es_test", "_type" : "es_test", "_id" : "5", "_score" : 1.0, "_source" : { "id" : 5, "name" : "user5", "age" : 42, "hobby" : "basketball" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "2", "_score" : 1.0, "_source" : { "id" : 2, "name" : "user2", "age" : 23, "hobby" : "sport" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "3", "_score" : 1.0, "_source" : { "id" : 3, "name" : "user3", "age" : 43, "hobby" : "game" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "4", "_score" : 1.0, "_source" : { "id" : 4, "name" : "user4", "age" : 24, "hobby" : "run" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "1", "_score" : 1.0, "_source" : { "id" : 1, "name" : "user1", "age" : 22, "hobby" : "music" } }, { "_index" : "es_test", "_type" : "es_test", "_id" : "6", "_score" : 1.0, "_source" : { "name" : "user6", "id" : 6, "age" : 30, "hobby" : "dance" } } ] } }
常见问题:因写入限流导致同步失败如何处理?
使用ES Serverless进行数据同步时,若写入速率超过系统限制,会触发429
报错,导致同步任务失败。常见的限流场景及解决方案如下。
场景一:写入吞吐限流
【问题描述】:写入速率超过当前CU配额允许的吞吐能力,产生
limited by [XX][write.throughput]
报错。【解决方案】:
降低每秒迁移数据量(BPS):减少每秒的写入流量,确保写入速率一定程度低于当前CU的限流阈值,为重试留出缓冲空间。可在高级配置中配置。
降低DTS批量写入大小(sink.batch.size.maximum):单次批量请求过大可能导致失败重试,并触发限流,建议适当减小批量请求大小。可通过控制台进入同步任务详情页,在参数设置界面调整
sink.batch.size.maximum
的运行值,也可调用ModifyDtsJobConfig接口调整相应配置。提升CU配额:提升应用的CU配额,以支持更高的吞吐量。
寻求技术支持:若已调至最大CU仍无法满足需求,可提交工单联系技术支持人员协助处理。
场景二:分片级批量写入限流
【问题描述】:单个分片接收的批量写入请求过载,产生
limited by [XX][bulk_per_shard.throughput]
报错。【解决方案】:创建同步任务时,在高级配置中选择更大的主分片数量,实现写入负载更均衡地分布到多个分片,降低单分片压力。
场景三:CU资源限流
相关文档
其他数据库的数据同步操作,请参考: