自动将PolarDB MySQL版数据同步至PolarSearch

当您需要对PolarDB MySQL中的业务数据进行全文检索或复杂分析时,直接在数据库上操作可能会影响核心业务的稳定性。PolarDB提供的AutoETL功能,能将数据从读写节点自动、持续地同步至集群内的PolarSearch节点,为您提供一站式的数据服务。您无需额外部署和维护ETL工具,即可实现数据同步,并将搜索分析负载与在线事务处理负载隔离。

说明

当前功能目前正处于灰度阶段。如您有相关需求,请提交工单与我们联系,以便为您开启该功能。

功能简介

AutoETLPolarDB MySQL内置的数据同步能力,它允许数据在集群内不同类型的节点间自动流转。当前版本仅支持从PolarDB MySQL同步至同一集群内的PolarSearch节点。,以用于高性能的搜索和分析。

您可以通过数据库内置的DBMS_ETL工具包,直接使用SQL命令来创建和管理数据同步链路。AutoETL提供三种灵活的数据同步方式:

  • 单表同步(dbms_etl.sync_by_table:将单个源表完整地同步到目标索引。

  • 多表汇聚(dbms_etl.sync_by_map:将多个源表通过JOIN操作汇聚后,同步到目标索引。

  • 自定义SQL(dbms_etl.sync_by_sql:使用兼容Flink SQL的语法进行复杂的数据清洗、转换和聚合。

适用范围

使用AutoETL功能前,需确保环境满足以下条件:

  • 集群版本MySQL 8.0.1,且修订版本需为8.0.1.1.52或以上。

  • 同步方向:仅支持从PolarDB MySQL同步至同一集群内的PolarSearch节点

  • DDL限制:不允许对已建立同步链路的源表进行DDL变更。如需修改,必须重建ETL链路。

创建同步链路

单表同步

  1. 数据准备

    PolarDB MySQL中执行以下SQL语句,创建示例数据库和表,并插入测试数据。

    CREATE DATABASE IF NOT EXISTS db1;
    USE db1;
    CREATE TABLE IF NOT EXISTS t1 (
        id INT PRIMARY KEY,
        c1 VARCHAR(100),
        c2 VARCHAR(100)
    );
    INSERT INTO t1(id, c1, c2) VALUES 
    (1, 'apple', 'red'),
    (2, 'banana', 'yellow'),
    (3, 'grape', 'purple');
    
  2. 创建同步链路

    使用dbms_etl.sync_by_table存储过程,建立从db1.t1表到PolarSearch节点的索引dest的同步任务。

    语法

    call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");

    参数说明

    参数

    说明

    search

    同步目标,当前固定为search,表示PolarSearch节点。

    <source_table>

    源表名,格式为数据库名.表名

    <sink_table>

    PolarSearch节点中的目标索引名。

    <column_list>

    需要同步的列名列表,用英文逗号,分隔。如果为空字符串"",则同步源表所有列。

    使用限制

    • 源表需包含主键或唯一键。

    • 在不同的同步链路中,不能使用相同的源表或目标表。

    • 创建链路后,源表新增的列默认不会被自动同步。如需同步新增列,请重建链路。

    • 如果您希望使用自己定义的目标索引配置,您可以先在PolarSearch节点中手动创建索引并定义其配置,然后再创建同步链路。如果链路创建时目标索引不存在,系统将自动创建。

    示例

    • db1.t1全表同步到PolarSearchdest索引:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
    • db1.t1表的c1c2列同步到dest索引:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
  3. 验证数据

    连接到PolarSearch节点,使用与Elasticsearch兼容的REST API进行查询,确认数据已同步。

    # 将<polarsearch_endpoint>替换为PolarSearch节点的连接地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

多表汇聚

  1. 数据准备

    PolarDB MySQL中执行以下SQL语句,创建示例数据库和表,并插入测试数据。

    CREATE DATABASE IF NOT EXISTS db1;
    CREATE DATABASE IF NOT EXISTS db2;
    CREATE DATABASE IF NOT EXISTS db3;
    
    CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
    CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
    CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
    
    INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
    INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
    INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
  2. 创建同步链路

    使用dbms_etl.sync_by_map存储过程,可将多个表的数据连接(JOIN)后,汇聚到一个PolarSearch节点的索引中。

    语法

    call dbms_etl.sync_by_map(
        "search",
        "<columns_map>", -- 目标索引字段与源表字段的映射关系
        "<join_fields>", -- 表之间的连接键
        "<join_types>",  -- 连接类型 (inner, left)
        "<filter>"       -- 数据过滤条件
    );

    参数说明

    参数

    格式示例

    说明

    columns_map

    dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)

    目标索引字段与源表字段的映射关系。

    示例表示:目标索引destc1字段来自db1.t1.c1c2字段来自db2.t2.c2

    join_fields

    dest.id=db1.t1.id,db2.t2.id

    表之间的连接键。

    示例表示:目标索引的文档ID(dest.id)由db1.t1.iddb2.t2.id构成,同时db1.t1.iddb2.t2.id也是连接条件。

    join_types

    inner,left

    表之间的连接类型,连接顺序与join_fields中表的出现顺序一致。示例表示:t1 INNER JOIN t2,然后结果再LEFT JOIN t3

    filter

    db1.t1.c1 > 10 AND db2.t2.c2 < 100

    一个标准的SQL WHERE子句,用于在同步前过滤源表数据。

    使用限制

    • 所有参与同步的源表必须包含主键。

    • 该功能使用流式计算,同步过程中仅保证最终一致性。

    • 对于目标索引的更新模式为先删除后插入。如果您不希望在查询时访问到被删除数据的中间状态,可以在执行命令前设置会话变量set sink_options = "'ignore-delete' = 'true'";以忽略PolarSearch节点数据删除的选项。

    示例

    • 两张表INNER JOIN:将db1.t1db2.t2通过id字段进行INNER JOIN,并将t1.c1t2.c2同步到dest索引的c1c2字段。

      call dbms_etl.sync_by_map(
        "search",
        "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2)",
        "dest.id=db1.t1.id,db2.t2.id", 
        "inner",
         ""
      );
    • 多张表混合JOIN并过滤:db1.t1db2.t2db3.t3三张表连接,其中t1t2INNER JOINt1t3LEFT JOIN,并筛选t1.c1 > 10t2.c2 < 100的数据。

      call dbms_etl.sync_by_map(
        "search", 
        "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2), dest.c3(db3.t3.c3)", 
        "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", 
        "inner,left", 
        "db1.t1.c1 > 10 and db2.t2.c2 < 100"
      );
  3. 验证数据

    连接到PolarSearch节点,使用与Elasticsearch兼容的REST API进行查询,确认数据已同步。

    # 将<polarsearch_endpoint>替换为PolarSearch节点的连接地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

自定义SQL

  1. 创建同步链路

    对于需要复杂转换、聚合或计算的场景,dbms_etl.sync_by_sql存储过程支持使用Flink SQL语法定义数据同步逻辑。

    重要

    安全警告:严禁在SQL语句中硬编码密码以下示例仅为演示语法结构,其WITH子句中包含明文密码,存在极大的安全风险。在生产环境中,必须使用更安全的方式管理凭证。

    语法

    call dbms_etl.sync_by_sql("search", "<sync_sql>");

    示例

    call dbms_etl.sync_by_sql("search", "
    -- 步骤1:定义 PolarDB 源表
    CREATE TEMPORARY TABLE `db1`.`sbtest1` (
      `id`   BIGINT,
      `k`    BIGINT,
      `c`    STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'xxxxxxx', -- 填写 PolarDB 集群地址
      'port' = '3306',
      'username' = 'xxx',     -- 生产环境严禁使用明文
      'password' = 'xxx',     -- 生产环境严禁使用明文
      'database-name' = 'db1',
      'table-name' = 'sbtest1'
    );
    
    -- 步骤2:定义 PolarSearch 目标表
    CREATE TEMPORARY TABLE `dest` (
      `k`  BIGINT,
      `max_c` STRING,
      PRIMARY KEY (`k`) NOT ENFORCED
    ) WITH (
      'connector' = 'opensearch',
      'hosts' = 'xxxxxx:xxxx',     -- 填写 PolarSearch 连接地址
      'index' = 'dest',
      'username' = 'xxx',     -- 生产环境严禁使用明文
      'password' = 'xxx'      -- 生产环境严禁使用明文
    );
    
    -- 步骤3:定义计算和插入逻辑
    INSERT INTO `dest`
    SELECT
        `t1`.`k`,
        MAX(`t1`.`c`)
    FROM `db1`.`sbtest1` AS `t1`
    GROUP BY `t1`.`k`;
    ");
  2. 验证数据

    连接到PolarSearch节点,使用与Elasticsearch兼容的REST API进行查询,确认数据已同步。

    # 将<polarsearch_endpoint>替换为PolarSearch节点的连接地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

管理同步链路

您可以使用以下命令查看和删除已创建的同步链路。

查看链路

  • 查看所有链路:

    call dbms_etl.show_sync_link();
  • 根据ID查看指定链路:将<sync_id>替换为步骤二返回的ID。

    call dbms_etl.show_sync_link_by_id('<sync_id>')\G

    返回结果说明:

    *************************** 1. row ***************************
            SYNC_ID: crb5rmv8rttsg
               NAME: crb5rmv8rttsg
             SYSTEM: search
    SYNC_DEFINITION: db1.t1 -> dest
      SOURCE_TABLES: db1.t1
        SINK_TABLES: dest
             STATUS: active  -- 链路状态,active表示正常运行
            MESSAGE:         -- 如果出错,此处会显示错误信息
         CREATED_AT: 2024-05-20 11:55:06
         UPDATED_AT: 2024-05-20 17:28:04
            OPTIONS: ...

删除链路

重要

删除同步链路是高危操作。默认情况下,该操作会同时删除PolarSearch中的目标索引及其所有数据。执行前请务必确认。

此操作用于停止数据同步并清理相关资源。

call dbms_etl.drop_sync_link('<sync_id>');

对不同状态的链路执行drop_sync_link删除时,系统的处理逻辑存在差异:

  • active状态的链路:首先会变为dropping,待系统完成链路资源和目标索引数据的清理后,状态才会变为dropped

  • dropped状态的链路:系统将彻底清除该链路的信息。

  • 其他状态的链路:系统不支持删除操作。

常见问题

如何将PolarDB的表字段映射到PolarSearch节点的索引字段?

AutoETL 提供两种字段映射方式:

  • 隐式映射(sync_by_table):在使用sync_by_table时,PolarSearch节点的索引字段名默认与PolarDB MySQL源表的列名一致。您可通过<column_list>参数指定需要创建和同步的特定列。

  • 显式映射(sync_by_map):在进行字段重命名或多表汇聚时,可利用sync_by_map<columns_map>参数明确定义目标字段与源表列之间的映射关系。例如,dest.title(db1.posts.post_title)表示将db1.posts表中的post_title列映射为dest索引中的title字段。