通过DTS将MySQL数据实时同步到阿里云ES

当您需要将企业线上的RDS MySQL中的生产数据实时同步到阿里云Elasticsearch(ES)中进行搜索分析时,可通过数据传输服务DTS快速创建RDS MySQL到阿里云ES的实时同步作业,适用于对实时同步要求较高的同步场景。本文介绍如何配置RDS MySQL到阿里云ES实例的实时同步作业,并验证全量和增量数据同步的结果。

背景信息

  • 数据传输服务DTS是一种集数据迁移、数据订阅及数据实时同步于一体的数据传输服务,详细信息请参见数据传输服务DTS。DTS支持同步的SQL操作包括Insert、Delete和Update,支持同步的数据源版本要求请参见同步方案概览

  • 通过在DTS中配置从源(MySQL)同步到目标(ES),可实现全量、增量数据同步。适用于对实时同步要求较高的关系型数据库中数据的同步场景或需要将关系型数据库中的全量或增量数据同步到阿里云ES场景。

注意事项

  • DTS不支持同步DDL操作,如果源库中待同步的表在同步的过程中已经执行了DDL操作,您需要先移除同步对象,然后在ES实例中移除该表对应的索引,最后新增同步对象。详情请参见移除同步对象新增同步对象

  • 如果源库中待同步的表需要执行增加列的操作,您只需先在ES实例中修改对应表的mapping,然后在源库中执行相应的DDL操作,最后暂停并启动DTS增量数据同步任务。

  • DTS在执行全量数据初始化时将占用源库和目标库一定的读写资源,可能会导致数据库的负载上升,在数据库性能较差、规格较低或业务量较大的情况下(例如源库有大量慢SQL、存在无主键表或目标库存在死锁等),可能会加重数据库压力,甚至导致数据库服务不可用。因此您需要在执行数据同步前评估源库和目标库的性能,同时建议您在业务低峰期执行数据同步(例如源库和目标库的CPU负载在30%以下)。

    • 在业务高峰期全量同步数据,可能造成全量数据同步失败,重启全量同步任务即可。

    • 在业务高峰期增量同步数据,可能出现数据同步延迟的情况。

  • 由于MySQL和ES实例支持的数据类型不同,数据类型无法一一对应。所以DTS在进行结构初始化时,会根据目标库支持的数据类型进行类型映射,详情请参见结构初始化涉及的数据类型映射关系

操作流程

  1. 准备环境:先在源库RDS MySQL中添加待同步数据,然后创建目标库ES实例,并为ES实例开启自动创建索引功能。

  2. 创建数据同步任务:在DTS控制台配置并购买数据同步任务。

  3. 验证数据同步结果:先在ES的Kibana控制台验证全量同步结果,然后在源库RDS MySQL中添加数据,最后在ES的Kibana控制台验证增量同步结果。

操作步骤

步骤一:环境准备

文本以将RDS MySQL 8.0版本实例中的数据同步到阿里云ES 7.10版本实例中为例。

准备源库待同步数据

  1. 创建RDS MySQL 8.0版本实例。具体操作,请参见创建RDS MySQL实例

  2. 创建账号和数据库test_mysql。具体操作,请参见创建数据库和账号

  3. 在数据库test_mysql中,新建表es_test并插入数据。使用的建表语句及数据如下:

    -- create table
    CREATE TABLE `es_test` (
        `id` bigint(32) NOT NULL,
        `name` varchar(32) NULL,
        `age` bigint(32) NULL,
        `hobby` varchar(32) NULL,
        PRIMARY KEY (`id`)
    ) ENGINE=InnoDB
    DEFAULT CHARACTER SET=utf8;
    
    -- insert data
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (1,'user1',22,'music');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (2,'user2',23,'sport');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (3,'user3',43,'game');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (4,'user4',24,'run');
    INSERT INTO `es_test` (`id`,`name`,`age`,`hobby`) VALUES (5,'user5',42,'basketball');

准备目标库ES实例

  1. 创建阿里云ES 7.10版本实例。具体操作,请参见创建阿里云Elasticsearch实例

  2. ES实例开启自动创建索引功能。具体操作,请参见配置YML参数

    image

