通过CTAS语句,在实时同步数据的同时,还能实时将上游表结构(Schema)的变更同步到下游表,提高您在目标存储中创建表和维护源表结构变更的效率。本文为您介绍CREATE TABLE AS(CTAS)的背景信息、前提条件、使用限制、基本语法和代码示例。

背景信息

  • 功能特性
    功能详情
    单表同步支持实时同步源表的全量和增量数据到目标表中。
    表结构变更同步在实时同步数据的同时,还支持将源表的表结构变更(增加列信息等)实时同步到目标表中。
    分库分表合并同步支持使用正则表达式定义库名和表名,匹配数据源的多张分库分表,合并后同步到下游的一张表中。
    自定义计算列同步支持在源表上新增计算列,以支持您对源表的某些列进行转换计算。允许指定新增列的位置,并将其作为目标表的物理列,实时地将计算列的结果同步到目标表中。
    多CTAS语句支持使用STATEMENT SET语法将多个CTAS语句作为一个作业一起提交,并支持对Source节点的合并复用,降低对数据源的压力。
  • 启动流程
    当执行CTAS语句时,阿里云Flink将会按照以下流程执行:
    1. 检查目标存储中是否存在该目标表。

      如果不存在,则通过目标端Catalog去目标存储中创建相应的目标表,该目标表具有和数据源相同的Schema;如果存在,则跳过建表;如果已存在的目标表与源表Schema不一致,则会报错提示。

    2. 提交和启动相应的数据同步作业。

      将数据源的数据以及Schema的变更同步到目标表中。

    从MySQL到Hologres同步CTAS数据同步流程如下图所示。同步示意图
  • 表结构变更同步策略
    通过CTAS语句,在实时同步数据的同时,还能将源表Schema的变更同步到目标表中。Schema变更包括初始的表创建以及未来的表变更。当前支持的Schema变更策略详情如下:
    • 添加可空列:会自动在目标表Schema末尾添加对应的列,并自动同步新增列的数据。
    • 删除可空列:不会直接在目标表中删除该列,而是将该列的数据自动填充为NULL值。
    • 重命名列:被看作为添加列和删除列。直接在目标表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。例如,如果col_a重命名为col_b,则会在目标表末尾添加col_b,并自动将col_a的数据填充为NULL值。
    暂不支持同步以下Schema的变更:
    • 数据类型的变更。

      例如由VARCHAR变为BIGINT、由NOT NULL变为NULLABLE属性。

    • 主键或索引等约束的变更。
    • 非空列的增加或删除的变更。
    说明
    • 如果在遇到以上不支持的Schema变更,则需要您手动删除下游目标表,重新启动CTAS作业,即重新创建目标表并重新同步历史数据。
    • CTAS不会去识别具体的DDL类型,而是对比前后两条数据的Schema差异。因此,如果您先删除了某列后,又加回了该列,且这两个DDL之间无数据变化,那么CTAS会认为没有发生结构变更。同理,如果您添加了一列,直到该表有数据变化,CTAS才会感知到结构变更,才会同步结构变更到目标表。

前提条件

执行CTAS语法前,需要在您的工作空间中已注册目标端的Catalog。详情请参见管理Hive Catalog管理Hologres Catalog管理MySQL Catalog

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持CTAS语法。
  • 仅Flink计算引擎vvr-4.0.12-flink-1.13及以上版本支持同步自定义计算列。
  • 目标端的Catalog仅支持Hologres Catalog和Kafka Catalog。
  • CTAS不支持进行作业调试,详情请参见作业调试
  • 目前支持同步表结构变更的数据源有MySQL的CDC源表消息队列Kafka源表
  • 4.0.16版本以前,不支持在一个作业中使用多个CTAS语句将同一张数据源表同步到不同的结果表。

基本语法

CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  computed_column_definition [FIRST | AFTER column_name]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]
CTAS语法复用了CREATE TABLE语法的基本结构,其中的参数解释如下表所示。
参数说明
sink_table数据同步的目标表名,可以指定具体的Catalog名称和数据库名称。
COMMENT目标表的描述,默认使用source_table的描述。
WITH目标表的参数,详情请参见每个结果表Connector文档的WITH参数。Connector文档请参见数据结果表
说明 key和value都需要为字符串类型,例如'jdbcWriteBatchSize' = '1024'
source_table数据同步的源表表名,可指定具体的Catalog名称和Database名称。
OPTIONS源表的参数,详情请参见每个源表Connector文档的WITH参数。Connector文档请参见数据源表
说明 key和value都需要为字符串类型,例如'server-id' = '65500'
ADD COLUMN在源表上新增列,仅支持计算列。
column_component新增列的描述。
computed_column_definition计算列表达式的描述。
FIRST新增列作为源表的第一个字段。如果不添加该参数,则新增列会默认作为源表的最后一个字段。
AFTER新增列放在源表指定字段后面。
PARTITION BY系统支持根据某列进行分区,创建分区表。
说明 因为IF NOT EXISTS关键字为必填,所以如果目标表在目标存储中并不存在,则会先创建该目标表,否则跳过创建步骤。创建的目标表Schema会使用源表的Schema,包括主键以及物理字段的字段名和字段类型,不包括计算列、meta字段、Watermark。其中源表到目标表的字段类型会经过类型映射,详见各个Connector页面的类型映射表。

