文档

通过DTS将MySQL数据同步到阿里云ES Serverless

更新时间:
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

如果您需要将关系型数据库中的数据实时同步到阿里云Elasticsearch Serverless(简称ES Serverless)中进行搜索分析,可通过数据传输服务DTS快速创建源库到目标库ES Serverless的实时同步作业,适用于对实时同步要求较高的同步场景。本文以源库为RDS MySQL为例,介绍全量和增量同步数据到ES Serverless的方法。

背景信息

  • 数据传输服务DTS是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务,详细信息请参见数据传输服务DTS。DTS支持同步的SQL操作包括Insert、Delete和Update,支持同步的数据源版本要求请参见同步方案概览

    说明

    支持同步到ES Serverless的源库包括MySQL、PolarDB for MySQL、PolarDB-X 1.0、PolarDB-X 2.0。

  • 通过在DTS中配置从源(MySQL)同步到目标(ES Serverless),可实现全量、增量数据同步。适用于对实时同步要求较高的关系型数据库中数据的同步场景或需要将关系型数据库中的全量或增量数据同步到ES Serverless的场景。

注意事项

  • 同步入门版应用的数据需要您自行控制DTS数据同步速率,否则可能出现同步任务执行失败的情况。

  • DTS不支持同步DDL操作,如果源库中待同步的表在同步的过程中已经执行了DDL操作,您需要先移除同步对象,然后在ES Serverless应用中移除该表对应的索引,最后新增同步对象。详情请参见移除同步对象新增同步对象

  • 如果源库中待同步的表需要执行增加列的操作,您只需先在Serverless应用中修改对应表的mapping,然后在源库中执行相应的DDL操作,最后暂停并启动DTS增量数据同步任务。

  • DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。

    • 在业务高峰期全量同步数据,可能造成全量数据同步失败,重启全量同步任务即可。

    • 在业务高峰期增量同步数据,可能出现数据同步延迟的情况。

操作步骤

步骤一:准备待同步数据

