CDAS支持整库级别的表结构和数据的实时同步,还支持表结构变更的同步。本文为您介绍CDAS的用法及实践场景。
功能:通过YAML作业的方式实现将数据从源端同步到目标端。
YAML作业优势:不仅覆盖CTAS和CDAS的关键能力(如整库同步、单表同步、分库分表同步、新增表同步、表结构变更和自定义计算列同步等),还支持表结构变更立即同步、原始Binlog同步、Where条件过滤、列裁剪等能力。
建议使用YAML完成数据摄入作业逻辑开发,可以参考数据摄入YAML最佳实践了解更多案例。
背景信息
CDAS是CTAS语法的一个语法糖,用于实现整库同步、多表同步的功能,常用于全自动化的数据集成场景。CDAS通常会配合数据源的Catalog和目标的Catalog一起使用,通过Catalog为表提供持久化元数据管理能力,最终完成全量和增量数据同步,包括未来的数据变更和表结构变更,无需提前在目标库创建表。
语法简化
Flink会将CDAS语句中每个需要同步的表翻译成一个对应的CTAS语句,继承CTAS的数据同步和表结构变更同步的能力。
资源优化
阿里云Flink还对源表进行优化,复用一个源表节点读取多业务表的数据。在MySQL CDC数据源场景中,不仅可以减少数据库的连接数,还能避免重复拉取Binlog数据,降低数据库的读取压力。
核心能力
数据同步
功能 | 详情 |
支持实时同步整库(或者多张表)的全量和增量数据到每张对应的结果表中。 | |
支持使用正则表达式定义库名,匹配数据源的多个分库下的源表,合并后同步到下游每张对应表名的结果表中。 | |
CDAS作业启动后,如果源库新增表,支持从作业快照重启,从而捕获到新的表,对新增表进行数据同步。 | |
支持使用STATEMENT SET语法将多个CDAS和CTAS语句作为一个作业一起提交,并支持对源表节点的合并复用,降低对数据源的压力。 |
表结构变更同步
在实时同步整库数据的同时,还支持将每张源表的表结构变更(加列等)实时同步到结果表中。CDAS的表结构变更同步策略与CTAS一致,详情请参见表结构变更同步。
启动流程
以通过CDAS同步MySQL到Hologres为例,具体流程如下所示。
流程图 | 启动流程 |
当执行CDAS语句时,将会按照以下流程执行:
|
前提条件
执行CDAS语法前,确保工作空间中已注册目标端的Catalog,详情请参见数据管理。
使用限制
语法限制
不支持调试功能。
不支持MiniBatch配置。
重要创建SQL作业前:请确保配置管理页面的作业默认配置页签中的其他配置处删除了MiniBatch配置。
创建SQL作业后:具体解决方案可参见报错:Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax。
上下游存储兼容性
CDAS支持的上下游存储列表如下,您可以从下表的源表和结果表中各选一个进行组合。
连接器名称 | 源表 | 结果表 | 备注 |
√ | × | 不支持同步MySQL视图。 | |
√ | × | 无。 | |
√ | × |
| |
× | √ | 无。 | |
× | √ | 如果下游是Hologres,CDAS在默认情况下会为每个表创建相应数量(connectionSize参数值)个连接。此时您可以使用connectionPoolName参数,让配置相同名称连接池的表可以共享连接池。 说明
| |
× | √ | 仅支持EMR的StarRocks。 | |
× | √ | 仅实时计算引擎VVR 11.1及以上版本支持同步到Paimon DLF 2.5结果表。 |
注意事项
新增表数据同步:
VVR 8.0.6及以上版本:CDAS作业启动后,支持添加新表后从作业快照重启,从而捕获到新的表。详情请参见源库新增表同步。
VVR 8.0.5及以下版本:CDAS作业启动后,作业同步的表已经确定,数据库中新增的表不会自动捕捉,也无法通过重启作业的方式捕获到。如果需要同步新增的表,您可以选择以下任一种方案。
方案
说明
新增作业:同步新增表
原有CDAS作业不变,启动一个新的作业同步新增的表。新增作业示例如下。
// 新建CTAS作业同步新增加的表new_table CREATE TABLE IF NOT EXISTS new_table AS TABLE mysql.tpcds.new_table /*+ OPTIONS('server-id'='8008-8010') */;
原有作业:清理数据重新启动
停止现有的CDAS作业。
清理已同步的数据。
以无状态启动CDAS作业重新同步数据。
读写上下游资源权限:
执行CDAS语法前,如果您需要访问不同账号下的上下游资源、或使用RAM用户或RAM角色等身份访问时,请确保登录实时计算开发控制台的账号具有读写上下游资源的权限,否则会因为权限不足导致读写操作失败。
基本语法
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
<target_database>:
[catalog_name.]db_name
<source_database>:
[catalog_name.]db_name
CDAS语法复用了CREATE DATABASE语法的基本结构,其中的参数解释如下表所示。
参数 | 说明 |
target_database | 数据同步的目标数据库名,可以指定具体的Catalog名称。 |
COMMENT | 目标库的描述,默认使用source_database的描述。 |
WITH | 目标库的参数,详情请参见数据管理中对应的Catalog文档。 说明 key和value都需要为字符串类型,例如'sink.parallelism' = '4'。 |
source_database | 数据同步的源库名称,可以指定具体的Catalog名称。 |
INCLUDING ALL TABLES | 同步源库中的所有表。 |
INCLUDING TABLE | 同步源库中指定的表。支持使用竖线(|)分隔指定多个表,也可以使用正则表达式指定符合某一规则的表。例如INCLUDING TABLE 'web.*'表示要同步源库中所有web开头的表。 |
EXCLUDING TABLE | 用于指定不需要同步的表。支持使用竖线(|)分隔指定多个表,也可以使用正则表达式指定符合某一规则的表。例如INCLUDING ALL TABLES EXCLUDING TABLE 'web.*'表示同步源库中所有不是web开头的表。 |
OPTIONS | 源表的参数,详情请参见对应连接器支持的源表WITH参数。 说明 key和value都需要为字符串类型,例如'server-id' = '65500'。 |
必须使用IF NOT EXISTS关键字,如果目标库或结果表在目标存储中并不存在,则会先创建该目标库和结果表,否则跳过创建步骤。
创建的结果表Schema会使用源表的Schema,包括主键以及物理字段的字段名和字段类型,不包括计算列、meta字段、Watermark。
源表到结果表的字段类型会经过类型映射,详见对应连接器文档中的类型映射。
代码示例
整库同步
同步场景:将MySQL中的tpcds库下的所有表同步到Hologres。
前提条件:已在工作空间中注册以下Catalog。
Hologres Catalog:名称为holo。
MySQL Catalog:名称为mysql。
代码示例:
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_tpcds -- 在hologres中创建holo_tpcds库。
WITH ('sink.parallelism' = '4') -- 可选,指定目标库的参数,每个holo sink默认使用4并发。
AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- 同步mysql中tpcds库下所有表。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 可选,指定mysql-cdc源表的额外参数。
Hologres支持在创建目标Database时指定WITH参数,这些参数仅对当前作业生效,用于控制写入结果表时的行为,不会持久化到Hologres中。支持的WITH参数详情请参见实时数仓Hologres。
分库合并同步
同步场景:MySQL实例中有order_db01~order_db99多个分库,每个分库下都有order、order_detail等多张表。使用CDAS将MySQL的多个分库下的所有表全部同步到Hologres中,包括未来的数据变更和表结构变更。
同步方案:利用正则表达式的库名(`order_db[0-9]+`)来匹配所要同步的多个分库(order_db01~order_db99)。其中库名和表名会作为额外的两个字段写入到每张结果表中。为保证主键唯一性,库名、表名和原主键一起作为对应Hologres表的新联合主键。
代码示例及合并效果:
使用CDAS可以将上游多个分库下相同表名的数据合并同步到Hologres目标库对应表名的同一张表中,无需提前在Hologres中创建表。
代码示例 | 合并效果 |
|
源库新增表同步
同步场景:CDAS作业启动后,需要对源库新增表进行数据同步。
同步方案:在SQL作业中开启新增表读取功能,从作业快照重启,从而捕获到新的表,对新增表进行数据同步。
使用限制:VVR 8.0.6及以上版本支持源库新增表同步功能,启用该功能需确保源表启动模式为initial。
操作步骤:
当出现新增的表需要同步时,在作业运维页面停止作业并勾选停止前创建一次快照。
在SQL作业开发中开启新增表读取功能,然后重新部署作业。
在SQL作业中增加以下语句,开启CDAS新增表读取功能。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
单击部署。
从快照恢复作业。
在作业运维页面单击目标作业名称,状态集管理页签,单击历史。
在作业快照列表中,找到停止作业时创建的快照。
单击目标快照操作列,选择作业启动。
完成作业启动。详情请参见
多CDAS&CTAS语句
同步场景:通过一个作业将MySQL实例中tpcds、tpch、user_db01~user_db99(分库分表)多个库同步到Hologres。
同步方案:使用STATEMENT SET语法组合多条CDAS和CTAS语句作为一个作业提交。该方案可以复用一个Source节点读取所需表的数据,在MySQL CDC数据源场景可以减少server-id的使用及数据库的连接数与读取压力。
Source表的options完全一致才能合并成功达到Source复用优化的目的。
MySQL连接器中Server ID的设置,请参见设置Server ID,避免Binlog消费冲突。
代码示例:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- 同步user分库分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 同步TPCDS库。
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- 同步TPCH库。
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;
多CDAS语句整库同步到Kafka
同步场景:将MySQL实例中tpcds、tpch多个库和表都同步到Kafka。
同步方案:在使用多个CDAS语句整库同步到Kafka时,由于不同的数据库中可能存在相同的表,为了防止topic冲突,需要使用cdas.topic.pattern
配置。
cdas.topic.pattern
定义了创建topic的名称的格式,其中可通过{table-name}
占位符来替换为表名。例如:当设置'cdas.topic.pattern'='dbname-{table-name}'
,对于上游表名为table1
的表,在Kafka中对应的topic名称为dbname-table1
。
代码示例:
USE CATALOG kafkaCatalog;
BEGIN STATEMENT SET;
-- 同步TPCDS库。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- 同步TPCH库。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;
实时计算Flink版提供MySQL整库同步到Kafka的能力,通过引入Kafka作为中间层,并使用CDAS整库同步或CTAS整表同步到Kafka来解决,具体操作请参见MySQL整库同步Kafka。
常见问题
作业运行异常
作业性能问题
数据同步问题
相关文档
CTAS和CDAS需要配合Catalog一起使用,通过Catalog为表提供持久化元数据管理能力,解决CTAS和CDAS无法持久化表结构和跨作业访问的问题。常用的Catalog使用请参见:
CTAS和CDAS的使用及实践场景:
单表同步、分库分表合并同步或自定义计算列同步:CREATE TABLE AS(CTAS)语句。
将MySQL整库同步到Kafka(降低多个任务对MySQL数据库的压力):MySQL整库同步Kafka。
使用CTAS和CDAS实现数据同步的教程:数据库实时入仓快速入门、基于Flink+Hologres搭建实时数仓或基于Flink+Paimon+StarRocks搭建流式湖仓。
通过YAML作业实现数据同步:
快速入门:数据摄入YAML作业快速入门。
将CTAS/CDAS作业转化为YAML作业:创建数据摄入作业。