代码示例

  • 示例一:单表同步

    通常,CTAS都会配合数据源的Catalog和目标的Catalog一起使用,例如MySQL Catalog和Hologres Catalog结合CTAS语法,来完成MySQL到Hologres的全量和增量数据同步。使用MySQL Catalog可以自动解析源表的Schema及相应的参数,而不用手动编写DDL 。

    假设我们已经在工作空间中注册了名为holo的Hologres Catalog和名为mysql的MySQL Catalog。MySQL中的web_sales表同步到Hologres中,代码示例如下。
    USE CATALOG holo;
    
    CREATE TABLE IF NOT EXISTS web_sales  
    WITH ('jdbcWriteBatchSize' = '1024')   -- 可选,指定目标表的参数。
    AS TABLE mysql.tpcds.web_sales   
    /*+ OPTIONS('server-id'='8001-8004') */;  -- 指定mysql-cdc源表的额外参数。
  • 示例二:分库分表合并同步
    对于分库分表合并同步的场景,您可以结合MySQL Catalog,利用正则表达式的表名和库名来匹配所要同步的多张表。使用CTAS可以将这多张分库分表合并到一张Hologres表中,库名和表名会作为额外的两个字段写入到该表中,为保证主键唯一性,库名、表名和原主键一起作为该Hologres表的新联合主键。
    USE CATALOG holo;
    
    CREATE TABLE IF NOT EXISTS user
    WITH ('jdbcWriteBatchSize' = '1024')
    AS TABLE mysql.`wp.*`.`user[0-9]+`  
    /*+ OPTIONS('server-id'='8001-8004') */;
    其合并的效果如下图所示。效果如果在user02新增一列age,并插入一条数据。此时虽然多张分表的Schema并不一致,但是user02上后续的数据和Schema变更都能实时地自动同步到下游表中。
    ALTER TABLE `user02` ADD COLUMN `age` INT;
    INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);
    效果2
  • 示例三:自定义计算列同步
    本示例以user分库分表合并同步作为基础,为您介绍在分库分表合并的过程中,如何进行一些转换计算。
    USE CATALOG holo;
    
    CREATE TABLE IF NOT EXISTS user
    WITH ('jdbcWriteBatchSize' = '1024')
    AS TABLE mysql.`wp.*`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */
    ADD COLUMN (
      `t_idx` AS COALESCE(SPLIT_INDEX(`tbl`, 'r', 1), 'default') FIRST,
      `c_id` AS `id` + 10 AFTER `id`
    );
    新增计算列同步的效果如下图所示。新增计算列
  • 示例四:多CTAS语句
    阿里云Flink也支持使用STATEMENT SET语法将多个CTAS语句作为一个作业一起提交,并且 Flink还能对Source进行优化,复用一个Source节点读取多业务表的数据。这对于MySQL CDC数据源场景尤为适用,因为这可以减少server-id的使用,减少对数据库的连接数和读取压力。
    重要 对于Source复用优化, 需要这些Source表的options保持完全一致,才能合并成功进行复用。
    例如示例一同步了web_sales表,示例二同步了user分库分表,您可以使用STATEMENT SET语法将它们作为一个作业提交。一个作业便能完成多个业务表的同步,一个Source便能读取多个业务表数据,代码示例如下。
    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- 同步web_sales表。
    CREATE TABLE IF NOT EXISTS web_sales
    AS TABLE mysql.tpcds.web_sales
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- 同步user分库分表。
    CREATE TABLE IF NOT EXISTS user
    AS TABLE mysql.`wp.*`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • 示例五:多个CTAS语句将同一张数据源表同步到不同的结果表

    4.0.16以上版本中,在不添加计算列时,可以将同一张数据源表同步到不同的结果表。

    USE CATALOG `holo`;
    
    BEGIN STATEMENT SET;
    
    -- 通过CTAS语句同步MySQL的user表到Holo数仓database1的user表中
    CREATE TABLE IF NOT EXISTS `database1`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- 通过CTAS语句同步MySQL的user表到Holo数仓database2的user表中
    CREATE TABLE IF NOT EXISTS `database2`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

    如果结果表需要添加计算列,则应按照如下方式进行同步:

    -- 基于源表user创建临时表user_with_changed_id,支持定义计算列,例如这里的computed_id是基于源表的id计算获得。
    CREATE TEMPORARY TABLE `user_with_changed_id` (
      `computed_id` AS `id` + 1000
    ) LIKE `mysql`.`tpcds`.`user`;
    
    -- 基于源表user创建临时表user_with_changed_age,支持定义计算列,例如这里的computed_age是基于源表的age计算获得。
    CREATE TEMPORARY TABLE `user_with_changed_age` (
      `computed_age` AS `age` + 1
    ) LIKE `mysql`.`tpcds`.`user`;
    
    BEGIN STATEMENT SET;
    
    -- 通过CTAS语句同步MySQL的user表到Holo数仓的user_with_changed_id表中,表中会包含通过计算获得的id,即computed_id列。 
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
    AS TABLE `user_with_changed_id`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- 通过CTAS语句同步MySQL的user表到Holo数仓的user_with_changed_age表中,表中会包含通过计算获得的age,即computed_age列。 
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
    AS TABLE `user_with_changed_age`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • 示例六:CTAS语句将数据源表同步到Hologres分区表

    Hologres分区表建表时,如果Hologres表存在主键,则要求分区字段必须是主键中的字段。假设有一张MySQL表需要同步到Hologres,其建表语句如下。

    CREATE TABLE orders (
        order_id INTEGER NOT NULL,
        product_id INTEGER NOT NULL,
        city VARCHAR(100) NOT NULL
        order_date DATE,
        purchaser INTEGER,
        PRIMARY KEY(order_id, product_id)
    );

    当使用CTAS同步数据源表到Hologres的分区表中时,如果Hologres表的分区字段是product_id,可以通过如下SQL实现。

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
    PARTITIONED BY (product_id)
    AS TABLE `mysql`.`tpcds`.`orders`;

    如果Hologres表的分区字段是city,创建Hologres表时会使用MySQL表中的主键,由于上游表的主键不包含分区字段,作业会出错。此时,您可以在CTAS中通过声明主键的方式,重新指定目标Hologres分区表的主键,使得任务正常运行,示例如下。

    -- 可以通过如下SQL指定Hologres分区表的主键为order_id,product_id和city。
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
        CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
    )
    PARTITIONED BY (city)
    AS TABLE `mysql`.`tpcds`.`orders`;