步骤二:创建数据同步任务

  1. 登录新版DTS同步任务的列表页面

  2. 单击创建任务

  3. 按照页面提示配置数据同步任务。

    说明

    以下步骤中涉及的参数的说明,请参见RDS MySQL同步至Elasticsearch

    1. 配置源库及目标库,在页面下方单击测试连接以进行下一步

      image

    2. 配置任务对象。

      image

    3. 配置高级设置,本文高级配置保持默认。

    4. 库表列配置页面,单击全部设置为非_routing策略,将全部表设置为非_routing策略。

      说明

      目标库ES实例为7.x版本时,全部表必须设置为非_routing策略。

  4. 配置完成后,根据页面提示保存并预检查任务、购买并启动任务。

    购买并启动任务成功后,同步任务正式开始。您可在数据同步界面查看具体任务进度,待全量同步完成后,您即可在ES实例中查看同步成功的数据。

    image

步骤三(可选):验证数据同步结果

  1. 登录目标ES实例的Kibana控制台。

    登录Kibana控制台,请参见登录Kibana控制台

  2. 在Kibana页面的左上角,选择菜单.png > Management > 开发工具(Dev Tools),在控制台(Console)中执行以下命令。

  3. 验证全量数据同步结果。

    执行如下命令,查看全量数据同步结果。

    GET /es_test/_search

    预期结果如下:

    {
      "took" : 10,
      "timed_out" : false,
      "_shards" : {
        "total" : 5,
        "successful" : 5,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 5,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "3",
            "_score" : 1.0,
            "_source" : {
              "id" : 3,
              "name" : "user3",
              "age" : 43,
              "hobby" : "game"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "5",
            "_score" : 1.0,
            "_source" : {
              "id" : 5,
              "name" : "user5",
              "age" : 42,
              "hobby" : "basketball"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "4",
            "_score" : 1.0,
            "_source" : {
              "id" : 4,
              "name" : "user4",
              "age" : 24,
              "hobby" : "run"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "2",
            "_score" : 1.0,
            "_source" : {
              "id" : 2,
              "name" : "user2",
              "age" : 23,
              "hobby" : "sport"
            }
          },
          {
            "_index" : "es_test",
            "_type" : "es_test",
            "_id" : "1",
            "_score" : 1.0,
            "_source" : {
              "id" : 1,
              "name" : "user1",
              "age" : 22,
              "hobby" : "music"
            }
          }
        ]
      }
    }
  4. 验证增量数据同步结果。

    1. 通过以下SQL语句在RDS MySQL中插入一条数据。

      INSERT INTO `test_mysql`.`es_test` (`id`,`name`,`age`,`hobby`) VALUES (6,'user6',30,'dance');
    2. 等待增量同步完成后,再次执行命令GET /es_test/_search,查看增量数据同步结果。

      预期结果如下:

      {
        "took" : 541,
        "timed_out" : false,
        "_shards" : {
          "total" : 5,
          "successful" : 5,
          "skipped" : 0,
          "failed" : 0
        },
        "hits" : {
          "total" : {
            "value" : 6,
            "relation" : "eq"
          },
          "max_score" : 1.0,
          "hits" : [
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "3",
              "_score" : 1.0,
              "_source" : {
                "id" : 3,
                "name" : "user3",
                "age" : 43,
                "hobby" : "game"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "5",
              "_score" : 1.0,
              "_source" : {
                "id" : 5,
                "name" : "user5",
                "age" : 42,
                "hobby" : "basketball"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "4",
              "_score" : 1.0,
              "_source" : {
                "id" : 4,
                "name" : "user4",
                "age" : 24,
                "hobby" : "run"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "2",
              "_score" : 1.0,
              "_source" : {
                "id" : 2,
                "name" : "user2",
                "age" : 23,
                "hobby" : "sport"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "6",
              "_score" : 1.0,
              "_source" : {
                "name" : "user6",
                "id" : 6,
                "age" : 30,
                "hobby" : "dance"
              }
            },
            {
              "_index" : "es_test",
              "_type" : "es_test",
              "_id" : "1",
              "_score" : 1.0,
              "_source" : {
                "id" : 1,
                "name" : "user1",
                "age" : 22,
                "hobby" : "music"
              }
            }
          ]
        }
      }