当您需要对PolarDB MySQL版中的业务数据进行全文检索或复杂分析时,直接在数据库上操作可能会影响核心业务的稳定性。PolarDB提供的AutoETL功能,能将数据从读写节点自动、持续地同步至集群内的PolarSearch节点,为您提供一站式的数据服务。您无需额外部署和维护ETL工具,即可实现数据同步,并将搜索分析负载与在线事务处理负载隔离。
当前功能目前正处于灰度阶段。如您有相关需求,请提交工单与我们联系,以便为您开启该功能。
功能简介
AutoETL是PolarDB MySQL版内置的数据同步能力,它允许数据在集群内不同类型的节点间自动流转。当前版本仅支持从PolarDB MySQL版同步至同一集群内的PolarSearch节点。,以用于高性能的搜索和分析。
您可以通过数据库内置的DBMS_ETL工具包,直接使用SQL命令来创建和管理数据同步链路。AutoETL提供三种灵活的数据同步方式:
单表同步(
dbms_etl.sync_by_table):将单个源表完整地同步到目标索引。多表汇聚(
dbms_etl.sync_by_map):将多个源表通过JOIN操作汇聚后,同步到目标索引。自定义SQL(
dbms_etl.sync_by_sql):使用兼容Flink SQL的语法进行复杂的数据清洗、转换和聚合。
适用范围
使用AutoETL功能前,需确保环境满足以下条件:
集群版本:MySQL 8.0.1,且修订版本需为8.0.1.1.52或以上。
同步方向:仅支持从PolarDB MySQL版同步至同一集群内的PolarSearch节点。
DDL限制:不允许对已建立同步链路的源表进行DDL变更。如需修改,必须重建ETL链路。
创建同步链路
单表同步
数据准备
在PolarDB MySQL版中执行以下SQL语句,创建示例数据库和表,并插入测试数据。
CREATE DATABASE IF NOT EXISTS db1; USE db1; CREATE TABLE IF NOT EXISTS t1 ( id INT PRIMARY KEY, c1 VARCHAR(100), c2 VARCHAR(100) ); INSERT INTO t1(id, c1, c2) VALUES (1, 'apple', 'red'), (2, 'banana', 'yellow'), (3, 'grape', 'purple');创建同步链路
使用
dbms_etl.sync_by_table存储过程,建立从db1.t1表到PolarSearch节点的索引dest的同步任务。语法
call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");参数说明
参数
说明
search同步目标,当前固定为
search,表示PolarSearch节点。<source_table>源表名,格式为
数据库名.表名。<sink_table>PolarSearch节点中的目标索引名。
<column_list>需要同步的列名列表,用英文逗号
,分隔。如果为空字符串"",则同步源表所有列。使用限制
源表需包含主键或唯一键。
在不同的同步链路中,不能使用相同的源表或目标表。
创建链路后,源表新增的列默认不会被自动同步。如需同步新增列,请重建链路。
如果您希望使用自己定义的目标索引配置,您可以先在PolarSearch节点中手动创建索引并定义其配置,然后再创建同步链路。如果链路创建时目标索引不存在,系统将自动创建。
示例
将
db1.t1全表同步到PolarSearch的dest索引:call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");将
db1.t1表的c1和c2列同步到dest索引:call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
验证数据
连接到PolarSearch节点,使用与Elasticsearch兼容的REST API进行查询,确认数据已同步。
# 将<polarsearch_endpoint>替换为PolarSearch节点的连接地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
多表汇聚
数据准备
在PolarDB MySQL版中执行以下SQL语句,创建示例数据库和表,并插入测试数据。
CREATE DATABASE IF NOT EXISTS db1; CREATE DATABASE IF NOT EXISTS db2; CREATE DATABASE IF NOT EXISTS db3; CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT); CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT); CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10)); INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33); INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444); INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');创建同步链路
使用
dbms_etl.sync_by_map存储过程,可将多个表的数据连接(JOIN)后,汇聚到一个PolarSearch节点的索引中。语法
call dbms_etl.sync_by_map( "search", "<columns_map>", -- 目标索引字段与源表字段的映射关系 "<join_fields>", -- 表之间的连接键 "<join_types>", -- 连接类型 (inner, left) "<filter>" -- 数据过滤条件 );参数说明
参数
格式示例
说明
columns_mapdest.c1(db1.t1.c1),dest.c2(db2.t2.c2)目标索引字段与源表字段的映射关系。
示例表示:目标索引
dest的c1字段来自db1.t1.c1,c2字段来自db2.t2.c2。join_fieldsdest.id=db1.t1.id,db2.t2.id表之间的连接键。
示例表示:目标索引的文档ID(
dest.id)由db1.t1.id和db2.t2.id构成,同时db1.t1.id和db2.t2.id也是连接条件。join_typesinner,left表之间的连接类型,连接顺序与
join_fields中表的出现顺序一致。示例表示:t1 INNER JOIN t2,然后结果再LEFT JOIN t3。filterdb1.t1.c1 > 10 AND db2.t2.c2 < 100一个标准的SQL
WHERE子句,用于在同步前过滤源表数据。使用限制
所有参与同步的源表必须包含主键。
该功能使用流式计算,同步过程中仅保证最终一致性。
对于目标索引的更新模式为先删除后插入。如果您不希望在查询时访问到被删除数据的中间状态,可以在执行命令前设置会话变量
set sink_options = "'ignore-delete' = 'true'";以忽略PolarSearch节点数据删除的选项。
示例
两张表
INNER JOIN:将db1.t1和db2.t2通过id字段进行INNER JOIN,并将t1.c1和t2.c2同步到dest索引的c1和c2字段。call dbms_etl.sync_by_map( "search", "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2)", "dest.id=db1.t1.id,db2.t2.id", "inner", "" );多张表混合
JOIN并过滤:db1.t1、db2.t2和db3.t3三张表连接,其中t1与t2为INNER JOIN,t1与t3为LEFT JOIN,并筛选t1.c1 > 10且t2.c2 < 100的数据。call dbms_etl.sync_by_map( "search", "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2), dest.c3(db3.t3.c3)", "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", "inner,left", "db1.t1.c1 > 10 and db2.t2.c2 < 100" );
验证数据
连接到PolarSearch节点,使用与Elasticsearch兼容的REST API进行查询,确认数据已同步。
# 将<polarsearch_endpoint>替换为PolarSearch节点的连接地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
自定义SQL
创建同步链路
对于需要复杂转换、聚合或计算的场景,
dbms_etl.sync_by_sql存储过程支持使用Flink SQL语法定义数据同步逻辑。重要安全警告:严禁在SQL语句中硬编码密码以下示例仅为演示语法结构,其
WITH子句中包含明文密码,存在极大的安全风险。在生产环境中,必须使用更安全的方式管理凭证。语法
call dbms_etl.sync_by_sql("search", "<sync_sql>");示例
call dbms_etl.sync_by_sql("search", " -- 步骤1:定义 PolarDB 源表 CREATE TEMPORARY TABLE `db1`.`sbtest1` ( `id` BIGINT, `k` BIGINT, `c` STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'xxxxxxx', -- 填写 PolarDB 集群地址 'port' = '3306', 'username' = 'xxx', -- 生产环境严禁使用明文 'password' = 'xxx', -- 生产环境严禁使用明文 'database-name' = 'db1', 'table-name' = 'sbtest1' ); -- 步骤2:定义 PolarSearch 目标表 CREATE TEMPORARY TABLE `dest` ( `k` BIGINT, `max_c` STRING, PRIMARY KEY (`k`) NOT ENFORCED ) WITH ( 'connector' = 'opensearch', 'hosts' = 'xxxxxx:xxxx', -- 填写 PolarSearch 连接地址 'index' = 'dest', 'username' = 'xxx', -- 生产环境严禁使用明文 'password' = 'xxx' -- 生产环境严禁使用明文 ); -- 步骤3:定义计算和插入逻辑 INSERT INTO `dest` SELECT `t1`.`k`, MAX(`t1`.`c`) FROM `db1`.`sbtest1` AS `t1` GROUP BY `t1`.`k`; ");验证数据
连接到PolarSearch节点,使用与Elasticsearch兼容的REST API进行查询,确认数据已同步。
# 将<polarsearch_endpoint>替换为PolarSearch节点的连接地址 curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
管理同步链路
您可以使用以下命令查看和删除已创建的同步链路。
查看链路
查看所有链路:
call dbms_etl.show_sync_link();根据ID查看指定链路:将
<sync_id>替换为步骤二返回的ID。call dbms_etl.show_sync_link_by_id('<sync_id>')\G返回结果说明:
*************************** 1. row *************************** SYNC_ID: crb5rmv8rttsg NAME: crb5rmv8rttsg SYSTEM: search SYNC_DEFINITION: db1.t1 -> dest SOURCE_TABLES: db1.t1 SINK_TABLES: dest STATUS: active -- 链路状态,active表示正常运行 MESSAGE: -- 如果出错,此处会显示错误信息 CREATED_AT: 2024-05-20 11:55:06 UPDATED_AT: 2024-05-20 17:28:04 OPTIONS: ...
删除链路
删除同步链路是高危操作。默认情况下,该操作会同时删除PolarSearch中的目标索引及其所有数据。执行前请务必确认。
此操作用于停止数据同步并清理相关资源。
call dbms_etl.drop_sync_link('<sync_id>');对不同状态的链路执行drop_sync_link删除时,系统的处理逻辑存在差异:
active状态的链路:首先会变为dropping,待系统完成链路资源和目标索引数据的清理后,状态才会变为dropped。dropped状态的链路:系统将彻底清除该链路的信息。其他状态的链路:系统不支持删除操作。