文本以将RDS MySQL 8.0版本实例中的数据同步到阿里云ES Serverless应用中为例。

  1. 创建RDS MySQL 8.0版本实例。具体操作,请参见创建RDS MySQL实例

  2. 创建账号和数据库test_mysql。具体操作,请参见创建数据库和账号

  3. 在数据库test_mysql中,新建表es_test并插入数据。使用的建表语句及数据如下:

    -- create table
    CREATE TABLE `es_test` (
        `id` bigint(32) NOT NULL,
        `name` varchar(32) NULL,
        `age` bigint(32) NULL,
        `hobby` varchar(32) NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
    
    -- insert data
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

步骤二:创建同步任务

  1. 进入创建同步任务页面。

    1. 登录阿里云Elasticsearch控制台
    2. 在左侧导航栏,单击Elasticsearch Serverless版本

      说明

      如果您所在的地域没有Elasticsearch Serverless服务,请在顶部菜单栏切换地域。

    3. Elasticsearch Serverless服务控制台的左侧导航栏,单击应用管理

    4. 应用管理页面,单击目标应用名称。

    5. 在左侧导航栏,单击数据迁移与同步

    6. 单击前往同步

    7. 数据同步页签,单击创建同步任务

  2. 配置源库及目标库向导页面,配置源库及目标库。

    重要参数说明如下:

    项目

    参数

    描述

    源库信息

    数据库类型

    源数据库支持的数据库类型:

    • MySQL

    • PolarDB for MySQL

    • PolarDB-X 1.0

    • PolarDB-X 2.0

    接入方式

    接入方式支持:

    • 云实例

    • 专线/VPN网关/智能网关

    • ECS自建数据库

    • 云企业网CEN

    • 数据库网关DG

    如果您的源数据库为RDS MySQL,接入方式选择云实例。

    实例地区

    选择源RDS MySQL实例所在地域。

    RDS实例ID

    选择源RDS MySQL实例。

    数据库账号

    填入源RDS MySQL实例的数据库账号。

    数据库密码

    填入数据库账号对应的密码。

    连接方式

    根据需求选择非加密连接SSL安全连接。如果设置为SSL安全连接,您需要提前开启RDS MySQL实例的SSL加密功能,详情请参见使用云端证书快速开启SSL链路加密

    目标库信息

    数据库账号

    Serverless应用的用户名,在应用详情页面的基本信息区域查看、复制。

    数据库密码

    Serverless应用的用户密码,创建应用时输入的密码。如果您忘记了密码,可以在应用详情页面的基本信息区域,单击用户密码后的修改,修改应用密码。

  3. 单击测试连接以进行下一步,在对象配置向导页面,配置任务对象及高级配置。

    配置

    说明

    同步类型

    固定选中增量同步。默认情况下,您还需要同时选中库表结构同步全量同步。预检查完成后,DTS会将源实例中待同步对象的全量数据在目标集群中初始化,作为后续增量同步数据的基线数据。

    索引名称

    • 表名:在目标ES Serverless应用中创建的索引名称和表名一致。

    • 库名_表名:在目标ES Serverless应用中创建的索引名称由库名、下划线(_)和表名按顺序拼接而成。

    说明

    索引名称映射配置会在所有表中生效。

    目标已存在表的处理模式

    • 预检查并报错拦截:检查目标数据库中是否有同名的表。如果目标数据库中没有同名的表,则通过该检查项目;如果目标数据库中有同名的表,则在预检查阶段提示错误,数据同步任务不会被启动。

      说明

      如果目标库中同名的表不方便删除或重命名,您可以更改该表在目标库中的名称,请参见库表列名映射

    • 忽略报错并继续执行:跳过目标数据库中是否有同名表的检查项。

      警告

      选择为忽略报错并继续执行,可能导致数据不一致,给业务带来风险,例如:

      • 表结构一致的情况下,如在目标库遇到与源库主键或唯一键的值相同的记录:

        • 全量期间,DTS会保留目标集群中的该条记录,即源库中的该条记录不会同步至目标数据库中。

        • 增量期间,DTS不会保留目标集群中的该条记录,即源库中的该条记录会覆盖至目标数据库中。

      • 表结构不一致的情况下,可能会导致无法初始化数据、只能同步部分列的数据或同步失败,请谨慎操作。

    目标库对象名称大小写策略

    您可以配置目标实例中同步对象的库名、表名和列名的英文大小写策略。默认情况下选择DTS默认策略,您也可以选择与源库、目标库默认策略保持一致。更多信息,请参见目标库对象名称大小写策略

    源库对象

    源库对象框中单击待同步对象,然后单击向右将其移动至已选择对象框。

    说明

    同步对象选择的粒度为库、表。若选择的同步对象为表,其他对象(如视图、触发器、存储过程)不会被同步至目标库。

    已选择对象

    如果您需要修改同步后的字段名称,可在已选择对象区域框中,右键单击对应的表名,设置该表在目标ES Serverless中的索引名称、Type名称等信息,然后单击确定。更多信息,请参见库表列名单个映射

    说明

    您可以设置SQL过滤条件,过滤待同步的数据,只有满足过滤条件的数据才会被同步到目标实例,详情请参见通过SQL条件过滤任务数据

    单击高级配置,展开高级配置项进行高级配置。参数说明,请参见高级配置

  4. 单击下一步配置库表字段,在配置库表字段向导页面,配置库表字段。

    单击全部设置为非_routing策略,在弹出的对话框中单击确认,表示用_id进行路由。关于_routing的更多信息,请参见_routing

  5. 单击下一步保存任务并预检查

    说明
    • 在同步作业正式启动之前,会先进行预检查。只有预检查通过后,才能成功启动同步作业。

    • 如果预检查失败,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

    • 如果预检查产生警告:

      • 对于不可以忽略的检查项,请单击失败检查项后的查看详情,并根据提示修复后重新进行预检查。

      • 对于可以忽略无需修复的检查项,您可以依次单击点击确认告警详情确认屏蔽确定重新进行预检查,跳过告警检查项重新进行预检查。如果选择屏蔽告警检查项,可能会导致数据不一致等问题,给业务带来风险。

  6. 单击下一步购买,在购买页面,选择数据同步实例的计费方式、链路规格。

    类别

    参数

    说明

    信息配置

    计费方式

    • 预付费(包年包月):在新建实例时支付费用。适合长期需求,价格比按量付费更实惠,且购买时长越长,折扣越多。

    • 后付费(按量付费):按小时扣费。适合短期需求,用完可立即释放实例,节省费用。

    资源组配置

    实例所属的资源组,默认为default resource group。更多信息,请参见什么是资源管理

    链路规格

    DTS为您提供了不同性能的同步规格,同步链路规格的不同会影响同步速率,您可以根据业务场景进行选择。更多信息,请参见数据同步链路规格说明

    订购时长

    在预付费模式下,选择包年包月实例的时长和数量,包月可选择1~9个月,包年可选择1年、2年、3年和5年。

    说明

    该选项仅在付费类型为预付费时出现。

  7. 配置完成后,阅读并勾选《数据传输(按量付费)服务条款》

  8. 单击购买并启动,同步任务正式开始,您可在数据同步界面查看具体任务进度。

步骤三:验证数据同步结果

  1. 登录目标ES Serverless应用的Kibana控制台。

    登录Kibana控制台,请参见通过Kibana使用Serverless应用

  2. 在Kibana页面的左上角,选择菜单.png > Management > Dev Tools

  3. Console中,执行如下命令查看全量数据同步结果。

    GET /es_test/_search
    说明

    es_test需要替换为同步后的索引名称。

    预期结果如下:

    {
      "took" : 3,
      "timed_out" : false,
      "_shards" : {
        "total" : 12,
        "successful" : 12,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "user5",
              "age" : 42,
              "hobby" : "basketball"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "user2",
              "age" : 23,
              "hobby" : "sport"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "user3",
              "age" : 43,
              "hobby" : "game"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "user4",
              "age" : 24,
              "hobby" : "run"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "user1",
              "age" : 22,
              "hobby" : "music"
            }
          }
        ]
      }
    }
    
  4. 在RDS MySQL中插入一条数据,在Serverless应用中查看增量数据同步结果。

    例如,通过以下SQL语句在RDS MySQL中插入一条数据。

    INSERT INTO `test_mysql`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');

    预期结果如下:

    {
      "took" : 8,
      "timed_out" : false,
      "_shards" : {
        "total" : 12,
        "successful" : 12,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 6,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "user5",
              "age" : 42,
              "hobby" : "basketball"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "user2",
              "age" : 23,
              "hobby" : "sport"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "user3",
              "age" : 43,
              "hobby" : "game"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "user4",
              "age" : 24,
              "hobby" : "run"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "user1",
              "age" : 22,
              "hobby" : "music"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "6",
            "_score" : 1.0,
            "_source" : {
              "name" : "user6",
              "id" : 6,
              "age" : 30,
              "hobby" : "dance"
            }
          }
        ]
      }
    }
    

相关文档

您还可以参考以下文档进行数据同步: